第8天-模块

2023/6/15

# 第8天-Python模块与包

欢迎来到Python模块的世界!模块是Python代码组织和重用的基础,掌握模块系统将让你的代码更加模块化、可维护和可扩展。

# 什么是模块?

模块(Module)是包含Python代码的文件,通常以.py为扩展名。模块可以包含函数、类、变量以及可执行的代码。

生活类比

  • 模块:像工具箱中的不同工具,每个工具有特定的功能
  • :像工具箱本身,将相关的工具组织在一起
  • 导入:像从工具箱中取出需要的工具来使用

# 模块的优势

  1. 代码重用:避免重复编写相同的代码
  2. 命名空间:避免变量名冲突
  3. 代码组织:将相关功能组织在一起
  4. 维护性:便于代码的维护和更新
  5. 协作开发:团队成员可以独立开发不同模块

# 一、创建和使用模块

# 1.1 创建简单模块

让我们创建一个数学工具模块:

# 文件名:math_utils.py
"""数学工具模块

这个模块提供了一些常用的数学函数。
"""

import math

# 模块级变量
PI = 3.14159265359
E = 2.71828182846

def add(x, y):
    """加法函数"""
    return x + y

def subtract(x, y):
    """减法函数"""
    return x - y

def multiply(x, y):
    """乘法函数"""
    return x * y

def divide(x, y):
    """除法函数"""
    if y == 0:
        raise ValueError("除数不能为零")
    return x / y

def power(base, exponent):
    """幂运算"""
    return base ** exponent

def factorial(n):
    """计算阶乘"""
    if n < 0:
        raise ValueError("阶乘的参数必须是非负整数")
    if n == 0 or n == 1:
        return 1
    return n * factorial(n - 1)

def is_prime(n):
    """判断是否为质数"""
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

def gcd(a, b):
    """计算最大公约数"""
    while b:
        a, b = b, a % b
    return a

def lcm(a, b):
    """计算最小公倍数"""
    return abs(a * b) // gcd(a, b)

class Calculator:
    """简单计算器类"""
    
    def __init__(self):
        self.history = []
    
    def calculate(self, operation, x, y=None):
        """执行计算并记录历史"""
        if operation == 'add' and y is not None:
            result = add(x, y)
        elif operation == 'subtract' and y is not None:
            result = subtract(x, y)
        elif operation == 'multiply' and y is not None:
            result = multiply(x, y)
        elif operation == 'divide' and y is not None:
            result = divide(x, y)
        elif operation == 'factorial':
            result = factorial(x)
        else:
            raise ValueError("不支持的操作")
        
        self.history.append(f"{operation}({x}, {y}) = {result}" if y is not None else f"{operation}({x}) = {result}")
        return result
    
    def get_history(self):
        """获取计算历史"""
        return self.history.copy()
    
    def clear_history(self):
        """清空计算历史"""
        self.history.clear()

# 模块级代码(导入时执行)
print(f"数学工具模块已加载,π = {PI}, e = {E}")

# 测试代码(只在直接运行时执行)
if __name__ == "__main__":
    print("测试数学工具模块:")
    print(f"5 + 3 = {add(5, 3)}")
    print(f"10 - 4 = {subtract(10, 4)}")
    print(f"6 * 7 = {multiply(6, 7)}")
    print(f"15 / 3 = {divide(15, 3)}")
    print(f"2^8 = {power(2, 8)}")
    print(f"5! = {factorial(5)}")
    print(f"17是质数吗?{is_prime(17)}")
    print(f"gcd(48, 18) = {gcd(48, 18)}")
    print(f"lcm(12, 8) = {lcm(12, 8)}")
    
    calc = Calculator()
    calc.calculate('add', 10, 5)
    calc.calculate('factorial', 5)
    print("计算历史:", calc.get_history())

# 1.2 导入和使用模块

# 使用我们创建的math_utils模块

# 方法1:导入整个模块
import math_utils

result1 = math_utils.add(10, 5)
print(f"10 + 5 = {result1}")
print(f"π的值:{math_utils.PI}")

# 方法2:导入特定函数
from math_utils import multiply, divide, factorial

result2 = multiply(6, 7)
result3 = divide(20, 4)
result4 = factorial(5)
print(f"6 * 7 = {result2}")
print(f"20 / 4 = {result3}")
print(f"5! = {result4}")

# 方法3:导入并重命名
from math_utils import is_prime as check_prime
from math_utils import Calculator as Calc

print(f"13是质数吗?{check_prime(13)}")
calc = Calc()
result5 = calc.calculate('add', 15, 25)
print(f"计算结果:{result5}")

# 方法4:导入所有(不推荐)
# from math_utils import *
# 这种方式可能导致命名冲突,一般不推荐使用

# 方法5:使用别名导入模块
import math_utils as mu

result6 = mu.power(2, 10)
print(f"2^10 = {result6}")

# 二、模块搜索路径

