第10天-高级编程
哪吒 2023/6/15
宝贝,学习是一个过程,不要给自己太大压力哦~
# 第10天-高级编程
# 学习目标
通过本章学习,你将掌握:
- 元类(Metaclass)的概念和使用
- 描述符(Descriptor)协议
- 上下文管理器的深入应用
- 协程和异步编程基础
- 内存管理和性能优化
- 设计模式在Python中的实现
- 代码调试和性能分析技巧
# 一、元类(Metaclass)
# 1.1 什么是元类
元类是"创建类的类"。在Python中,类也是对象,而元类就是创建这些类对象的"类"。
# 理解类和对象的关系
class MyClass:
pass
# MyClass是一个类,同时也是一个对象
print(f"MyClass的类型:{type(MyClass)}") # <class 'type'>
print(f"MyClass是否是对象:{isinstance(MyClass, object)}") # True
# 创建实例
obj = MyClass()
print(f"obj的类型:{type(obj)}") # <class '__main__.MyClass'>
print(f"obj的类:{obj.__class__}") # <class '__main__.MyClass'>
# type既是类型,也是元类
print(f"type的类型:{type(type)}") # <class 'type'>
# 1.2 使用type动态创建类
# 使用type动态创建类
# type(name, bases, dict)
# 方法1:传统方式定义类
class Person:
def __init__(self, name):
self.name = name
def say_hello(self):
return f"Hello, I'm {self.name}"
# 方法2:使用type动态创建相同的类
def init_method(self, name):
self.name = name
def say_hello_method(self):
return f"Hello, I'm {self.name}"
# 动态创建Person类
DynamicPerson = type(
'DynamicPerson', # 类名
(object,), # 基类元组
{ # 类字典
'__init__': init_method,
'say_hello': say_hello_method,
'class_var': 'I am a dynamic class'
}
)
# 使用动态创建的类
person1 = Person("张三")
person2 = DynamicPerson("李四")
print(person1.say_hello()) # Hello, I'm 张三
print(person2.say_hello()) # Hello, I'm 李四
print(person2.class_var) # I am a dynamic class
print(f"Person类型:{type(Person)}")
print(f"DynamicPerson类型:{type(DynamicPerson)}")
# 1.3 自定义元类
class SingletonMeta(type):
"""单例模式元类"""
_instances = {}
def __call__(cls, *args, **kwargs):
"""控制类的实例化过程"""
if cls not in cls._instances:
# 如果还没有实例,创建一个
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]
class Database(metaclass=SingletonMeta):
"""数据库连接类(单例模式)"""
def __init__(self, host="localhost", port=3306):
self.host = host
self.port = port
self.connected = False
print(f"创建数据库连接:{host}:{port}")
def connect(self):
if not self.connected:
print(f"连接到数据库 {self.host}:{self.port}")
self.connected = True
else:
print("数据库已连接")
def disconnect(self):
if self.connected:
print(f"断开数据库连接 {self.host}:{self.port}")
self.connected = False
class ValidatedMeta(type):
"""属性验证元类"""
def __new__(mcs, name, bases, namespace):
# 在创建类之前进行验证
if 'required_methods' in namespace:
required = namespace['required_methods']
for method in required:
if method not in namespace:
raise TypeError(f"类 {name} 必须实现方法 {method}")
# 为所有方法添加日志装饰器
for key, value in namespace.items():
if callable(value) and not key.startswith('_'):
namespace[key] = mcs.add_logging(value, key)
return super().__new__(mcs, name, bases, namespace)
@staticmethod
def add_logging(func, func_name):
"""为方法添加日志"""
def wrapper(*args, **kwargs):
print(f"调用方法: {func_name}")
result = func(*args, **kwargs)
print(f"方法 {func_name} 执行完成")
return result
return wrapper
class APIClient(metaclass=ValidatedMeta):
"""API客户端类"""
required_methods = ['get', 'post'] # 必须实现的方法
def __init__(self, base_url):
self.base_url = base_url
def get(self, endpoint):
return f"GET {self.base_url}/{endpoint}"
def post(self, endpoint, data):
return f"POST {self.base_url}/{endpoint} with {data}"
def delete(self, endpoint):
return f"DELETE {self.base_url}/{endpoint}"
# 使用示例
print("=== 元类示例 ===")
# 单例模式测试
print("\n=== 单例模式测试 ===")
db1 = Database("localhost", 3306)
db2 = Database("remote", 5432) # 参数会被忽略,返回同一个实例
print(f"db1 is db2: {db1 is db2}") # True
print(f"db1.host: {db1.host}, db2.host: {db2.host}") # 都是localhost
db1.connect()
db2.connect() # 显示已连接
# 验证元类测试
print("\n=== 验证元类测试 ===")
client = APIClient("https://api.example.com")
print(client.get("users"))
print(client.post("users", {"name": "张三"}))
print(client.delete("users/1"))
# 1.4 元类的实际应用
class ORMMeta(type):
"""简单的ORM元类"""
def __new__(mcs, name, bases, namespace):
# 收集字段信息
fields = {}
for key, value in namespace.items():
if isinstance(value, Field):
fields[key] = value
value.name = key
namespace['_fields'] = fields
namespace['_table_name'] = namespace.get('_table_name', name.lower())
return super().__new__(mcs, name, bases, namespace)
class Field:
"""字段基类"""
def __init__(self, field_type, required=True, default=None):
self.field_type = field_type
self.required = required
self.default = default
self.name = None
def validate(self, value):
if value is None and self.required:
raise ValueError(f"字段 {self.name} 是必需的")
if value is not None and not isinstance(value, self.field_type):
raise TypeError(f"字段 {self.name} 必须是 {self.field_type.__name__} 类型")
return value
class StringField(Field):
def __init__(self, max_length=255, **kwargs):
super().__init__(str, **kwargs)
self.max_length = max_length
def validate(self, value):
value = super().validate(value)
if value and len(value) > self.max_length:
raise ValueError(f"字段 {self.name} 长度不能超过 {self.max_length}")
return value
class IntegerField(Field):
def __init__(self, **kwargs):
super().__init__(int, **kwargs)
class Model(metaclass=ORMMeta):
"""模型基类"""
def __init__(self, **kwargs):
for field_name, field in self._fields.items():
value = kwargs.get(field_name, field.default)
setattr(self, field_name, field.validate(value))
def save(self):
"""保存到数据库(模拟)"""
fields = []
values = []
for field_name, field in self._fields.items():
fields.append(field_name)
values.append(getattr(self, field_name))
sql = f"INSERT INTO {self._table_name} ({', '.join(fields)}) VALUES ({', '.join(map(repr, values))})"
print(f"执行SQL: {sql}")
def __repr__(self):
attrs = []
for field_name in self._fields:
value = getattr(self, field_name)
attrs.append(f"{field_name}={value!r}")
return f"{self.__class__.__name__}({', '.join(attrs)})"
# 定义用户模型
class User(Model):
_table_name = 'users'
id = IntegerField(required=False)
name = StringField(max_length=50)
email = StringField(max_length=100)
age = IntegerField(required=False, default=0)
# 使用ORM
print("\n=== ORM元类示例 ===")
try:
user = User(name="张三", email="zhangsan@example.com", age=25)
print(user)
user.save()
# 测试验证
invalid_user = User(name="李四", email="lisi@example.com", age="invalid")
except (ValueError, TypeError) as e:
print(f"验证错误: {e}")
# 二、描述符(Descriptor)
# 2.1 描述符协议
描述符是定义了__get__
、__set__
或__delete__
方法的对象。
class Descriptor:
"""基本描述符示例"""
def __init__(self, name=None):
self.name = name
self.value = None
def __get__(self, obj, objtype=None):
"""获取属性值时调用"""
print(f"获取属性 {self.name}")
if obj is None:
return self
return self.value
def __set__(self, obj, value):
"""设置属性值时调用"""
print(f"设置属性 {self.name} = {value}")
self.value = value
def __delete__(self, obj):
"""删除属性时调用"""
print(f"删除属性 {self.name}")
self.value = None
class MyClass:
attr = Descriptor("attr")
def __init__(self):
self.normal_attr = "normal"
# 使用描述符
print("=== 描述符基础示例 ===")
obj = MyClass()
# 访问描述符属性
print(f"obj.attr = {obj.attr}") # 调用 __get__
# 设置描述符属性
obj.attr = "new value" # 调用 __set__
print(f"obj.attr = {obj.attr}") # 调用 __get__
# 删除描述符属性
del obj.attr # 调用 __delete__
print(f"obj.attr = {obj.attr}") # 调用 __get__
# 2.2 实用描述符示例
class ValidatedAttribute:
"""验证属性描述符"""
def __init__(self, validator=None, default=None):
self.validator = validator
self.default = default
self.name = None
def __set_name__(self, owner, name):
"""Python 3.6+ 新特性,自动设置属性名"""
self.name = name
def __get__(self, obj, objtype=None):
if obj is None:
return self
return obj.__dict__.get(self.name, self.default)
def __set__(self, obj, value):
if self.validator:
value = self.validator(value)
obj.__dict__[self.name] = value
class TypedAttribute(ValidatedAttribute):
"""类型验证描述符"""
def __init__(self, expected_type, **kwargs):
def type_validator(value):
if not isinstance(value, expected_type):
raise TypeError(f"{self.name} 必须是 {expected_type.__name__} 类型")
return value
super().__init__(type_validator, **kwargs)
class RangeAttribute(ValidatedAttribute):
"""范围验证描述符"""
def __init__(self, min_value=None, max_value=None, **kwargs):
def range_validator(value):
if min_value is not None and value < min_value:
raise ValueError(f"{self.name} 不能小于 {min_value}")
if max_value is not None and value > max_value:
raise ValueError(f"{self.name} 不能大于 {max_value}")
return value
super().__init__(range_validator, **kwargs)
class EmailAttribute(ValidatedAttribute):
"""邮箱验证描述符"""
def __init__(self, **kwargs):
def email_validator(value):
if not isinstance(value, str) or '@' not in value:
raise ValueError(f"{self.name} 必须是有效的邮箱地址")
return value.lower()
super().__init__(email_validator, **kwargs)
class CachedProperty:
"""缓存属性描述符"""
def __init__(self, func):
self.func = func
self.name = func.__name__
self.__doc__ = func.__doc__
def __get__(self, obj, objtype=None):
if obj is None:
return self
# 检查是否已缓存
cache_name = f'_cached_{self.name}'
if hasattr(obj, cache_name):
print(f"从缓存获取 {self.name}")
return getattr(obj, cache_name)
# 计算并缓存结果
print(f"计算 {self.name}")
result = self.func(obj)
setattr(obj, cache_name, result)
return result
def __set__(self, obj, value):
# 清除缓存
cache_name = f'_cached_{self.name}'
if hasattr(obj, cache_name):
delattr(obj, cache_name)
# 设置新值
setattr(obj, f'_{self.name}', value)
# 使用描述符的类
class Person:
"""使用各种描述符的人员类"""
name = TypedAttribute(str)
age = RangeAttribute(0, 150)
email = EmailAttribute()
def __init__(self, name, age, email):
self.name = name
self.age = age
self.email = email
@CachedProperty
def full_info(self):
"""完整信息(计算密集型操作模拟)"""
import time
time.sleep(0.1) # 模拟耗时操作
return f"{self.name} ({self.age}岁) - {self.email}"
def __repr__(self):
return f"Person(name='{self.name}', age={self.age}, email='{self.email}')"
# 使用示例
print("\n=== 描述符实用示例 ===")
try:
person = Person("张三", 25, "ZhangSan@Example.Com")
print(person)
print(f"邮箱已转换为小写: {person.email}")
# 测试缓存属性
print("\n=== 缓存属性测试 ===")
print(f"第一次访问: {person.full_info}") # 计算
print(f"第二次访问: {person.full_info}") # 从缓存获取
print(f"第三次访问: {person.full_info}") # 从缓存获取
# 测试验证
print("\n=== 验证测试 ===")
person.age = 30 # 正常
print(f"修改年龄后: {person.age}")
# 尝试设置无效值
person.age = -5 # 应该抛出异常
except (TypeError, ValueError) as e:
print(f"验证错误: {e}")
try:
# 测试类型验证
person.name = 123 # 应该抛出异常
except TypeError as e:
print(f"类型错误: {e}")
try:
# 测试邮箱验证
person.email = "invalid-email" # 应该抛出异常
except ValueError as e:
print(f"邮箱错误: {e}")
# 三、协程和异步编程
# 3.1 生成器和协程基础
import asyncio
import time
from typing import AsyncGenerator
# 传统生成器
def simple_generator():
"""简单生成器"""
print("生成器开始")
yield 1
print("生成器继续")
yield 2
print("生成器结束")
yield 3
# 协程生成器
def coroutine_example():
"""协程示例"""
print("协程启动")
value = None
while True:
received = yield value
print(f"协程接收到: {received}")
value = f"处理了: {received}"
# 异步生成器
async def async_generator():
"""异步生成器"""
for i in range(5):
print(f"异步生成 {i}")
await asyncio.sleep(0.1) # 模拟异步操作
yield i
# 异步函数
async def async_function(name: str, delay: float) -> str:
"""异步函数示例"""
print(f"{name} 开始执行")
await asyncio.sleep(delay) # 异步等待
print(f"{name} 执行完成")
return f"{name} 的结果"
async def fetch_data(url: str, delay: float) -> dict:
"""模拟异步获取数据"""
print(f"开始获取数据: {url}")
await asyncio.sleep(delay) # 模拟网络请求
data = {
"url": url,
"data": f"来自 {url} 的数据",
"timestamp": time.time()
}
print(f"数据获取完成: {url}")
return data
# 使用示例
print("=== 生成器和协程示例 ===")
# 使用生成器
print("\n=== 简单生成器 ===")
gen = simple_generator()
for value in gen:
print(f"获得值: {value}")
# 使用协程
print("\n=== 协程示例 ===")
coro = coroutine_example()
next(coro) # 启动协程
result1 = coro.send("Hello")
print(f"协程返回: {result1}")
result2 = coro.send("World")
print(f"协程返回: {result2}")
coro.close() # 关闭协程
# 异步函数示例
async def main_async_example():
"""异步主函数"""
print("\n=== 异步函数示例 ===")
# 顺序执行
start_time = time.time()
result1 = await async_function("任务1", 1.0)
result2 = await async_function("任务2", 0.5)
end_time = time.time()
print(f"顺序执行结果: {result1}, {result2}")
print(f"顺序执行耗时: {end_time - start_time:.2f}秒")
# 并发执行
start_time = time.time()
task1 = async_function("并发任务1", 1.0)
task2 = async_function("并发任务2", 0.5)
results = await asyncio.gather(task1, task2)
end_time = time.time()
print(f"并发执行结果: {results}")
print(f"并发执行耗时: {end_time - start_time:.2f}秒")
# 使用异步生成器
print("\n=== 异步生成器 ===")
async for value in async_generator():
print(f"异步获得值: {value}")
# 批量数据获取
print("\n=== 批量异步数据获取 ===")
urls = [
"https://api1.example.com",
"https://api2.example.com",
"https://api3.example.com"
]
# 创建任务
tasks = [fetch_data(url, 0.5) for url in urls]
# 等待所有任务完成
start_time = time.time()
results = await asyncio.gather(*tasks)
end_time = time.time()
print(f"获取到 {len(results)} 个结果")
for result in results:
print(f" {result['url']}: {result['data']}")
print(f"总耗时: {end_time - start_time:.2f}秒")
# 运行异步示例
if __name__ == "__main__":
asyncio.run(main_async_example())
# 3.2 异步上下文管理器和迭代器
import asyncio
import aiofiles # 需要安装: pip install aiofiles
from contextlib import asynccontextmanager
class AsyncDatabaseConnection:
"""异步数据库连接"""
def __init__(self, host: str, database: str):
self.host = host
self.database = database
self.connection = None
async def __aenter__(self):
"""异步进入上下文"""
print(f"异步连接到数据库: {self.host}/{self.database}")
await asyncio.sleep(0.1) # 模拟连接延迟
self.connection = f"Connection to {self.database}"
return self.connection
async def __aexit__(self, exc_type, exc_value, traceback):
"""异步退出上下文"""
if self.connection:
print(f"异步关闭数据库连接: {self.database}")
await asyncio.sleep(0.05) # 模拟关闭延迟
self.connection = None
return False
class AsyncFileProcessor:
"""异步文件处理器"""
def __init__(self, filename: str):
self.filename = filename
self.file = None
def __aiter__(self):
return self
async def __anext__(self):
if self.file is None:
# 第一次调用时打开文件
try:
self.file = await aiofiles.open(self.filename, 'r', encoding='utf-8')
except FileNotFoundError:
# 如果文件不存在,创建一个模拟的内容
self.lines = ["第一行内容", "第二行内容", "第三行内容"]
self.index = 0
if hasattr(self, 'lines'):
# 使用模拟内容
if self.index >= len(self.lines):
raise StopAsyncIteration
line = self.lines[self.index]
self.index += 1
await asyncio.sleep(0.1) # 模拟处理延迟
return line.strip()
else:
# 使用真实文件
line = await self.file.readline()
if not line:
await self.file.close()
raise StopAsyncIteration
await asyncio.sleep(0.1) # 模拟处理延迟
return line.strip()
@asynccontextmanager
async def async_timer(name: str):
"""异步计时器上下文管理器"""
start_time = time.time()
print(f"{name} 开始")
try:
yield start_time
finally:
end_time = time.time()
print(f"{name} 完成,耗时: {end_time - start_time:.2f}秒")
async def async_context_examples():
"""异步上下文管理器示例"""
print("=== 异步上下文管理器示例 ===")
# 使用异步数据库连接
async with AsyncDatabaseConnection("localhost", "myapp") as conn:
print(f"使用连接: {conn}")
await asyncio.sleep(0.2) # 模拟数据库操作
print("数据库操作完成")
# 使用异步计时器
async with async_timer("数据处理任务"):
await asyncio.sleep(1.0) # 模拟耗时任务
print("任务执行中...")
# 使用异步迭代器
print("\n=== 异步迭代器示例 ===")
processor = AsyncFileProcessor("example.txt")
async for line in processor:
print(f"处理行: {line}")
# 运行异步上下文示例
if __name__ == "__main__":
asyncio.run(async_context_examples())
# 四、内存管理和性能优化
# 4.1 内存管理基础
import gc
import sys
import weakref
from memory_profiler import profile # 需要安装: pip install memory-profiler
class MemoryExample:
"""内存管理示例类"""
def __init__(self, data):
self.data = data
self.refs = []
def add_reference(self, obj):
self.refs.append(obj)
def __del__(self):
print(f"MemoryExample 对象被销毁: {id(self)}")
def memory_management_examples():
"""内存管理示例"""
print("=== 内存管理示例 ===")
# 查看对象引用计数
obj = MemoryExample("test data")
print(f"对象引用计数: {sys.getrefcount(obj)}")
# 创建引用
ref1 = obj
print(f"创建引用后计数: {sys.getrefcount(obj)}")
# 删除引用
del ref1
print(f"删除引用后计数: {sys.getrefcount(obj)}")
# 循环引用问题
print("\n=== 循环引用示例 ===")
obj1 = MemoryExample("obj1")
obj2 = MemoryExample("obj2")
# 创建循环引用
obj1.add_reference(obj2)
obj2.add_reference(obj1)
print(f"obj1 引用计数: {sys.getrefcount(obj1)}")
print(f"obj2 引用计数: {sys.getrefcount(obj2)}")
# 删除主引用
del obj1, obj2
# 手动触发垃圾回收
print("触发垃圾回收前...")
collected = gc.collect()
print(f"垃圾回收清理了 {collected} 个对象")
# 弱引用示例
print("\n=== 弱引用示例 ===")
class Observer:
def __init__(self, name):
self.name = name
def notify(self, message):
print(f"{self.name} 收到通知: {message}")
def __del__(self):
print(f"Observer {self.name} 被销毁")
class Subject:
def __init__(self):
self._observers = set()
def attach(self, observer):
# 使用弱引用避免循环引用
self._observers.add(weakref.ref(observer))
def notify(self, message):
# 清理已失效的弱引用
dead_refs = set()
for obs_ref in self._observers:
observer = obs_ref()
if observer is None:
dead_refs.add(obs_ref)
else:
observer.notify(message)
# 移除失效引用
self._observers -= dead_refs
subject = Subject()
# 创建观察者
obs1 = Observer("观察者1")
obs2 = Observer("观察者2")
subject.attach(obs1)
subject.attach(obs2)
subject.notify("第一条消息")
# 删除一个观察者
del obs1
subject.notify("第二条消息")
del obs2
subject.notify("第三条消息")
# @profile # 取消注释以启用内存分析
def memory_intensive_function():
"""内存密集型函数示例"""
# 创建大量对象
data = []
for i in range(100000):
data.append({"id": i, "value": f"item_{i}"})
# 处理数据
processed = []
for item in data:
if item["id"] % 2 == 0:
processed.append(item["value"].upper())
return len(processed)
def performance_optimization_examples():
"""性能优化示例"""
print("\n=== 性能优化示例 ===")
import time
from functools import lru_cache
# 缓存装饰器示例
@lru_cache(maxsize=128)
def fibonacci_cached(n):
"""带缓存的斐波那契数列"""
if n < 2:
return n
return fibonacci_cached(n-1) + fibonacci_cached(n-2)
def fibonacci_normal(n):
"""普通斐波那契数列"""
if n < 2:
return n
return fibonacci_normal(n-1) + fibonacci_normal(n-2)
# 性能对比
n = 30
# 普通版本
start_time = time.time()
result_normal = fibonacci_normal(n)
normal_time = time.time() - start_time
# 缓存版本
start_time = time.time()
result_cached = fibonacci_cached(n)
cached_time = time.time() - start_time
print(f"普通版本: 结果={result_normal}, 耗时={normal_time:.4f}秒")
print(f"缓存版本: 结果={result_cached}, 耗时={cached_time:.4f}秒")
print(f"性能提升: {normal_time/cached_time:.2f}倍")
# 查看缓存信息
print(f"缓存信息: {fibonacci_cached.cache_info()}")
# 列表推导式 vs 循环
print("\n=== 列表推导式性能对比 ===")
data = range(100000)
# 使用循环
start_time = time.time()
result_loop = []
for x in data:
if x % 2 == 0:
result_loop.append(x * 2)
loop_time = time.time() - start_time
# 使用列表推导式
start_time = time.time()
result_comprehension = [x * 2 for x in data if x % 2 == 0]
comp_time = time.time() - start_time
print(f"循环方式: 耗时={loop_time:.4f}秒")
print(f"列表推导式: 耗时={comp_time:.4f}秒")
print(f"性能提升: {loop_time/comp_time:.2f}倍")
# 运行示例
if __name__ == "__main__":
memory_management_examples()
performance_optimization_examples()
# 运行内存密集型函数
print("\n=== 内存密集型函数 ===")
result = memory_intensive_function()
print(f"处理结果: {result}")
# 五、设计模式
# 5.1 创建型模式
from abc import ABC, abstractmethod
from typing import Dict, Any
import copy
# 单例模式
class Singleton:
"""单例模式实现"""
_instance = None
_initialized = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if not self._initialized:
self.data = {}
self._initialized = True
def set_data(self, key, value):
self.data[key] = value
def get_data(self, key):
return self.data.get(key)
# 工厂模式
class Animal(ABC):
"""动物抽象基类"""
@abstractmethod
def make_sound(self) -> str:
pass
@abstractmethod
def get_type(self) -> str:
pass
class Dog(Animal):
def make_sound(self) -> str:
return "汪汪"
def get_type(self) -> str:
return "狗"
class Cat(Animal):
def make_sound(self) -> str:
return "喵喵"
def get_type(self) -> str:
return "猫"
class Bird(Animal):
def make_sound(self) -> str:
return "啾啾"
def get_type(self) -> str:
return "鸟"
class AnimalFactory:
"""动物工厂"""
_animals = {
'dog': Dog,
'cat': Cat,
'bird': Bird
}
@classmethod
def create_animal(cls, animal_type: str) -> Animal:
animal_class = cls._animals.get(animal_type.lower())
if animal_class is None:
raise ValueError(f"不支持的动物类型: {animal_type}")
return animal_class()
@classmethod
def get_available_types(cls):
return list(cls._animals.keys())
# 建造者模式
class Computer:
"""计算机类"""
def __init__(self):
self.cpu = None
self.memory = None
self.storage = None
self.graphics = None
self.price = 0
def __str__(self):
return f"Computer(CPU: {self.cpu}, Memory: {self.memory}, Storage: {self.storage}, Graphics: {self.graphics}, Price: ¥{self.price})"
class ComputerBuilder:
"""计算机建造者"""
def __init__(self):
self.computer = Computer()
def set_cpu(self, cpu: str, price: int):
self.computer.cpu = cpu
self.computer.price += price
return self
def set_memory(self, memory: str, price: int):
self.computer.memory = memory
self.computer.price += price
return self
def set_storage(self, storage: str, price: int):
self.computer.storage = storage
self.computer.price += price
return self
def set_graphics(self, graphics: str, price: int):
self.computer.graphics = graphics
self.computer.price += price
return self
def build(self) -> Computer:
return self.computer
class ComputerDirector:
"""计算机构建指导者"""
@staticmethod
def build_gaming_computer():
"""构建游戏电脑"""
return (ComputerBuilder()
.set_cpu("Intel i7-12700K", 3000)
.set_memory("32GB DDR4", 1200)
.set_storage("1TB NVMe SSD", 800)
.set_graphics("RTX 4070", 4500)
.build())
@staticmethod
def build_office_computer():
"""构建办公电脑"""
return (ComputerBuilder()
.set_cpu("Intel i5-12400", 1500)
.set_memory("16GB DDR4", 600)
.set_storage("512GB SSD", 400)
.set_graphics("集成显卡", 0)
.build())
# 原型模式
class Prototype(ABC):
"""原型抽象基类"""
@abstractmethod
def clone(self):
pass
class Document(Prototype):
"""文档类"""
def __init__(self, title: str, content: str, metadata: Dict[str, Any] = None):
self.title = title
self.content = content
self.metadata = metadata or {}
self.created_at = time.time()
def clone(self):
"""深拷贝克隆"""
return copy.deepcopy(self)
def shallow_clone(self):
"""浅拷贝克隆"""
return copy.copy(self)
def __str__(self):
return f"Document(title='{self.title}', content_length={len(self.content)}, metadata={self.metadata})"
# 使用示例
def design_patterns_examples():
"""设计模式示例"""
print("=== 设计模式示例 ===")
# 单例模式
print("\n=== 单例模式 ===")
singleton1 = Singleton()
singleton2 = Singleton()
print(f"singleton1 is singleton2: {singleton1 is singleton2}")
singleton1.set_data("key1", "value1")
print(f"singleton2.get_data('key1'): {singleton2.get_data('key1')}")
# 工厂模式
print("\n=== 工厂模式 ===")
print(f"可用动物类型: {AnimalFactory.get_available_types()}")
animals = []
for animal_type in ['dog', 'cat', 'bird']:
animal = AnimalFactory.create_animal(animal_type)
animals.append(animal)
print(f"{animal.get_type()}: {animal.make_sound()}")
# 建造者模式
print("\n=== 建造者模式 ===")
gaming_pc = ComputerDirector.build_gaming_computer()
office_pc = ComputerDirector.build_office_computer()
print(f"游戏电脑: {gaming_pc}")
print(f"办公电脑: {office_pc}")
# 自定义配置
custom_pc = (ComputerBuilder()
.set_cpu("AMD Ryzen 7", 2500)
.set_memory("64GB DDR4", 2000)
.set_storage("2TB NVMe SSD", 1500)
.set_graphics("RTX 4080", 6000)
.build())
print(f"自定义电脑: {custom_pc}")
# 原型模式
print("\n=== 原型模式 ===")
original_doc = Document(
"原始文档",
"这是原始文档的内容",
{"author": "张三", "version": 1.0}
)
# 深拷贝
cloned_doc = original_doc.clone()
cloned_doc.title = "克隆文档"
cloned_doc.metadata["author"] = "李四"
print(f"原始文档: {original_doc}")
print(f"克隆文档: {cloned_doc}")
print(f"元数据是否相同对象: {original_doc.metadata is cloned_doc.metadata}")
# 浅拷贝
shallow_doc = original_doc.shallow_clone()
shallow_doc.title = "浅拷贝文档"
shallow_doc.metadata["version"] = 2.0 # 会影响原始文档
print(f"浅拷贝后原始文档: {original_doc}")
print(f"浅拷贝文档: {shallow_doc}")
# 运行示例
if __name__ == "__main__":
design_patterns_examples()
# 5.2 结构型和行为型模式
from abc import ABC, abstractmethod
from typing import List, Dict, Any
import time
# 装饰器模式
class Coffee(ABC):
"""咖啡抽象基类"""
@abstractmethod
def get_description(self) -> str:
pass
@abstractmethod
def get_cost(self) -> float:
pass
class SimpleCoffee(Coffee):
"""简单咖啡"""
def get_description(self) -> str:
return "简单咖啡"
def get_cost(self) -> float:
return 10.0
class CoffeeDecorator(Coffee):
"""咖啡装饰器基类"""
def __init__(self, coffee: Coffee):
self._coffee = coffee
def get_description(self) -> str:
return self._coffee.get_description()
def get_cost(self) -> float:
return self._coffee.get_cost()
class MilkDecorator(CoffeeDecorator):
"""牛奶装饰器"""
def get_description(self) -> str:
return self._coffee.get_description() + ", 牛奶"
def get_cost(self) -> float:
return self._coffee.get_cost() + 2.0
class SugarDecorator(CoffeeDecorator):
"""糖装饰器"""
def get_description(self) -> str:
return self._coffee.get_description() + ", 糖"
def get_cost(self) -> float:
return self._coffee.get_cost() + 1.0
class WhipDecorator(CoffeeDecorator):
"""奶泡装饰器"""
def get_description(self) -> str:
return self._coffee.get_description() + ", 奶泡"
def get_cost(self) -> float:
return self._coffee.get_cost() + 3.0
# 观察者模式
class Observer(ABC):
"""观察者接口"""
@abstractmethod
def update(self, subject, data: Any):
pass
class Subject:
"""主题类"""
def __init__(self):
self._observers: List[Observer] = []
self._state = None
def attach(self, observer: Observer):
"""添加观察者"""
if observer not in self._observers:
self._observers.append(observer)
print(f"观察者 {observer.__class__.__name__} 已添加")
def detach(self, observer: Observer):
"""移除观察者"""
if observer in self._observers:
self._observers.remove(observer)
print(f"观察者 {observer.__class__.__name__} 已移除")
def notify(self, data: Any = None):
"""通知所有观察者"""
print(f"通知 {len(self._observers)} 个观察者")
for observer in self._observers:
observer.update(self, data)
def set_state(self, state: Any):
"""设置状态并通知观察者"""
self._state = state
self.notify(state)
def get_state(self):
return self._state
class EmailNotifier(Observer):
"""邮件通知观察者"""
def __init__(self, email: str):
self.email = email
def update(self, subject, data: Any):
print(f"邮件通知 ({self.email}): 收到更新 - {data}")
class SMSNotifier(Observer):
"""短信通知观察者"""
def __init__(self, phone: str):
self.phone = phone
def update(self, subject, data: Any):
print(f"短信通知 ({self.phone}): 收到更新 - {data}")
class LogObserver(Observer):
"""日志观察者"""
def update(self, subject, data: Any):
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
print(f"日志记录 [{timestamp}]: 状态变更为 {data}")
# 策略模式
class PaymentStrategy(ABC):
"""支付策略接口"""
@abstractmethod
def pay(self, amount: float) -> str:
pass
class CreditCardPayment(PaymentStrategy):
"""信用卡支付"""
def __init__(self, card_number: str, holder_name: str):
self.card_number = card_number[-4:] # 只保留后4位
self.holder_name = holder_name
def pay(self, amount: float) -> str:
return f"使用信用卡支付 ¥{amount:.2f} (卡号: ****{self.card_number}, 持卡人: {self.holder_name})"
class AlipayPayment(PaymentStrategy):
"""支付宝支付"""
def __init__(self, account: str):
self.account = account
def pay(self, amount: float) -> str:
return f"使用支付宝支付 ¥{amount:.2f} (账户: {self.account})"
class WeChatPayment(PaymentStrategy):
"""微信支付"""
def __init__(self, account: str):
self.account = account
def pay(self, amount: float) -> str:
return f"使用微信支付 ¥{amount:.2f} (账户: {self.account})"
class PaymentContext:
"""支付上下文"""
def __init__(self, strategy: PaymentStrategy):
self._strategy = strategy
def set_strategy(self, strategy: PaymentStrategy):
self._strategy = strategy
def execute_payment(self, amount: float) -> str:
return self._strategy.pay(amount)
# 命令模式
class Command(ABC):
"""命令接口"""
@abstractmethod
def execute(self):
pass
@abstractmethod
def undo(self):
pass
class Light:
"""灯具类"""
def __init__(self, location: str):
self.location = location
self.is_on = False
def turn_on(self):
self.is_on = True
print(f"{self.location}的灯已打开")
def turn_off(self):
self.is_on = False
print(f"{self.location}的灯已关闭")
class LightOnCommand(Command):
"""开灯命令"""
def __init__(self, light: Light):
self.light = light
def execute(self):
self.light.turn_on()
def undo(self):
self.light.turn_off()
class LightOffCommand(Command):
"""关灯命令"""
def __init__(self, light: Light):
self.light = light
def execute(self):
self.light.turn_off()
def undo(self):
self.light.turn_on()
class RemoteControl:
"""遥控器"""
def __init__(self):
self.commands: Dict[str, Command] = {}
self.last_command: Command = None
def set_command(self, slot: str, command: Command):
self.commands[slot] = command
def press_button(self, slot: str):
if slot in self.commands:
command = self.commands[slot]
command.execute()
self.last_command = command
else:
print(f"按钮 {slot} 未设置命令")
def press_undo(self):
if self.last_command:
self.last_command.undo()
else:
print("没有可撤销的命令")
# 使用示例
def advanced_patterns_examples():
"""高级设计模式示例"""
print("=== 高级设计模式示例 ===")
# 装饰器模式
print("\n=== 装饰器模式 - 咖啡订制 ===")
coffee = SimpleCoffee()
print(f"{coffee.get_description()}: ¥{coffee.get_cost():.2f}")
# 添加牛奶
coffee = MilkDecorator(coffee)
print(f"{coffee.get_description()}: ¥{coffee.get_cost():.2f}")
# 添加糖
coffee = SugarDecorator(coffee)
print(f"{coffee.get_description()}: ¥{coffee.get_cost():.2f}")
# 添加奶泡
coffee = WhipDecorator(coffee)
print(f"{coffee.get_description()}: ¥{coffee.get_cost():.2f}")
# 观察者模式
print("\n=== 观察者模式 - 消息通知 ===")
news_agency = Subject()
# 创建观察者
email_notifier = EmailNotifier("user@example.com")
sms_notifier = SMSNotifier("13800138000")
log_observer = LogObserver()
# 添加观察者
news_agency.attach(email_notifier)
news_agency.attach(sms_notifier)
news_agency.attach(log_observer)
# 发布新闻
news_agency.set_state("Python 3.12 正式发布!")
print()
# 移除一个观察者
news_agency.detach(sms_notifier)
news_agency.set_state("新的Python教程上线了!")
# 策略模式
print("\n=== 策略模式 - 支付方式 ===")
# 创建不同的支付策略
credit_card = CreditCardPayment("1234567890123456", "张三")
alipay = AlipayPayment("zhangsan@example.com")
wechat = WeChatPayment("zhangsan_wx")
# 使用支付上下文
payment_context = PaymentContext(credit_card)
print(payment_context.execute_payment(100.0))
# 切换支付策略
payment_context.set_strategy(alipay)
print(payment_context.execute_payment(200.0))
payment_context.set_strategy(wechat)
print(payment_context.execute_payment(50.0))
# 命令模式
print("\n=== 命令模式 - 智能家居 ===")
# 创建设备
living_room_light = Light("客厅")
bedroom_light = Light("卧室")
# 创建命令
living_room_on = LightOnCommand(living_room_light)
living_room_off = LightOffCommand(living_room_light)
bedroom_on = LightOnCommand(bedroom_light)
bedroom_off = LightOffCommand(bedroom_light)
# 创建遥控器
remote = RemoteControl()
# 设置命令
remote.set_command("living_room_on", living_room_on)
remote.set_command("living_room_off", living_room_off)
remote.set_command("bedroom_on", bedroom_on)
remote.set_command("bedroom_off", bedroom_off)
# 使用遥控器
remote.press_button("living_room_on")
remote.press_button("bedroom_on")
remote.press_button("living_room_off")
# 撤销最后一个命令
remote.press_undo()
remote.press_button("bedroom_off")
remote.press_undo()
# 运行示例
if __name__ == "__main__":
advanced_patterns_examples()
# 六、调试和性能分析
# 6.1 调试技巧
import pdb
import traceback
import logging
from functools import wraps
import time
# 配置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def debug_decorator(func):
"""调试装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
logger.debug(f"调用函数 {func.__name__},参数: args={args}, kwargs={kwargs}")
try:
result = func(*args, **kwargs)
logger.debug(f"函数 {func.__name__} 返回: {result}")
return result
except Exception as e:
logger.error(f"函数 {func.__name__} 发生异常: {e}")
logger.error(f"异常详情:\n{traceback.format_exc()}")
raise
return wrapper
def timing_decorator(func):
"""计时装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"函数 {func.__name__} 执行时间: {end_time - start_time:.4f}秒")
return result
return wrapper
class DebugContext:
"""调试上下文管理器"""
def __init__(self, name):
self.name = name
def __enter__(self):
print(f"开始调试: {self.name}")
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None:
print(f"调试 {self.name} 时发生异常: {exc_type.__name__}: {exc_value}")
return False # 不抑制异常
print(f"调试 {self.name} 完成")
@debug_decorator
@timing_decorator
def complex_calculation(n):
"""复杂计算示例"""
if n < 0:
raise ValueError("n 必须是非负数")
result = 0
for i in range(n):
result += i ** 2
if i == n // 2:
# 在中间设置断点进行调试
# pdb.set_trace() # 取消注释以启用断点
pass
return result
def debugging_examples():
"""调试示例"""
print("=== 调试技巧示例 ===")
# 使用调试上下文
with DebugContext("计算测试"):
try:
result = complex_calculation(1000)
print(f"计算结果: {result}")
except ValueError as e:
print(f"捕获到值错误: {e}")
# 测试异常情况
with DebugContext("异常测试"):
try:
result = complex_calculation(-5)
except ValueError as e:
print(f"预期的异常: {e}")
# 使用断言进行调试
def test_function(x, y):
assert isinstance(x, (int, float)), f"x 必须是数字,实际类型: {type(x)}"
assert isinstance(y, (int, float)), f"y 必须是数字,实际类型: {type(y)}"
assert y != 0, "y 不能为零"
result = x / y
assert result > 0, f"结果必须为正数,实际值: {result}"
return result
print("\n=== 断言测试 ===")
try:
print(f"10 / 2 = {test_function(10, 2)}")
print(f"10 / -2 = {test_function(10, -2)}")
except AssertionError as e:
print(f"断言失败: {e}")
# 6.2 性能分析
import cProfile
import pstats
import io
from line_profiler import LineProfiler # 需要安装: pip install line_profiler
from memory_profiler import profile as memory_profile # 需要安装: pip install memory-profiler
def profile_function(func):
"""性能分析装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
pr = cProfile.Profile()
pr.enable()
result = func(*args, **kwargs)
pr.disable()
# 创建统计信息
s = io.StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats(10) # 显示前10个最耗时的函数
print(f"\n=== {func.__name__} 性能分析 ===")
print(s.getvalue())
return result
return wrapper
@profile_function
def inefficient_function():
"""低效函数示例"""
# 低效的字符串拼接
result = ""
for i in range(10000):
result += str(i)
return result
@profile_function
def efficient_function():
"""高效函数示例"""
# 高效的字符串拼接
parts = []
for i in range(10000):
parts.append(str(i))
return ''.join(parts)
# @memory_profile # 取消注释以启用内存分析
def memory_intensive_task():
"""内存密集型任务"""
# 创建大量数据
data = []
for i in range(100000):
data.append([j for j in range(100)])
# 处理数据
processed = []
for item in data:
processed.append(sum(item))
return len(processed)
def performance_analysis_examples():
"""性能分析示例"""
print("=== 性能分析示例 ===")
# 比较低效和高效的实现
print("\n=== 字符串拼接性能对比 ===")
start_time = time.time()
result1 = inefficient_function()
inefficient_time = time.time() - start_time
start_time = time.time()
result2 = efficient_function()
efficient_time = time.time() - start_time
print(f"低效方法耗时: {inefficient_time:.4f}秒")
print(f"高效方法耗时: {efficient_time:.4f}秒")
print(f"性能提升: {inefficient_time/efficient_time:.2f}倍")
# 内存使用分析
print("\n=== 内存使用分析 ===")
result = memory_intensive_task()
print(f"处理了 {result} 个数据项")
# 运行示例
if __name__ == "__main__":
debugging_examples()
performance_analysis_examples()
# 七、实战项目:高级任务管理系统
import asyncio
import json
import sqlite3
from abc import ABC, abstractmethod
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Optional, Dict, Any
from contextlib import asynccontextmanager
import weakref
class Priority(Enum):
"""任务优先级"""
LOW = 1
MEDIUM = 2
HIGH = 3
URGENT = 4
class TaskStatus(Enum):
"""任务状态"""
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
CANCELLED = "cancelled"
@dataclass
class Task:
"""任务数据类"""
id: int
title: str
description: str
priority: Priority
status: TaskStatus
created_at: datetime
due_date: Optional[datetime] = None
completed_at: Optional[datetime] = None
tags: List[str] = None
def __post_init__(self):
if self.tags is None:
self.tags = []
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
data = asdict(self)
data['priority'] = self.priority.value
data['status'] = self.status.value
data['created_at'] = self.created_at.isoformat()
if self.due_date:
data['due_date'] = self.due_date.isoformat()
if self.completed_at:
data['completed_at'] = self.completed_at.isoformat()
return data
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Task':
"""从字典创建任务"""
data['priority'] = Priority(data['priority'])
data['status'] = TaskStatus(data['status'])
data['created_at'] = datetime.fromisoformat(data['created_at'])
if data.get('due_date'):
data['due_date'] = datetime.fromisoformat(data['due_date'])
if data.get('completed_at'):
data['completed_at'] = datetime.fromisoformat(data['completed_at'])
return cls(**data)
class Observer(ABC):
"""观察者接口"""
@abstractmethod
async def on_task_created(self, task: Task):
pass
@abstractmethod
async def on_task_updated(self, task: Task):
pass
@abstractmethod
async def on_task_deleted(self, task_id: int):
pass
class TaskRepository(ABC):
"""任务仓库接口"""
@abstractmethod
async def create(self, task: Task) -> Task:
pass
@abstractmethod
async def get_by_id(self, task_id: int) -> Optional[Task]:
pass
@abstractmethod
async def get_all(self) -> List[Task]:
pass
@abstractmethod
async def update(self, task: Task) -> Task:
pass
@abstractmethod
async def delete(self, task_id: int) -> bool:
pass
class SQLiteTaskRepository(TaskRepository):
"""SQLite任务仓库实现"""
def __init__(self, db_path: str = "tasks.db"):
self.db_path = db_path
self._init_db()
def _init_db(self):
"""初始化数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
description TEXT,
priority INTEGER NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL,
due_date TEXT,
completed_at TEXT,
tags TEXT
)
""")
conn.commit()
conn.close()
async def create(self, task: Task) -> Task:
"""创建任务"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO tasks (title, description, priority, status, created_at, due_date, completed_at, tags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
task.title,
task.description,
task.priority.value,
task.status.value,
task.created_at.isoformat(),
task.due_date.isoformat() if task.due_date else None,
task.completed_at.isoformat() if task.completed_at else None,
json.dumps(task.tags)
))
task.id = cursor.lastrowid
conn.commit()
conn.close()
return task
async def get_by_id(self, task_id: int) -> Optional[Task]:
"""根据ID获取任务"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT * FROM tasks WHERE id = ?", (task_id,))
row = cursor.fetchone()
conn.close()
if row:
return self._row_to_task(row)
return None
async def get_all(self) -> List[Task]:
"""获取所有任务"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("SELECT * FROM tasks ORDER BY created_at DESC")
rows = cursor.fetchall()
conn.close()
return [self._row_to_task(row) for row in rows]
async def update(self, task: Task) -> Task:
"""更新任务"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
UPDATE tasks SET
title = ?, description = ?, priority = ?, status = ?,
due_date = ?, completed_at = ?, tags = ?
WHERE id = ?
""", (
task.title,
task.description,
task.priority.value,
task.status.value,
task.due_date.isoformat() if task.due_date else None,
task.completed_at.isoformat() if task.completed_at else None,
json.dumps(task.tags),
task.id
))
conn.commit()
conn.close()
return task
async def delete(self, task_id: int) -> bool:
"""删除任务"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
deleted = cursor.rowcount > 0
conn.commit()
conn.close()
return deleted
def _row_to_task(self, row) -> Task:
"""将数据库行转换为Task对象"""
return Task(
id=row[0],
title=row[1],
description=row[2],
priority=Priority(row[3]),
status=TaskStatus(row[4]),
created_at=datetime.fromisoformat(row[5]),
due_date=datetime.fromisoformat(row[6]) if row[6] else None,
completed_at=datetime.fromisoformat(row[7]) if row[7] else None,
tags=json.loads(row[8]) if row[8] else []
)
class NotificationObserver(Observer):
"""通知观察者"""
async def on_task_created(self, task: Task):
print(f"📝 新任务创建: {task.title}")
async def on_task_updated(self, task: Task):
print(f"✏️ 任务更新: {task.title} - 状态: {task.status.value}")
async def on_task_deleted(self, task_id: int):
print(f"🗑️ 任务删除: ID {task_id}")
class TaskManager:
"""任务管理器"""
def __init__(self, repository: TaskRepository):
self.repository = repository
self._observers: List[weakref.ReferenceType] = []
self._next_id = 1
def add_observer(self, observer: Observer):
"""添加观察者"""
self._observers.append(weakref.ref(observer))
def remove_observer(self, observer: Observer):
"""移除观察者"""
self._observers = [ref for ref in self._observers if ref() is not observer]
async def _notify_observers(self, method_name: str, *args):
"""通知观察者"""
dead_refs = []
for ref in self._observers:
observer = ref()
if observer is None:
dead_refs.append(ref)
else:
method = getattr(observer, method_name)
await method(*args)
# 清理失效的引用
for ref in dead_refs:
self._observers.remove(ref)
async def create_task(
self,
title: str,
description: str = "",
priority: Priority = Priority.MEDIUM,
due_date: Optional[datetime] = None,
tags: List[str] = None
) -> Task:
"""创建任务"""
task = Task(
id=0, # 将由仓库设置
title=title,
description=description,
priority=priority,
status=TaskStatus.PENDING,
created_at=datetime.now(),
due_date=due_date,
tags=tags or []
)
task = await self.repository.create(task)
await self._notify_observers('on_task_created', task)
return task
async def get_task(self, task_id: int) -> Optional[Task]:
"""获取任务"""
return await self.repository.get_by_id(task_id)
async def get_all_tasks(self) -> List[Task]:
"""获取所有任务"""
return await self.repository.get_all()
async def update_task_status(self, task_id: int, status: TaskStatus) -> Optional[Task]:
"""更新任务状态"""
task = await self.repository.get_by_id(task_id)
if not task:
return None
task.status = status
if status == TaskStatus.COMPLETED:
task.completed_at = datetime.now()
task = await self.repository.update(task)
await self._notify_observers('on_task_updated', task)
return task
async def delete_task(self, task_id: int) -> bool:
"""删除任务"""
success = await self.repository.delete(task_id)
if success:
await self._notify_observers('on_task_deleted', task_id)
return success
async def get_overdue_tasks(self) -> List[Task]:
"""获取过期任务"""
all_tasks = await self.repository.get_all()
now = datetime.now()
return [
task for task in all_tasks
if task.due_date and task.due_date < now and task.status != TaskStatus.COMPLETED
]
async def get_tasks_by_priority(self, priority: Priority) -> List[Task]:
"""根据优先级获取任务"""
all_tasks = await self.repository.get_all()
return [task for task in all_tasks if task.priority == priority]
@asynccontextmanager
async def task_manager_context(db_path: str = "tasks.db"):
"""任务管理器上下文管理器"""
repository = SQLiteTaskRepository(db_path)
manager = TaskManager(repository)
# 添加通知观察者
notifier = NotificationObserver()
manager.add_observer(notifier)
try:
yield manager
finally:
print("任务管理器已关闭")
async def demo_task_management_system():
"""演示任务管理系统"""
print("=== 高级任务管理系统演示 ===")
async with task_manager_context() as manager:
# 创建任务
task1 = await manager.create_task(
"学习Python高级编程",
"深入学习元类、描述符、协程等高级特性",
Priority.HIGH,
datetime.now() + timedelta(days=7),
["学习", "编程"]
)
task2 = await manager.create_task(
"完成项目文档",
"编写项目的技术文档和用户手册",
Priority.MEDIUM,
datetime.now() + timedelta(days=3),
["文档", "项目"]
)
task3 = await manager.create_task(
"代码审查",
"审查团队成员提交的代码",
Priority.URGENT,
datetime.now() + timedelta(hours=2),
["代码审查", "团队"]
)
# 获取所有任务
print("\n=== 所有任务 ===")
all_tasks = await manager.get_all_tasks()
for task in all_tasks:
print(f"ID: {task.id}, 标题: {task.title}, 优先级: {task.priority.name}, 状态: {task.status.value}")
# 更新任务状态
print("\n=== 更新任务状态 ===")
await manager.update_task_status(task2.id, TaskStatus.IN_PROGRESS)
await manager.update_task_status(task3.id, TaskStatus.COMPLETED)
# 获取高优先级任务
print("\n=== 高优先级任务 ===")
high_priority_tasks = await manager.get_tasks_by_priority(Priority.HIGH)
for task in high_priority_tasks:
print(f"- {task.title}")
# 检查过期任务
print("\n=== 过期任务检查 ===")
overdue_tasks = await manager.get_overdue_tasks()
if overdue_tasks:
for task in overdue_tasks:
print(f"⚠️ 过期任务: {task.title} (截止日期: {task.due_date})")
else:
print("✅ 没有过期任务")
# 运行演示
if __name__ == "__main__":
asyncio.run(demo_task_management_system())
# 总结
通过本章的学习,你已经掌握了Python的高级编程技巧:
# 核心概念
元类(Metaclass)
- 理解"类的类"概念
- 使用
type
动态创建类 - 自定义元类实现特殊功能
- 实际应用:ORM、单例模式、验证等
描述符(Descriptor)
- 掌握描述符协议
- 实现属性验证和缓存
- 理解Python内置描述符的工作原理
协程和异步编程
- 理解生成器、协程和异步函数的区别
- 掌握
async/await
语法 - 使用异步上下文管理器和迭代器
- 并发编程最佳实践
内存管理和性能优化
- 理解Python的内存管理机制
- 避免内存泄漏和循环引用
- 使用性能分析工具
- 优化代码性能
设计模式
- 创建型模式:单例、工厂、建造者、原型
- 结构型模式:装饰器
- 行为型模式:观察者、策略、命令
调试和性能分析
- 使用调试工具和技巧
- 性能分析和优化方法
- 日志和错误处理
# 最佳实践
代码质量
- 使用类型注解提高代码可读性
- 编写单元测试和文档
- 遵循PEP 8编码规范
性能优化
- 选择合适的数据结构
- 使用生成器处理大数据
- 合理使用缓存
- 避免过早优化
异步编程
- 理解何时使用异步
- 避免阻塞操作
- 正确处理异常
- 使用连接池等资源管理
# 下一步学习方向
Web开发框架
- FastAPI(异步Web框架)
- Django(全功能Web框架)
- Flask(轻量级Web框架)
数据科学和机器学习
- NumPy、Pandas(数据处理)
- Scikit-learn(机器学习)
- TensorFlow、PyTorch(深度学习)
系统编程
- 多进程和多线程
- 网络编程
- 系统监控和自动化
DevOps和部署
- Docker容器化
- CI/CD流水线
- 云平台部署
# 练习建议
- 实现一个简单的ORM框架
- 创建一个异步Web爬虫
- 设计一个插件系统
- 开发一个性能监控工具
- 构建一个分布式任务队列
继续保持学习的热情,Python的高级特性将帮助你编写更优雅、更高效的代码!