第15天-常用内建模块
哪吒 2023/6/15
# 第15天-常用内建模块
# 学习目标
通过本章学习,你将掌握:
- 理解Python内建模块的概念和作用
- 掌握常用内建模块的使用方法
- 学会使用os、sys、datetime、json等核心模块
- 掌握collections、itertools、functools等高级模块
- 学会使用random、math、statistics等数学模块
- 理解模块的导入机制和最佳实践
- 学会在实际项目中合理选择和使用内建模块
# 一、内建模块概述
# 1.1 什么是内建模块
import sys
import os
def builtin_modules_introduction():
    """Print an introduction to the modules that ship with Python.

    The output consists of a descriptive overview, the number of entries
    currently held in ``sys.modules``, a one-line summary for a handful of
    commonly used modules, and a list of example import statements.

    Returns:
        None. Everything is written to standard output.
    """
    # Imported locally so the function does not depend on a module-level
    # ``import importlib`` having been executed before it is called.
    import importlib

    print("=== Python内建模块介绍 ===")
    # The literal is kept flush-left so no source indentation leaks into it.
    print("""
内建模块(Built-in Modules)是Python标准库的一部分,
随Python解释器一起安装,无需额外安装即可使用。
主要特点:
• 官方维护,稳定可靠
• 性能优化,通常用C语言实现
• 功能丰富,覆盖各种常见需求
• 跨平台兼容
• 文档完善
常用分类:
• 系统交互:os, sys, platform
• 时间处理:datetime, time, calendar
• 数据处理:json, csv, pickle
• 数学计算:math, statistics, random
• 集合工具:collections, itertools
• 函数工具:functools, operator
• 文件处理:pathlib, glob, shutil
• 网络通信:urllib, http, socket
• 并发编程:threading, multiprocessing, asyncio
""")

    # How many modules the interpreter has cached so far.
    print("\n当前已加载的模块数量:", len(sys.modules))

    # A small, hand-picked sample of frequently used modules.
    builtin_module_names = [
        'os', 'sys', 'datetime', 'json', 'math', 'random',
        'collections', 'itertools', 'functools', 'pathlib',
    ]
    print("\n常用内建模块:")
    for module_name in builtin_module_names:
        try:
            module = importlib.import_module(module_name)
        except ImportError:
            print(f" {module_name}: 模块未找到")
            continue
        doc = module.__doc__
        # Use the first sentence of the docstring. Docstrings are often
        # hard-wrapped, so collapse all whitespace to keep one line per module.
        summary = " ".join(doc.split('.')[0].split()) if doc else '系统模块'
        print(f" {module_name}: {summary}")

    # The different syntactic forms of the import statement.
    print("\n模块导入方式:")
    import_examples = [
        "import os # 导入整个模块",
        "import os.path # 导入子模块",
        "from os import getcwd # 导入特定函数",
        "from os import * # 导入所有(不推荐)",
        "import os as operating_sys # 使用别名",
        "from os import getcwd as pwd # 函数别名",
    ]
    for example in import_examples:
        print(f" {example}")
# 运行介绍
builtin_modules_introduction()
# 1.2 模块导入机制
import sys
import importlib
from types import ModuleType
def module_import_mechanism():
    """Demonstrate how Python finds, loads, caches and inspects modules.

    Six sections are printed: the ``sys.path`` search order, the steps of an
    import, dynamic imports through ``importlib.import_module`` and
    ``__import__``, reloading a cached module, selected attributes of the
    ``json`` module, and the public names that ``json`` exposes.

    Returns:
        None. All results are printed.
    """
    print("=== 模块导入机制详解 ===")

    # Section 1: the directories searched for modules, in order.
    print("\n1. 模块搜索路径:")
    print("Python按以下顺序搜索模块:")
    position = 1
    for entry in sys.path:
        print(f" {position}. {entry}")
        position += 1

    # Section 2: what happens during an import statement.
    print("\n2. 模块导入过程:")
    stages = (
        "1. 检查sys.modules缓存",
        "2. 在sys.path中搜索模块文件",
        "3. 编译模块代码(如果需要)",
        "4. 执行模块代码",
        "5. 将模块对象添加到sys.modules",
        "6. 将模块绑定到当前命名空间",
    )
    for stage in stages:
        print(f" {stage}")

    # Section 3: importing by name at run time.
    print("\n3. 动态导入示例:")
    target = 'math'
    loaded = importlib.import_module(target)
    print(f"动态导入{target}模块: {loaded}")
    print(f"计算π的值: {loaded.pi}")
    via_dunder = __import__('os')
    print(f"使用__import__导入os: {via_dunder}")

    # Section 4: re-executing a module that is already cached.
    print("\n4. 模块重新加载:")
    print("注意:模块只会被导入一次,后续import会使用缓存")
    json_is_cached = 'json' in sys.modules
    if json_is_cached:
        print("json模块已在缓存中")
        importlib.reload(sys.modules['json'])
        print("json模块已重新加载")

    # Section 5: metadata attributes carried by a module object.
    print("\n5. 模块属性检查:")
    import json
    doc_text = json.__doc__
    if doc_text:
        doc_preview = doc_text[:50] + '...'
    else:
        doc_preview = 'None'
    rows = (
        ('模块名称', json.__name__),
        ('模块文件', getattr(json, '__file__', '内建模块')),
        ('模块文档', doc_preview),
        ('模块版本', getattr(json, '__version__', '未知')),
    )
    for label, detail in rows:
        print(f" {label}: {detail}")

    # Section 6: names in the module that do not start with an underscore.
    print("\n6. json模块的主要函数:")
    public_names = []
    for attr_name in dir(json):
        if attr_name.startswith('_'):
            continue
        public_names.append(attr_name)
    print(f" 公共函数/属性: {public_names}")
# 运行导入机制演示
module_import_mechanism()
# 二、系统交互模块
# 2.1 os模块 - 操作系统接口
import os
import platform
from pathlib import Path
def os_module_demo():
    """Demonstrate common ``os`` / ``os.path`` operations.

    Prints basic system information, path manipulation results, a listing and
    a depth-limited tree of the current working directory, a few environment
    variables, and identifiers of the current process.

    Side effects:
        Sets the environment variable ``MY_APP_CONFIG`` to ``'production'``
        for the current process.

    Returns:
        None. All results are printed.
    """
    print("=== os模块演示 ===")

    # 1. System information
    print("\n1. 系统信息:")
    system_info = {
        '操作系统': os.name,
        '平台': platform.system(),
        '架构': platform.architecture()[0],
        '处理器': platform.processor(),
        '主机名': platform.node(),
        'Python版本': platform.python_version(),
    }
    for key, value in system_info.items():
        print(f" {key}: {value}")

    # 2. Path operations
    print("\n2. 路径操作:")
    current_dir = os.getcwd()
    print(f" 当前工作目录: {current_dir}")

    # Joining path components
    file_path = os.path.join(current_dir, 'test', 'example.txt')
    print(f" 拼接路径: {file_path}")

    # Splitting a path into its parts
    dir_name = os.path.dirname(file_path)
    base_name = os.path.basename(file_path)
    name, ext = os.path.splitext(base_name)
    print(f" 目录名: {dir_name}")
    print(f" 文件名: {base_name}")
    print(f" 文件名(无扩展名): {name}")
    print(f" 扩展名: {ext}")

    # Path predicates
    path_checks = {
        '是否存在': os.path.exists(current_dir),
        '是否为文件': os.path.isfile(current_dir),
        '是否为目录': os.path.isdir(current_dir),
        '是否为绝对路径': os.path.isabs(current_dir),
    }
    print(f"\n 路径检查 ({current_dir}):")
    for check, result in path_checks.items():
        print(f" {check}: {result}")

    # 3. Directory operations
    print("\n3. 目录操作:")
    try:
        files = os.listdir('.')
        print(f" 当前目录文件数量: {len(files)}")
        print(f" 前5个文件: {files[:5]}")
    except PermissionError:
        print(" 无权限访问目录")

    # Depth-limited recursive listing.
    print("\n 递归遍历目录结构:")
    max_depth = 2
    for root, dirs, files in os.walk('.'):
        # os.walk('.') yields '.', './a', './a/b', ...; the separator count is
        # therefore the depth below the starting directory.
        level = root.count(os.sep)
        indent = ' ' * 2 * level
        print(f"{indent}{os.path.basename(root)}/")
        subindent = ' ' * 2 * (level + 1)
        for file in files[:3]:  # show at most three files per directory
            print(f"{subindent}{file}")
        if len(files) > 3:
            print(f"{subindent}... 还有{len(files) - 3}个文件")
        if level >= max_depth:
            # Prune in place: stop descending below this directory but keep
            # visiting its siblings. A ``break`` here would abort the whole
            # walk at the first directory that reaches the depth limit.
            dirs.clear()
        else:
            # Visit sub-directories in a stable order.
            dirs.sort()

    # 4. Environment variables
    print("\n4. 环境变量:")
    important_env_vars = ['PATH', 'HOME', 'USER', 'PYTHONPATH']
    for var in important_env_vars:
        value = os.environ.get(var, '未设置')
        if len(str(value)) > 50:
            # Truncate long values such as PATH for readability.
            value = str(value)[:50] + '...'
        print(f" {var}: {value}")

    # Setting a variable affects this process (and children started later).
    os.environ['MY_APP_CONFIG'] = 'production'
    print(f" 设置自定义环境变量: {os.environ.get('MY_APP_CONFIG')}")

    # 5. Process information; some calls are not available on every platform.
    print("\n5. 进程信息:")
    process_info = {
        '进程ID': os.getpid(),
        '父进程ID': os.getppid() if hasattr(os, 'getppid') else '不支持',
        '用户ID': os.getuid() if hasattr(os, 'getuid') else '不支持',
        '组ID': os.getgid() if hasattr(os, 'getgid') else '不支持',
    }
    for key, value in process_info.items():
        print(f" {key}: {value}")
# 运行os模块演示
os_module_demo()
# 2.2 sys模块 - 系统特定参数
import sys
import gc
def sys_module_demo():
    """Demonstrate frequently used facilities of the ``sys`` module.

    Prints interpreter details, command-line arguments, the module search
    path, reference-count and size information, details of the standard
    streams (including a temporary redirection of ``sys.stdout``), exit and
    exception hook handling, and a summary of loaded modules.

    Side effects:
        Registers ``cleanup_function`` with ``atexit``; it prints a message
        when the interpreter exits. ``sys.stdout`` and ``sys.excepthook`` are
        replaced temporarily and always restored before returning.

    Returns:
        None. All results are printed.
    """
    print("=== sys模块演示 ===")

    # 1. Interpreter information
    print("\n1. Python解释器信息:")
    interpreter_info = {
        'Python版本': sys.version,
        '版本信息': sys.version_info,
        '平台': sys.platform,
        '可执行文件路径': sys.executable,
        '字节序': sys.byteorder,
        '默认编码': sys.getdefaultencoding(),
        '文件系统编码': sys.getfilesystemencoding(),
    }
    for key, value in interpreter_info.items():
        if isinstance(value, str) and len(value) > 60:
            value = value[:60] + '...'
        print(f" {key}: {value}")

    # 2. Command-line arguments
    print("\n2. 命令行参数:")
    print(f" 脚本名称: {sys.argv[0] if sys.argv else 'None'}")
    print(f" 参数列表: {sys.argv}")
    print(f" 参数数量: {len(sys.argv)}")

    # 3. Module search path (first five entries only)
    print("\n3. 模块搜索路径:")
    print(f" 路径数量: {len(sys.path)}")
    for i, path in enumerate(sys.path[:5], 1):
        print(f" {i}. {path}")
    if len(sys.path) > 5:
        print(f" ... 还有{len(sys.path) - 5}个路径")

    # 4. Memory and performance related information
    print("\n4. 内存和性能信息:")
    test_list = [1, 2, 3]
    print(f" test_list的引用计数: {sys.getrefcount(test_list)}")

    # sys.getsizeof is shallow: it does not follow references to contents.
    objects_size = {
        '整数1': sys.getsizeof(1),
        '字符串"hello"': sys.getsizeof("hello"),
        '列表[1,2,3]': sys.getsizeof([1, 2, 3]),
        '字典{"a":1}': sys.getsizeof({"a": 1}),
    }
    for obj, size in objects_size.items():
        print(f" {obj}: {size} 字节")

    print(f" 递归限制: {sys.getrecursionlimit()}")

    # 5. Standard streams
    print("\n5. 标准输入输出:")
    streams = {
        'stdin': sys.stdin,
        'stdout': sys.stdout,
        'stderr': sys.stderr,
    }
    for name, stream in streams.items():
        print(f" {name}: {type(stream).__name__}")
        if hasattr(stream, 'encoding'):
            print(f" 编码: {stream.encoding}")

    # Redirect stdout into an in-memory buffer, then restore it.
    print("\n 输出重定向示例:")
    from io import StringIO
    original_stdout = sys.stdout
    string_buffer = StringIO()
    sys.stdout = string_buffer
    try:
        print("这条消息被重定向到缓冲区")
        print("这是第二条消息")
    finally:
        # Restore even if writing fails, so later output is not swallowed.
        sys.stdout = original_stdout

    captured_output = string_buffer.getvalue()
    print(f" 捕获的输出: {repr(captured_output)}")

    # 6. Exit control
    print("\n6. 程序退出控制:")

    def cleanup_function():
        print(" 执行清理操作...")

    import atexit
    atexit.register(cleanup_function)
    print(" 已注册退出钩子函数")

    def exception_handler(exc_type, exc_value, exc_traceback):
        print(f" 捕获未处理异常: {exc_type.__name__}: {exc_value}")

    # Install the hook for demonstration only and put the original one back.
    original_excepthook = sys.excepthook
    sys.excepthook = exception_handler
    try:
        print(" 已设置异常钩子")
    finally:
        sys.excepthook = original_excepthook

    # 7. Loaded modules
    print("\n7. 已加载模块:")
    loaded_modules = list(sys.modules.keys())
    print(f" 已加载模块数量: {len(loaded_modules)}")

    common_modules = ['os', 'sys', 'json', 'datetime', 'math']
    loaded_common = [m for m in common_modules if m in sys.modules]
    print(f" 常见已加载模块: {loaded_common}")