# 2.1 Python如何查找模块

Python按以下顺序搜索模块:

  1. 当前目录
  2. PYTHONPATH环境变量指定的目录
  3. 标准库目录
  4. site-packages目录(第三方包)
import sys

# 查看模块搜索路径
print("Python模块搜索路径:")
for i, path in enumerate(sys.path, 1):
    print(f"{i}. {path}")

# 动态添加搜索路径
sys.path.append('/path/to/your/modules')

# 查看已加载的模块
print("\n已加载的模块:")
for module_name in sorted(sys.modules.keys())[:10]:  # 只显示前10个
    print(f"- {module_name}")

# 2.2 模块的属性和信息

import math_utils
import math

# 查看模块属性
print("math_utils模块的属性:")
for attr in dir(math_utils):
    if not attr.startswith('_'):  # 不显示私有属性
        print(f"- {attr}")

# 模块的特殊属性
print(f"\n模块名称:{math_utils.__name__}")
print(f"模块文档:{math_utils.__doc__}")
print(f"模块文件路径:{math_utils.__file__}")

# 获取函数的帮助信息
help(math_utils.factorial)

# 检查对象类型
print(f"\nadd是函数吗?{callable(math_utils.add)}")
print(f"PI是什么类型?{type(math_utils.PI)}")
print(f"Calculator是类吗?{isinstance(math_utils.Calculator, type)}")

# 三、包(Packages)

# 3.1 什么是包

包是包含多个模块的目录,必须包含一个__init__.py文件(可以为空)。

my_package/
    __init__.py
    module1.py
    module2.py
    subpackage/
        __init__.py
        module3.py
        module4.py

# 3.2 创建包结构

让我们创建一个完整的工具包:

# 目录结构:
# utils/
#     __init__.py
#     string_utils.py
#     file_utils.py
#     data_utils.py
#     math/
#         __init__.py
#         basic.py
#         advanced.py

# utils/__init__.py
"""工具包

这个包提供了各种实用工具函数。
"""

__version__ = "1.0.0"
__author__ = "哪吒"

# 导入子模块,使其可以直接从包中访问
from .string_utils import *
from .file_utils import read_file, write_file
from .data_utils import DataProcessor

# 定义包级别的函数
def get_package_info():
    """获取包信息"""
    return {
        'name': __name__,
        'version': __version__,
        'author': __author__
    }

# 定义__all__来控制from utils import *的行为
__all__ = [
    'get_package_info',
    'capitalize_words',
    'reverse_string',
    'read_file',
    'write_file',
    'DataProcessor'
]
# utils/string_utils.py
"""字符串工具模块"""

def capitalize_words(text):
    """将每个单词的首字母大写"""
    return ' '.join(word.capitalize() for word in text.split())

def reverse_string(text):
    """反转字符串"""
    return text[::-1]

def count_words(text):
    """统计单词数量"""
    return len(text.split())

def remove_duplicates(text):
    """移除重复的单词"""
    words = text.split()
    unique_words = []
    for word in words:
        if word not in unique_words:
            unique_words.append(word)
    return ' '.join(unique_words)

def is_palindrome(text):
    """检查是否为回文"""
    cleaned = ''.join(char.lower() for char in text if char.isalnum())
    return cleaned == cleaned[::-1]

class StringProcessor:
    """字符串处理器类"""
    
    def __init__(self, text):
        self.text = text
    
    def process(self, operations):
        """应用一系列操作"""
        result = self.text
        for operation in operations:
            if operation == 'capitalize':
                result = capitalize_words(result)
            elif operation == 'reverse':
                result = reverse_string(result)
            elif operation == 'remove_duplicates':
                result = remove_duplicates(result)
        return result
# utils/file_utils.py
"""文件操作工具模块"""

import os
import json
import csv
from typing import List, Dict, Any

def read_file(filepath, encoding='utf-8'):
    """读取文本文件"""
    try:
        with open(filepath, 'r', encoding=encoding) as file:
            return file.read()
    except FileNotFoundError:
        print(f"文件 {filepath} 不存在")
        return None
    except Exception as e:
        print(f"读取文件时出错:{e}")
        return None

def write_file(filepath, content, encoding='utf-8'):
    """写入文本文件"""
    try:
        # 确保目录存在
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        with open(filepath, 'w', encoding=encoding) as file:
            file.write(content)
        return True
    except Exception as e:
        print(f"写入文件时出错:{e}")
        return False

def read_json(filepath):
    """读取JSON文件"""
    try:
        with open(filepath, 'r', encoding='utf-8') as file:
            return json.load(file)
    except Exception as e:
        print(f"读取JSON文件时出错:{e}")
        return None

def write_json(filepath, data, indent=2):
    """写入JSON文件"""
    try:
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        with open(filepath, 'w', encoding='utf-8') as file:
            json.dump(data, file, indent=indent, ensure_ascii=False)
        return True
    except Exception as e:
        print(f"写入JSON文件时出错:{e}")
        return False

