第8天-模块
哪吒 2023/6/15
# 第8天-Python模块与包
欢迎来到Python模块的世界!模块是Python代码组织和重用的基础,掌握模块系统将让你的代码更加模块化、可维护和可扩展。
# 什么是模块?
模块(Module)是包含Python代码的文件,通常以.py
为扩展名。模块可以包含函数、类、变量以及可执行的代码。
生活类比:
- 模块:像工具箱中的不同工具,每个工具有特定的功能
- 包:像工具箱本身,将相关的工具组织在一起
- 导入:像从工具箱中取出需要的工具来使用
# 模块的优势
- 代码重用:避免重复编写相同的代码
- 命名空间:避免变量名冲突
- 代码组织:将相关功能组织在一起
- 维护性:便于代码的维护和更新
- 协作开发:团队成员可以独立开发不同模块
# 一、创建和使用模块
# 1.1 创建简单模块
让我们创建一个数学工具模块:
# 文件名:math_utils.py
"""数学工具模块
这个模块提供了一些常用的数学函数。
"""
import math
# 模块级变量
PI = 3.14159265359
E = 2.71828182846
def add(x, y):
"""加法函数"""
return x + y
def subtract(x, y):
"""减法函数"""
return x - y
def multiply(x, y):
"""乘法函数"""
return x * y
def divide(x, y):
"""除法函数"""
if y == 0:
raise ValueError("除数不能为零")
return x / y
def power(base, exponent):
"""幂运算"""
return base ** exponent
def factorial(n):
"""计算阶乘"""
if n < 0:
raise ValueError("阶乘的参数必须是非负整数")
if n == 0 or n == 1:
return 1
return n * factorial(n - 1)
def is_prime(n):
"""判断是否为质数"""
if n < 2:
return False
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0:
return False
return True
def gcd(a, b):
"""计算最大公约数"""
while b:
a, b = b, a % b
return a
def lcm(a, b):
"""计算最小公倍数"""
return abs(a * b) // gcd(a, b)
class Calculator:
"""简单计算器类"""
def __init__(self):
self.history = []
def calculate(self, operation, x, y=None):
"""执行计算并记录历史"""
if operation == 'add' and y is not None:
result = add(x, y)
elif operation == 'subtract' and y is not None:
result = subtract(x, y)
elif operation == 'multiply' and y is not None:
result = multiply(x, y)
elif operation == 'divide' and y is not None:
result = divide(x, y)
elif operation == 'factorial':
result = factorial(x)
else:
raise ValueError("不支持的操作")
self.history.append(f"{operation}({x}, {y}) = {result}" if y is not None else f"{operation}({x}) = {result}")
return result
def get_history(self):
"""获取计算历史"""
return self.history.copy()
def clear_history(self):
"""清空计算历史"""
self.history.clear()
# 模块级代码(导入时执行)
print(f"数学工具模块已加载,π = {PI}, e = {E}")
# 测试代码(只在直接运行时执行)
if __name__ == "__main__":
print("测试数学工具模块:")
print(f"5 + 3 = {add(5, 3)}")
print(f"10 - 4 = {subtract(10, 4)}")
print(f"6 * 7 = {multiply(6, 7)}")
print(f"15 / 3 = {divide(15, 3)}")
print(f"2^8 = {power(2, 8)}")
print(f"5! = {factorial(5)}")
print(f"17是质数吗?{is_prime(17)}")
print(f"gcd(48, 18) = {gcd(48, 18)}")
print(f"lcm(12, 8) = {lcm(12, 8)}")
calc = Calculator()
calc.calculate('add', 10, 5)
calc.calculate('factorial', 5)
print("计算历史:", calc.get_history())
# 1.2 导入和使用模块
# 使用我们创建的math_utils模块
# 方法1:导入整个模块
import math_utils
result1 = math_utils.add(10, 5)
print(f"10 + 5 = {result1}")
print(f"π的值:{math_utils.PI}")
# 方法2:导入特定函数
from math_utils import multiply, divide, factorial
result2 = multiply(6, 7)
result3 = divide(20, 4)
result4 = factorial(5)
print(f"6 * 7 = {result2}")
print(f"20 / 4 = {result3}")
print(f"5! = {result4}")
# 方法3:导入并重命名
from math_utils import is_prime as check_prime
from math_utils import Calculator as Calc
print(f"13是质数吗?{check_prime(13)}")
calc = Calc()
result5 = calc.calculate('add', 15, 25)
print(f"计算结果:{result5}")
# 方法4:导入所有(不推荐)
# from math_utils import *
# 这种方式可能导致命名冲突,一般不推荐使用
# 方法5:使用别名导入模块
import math_utils as mu
result6 = mu.power(2, 10)
print(f"2^10 = {result6}")
# 二、模块搜索路径
# 2.1 Python如何查找模块
Python按以下顺序搜索模块:
- 当前目录
- PYTHONPATH环境变量指定的目录
- 标准库目录
- site-packages目录(第三方包)
import sys
# 查看模块搜索路径
print("Python模块搜索路径:")
for i, path in enumerate(sys.path, 1):
print(f"{i}. {path}")
# 动态添加搜索路径
sys.path.append('/path/to/your/modules')
# 查看已加载的模块
print("\n已加载的模块:")
for module_name in sorted(sys.modules.keys())[:10]: # 只显示前10个
print(f"- {module_name}")
# 2.2 模块的属性和信息
import math_utils
import math
# 查看模块属性
print("math_utils模块的属性:")
for attr in dir(math_utils):
if not attr.startswith('_'): # 不显示私有属性
print(f"- {attr}")
# 模块的特殊属性
print(f"\n模块名称:{math_utils.__name__}")
print(f"模块文档:{math_utils.__doc__}")
print(f"模块文件路径:{math_utils.__file__}")
# 获取函数的帮助信息
help(math_utils.factorial)
# 检查对象类型
print(f"\nadd是函数吗?{callable(math_utils.add)}")
print(f"PI是什么类型?{type(math_utils.PI)}")
print(f"Calculator是类吗?{isinstance(math_utils.Calculator, type)}")
# 三、包(Packages)
# 3.1 什么是包
包是包含多个模块的目录,必须包含一个__init__.py
文件(可以为空)。
my_package/
__init__.py
module1.py
module2.py
subpackage/
__init__.py
module3.py
module4.py
# 3.2 创建包结构
让我们创建一个完整的工具包:
# 目录结构:
# utils/
# __init__.py
# string_utils.py
# file_utils.py
# data_utils.py
# math/
# __init__.py
# basic.py
# advanced.py
# utils/__init__.py
"""工具包
这个包提供了各种实用工具函数。
"""
__version__ = "1.0.0"
__author__ = "哪吒"
# 导入子模块,使其可以直接从包中访问
from .string_utils import *
from .file_utils import read_file, write_file
from .data_utils import DataProcessor
# 定义包级别的函数
def get_package_info():
"""获取包信息"""
return {
'name': __name__,
'version': __version__,
'author': __author__
}
# 定义__all__来控制from utils import *的行为
__all__ = [
'get_package_info',
'capitalize_words',
'reverse_string',
'read_file',
'write_file',
'DataProcessor'
]
# utils/string_utils.py
"""字符串工具模块"""
def capitalize_words(text):
"""将每个单词的首字母大写"""
return ' '.join(word.capitalize() for word in text.split())
def reverse_string(text):
"""反转字符串"""
return text[::-1]
def count_words(text):
"""统计单词数量"""
return len(text.split())
def remove_duplicates(text):
"""移除重复的单词"""
words = text.split()
unique_words = []
for word in words:
if word not in unique_words:
unique_words.append(word)
return ' '.join(unique_words)
def is_palindrome(text):
"""检查是否为回文"""
cleaned = ''.join(char.lower() for char in text if char.isalnum())
return cleaned == cleaned[::-1]
class StringProcessor:
"""字符串处理器类"""
def __init__(self, text):
self.text = text
def process(self, operations):
"""应用一系列操作"""
result = self.text
for operation in operations:
if operation == 'capitalize':
result = capitalize_words(result)
elif operation == 'reverse':
result = reverse_string(result)
elif operation == 'remove_duplicates':
result = remove_duplicates(result)
return result
# utils/file_utils.py
"""文件操作工具模块"""
import os
import json
import csv
from typing import List, Dict, Any
def read_file(filepath, encoding='utf-8'):
"""读取文本文件"""
try:
with open(filepath, 'r', encoding=encoding) as file:
return file.read()
except FileNotFoundError:
print(f"文件 {filepath} 不存在")
return None
except Exception as e:
print(f"读取文件时出错:{e}")
return None
def write_file(filepath, content, encoding='utf-8'):
"""写入文本文件"""
try:
# 确保目录存在
os.makedirs(os.path.dirname(filepath), exist_ok=True)
with open(filepath, 'w', encoding=encoding) as file:
file.write(content)
return True
except Exception as e:
print(f"写入文件时出错:{e}")
return False
def read_json(filepath):
"""读取JSON文件"""
try:
with open(filepath, 'r', encoding='utf-8') as file:
return json.load(file)
except Exception as e:
print(f"读取JSON文件时出错:{e}")
return None
def write_json(filepath, data, indent=2):
"""写入JSON文件"""
try:
os.makedirs(os.path.dirname(filepath), exist_ok=True)
with open(filepath, 'w', encoding='utf-8') as file:
json.dump(data, file, indent=indent, ensure_ascii=False)
return True
except Exception as e:
print(f"写入JSON文件时出错:{e}")
return False
def read_csv(filepath):
"""读取CSV文件"""
try:
with open(filepath, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
return list(reader)
except Exception as e:
print(f"读取CSV文件时出错:{e}")
return None
def write_csv(filepath, data, fieldnames=None):
"""写入CSV文件"""
try:
if not data:
return False
if fieldnames is None:
fieldnames = data[0].keys() if isinstance(data[0], dict) else None
os.makedirs(os.path.dirname(filepath), exist_ok=True)
with open(filepath, 'w', newline='', encoding='utf-8') as file:
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
return True
except Exception as e:
print(f"写入CSV文件时出错:{e}")
return False
def get_file_info(filepath):
"""获取文件信息"""
try:
stat = os.stat(filepath)
return {
'size': stat.st_size,
'modified': stat.st_mtime,
'created': stat.st_ctime,
'is_file': os.path.isfile(filepath),
'is_dir': os.path.isdir(filepath)
}
except Exception as e:
print(f"获取文件信息时出错:{e}")
return None
class FileManager:
"""文件管理器类"""
def __init__(self, base_path='.'):
self.base_path = base_path
def list_files(self, extension=None):
"""列出目录中的文件"""
files = []
for item in os.listdir(self.base_path):
item_path = os.path.join(self.base_path, item)
if os.path.isfile(item_path):
if extension is None or item.endswith(extension):
files.append(item)
return files
def create_backup(self, filepath):
"""创建文件备份"""
backup_path = filepath + '.backup'
try:
content = read_file(filepath)
if content is not None:
return write_file(backup_path, content)
except Exception as e:
print(f"创建备份时出错:{e}")
return False
# utils/data_utils.py
"""数据处理工具模块"""
from typing import List, Dict, Any, Union
from collections import Counter
import statistics
class DataProcessor:
"""数据处理器类"""
def __init__(self, data: List[Any] = None):
self.data = data or []
def add_data(self, item: Any):
"""添加数据项"""
self.data.append(item)
def filter_data(self, condition):
"""过滤数据"""
return [item for item in self.data if condition(item)]
def map_data(self, transform):
"""转换数据"""
return [transform(item) for item in self.data]
def group_by(self, key_func):
"""按条件分组"""
groups = {}
for item in self.data:
key = key_func(item)
if key not in groups:
groups[key] = []
groups[key].append(item)
return groups
def get_statistics(self):
"""获取数值数据的统计信息"""
if not self.data or not all(isinstance(x, (int, float)) for x in self.data):
return None
return {
'count': len(self.data),
'sum': sum(self.data),
'mean': statistics.mean(self.data),
'median': statistics.median(self.data),
'mode': statistics.mode(self.data) if len(set(self.data)) < len(self.data) else None,
'min': min(self.data),
'max': max(self.data),
'range': max(self.data) - min(self.data)
}
def clean_data(data: List[Dict[str, Any]], required_fields: List[str]) -> List[Dict[str, Any]]:
"""清理数据,移除缺少必需字段的记录"""
cleaned = []
for record in data:
if all(field in record and record[field] is not None for field in required_fields):
cleaned.append(record)
return cleaned
def aggregate_data(data: List[Dict[str, Any]], group_by: str, aggregate_field: str, operation: str = 'sum'):
"""聚合数据"""
groups = {}
for record in data:
key = record.get(group_by)
if key not in groups:
groups[key] = []
groups[key].append(record.get(aggregate_field, 0))
result = {}
for key, values in groups.items():
if operation == 'sum':
result[key] = sum(values)
elif operation == 'avg':
result[key] = sum(values) / len(values) if values else 0
elif operation == 'count':
result[key] = len(values)
elif operation == 'max':
result[key] = max(values) if values else 0
elif operation == 'min':
result[key] = min(values) if values else 0
return result
def find_duplicates(data: List[Any]) -> List[Any]:
"""查找重复项"""
counter = Counter(data)
return [item for item, count in counter.items() if count > 1]
def remove_duplicates(data: List[Any]) -> List[Any]:
"""移除重复项,保持顺序"""
seen = set()
result = []
for item in data:
if item not in seen:
seen.add(item)
result.append(item)
return result
def sort_data(data: List[Dict[str, Any]], sort_by: str, reverse: bool = False) -> List[Dict[str, Any]]:
"""排序数据"""
return sorted(data, key=lambda x: x.get(sort_by, 0), reverse=reverse)
# utils/math/__init__.py
"""数学工具子包"""
from .basic import *
from .advanced import *
__all__ = ['add', 'subtract', 'multiply', 'divide', 'power', 'sqrt', 'log', 'sin', 'cos']
# utils/math/basic.py
"""基础数学运算"""
def add(x, y):
"""加法"""
return x + y
def subtract(x, y):
"""减法"""
return x - y
def multiply(x, y):
"""乘法"""
return x * y
def divide(x, y):
"""除法"""
if y == 0:
raise ValueError("除数不能为零")
return x / y
def power(base, exponent):
"""幂运算"""
return base ** exponent
# utils/math/advanced.py
"""高级数学运算"""
import math
def sqrt(x):
"""平方根"""
if x < 0:
raise ValueError("负数不能开平方根")
return math.sqrt(x)
def log(x, base=math.e):
"""对数"""
if x <= 0:
raise ValueError("对数的参数必须大于0")
return math.log(x, base)
def sin(x):
"""正弦"""
return math.sin(x)
def cos(x):
"""余弦"""
return math.cos(x)
def tan(x):
"""正切"""
return math.tan(x)
# 3.3 使用包
# 使用我们创建的工具包
# 导入整个包
import utils
# 使用包级别的函数
info = utils.get_package_info()
print(f"包信息:{info}")
# 使用字符串工具
text = "hello world python programming"
result = utils.capitalize_words(text)
print(f"首字母大写:{result}")
# 使用文件工具
utils.write_file('test.txt', 'Hello, World!')
content = utils.read_file('test.txt')
print(f"文件内容:{content}")
# 使用数据处理器
processor = utils.DataProcessor([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
even_numbers = processor.filter_data(lambda x: x % 2 == 0)
print(f"偶数:{even_numbers}")
stats = processor.get_statistics()
print(f"统计信息:{stats}")
# 导入子包
from utils.math import add, multiply, sqrt
result1 = add(10, 5)
result2 = multiply(6, 7)
result3 = sqrt(16)
print(f"数学运算:{result1}, {result2}, {result3}")
# 使用别名导入
from utils.string_utils import StringProcessor as SP
processor = SP("hello world hello python")
result = processor.process(['capitalize', 'remove_duplicates'])
print(f"字符串处理结果:{result}")
# 四、标准库模块
# 4.1 常用标准库模块
# os模块 - 操作系统接口
import os
print(f"当前工作目录:{os.getcwd()}")
print(f"用户主目录:{os.path.expanduser('~')}")
print(f"路径分隔符:{os.sep}")
# 环境变量
print(f"PATH环境变量:{os.environ.get('PATH', '未设置')}")
# 文件和目录操作
if not os.path.exists('temp_dir'):
os.makedirs('temp_dir')
print("创建了临时目录")
# sys模块 - 系统相关参数和函数
import sys
print(f"Python版本:{sys.version}")
print(f"平台:{sys.platform}")
print(f"命令行参数:{sys.argv}")
# datetime模块 - 日期和时间
from datetime import datetime, date, time, timedelta
now = datetime.now()
print(f"当前时间:{now}")
print(f"格式化时间:{now.strftime('%Y-%m-%d %H:%M:%S')}")
# 时间计算
tomorrow = now + timedelta(days=1)
print(f"明天:{tomorrow.strftime('%Y-%m-%d')}")
# random模块 - 随机数生成
import random
print(f"随机整数:{random.randint(1, 100)}")
print(f"随机浮点数:{random.random()}")
print(f"随机选择:{random.choice(['apple', 'banana', 'orange'])}")
# 随机打乱列表
numbers = [1, 2, 3, 4, 5]
random.shuffle(numbers)
print(f"打乱后的列表:{numbers}")
# json模块 - JSON数据处理
import json
data = {
'name': '张三',
'age': 25,
'skills': ['Python', 'JavaScript', 'SQL']
}
# 转换为JSON字符串
json_str = json.dumps(data, ensure_ascii=False, indent=2)
print(f"JSON字符串:\n{json_str}")
# 从JSON字符串解析
parsed_data = json.loads(json_str)
print(f"解析后的数据:{parsed_data}")
# 4.2 更多实用标准库
# collections模块 - 特殊容器数据类型
from collections import Counter, defaultdict, namedtuple, deque
# Counter - 计数器
text = "hello world"
letter_count = Counter(text)
print(f"字母计数:{letter_count}")
print(f"最常见的3个字母:{letter_count.most_common(3)}")
# defaultdict - 默认字典
dd = defaultdict(list)
dd['fruits'].append('apple')
dd['fruits'].append('banana')
dd['vegetables'].append('carrot')
print(f"默认字典:{dict(dd)}")
# namedtuple - 命名元组
Point = namedtuple('Point', ['x', 'y'])
p = Point(3, 4)
print(f"点坐标:{p}, x={p.x}, y={p.y}")
# deque - 双端队列
dq = deque([1, 2, 3])
dq.appendleft(0)
dq.append(4)
print(f"双端队列:{list(dq)}")
# itertools模块 - 迭代工具
import itertools
# 组合和排列
data = ['A', 'B', 'C']
combinations = list(itertools.combinations(data, 2))
permutations = list(itertools.permutations(data, 2))
print(f"组合:{combinations}")
print(f"排列:{permutations}")
# 无限迭代器
counter = itertools.count(1, 2) # 从1开始,步长为2
first_5_odd = [next(counter) for _ in range(5)]
print(f"前5个奇数:{first_5_odd}")
# re模块 - 正则表达式
import re
text = "联系电话:138-1234-5678,邮箱:user@example.com"
# 查找电话号码
phone_pattern = r'\d{3}-\d{4}-\d{4}'
phone = re.search(phone_pattern, text)
if phone:
print(f"找到电话号码:{phone.group()}")
# 查找邮箱
email_pattern = r'\w+@\w+\.\w+'
email = re.search(email_pattern, text)
if email:
print(f"找到邮箱:{email.group()}")
# 替换文本
cleaned_text = re.sub(r'\d{3}-\d{4}-\d{4}', '[电话已隐藏]', text)
print(f"清理后的文本:{cleaned_text}")
# pathlib模块 - 面向对象的路径操作
from pathlib import Path
# 创建路径对象
path = Path('data/files/document.txt')
print(f"文件名:{path.name}")
print(f"文件扩展名:{path.suffix}")
print(f"父目录:{path.parent}")
print(f"绝对路径:{path.absolute()}")
# 路径操作
new_path = path.with_suffix('.pdf')
print(f"更改扩展名后:{new_path}")
# 检查路径
print(f"路径存在吗?{path.exists()}")
print(f"是文件吗?{path.is_file()}")
print(f"是目录吗?{path.is_dir()}")
# 五、第三方包管理
# 5.1 pip包管理器
# 安装包
pip install requests
pip install pandas numpy matplotlib
# 安装特定版本
pip install django==3.2.0
# 从requirements.txt安装
pip install -r requirements.txt
# 升级包
pip upgrade requests
# 卸载包
pip uninstall requests
# 列出已安装的包
pip list
# 显示包信息
pip show requests
# 生成requirements.txt
pip freeze > requirements.txt
# 5.2 虚拟环境
# 创建虚拟环境
python -m venv myenv
# 激活虚拟环境
# Windows
myenv\Scripts\activate
# macOS/Linux
source myenv/bin/activate
# 停用虚拟环境
deactivate
# 删除虚拟环境
rmdir /s myenv # Windows
rm -rf myenv # macOS/Linux
# 5.3 常用第三方包示例
# requests - HTTP库
import requests
response = requests.get('https://api.github.com/users/octocat')
if response.status_code == 200:
data = response.json()
print(f"用户名:{data['login']}")
print(f"公开仓库数:{data['public_repos']}")
# pandas - 数据分析
import pandas as pd
# 创建DataFrame
data = {
'name': ['Alice', 'Bob', 'Charlie'],
'age': [25, 30, 35],
'city': ['北京', '上海', '广州']
}
df = pd.DataFrame(data)
print(df)
# 数据操作
print(f"平均年龄:{df['age'].mean()}")
print(f"年龄大于25的人:\n{df[df['age'] > 25]}")
# matplotlib - 绘图
import matplotlib.pyplot as plt
import numpy as np
x = np.linspace(0, 10, 100)
y = np.sin(x)
plt.figure(figsize=(10, 6))
plt.plot(x, y, label='sin(x)')
plt.xlabel('x')
plt.ylabel('y')
plt.title('正弦函数图像')
plt.legend()
plt.grid(True)
plt.show()
# 六、模块最佳实践
# 6.1 模块设计原则
# 好的模块设计示例
"""用户管理模块
这个模块提供用户注册、登录、权限管理等功能。
示例用法:
from user_manager import User, authenticate
user = User('alice', 'alice@example.com')
user.set_password('secret123')
if authenticate('alice', 'secret123'):
print('登录成功')
"""
import hashlib
import json
from datetime import datetime
from typing import Optional, Dict, List
# 模块级常量
DEFAULT_ROLE = 'user'
ADMIN_ROLE = 'admin'
MAX_LOGIN_ATTEMPTS = 3
# 私有函数(以下划线开头)
def _hash_password(password: str) -> str:
"""私有函数:密码哈希"""
return hashlib.sha256(password.encode()).hexdigest()
def _validate_email(email: str) -> bool:
"""私有函数:邮箱验证"""
import re
pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
return re.match(pattern, email) is not None
class User:
"""用户类"""
def __init__(self, username: str, email: str, role: str = DEFAULT_ROLE):
if not username or not email:
raise ValueError("用户名和邮箱不能为空")
if not _validate_email(email):
raise ValueError("邮箱格式不正确")
self.username = username
self.email = email
self.role = role
self.password_hash = None
self.created_at = datetime.now()
self.last_login = None
self.login_attempts = 0
self.is_active = True
def set_password(self, password: str) -> None:
"""设置密码"""
if len(password) < 6:
raise ValueError("密码长度至少6位")
self.password_hash = _hash_password(password)
def check_password(self, password: str) -> bool:
"""检查密码"""
if not self.password_hash:
return False
return self.password_hash == _hash_password(password)
def to_dict(self) -> Dict:
"""转换为字典"""
return {
'username': self.username,
'email': self.email,
'role': self.role,
'created_at': self.created_at.isoformat(),
'last_login': self.last_login.isoformat() if self.last_login else None,
'is_active': self.is_active
}
def __str__(self):
return f"User(username='{self.username}', email='{self.email}', role='{self.role}')"
def __repr__(self):
return self.__str__()
class UserManager:
"""用户管理器"""
def __init__(self):
self.users: Dict[str, User] = {}
def register(self, username: str, email: str, password: str, role: str = DEFAULT_ROLE) -> User:
"""注册用户"""
if username in self.users:
raise ValueError(f"用户名 '{username}' 已存在")
user = User(username, email, role)
user.set_password(password)
self.users[username] = user
return user
def authenticate(self, username: str, password: str) -> Optional[User]:
"""用户认证"""
user = self.users.get(username)
if not user or not user.is_active:
return None
if user.login_attempts >= MAX_LOGIN_ATTEMPTS:
raise ValueError("登录尝试次数过多,账户已锁定")
if user.check_password(password):
user.last_login = datetime.now()
user.login_attempts = 0
return user
else:
user.login_attempts += 1
return None
def get_user(self, username: str) -> Optional[User]:
"""获取用户"""
return self.users.get(username)
def list_users(self, role: Optional[str] = None) -> List[User]:
"""列出用户"""
if role:
return [user for user in self.users.values() if user.role == role]
return list(self.users.values())
def export_users(self, filepath: str) -> bool:
"""导出用户数据"""
try:
data = [user.to_dict() for user in self.users.values()]
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
return True
except Exception as e:
print(f"导出用户数据失败:{e}")
return False
# 模块级实例(单例模式)
_user_manager = UserManager()
# 公共API函数
def register_user(username: str, email: str, password: str, role: str = DEFAULT_ROLE) -> User:
"""注册用户(模块级函数)"""
return _user_manager.register(username, email, password, role)
def authenticate(username: str, password: str) -> Optional[User]:
"""用户认证(模块级函数)"""
return _user_manager.authenticate(username, password)
def get_user(username: str) -> Optional[User]:
"""获取用户(模块级函数)"""
return _user_manager.get_user(username)
def list_users(role: Optional[str] = None) -> List[User]:
"""列出用户(模块级函数)"""
return _user_manager.list_users(role)
# 定义模块的公共接口
__all__ = [
'User',
'UserManager',
'register_user',
'authenticate',
'get_user',
'list_users',
'DEFAULT_ROLE',
'ADMIN_ROLE'
]
# 模块测试代码
if __name__ == "__main__":
# 测试用户管理功能
print("测试用户管理模块:")
# 注册用户
alice = register_user('alice', 'alice@example.com', 'password123')
bob = register_user('bob', 'bob@example.com', 'secret456', ADMIN_ROLE)
print(f"注册用户:{alice}")
print(f"注册管理员:{bob}")
# 用户认证
auth_user = authenticate('alice', 'password123')
if auth_user:
print(f"认证成功:{auth_user.username}")
# 列出用户
all_users = list_users()
print(f"所有用户:{[user.username for user in all_users]}")
admin_users = list_users(ADMIN_ROLE)
print(f"管理员用户:{[user.username for user in admin_users]}")
# 6.2 模块文档和测试
# 文档字符串最佳实践
def calculate_distance(point1, point2):
"""计算两点之间的欧几里得距离。
Args:
point1 (tuple): 第一个点的坐标 (x, y)
point2 (tuple): 第二个点的坐标 (x, y)
Returns:
float: 两点之间的距离
Raises:
ValueError: 当输入不是有效坐标时
TypeError: 当输入类型不正确时
Examples:
>>> calculate_distance((0, 0), (3, 4))
5.0
>>> calculate_distance((1, 1), (4, 5))
5.0
"""
try:
x1, y1 = point1
x2, y2 = point2
return ((x2 - x1) ** 2 + (y2 - y1) ** 2) ** 0.5
except (ValueError, TypeError) as e:
raise ValueError(f"无效的坐标输入: {e}")
# 单元测试示例
import unittest
class TestMathUtils(unittest.TestCase):
"""数学工具模块测试"""
def test_calculate_distance(self):
"""测试距离计算"""
# 测试正常情况
self.assertEqual(calculate_distance((0, 0), (3, 4)), 5.0)
self.assertEqual(calculate_distance((1, 1), (1, 1)), 0.0)
# 测试异常情况
with self.assertRaises(ValueError):
calculate_distance((1, 2), (3,)) # 坐标不完整
with self.assertRaises(ValueError):
calculate_distance("invalid", (1, 2)) # 无效类型
if __name__ == "__main__":
unittest.main()
# 七、实战项目:个人任务管理系统
让我们创建一个完整的任务管理系统来实践模块化编程:
# task_manager/
# __init__.py
# models.py
# storage.py
# cli.py
# utils.py
# task_manager/__init__.py
"""个人任务管理系统
一个简单而强大的任务管理工具。
"""
__version__ = "1.0.0"
__author__ = "哪吒"
from .models import Task, TaskManager
from .storage import FileStorage, JSONStorage
from .utils import format_date, get_priority_color
__all__ = ['Task', 'TaskManager', 'FileStorage', 'JSONStorage', 'format_date', 'get_priority_color']
# task_manager/models.py
"""任务模型"""
from datetime import datetime, date
from typing import List, Optional, Dict, Any
from enum import Enum
class Priority(Enum):
"""任务优先级"""
LOW = 1
MEDIUM = 2
HIGH = 3
URGENT = 4
class Status(Enum):
"""任务状态"""
TODO = "待办"
IN_PROGRESS = "进行中"
COMPLETED = "已完成"
CANCELLED = "已取消"
class Task:
"""任务类"""
def __init__(self, title: str, description: str = "", priority: Priority = Priority.MEDIUM,
due_date: Optional[date] = None, tags: Optional[List[str]] = None):
self.id = self._generate_id()
self.title = title
self.description = description
self.priority = priority
self.status = Status.TODO
self.created_at = datetime.now()
self.updated_at = datetime.now()
self.due_date = due_date
self.completed_at = None
self.tags = tags or []
def _generate_id(self) -> str:
"""生成唯一ID"""
import uuid
return str(uuid.uuid4())[:8]
def mark_completed(self):
"""标记为已完成"""
self.status = Status.COMPLETED
self.completed_at = datetime.now()
self.updated_at = datetime.now()
def mark_in_progress(self):
"""标记为进行中"""
self.status = Status.IN_PROGRESS
self.updated_at = datetime.now()
def mark_cancelled(self):
"""标记为已取消"""
self.status = Status.CANCELLED
self.updated_at = datetime.now()
def update(self, title: Optional[str] = None, description: Optional[str] = None,
priority: Optional[Priority] = None, due_date: Optional[date] = None,
tags: Optional[List[str]] = None):
"""更新任务信息"""
if title is not None:
self.title = title
if description is not None:
self.description = description
if priority is not None:
self.priority = priority
if due_date is not None:
self.due_date = due_date
if tags is not None:
self.tags = tags
self.updated_at = datetime.now()
def add_tag(self, tag: str):
"""添加标签"""
if tag not in self.tags:
self.tags.append(tag)
self.updated_at = datetime.now()
def remove_tag(self, tag: str):
"""移除标签"""
if tag in self.tags:
self.tags.remove(tag)
self.updated_at = datetime.now()
def is_overdue(self) -> bool:
"""检查是否过期"""
if self.due_date and self.status not in [Status.COMPLETED, Status.CANCELLED]:
return date.today() > self.due_date
return False
def days_until_due(self) -> Optional[int]:
"""距离截止日期的天数"""
if self.due_date:
delta = self.due_date - date.today()
return delta.days
return None
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
'id': self.id,
'title': self.title,
'description': self.description,
'priority': self.priority.value,
'status': self.status.value,
'created_at': self.created_at.isoformat(),
'updated_at': self.updated_at.isoformat(),
'due_date': self.due_date.isoformat() if self.due_date else None,
'completed_at': self.completed_at.isoformat() if self.completed_at else None,
'tags': self.tags
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Task':
"""从字典创建任务"""
task = cls.__new__(cls)
task.id = data['id']
task.title = data['title']
task.description = data['description']
task.priority = Priority(data['priority'])
task.status = Status(data['status'])
task.created_at = datetime.fromisoformat(data['created_at'])
task.updated_at = datetime.fromisoformat(data['updated_at'])
task.due_date = date.fromisoformat(data['due_date']) if data['due_date'] else None
task.completed_at = datetime.fromisoformat(data['completed_at']) if data['completed_at'] else None
task.tags = data['tags']
return task
def __str__(self):
status_icon = {
Status.TODO: "⭕",
Status.IN_PROGRESS: "🔄",
Status.COMPLETED: "✅",
Status.CANCELLED: "❌"
}
priority_icon = {
Priority.LOW: "🟢",
Priority.MEDIUM: "🟡",
Priority.HIGH: "🟠",
Priority.URGENT: "🔴"
}
due_info = f" (截止: {self.due_date})" if self.due_date else ""
tags_info = f" #{' #'.join(self.tags)}" if self.tags else ""
return f"{status_icon[self.status]} {priority_icon[self.priority]} {self.title}{due_info}{tags_info}"
class TaskManager:
"""任务管理器"""
def __init__(self):
self.tasks: Dict[str, Task] = {}
def add_task(self, title: str, description: str = "", priority: Priority = Priority.MEDIUM,
due_date: Optional[date] = None, tags: Optional[List[str]] = None) -> Task:
"""添加任务"""
task = Task(title, description, priority, due_date, tags)
self.tasks[task.id] = task
return task
def get_task(self, task_id: str) -> Optional[Task]:
"""获取任务"""
return self.tasks.get(task_id)
def update_task(self, task_id: str, **kwargs) -> bool:
"""更新任务"""
task = self.get_task(task_id)
if task:
task.update(**kwargs)
return True
return False
def delete_task(self, task_id: str) -> bool:
"""删除任务"""
if task_id in self.tasks:
del self.tasks[task_id]
return True
return False
def list_tasks(self, status: Optional[Status] = None, priority: Optional[Priority] = None,
tag: Optional[str] = None, overdue_only: bool = False) -> List[Task]:
"""列出任务"""
tasks = list(self.tasks.values())
if status:
tasks = [t for t in tasks if t.status == status]
if priority:
tasks = [t for t in tasks if t.priority == priority]
if tag:
tasks = [t for t in tasks if tag in t.tags]
if overdue_only:
tasks = [t for t in tasks if t.is_overdue()]
return sorted(tasks, key=lambda t: (t.priority.value, t.created_at), reverse=True)
def search_tasks(self, keyword: str) -> List[Task]:
"""搜索任务"""
keyword = keyword.lower()
return [task for task in self.tasks.values()
if keyword in task.title.lower() or keyword in task.description.lower()]
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
total = len(self.tasks)
completed = len([t for t in self.tasks.values() if t.status == Status.COMPLETED])
in_progress = len([t for t in self.tasks.values() if t.status == Status.IN_PROGRESS])
overdue = len([t for t in self.tasks.values() if t.is_overdue()])
return {
'total': total,
'completed': completed,
'in_progress': in_progress,
'todo': total - completed - in_progress,
'overdue': overdue,
'completion_rate': (completed / total * 100) if total > 0 else 0
}
def get_tasks_by_date(self, target_date: date) -> List[Task]:
"""获取指定日期的任务"""
return [task for task in self.tasks.values() if task.due_date == target_date]
def get_upcoming_tasks(self, days: int = 7) -> List[Task]:
"""获取即将到期的任务"""
upcoming = []
for task in self.tasks.values():
if task.due_date and task.status not in [Status.COMPLETED, Status.CANCELLED]:
days_until = task.days_until_due()
if days_until is not None and 0 <= days_until <= days:
upcoming.append(task)
return sorted(upcoming, key=lambda t: t.due_date)
# 八、总结
# 8.1 模块系统的核心概念
- 模块:包含Python代码的文件
- 包:包含多个模块的目录
- 导入:使用其他模块的功能
- 命名空间:避免名称冲突
- 搜索路径:Python查找模块的位置
# 8.2 最佳实践总结
模块设计:
- 单一职责原则
- 清晰的接口设计
- 完善的文档字符串
- 合理的错误处理
包组织:
- 逻辑清晰的目录结构
- 适当的
__init__.py
文件 - 明确的
__all__
定义
导入规范:
- 优先使用具体导入
- 避免循环导入
- 合理使用别名
代码质量:
- 编写单元测试
- 使用类型注解
- 遵循PEP 8规范