# 运行sys模块演示
sys_module_demo()
# 2.3 platform模块 - 平台信息
import platform
import sys
def platform_module_demo():
    """Demonstrate the ``platform`` module.

    Prints general system information, Python interpreter details, information
    specific to Windows / Linux / macOS, CPU and (if ``psutil`` is installed)
    memory figures, a list of compatibility checks and a short environment
    report.

    Returns:
        None. All results are printed.
    """
    print("=== platform模块演示 ===")

    def print_entries(entries, tuple_separator=' | ', skip_empty=False):
        """Print ``key: value`` lines, flattening tuple values into one string."""
        for key, value in entries.items():
            if isinstance(value, tuple):
                parts = [str(v) for v in value if v or not skip_empty]
                value = tuple_separator.join(parts)
            print(f" {key}: {value}")

    # 1. Basic system information
    print("\n1. 系统基本信息:")
    basic_info = {
        '系统名称': platform.system(),
        '系统版本': platform.release(),
        '系统详细版本': platform.version(),
        '平台标识': platform.platform(),
        '架构': platform.architecture(),
        '机器类型': platform.machine(),
        '处理器': platform.processor(),
        '网络名称': platform.node(),
    }
    print_entries(basic_info)

    # 2. Python interpreter information
    print("\n2. Python解释器信息:")
    python_info = {
        'Python版本': platform.python_version(),
        'Python版本元组': platform.python_version_tuple(),
        'Python分支': platform.python_branch(),
        'Python修订版': platform.python_revision(),
        'Python实现': platform.python_implementation(),
        'Python编译器': platform.python_compiler(),
    }
    print_entries(python_info, tuple_separator='.')

    # 3. Operating-system specific information
    print("\n3. 特定系统信息:")
    system = platform.system()
    if system == 'Windows':
        try:
            win_info = {
                'Windows版本': platform.win32_ver(),
                'Windows版本(详细)': platform.win32_edition() if hasattr(platform, 'win32_edition') else '不支持',
            }
            print_entries(win_info, skip_empty=True)
        except Exception:
            print(" 无法获取Windows特定信息")
    elif system == 'Linux':
        try:
            if hasattr(platform, 'linux_distribution'):
                # Only present on old interpreters (removed in Python 3.8).
                distribution = platform.linux_distribution()
            elif hasattr(platform, 'freedesktop_os_release'):
                # Python 3.10+: parse the os-release file instead.
                try:
                    distribution = platform.freedesktop_os_release().get('PRETTY_NAME', '已弃用')
                except OSError:
                    # No os-release file could be read.
                    distribution = '已弃用'
            else:
                distribution = '已弃用'
            linux_info = {
                'Linux发行版': distribution,
                'Libc版本': platform.libc_ver(),
            }
            print_entries(linux_info, skip_empty=True)
        except Exception:
            print(" 无法获取Linux特定信息")
    elif system == 'Darwin':  # macOS
        try:
            mac_info = {
                'macOS版本': platform.mac_ver(),
            }
            print_entries(mac_info, skip_empty=True)
        except Exception:
            print(" 无法获取macOS特定信息")

    # 4. Hardware information
    print("\n4. 硬件信息:")
    try:
        import multiprocessing
        cpu_count = multiprocessing.cpu_count()
        print(f" CPU核心数: {cpu_count}")
    except (ImportError, NotImplementedError):
        print(" 无法获取CPU信息")

    # Memory figures need the optional third-party psutil package.
    try:
        import psutil
        memory = psutil.virtual_memory()
        print(f" 总内存: {memory.total // (1024**3)} GB")
        print(f" 可用内存: {memory.available // (1024**3)} GB")
        print(f" 内存使用率: {memory.percent}%")
    except ImportError:
        print(" 内存信息需要psutil库")

    # 5. Compatibility checks
    print("\n5. 系统兼容性检查:")
    compatibility_checks = {
        '是否为Windows': system == 'Windows',
        '是否为Linux': system == 'Linux',
        '是否为macOS': system == 'Darwin',
        '是否为64位': platform.architecture()[0] == '64bit',
        'Python版本>=3.6': sys.version_info >= (3, 6),
        'Python版本>=3.8': sys.version_info >= (3, 8),
    }
    for check, result in compatibility_checks.items():
        status = "✓" if result else "✗"
        print(f" {status} {check}")

    # 6. Environment report
    print("\n6. 完整环境报告:")

    def generate_environment_report():
        """Return a multi-line text report describing the current environment."""
        # Local import: the module-level name ``datetime`` is not relied upon.
        from datetime import datetime as _datetime

        report = [
            "=== 系统环境报告 ===",
            f"操作系统: {platform.platform()}",
            f"Python版本: {platform.python_version()}",
            f"Python实现: {platform.python_implementation()}",
            f"架构: {platform.architecture()[0]}",
            f"处理器: {platform.processor()}",
            f"主机名: {platform.node()}",
            # python_build() is the interpreter's build number/date, so it
            # gets its own label instead of being shown as the report time.
            f"Python构建: {' | '.join(platform.python_build())}",
            f"生成时间: {_datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        ]
        return "\n".join(report)

    report = generate_environment_report()
    print(report)
# 运行platform模块演示
platform_module_demo()
# 三、时间处理模块
# 3.1 datetime模块 - 日期时间处理
import datetime
from datetime import date, time, datetime as dt, timedelta, timezone
import calendar
def datetime_module_demo():
    """Demonstrate the ``datetime`` and ``calendar`` modules.

    Prints examples of creating date/time objects, formatting and parsing,
    arithmetic with ``timedelta``, time zone conversion, attribute access,
    a few helper functions (age, days until a date, quarter, weekend check)
    and calendar queries.

    Returns:
        None. All results are printed.
    """
    # Bind datetime.time under a private local name: the module-level name
    # ``time`` is rebound to the ``time`` module further down the file, which
    # would make ``time(14, 30, 0)`` fail if this function ran after that.
    from datetime import time as dt_time

    print("=== datetime模块演示 ===")

    # 1. Basic objects
    print("\n1. 基本日期时间对象:")
    now = dt.now()
    today = date.today()
    current_time = dt.now().time()
    print(f" 当前日期时间: {now}")
    print(f" 当前日期: {today}")
    print(f" 当前时间: {current_time}")

    specific_date = date(2023, 12, 25)
    specific_time = dt_time(14, 30, 0)
    specific_datetime = dt(2023, 12, 25, 14, 30, 0)
    print(f" 特定日期: {specific_date}")
    print(f" 特定时间: {specific_time}")
    print(f" 特定日期时间: {specific_datetime}")

    # 2. Formatting with strftime
    print("\n2. 日期时间格式化:")
    format_examples = [
        ("%Y-%m-%d", "年-月-日"),
        ("%Y/%m/%d", "年/月/日"),
        ("%d/%m/%Y", "日/月/年"),
        ("%Y-%m-%d %H:%M:%S", "完整日期时间"),
        ("%A, %B %d, %Y", "英文格式"),
        ("%Y年%m月%d日", "中文格式"),
        ("%H:%M:%S", "时:分:秒"),
        ("%I:%M %p", "12小时制"),
    ]
    for fmt, description in format_examples:
        formatted = now.strftime(fmt)
        print(f" {description}: {formatted} ({fmt})")

    # 3. Parsing with strptime
    print("\n3. 字符串解析:")
    date_strings = [
        ("2023-12-25", "%Y-%m-%d"),
        ("25/12/2023", "%d/%m/%Y"),
        ("2023-12-25 14:30:00", "%Y-%m-%d %H:%M:%S"),
        ("December 25, 2023", "%B %d, %Y"),
    ]
    for date_str, fmt in date_strings:
        try:
            parsed = dt.strptime(date_str, fmt)
            print(f" '{date_str}' -> {parsed}")
        except ValueError as e:
            print(f" '{date_str}' 解析失败: {e}")

    # 4. Arithmetic
    print("\n4. 时间计算:")
    start_date = dt(2023, 1, 1)
    end_date = dt(2023, 12, 31)
    time_diff = end_date - start_date
    print(f" 开始日期: {start_date.date()}")
    print(f" 结束日期: {end_date.date()}")
    print(f" 时间差: {time_diff.days} 天")

    base_date = dt.now()
    time_operations = [
        (timedelta(days=7), "7天后"),
        (timedelta(days=-7), "7天前"),
        (timedelta(hours=3), "3小时后"),
        (timedelta(weeks=2), "2周后"),
        (timedelta(days=30), "30天后"),
    ]
    for delta, description in time_operations:
        result_date = base_date + delta
        print(f" {description}: {result_date.strftime('%Y-%m-%d %H:%M')}")

    # 5. Time zones
    print("\n5. 时区处理:")
    utc_now = dt.now(timezone.utc)
    print(f" UTC时间: {utc_now}")
    local_now = dt.now()
    print(f" 本地时间: {local_now}")

    try:
        import zoneinfo  # Python 3.9+
        tokyo_tz = zoneinfo.ZoneInfo("Asia/Tokyo")
        london_tz = zoneinfo.ZoneInfo("Europe/London")
        tokyo_time = utc_now.astimezone(tokyo_tz)
        london_time = utc_now.astimezone(london_tz)
        print(f" 东京时间: {tokyo_time}")
        print(f" 伦敦时间: {london_time}")
    except (ImportError, KeyError):
        # ImportError: zoneinfo is unavailable (Python < 3.9).
        # KeyError: ZoneInfoNotFoundError subclasses it and is raised when no
        # time zone database is available on the system.
        print(" 时区转换需要Python 3.9+或pytz库")

    # 6. Attributes of a datetime
    print("\n6. 日期时间属性:")
    sample_dt = dt(2023, 12, 25, 14, 30, 45, 123456)
    attributes = {
        '年': sample_dt.year,
        '月': sample_dt.month,
        '日': sample_dt.day,
        '时': sample_dt.hour,
        '分': sample_dt.minute,
        '秒': sample_dt.second,
        '微秒': sample_dt.microsecond,
        '星期几': sample_dt.weekday(),  # 0 = Monday
        'ISO星期几': sample_dt.isoweekday(),  # 1 = Monday
        '年中第几天': sample_dt.timetuple().tm_yday,
    }
    for attr, value in attributes.items():
        print(f" {attr}: {value}")

    # 7. Helper functions
    print("\n7. 实用日期时间函数:")

    def get_age(birth_date):
        """Return the age in whole years for ``birth_date`` as of today."""
        today = date.today()
        age = today.year - birth_date.year
        # Subtract one if this year's birthday has not happened yet.
        if (today.month, today.day) < (birth_date.month, birth_date.day):
            age -= 1
        return age

    def get_days_until(target_date):
        """Return the number of days from today until ``target_date``."""
        today = date.today()
        if isinstance(target_date, dt):
            target_date = target_date.date()
        return (target_date - today).days

    def get_quarter(date_obj):
        """Return the quarter (1-4) that ``date_obj`` falls in."""
        return (date_obj.month - 1) // 3 + 1

    def is_weekend(date_obj):
        """Return True when ``date_obj`` is a Saturday or Sunday."""
        return date_obj.weekday() >= 5

    birth_date = date(1990, 5, 15)
    # Always the upcoming 1 January, so the countdown never goes negative.
    new_year = date(today.year + 1, 1, 1)
    print(f" 年龄计算: 生于{birth_date},现在{get_age(birth_date)}岁")
    print(f" 距离新年: {get_days_until(new_year)}天")
    print(f" 当前季度: 第{get_quarter(today)}季度")
    print(f" 今天是否周末: {is_weekend(today)}")

    # 8. Calendar queries
    print("\n8. 日历操作:")
    year, month = 2023, 12

    # monthrange returns (weekday of the first day, number of days).
    first_weekday, days_in_month = calendar.monthrange(year, month)
    print(f" {year}年{month}月有{days_in_month}天")

    is_leap = calendar.isleap(year)
    print(f" {year}年是否闰年: {is_leap}")

    weekdays = ['周一', '周二', '周三', '周四', '周五', '周六', '周日']
    print(f" {year}年{month}月1日是{weekdays[first_weekday]}")

    print(f"\n {year}年{month}月日历:")
    month_calendar = calendar.month(year, month)
    print(month_calendar)
# 运行datetime模块演示
datetime_module_demo()
# 3.2 time模块 - 时间处理
import time
import datetime
def time_module_demo():
    """Demonstrate the ``time`` module.

    Prints examples of timestamps and their conversions, ``struct_time``
    fields, ``strftime`` formatting, three ways of timing a computation, the
    accuracy of ``time.sleep``, time zone offsets, and a few small helper
    functions built on top of the module.

    Returns:
        None. All results are printed.
    """
    print("=== time模块演示 ===")

    # 1. Timestamps
    print("\n1. 时间戳操作:")
    current_timestamp = time.time()
    print(f" 当前时间戳: {current_timestamp}")
    print(f" 时间戳(整数): {int(current_timestamp)}")

    # Timestamp -> struct_time / readable string
    as_struct = time.localtime(current_timestamp)
    as_text = time.ctime(current_timestamp)
    print(f" 时间戳转结构: {as_struct}")
    print(f" 时间戳转字符串: {as_text}")

    # String -> struct_time -> timestamp
    time_string = "2023-12-25 14:30:00"
    timestamp = time.mktime(time.strptime(time_string, "%Y-%m-%d %H:%M:%S"))
    print(f" 字符串'{time_string}'转时间戳: {timestamp}")

    # 2. struct_time
    print("\n2. 时间结构操作:")
    local_time = time.localtime()
    utc_time = time.gmtime()
    print(f" 本地时间结构: {local_time}")
    print(f" UTC时间结构: {utc_time}")

    field_rows = (
        ('年', local_time.tm_year),
        ('月', local_time.tm_mon),
        ('日', local_time.tm_mday),
        ('时', local_time.tm_hour),
        ('分', local_time.tm_min),
        ('秒', local_time.tm_sec),
        ('星期几', local_time.tm_wday),  # 0 = Monday
        ('年中第几天', local_time.tm_yday),
        ('是否夏令时', local_time.tm_isdst),
    )
    for label, field_value in field_rows:
        print(f" {label}: {field_value}")

    # 3. Formatting
    print("\n3. 时间格式化:")
    patterns = (
        ("%Y-%m-%d %H:%M:%S", "标准格式"),
        ("%Y/%m/%d", "日期格式"),
        ("%H:%M:%S", "时间格式"),
        ("%A, %B %d, %Y", "完整英文格式"),
        ("%c", "本地完整格式"),
        ("%x", "本地日期格式"),
        ("%X", "本地时间格式"),
    )
    for pattern, label in patterns:
        print(f" {label}: {time.strftime(pattern, local_time)}")

    # 4. Timing the same computation with three different clocks
    print("\n4. 性能测量:")

    def square_sum():
        """Return the sum of squares of 0..99999 (the workload being timed)."""
        return sum(i * i for i in range(100000))

    # Wall-clock time
    began = time.time()
    total = square_sum()
    execution_time = time.time() - began
    print(f" 计算结果: {total}")
    print(f" 执行时间(time): {execution_time:.6f}秒")

    # High-resolution performance counter
    began = time.perf_counter()
    total = square_sum()
    perf_time = time.perf_counter() - began
    print(f" 执行时间(perf_counter): {perf_time:.6f}秒")

    # CPU time of the current process
    began = time.process_time()
    total = square_sum()
    cpu_time = time.process_time() - began
    print(f" CPU时间(process_time): {cpu_time:.6f}秒")

    # 5. Sleeping
    print("\n5. 睡眠和延迟:")
    print(" 开始睡眠测试...")
    before_sleep = time.perf_counter()
    time.sleep(0.1)
    actual_sleep = time.perf_counter() - before_sleep
    print(" 预期睡眠: 0.1秒")
    print(f" 实际睡眠: {actual_sleep:.6f}秒")
    print(f" 误差: {abs(actual_sleep - 0.1):.6f}秒")

    # 6. Time zone information
    print("\n6. 时区信息:")
    timezone_offset = time.timezone
    if time.daylight:
        daylight_offset = time.altzone
    else:
        daylight_offset = time.timezone
    print(f" 时区偏移: {timezone_offset}秒 ({timezone_offset/3600}小时)")
    print(f" 夏令时偏移: {daylight_offset}秒 ({daylight_offset/3600}小时)")
    print(f" 是否支持夏令时: {bool(time.daylight)}")
    if hasattr(time, 'tzname'):
        print(f" 时区名称: {time.tzname}")

    # 7. Helper functions
    print("\n7. 实用时间函数:")

    def format_duration(seconds):
        """Render a number of seconds as seconds, minutes+seconds or h+m+s."""
        if seconds < 60:
            return f"{seconds:.2f}秒"
        secs = seconds % 60
        if seconds < 3600:
            return f"{int(seconds // 60)}分{secs:.2f}秒"
        hours = seconds // 3600
        minutes = (seconds % 3600) // 60
        return f"{int(hours)}小时{int(minutes)}分{secs:.2f}秒"

    def time_since(timestamp):
        """Describe how long ago ``timestamp`` was, in the largest fitting unit."""
        elapsed = time.time() - timestamp
        if elapsed < 60:
            return f"{int(elapsed)}秒前"
        if elapsed < 3600:
            return f"{int(elapsed//60)}分钟前"
        if elapsed < 86400:
            return f"{int(elapsed//3600)}小时前"
        return f"{int(elapsed//86400)}天前"

    def benchmark_function(func, *args, **kwargs):
        """Call ``func`` and return ``(result, elapsed seconds)``."""
        began = time.perf_counter()
        outcome = func(*args, **kwargs)
        return outcome, time.perf_counter() - began

    # Exercise the helpers
    for duration in (0.5, 65, 3665, 7325):
        print(f" {duration}秒 = {format_duration(duration)}")

    past_timestamp = current_timestamp - 3665  # a little over one hour ago
    print(f" 时间距离: {time_since(past_timestamp)}")

    def test_function():
        return sum(range(10000))

    result, duration = benchmark_function(test_function)
    print(f" 基准测试: 结果={result}, 耗时={duration:.6f}秒")
# 运行time模块演示
time_module_demo()
# 四、数据处理模块
# 4.1 json模块 - JSON数据处理
import json
from datetime import datetime
from decimal import Decimal
def json_module_demo():
    """Demonstrate the ``json`` module.

    Prints examples of serialising and parsing, reading and writing a JSON
    file, formatting options, custom serialisation of non-JSON types, error
    handling for malformed input, a simple data validator, a small timing
    test, and three helper functions (pretty print, diff, flatten).

    Side effects:
        Creates ``test_data.json`` in the current working directory and
        removes it again at the end of the demonstration.

    Returns:
        None. All results are printed.
    """
    print("=== json模块演示 ===")

    # 1. Basic serialisation / deserialisation
    print("\n1. 基本JSON操作:")
    python_data = {
        "name": "张三",
        "age": 25,
        "is_student": True,
        "scores": [85, 92, 78],
        "address": {
            "city": "北京",
            "district": "朝阳区"
        },
        "phone": None
    }
    json_string = json.dumps(python_data, ensure_ascii=False, indent=2)
    print(" Python对象转JSON:")
    print(json_string)

    parsed_data = json.loads(json_string)
    print(f"\n JSON转Python对象: {parsed_data}")
    print(f" 数据类型: {type(parsed_data)}")

    # 2. File round trip
    print("\n2. JSON文件操作:")
    filename = "test_data.json"
    try:
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(python_data, f, ensure_ascii=False, indent=2)
        print(f" 数据已写入文件: {filename}")
        with open(filename, 'r', encoding='utf-8') as f:
            loaded_data = json.load(f)
        print(f" 从文件读取数据: {loaded_data['name']}")
    except Exception as e:
        print(f" 文件操作错误: {e}")

    # 3. Formatting options
    print("\n3. JSON格式化选项:")
    sample_data = {"中文": "测试", "numbers": [1, 2, 3], "nested": {"key": "value"}}
    format_options = [
        ({}, "默认格式"),
        ({"indent": 2}, "缩进格式"),
        ({"indent": 2, "ensure_ascii": False}, "支持中文"),
        ({"separators": (',', ':')}, "紧凑格式"),
        ({"sort_keys": True}, "键排序"),
        ({"indent": 2, "ensure_ascii": False, "sort_keys": True}, "完整格式"),
    ]
    for options, description in format_options:
        formatted = json.dumps(sample_data, **options)
        print(f" {description}: {formatted}")

    # 4. Custom serialisation
    print("\n4. 自定义序列化:")
    # None of these value types is supported by json out of the box.
    complex_data = {
        "datetime": datetime.now(),
        "decimal": Decimal('10.50'),
        "set": {1, 2, 3},
        "bytes": b"hello"
    }

    class CustomJSONEncoder(json.JSONEncoder):
        """Encoder that also handles datetime, Decimal, set and bytes."""

        def default(self, obj):
            if isinstance(obj, datetime):
                return obj.isoformat()
            elif isinstance(obj, Decimal):
                return float(obj)
            elif isinstance(obj, set):
                return list(obj)
            elif isinstance(obj, bytes):
                return obj.decode('utf-8')
            return super().default(obj)

    try:
        custom_json = json.dumps(complex_data, cls=CustomJSONEncoder, indent=2)
        print(" 自定义序列化结果:")
        print(custom_json)
    except Exception as e:
        print(f" 序列化错误: {e}")

    def json_serializer(obj):
        """``default=`` hook converting the extra types to JSON-friendly ones."""
        if isinstance(obj, datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        elif isinstance(obj, Decimal):
            return str(obj)
        elif isinstance(obj, set):
            return list(obj)
        elif isinstance(obj, bytes):
            return obj.decode('utf-8')
        raise TypeError(f"Object of type {type(obj)} is not JSON serializable")

    try:
        default_json = json.dumps(complex_data, default=json_serializer, indent=2)
        print("\n 使用default参数:")
        print(default_json)
    except Exception as e:
        print(f" 序列化错误: {e}")

    # 5. Parsing valid and invalid documents
    print("\n5. JSON验证和错误处理:")
    json_samples = [
        ('{"name": "张三", "age": 25}', "有效JSON"),
        ('{"name": "张三", "age": 25,}', "尾随逗号(无效)"),
        ('{name: "张三", age: 25}', "无引号键(无效)"),
        ('[1, 2, 3]', "数组格式"),
        ('"simple string"', "简单字符串"),
        ('123', "数字"),
        ('true', "布尔值"),
        ('null', "空值"),
        ('{"nested": {"deep": {"value": 42}}}', "嵌套对象"),
    ]
    for json_str, description in json_samples:
        try:
            parsed = json.loads(json_str)
            print(f" ✓ {description}: {parsed}")
        except json.JSONDecodeError as e:
            print(f" ✗ {description}: 解析错误 - {e.msg} (位置: {e.pos})")

    # 6. Simple validation (conceptual stand-in for JSON Schema)
    print("\n6. JSON数据验证:")

    def validate_user_data(data):
        """Validate a user dict and return ``(is_valid, list_of_errors)``."""
        required_fields = ['name', 'age', 'email']
        errors = [f"缺少必需字段: {field}" for field in required_fields if field not in data]
        if 'age' in data:
            age = data['age']
            if not isinstance(age, int):
                errors.append("age字段必须是整数")
            elif age < 0:
                # Range check only for integers: comparing e.g. a str with 0
                # raises TypeError in Python 3.
                errors.append("age字段必须是正数")
        if 'email' in data and '@' not in str(data['email']):
            errors.append("email字段格式无效")
        return len(errors) == 0, errors

    test_users = [
        {"name": "张三", "age": 25, "email": "zhangsan@example.com"},
        {"name": "李四", "age": "25", "email": "lisi@example.com"},  # wrong age type
        {"name": "王五", "age": -5, "email": "wangwu@example.com"},  # negative age
        {"name": "赵六", "age": 30},  # missing email
        {"name": "钱七", "age": 28, "email": "qianqi.example.com"},  # malformed email
    ]
    for i, user in enumerate(test_users, 1):
        is_valid, errors = validate_user_data(user)
        status = "✓" if is_valid else "✗"
        print(f" {status} 用户{i}: {user.get('name', '未知')}")
        for error in errors:
            print(f" - {error}")

    # 7. Timing serialisation and parsing
    print("\n7. JSON性能测试:")
    import time
    large_data = {
        "users": [
            {"id": i, "name": f"用户{i}", "score": i * 10}
            for i in range(1000)
        ]
    }
    start_time = time.perf_counter()
    json_str = json.dumps(large_data)
    serialize_time = time.perf_counter() - start_time

    start_time = time.perf_counter()
    parsed_data = json.loads(json_str)
    deserialize_time = time.perf_counter() - start_time

    print(f" 数据大小: {len(large_data['users'])} 个用户")
    print(f" JSON字符串长度: {len(json_str)} 字符")
    print(f" 序列化时间: {serialize_time:.6f}秒")
    print(f" 反序列化时间: {deserialize_time:.6f}秒")

    # 8. Helper functions
    print("\n8. 实用JSON工具函数:")

    def pretty_print_json(data):
        """Return ``data`` as indented, key-sorted JSON text."""
        return json.dumps(data, ensure_ascii=False, indent=2, sort_keys=True)

    def json_diff(json1, json2):
        """Return a list of human-readable differences between two objects."""
        def get_differences(obj1, obj2, path=""):
            differences = []
            # Exact type comparison is intentional: 1 and 1.0 count as different.
            if type(obj1) != type(obj2):
                differences.append(f"{path}: 类型不同 ({type(obj1).__name__} vs {type(obj2).__name__})")
                return differences
            if isinstance(obj1, dict):
                # Sorted (by string form) so the report order is deterministic.
                all_keys = sorted(set(obj1) | set(obj2), key=str)
                for key in all_keys:
                    new_path = f"{path}.{key}" if path else key
                    if key not in obj1:
                        differences.append(f"{new_path}: 仅在第二个对象中存在")
                    elif key not in obj2:
                        differences.append(f"{new_path}: 仅在第一个对象中存在")
                    else:
                        differences.extend(get_differences(obj1[key], obj2[key], new_path))
            elif isinstance(obj1, list):
                if len(obj1) != len(obj2):
                    differences.append(f"{path}: 列表长度不同 ({len(obj1)} vs {len(obj2)})")
                # Compare the common prefix element by element.
                for i, (left, right) in enumerate(zip(obj1, obj2)):
                    differences.extend(get_differences(left, right, f"{path}[{i}]"))
            else:
                if obj1 != obj2:
                    differences.append(f"{path}: 值不同 ({obj1} vs {obj2})")
            return differences
        return get_differences(json1, json2)

    def flatten_json(data, separator='.'):
        """Flatten nested dicts/lists into one dict with joined key paths."""
        def _flatten(obj, parent_key=''):
            items = []
            if isinstance(obj, dict):
                for key, value in obj.items():
                    new_key = f"{parent_key}{separator}{key}" if parent_key else key
                    items.extend(_flatten(value, new_key).items())
            elif isinstance(obj, list):
                for i, value in enumerate(obj):
                    new_key = f"{parent_key}{separator}{i}" if parent_key else str(i)
                    items.extend(_flatten(value, new_key).items())
            else:
                return {parent_key: obj}
            return dict(items)
        return _flatten(data)

    test_data = {"user": {"name": "张三", "details": {"age": 25, "city": "北京"}}}
    print(" 美化打印:")
    print(pretty_print_json(test_data))

    data1 = {"name": "张三", "age": 25}
    data2 = {"name": "李四", "age": 25, "city": "北京"}
    differences = json_diff(data1, data2)
    print("\n JSON差异:")
    for diff in differences:
        print(f" {diff}")

    flattened = flatten_json(test_data)
    print(f"\n 扁平化结果: {flattened}")

    # Remove the file created in section 2.
    try:
        import os
        if os.path.exists(filename):
            os.remove(filename)
            print(f" 已删除测试文件: {filename}")
    except OSError as e:
        print(f" 删除文件失败: {e}")
# 运行json模块演示
json_module_demo()
# 4.2 csv模块 - CSV文件处理
import csv
import io
from datetime import datetime
def csv_module_demo():
"""csv模块演示"""
print("=== csv模块演示 ===")
# 1. 基本CSV读写
print("\n1. 基本CSV操作:")
# 准备测试数据
test_data = [
['姓名', '年龄', '城市', '薪资'],
['张三', '25', '北京', '8000'],
['李四', '30', '上海', '12000'],
['王五', '28', '广州', '9500'],
['赵六', '35', '深圳', '15000']
]
# 写入CSV文件
filename = 'employees.csv'
try:
with open(filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerows(test_data)
print(f" 数据已写入文件: {filename}")
# 读取CSV文件
with open(filename, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
print(" 读取的数据:")
for i, row in enumerate(reader):
print(f" 行{i+1}: {row}")
except Exception as e:
print(f" 文件操作错误: {e}")
# 2. 字典格式读写
print("\n2. 字典格式CSV操作:")
# 字典数据
dict_data = [
{'姓名': '张三', '年龄': 25, '城市': '北京', '薪资': 8000},
{'姓名': '李四', '年龄': 30, '城市': '上海', '薪资': 12000},
{'姓名': '王五', '年龄': 28, '城市': '广州', '薪资': 9500}
]
dict_filename = 'employees_dict.csv'
try:
# 写入字典格式CSV
with open(dict_filename, 'w', newline='', encoding='utf-8') as f:
fieldnames = ['姓名', '年龄', '城市', '薪资']
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader() # 写入表头
writer.writerows(dict_data)
print(f" 字典数据已写入: {dict_filename}")
# 读取字典格式CSV
with open(dict_filename, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
print(" 读取的字典数据:")
for i, row in enumerate(reader, 1):
print(f" 员工{i}: {dict(row)}")
except Exception as e:
print(f" 字典操作错误: {e}")
# 3. CSV方言和格式化选项
print("\n3. CSV方言和格式化:")
# 自定义CSV方言
csv.register_dialect('custom',
delimiter='|',
quotechar='"',
quoting=csv.QUOTE_MINIMAL,
lineterminator='\n')
# 使用自定义方言
custom_filename = 'custom_format.csv'
try:
with open(custom_filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f, dialect='custom')
writer.writerows(test_data)
print(f" 自定义格式文件: {custom_filename}")
# 读取自定义格式
with open(custom_filename, 'r', encoding='utf-8') as f:
reader = csv.reader(f, dialect='custom')
print(" 自定义格式数据:")
for row in reader:
print(f" {row}")
except Exception as e:
print(f" 自定义格式错误: {e}")
# 4. 处理特殊字符和引号
print("\n4. 特殊字符处理:")
special_data = [
['产品名称', '描述', '价格'],
['iPhone 15', 'Apple最新款手机,配备"A17 Pro"芯片', '7999'],
['MacBook Pro', '专业级笔记本电脑\n适合开发者使用', '12999'],
['AirPods Pro', '降噪耳机,支持"空间音频"功能', '1999']
]
special_filename = 'special_chars.csv'
try:
# 写入包含特殊字符的数据
with open(special_filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f, quoting=csv.QUOTE_ALL)
writer.writerows(special_data)
print(f" 特殊字符文件: {special_filename}")
# 读取并显示
with open(special_filename, 'r', encoding='utf-8') as f:
content = f.read()
print(" 文件内容:")
print(content)
except Exception as e:
print(f" 特殊字符处理错误: {e}")
# 5. 内存中的CSV操作
print("\n5. 内存中CSV操作:")
# 使用StringIO在内存中处理CSV
csv_string = """姓名,年龄,部门
张三,25,技术部
李四,30,销售部
王五,28,市场部"""
# 从字符串读取CSV
string_io = io.StringIO(csv_string)
reader = csv.DictReader(string_io)
print(" 从字符串读取CSV:")
employees = list(reader)
for emp in employees:
print(f" {emp}")
# 写入到字符串
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=['姓名', '年龄', '部门', '薪资'])
writer.writeheader()
# 添加薪资信息
for emp in employees:
emp['薪资'] = str(int(emp['年龄']) * 500) # 简单计算
writer.writerow(emp)
result_csv = output.getvalue()
print("\n 生成的CSV字符串:")
print(result_csv)
# 6. CSV数据分析
print("\n6. CSV数据分析:")
def analyze_csv_data(filename):
"""分析CSV文件数据"""
try:
with open(filename, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
data = list(reader)
if not data:
return "文件为空"
analysis = {
'总行数': len(data),
'列数': len(data[0]) if data else 0,
'列名': list(data[0].keys()) if data else [],
'数值列分析': {}
}
# 分析数值列
for column in analysis['列名']:
values = [row[column] for row in data]
# 尝试转换为数字
numeric_values = []
for value in values:
try:
numeric_values.append(float(value))
except ValueError:
continue
if numeric_values:
analysis['数值列分析'][column] = {
'最小值': min(numeric_values),
'最大值': max(numeric_values),
'平均值': sum(numeric_values) / len(numeric_values),
'数值个数': len(numeric_values)
}
return analysis
except Exception as e:
return f"分析错误: {e}"
# 分析员工数据
analysis = analyze_csv_data(dict_filename)
print(f" 数据分析结果:")
if isinstance(analysis, dict):
for key, value in analysis.items():
if key == '数值列分析':
print(f" {key}:")
for col, stats in value.items():
print(f" {col}: {stats}")
else:
print(f" {key}: {value}")
else:
print(f" {analysis}")
# 7. CSV转换工具
print("\n7. CSV转换工具:")
def csv_to_json(csv_filename, json_filename=None):
"""CSV转JSON"""
try:
with open(csv_filename, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
data = list(reader)
if json_filename:
import json
with open(json_filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return f"已转换为: {json_filename}"
else:
return data
except Exception as e:
return f"转换错误: {e}"
def filter_csv(input_filename, output_filename, filter_func):
"""过滤CSV数据"""
try:
with open(input_filename, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
data = list(reader)
filtered_data = [row for row in data if filter_func(row)]
if filtered_data:
with open(output_filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(filtered_data)
return f"过滤后数据已保存到: {output_filename}"
else:
return "没有符合条件的数据"
except Exception as e:
return f"过滤错误: {e}"
# 测试转换工具
json_result = csv_to_json(dict_filename)
print(f" CSV转JSON结果: {len(json_result)}条记录")
# 过滤高薪员工
def high_salary_filter(row):
    """Return True when the row's '薪资' value is an integer above 10000.

    Rows whose salary is missing or cannot be parsed as an integer are
    treated as not matching.
    """
    try:
        return int(row['薪资']) > 10000
    except (KeyError, ValueError, TypeError):
        # Only data problems are swallowed; a bare ``except`` would also
        # hide KeyboardInterrupt and genuine programming errors.
        return False
filter_result = filter_csv(dict_filename, 'high_salary.csv', high_salary_filter)
print(f" 过滤结果: {filter_result}")
# 清理测试文件
test_files = [filename, dict_filename, custom_filename, special_filename, 'high_salary.csv']
for file in test_files:
try:
import os
if os.path.exists(file):
os.remove(file)
except:
pass
print(" 已清理测试文件")
# 运行csv模块演示
csv_module_demo()
# 五、数学计算模块
# 5.1 math模块 - 数学函数
import math
import cmath # 复数数学函数
def math_module_demo():
"""math模块演示"""
print("=== math模块演示 ===")
# 1. 数学常数
print("\n1. 数学常数:")
constants = {
'π (pi)': math.pi,
'e (自然对数底)': math.e,
'τ (tau = 2π)': math.tau,
'∞ (无穷大)': math.inf,
'NaN (非数字)': math.nan
}
for name, value in constants.items():
print(f" {name}: {value}")
# 2. 基本数学运算
print("\n2. 基本数学运算:")
test_numbers = [16, 25, 100, 2.5, -3.7]
for num in test_numbers:
operations = {
f'sqrt({num})': math.sqrt(abs(num)), # 平方根
f'pow({num}, 2)': math.pow(num, 2), # 幂运算
f'abs({num})': abs(num), # 绝对值
f'ceil({num})': math.ceil(num), # 向上取整
f'floor({num})': math.floor(num), # 向下取整
f'trunc({num})': math.trunc(num) # 截断小数部分
}
print(f"\n 数字 {num}:")
for op, result in operations.items():
print(f" {op} = {result}")
# 3. 三角函数
print("\n3. 三角函数:")
angles_degrees = [0, 30, 45, 60, 90, 180, 270, 360]
print(" 角度(度) | 弧度 | sin | cos | tan")
print(" " + "-" * 50)
for angle_deg in angles_degrees:
angle_rad = math.radians(angle_deg) # 度转弧度
sin_val = math.sin(angle_rad)
cos_val = math.cos(angle_rad)
# 处理tan在90度和270度时的无穷大
try:
tan_val = math.tan(angle_rad)
if abs(tan_val) > 1e10: # 很大的数视为无穷大
tan_str = "∞"
else:
tan_str = f"{tan_val:.6f}"
except:
tan_str = "∞"
print(f" {angle_deg:7d} | {angle_rad:8.4f} | {sin_val:8.4f} | {cos_val:8.4f} | {tan_str}")
# 反三角函数
print("\n 反三角函数示例:")
test_values = [0, 0.5, 0.707, 0.866, 1]
for val in test_values:
try:
asin_deg = math.degrees(math.asin(val))
acos_deg = math.degrees(math.acos(val))
atan_deg = math.degrees(math.atan(val))
print(f" 值 {val}: arcsin={asin_deg:.1f}°, arccos={acos_deg:.1f}°, arctan={atan_deg:.1f}°")
except ValueError as e:
print(f" 值 {val}: 计算错误 - {e}")
# 4. 对数函数
print("\n4. 对数函数:")
log_test_values = [1, 2, 10, 100, math.e, math.pi]
for val in log_test_values:
log_results = {
f'ln({val})': math.log(val), # 自然对数
f'log10({val})': math.log10(val), # 常用对数
f'log2({val})': math.log2(val), # 二进制对数
f'log({val}, 3)': math.log(val, 3) # 以3为底的对数
}
print(f"\n 数值 {val:.4f}:")
for op, result in log_results.items():
print(f" {op} = {result:.6f}")
# 5. 指数函数
print("\n5. 指数函数:")
exp_test_values = [0, 1, 2, -1, 0.5, math.pi]
for val in exp_test_values:
exp_results = {
f'e^{val}': math.exp(val), # e的x次方
f'2^{val}': math.pow(2, val), # 2的x次方
f'10^{val}': math.pow(10, val), # 10的x次方
f'e^{val}-1': math.expm1(val) # e^x - 1 (更精确)
}
print(f"\n 指数 {val}:")
for op, result in exp_results.items():
print(f" {op} = {result:.6f}")
# 6. 双曲函数
print("\n6. 双曲函数:")
hyperbolic_values = [0, 1, 2, -1]
for val in hyperbolic_values:
hyp_results = {
f'sinh({val})': math.sinh(val), # 双曲正弦
f'cosh({val})': math.cosh(val), # 双曲余弦
f'tanh({val})': math.tanh(val) # 双曲正切
}
print(f"\n 值 {val}:")
for op, result in hyp_results.items():
print(f" {op} = {result:.6f}")
# 7. 特殊函数
print("\n7. 特殊函数:")
# 阶乘
factorial_values = [0, 1, 5, 10]
print(" 阶乘函数:")
for val in factorial_values:
result = math.factorial(val)
print(f" {val}! = {result}")
# Greatest common divisor and least common multiple
print("\n 最大公约数和最小公倍数:")
number_pairs = [(12, 18), (24, 36), (17, 19), (100, 75)]
for a, b in number_pairs:
    gcd_val = math.gcd(a, b)
    # math.lcm (Python 3.9+) replaces the hand-written |a*b| // gcd formula,
    # which divides by zero when both operands are 0.
    lcm_val = math.lcm(a, b)
    print(f" gcd({a}, {b}) = {gcd_val}, lcm({a}, {b}) = {lcm_val}")
# 8. 数值判断函数
print("\n8. 数值判断函数:")
test_special_values = [0, 1, -1, math.inf, -math.inf, math.nan, 3.14]
print(" 值 | 有限 | 无穷 | NaN | 整数")
print(" " + "-" * 40)
for val in test_special_values:
is_finite = math.isfinite(val)
is_inf = math.isinf(val)
is_nan = math.isnan(val)
# 检查是否为整数
try:
is_integer = val.is_integer() if isinstance(val, float) else isinstance(val, int)
except:
is_integer = False
print(f" {str(val):8} | {str(is_finite):4} | {str(is_inf):4} | {str(is_nan):4} | {str(is_integer):4}")
# 9. 复数数学函数
print("\n9. 复数数学函数:")
complex_numbers = [1+2j, 3-4j, 5j, -2+3j]
for c in complex_numbers:
complex_results = {
f'abs({c})': abs(c), # 模长
f'phase({c})': cmath.phase(c), # 相位角
f'sqrt({c})': cmath.sqrt(c), # 复数平方根
f'exp({c})': cmath.exp(c), # 复数指数
f'log({c})': cmath.log(c) # 复数对数
}
print(f"\n 复数 {c}:")
for op, result in complex_results.items():
if isinstance(result, complex):
print(f" {op} = {result:.4f}")
else:
print(f" {op} = {result:.6f}")
# 10. 实用数学工具函数
print("\n10. 实用数学工具函数:")
def distance_2d(x1, y1, x2, y2):
    """Return the Euclidean distance between points (x1, y1) and (x2, y2)."""
    # math.hypot avoids the OverflowError that squaring very large deltas
    # by hand would raise.
    return math.hypot(x2 - x1, y2 - y1)
def angle_between_vectors(x1, y1, x2, y2):
    """Return the angle in radians between vectors (x1, y1) and (x2, y2).

    The result lies in [0, pi]. If either vector has zero length the angle
    is undefined and 0.0 is returned.
    """
    magnitude1 = math.hypot(x1, y1)
    magnitude2 = math.hypot(x2, y2)
    if magnitude1 == 0 or magnitude2 == 0:
        return 0.0
    # Normalise each vector before taking the dot product so that very
    # large components cannot overflow.
    cos_angle = (x1 / magnitude1) * (x2 / magnitude2) + (y1 / magnitude1) * (y2 / magnitude2)
    # Clamp: rounding can push the cosine slightly outside [-1, 1],
    # which would make math.acos raise ValueError.
    cos_angle = max(-1.0, min(1.0, cos_angle))
    return math.acos(cos_angle)
def circle_area_circumference(radius):
    """Return ``(area, circumference)`` for a circle of the given radius."""
    radius_squared = radius**2
    return math.pi * radius_squared, 2 * math.pi * radius
def sphere_volume_surface(radius):
    """Return ``(volume, surface_area)`` for a sphere of the given radius."""
    four_pi = 4 * math.pi
    return (4 / 3) * math.pi * radius**3, four_pi * radius**2
# 测试工具函数
print(" 几何计算示例:")
# 两点间距离
dist = distance_2d(0, 0, 3, 4)
print(f" 点(0,0)到点(3,4)的距离: {dist}")
# 向量夹角
angle_rad = angle_between_vectors(1, 0, 0, 1)
angle_deg = math.degrees(angle_rad)
print(f" 向量(1,0)和(0,1)的夹角: {angle_deg}度")
# 圆的计算
radius = 5
area, circumference = circle_area_circumference(radius)
print(f" 半径{radius}的圆: 面积={area:.2f}, 周长={circumference:.2f}")
# 球的计算
volume, surface = sphere_volume_surface(radius)
print(f" 半径{radius}的球: 体积={volume:.2f}, 表面积={surface:.2f}")
# 运行math模块演示
math_module_demo()
# 5.2 random模块 - 随机数生成
import random
import string
from collections import Counter
def random_module_demo():
"""random模块演示"""
print("=== random模块演示 ===")
# 1. 基本随机数生成
print("\n1. 基本随机数生成:")
# 设置随机种子(确保结果可重现)
random.seed(42)
print(f" 设置随机种子: 42")
# 生成随机浮点数
print("\n 随机浮点数 [0.0, 1.0):")
for i in range(5):
print(f" random(): {random.random():.6f}")
# 生成指定范围的随机浮点数
print("\n 指定范围的随机浮点数:")
for i in range(5):
val = random.uniform(1.5, 10.5)
print(f" uniform(1.5, 10.5): {val:.4f}")
# 生成随机整数
print("\n 随机整数:")
for i in range(5):
val = random.randint(1, 100) # 包含两端
print(f" randint(1, 100): {val}")
# 生成随机整数(不包含上界)
print("\n 随机整数(不包含上界):")
for i in range(5):
val = random.randrange(1, 100, 2) # 1到99的奇数
print(f" randrange(1, 100, 2): {val}")
# 2. 序列随机操作
print("\n2. 序列随机操作:")
# 随机选择
colors = ['红色', '绿色', '蓝色', '黄色', '紫色']
print(f" 颜色列表: {colors}")
print("\n 随机选择:")
for i in range(5):
color = random.choice(colors)
print(f" choice(): {color}")
# 带权重的随机选择
weights = [1, 2, 3, 4, 5] # 权重越大,被选中概率越高
print(f"\n 带权重随机选择 (权重: {weights}):")
for i in range(5):
color = random.choices(colors, weights=weights, k=1)[0]
print(f" choices(): {color}")
# 随机抽样(不重复)
print("\n 随机抽样(不重复):")
sample_size = 3
sample = random.sample(colors, sample_size)
print(f" sample({sample_size}): {sample}")
# 随机打乱
numbers = list(range(1, 11))
print(f"\n 原始列表: {numbers}")
random.shuffle(numbers)
print(f" 打乱后: {numbers}")
# 3. 概率分布
print("\n3. 概率分布:")
# 正态分布(高斯分布)
print("\n 正态分布 N(μ=100, σ=15):")
normal_samples = []
for i in range(10):
val = random.normalvariate(100, 15) # 均值100,标准差15
normal_samples.append(val)
print(f" 样本{i+1}: {val:.2f}")
print(f" 样本均值: {sum(normal_samples)/len(normal_samples):.2f}")
# 指数分布
print("\n 指数分布 (λ=1.5):")
for i in range(5):
val = random.expovariate(1.5)
print(f" 样本{i+1}: {val:.4f}")
# 伽马分布
print("\n 伽马分布 (α=2, β=1):")
for i in range(5):
val = random.gammavariate(2, 1)
print(f" 样本{i+1}: {val:.4f}")
# 4. 随机字符串生成
print("\n4. 随机字符串生成:")
def generate_random_string(length, chars=None):
    """Build a random string of ``length`` characters.

    Characters are drawn one at a time with ``random.choice`` from
    ``chars``; when ``chars`` is None, ASCII letters plus digits are used.
    """
    alphabet = string.ascii_letters + string.digits if chars is None else chars
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
def generate_password(length=12):
    """Generate a random password containing every character class.

    The result always contains at least one lowercase letter, one
    uppercase letter, one digit and one of ``!@#$%^&*``.

    Args:
        length: Total password length; must be at least 4 so that each
            character class can be represented.

    Returns:
        The password string, exactly ``length`` characters long.

    Raises:
        ValueError: If ``length`` is smaller than 4.

    Note:
        ``random`` is not cryptographically secure; use the ``secrets``
        module for real credentials.
    """
    special = '!@#$%^&*'
    if length < 4:
        # Previously a 4-character password was returned regardless of
        # the requested length.
        raise ValueError("length must be at least 4 to include every character class")
    chars = string.ascii_letters + string.digits + special
    # One guaranteed character from each class.
    password = [
        random.choice(string.ascii_lowercase),
        random.choice(string.ascii_uppercase),
        random.choice(string.digits),
        random.choice(special),
    ]
    # Fill the remaining positions from the full alphabet.
    password.extend(random.choice(chars) for _ in range(length - 4))
    # Shuffle so the guaranteed characters are not always at the front.
    random.shuffle(password)
    return ''.join(password)
def generate_uuid_like():
    """Return a random string laid out like a UUID (8-4-4-4-12 hex digits).

    This is only UUID-shaped; it does not set the version or variant bits.
    """
    # string.hexdigits.lower() would contain 'abcdef' twice and bias the
    # draw towards letters, so the 16 hex digits are spelled out.
    chars = string.digits + 'abcdef'
    group_sizes = (8, 4, 4, 4, 12)
    return '-'.join(
        ''.join(random.choice(chars) for _ in range(size)) for size in group_sizes
    )
# 测试字符串生成
print(" 随机字符串示例:")
print(f" 字母数字(8位): {generate_random_string(8)}")
print(f" 纯字母(10位): {generate_random_string(10, string.ascii_letters)}")
print(f" 随机密码: {generate_password()}")
print(f" UUID样式: {generate_uuid_like()}")
# 5. 随机数据生成
print("\n5. 随机数据生成:")
def generate_random_person():
    """Return a dict describing one randomly generated person.

    Keys are '姓名', '年龄', '城市', '薪资' and '邮箱'. The random draws
    happen in that key order.
    """
    surnames = ['张', '李', '王', '刘', '陈', '杨', '赵', '黄', '周', '吴']
    given_names = ['伟', '芳', '娜', '敏', '静', '丽', '强', '磊', '军', '洋']
    cities = ['北京', '上海', '广州', '深圳', '杭州', '南京', '武汉', '成都']
    full_name = random.choice(surnames) + random.choice(given_names)
    age = random.randint(18, 65)
    city = random.choice(cities)
    salary = random.randint(5000, 50000)
    email = f"{generate_random_string(8).lower()}@example.com"
    return {
        '姓名': full_name,
        '年龄': age,
        '城市': city,
        '薪资': salary,
        '邮箱': email,
    }
def generate_test_data(count=5):
    """Return a list of ``count`` randomly generated person dicts."""
    people = []
    for _ in range(count):
        people.append(generate_random_person())
    return people
# 生成测试数据
test_people = generate_test_data(5)
print(" 随机人员数据:")
for i, person in enumerate(test_people, 1):
print(f" 人员{i}: {person}")
# 6. 随机采样和统计
print("\n6. 随机采样和统计:")
# 模拟投硬币
def simulate_coin_flips(n):
    """Flip a fair coin ``n`` times and return a Counter of the outcomes."""
    faces = ['正面', '反面']
    tally = Counter()
    for _ in range(n):
        tally[random.choice(faces)] += 1
    return tally
# 模拟掷骰子
def simulate_dice_rolls(n, sides=6):
    """Roll a ``sides``-sided die ``n`` times and return a Counter of faces."""
    tally = Counter()
    for _ in range(n):
        tally[random.randint(1, sides)] += 1
    return tally
# 蒙特卡洛估算π
def estimate_pi(n):
    """Estimate pi by Monte Carlo sampling of ``n`` points.

    Points are drawn uniformly from the square [-1, 1] x [-1, 1]; four
    times the fraction that lands inside the unit circle is returned.
    """

    def lands_inside():
        # Draw x before y, one point per call.
        px = random.uniform(-1, 1)
        py = random.uniform(-1, 1)
        return px * px + py * py <= 1

    hits = sum(1 for _ in range(n) if lands_inside())
    return 4 * hits / n
# 测试模拟
coin_results = simulate_coin_flips(1000)
print(f" 投硬币1000次: {dict(coin_results)}")
dice_results = simulate_dice_rolls(600)
print(f" 掷骰子600次: {dict(dice_results)}")
pi_estimate = estimate_pi(100000)
print(f" 蒙特卡洛估算π (10万次): {pi_estimate:.6f}")
print(f" 真实π值: {3.141592653589793:.6f}")
print(f" 误差: {abs(pi_estimate - 3.141592653589793):.6f}")
# 7. 随机状态管理
print("\n7. 随机状态管理:")
# 保存当前状态
state = random.getstate()
print(" 保存随机状态")
# 生成一些随机数
nums1 = [random.randint(1, 100) for _ in range(5)]
print(f" 第一组随机数: {nums1}")
# 恢复状态
random.setstate(state)
print(" 恢复随机状态")
# 再次生成随机数(应该相同)
nums2 = [random.randint(1, 100) for _ in range(5)]
print(f" 第二组随机数: {nums2}")
print(f" 两组是否相同: {nums1 == nums2}")
# 8. 实用随机工具
print("\n8. 实用随机工具:")
def random_date(start_year=2020, end_year=2024):
    """Return a random date between Jan 1 of start_year and Dec 31 of end_year.

    Both end points are inclusive.

    Raises:
        ValueError: If ``end_year`` is earlier than ``start_year``
            (raised by ``random.randrange`` for the empty range).
    """
    import datetime

    start_date = datetime.date(start_year, 1, 1)
    end_date = datetime.date(end_year, 12, 31)
    span_days = (end_date - start_date).days
    # +1 because randrange excludes its upper bound; without it the last
    # day of end_year could never be drawn.
    offset = random.randrange(span_days + 1)
    return start_date + datetime.timedelta(days=offset)
def weighted_choice(choices, weights):
    """Pick one element of ``choices`` with probability proportional to its weight.

    A threshold is drawn uniformly from [0, sum(weights)] and the first
    choice whose cumulative weight reaches it is returned; the last choice
    is the fallback.
    """
    threshold = random.uniform(0, sum(weights))
    running_total = 0
    for candidate, candidate_weight in zip(choices, weights):
        running_total += candidate_weight
        if running_total >= threshold:
            return candidate
    return choices[-1]
def random_color():
    """Return a random RGB colour.

    Returns:
        A dict with integer channels 'r', 'g', 'b' (0-255) and 'hex', the
        ``#rrggbb`` string for those same channel values.
    """
    r = random.randint(0, 255)
    g = random.randint(0, 255)
    b = random.randint(0, 255)
    # Build the hex string from the channels already drawn; drawing three
    # new values here would describe a different colour.
    return {'r': r, 'g': g, 'b': b, 'hex': f"#{r:02x}{g:02x}{b:02x}"}
# 测试工具函数
print(" 实用工具示例:")
print(f" 随机日期: {random_date()}")
choices = ['A', 'B', 'C', 'D']
weights = [1, 2, 3, 4]
print(f" 带权重选择: {weighted_choice(choices, weights)}")
color = random_color()
print(f" 随机颜色: RGB({color['r']}, {color['g']}, {color['b']}) = {color['hex']}")
# 运行random模块演示
random_module_demo()
# 六、网络编程模块
# 6.1 urllib模块 - URL处理
import urllib.request
import urllib.parse
import urllib.error
from urllib.robotparser import RobotFileParser
def urllib_module_demo():
"""urllib模块演示"""
print("=== urllib模块演示 ===")
# 1. URL解析和构建
print("\n1. URL解析和构建:")
# 解析URL
# Sample URLs covering a query string and fragment, embedded credentials,
# and a non-HTTP scheme. The query separator must be a literal "&param2";
# it had been corrupted into the pilcrow character by HTML-entity decoding.
test_urls = [
    'https://www.example.com:8080/path/to/page?param1=value1&param2=value2#section1',
    'http://user:pass@localhost:3000/api/data',
    'ftp://files.example.com/downloads/file.zip',
]
for url in test_urls:
parsed = urllib.parse.urlparse(url)
print(f"\n URL: {url}")
print(f" 协议: {parsed.scheme}")
print(f" 主机: {parsed.netloc}")
print(f" 路径: {parsed.path}")
print(f" 查询: {parsed.query}")
print(f" 片段: {parsed.fragment}")
# 分解netloc
if parsed.netloc:
netloc_parts = urllib.parse.urlsplit(url)
print(f" 用户名: {netloc_parts.username}")
print(f" 密码: {netloc_parts.password}")
print(f" 主机名: {netloc_parts.hostname}")
print(f" 端口: {netloc_parts.port}")
# 构建URL
print("\n 构建URL:")
url_parts = {
'scheme': 'https',
'netloc': 'api.example.com',
'path': '/v1/users',
'params': '',
'query': 'page=1&limit=10',
'fragment': ''
}
constructed_url = urllib.parse.urlunparse(url_parts.values())
print(f" 构建的URL: {constructed_url}")
# 2. 查询字符串处理
print("\n2. 查询字符串处理:")
# 解析查询字符串
query_string = "name=张三&age=25&city=北京&hobby=编程&hobby=阅读"
parsed_query = urllib.parse.parse_qs(query_string)
print(f" 查询字符串: {query_string}")
print(f" 解析结果: {parsed_query}")
# 构建查询字符串
query_dict = {
'search': '机器学习',
'category': 'technology',
'sort': 'date',
'page': 1
}
encoded_query = urllib.parse.urlencode(query_dict)
print(f"\n 查询字典: {query_dict}")
print(f" 编码结果: {encoded_query}")
# 3. URL编码和解码
print("\n3. URL编码和解码:")
# URL编码
test_strings = [
'你好世界',
'hello world!',
'user@example.com',
'path/to/file with spaces.txt'
]
for text in test_strings:
encoded = urllib.parse.quote(text)
decoded = urllib.parse.unquote(encoded)
print(f" 原文: {text}")
print(f" 编码: {encoded}")
print(f" 解码: {decoded}")
print()
# 4. HTTP请求(基础)
print("\n4. HTTP请求示例:")
def safe_request(url, timeout=10):
    """Fetch ``url`` and summarise the response without raising.

    Args:
        url: URL to open.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        On success a dict with 'status', 'headers', 'url' and
        'content_length' (size of the body that was read). On failure a
        dict with a single 'error' message.
    """
    try:
        request = urllib.request.Request(url)
        request.add_header('User-Agent', 'Python-urllib/3.x')
        with urllib.request.urlopen(request, timeout=timeout) as response:
            status = response.getcode()
            headers = dict(response.headers)
            final_url = response.geturl()
            # The whole body is read just to measure its length.
            body_size = len(response.read())
            return {
                'status': status,
                'headers': headers,
                'url': final_url,
                'content_length': body_size,
            }
    except urllib.error.HTTPError as http_err:
        # HTTPError is a URLError subclass, so it has to be handled first.
        return {'error': f'HTTP错误: {http_err.code} - {http_err.reason}'}
    except urllib.error.URLError as url_err:
        return {'error': f'URL错误: {url_err.reason}'}
    except Exception as other_err:
        return {'error': f'其他错误: {other_err}'}
# 测试请求(使用公共API)
test_api_url = "https://httpbin.org/get"
print(f" 测试URL: {test_api_url}")
result = safe_request(test_api_url)
if 'error' in result:
print(f" 请求失败: {result['error']}")
else:
print(f" 状态码: {result['status']}")
print(f" 内容长度: {result['content_length']} 字节")
print(f" 部分响应头:")
for key, value in list(result['headers'].items())[:3]:
print(f" {key}: {value}")
# 5. 实用URL工具函数
print("\n5. 实用URL工具函数:")
def is_valid_url(url):
    """Return True when ``url`` parses with both a scheme and a network location.

    This is a syntactic check only; it does not contact the host.
    """
    try:
        result = urllib.parse.urlparse(url)
    except (ValueError, AttributeError, TypeError):
        # ValueError: malformed input such as an unterminated IPv6 literal.
        # AttributeError/TypeError: ``url`` is not a str/bytes value.
        return False
    return bool(result.scheme and result.netloc)
def join_url(base, path):
    """Resolve ``path`` against ``base`` using ``urllib.parse.urljoin``.

    A relative ``path`` replaces the last segment of ``base`` unless
    ``base`` ends with a slash.
    """
    joined = urllib.parse.urljoin(base, path)
    return joined
def extract_domain(url):
    """Return the host name of ``url`` without credentials or port.

    Returns:
        The lower-cased host name, an empty string when the URL has no
        network location, or None when the URL cannot be parsed.
    """
    try:
        parsed = urllib.parse.urlparse(url)
    except (ValueError, AttributeError, TypeError):
        return None
    # ``hostname`` strips "user:pass@", the port and IPv6 brackets.
    # Splitting netloc on ':' returned the username for URLs with
    # credentials and broke on IPv6 literals.
    return parsed.hostname or ''
def add_query_params(url, params):
    """Return ``url`` with ``params`` merged into its query string.

    Args:
        url: The URL to modify.
        params: Mapping of parameter names to values (a scalar or a list
            of values). A name already present in the URL is replaced.

    Returns:
        The rebuilt URL string.
    """
    parsed = urllib.parse.urlparse(url)
    # keep_blank_values=True so existing parameters such as "a=" are not
    # silently dropped from the rebuilt URL.
    query_dict = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
    query_dict.update(params)
    # doseq=True expands the value lists produced by parse_qs back into
    # repeated "key=value" pairs.
    new_query = urllib.parse.urlencode(query_dict, doseq=True)
    return urllib.parse.urlunparse(parsed._replace(query=new_query))
# 测试工具函数
test_cases = [
'https://www.example.com',
'invalid-url',
'http://localhost:8080/api'
]
print(" URL有效性检查:")
for url in test_cases:
valid = is_valid_url(url)
print(f" {url}: {'有效' if valid else '无效'}")
print("\n URL路径连接:")
base_url = "https://api.example.com/v1"
paths = ["/users", "posts/123", "../admin/settings"]
for path in paths:
joined = join_url(base_url, path)
print(f" {base_url} + {path} = {joined}")
print("\n 域名提取:")
for url in test_urls:
domain = extract_domain(url)
print(f" {url} -> {domain}")
print("\n 添加查询参数:")
original_url = "https://search.example.com?q=python"
new_params = {'page': 2, 'sort': 'date'}
modified_url = add_query_params(original_url, new_params)
print(f" 原URL: {original_url}")
print(f" 新参数: {new_params}")
print(f" 修改后: {modified_url}")
# 运行urllib模块演示
urllib_module_demo()
# 七、文件处理模块
# 7.1 pathlib模块 - 现代路径处理
from pathlib import Path
import os
import tempfile
import shutil
def pathlib_module_demo():
"""pathlib模块演示"""
print("=== pathlib模块演示 ===")
# 1. 路径创建和基本操作
print("\n1. 路径创建和基本操作:")
# 创建路径对象
current_dir = Path.cwd() # 当前工作目录
home_dir = Path.home() # 用户主目录
print(f" 当前目录: {current_dir}")
print(f" 用户主目录: {home_dir}")
# 路径构建
project_path = Path("projects") / "my_app" / "src" / "main.py"
print(f" 构建的路径: {project_path}")
# 绝对路径和相对路径
abs_path = project_path.resolve()
print(f" 绝对路径: {abs_path}")
print(f" 是否绝对路径: {project_path.is_absolute()}")
# 2. 路径属性和信息
print("\n2. 路径属性和信息:")
test_path = Path("documents/reports/annual_report_2024.pdf")
path_info = {
'完整路径': str(test_path),
'文件名': test_path.name,
'文件名(无扩展名)': test_path.stem,
'扩展名': test_path.suffix,
'所有扩展名': test_path.suffixes,
'父目录': test_path.parent,
'所有父目录': list(test_path.parents),
'路径部分': test_path.parts,
'锚点': test_path.anchor
}
for key, value in path_info.items():
print(f" {key}: {value}")
# 3. 路径操作方法
print("\n3. 路径操作方法:")
# 路径连接
base = Path("data")
sub_paths = ["users", "profiles", "user_123.json"]
full_path = base
for part in sub_paths:
full_path = full_path / part
print(f" 路径连接: {full_path}")
# Path replacement
original = Path("backup/2023/data.txt")
# Swap the "2023" directory component: rename the parent directory, then
# re-append the file name. (with_name("2024").parent dropped the year
# altogether and produced backup/data.txt.)
new_year = original.parent.with_name("2024") / original.name
new_ext = original.with_suffix(".json")
new_stem = original.with_stem("backup_data")
print(f" 原路径: {original}")
print(f" 替换年份: {new_year}")
print(f" 替换扩展名: {new_ext}")
print(f" 替换文件名: {new_stem}")
# 4. 文件系统操作
print("\n4. 文件系统操作:")
# 创建临时目录进行演示
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
print(f" 临时目录: {temp_path}")
# 创建目录结构
project_dir = temp_path / "test_project"
src_dir = project_dir / "src"
docs_dir = project_dir / "docs"
# 创建目录
src_dir.mkdir(parents=True, exist_ok=True)
docs_dir.mkdir(parents=True, exist_ok=True)
print(f" 创建目录: {src_dir}")
print(f" 创建目录: {docs_dir}")
# 创建文件
main_file = src_dir / "main.py"
readme_file = project_dir / "README.md"
main_file.write_text("print('Hello, World!')\n", encoding='utf-8')
readme_file.write_text("# Test Project\n\nThis is a test project.\n", encoding='utf-8')
print(f" 创建文件: {main_file}")
print(f" 创建文件: {readme_file}")
# 文件信息
if main_file.exists():
stat = main_file.stat()
print(f"\n 文件信息 ({main_file.name}):")
print(f" 大小: {stat.st_size} 字节")
print(f" 修改时间: {stat.st_mtime}")
print(f" 是否文件: {main_file.is_file()}")
print(f" 是否目录: {main_file.is_dir()}")
# 读取文件内容
content = main_file.read_text(encoding='utf-8')
print(f"\n 文件内容: {repr(content)}")
# 5. 目录遍历
print("\n5. 目录遍历:")
# 列出目录内容
print(f" 项目目录内容:")
for item in project_dir.iterdir():
item_type = "目录" if item.is_dir() else "文件"
print(f" {item_type}: {item.name}")
# 递归查找文件
print(f"\n 递归查找所有.py文件:")
for py_file in project_dir.rglob("*.py"):
print(f" {py_file.relative_to(project_dir)}")
# 模式匹配
print(f"\n 模式匹配示例:")
test_files = [
"main.py",
"test_main.py",
"utils.py",
"README.md",
"config.json"
]
patterns = ["*.py", "test_*", "*.md"]
for pattern in patterns:
matches = [f for f in test_files if Path(f).match(pattern)]
print(f" 模式 '{pattern}': {matches}")
# 6. 实用路径工具函数
print("\n6. 实用路径工具函数:")
def ensure_directory(path):
    """Create ``path`` (and any missing parents) and return it as a Path.

    Nothing happens if the directory already exists.
    """
    target = Path(path)
    target.mkdir(parents=True, exist_ok=True)
    return target
def safe_file_name(name):
    """Return ``name`` with each of the characters ``<>:"/\\|?*`` replaced by ``_``."""
    replacements = str.maketrans({char: '_' for char in '<>:"/\\|?*'})
    return name.translate(replacements)
def get_unique_filename(directory, base_name, extension):
    """Return a path inside ``directory`` that does not exist yet.

    ``base_name + extension`` is tried first, then ``base_name_1``,
    ``base_name_2`` and so on (each followed by ``extension``) until an
    unused name is found. Nothing is created on disk.
    """
    folder = Path(directory)
    candidate = folder / f"{base_name}{extension}"
    attempt = 0
    while candidate.exists():
        attempt += 1
        candidate = folder / f"{base_name}_{attempt}{extension}"
    return candidate
def calculate_directory_size(directory):
    """Return the total size in bytes of all files under ``directory``, recursively."""
    return sum(
        entry.stat().st_size
        for entry in Path(directory).rglob('*')
        if entry.is_file()
    )
def find_files_by_extension(directory, extension):
    """Recursively list paths under ``directory`` whose name ends with the extension.

    ``extension`` may be given with or without a leading dot.
    """
    root = Path(directory)
    bare_extension = extension.lstrip('.')
    return [*root.rglob(f"*.{bare_extension}")]
# 测试工具函数
print(" 路径工具函数测试:")
# 安全文件名
unsafe_name = "report<2024>:data/analysis.txt"
safe_name = safe_file_name(unsafe_name)
print(f" 不安全文件名: {unsafe_name}")
print(f" 安全文件名: {safe_name}")
# 当前目录大小
current_size = calculate_directory_size(Path.cwd())
print(f" 当前目录大小: {current_size:,} 字节")
# 查找Python文件
py_files = find_files_by_extension(Path.cwd(), '.py')
print(f" 找到 {len(py_files)} 个Python文件")
# 7. 跨平台路径处理
print("\n7. 跨平台路径处理:")
# 路径分隔符
print(f" 当前系统路径分隔符: {os.sep}")
print(f" pathlib自动处理分隔符")
# 不同平台的路径示例
unix_style = "home/user/documents/file.txt"
windows_style = "C:\\Users\\User\\Documents\\file.txt"
unix_path = Path(unix_style)
windows_path = Path(windows_style)
print(f" Unix风格路径: {unix_path}")
print(f" Windows风格路径: {windows_path}")
print(f" pathlib统一处理: {unix_path.parts}")
# 运行pathlib模块演示
pathlib_module_demo()
# 7.2 shutil模块 - 高级文件操作
import shutil
import tempfile
from pathlib import Path
import zipfile
import tarfile
def shutil_module_demo():
"""shutil模块演示"""
print("=== shutil模块演示 ===")
# 创建临时目录进行演示
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
print(f"临时目录: {temp_path}")
# 1. 文件复制操作
print("\n1. 文件复制操作:")
# 创建源文件
source_file = temp_path / "source.txt"
source_file.write_text("这是源文件的内容\n包含多行文本\n用于测试复制功能", encoding='utf-8')
# 复制文件(保留元数据)
dest1 = temp_path / "copy1.txt"
shutil.copy2(source_file, dest1)
print(f" copy2复制: {source_file} -> {dest1}")
# 复制文件(不保留元数据)
dest2 = temp_path / "copy2.txt"
shutil.copy(source_file, dest2)
print(f" copy复制: {source_file} -> {dest2}")
# 复制文件内容(不复制权限)
dest3 = temp_path / "copy3.txt"
shutil.copyfile(source_file, dest3)
print(f" copyfile复制: {source_file} -> {dest3}")
# 验证复制结果
for dest in [dest1, dest2, dest3]:
if dest.exists():
size = dest.stat().st_size
print(f" {dest.name}: {size} 字节")
# 2. 目录复制操作
print("\n2. 目录复制操作:")
# 创建源目录结构
source_dir = temp_path / "source_project"
(source_dir / "src").mkdir(parents=True)
(source_dir / "docs").mkdir(parents=True)
(source_dir / "tests").mkdir(parents=True)
# 创建一些文件
(source_dir / "README.md").write_text("# 项目说明", encoding='utf-8')
(source_dir / "src" / "main.py").write_text("print('Hello')", encoding='utf-8')
(source_dir / "tests" / "test_main.py").write_text("# 测试文件", encoding='utf-8')
print(f" 创建源目录: {source_dir}")
# 复制整个目录树
dest_dir = temp_path / "copied_project"
shutil.copytree(source_dir, dest_dir)
print(f" copytree复制: {source_dir} -> {dest_dir}")
# 验证目录复制
def count_files(directory):
    """Return the number of regular files under ``directory``, recursively.

    Sub-directories are traversed but not counted; the previous version
    counted every rglob entry, directories included.
    """
    return sum(1 for entry in Path(directory).rglob('*') if entry.is_file())
source_count = count_files(source_dir)
dest_count = count_files(dest_dir)
print(f" 源目录文件数: {source_count}")
print(f" 目标目录文件数: {dest_count}")
# 3. 文件移动操作
print("\n3. 文件移动操作:")
# 创建要移动的文件
move_source = temp_path / "to_move.txt"
move_source.write_text("要移动的文件", encoding='utf-8')
# 移动文件
move_dest = temp_path / "moved" / "moved_file.txt"
move_dest.parent.mkdir(exist_ok=True)
shutil.move(move_source, move_dest)
print(f" 移动文件: {move_source} -> {move_dest}")
print(f" 源文件存在: {move_source.exists()}")
print(f" 目标文件存在: {move_dest.exists()}")
# 4. 文件删除操作
print("\n4. 文件删除操作:")
# 创建要删除的目录
delete_dir = temp_path / "to_delete"
delete_dir.mkdir()
(delete_dir / "file1.txt").write_text("文件1", encoding='utf-8')
(delete_dir / "file2.txt").write_text("文件2", encoding='utf-8')
print(f" 创建目录: {delete_dir}")
print(f" 目录存在: {delete_dir.exists()}")
# 删除整个目录树
shutil.rmtree(delete_dir)
print(f" 删除目录: {delete_dir}")
print(f" 目录存在: {delete_dir.exists()}")
# 5. 磁盘使用情况
print("\n5. 磁盘使用情况:")
# 获取磁盘使用情况
disk_usage = shutil.disk_usage(temp_path)
def format_bytes(bytes_value):
    """Format a byte count with two decimals and a binary unit (B up to PB).

    The value is divided by 1024 until it drops below 1024 or the unit
    list is exhausted, in which case it is reported in PB.
    """
    units = ('B', 'KB', 'MB', 'GB', 'TB')
    remaining = bytes_value
    index = 0
    while index < len(units):
        if remaining < 1024.0:
            return f"{remaining:.2f} {units[index]}"
        remaining /= 1024.0
        index += 1
    return f"{remaining:.2f} PB"
print(f" 总空间: {format_bytes(disk_usage.total)}")
print(f" 已使用: {format_bytes(disk_usage.used)}")
print(f" 可用空间: {format_bytes(disk_usage.free)}")
print(f" 使用率: {(disk_usage.used / disk_usage.total * 100):.1f}%")
# 6. 文件查找和过滤
print("\n6. 文件查找和过滤:")
def find_files_by_size(directory, min_size=0, max_size=float('inf')):
    """Recursively find files whose size lies in ``[min_size, max_size]`` bytes.

    Returns:
        A list of ``(path, size)`` tuples in the order rglob yields them.
    """
    regular_files = (entry for entry in Path(directory).rglob('*') if entry.is_file())
    with_sizes = ((entry, entry.stat().st_size) for entry in regular_files)
    return [
        (entry, size)
        for entry, size in with_sizes
        if min_size <= size <= max_size
    ]
def find_large_files(directory, threshold_mb=1):
    """Find files under ``directory`` that are at least ``threshold_mb`` MiB in size.

    Returns whatever ``find_files_by_size`` returns for that minimum size.
    """
    return find_files_by_size(directory, min_size=threshold_mb * 1024 * 1024)
def find_empty_files(directory):
    """Find zero-byte files under ``directory`` via ``find_files_by_size``."""
    empty_matches = find_files_by_size(directory, max_size=0)
    return empty_matches
# 创建不同大小的测试文件
test_files_dir = temp_path / "test_files"
test_files_dir.mkdir()
(test_files_dir / "empty.txt").write_text("", encoding='utf-8')
(test_files_dir / "small.txt").write_text("小文件", encoding='utf-8')
(test_files_dir / "medium.txt").write_text("中等文件" * 100, encoding='utf-8')
# 查找文件
all_files = find_files_by_size(test_files_dir)
empty_files = find_empty_files(test_files_dir)
print(f" 测试目录中的所有文件:")
for file_path, size in all_files:
print(f" {file_path.name}: {size} 字节")
print(f" 空文件: {len(empty_files)} 个")
# 7. 压缩和解压缩
print("\n7. 压缩和解压缩:")
# 创建要压缩的目录
archive_source = temp_path / "to_archive"
archive_source.mkdir()
for i in range(3):
file_path = archive_source / f"file_{i}.txt"
file_path.write_text(f"这是文件 {i} 的内容\n" * 10, encoding='utf-8')
# 创建ZIP压缩包
zip_path = temp_path / "archive.zip"
shutil.make_archive(str(zip_path.with_suffix('')), 'zip', archive_source)
print(f" 创建ZIP压缩包: {zip_path}")
# 创建TAR压缩包
tar_path = temp_path / "archive.tar.gz"
shutil.make_archive(str(tar_path.with_suffix('').with_suffix('')), 'gztar', archive_source)
print(f" 创建TAR压缩包: {tar_path}")
# 解压缩
extract_dir = temp_path / "extracted"
extract_dir.mkdir()
shutil.unpack_archive(zip_path, extract_dir / "from_zip")
print(f" 解压ZIP到: {extract_dir / 'from_zip'}")
# 验证解压结果
extracted_files = list((extract_dir / "from_zip").rglob('*'))
print(f" 解压后文件数: {len(extracted_files)}")
# 8. 实用文件操作工具
print("\n8. 实用文件操作工具:")
def backup_file(file_path, backup_dir=None):
    """Copy a file into a backup directory under a timestamped name.

    Args:
        file_path: File to back up.
        backup_dir: Directory for the copy. Defaults to a "backup" folder
            next to the file. Missing parent directories are created.

    Returns:
        The Path of the backup copy, or None when ``file_path`` does not
        exist.

    Note:
        The timestamp has one-second resolution, so two backups of the
        same file within the same second share a name and the later one
        overwrites the earlier.
    """
    import datetime

    file_path = Path(file_path)
    if not file_path.exists():
        return None
    backup_dir = file_path.parent / "backup" if backup_dir is None else Path(backup_dir)
    # parents=True so a nested custom backup_dir does not fail with
    # FileNotFoundError.
    backup_dir.mkdir(parents=True, exist_ok=True)
    # Backup name: <stem>_<YYYYmmdd_HHMMSS><suffix>
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = backup_dir / f"{file_path.stem}_{timestamp}{file_path.suffix}"
    # copy2 preserves file metadata along with the content.
    shutil.copy2(file_path, backup_path)
    return backup_path
def sync_directories(source, destination, delete_extra=False):
    """One-way sync of the files under ``source`` into ``destination``.

    A file is copied when it is missing from the destination, when the
    source copy is newer, or when the sizes differ.

    Args:
        source: Directory to copy from.
        destination: Directory to copy into; created if missing.
        delete_extra: When True, files and directories that exist in the
            destination but not in the source are removed afterwards.

    Returns:
        False when ``source`` does not exist, True otherwise.
    """
    source = Path(source)
    destination = Path(destination)
    if not source.exists():
        return False
    destination.mkdir(parents=True, exist_ok=True)

    # Copy new and updated files.
    for src_file in source.rglob('*'):
        if not src_file.is_file():
            continue
        dst_file = destination / src_file.relative_to(source)
        dst_file.parent.mkdir(parents=True, exist_ok=True)
        if dst_file.exists():
            src_stat = src_file.stat()
            dst_stat = dst_file.stat()
            # Up to date: destination is not older and has the same size.
            if (src_stat.st_mtime <= dst_stat.st_mtime
                    and src_stat.st_size == dst_stat.st_size):
                continue
        shutil.copy2(src_file, dst_file)

    if delete_extra:
        # Deepest entries first, so a stale directory is already empty
        # by the time it is removed.
        stale_candidates = sorted(
            destination.rglob('*'),
            key=lambda entry: len(entry.parts),
            reverse=True,
        )
        for dst_item in stale_candidates:
            if (source / dst_item.relative_to(destination)).exists():
                continue
            if dst_item.is_dir() and not dst_item.is_symlink():
                dst_item.rmdir()
            else:
                dst_item.unlink()
    return True
# 测试工具函数
test_file = temp_path / "test_backup.txt"
test_file.write_text("测试备份功能", encoding='utf-8')
backup_path = backup_file(test_file)
print(f" 备份文件: {test_file} -> {backup_path}")
# 测试目录同步
sync_source = temp_path / "sync_source"
sync_dest = temp_path / "sync_dest"
sync_source.mkdir()
(sync_source / "sync_test.txt").write_text("同步测试", encoding='utf-8')
sync_result = sync_directories(sync_source, sync_dest)
print(f" 目录同步结果: {sync_result}")
print(f" 同步后目标目录存在: {(sync_dest / 'sync_test.txt').exists()}")
# 运行shutil模块演示
shutil_module_demo()
# 八、实用工具模块
# 8.1 collections模块 - 特殊容器
from collections import (
Counter, defaultdict, OrderedDict, deque,
namedtuple, ChainMap, UserDict, UserList
)
import heapq
def collections_module_demo():
"""collections模块演示"""
print("=== collections模块演示 ===")
# 1. Counter - 计数器
print("\n1. Counter - 计数器:")
# 基本计数
text = "hello world python programming"
char_count = Counter(text)
word_count = Counter(text.split())
print(f" 文本: {text}")
print(f" 字符计数: {dict(char_count.most_common(5))}")
print(f" 单词计数: {dict(word_count)}")
# 列表计数
numbers = [1, 2, 3, 2, 1, 3, 1, 4, 5, 1]
num_count = Counter(numbers)
print(f"\n 数字列表: {numbers}")
print(f" 数字计数: {dict(num_count)}")
print(f" 最常见的3个: {num_count.most_common(3)}")
print(f" 总计数: {sum(num_count.values())}")
# Counter运算
counter1 = Counter(['a', 'b', 'c', 'a', 'b'])
counter2 = Counter(['a', 'b', 'b', 'd'])
print(f"\n Counter1: {dict(counter1)}")
print(f" Counter2: {dict(counter2)}")
print(f" 相加: {dict(counter1 + counter2)}")
print(f" 相减: {dict(counter1 - counter2)}")
print(f" 交集: {dict(counter1 & counter2)}")
print(f" 并集: {dict(counter1 | counter2)}")
# 2. defaultdict - 默认字典
print("\n2. defaultdict - 默认字典:")
# 基本使用
dd_list = defaultdict(list)
dd_int = defaultdict(int)
dd_set = defaultdict(set)
# 分组数据
students = [
('张三', '数学', 95),
('李四', '数学', 87),
('张三', '英语', 92),
('王五', '数学', 78),
('李四', '英语', 89)
]
# 按学科分组
subject_scores = defaultdict(list)
# 按学生分组
student_scores = defaultdict(dict)
for name, subject, score in students:
subject_scores[subject].append((name, score))
student_scores[name][subject] = score
print(f" 按学科分组:")
for subject, scores in subject_scores.items():
print(f" {subject}: {scores}")
print(f"\n 按学生分组:")
for name, scores in student_scores.items():
print(f" {name}: {dict(scores)}")
# 计数应用
word_positions = defaultdict(list)
sentence = "the quick brown fox jumps over the lazy dog"
for i, word in enumerate(sentence.split()):
word_positions[word].append(i)
print(f"\n 单词位置索引:")
for word, positions in word_positions.items():
print(f" '{word}': {positions}")
# 3. OrderedDict - 有序字典
print("\n3. OrderedDict - 有序字典:")
# 创建有序字典
od = OrderedDict()
od['first'] = 1
od['second'] = 2
od['third'] = 3
print(f" 有序字典: {list(od.items())}")
# 移动到末尾
od.move_to_end('first')
print(f" 移动'first'到末尾: {list(od.items())}")
# 移动到开头
od.move_to_end('third', last=False)
print(f" 移动'third'到开头: {list(od.items())}")
# LRU缓存实现
class LRUCache:
    """Least-recently-used cache backed by an OrderedDict.

    Entries are kept in usage order: the first item is the least recently
    used and is evicted when the cache is full.
    """

    def __init__(self, capacity):
        """Create a cache holding at most ``capacity`` entries.

        A capacity of zero or less gives a cache that stores nothing.
        """
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        """Return the value for ``key`` and mark it most recently used.

        Returns None when the key is absent.
        """
        if key not in self.cache:
            return None
        self.cache.move_to_end(key)
        return self.cache[key]

    def put(self, key, value):
        """Insert or update ``key``, evicting the oldest entry when full."""
        if self.capacity <= 0:
            # Nothing can be stored; evicting from an empty dict would
            # raise KeyError.
            return
        if key in self.cache:
            self.cache[key] = value
            self.cache.move_to_end(key)
            return
        if len(self.cache) >= self.capacity:
            # popitem(last=False) removes the least recently used entry.
            self.cache.popitem(last=False)
        self.cache[key] = value

    def items(self):
        """Return the ``(key, value)`` pairs from least to most recently used."""
        return list(self.cache.items())
# 测试LRU缓存
lru = LRUCache(3)
operations = [
('put', 'a', 1),
('put', 'b', 2),
('put', 'c', 3),
('get', 'a', None),
('put', 'd', 4), # 应该删除'b'
('get', 'b', None),
('items', None, None)
]
print(f"\n LRU缓存测试:")
for op, key, value in operations:
if op == 'put':
lru.put(key, value)
print(f" put({key}, {value}): {lru.items()}")
elif op == 'get':
result = lru.get(key)
print(f" get({key}): {result}, cache: {lru.items()}")
elif op == 'items':
print(f" 最终缓存: {lru.items()}")
# 4. deque - 双端队列
print("\n4. deque - 双端队列:")
# 基本操作
dq = deque(['a', 'b', 'c'])
print(f" 初始队列: {list(dq)}")
# 两端添加
dq.appendleft('left')
dq.append('right')
print(f" 两端添加后: {list(dq)}")
# 两端删除
left_item = dq.popleft()
right_item = dq.pop()
print(f" 删除的元素: 左={left_item}, 右={right_item}")
print(f" 删除后: {list(dq)}")
# 旋转
dq.rotate(1) # 向右旋转
print(f" 向右旋转1位: {list(dq)}")
dq.rotate(-2) # 向左旋转
print(f" 向左旋转2位: {list(dq)}")
# 限制长度的队列
limited_dq = deque(maxlen=3)
for i in range(5):
limited_dq.append(i)
print(f" 添加{i}: {list(limited_dq)}")
# 滑动窗口应用
def moving_average(data, window_size):
"""计算移动平均值"""
window = deque(maxlen=window_size)
averages = []
for value in data:
window.append(value)
if len(window) == window_size:
avg = sum(window) / window_size
averages.append(avg)
return averages
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
window_size = 3
averages = moving_average(data, window_size)
print(f"\n 数据: {data}")
print(f" 窗口大小: {window_size}")
print(f" 移动平均: {averages}")
# 5. namedtuple - 命名元组
print("\n5. namedtuple - 命名元组:")
# 创建命名元组类
Point = namedtuple('Point', ['x', 'y'])
Person = namedtuple('Person', 'name age city')
# 创建实例
p1 = Point(1, 2)
p2 = Point(x=3, y=4)
person1 = Person('张三', 25, '北京')
person2 = Person(name='李四', age=30, city='上海')
print(f" 点1: {p1}, x={p1.x}, y={p1.y}")
print(f" 点2: {p2}, x={p2.x}, y={p2.y}")
print(f" 人员1: {person1}")
print(f" 人员2: {person2}")
# 命名元组方法
print(f"\n 命名元组方法:")
print(f" 字段名: {Person._fields}")
print(f" 转为字典: {person1._asdict()}")
# 替换字段
person1_updated = person1._replace(age=26)
print(f" 替换年龄: {person1_updated}")
# 从可迭代对象创建
data = ['王五', 28, '广州']
person3 = Person._make(data)
print(f" 从列表创建: {person3}")
# 6. ChainMap - 链式映射
print("\n6. ChainMap - 链式映射:")
# 创建多个字典
dict1 = {'a': 1, 'b': 2}
dict2 = {'b': 3, 'c': 4}
dict3 = {'c': 5, 'd': 6}
# 创建链式映射
cm = ChainMap(dict1, dict2, dict3)
print(f" 字典1: {dict1}")
print(f" 字典2: {dict2}")
print(f" 字典3: {dict3}")
print(f" 链式映射: {dict(cm)}")
# 查找顺序(从第一个开始)
print(f"\n 查找顺序测试:")
for key in ['a', 'b', 'c', 'd', 'e']:
value = cm.get(key, 'Not Found')
print(f" {key}: {value}")
# 添加新映射
dict4 = {'e': 7, 'f': 8}
cm = cm.new_child(dict4)
print(f"\n 添加新映射后: {dict(cm)}")
# 配置管理应用
default_config = {
'host': 'localhost',
'port': 8080,
'debug': False,
'timeout': 30
}
user_config = {
'host': '192.168.1.100',
'debug': True
}
env_config = {
'port': 9000
}
# 配置优先级: 环境变量 > 用户配置 > 默认配置
config = ChainMap(env_config, user_config, default_config)
print(f"\n 配置管理示例:")
print(f" 默认配置: {default_config}")
print(f" 用户配置: {user_config}")
print(f" 环境配置: {env_config}")
print(f" 最终配置: {dict(config)}")
# Run the collections module demo.
collections_module_demo()
# 8.2 itertools模块 - 迭代器工具
import itertools
from itertools import (
count, cycle, repeat, chain, compress, dropwhile,
filterfalse, groupby, islice, starmap, takewhile,
zip_longest, product, permutations, combinations,
combinations_with_replacement, accumulate
)
def itertools_module_demo():
    """Print a guided tour of the itertools module.

    Sections, in order: infinite iterators (count, cycle, repeat);
    terminating iterators (chain, compress, dropwhile, takewhile,
    filterfalse, islice, groupby); combinatoric iterators (product,
    permutations, combinations, combinations_with_replacement); practical
    recipes (batching, sliding window, flattening, accumulate); and lazy
    pipelines over large inputs.

    Returns:
        None. Every result is written to stdout.
    """
    import operator

    print("=== itertools模块演示 ===")

    # 1. Infinite iterators: they never stop on their own, so each one is
    #    bounded with islice (or, for repeat, an explicit count).
    print("\n1. 无限迭代器:")

    # count(start, step): 10, 12, 14, ...
    print(" count - 计数器:")
    count_result = list(islice(count(10, 2), 5))
    print(f" 从10开始步长2: {count_result}")

    # cycle: replays the given sequence endlessly.
    print("\n cycle - 循环:")
    cycle_result = list(islice(cycle(['red', 'green', 'blue']), 8))
    print(f" 循环颜色: {cycle_result}")

    # repeat: finite here because a repetition count is supplied.
    print("\n repeat - 重复:")
    repeat_result = list(repeat('hello', 3))
    print(f" 重复'hello'3次: {repeat_result}")

    # 2. Iterators that terminate on the shortest/only input.
    print("\n2. 终止迭代器:")

    # chain: concatenate several iterables lazily.
    print(" chain - 链接:")
    list1 = [1, 2, 3]
    list2 = ['a', 'b', 'c']
    list3 = [10, 20]
    chained = list(chain(list1, list2, list3))
    print(f" 链接列表: {chained}")

    # compress: keep the items whose selector is truthy.
    print("\n compress - 压缩过滤:")
    data = ['a', 'b', 'c', 'd', 'e']
    selectors = [1, 0, 1, 0, 1]
    compressed = list(compress(data, selectors))
    print(f" 数据: {data}")
    print(f" 选择器: {selectors}")
    print(f" 压缩结果: {compressed}")

    # dropwhile: skip the leading run that satisfies the predicate.
    print("\n dropwhile - 丢弃直到条件为假:")
    numbers = [1, 3, 5, 2, 4, 6, 7, 8]
    dropped = list(dropwhile(lambda x: x % 2 == 1, numbers))
    print(f" 原数据: {numbers}")
    print(f" 丢弃奇数直到遇到偶数: {dropped}")

    # takewhile: keep only the leading run that satisfies the predicate.
    print("\n takewhile - 取值直到条件为假:")
    taken = list(takewhile(lambda x: x < 5, numbers))
    print(f" 取值直到>=5: {taken}")

    # filterfalse: the complement of filter(); with bool it selects falsy items.
    print("\n filterfalse - 过滤假值:")
    mixed_data = [0, 1, '', 'hello', [], [1, 2], None, 42]
    filtered = list(filterfalse(bool, mixed_data))
    print(f" 原数据: {mixed_data}")
    print(f" 假值: {filtered}")

    # islice: slicing for arbitrary iterables (stop / start, stop / step).
    print("\n islice - 切片:")
    data = range(20)
    slice1 = list(islice(data, 5))
    slice2 = list(islice(data, 5, 10))
    slice3 = list(islice(data, 0, 20, 3))
    print(f" 前5个: {slice1}")
    print(f" 5到10: {slice2}")
    print(f" 步长3: {slice3}")

    # groupby: groups *consecutive* equal keys only, so equal values that are
    # not adjacent end up in separate groups (see the trailing 1s below).
    print("\n groupby - 分组:")
    data = [1, 1, 2, 2, 2, 3, 1, 1]
    groups = [(k, list(g)) for k, g in groupby(data)]
    print(f" 按值分组: {groups}")

    words = ['apple', 'banana', 'cherry', 'date', 'elderberry']
    by_length = [(k, list(g)) for k, g in groupby(words, key=len)]
    print(f" 按长度分组: {by_length}")

    students = [
        ('张三', 'A'),
        ('李四', 'B'),
        ('王五', 'A'),
        ('赵六', 'B'),
        ('钱七', 'A'),
    ]
    # Sort by the same key first so each grade forms one contiguous run.
    grade_of = operator.itemgetter(1)
    students_sorted = sorted(students, key=grade_of)
    grade_groups = {
        grade: [name for name, _ in members]
        for grade, members in groupby(students_sorted, key=grade_of)
    }
    print(f" 学生按成绩分组: {grade_groups}")

    # 3. Combinatoric iterators.
    print("\n3. 组合迭代器:")

    # product: Cartesian product of the inputs.
    print(" product - 笛卡尔积:")
    colors = ['red', 'blue']
    sizes = ['S', 'M', 'L']
    products = list(product(colors, sizes))
    print(f" 颜色×尺寸: {products}")

    # repeat=2 multiplies the iterable with itself.
    coords = list(product(range(3), repeat=2))
    print(f" 坐标组合: {coords}")

    # permutations: ordered arrangements, optionally of a given length.
    print("\n permutations - 排列:")
    letters = ['A', 'B', 'C']
    perms2 = list(permutations(letters, 2))
    perms3 = list(permutations(letters))
    print(f" 2个字母排列: {perms2}")
    print(f" 3个字母排列: {perms3}")

    # combinations: unordered selections without repetition.
    print("\n combinations - 组合:")
    numbers = [1, 2, 3, 4]
    combs2 = list(combinations(numbers, 2))
    combs3 = list(combinations(numbers, 3))
    print(f" 2个数字组合: {combs2}")
    print(f" 3个数字组合: {combs3}")

    # combinations_with_replacement: an element may be picked more than once.
    print("\n combinations_with_replacement - 可重复组合:")
    combs_rep = list(combinations_with_replacement([1, 2, 3], 2))
    print(f" 可重复2组合: {combs_rep}")

    # 4. Practical recipes.
    print("\n4. 实用应用示例:")

    def batch_process(iterable, batch_size):
        """Yield successive lists of at most batch_size items."""
        iterator = iter(iterable)
        while True:
            batch = list(islice(iterator, batch_size))
            if not batch:
                break
            yield batch

    data = range(23)
    batches = list(batch_process(data, 5))
    print(f" 分批处理(每批5个): {batches}")

    def sliding_window(iterable, window_size):
        """Return an iterator of overlapping window_size-tuples.

        tee() gives every offset its own independent iterator; slicing the
        same one-shot iterator several times would interleave its items
        and produce wrong windows.
        """
        branches = itertools.tee(iterable, window_size)
        shifted = [
            islice(branch, offset, None)
            for offset, branch in enumerate(branches)
        ]
        return zip(*shifted)

    data = [1, 2, 3, 4, 5, 6, 7]
    windows = list(sliding_window(data, 3))
    print(f" 滑动窗口(大小3): {windows}")

    def flatten(nested_list):
        """Flatten exactly one level of nesting, lazily."""
        return chain.from_iterable(nested_list)

    nested = [[1, 2], [3, 4, 5], [6], [7, 8, 9]]
    flattened = list(flatten(nested))
    print(f" 嵌套列表: {nested}")
    print(f" 扁平化: {flattened}")

    # accumulate: running totals; any two-argument function can be the fold.
    print("\n accumulate - 累积计算:")
    numbers = [1, 2, 3, 4, 5]
    cumsum = list(accumulate(numbers))
    print(f" 累积和: {cumsum}")

    cumproduct = list(accumulate(numbers, operator.mul))
    print(f" 累积乘积: {cumproduct}")

    data = [3, 1, 4, 1, 5, 9, 2, 6]
    cummax = list(accumulate(data, max))
    print(f" 数据: {data}")
    print(f" 累积最大值: {cummax}")

    # 5. Lazy pipelines.
    print("\n5. 性能优化示例:")

    def memory_efficient_processing(data):
        """Double the even items of data and return the first ten results.

        The pipeline is lazy, so only as much of data is consumed as is
        needed to produce ten values.
        """
        doubled_evens = (x * 2 for x in data if x % 2 == 0)
        return list(islice(doubled_evens, 10))

    large_data = range(1000000)  # stands in for a large data source
    processed = memory_efficient_processing(large_data)
    print(f" 处理大数据前10个偶数×2: {processed}")

    def generate_test_data():
        """Yield ten (label, score) pairs built from two cycling sequences.

        The scores are a fixed repeating cycle, not random values; the
        label suffix increases by one every three records.
        """
        user_ids = cycle(['user1', 'user2', 'user3'])
        scores = cycle([85, 92, 78, 95, 88])
        pairs = islice(zip(user_ids, scores), 10)
        for i, (user_id, score) in enumerate(pairs):
            yield f"{user_id}_{i//3}", score

    test_data = list(generate_test_data())
    print(f" 生成的测试数据: {test_data}")
# Run the itertools module demo.
itertools_module_demo()
# 九、总结与最佳实践
# 9.1 模块选择指南
def module_selection_guide():
    """Print a task-oriented cheat sheet of standard-library modules.

    The guide maps each broad category to its tasks, and each task to the
    modules (or attributes) suggested for it. One heading is printed per
    category, followed by one line per task.
    """
    print("=== Python内建模块选择指南 ===")

    recommendations = {
        "系统交互": {
            "文件操作": ["os", "pathlib", "shutil"],
            "系统信息": ["sys", "platform"],
            "环境变量": ["os.environ"],
        },
        "时间处理": {
            "日期时间": ["datetime"],
            "时间戳": ["time"],
            "性能测量": ["time.perf_counter"],
        },
        "数据处理": {
            "JSON": ["json"],
            "CSV": ["csv"],
            "配置文件": ["configparser"],
        },
        "数学计算": {
            "基础数学": ["math"],
            "随机数": ["random"],
            "统计": ["statistics"],
        },
        "网络编程": {
            "URL处理": ["urllib.parse"],
            "HTTP请求": ["urllib.request"],
            "网络工具": ["socket"],
        },
        "数据结构": {
            "特殊容器": ["collections"],
            "堆队列": ["heapq"],
            "双端队列": ["collections.deque"],
        },
        "迭代工具": {
            "迭代器": ["itertools"],
            "函数式编程": ["functools"],
            "操作符": ["operator"],
        },
    }

    for heading in recommendations:
        # Render the whole category as one chunk: blank line, heading, tasks.
        chunk = [f"\n{heading}:"]
        for task_name, module_names in recommendations[heading].items():
            joined = ', '.join(module_names)
            chunk.append(f" {task_name}: {joined}")
        print("\n".join(chunk))
# Run the module selection guide.
module_selection_guide()
# 9.2 最佳实践
def best_practices_demo():
    """Print a checklist of standard-library best practices.

    Five topics are covered: imports, error handling, performance, code
    organisation and test-friendly design. The nested functions and the
    ConfigManager class are reference implementations that accompany the
    printed checklist; the demo defines them but does not call them.

    Returns:
        None. The checklist is written to stdout.
    """
    import json
    from collections import Counter
    from pathlib import Path

    print("=== Python内建模块最佳实践 ===")

    print("\n1. 导入最佳实践:")
    # Built line by line so the printed sample keeps its exact layout
    # (leading and trailing blank line included).
    import_advice = "\n".join([
        "",
        "# ✅ 推荐的导入方式",
        "import os",
        "import sys",
        "from pathlib import Path",
        "from collections import defaultdict, Counter",
        "from datetime import datetime, timedelta",
        "# ❌ 避免的导入方式",
        "from os import * # 污染命名空间",
        "import datetime as dt # 不必要的别名",
        "",
    ])
    print(import_advice)

    print("\n2. 错误处理最佳实践:")

    def safe_file_operation(filename):
        """Return the UTF-8 text of filename, or None after reporting a failure."""
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                return f.read()
        except FileNotFoundError:
            print(f"文件 {filename} 不存在")
            return None
        except PermissionError:
            print(f"没有权限访问文件 {filename}")
            return None
        except UnicodeDecodeError:
            print(f"文件 {filename} 编码错误")
            return None
        except Exception as e:
            # Last-resort boundary: report anything unexpected instead of
            # crashing the caller.
            print(f"读取文件时发生未知错误: {e}")
            return None

    def safe_json_operation(data):
        """Return data as pretty-printed JSON, or None if it is not serialisable."""
        try:
            return json.dumps(data, ensure_ascii=False, indent=2)
        except TypeError as e:
            print(f"JSON序列化错误: {e}")
            return None

    print(" ✅ 具体异常处理")
    print(" ✅ 资源自动清理(with语句)")
    print(" ✅ 编码明确指定")

    print("\n3. 性能优化最佳实践:")

    def process_large_file_good(filename):
        """Yield stripped lines one at a time; yields nothing if the file is missing."""
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                for line in f:  # streamed: the file is never fully loaded
                    yield line.strip()
        except FileNotFoundError:
            return

    def process_large_file_bad(filename):
        """Counter-example: load every line into memory at once."""
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                return f.readlines()
        except FileNotFoundError:
            return []

    def count_words_good(text):
        """Count whitespace-separated words with Counter."""
        return Counter(text.split())

    def count_words_bad(text):
        """Counter-example: hand-rolled counting that Counter already provides."""
        word_count = {}
        for word in text.split():
            if word in word_count:
                word_count[word] += 1
            else:
                word_count[word] = 1
        return word_count

    print(" ✅ 使用生成器处理大数据")
    print(" ✅ 选择合适的数据结构")
    print(" ✅ 避免重复计算")

    print("\n4. 代码组织最佳实践:")

    class ConfigManager:
        """In-memory key/value configuration backed by a JSON file."""

        def __init__(self, config_file=None):
            """Start empty, or load config_file immediately when one is given."""
            self.config = {}
            if config_file:
                self.load_config(config_file)

        def load_config(self, config_file):
            """Replace the current configuration with the JSON in config_file.

            Raises:
                FileNotFoundError: config_file does not exist.
                ValueError: config_file is not valid JSON.
            """
            config_path = Path(config_file)
            # EAFP: open directly rather than check-then-open, so the file
            # cannot vanish between the check and the read.
            try:
                with open(config_path, 'r', encoding='utf-8') as f:
                    self.config = json.load(f)
            except FileNotFoundError as e:
                raise FileNotFoundError(f"配置文件不存在: {config_file}") from e
            except json.JSONDecodeError as e:
                raise ValueError(f"配置文件格式错误: {e}") from e

        def get(self, key, default=None):
            """Return the value stored under key, or default if absent."""
            return self.config.get(key, default)

        def set(self, key, value):
            """Store value under key (in memory only until save_config)."""
            self.config[key] = value

        def save_config(self, config_file):
            """Write the configuration to config_file as indented JSON.

            Missing parent directories are created first.

            Raises:
                IOError: writing or serialising failed; the original error
                    is attached as the cause.
            """
            config_path = Path(config_file)
            config_path.parent.mkdir(parents=True, exist_ok=True)
            try:
                with open(config_path, 'w', encoding='utf-8') as f:
                    json.dump(self.config, f, ensure_ascii=False, indent=2)
            except Exception as e:
                raise IOError(f"保存配置文件失败: {e}") from e

    print(" ✅ 类封装相关功能")
    print(" ✅ 明确的方法职责")
    print(" ✅ 完善的错误处理")

    print("\n5. 测试友好的代码:")

    def calculate_file_stats(file_path):
        """Return size, mtime, is_file and extension for file_path, or None if missing."""
        path = Path(file_path)
        if not path.exists():
            return None
        stat = path.stat()
        return {
            'size': stat.st_size,
            'modified': stat.st_mtime,
            'is_file': path.is_file(),
            'extension': path.suffix,
        }

    def process_data_with_logging(data, logger=None):
        """Double every item of data, logging progress and per-item failures.

        logger is injectable so tests can pass their own; when omitted, the
        module-level logger is used. Items that fail are logged and skipped.
        """
        if logger is None:
            import logging
            logger = logging.getLogger(__name__)
        logger.info(f"开始处理 {len(data)} 条数据")
        processed = []
        for item in data:
            try:
                result = item * 2  # placeholder for real processing
                processed.append(result)
            except Exception as e:
                logger.error(f"处理数据项失败: {item}, 错误: {e}")
        logger.info(f"处理完成,成功 {len(processed)} 条")
        return processed

    print(" ✅ 函数职责单一")
    print(" ✅ 依赖注入(如logger)")
    print(" ✅ 返回值明确")
# Run the best-practices demo.
best_practices_demo()
# 9.3 常见陷阱和解决方案
def common_pitfalls_demo():
    """Print common standard-library pitfalls next to their fixes.

    Six areas are covered: default arguments and time zones, file
    encodings, JSON serialisation, path joining, one-shot iterators and
    memory use. Most nested functions are paired "bad"/"good" reference
    examples that are defined but not called; only the JSON and iterator
    examples are executed so that their results can be printed.

    Returns:
        None. The report is written to stdout.
    """
    import json
    import os
    from datetime import datetime, timedelta, timezone
    from decimal import Decimal
    from pathlib import Path

    print("=== 常见陷阱和解决方案 ===")

    print("\n1. 时间处理陷阱:")

    # ❌ Wrong: a default value is evaluated once, when the def statement
    # runs, so every call without an argument returns that same stale moment.
    def bad_timestamp_function(timestamp=datetime.now()):
        return timestamp

    # ✅ Right: use None as the sentinel and compute the value per call.
    def good_timestamp_function(timestamp=None):
        if timestamp is None:
            timestamp = datetime.now()
        return timestamp

    print(" ✅ 避免在默认参数中使用可变对象")

    # ❌ Wrong: a naive datetime carries no time zone information.
    naive_time = datetime.now()
    # ✅ Right: attach an explicit time zone.
    aware_time = datetime.now(timezone.utc)
    local_time = datetime.now(timezone(timedelta(hours=8)))  # UTC+8 (Beijing)
    print(" ✅ 明确处理时区信息")

    print("\n2. 文件处理陷阱:")

    # ❌ Wrong: relies on the platform default encoding. The bare except
    # is part of the counter-example: it hides every failure.
    def bad_file_read(filename):
        try:
            with open(filename, 'r') as f:
                return f.read()
        except:
            return None

    # ✅ Right: name the encoding, with a deliberate fallback.
    def good_file_read(filename):
        """Read filename as UTF-8, then GBK; None if missing or undecodable."""
        for encoding in ('utf-8', 'gbk'):
            try:
                with open(filename, 'r', encoding=encoding) as f:
                    return f.read()
            except UnicodeDecodeError:
                continue  # try the next candidate encoding
            except FileNotFoundError:
                # Handled on every attempt, so a file that disappears
                # before the fallback read still yields None.
                return None
        return None

    print(" ✅ 明确指定文件编码")
    print(" ✅ 处理编码错误")

    print("\n3. JSON处理陷阱:")

    # ❌ Wrong: json.dumps raises TypeError for datetime, Decimal, etc.
    def bad_json_serialize(data):
        return json.dumps(data)

    # ✅ Right: supply a default= hook for the non-JSON types in use.
    def good_json_serialize(data):
        """Serialise data, converting datetime, Decimal and plain objects."""
        def json_serializer(obj):
            if isinstance(obj, datetime):
                return obj.isoformat()
            elif isinstance(obj, Decimal):
                return float(obj)
            elif hasattr(obj, '__dict__'):
                return obj.__dict__
            raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
        return json.dumps(data, default=json_serializer, ensure_ascii=False, indent=2)

    test_data = {
        'name': '张三',
        'timestamp': datetime.now(),
        'price': Decimal('99.99'),
    }
    try:
        result = good_json_serialize(test_data)
        print(f" ✅ 成功序列化: {result[:50]}...")
    except Exception as e:
        print(f" ❌ 序列化失败: {e}")

    print("\n4. 路径处理陷阱:")

    # ❌ Wrong: hard-codes '/' as the separator.
    def bad_path_join(base, *parts):
        return base + '/' + '/'.join(parts)

    # ✅ Right: let pathlib pick the separator...
    def good_path_join(base, *parts):
        return str(Path(base).joinpath(*parts))

    # ...or use os.path.join.
    def good_path_join_os(base, *parts):
        return os.path.join(base, *parts)

    print(" ✅ 使用pathlib或os.path处理路径")
    print(" ✅ 避免硬编码路径分隔符")

    print("\n5. 迭代器陷阱:")

    # ❌ Wrong: an iterator can be consumed only once.
    def bad_iterator_usage():
        data = iter([1, 2, 3, 4, 5])
        first_sum = sum(data)   # exhausts the iterator
        second_sum = sum(data)  # nothing left, so this is 0
        return first_sum, second_sum

    # ✅ Right: keep a re-iterable container such as a list.
    def good_iterator_usage():
        data = [1, 2, 3, 4, 5]
        first_sum = sum(data)
        second_sum = sum(data)
        return first_sum, second_sum

    bad_result = bad_iterator_usage()
    good_result = good_iterator_usage()
    print(f" ❌ 错误用法结果: {bad_result}")
    print(f" ✅ 正确用法结果: {good_result}")

    print("\n6. 内存使用陷阱:")

    # ❌ Wrong: readlines() holds the whole file in memory.
    def bad_large_data_processing(filename):
        with open(filename, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        processed = []
        for line in lines:
            if line.strip():  # skip blank lines
                processed.append(line.strip().upper())
        return processed

    # ✅ Right: stream line by line; only the results are collected.
    def good_large_data_processing(filename):
        def process_lines():
            with open(filename, 'r', encoding='utf-8') as f:
                for line in f:
                    if line.strip():
                        yield line.strip().upper()
        return list(process_lines())

    print(" ✅ 使用生成器处理大数据")
    print(" ✅ 避免一次性加载大量数据")
# Run the common-pitfalls demo.
common_pitfalls_demo()
# 9.4 学习建议
def learning_suggestions():
    """Print a staged study plan for the standard library.

    Output has three parts: a roadmap with three stages (each listing
    modules, focus points and practice projects), a list of study methods,
    and a table of recommended resources.
    """
    def stage(modules, focus, projects):
        # Every stage shares the same three sections, in this order.
        return {"必学模块": modules, "学习重点": focus, "实践项目": projects}

    roadmap = {
        "初学者阶段": stage(
            ["os", "sys", "datetime", "json", "math", "random"],
            [
                "掌握基本的文件操作",
                "理解时间日期处理",
                "学会JSON数据处理",
                "熟悉数学和随机数操作",
            ],
            [
                "文件管理工具",
                "日志分析器",
                "数据转换工具",
                "简单的配置管理",
            ],
        ),
        "进阶阶段": stage(
            ["pathlib", "collections", "itertools", "functools", "urllib"],
            [
                "现代化的路径处理",
                "高效的数据结构使用",
                "迭代器和生成器优化",
                "网络编程基础",
            ],
            [
                "数据处理管道",
                "网络爬虫",
                "性能监控工具",
                "批处理系统",
            ],
        ),
        "高级阶段": stage(
            ["asyncio", "multiprocessing", "threading", "logging", "unittest"],
            [
                "异步编程模式",
                "并发和并行处理",
                "完善的日志系统",
                "测试驱动开发",
            ],
            [
                "高并发服务器",
                "分布式系统",
                "微服务架构",
                "企业级应用",
            ],
        ),
    }

    study_methods = (
        "📚 阅读官方文档 - 最权威的学习资源",
        "💻 动手实践 - 编写小项目巩固知识",
        "🔍 源码阅读 - 理解模块内部实现",
        "🤝 社区交流 - 参与开源项目和讨论",
        "📝 总结记录 - 建立个人知识库",
        "🎯 项目驱动 - 通过实际需求学习",
        "⚡ 性能测试 - 了解不同方法的效率",
        "🐛 错误调试 - 从错误中学习经验",
    )

    reading_list = {
        "官方文档": "https://docs.python.org/3/library/",
        "在线教程": "Real Python, Python.org Tutorial",
        "书籍推荐": "《Python标准库》、《Effective Python》",
        "实践平台": "LeetCode, HackerRank, Codewars",
        "开源项目": "GitHub上的Python项目",
    }

    print("=== Python内建模块学习建议 ===")

    # Roadmap: stage heading, then each section with bulleted entries.
    for stage_name, sections in roadmap.items():
        print(f"\n{stage_name}:")
        for section_title, entries in sections.items():
            print(f" {section_title}:")
            print("\n".join(f" • {entry}" for entry in entries))

    print("\n学习方法建议:")
    print("\n".join(f" {tip}" for tip in study_methods))

    print("\n推荐资源:")
    for label in reading_list:
        print(f" {label}: {reading_list[label]}")
# Run the learning suggestions.
learning_suggestions()
# 总结
通过本章的学习,我们深入了解了Python的常用内建模块,包括:
# 核心收获
- 系统交互模块 - 掌握了 `os`、`sys`、`platform` 模块的使用
- 时间处理模块 - 学会了 `datetime` 和 `time` 模块的时间操作
- 数据处理模块 - 熟悉了 `json`、`csv` 模块的数据处理
- 数学计算模块 - 了解了 `math`、`random` 模块的数学运算
- 网络编程模块 - 掌握了 `urllib` 模块的URL处理
- 文件处理模块 - 学会了 `pathlib`、`shutil` 模块的文件操作
- 实用工具模块 - 熟悉了 `collections`、`itertools` 模块的高级功能
# 关键技能
- ✅ 能够选择合适的模块解决具体问题
- ✅ 掌握模块的核心功能和最佳实践
- ✅ 了解常见陷阱和解决方案
- ✅ 具备编写高质量、可维护代码的能力
# 下一步学习
- 第三方库学习 - requests、pandas、numpy等
- 框架学习 - Django、Flask、FastAPI等
- 专业领域 - 数据科学、机器学习、Web开发等
- 高级主题 - 异步编程、并发处理、性能优化等
掌握Python内建模块是成为Python高手的重要基础,继续保持学习和实践的热情!