def read_csv(filepath):
    """读取CSV文件"""
    try:
        with open(filepath, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            return list(reader)
    except Exception as e:
        print(f"读取CSV文件时出错:{e}")
        return None

def write_csv(filepath, data, fieldnames=None):
    """写入CSV文件"""
    try:
        if not data:
            return False
        
        if fieldnames is None:
            fieldnames = data[0].keys() if isinstance(data[0], dict) else None
        
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        with open(filepath, 'w', newline='', encoding='utf-8') as file:
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)
        return True
    except Exception as e:
        print(f"写入CSV文件时出错:{e}")
        return False

def get_file_info(filepath):
    """获取文件信息"""
    try:
        stat = os.stat(filepath)
        return {
            'size': stat.st_size,
            'modified': stat.st_mtime,
            'created': stat.st_ctime,
            'is_file': os.path.isfile(filepath),
            'is_dir': os.path.isdir(filepath)
        }
    except Exception as e:
        print(f"获取文件信息时出错:{e}")
        return None

class FileManager:
    """文件管理器类"""
    
    def __init__(self, base_path='.'):
        self.base_path = base_path
    
    def list_files(self, extension=None):
        """列出目录中的文件"""
        files = []
        for item in os.listdir(self.base_path):
            item_path = os.path.join(self.base_path, item)
            if os.path.isfile(item_path):
                if extension is None or item.endswith(extension):
                    files.append(item)
        return files
    
    def create_backup(self, filepath):
        """创建文件备份"""
        backup_path = filepath + '.backup'
        try:
            content = read_file(filepath)
            if content is not None:
                return write_file(backup_path, content)
        except Exception as e:
            print(f"创建备份时出错:{e}")
        return False
# utils/data_utils.py
"""数据处理工具模块"""

from typing import List, Dict, Any, Union
from collections import Counter
import statistics

class DataProcessor:
    """数据处理器类"""
    
    def __init__(self, data: List[Any] = None):
        self.data = data or []
    
    def add_data(self, item: Any):
        """添加数据项"""
        self.data.append(item)
    
    def filter_data(self, condition):
        """过滤数据"""
        return [item for item in self.data if condition(item)]
    
    def map_data(self, transform):
        """转换数据"""
        return [transform(item) for item in self.data]
    
    def group_by(self, key_func):
        """按条件分组"""
        groups = {}
        for item in self.data:
            key = key_func(item)
            if key not in groups:
                groups[key] = []
            groups[key].append(item)
        return groups
    
    def get_statistics(self):
        """获取数值数据的统计信息"""
        if not self.data or not all(isinstance(x, (int, float)) for x in self.data):
            return None
        
        return {
            'count': len(self.data),
            'sum': sum(self.data),
            'mean': statistics.mean(self.data),
            'median': statistics.median(self.data),
            'mode': statistics.mode(self.data) if len(set(self.data)) < len(self.data) else None,
            'min': min(self.data),
            'max': max(self.data),
            'range': max(self.data) - min(self.data)
        }

def clean_data(data: List[Dict[str, Any]], required_fields: List[str]) -> List[Dict[str, Any]]:
    """清理数据,移除缺少必需字段的记录"""
    cleaned = []
    for record in data:
        if all(field in record and record[field] is not None for field in required_fields):
            cleaned.append(record)
    return cleaned

def aggregate_data(data: List[Dict[str, Any]], group_by: str, aggregate_field: str, operation: str = 'sum'):
    """聚合数据"""
    groups = {}
    
    for record in data:
        key = record.get(group_by)
        if key not in groups:
            groups[key] = []
        groups[key].append(record.get(aggregate_field, 0))
    
    result = {}
    for key, values in groups.items():
        if operation == 'sum':
            result[key] = sum(values)
        elif operation == 'avg':
            result[key] = sum(values) / len(values) if values else 0
        elif operation == 'count':
            result[key] = len(values)
        elif operation == 'max':
            result[key] = max(values) if values else 0
        elif operation == 'min':
            result[key] = min(values) if values else 0
    
    return result

def find_duplicates(data: List[Any]) -> List[Any]:
    """查找重复项"""
    counter = Counter(data)
    return [item for item, count in counter.items() if count > 1]

def remove_duplicates(data: List[Any]) -> List[Any]:
    """移除重复项,保持顺序"""
    seen = set()
    result = []
    for item in data:
        if item not in seen:
            seen.add(item)
            result.append(item)
    return result

def sort_data(data: List[Dict[str, Any]], sort_by: str, reverse: bool = False) -> List[Dict[str, Any]]:
    """排序数据"""
    return sorted(data, key=lambda x: x.get(sort_by, 0), reverse=reverse)
# utils/math/__init__.py
"""数学工具子包"""

from .basic import *
from .advanced import *

__all__ = ['add', 'subtract', 'multiply', 'divide', 'power', 'sqrt', 'log', 'sin', 'cos']
# utils/math/basic.py
"""基础数学运算"""

def add(x, y):
    """加法"""
    return x + y

def subtract(x, y):
    """减法"""
    return x - y

def multiply(x, y):
    """乘法"""
    return x * y

def divide(x, y):
    """除法"""
    if y == 0:
        raise ValueError("除数不能为零")
    return x / y

def power(base, exponent):
    """幂运算"""
    return base ** exponent
# utils/math/advanced.py
"""高级数学运算"""

import math

def sqrt(x):
    """平方根"""
    if x < 0:
        raise ValueError("负数不能开平方根")
    return math.sqrt(x)

def log(x, base=math.e):
    """对数"""
    if x <= 0:
        raise ValueError("对数的参数必须大于0")
    return math.log(x, base)

def sin(x):
    """正弦"""
    return math.sin(x)

def cos(x):
    """余弦"""
    return math.cos(x)

def tan(x):
    """正切"""
    return math.tan(x)

# 3.3 使用包

# 使用我们创建的工具包

# 导入整个包
import utils

# 使用包级别的函数
info = utils.get_package_info()
print(f"包信息:{info}")

# 使用字符串工具
text = "hello world python programming"
result = utils.capitalize_words(text)
print(f"首字母大写:{result}")

# 使用文件工具
utils.write_file('test.txt', 'Hello, World!')
content = utils.read_file('test.txt')
print(f"文件内容:{content}")

# 使用数据处理器
processor = utils.DataProcessor([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
even_numbers = processor.filter_data(lambda x: x % 2 == 0)
print(f"偶数:{even_numbers}")

stats = processor.get_statistics()
print(f"统计信息:{stats}")

# 导入子包
from utils.math import add, multiply, sqrt

result1 = add(10, 5)
result2 = multiply(6, 7)
result3 = sqrt(16)
print(f"数学运算:{result1}, {result2}, {result3}")

# 使用别名导入
from utils.string_utils import StringProcessor as SP

processor = SP("hello world hello python")
result = processor.process(['capitalize', 'remove_duplicates'])
print(f"字符串处理结果:{result}")

# 四、标准库模块

# 4.1 常用标准库模块

# os模块 - 操作系统接口
import os

print(f"当前工作目录:{os.getcwd()}")
print(f"用户主目录:{os.path.expanduser('~')}")
print(f"路径分隔符:{os.sep}")

# 环境变量
print(f"PATH环境变量:{os.environ.get('PATH', '未设置')}")

# 文件和目录操作
if not os.path.exists('temp_dir'):
    os.makedirs('temp_dir')
    print("创建了临时目录")

# sys模块 - 系统相关参数和函数
import sys

print(f"Python版本:{sys.version}")
print(f"平台:{sys.platform}")
print(f"命令行参数:{sys.argv}")

# datetime模块 - 日期和时间
from datetime import datetime, date, time, timedelta

now = datetime.now()
print(f"当前时间:{now}")
print(f"格式化时间:{now.strftime('%Y-%m-%d %H:%M:%S')}")

# 时间计算
tomorrow = now + timedelta(days=1)
print(f"明天:{tomorrow.strftime('%Y-%m-%d')}")

# random模块 - 随机数生成
import random

print(f"随机整数:{random.randint(1, 100)}")
print(f"随机浮点数:{random.random()}")
print(f"随机选择:{random.choice(['apple', 'banana', 'orange'])}")

# 随机打乱列表
numbers = [1, 2, 3, 4, 5]
random.shuffle(numbers)
print(f"打乱后的列表:{numbers}")

# json模块 - JSON数据处理
import json

data = {
    'name': '张三',
    'age': 25,
    'skills': ['Python', 'JavaScript', 'SQL']
}

# 转换为JSON字符串
json_str = json.dumps(data, ensure_ascii=False, indent=2)
print(f"JSON字符串:\n{json_str}")

# 从JSON字符串解析
parsed_data = json.loads(json_str)
print(f"解析后的数据:{parsed_data}")

# 4.2 更多实用标准库

# collections模块 - 特殊容器数据类型
from collections import Counter, defaultdict, namedtuple, deque

# Counter - 计数器
text = "hello world"
letter_count = Counter(text)
print(f"字母计数:{letter_count}")
print(f"最常见的3个字母:{letter_count.most_common(3)}")

# defaultdict - 默认字典
dd = defaultdict(list)
dd['fruits'].append('apple')
dd['fruits'].append('banana')
dd['vegetables'].append('carrot')
print(f"默认字典:{dict(dd)}")

# namedtuple - 命名元组
Point = namedtuple('Point', ['x', 'y'])
p = Point(3, 4)
print(f"点坐标:{p}, x={p.x}, y={p.y}")

# deque - 双端队列
dq = deque([1, 2, 3])
dq.appendleft(0)
dq.append(4)
print(f"双端队列:{list(dq)}")

# itertools模块 - 迭代工具
import itertools

# 组合和排列
data = ['A', 'B', 'C']
combinations = list(itertools.combinations(data, 2))
permutations = list(itertools.permutations(data, 2))
print(f"组合:{combinations}")
print(f"排列:{permutations}")

# 无限迭代器
counter = itertools.count(1, 2)  # 从1开始,步长为2
first_5_odd = [next(counter) for _ in range(5)]
print(f"前5个奇数:{first_5_odd}")

# re模块 - 正则表达式
import re

text = "联系电话:138-1234-5678,邮箱:user@example.com"

# 查找电话号码
phone_pattern = r'\d{3}-\d{4}-\d{4}'
phone = re.search(phone_pattern, text)
if phone:
    print(f"找到电话号码:{phone.group()}")

# 查找邮箱
email_pattern = r'\w+@\w+\.\w+'
email = re.search(email_pattern, text)
if email:
    print(f"找到邮箱:{email.group()}")

# 替换文本
cleaned_text = re.sub(r'\d{3}-\d{4}-\d{4}', '[电话已隐藏]', text)
print(f"清理后的文本:{cleaned_text}")

# pathlib模块 - 面向对象的路径操作
from pathlib import Path

# 创建路径对象
path = Path('data/files/document.txt')
print(f"文件名:{path.name}")
print(f"文件扩展名:{path.suffix}")
print(f"父目录:{path.parent}")
print(f"绝对路径:{path.absolute()}")

# 路径操作
new_path = path.with_suffix('.pdf')
print(f"更改扩展名后:{new_path}")

# 检查路径
print(f"路径存在吗?{path.exists()}")
print(f"是文件吗?{path.is_file()}")
print(f"是目录吗?{path.is_dir()}")

# 五、第三方包管理

# 5.1 pip包管理器

# 安装包
pip install requests
pip install pandas numpy matplotlib

# 安装特定版本
pip install django==3.2.0

# 从requirements.txt安装
pip install -r requirements.txt

# 升级包
pip upgrade requests

# 卸载包
pip uninstall requests

# 列出已安装的包
pip list

# 显示包信息
pip show requests

# 生成requirements.txt
pip freeze > requirements.txt

# 5.2 虚拟环境

# 创建虚拟环境
python -m venv myenv

# 激活虚拟环境
# Windows
myenv\Scripts\activate
# macOS/Linux
source myenv/bin/activate

# 停用虚拟环境
deactivate

# 删除虚拟环境
rmdir /s myenv  # Windows
rm -rf myenv    # macOS/Linux

# 5.3 常用第三方包示例

# requests - HTTP库
import requests

response = requests.get('https://api.github.com/users/octocat')
if response.status_code == 200:
    data = response.json()
    print(f"用户名:{data['login']}")
    print(f"公开仓库数:{data['public_repos']}")

# pandas - 数据分析
import pandas as pd

# 创建DataFrame
data = {
    'name': ['Alice', 'Bob', 'Charlie'],
    'age': [25, 30, 35],
    'city': ['北京', '上海', '广州']
}
df = pd.DataFrame(data)
print(df)

# 数据操作
print(f"平均年龄:{df['age'].mean()}")
print(f"年龄大于25的人:\n{df[df['age'] > 25]}")

# matplotlib - 绘图
import matplotlib.pyplot as plt
import numpy as np

x = np.linspace(0, 10, 100)
y = np.sin(x)

plt.figure(figsize=(10, 6))
plt.plot(x, y, label='sin(x)')
plt.xlabel('x')
plt.ylabel('y')
plt.title('正弦函数图像')
plt.legend()
plt.grid(True)
plt.show()

# 六、模块最佳实践

# 6.1 模块设计原则

# 好的模块设计示例
"""用户管理模块

这个模块提供用户注册、登录、权限管理等功能。

示例用法:
    from user_manager import User, authenticate
    
    user = User('alice', 'alice@example.com')
    user.set_password('secret123')
    
    if authenticate('alice', 'secret123'):
        print('登录成功')
"""

import hashlib
import json
from datetime import datetime
from typing import Optional, Dict, List

# 模块级常量
DEFAULT_ROLE = 'user'
ADMIN_ROLE = 'admin'
MAX_LOGIN_ATTEMPTS = 3

# 私有函数(以下划线开头)
def _hash_password(password: str) -> str:
    """私有函数:密码哈希"""
    return hashlib.sha256(password.encode()).hexdigest()

def _validate_email(email: str) -> bool:
    """私有函数:邮箱验证"""
    import re
    pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
    return re.match(pattern, email) is not None

class User:
    """用户类"""
    
    def __init__(self, username: str, email: str, role: str = DEFAULT_ROLE):
        if not username or not email:
            raise ValueError("用户名和邮箱不能为空")
        
        if not _validate_email(email):
            raise ValueError("邮箱格式不正确")
        
        self.username = username
        self.email = email
        self.role = role
        self.password_hash = None
        self.created_at = datetime.now()
        self.last_login = None
        self.login_attempts = 0
        self.is_active = True
    
    def set_password(self, password: str) -> None:
        """设置密码"""
        if len(password) < 6:
            raise ValueError("密码长度至少6位")
        self.password_hash = _hash_password(password)
    
    def check_password(self, password: str) -> bool:
        """检查密码"""
        if not self.password_hash:
            return False
        return self.password_hash == _hash_password(password)
    
    def to_dict(self) -> Dict:
        """转换为字典"""
        return {
            'username': self.username,
            'email': self.email,
            'role': self.role,
            'created_at': self.created_at.isoformat(),
            'last_login': self.last_login.isoformat() if self.last_login else None,
            'is_active': self.is_active
        }
    
    def __str__(self):
        return f"User(username='{self.username}', email='{self.email}', role='{self.role}')"
    
    def __repr__(self):
        return self.__str__()

class UserManager:
    """用户管理器"""
    
    def __init__(self):
        self.users: Dict[str, User] = {}
    
    def register(self, username: str, email: str, password: str, role: str = DEFAULT_ROLE) -> User:
        """注册用户"""
        if username in self.users:
            raise ValueError(f"用户名 '{username}' 已存在")
        
        user = User(username, email, role)
        user.set_password(password)
        self.users[username] = user
        return user
    
    def authenticate(self, username: str, password: str) -> Optional[User]:
        """用户认证"""
        user = self.users.get(username)
        if not user or not user.is_active:
            return None
        
        if user.login_attempts >= MAX_LOGIN_ATTEMPTS:
            raise ValueError("登录尝试次数过多,账户已锁定")
        
        if user.check_password(password):
            user.last_login = datetime.now()
            user.login_attempts = 0
            return user
        else:
            user.login_attempts += 1
            return None
    
    def get_user(self, username: str) -> Optional[User]:
        """获取用户"""
        return self.users.get(username)
    
    def list_users(self, role: Optional[str] = None) -> List[User]:
        """列出用户"""
        if role:
            return [user for user in self.users.values() if user.role == role]
        return list(self.users.values())
    
    def export_users(self, filepath: str) -> bool:
        """导出用户数据"""
        try:
            data = [user.to_dict() for user in self.users.values()]
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            return True
        except Exception as e:
            print(f"导出用户数据失败:{e}")
            return False

# 模块级实例(单例模式)
_user_manager = UserManager()

# 公共API函数
def register_user(username: str, email: str, password: str, role: str = DEFAULT_ROLE) -> User:
    """注册用户(模块级函数)"""
    return _user_manager.register(username, email, password, role)

def authenticate(username: str, password: str) -> Optional[User]:
    """用户认证(模块级函数)"""
    return _user_manager.authenticate(username, password)

def get_user(username: str) -> Optional[User]:
    """获取用户(模块级函数)"""
    return _user_manager.get_user(username)

def list_users(role: Optional[str] = None) -> List[User]:
    """列出用户(模块级函数)"""
    return _user_manager.list_users(role)

# 定义模块的公共接口
__all__ = [
    'User',
    'UserManager', 
    'register_user',
    'authenticate',
    'get_user',
    'list_users',
    'DEFAULT_ROLE',
    'ADMIN_ROLE'
]

# 模块测试代码
if __name__ == "__main__":
    # 测试用户管理功能
    print("测试用户管理模块:")
    
    # 注册用户
    alice = register_user('alice', 'alice@example.com', 'password123')
    bob = register_user('bob', 'bob@example.com', 'secret456', ADMIN_ROLE)
    
    print(f"注册用户:{alice}")
    print(f"注册管理员:{bob}")
    
    # 用户认证
    auth_user = authenticate('alice', 'password123')
    if auth_user:
        print(f"认证成功:{auth_user.username}")
    
    # 列出用户
    all_users = list_users()
    print(f"所有用户:{[user.username for user in all_users]}")
    
    admin_users = list_users(ADMIN_ROLE)
    print(f"管理员用户:{[user.username for user in admin_users]}")

# 6.2 模块文档和测试

# 文档字符串最佳实践
def calculate_distance(point1, point2):
    """计算两点之间的欧几里得距离。
    
    Args:
        point1 (tuple): 第一个点的坐标 (x, y)
        point2 (tuple): 第二个点的坐标 (x, y)
    
    Returns:
        float: 两点之间的距离
    
    Raises:
        ValueError: 当输入不是有效坐标时
        TypeError: 当输入类型不正确时
    
    Examples:
        >>> calculate_distance((0, 0), (3, 4))
        5.0
        >>> calculate_distance((1, 1), (4, 5))
        5.0
    """
    try:
        x1, y1 = point1
        x2, y2 = point2
        return ((x2 - x1) ** 2 + (y2 - y1) ** 2) ** 0.5
    except (ValueError, TypeError) as e:
        raise ValueError(f"无效的坐标输入: {e}")

# 单元测试示例
import unittest

class TestMathUtils(unittest.TestCase):
    """数学工具模块测试"""
    
    def test_calculate_distance(self):
        """测试距离计算"""
        # 测试正常情况
        self.assertEqual(calculate_distance((0, 0), (3, 4)), 5.0)
        self.assertEqual(calculate_distance((1, 1), (1, 1)), 0.0)
        
        # 测试异常情况
        with self.assertRaises(ValueError):
            calculate_distance((1, 2), (3,))  # 坐标不完整
        
        with self.assertRaises(ValueError):
            calculate_distance("invalid", (1, 2))  # 无效类型

if __name__ == "__main__":
    unittest.main()

# 七、实战项目:个人任务管理系统

让我们创建一个完整的任务管理系统来实践模块化编程:

# task_manager/
#     __init__.py
#     models.py
#     storage.py
#     cli.py
#     utils.py

# task_manager/__init__.py
"""个人任务管理系统

一个简单而强大的任务管理工具。
"""

__version__ = "1.0.0"
__author__ = "哪吒"

from .models import Task, TaskManager
from .storage import FileStorage, JSONStorage
from .utils import format_date, get_priority_color

__all__ = ['Task', 'TaskManager', 'FileStorage', 'JSONStorage', 'format_date', 'get_priority_color']
# task_manager/models.py
"""任务模型"""

from datetime import datetime, date
from typing import List, Optional, Dict, Any
from enum import Enum

class Priority(Enum):
    """任务优先级"""
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    URGENT = 4

class Status(Enum):
    """任务状态"""
    TODO = "待办"
    IN_PROGRESS = "进行中"
    COMPLETED = "已完成"
    CANCELLED = "已取消"

class Task:
    """任务类"""
    
    def __init__(self, title: str, description: str = "", priority: Priority = Priority.MEDIUM, 
                 due_date: Optional[date] = None, tags: Optional[List[str]] = None):
        self.id = self._generate_id()
        self.title = title
        self.description = description
        self.priority = priority
        self.status = Status.TODO
        self.created_at = datetime.now()
        self.updated_at = datetime.now()
        self.due_date = due_date
        self.completed_at = None
        self.tags = tags or []
    
    def _generate_id(self) -> str:
        """生成唯一ID"""
        import uuid
        return str(uuid.uuid4())[:8]
    
    def mark_completed(self):
        """标记为已完成"""
        self.status = Status.COMPLETED
        self.completed_at = datetime.now()
        self.updated_at = datetime.now()
    
    def mark_in_progress(self):
        """标记为进行中"""
        self.status = Status.IN_PROGRESS
        self.updated_at = datetime.now()
    
    def mark_cancelled(self):
        """标记为已取消"""
        self.status = Status.CANCELLED
        self.updated_at = datetime.now()
    
    def update(self, title: Optional[str] = None, description: Optional[str] = None,
               priority: Optional[Priority] = None, due_date: Optional[date] = None,
               tags: Optional[List[str]] = None):
        """更新任务信息"""
        if title is not None:
            self.title = title
        if description is not None:
            self.description = description
        if priority is not None:
            self.priority = priority
        if due_date is not None:
            self.due_date = due_date
        if tags is not None:
            self.tags = tags
        self.updated_at = datetime.now()
    
    def add_tag(self, tag: str):
        """添加标签"""
        if tag not in self.tags:
            self.tags.append(tag)
            self.updated_at = datetime.now()
    
    def remove_tag(self, tag: str):
        """移除标签"""
        if tag in self.tags:
            self.tags.remove(tag)
            self.updated_at = datetime.now()
    
    def is_overdue(self) -> bool:
        """检查是否过期"""
        if self.due_date and self.status not in [Status.COMPLETED, Status.CANCELLED]:
            return date.today() > self.due_date
        return False
    
    def days_until_due(self) -> Optional[int]:
        """距离截止日期的天数"""
        if self.due_date:
            delta = self.due_date - date.today()
            return delta.days
        return None
    
    def to_dict(self) -> Dict[str, Any]:
        """转换为字典"""
        return {
            'id': self.id,
            'title': self.title,
            'description': self.description,
            'priority': self.priority.value,
            'status': self.status.value,
            'created_at': self.created_at.isoformat(),
            'updated_at': self.updated_at.isoformat(),
            'due_date': self.due_date.isoformat() if self.due_date else None,
            'completed_at': self.completed_at.isoformat() if self.completed_at else None,
            'tags': self.tags
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Task':
        """从字典创建任务"""
        task = cls.__new__(cls)
        task.id = data['id']
        task.title = data['title']
        task.description = data['description']
        task.priority = Priority(data['priority'])
        task.status = Status(data['status'])
        task.created_at = datetime.fromisoformat(data['created_at'])
        task.updated_at = datetime.fromisoformat(data['updated_at'])
        task.due_date = date.fromisoformat(data['due_date']) if data['due_date'] else None
        task.completed_at = datetime.fromisoformat(data['completed_at']) if data['completed_at'] else None
        task.tags = data['tags']
        return task
    
    def __str__(self):
        status_icon = {
            Status.TODO: "⭕",
            Status.IN_PROGRESS: "🔄",
            Status.COMPLETED: "✅",
            Status.CANCELLED: "❌"
        }
        priority_icon = {
            Priority.LOW: "🟢",
            Priority.MEDIUM: "🟡",
            Priority.HIGH: "🟠",
            Priority.URGENT: "🔴"
        }
        
        due_info = f" (截止: {self.due_date})" if self.due_date else ""
        tags_info = f" #{' #'.join(self.tags)}" if self.tags else ""
        
        return f"{status_icon[self.status]} {priority_icon[self.priority]} {self.title}{due_info}{tags_info}"

class TaskManager:
    """任务管理器"""
    
    def __init__(self):
        self.tasks: Dict[str, Task] = {}
    
    def add_task(self, title: str, description: str = "", priority: Priority = Priority.MEDIUM,
                 due_date: Optional[date] = None, tags: Optional[List[str]] = None) -> Task:
        """添加任务"""
        task = Task(title, description, priority, due_date, tags)
        self.tasks[task.id] = task
        return task
    
    def get_task(self, task_id: str) -> Optional[Task]:
        """获取任务"""
        return self.tasks.get(task_id)
    
    def update_task(self, task_id: str, **kwargs) -> bool:
        """更新任务"""
        task = self.get_task(task_id)
        if task:
            task.update(**kwargs)
            return True
        return False
    
    def delete_task(self, task_id: str) -> bool:
        """删除任务"""
        if task_id in self.tasks:
            del self.tasks[task_id]
            return True
        return False
    
    def list_tasks(self, status: Optional[Status] = None, priority: Optional[Priority] = None,
                   tag: Optional[str] = None, overdue_only: bool = False) -> List[Task]:
        """列出任务"""
        tasks = list(self.tasks.values())
        
        if status:
            tasks = [t for t in tasks if t.status == status]
        
        if priority:
            tasks = [t for t in tasks if t.priority == priority]
        
        if tag:
            tasks = [t for t in tasks if tag in t.tags]
        
        if overdue_only:
            tasks = [t for t in tasks if t.is_overdue()]
        
        return sorted(tasks, key=lambda t: (t.priority.value, t.created_at), reverse=True)
    
    def search_tasks(self, keyword: str) -> List[Task]:
        """搜索任务"""
        keyword = keyword.lower()
        return [task for task in self.tasks.values() 
                if keyword in task.title.lower() or keyword in task.description.lower()]
    
    def get_statistics(self) -> Dict[str, Any]:
        """获取统计信息"""
        total = len(self.tasks)
        completed = len([t for t in self.tasks.values() if t.status == Status.COMPLETED])
        in_progress = len([t for t in self.tasks.values() if t.status == Status.IN_PROGRESS])
        overdue = len([t for t in self.tasks.values() if t.is_overdue()])
        
        return {
            'total': total,
            'completed': completed,
            'in_progress': in_progress,
            'todo': total - completed - in_progress,
            'overdue': overdue,
            'completion_rate': (completed / total * 100) if total > 0 else 0
        }
    
    def get_tasks_by_date(self, target_date: date) -> List[Task]:
        """获取指定日期的任务"""
        return [task for task in self.tasks.values() if task.due_date == target_date]
    
    def get_upcoming_tasks(self, days: int = 7) -> List[Task]:
        """获取即将到期的任务"""
        upcoming = []
        for task in self.tasks.values():
            if task.due_date and task.status not in [Status.COMPLETED, Status.CANCELLED]:
                days_until = task.days_until_due()
                if days_until is not None and 0 <= days_until <= days:
                    upcoming.append(task)
        return sorted(upcoming, key=lambda t: t.due_date)

# 八、总结

# 8.1 模块系统的核心概念

  1. 模块:包含Python代码的文件
  2. :包含多个模块的目录
  3. 导入:使用其他模块的功能
  4. 命名空间:避免名称冲突
  5. 搜索路径:Python查找模块的位置

# 8.2 最佳实践总结

  1. 模块设计

    • 单一职责原则
    • 清晰的接口设计
    • 完善的文档字符串
    • 合理的错误处理
  2. 包组织

    • 逻辑清晰的目录结构
    • 适当的__init__.py文件
    • 明确的__all__定义
  3. 导入规范

    • 优先使用具体导入
    • 避免循环导入
    • 合理使用别名
  4. 代码质量

    • 编写单元测试
    • 使用类型注解
    • 遵循PEP 8规范