# 第22天-异步IO
# 吒 2023/6/15
# 第22天-异步IO
# 1. 异步编程基础
# 1.1 异步编程概念
def async_programming_concepts():
    """Print an introduction to asynchronous programming.

    Prints a synchronous-vs-asynchronous code sample, followed by the main
    advantages and challenges of async programming. Nothing is executed
    beyond printing, and the function returns None.
    """
    print("=== 异步编程概念演示 ===")

    # 1. Sync vs. async comparison. The sample is a raw string so it is
    # printed exactly as written and stays valid, runnable Python.
    print("\n1. 同步vs异步编程对比:")
    sync_async_comparison = r'''
# 同步编程特点:
# - 代码按顺序执行,一行执行完才执行下一行
# - 遇到耗时操作(如网络请求、文件读写)会阻塞
# - 简单直观,但效率较低
import time

def sync_example():
    """同步编程示例"""
    print("开始同步任务")
    # 模拟耗时操作
    time.sleep(2)
    print("任务1完成")
    time.sleep(2)
    print("任务2完成")
    time.sleep(2)
    print("任务3完成")
    print("所有同步任务完成")

# 异步编程特点:
# - 可以在等待耗时操作时执行其他任务
# - 不会阻塞程序执行
# - 提高程序效率,特别是I/O密集型任务
# - 代码相对复杂,需要理解事件循环概念
import asyncio

async def async_example():
    """异步编程示例"""
    print("开始异步任务")
    # 创建异步任务
    task1 = asyncio.create_task(async_task("任务1", 2))
    task2 = asyncio.create_task(async_task("任务2", 2))
    task3 = asyncio.create_task(async_task("任务3", 2))
    # 并发执行任务
    await asyncio.gather(task1, task2, task3)
    print("所有异步任务完成")

async def async_task(name, delay):
    """异步任务"""
    await asyncio.sleep(delay)
    print(f"{name}完成")
    return name

# 运行异步示例
# asyncio.run(async_example())
'''
    print(" 同步vs异步对比:")
    print(sync_async_comparison)

    # 2. Advantages of async programming (prose).
    print("\n2. 异步编程的优势:")
    advantages = '''
异步编程的主要优势:
1. 提高并发性能:
- 在等待I/O操作时可以执行其他任务
- 单线程处理大量并发请求
- 避免线程切换开销
2. 资源利用率高:
- 减少内存占用(相比多线程)
- 降低CPU上下文切换成本
- 更好的可扩展性
3. 适用场景:
- 网络编程(Web服务器、爬虫)
- 文件I/O操作
- 数据库访问
- 实时通信应用
4. 性能对比示例:
- 同步:3个2秒任务 = 6秒总时间
- 异步:3个2秒任务 = 2秒总时间(并发执行)
'''
    print(" 异步编程优势:")
    print(advantages)

    # 3. Challenges of async programming (prose).
    print("\n3. 异步编程的挑战:")
    challenges = '''
异步编程的挑战:
1. 学习曲线陡峭:
- 需要理解事件循环、协程等概念
- 调试相对困难
- 错误处理更复杂
2. 代码复杂性:
- 需要使用async/await语法
- 回调地狱问题
- 状态管理困难
3. 生态系统要求:
- 需要异步版本的库
- 不是所有库都支持异步
- 混合同步/异步代码的复杂性
4. 性能陷阱:
- CPU密集型任务不适合异步
- 不当使用可能降低性能
- 需要合理设计异步架构
'''
    print(" 异步编程挑战:")
    print(challenges)
    print("\n ✓ 异步编程概念演示完成")
# Run the async programming concepts demo (top-level call, so it executes on import).
async_programming_concepts()
# 1.2 Python异步编程发展历程
def python_async_history():
    """Print the evolution of async programming in Python and its core ideas.

    Two titled sections (version timeline, core concepts) are printed in
    order, followed by a completion line. Returns None.
    """
    print("=== Python异步编程发展历程 ===")

    timeline = '''
Python异步编程发展历程:
1. Python 2.5 (2006年):
- 引入生成器的send()和throw()方法
- 为协程奠定基础
2. Python 3.3 (2012年):
- 引入yield from语法
- 简化生成器委托
3. Python 3.4 (2014年):
- 引入asyncio模块
- 提供事件循环和协程支持
- 使用@asyncio.coroutine装饰器
4. Python 3.5 (2015年):
- 引入async/await语法
- 原生协程支持
- 异步编程成为语言特性
5. Python 3.6+ (2016年至今):
- 持续改进asyncio性能
- 增加新的异步特性
- 生态系统不断完善
'''
    concepts = '''
异步编程核心概念:
1. 事件循环 (Event Loop):
- 异步程序的核心调度器
- 管理和执行异步任务
- 处理I/O事件和回调
2. 协程 (Coroutine):
- 可以暂停和恢复的函数
- 使用async def定义
- 通过await调用
3. 任务 (Task):
- 协程的包装器
- 可以并发执行
- 提供状态管理
4. Future:
- 表示异步操作的结果
- 可以设置回调
- 任务的基类
5. 异步上下文管理器:
- 支持async with语法
- 异步资源管理
6. 异步迭代器:
- 支持async for语法
- 异步数据流处理
'''

    # Each entry: (section heading, sub-label, body text), printed in order.
    sections = (
        ("\n1. Python异步编程发展历程:", " 发展历程:", timeline),
        ("\n2. 异步编程核心概念:", " 核心概念:", concepts),
    )
    for heading, label, body in sections:
        print(heading)
        print(label)
        print(body)

    print("\n ✓ Python异步编程发展历程演示完成")
# Run the Python async history demo (top-level call, so it executes on import).
python_async_history()
# 2. asyncio基础
# 2.1 事件循环和协程
import asyncio
import time
from datetime import datetime
def asyncio_basics_demo():
    """Demonstrate asyncio fundamentals: event loops, coroutines, concurrency.

    Prints two code samples (event-loop management and coroutine usage), then
    runs three coroutines concurrently with ``asyncio.run`` and reports the
    measured elapsed time next to the time a sequential run would need.
    Blocks for roughly the longest job delay (about 2 seconds).
    """
    print("=== asyncio基础演示 ===")

    # 1. Event-loop basics. Raw strings keep the printed samples verbatim.
    print("\n1. 事件循环基础:")
    event_loop_code = r'''
# 事件循环基础操作
import asyncio

# 方法1: 使用asyncio.run()(推荐)
async def main():
    print("Hello, asyncio!")
    await asyncio.sleep(1)
    print("Goodbye, asyncio!")

# 运行协程
asyncio.run(main())

# 方法2: 手动管理事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main())
finally:
    loop.close()
    asyncio.set_event_loop(None)

# 方法3: 获取当前事件循环
async def get_loop_info():
    loop = asyncio.get_running_loop()
    print(f"当前事件循环: {loop}")
    print(f"循环是否运行: {loop.is_running()}")
    # 事件循环调试信息
    print(f"循环调试模式: {loop.get_debug()}")

asyncio.run(get_loop_info())
'''
    print(" 事件循环基础代码:")
    print(event_loop_code)

    # 2. Coroutine basics. Raw string: the sample's "\n" must stay an escape.
    print("\n2. 协程基础:")
    coroutine_code = r'''
# 协程定义和使用
import asyncio
import time

# 定义协程函数
async def simple_coroutine(name, delay):
    """简单协程示例"""
    print(f"{name} 开始执行")
    await asyncio.sleep(delay)  # 异步等待
    print(f"{name} 执行完成")
    return f"{name} 的结果"

# 协程的不同调用方式
async def coroutine_examples():
    print("=== 协程调用示例 ===")
    # 1. 直接await
    print("\n1. 直接await调用:")
    result = await simple_coroutine("任务1", 1)
    print(f"结果: {result}")

    # 2. 并发执行多个协程
    print("\n2. 并发执行:")
    start_time = time.perf_counter()
    # 使用asyncio.gather()
    results = await asyncio.gather(
        simple_coroutine("任务A", 1),
        simple_coroutine("任务B", 2),
        simple_coroutine("任务C", 1.5)
    )
    end_time = time.perf_counter()
    print(f"并发执行结果: {results}")
    print(f"总耗时: {end_time - start_time:.2f}秒")

    # 3. 使用create_task()
    print("\n3. 使用create_task():")
    task1 = asyncio.create_task(simple_coroutine("任务X", 1))
    task2 = asyncio.create_task(simple_coroutine("任务Y", 1))
    # 等待任务完成
    result1 = await task1
    result2 = await task2
    print(f"任务结果: {result1}, {result2}")

# 运行协程示例
asyncio.run(coroutine_examples())
'''
    print(" 协程基础代码:")
    print(coroutine_code)

    # 3. Live run.
    print("\n3. 实际运行示例:")

    async def demo_coroutine(name, delay):
        """Log start/finish timestamps around a non-blocking sleep of *delay* s."""
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {name} 开始")
        await asyncio.sleep(delay)
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {name} 完成")
        return f"{name}_result"

    async def run_demo():
        """Run the demo jobs concurrently and compare with a sequential run."""
        print("开始异步任务演示...")
        # (name, delay in seconds) for each concurrent job.
        jobs = (("下载文件", 2), ("处理数据", 1.5), ("发送邮件", 1))

        # perf_counter is monotonic, so wall-clock adjustments cannot skew it.
        start = time.perf_counter()
        results = await asyncio.gather(
            *(demo_coroutine(name, delay) for name, delay in jobs)
        )
        elapsed = time.perf_counter() - start

        print(f"\n任务结果: {results}")
        print(f"总耗时: {elapsed:.2f}秒")
        # Derived from the job table so it cannot drift from the real delays.
        sequential = sum(delay for _, delay in jobs)
        print(f"如果是同步执行,需要{sequential:g}秒")

    asyncio.run(run_demo())
    print("\n ✓ asyncio基础演示完成")
# Run the asyncio basics demo (top-level call, so it executes on import).
asyncio_basics_demo()
# 2.2 任务管理和并发控制
import asyncio
import random
import time
from concurrent.futures import ThreadPoolExecutor
def task_management_demo():
    """Demonstrate asyncio task management, concurrency limits and cancellation.

    Prints three code samples (task creation/monitoring, semaphore-limited
    concurrency, cancellation and timeouts), then runs five workers with at
    most two running at once and reports results and elapsed time. Blocks
    for a few seconds because the workers use random 1-2 s delays.
    """
    print("=== 任务管理和并发控制演示 ===")

    # 1. Task creation and management. Raw string keeps "\n" escapes intact.
    print("\n1. 任务创建和管理:")
    task_creation_code = r'''
# 任务创建和管理
import asyncio
import random

async def worker_task(worker_id, work_time):
    """工作任务"""
    print(f"工作者 {worker_id} 开始工作")
    await asyncio.sleep(work_time)
    # 模拟随机失败
    if random.random() < 0.2:  # 20%失败率
        raise Exception(f"工作者 {worker_id} 工作失败")
    print(f"工作者 {worker_id} 完成工作")
    return f"工作者{worker_id}的结果"

async def task_management_example():
    """任务管理示例"""
    print("=== 任务管理示例 ===")
    # 1. 创建多个任务
    tasks = []
    for i in range(5):
        work_time = random.uniform(1, 3)
        task = asyncio.create_task(
            worker_task(i, work_time),
            name=f"worker_{i}"  # 给任务命名
        )
        tasks.append(task)

    # 2. 监控任务状态
    print("\n任务状态监控:")
    for task in tasks:
        print(f"任务 {task.get_name()}: {task.done()}")

    # 3. 等待所有任务完成(处理异常)
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # 4. 处理结果
    print("\n任务结果:")
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务 {i} 失败: {result}")
        else:
            print(f"任务 {i} 成功: {result}")

    # 5. 检查任务最终状态
    print("\n最终任务状态:")
    for task in tasks:
        status = "完成" if task.done() else "运行中"
        exception = task.exception() if task.done() else None
        print(f"任务 {task.get_name()}: {status}")
        if exception:
            print(f"  异常: {exception}")

# 运行任务管理示例
asyncio.run(task_management_example())
'''
    print(" 任务创建和管理代码:")
    print(task_creation_code)

    # 2. Concurrency control. The sample uses random.uniform, so it must
    # import random itself or it fails with NameError when run.
    print("\n2. 并发控制:")
    concurrency_control_code = r'''
# 并发控制示例
import asyncio
import random
import time

# 信号量控制并发数
async def limited_worker(semaphore, worker_id, work_time):
    """受限制的工作任务"""
    async with semaphore:  # 获取信号量
        print(f"工作者 {worker_id} 开始工作 (当前时间: {time.strftime('%H:%M:%S')})")
        await asyncio.sleep(work_time)
        print(f"工作者 {worker_id} 完成工作 (当前时间: {time.strftime('%H:%M:%S')})")
        return f"结果_{worker_id}"

async def concurrency_control_example():
    """并发控制示例"""
    print("=== 并发控制示例 ===")
    # 创建信号量,限制同时运行的任务数为3
    semaphore = asyncio.Semaphore(3)
    # 创建10个任务
    tasks = []
    for i in range(10):
        task = asyncio.create_task(
            limited_worker(semaphore, i, random.uniform(1, 2))
        )
        tasks.append(task)
    print(f"创建了 {len(tasks)} 个任务,但同时只能运行3个")

    # 等待所有任务完成
    start_time = time.perf_counter()
    results = await asyncio.gather(*tasks)
    end_time = time.perf_counter()
    print(f"\n所有任务完成,耗时: {end_time - start_time:.2f}秒")
    print(f"结果数量: {len(results)}")

# 运行并发控制示例
asyncio.run(concurrency_control_example())
'''
    print(" 并发控制代码:")
    print(concurrency_control_code)

    # 3. Cancellation and timeouts. This sample also needs `import random`.
    print("\n3. 任务取消和超时:")
    cancellation_code = r'''
# 任务取消和超时控制
import asyncio
import random

async def long_running_task(task_id, duration):
    """长时间运行的任务"""
    try:
        print(f"任务 {task_id} 开始执行")
        await asyncio.sleep(duration)
        print(f"任务 {task_id} 正常完成")
        return f"任务{task_id}完成"
    except asyncio.CancelledError:
        print(f"任务 {task_id} 被取消")
        # 清理资源
        await asyncio.sleep(0.1)  # 模拟清理时间
        print(f"任务 {task_id} 清理完成")
        raise  # 重新抛出取消异常

async def cancellation_example():
    """任务取消示例"""
    print("=== 任务取消示例 ===")
    # 1. 手动取消任务
    print("\n1. 手动取消任务:")
    task = asyncio.create_task(long_running_task(1, 5))
    # 等待1秒后取消任务
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("任务已被取消")

    # 2. 超时控制
    print("\n2. 超时控制:")
    try:
        # 设置3秒超时
        result = await asyncio.wait_for(
            long_running_task(2, 5),
            timeout=3.0
        )
        print(f"任务结果: {result}")
    except asyncio.TimeoutError:
        print("任务超时")

    # 3. 批量任务超时控制
    print("\n3. 批量任务超时控制:")
    tasks = [
        asyncio.create_task(long_running_task(i, random.uniform(1, 4)))
        for i in range(3, 6)
    ]
    try:
        # 等待所有任务完成,但设置总超时时间
        results = await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True),
            timeout=2.5
        )
        print(f"批量任务结果: {results}")
    except asyncio.TimeoutError:
        print("批量任务超时,取消所有未完成的任务")
        for task in tasks:
            if not task.done():
                task.cancel()
        # 等待取消完成
        await asyncio.gather(*tasks, return_exceptions=True)

# 运行任务取消示例
asyncio.run(cancellation_example())
'''
    print(" 任务取消和超时代码:")
    print(cancellation_code)

    # 4. Live run.
    print("\n4. 实际运行示例:")

    async def demo_worker(worker_id, duration):
        """Log start/finish around a non-blocking sleep of *duration* s."""
        print(f"[{time.strftime('%H:%M:%S')}] 工作者 {worker_id} 开始")
        await asyncio.sleep(duration)
        print(f"[{time.strftime('%H:%M:%S')}] 工作者 {worker_id} 完成")
        return f"工作者{worker_id}结果"

    async def run_concurrency_demo():
        """Run five workers, allowing at most two to run at the same time."""
        print("开始并发控制演示...")
        semaphore = asyncio.Semaphore(2)

        async def controlled_worker(worker_id):
            # Holding the semaphore for the whole job caps concurrency at 2.
            async with semaphore:
                return await demo_worker(worker_id, random.uniform(1, 2))

        tasks = [asyncio.create_task(controlled_worker(i)) for i in range(5)]

        # perf_counter is monotonic, unlike time.time().
        start = time.perf_counter()
        results = await asyncio.gather(*tasks)
        elapsed = time.perf_counter() - start
        print(f"\n所有任务完成: {results}")
        print(f"总耗时: {elapsed:.2f}秒")

    asyncio.run(run_concurrency_demo())
    print("\n ✓ 任务管理和并发控制演示完成")
# Run the task management / concurrency control demo (top-level call, executes on import).
task_management_demo()
# 3. 异步上下文管理器和迭代器
# 3.1 异步上下文管理器
import asyncio
import aiofiles
import aiohttp
from contextlib import asynccontextmanager
def async_context_manager_demo():
    """Demonstrate asynchronous context managers.

    Prints a class-based sample (``__aenter__``/``__aexit__``) and an
    ``@asynccontextmanager`` sample, then runs a small simulated connection
    pool that borrows one connection and always returns it. Blocks for
    about 1.2 seconds.
    """
    print("=== 异步上下文管理器演示 ===")

    # 1. Class-based async context manager. Raw string keeps "\n" escapes.
    print("\n1. 基础异步上下文管理器:")
    basic_context_code = r'''
# 异步上下文管理器基础
import asyncio

class AsyncResource:
    """异步资源管理器"""
    def __init__(self, name):
        self.name = name
        self.is_open = False

    async def __aenter__(self):
        """异步进入上下文"""
        print(f"正在打开资源: {self.name}")
        await asyncio.sleep(0.1)  # 模拟异步操作
        self.is_open = True
        print(f"资源 {self.name} 已打开")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """异步退出上下文"""
        print(f"正在关闭资源: {self.name}")
        await asyncio.sleep(0.1)  # 模拟异步操作
        self.is_open = False
        print(f"资源 {self.name} 已关闭")
        # 处理异常
        if exc_type:
            print(f"处理异常: {exc_type.__name__}: {exc_val}")
        return False  # 不抑制异常

    async def do_work(self):
        """执行工作"""
        if not self.is_open:
            raise RuntimeError("资源未打开")
        print(f"使用资源 {self.name} 执行工作")
        await asyncio.sleep(0.5)
        return f"工作结果来自 {self.name}"

# 使用异步上下文管理器
async def use_async_context_manager():
    """使用异步上下文管理器"""
    print("=== 使用异步上下文管理器 ===")
    # 正常使用
    async with AsyncResource("数据库连接") as resource:
        result = await resource.do_work()
        print(f"工作结果: {result}")

    print("\n--- 异常处理 ---")
    # 异常处理
    try:
        async with AsyncResource("文件句柄") as resource:
            await resource.do_work()
            raise ValueError("模拟异常")
    except ValueError as e:
        print(f"捕获异常: {e}")

# 运行示例
asyncio.run(use_async_context_manager())
'''
    print(" 基础异步上下文管理器代码:")
    print(basic_context_code)

    # 2. Decorator-based async context managers.
    print("\n2. 异步上下文管理器装饰器:")
    decorator_code = r'''
# 使用@asynccontextmanager装饰器
from contextlib import asynccontextmanager
import asyncio

@asynccontextmanager
async def async_database_connection(db_url):
    """异步数据库连接上下文管理器"""
    print(f"连接到数据库: {db_url}")
    connection = None
    try:
        # 模拟建立连接
        await asyncio.sleep(0.2)
        connection = {"url": db_url, "connected": True}
        print("数据库连接已建立")
        yield connection  # 返回连接对象
    except Exception as e:
        print(f"数据库操作异常: {e}")
        raise
    finally:
        # 清理资源
        if connection:
            print("关闭数据库连接")
            await asyncio.sleep(0.1)
            connection["connected"] = False
            print("数据库连接已关闭")

@asynccontextmanager
async def async_file_manager(filename, mode='r'):
    """异步文件管理器"""
    print(f"打开文件: {filename} (模式: {mode})")
    try:
        # 模拟异步文件操作
        await asyncio.sleep(0.1)
        file_obj = {"name": filename, "mode": mode, "content": ""}
        print(f"文件 {filename} 已打开")
        yield file_obj
    except Exception as e:
        print(f"文件操作异常: {e}")
        raise
    finally:
        print(f"关闭文件: {filename}")
        await asyncio.sleep(0.1)
        print(f"文件 {filename} 已关闭")

# 使用装饰器创建的上下文管理器
async def use_decorator_context_managers():
    """使用装饰器创建的上下文管理器"""
    print("=== 使用装饰器上下文管理器 ===")
    # 使用数据库连接
    async with async_database_connection("postgresql://localhost/mydb") as db:
        print(f"执行数据库查询: {db}")
        await asyncio.sleep(0.3)
        print("查询完成")

    print("\n--- 文件操作 ---")
    # 使用文件管理器
    async with async_file_manager("data.txt", "w") as file:
        print(f"写入文件: {file}")
        file["content"] = "Hello, async world!"
        await asyncio.sleep(0.2)
        print("文件写入完成")

# 运行示例
asyncio.run(use_decorator_context_managers())
'''
    print(" 装饰器上下文管理器代码:")
    print(decorator_code)

    # 3. Live run: a simulated connection pool.
    print("\n3. 实际应用示例:")

    @asynccontextmanager
    async def demo_connection_pool(pool_size=3):
        """Yield a dict-based fake pool with *pool_size* named connections.

        ``available`` and ``in_use`` hold indexes into ``connections``.
        """
        print(f"创建连接池 (大小: {pool_size})")
        pool = {
            "connections": [f"conn_{i}" for i in range(pool_size)],
            "available": list(range(pool_size)),
            "in_use": []
        }
        try:
            await asyncio.sleep(0.1)
            print(f"连接池创建完成: {pool['connections']}")
            yield pool
        finally:
            print("清理连接池")
            await asyncio.sleep(0.1)
            print("连接池已清理")

    async def run_context_demo():
        """Borrow one connection from the pool, use it, and give it back."""
        print("开始上下文管理器演示...")
        async with demo_connection_pool(2) as pool:
            print(f"使用连接池: {pool['connections']}")
            if pool['available']:
                conn_id = pool['available'].pop(0)
                pool['in_use'].append(conn_id)
                print(f"获取连接: {pool['connections'][conn_id]}")
                try:
                    # Simulated work with the borrowed connection.
                    await asyncio.sleep(1)
                finally:
                    # Release even if the work raises or is cancelled,
                    # otherwise the connection would leak from the pool.
                    pool['in_use'].remove(conn_id)
                    pool['available'].append(conn_id)
                    print(f"释放连接: {pool['connections'][conn_id]}")

    asyncio.run(run_context_demo())
    print("\n ✓ 异步上下文管理器演示完成")
# Run the async context manager demo (top-level call, so it executes on import).
async_context_manager_demo()
# 3.2 异步迭代器
import asyncio
from typing import AsyncIterator
def async_iterator_demo():
    """Demonstrate async iterators and async generators.

    Prints a class-based async iterator sample and an async-generator
    sample, then streams five records from an async generator into a
    consumer. Blocks for about 3.5 seconds (5 x (0.5 s produce + 0.2 s
    process)).
    """
    print("=== 异步迭代器演示 ===")

    # 1. Class-based async iterator (__aiter__/__anext__).
    print("\n1. 基础异步迭代器:")
    basic_iterator_code = r'''
# 异步迭代器基础
import asyncio

class AsyncRange:
    """异步范围迭代器"""
    def __init__(self, start, stop, step=1, delay=0.1):
        self.start = start
        self.stop = stop
        self.step = step
        self.delay = delay
        self.current = start

    def __aiter__(self):
        """返回异步迭代器对象"""
        return self

    async def __anext__(self):
        """返回下一个值"""
        if self.current >= self.stop:
            raise StopAsyncIteration
        # 模拟异步操作
        await asyncio.sleep(self.delay)
        value = self.current
        self.current += self.step
        print(f"生成值: {value}")
        return value

# 使用异步迭代器
async def use_async_iterator():
    """使用异步迭代器"""
    print("=== 使用异步迭代器 ===")
    # 使用async for循环
    async for value in AsyncRange(0, 5):
        print(f"处理值: {value}")
        await asyncio.sleep(0.1)  # 模拟处理时间
    print("异步迭代完成")

# 运行示例
asyncio.run(use_async_iterator())
'''
    print(" 基础异步迭代器代码:")
    print(basic_iterator_code)

    # 2. Async generators. Raw string keeps the sample's "\n" escapes;
    # inside a coroutine, get_running_loop() is the documented way to
    # reach the current loop.
    print("\n2. 异步生成器:")
    async_generator_code = r'''
# 异步生成器
import asyncio
import random

async def async_data_generator(count, delay_range=(0.1, 0.5)):
    """异步数据生成器"""
    for i in range(count):
        # 模拟异步数据获取
        delay = random.uniform(*delay_range)
        await asyncio.sleep(delay)
        # 生成数据
        data = {
            "id": i,
            "value": random.randint(1, 100),
            "timestamp": asyncio.get_running_loop().time()
        }
        print(f"生成数据: {data}")
        yield data

async def async_file_reader(filename, chunk_size=1024):
    """异步文件读取生成器"""
    print(f"开始读取文件: {filename}")
    # 模拟文件内容
    content = "这是一个很长的文件内容," * 20
    for i in range(0, len(content), chunk_size):
        # 模拟异步读取
        await asyncio.sleep(0.1)
        chunk = content[i:i + chunk_size]
        print(f"读取块 {i//chunk_size + 1}: {len(chunk)} 字符")
        yield chunk
    print("文件读取完成")

# 使用异步生成器
async def use_async_generators():
    """使用异步生成器"""
    print("=== 使用异步生成器 ===")
    # 1. 数据生成器
    print("\n1. 异步数据生成:")
    data_count = 0
    async for data in async_data_generator(3):
        print(f"处理数据: {data['id']} -> {data['value']}")
        data_count += 1
    print(f"总共处理了 {data_count} 条数据")

    # 2. 文件读取生成器
    print("\n2. 异步文件读取:")
    chunks = []
    async for chunk in async_file_reader("example.txt", 50):
        chunks.append(chunk)
        print(f"累积内容长度: {sum(len(c) for c in chunks)}")
    print(f"文件读取完成,总共 {len(chunks)} 个块")

# 运行示例
asyncio.run(use_async_generators())
'''
    print(" 异步生成器代码:")
    print(async_generator_code)

    # 3. Live run: producer/consumer over an async generator.
    print("\n3. 实际应用示例:")

    async def demo_data_stream(count=3) -> AsyncIterator[dict]:
        """Yield *count* records, one every 0.5 s, stamped with loop time."""
        # get_running_loop() is the supported call from inside a coroutine;
        # get_event_loop() is the legacy API for that purpose.
        loop = asyncio.get_running_loop()
        for i in range(count):
            await asyncio.sleep(0.5)
            data = {
                "id": i,
                "message": f"数据项 {i}",
                "timestamp": loop.time()
            }
            print(f"[生成器] 产生数据: {data['message']}")
            yield data

    async def demo_data_processor():
        """Consume five records from demo_data_stream, 0.2 s per record."""
        processed_count = 0
        async for item in demo_data_stream(5):
            print(f"[处理器] 处理: {item['message']}")
            await asyncio.sleep(0.2)  # simulated processing time
            processed_count += 1
            print(f"[处理器] 已处理 {processed_count} 项")
        print(f"数据处理完成,总计: {processed_count} 项")

    asyncio.run(demo_data_processor())
    print("\n ✓ 异步迭代器演示完成")
# Run the async iterator demo (top-level call, so it executes on import).
async_iterator_demo()
# 4. 异步网络编程
# 4.1 异步HTTP客户端
import asyncio
import aiohttp
import time
from typing import List, Dict
def async_http_client_demo():
    """Demonstrate asynchronous HTTP clients built on aiohttp.

    Prints a basic concurrent-fetch sample and a reusable client-class
    sample, then concurrently requests three public JSON endpoints and
    summarises the results. Performs real network requests; each request
    is bounded by a timeout so an unreachable host cannot hang the demo.
    """
    print("=== 异步HTTP客户端演示 ===")

    # 1. Basic concurrent requests. Raw string keeps the sample's "\n".
    print("\n1. 基础HTTP请求:")
    basic_http_code = r'''
# 异步HTTP客户端基础
import asyncio
import aiohttp
import time

async def fetch_url(session, url):
    """获取单个URL"""
    try:
        print(f"开始请求: {url}")
        async with session.get(url) as response:
            content = await response.text()
            print(f"完成请求: {url} - 状态码: {response.status}")
            return {
                "url": url,
                "status": response.status,
                "content_length": len(content),
                "headers": dict(response.headers)
            }
    except Exception as e:
        print(f"请求失败: {url} - 错误: {e}")
        return {"url": url, "error": str(e)}

async def fetch_multiple_urls(urls):
    """并发获取多个URL"""
    print(f"=== 并发请求 {len(urls)} 个URL ===")
    start_time = time.perf_counter()
    # 创建HTTP会话
    async with aiohttp.ClientSession() as session:
        # 创建任务列表
        tasks = [fetch_url(session, url) for url in urls]
        # 并发执行所有请求
        results = await asyncio.gather(*tasks, return_exceptions=True)
    end_time = time.perf_counter()
    print(f"\n所有请求完成,耗时: {end_time - start_time:.2f}秒")

    # 处理结果
    success_count = 0
    error_count = 0
    for result in results:
        if isinstance(result, Exception):
            print(f"异常: {result}")
            error_count += 1
        elif "error" in result:
            print(f"错误: {result['url']} - {result['error']}")
            error_count += 1
        else:
            print(f"成功: {result['url']} - {result['status']} - {result['content_length']} 字符")
            success_count += 1
    print(f"\n统计: 成功 {success_count}, 失败 {error_count}")
    return results

# 示例URL列表
test_urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/status/200",
    "https://httpbin.org/json",
    "https://httpbin.org/headers"
]

# 运行并发请求
# asyncio.run(fetch_multiple_urls(test_urls))
'''
    print(" 基础HTTP请求代码:")
    print(basic_http_code)

    # 2. Reusable client class. Fixes vs. the earlier version: the connector
    # is created in __aenter__ (inside the running loop, fresh per session
    # because closing a session closes its connector), empty JSON/form bodies
    # are still sent, and downloads accumulate into a bytearray instead of
    # quadratic bytes concatenation.
    print("\n2. 高级HTTP功能:")
    advanced_http_code = r'''
# 高级HTTP功能
import asyncio
import aiohttp
import json

class AsyncHTTPClient:
    """异步HTTP客户端类"""
    def __init__(self, timeout=10, max_connections=100):
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_connections = max_connections
        self.session = None

    async def __aenter__(self):
        # 连接器需在事件循环中创建; session关闭时会一并关闭连接器
        connector = aiohttp.TCPConnector(limit=self.max_connections)
        self.session = aiohttp.ClientSession(
            timeout=self.timeout,
            connector=connector
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
            self.session = None

    async def get(self, url, headers=None, params=None):
        """GET请求"""
        async with self.session.get(url, headers=headers, params=params) as response:
            return await self._process_response(response)

    async def post(self, url, data=None, json_data=None, headers=None):
        """POST请求"""
        kwargs = {"headers": headers}
        # 使用 is not None, 空字典/空字符串也能正常发送
        if json_data is not None:
            kwargs["json"] = json_data
        elif data is not None:
            kwargs["data"] = data
        async with self.session.post(url, **kwargs) as response:
            return await self._process_response(response)

    async def _process_response(self, response):
        """处理响应"""
        content_type = response.headers.get('content-type', '')
        if 'application/json' in content_type:
            content = await response.json()
        else:
            content = await response.text()
        return {
            "status": response.status,
            "headers": dict(response.headers),
            "content": content,
            "url": str(response.url)
        }

    async def download_file(self, url, filename):
        """下载文件"""
        print(f"开始下载: {url} -> {filename}")
        async with self.session.get(url) as response:
            if response.status == 200:
                total_size = int(response.headers.get('content-length', 0))
                downloaded = 0
                # 模拟文件写入(实际应用中使用aiofiles)
                content = bytearray()
                async for chunk in response.content.iter_chunked(8192):
                    content += chunk
                    downloaded += len(chunk)
                    if total_size > 0:
                        progress = (downloaded / total_size) * 100
                        print(f"下载进度: {progress:.1f}% ({downloaded}/{total_size})")
                print(f"下载完成: {filename} ({len(content)} 字节)")
                return {"filename": filename, "size": len(content)}
            else:
                raise Exception(f"下载失败: HTTP {response.status}")

# 使用高级HTTP客户端
async def use_advanced_http_client():
    """使用高级HTTP客户端"""
    print("=== 使用高级HTTP客户端 ===")
    async with AsyncHTTPClient(timeout=15) as client:
        # GET请求
        print("\n1. GET请求:")
        result = await client.get("https://httpbin.org/get",
                                  params={"key": "value", "test": "async"})
        print(f"GET结果: {result['status']} - {len(str(result['content']))} 字符")

        # POST请求
        print("\n2. POST请求:")
        post_data = {"name": "异步测试", "type": "demo"}
        result = await client.post("https://httpbin.org/post", json_data=post_data)
        print(f"POST结果: {result['status']}")

        # 文件下载
        print("\n3. 文件下载:")
        try:
            download_result = await client.download_file(
                "https://httpbin.org/bytes/1024",
                "test_file.bin"
            )
            print(f"下载结果: {download_result}")
        except Exception as e:
            print(f"下载失败: {e}")

# 运行高级HTTP客户端示例
# asyncio.run(use_advanced_http_client())
'''
    print(" 高级HTTP功能代码:")
    print(advanced_http_code)

    # 3. Live run against public JSON endpoints.
    print("\n3. 实际应用示例:")

    async def demo_http_requests():
        """Fetch three endpoints concurrently and print a per-endpoint summary."""
        print("开始HTTP请求演示...")
        endpoints = [
            {"name": "用户信息", "url": "https://jsonplaceholder.typicode.com/users/1"},
            {"name": "文章列表", "url": "https://jsonplaceholder.typicode.com/posts?_limit=3"},
            {"name": "评论数据", "url": "https://jsonplaceholder.typicode.com/comments?_limit=3"}
        ]

        async def fetch_endpoint(session, endpoint):
            """Return a status dict; every failure is reported, never raised."""
            try:
                print(f"请求 {endpoint['name']}...")
                async with session.get(endpoint['url']) as response:
                    if response.status == 200:
                        data = await response.json()
                        print(f"✓ {endpoint['name']} 获取成功")
                        return {"name": endpoint['name'], "data": data, "status": "success"}
                    print(f"✗ {endpoint['name']} 请求失败: {response.status}")
                    return {"name": endpoint['name'], "status": "failed", "code": response.status}
            except Exception as e:
                # Best-effort demo: timeouts and network errors become results.
                print(f"✗ {endpoint['name']} 异常: {e}")
                return {"name": endpoint['name'], "status": "error", "error": str(e)}

        start_time = time.perf_counter()
        # Without a timeout an unresponsive host would block the demo forever.
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            results = await asyncio.gather(
                *(fetch_endpoint(session, ep) for ep in endpoints)
            )
        elapsed = time.perf_counter() - start_time

        print(f"\n请求完成,总耗时: {elapsed:.2f}秒")
        for result in results:
            if result['status'] == 'success':
                data_info = f"数据项: {len(result['data']) if isinstance(result['data'], list) else 1}"
                print(f"✓ {result['name']}: {data_info}")
            else:
                print(f"✗ {result['name']}: {result['status']}")

    asyncio.run(demo_http_requests())
    print("\n ✓ 异步HTTP客户端演示完成")
# Run the async HTTP client demo (top-level call, executes on import; performs network requests).
async_http_client_demo()
# 4.2 异步服务器编程
import asyncio
from aiohttp import web, WSMsgType
import json
import time
def async_server_demo():
    """Demonstrate asynchronous servers built on aiohttp.web.

    Prints a REST-style HTTP server sample and a WebSocket chat-room sample,
    then builds (but does not start) a minimal demo application.

    The WebSocket sample's embedded HTML page now uses a triple-double-quoted
    string: the previous nested ``'''`` terminated the enclosing sample
    string early and made this whole file a SyntaxError.
    """
    print("=== 异步服务器编程演示 ===")

    # 1. Basic HTTP server sample. Fixes: middleware carries @web.middleware
    # (required for new-style middlewares in aiohttp 3.x), start_time is
    # defined at module level, and a non-numeric user id gives 400, not 500.
    print("\n1. 基础HTTP服务器:")
    basic_server_code = r'''
# 异步HTTP服务器基础
import asyncio
from aiohttp import web
import json
import time

# 服务器启动时间(供统计接口计算运行时长)
start_time = time.time()

# 全局数据存储
server_data = {
    "users": [
        {"id": 1, "name": "张三", "email": "zhangsan@example.com"},
        {"id": 2, "name": "李四", "email": "lisi@example.com"}
    ],
    "request_count": 0
}

async def hello_handler(request):
    """Hello处理器"""
    server_data["request_count"] += 1
    name = request.query.get('name', 'World')
    response_data = {
        "message": f"Hello, {name}!",
        "timestamp": time.time(),
        "request_count": server_data["request_count"]
    }
    return web.json_response(response_data)

async def users_handler(request):
    """用户列表处理器"""
    server_data["request_count"] += 1
    if request.method == 'GET':
        # 获取用户列表
        return web.json_response({
            "users": server_data["users"],
            "total": len(server_data["users"])
        })
    elif request.method == 'POST':
        # 创建新用户
        try:
            data = await request.json()
            new_user = {
                "id": len(server_data["users"]) + 1,
                "name": data.get("name"),
                "email": data.get("email")
            }
            server_data["users"].append(new_user)
            return web.json_response(new_user, status=201)
        except Exception as e:
            return web.json_response(
                {"error": str(e)},
                status=400
            )

async def user_detail_handler(request):
    """用户详情处理器"""
    server_data["request_count"] += 1
    try:
        user_id = int(request.match_info['user_id'])
    except ValueError:
        return web.json_response(
            {"error": "Invalid user id"},
            status=400
        )
    # 查找用户
    user = next((u for u in server_data["users"] if u["id"] == user_id), None)
    if user:
        return web.json_response(user)
    else:
        return web.json_response(
            {"error": "User not found"},
            status=404
        )

async def stats_handler(request):
    """统计信息处理器"""
    return web.json_response({
        "total_users": len(server_data["users"]),
        "total_requests": server_data["request_count"],
        "server_uptime": time.time() - start_time
    })

# 中间件
@web.middleware
async def logging_middleware(request, handler):
    """日志中间件"""
    start = time.time()
    print(f"[{time.strftime('%H:%M:%S')}] {request.method} {request.path}")
    response = await handler(request)
    process_time = time.time() - start
    print(f"[{time.strftime('%H:%M:%S')}] 响应: {response.status} ({process_time:.3f}s)")
    return response

# 创建应用
def create_app():
    """创建Web应用"""
    app = web.Application(middlewares=[logging_middleware])
    # 添加路由
    app.router.add_get('/', hello_handler)
    app.router.add_get('/hello', hello_handler)
    app.router.add_get('/users', users_handler)
    app.router.add_post('/users', users_handler)
    app.router.add_get('/users/{user_id}', user_detail_handler)
    app.router.add_get('/stats', stats_handler)
    return app

# 启动服务器
if __name__ == '__main__':
    app = create_app()
    print("启动异步HTTP服务器...")
    print("访问 http://localhost:8080")
    print("API端点:")
    print("  GET  /hello?name=YourName")
    print("  GET  /users")
    print("  POST /users")
    print("  GET  /users/{id}")
    print("  GET  /stats")
    web.run_app(app, host='localhost', port=8080)
'''
    print(" 基础HTTP服务器代码:")
    print(basic_server_code)

    # 2. WebSocket server sample. The page renders messages with textContent
    # (no HTML injection from chat text) and shows the sender of chat lines.
    print("\n2. WebSocket服务器:")
    websocket_server_code = r'''
# WebSocket服务器
import asyncio
from aiohttp import web, WSMsgType
import json
import time
import weakref

# WebSocket连接管理
class WebSocketManager:
    """WebSocket连接管理器"""
    def __init__(self):
        self.connections = weakref.WeakSet()
        self.rooms = {}  # 房间管理

    def add_connection(self, ws, room=None):
        """添加连接"""
        self.connections.add(ws)
        if room:
            if room not in self.rooms:
                self.rooms[room] = weakref.WeakSet()
            self.rooms[room].add(ws)
        print(f"新连接加入,当前连接数: {len(self.connections)}")

    def remove_connection(self, ws, room=None):
        """移除连接"""
        if ws in self.connections:
            self.connections.discard(ws)
        if room and room in self.rooms:
            self.rooms[room].discard(ws)
        print(f"连接断开,当前连接数: {len(self.connections)}")

    async def broadcast(self, message, room=None):
        """广播消息"""
        if room and room in self.rooms:
            connections = self.rooms[room]
        else:
            connections = self.connections
        if connections:
            await asyncio.gather(
                *[ws.send_str(json.dumps(message)) for ws in connections],
                return_exceptions=True
            )

# 全局WebSocket管理器
ws_manager = WebSocketManager()

async def websocket_handler(request):
    """WebSocket处理器"""
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    # 获取房间信息
    room = request.query.get('room', 'default')
    user_name = request.query.get('name', f'用户{int(time.time()) % 1000}')
    # 添加连接
    ws_manager.add_connection(ws, room)
    # 发送欢迎消息
    welcome_msg = {
        "type": "welcome",
        "message": f"欢迎 {user_name} 加入房间 {room}",
        "user": user_name,
        "room": room,
        "timestamp": time.time()
    }
    await ws.send_str(json.dumps(welcome_msg))
    # 通知其他用户
    join_msg = {
        "type": "user_join",
        "message": f"{user_name} 加入了房间",
        "user": user_name,
        "room": room,
        "timestamp": time.time()
    }
    await ws_manager.broadcast(join_msg, room)
    try:
        # 处理消息
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                    # 处理不同类型的消息
                    if data.get('type') == 'chat':
                        # 聊天消息
                        chat_msg = {
                            "type": "chat",
                            "user": user_name,
                            "message": data.get('message', ''),
                            "room": room,
                            "timestamp": time.time()
                        }
                        await ws_manager.broadcast(chat_msg, room)
                    elif data.get('type') == 'ping':
                        # 心跳检测
                        pong_msg = {
                            "type": "pong",
                            "timestamp": time.time()
                        }
                        await ws.send_str(json.dumps(pong_msg))
                except json.JSONDecodeError:
                    error_msg = {
                        "type": "error",
                        "message": "无效的JSON格式"
                    }
                    await ws.send_str(json.dumps(error_msg))
            elif msg.type == WSMsgType.ERROR:
                print(f'WebSocket错误: {ws.exception()}')
                break
    except Exception as e:
        print(f"WebSocket处理异常: {e}")
    finally:
        # 清理连接
        ws_manager.remove_connection(ws, room)
        # 通知其他用户
        leave_msg = {
            "type": "user_leave",
            "message": f"{user_name} 离开了房间",
            "user": user_name,
            "room": room,
            "timestamp": time.time()
        }
        await ws_manager.broadcast(leave_msg, room)
    return ws

async def websocket_stats_handler(request):
    """WebSocket统计信息"""
    stats = {
        "total_connections": len(ws_manager.connections),
        "rooms": {room: len(connections) for room, connections in ws_manager.rooms.items()},
        "timestamp": time.time()
    }
    return web.json_response(stats)

# WebSocket客户端示例页面
async def websocket_client_page(request):
    """WebSocket客户端页面"""
    html = """
<!DOCTYPE html>
<html>
<head>
    <title>WebSocket聊天室</title>
    <meta charset="utf-8">
</head>
<body>
    <div id="messages"></div>
    <input type="text" id="messageInput" placeholder="输入消息...">
    <button onclick="sendMessage()">发送</button>
    <button onclick="sendPing()">Ping</button>
    <script>
        const ws = new WebSocket('ws://localhost:8080/ws?room=test&name=测试用户');
        const messages = document.getElementById('messages');
        ws.onmessage = function(event) {
            const data = JSON.parse(event.data);
            const div = document.createElement('div');
            const text = data.type === 'chat'
                ? `${data.user}: ${data.message}`
                : (data.message || '');
            // 使用textContent, 避免把用户消息当作HTML解析(防止XSS)
            div.textContent = `[${data.type}] ${text}`;
            messages.appendChild(div);
            messages.scrollTop = messages.scrollHeight;
        };
        function sendMessage() {
            const input = document.getElementById('messageInput');
            if (input.value) {
                ws.send(JSON.stringify({
                    type: 'chat',
                    message: input.value
                }));
                input.value = '';
            }
        }
        function sendPing() {
            ws.send(JSON.stringify({type: 'ping'}));
        }
        document.getElementById('messageInput').addEventListener('keypress', function(e) {
            if (e.key === 'Enter') {
                sendMessage();
            }
        });
    </script>
</body>
</html>
"""
    return web.Response(text=html, content_type='text/html')

# 创建WebSocket应用
def create_websocket_app():
    """创建WebSocket应用"""
    app = web.Application()
    # 添加路由
    app.router.add_get('/ws', websocket_handler)
    app.router.add_get('/ws/stats', websocket_stats_handler)
    app.router.add_get('/chat', websocket_client_page)
    return app

# 启动WebSocket服务器
if __name__ == '__main__':
    app = create_websocket_app()
    print("启动WebSocket服务器...")
    print("访问 http://localhost:8080/chat 测试聊天室")
    print("WebSocket端点: ws://localhost:8080/ws")
    print("统计信息: http://localhost:8080/ws/stats")
    web.run_app(app, host='localhost', port=8080)
'''
    print(" WebSocket服务器代码:")
    print(websocket_server_code)

    # 3. Live part: build a tiny app without binding a port.
    print("\n3. 实际应用示例:")

    async def demo_simple_server():
        """Create a two-route aiohttp application and return it (not started)."""
        print("创建演示服务器...")

        async def demo_handler(request):
            """Echo the request path/method with a timestamp as JSON."""
            return web.json_response({
                "message": "Hello from async server!",
                "path": request.path,
                "method": request.method,
                "timestamp": time.time()
            })

        app = web.Application()
        app.router.add_get('/', demo_handler)
        app.router.add_get('/demo', demo_handler)
        print("演示服务器配置完成")
        print("在实际应用中,可以通过 web.run_app(app, host='localhost', port=8080) 启动")
        return app

    asyncio.run(demo_simple_server())
    print("\n ✓ 异步服务器编程演示完成")
# Run the async server demo (top-level call, so it executes on import).
async_server_demo()
# 5. 实际应用案例
# 5.1 异步文件处理系统
import asyncio
import aiofiles
import aiohttp
import hashlib
import os
from pathlib import Path
from typing import List, Dict
def async_file_system_demo():
    """Print the async file-processing example listings, then run a simulated demo.

    Section 1 prints the ``AsyncFileProcessor`` listing, section 2 prints the
    ``AsyncDataCollector`` listing, and section 3 runs ``demo_file_operations``
    through ``asyncio.run``. That demo only simulates file creation with
    ``asyncio.sleep``; nothing is written to disk.
    """
    print("=== 异步文件处理系统演示 ===")
    # 1. Async file operations (listing is printed, not executed)
    print("\n1. 异步文件操作:")
    file_operations_code = '''
# 异步文件操作
import asyncio
import aiofiles
import hashlib
import os
from pathlib import Path
class AsyncFileProcessor:
"""异步文件处理器"""
def __init__(self, base_dir="./async_files"):
self.base_dir = Path(base_dir)
self.base_dir.mkdir(exist_ok=True)
async def create_file(self, filename, content):
"""创建文件"""
file_path = self.base_dir / filename
async with aiofiles.open(file_path, 'w', encoding='utf-8') as f:
await f.write(content)
print(f"文件已创建: {file_path}")
return file_path
async def read_file(self, filename):
"""读取文件"""
file_path = self.base_dir / filename
if not file_path.exists():
raise FileNotFoundError(f"文件不存在: {file_path}")
async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
content = await f.read()
print(f"文件已读取: {file_path} ({len(content)} 字符)")
return content
async def copy_file(self, src_filename, dst_filename):
"""复制文件"""
src_path = self.base_dir / src_filename
dst_path = self.base_dir / dst_filename
async with aiofiles.open(src_path, 'rb') as src:
async with aiofiles.open(dst_path, 'wb') as dst:
while True:
chunk = await src.read(8192)
if not chunk:
break
await dst.write(chunk)
print(f"文件已复制: {src_path} -> {dst_path}")
return dst_path
async def calculate_hash(self, filename):
"""计算文件哈希"""
file_path = self.base_dir / filename
hash_md5 = hashlib.md5()
async with aiofiles.open(file_path, 'rb') as f:
while True:
chunk = await f.read(8192)
if not chunk:
break
hash_md5.update(chunk)
file_hash = hash_md5.hexdigest()
print(f"文件哈希: {filename} -> {file_hash}")
return file_hash
async def process_multiple_files(self, file_list):
"""并发处理多个文件"""
print(f"开始并发处理 {len(file_list)} 个文件...")
tasks = []
for file_info in file_list:
if file_info['action'] == 'create':
task = self.create_file(file_info['name'], file_info['content'])
elif file_info['action'] == 'read':
task = self.read_file(file_info['name'])
elif file_info['action'] == 'hash':
task = self.calculate_hash(file_info['name'])
else:
continue
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
success_count = 0
error_count = 0
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务 {i} 失败: {result}")
error_count += 1
else:
success_count += 1
print(f"处理完成: 成功 {success_count}, 失败 {error_count}")
return results
# 使用异步文件处理器
async def use_async_file_processor():
"""使用异步文件处理器"""
print("=== 使用异步文件处理器 ===")
processor = AsyncFileProcessor()
# 1. 创建测试文件
test_files = [
{"action": "create", "name": "test1.txt", "content": "这是测试文件1的内容\n" * 100},
{"action": "create", "name": "test2.txt", "content": "这是测试文件2的内容\n" * 200},
{"action": "create", "name": "test3.txt", "content": "这是测试文件3的内容\n" * 150}
]
await processor.process_multiple_files(test_files)
# 2. 并发读取和计算哈希
read_tasks = [
{"action": "read", "name": "test1.txt"},
{"action": "hash", "name": "test2.txt"},
{"action": "hash", "name": "test3.txt"}
]
await processor.process_multiple_files(read_tasks)
# 3. 复制文件
await processor.copy_file("test1.txt", "test1_copy.txt")
print("文件处理演示完成")
# 运行示例
# asyncio.run(use_async_file_processor())
'''
    print(" 异步文件操作代码:")
    print(file_operations_code)
    # 2. Async data collector listing. It uses Path(output_dir), so the listing
    # must import pathlib.Path or a reader running it gets a NameError.
    print("\n2. 异步数据采集系统:")
    data_collector_code = '''
# 异步数据采集系统
import asyncio
import aiohttp
import aiofiles
import json
import time
from datetime import datetime
from pathlib import Path
class AsyncDataCollector:
"""异步数据采集器"""
def __init__(self, output_dir="./collected_data"):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
self.session = None
self.collected_data = []
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def fetch_api_data(self, api_config):
"""获取API数据"""
try:
print(f"获取数据: {api_config['name']}")
async with self.session.get(api_config['url']) as response:
if response.status == 200:
if 'application/json' in response.headers.get('content-type', ''):
data = await response.json()
else:
data = await response.text()
result = {
"source": api_config['name'],
"url": api_config['url'],
"data": data,
"timestamp": datetime.now().isoformat(),
"status": "success"
}
print(f"✓ {api_config['name']} 数据获取成功")
return result
else:
raise Exception(f"HTTP {response.status}")
except Exception as e:
print(f"✗ {api_config['name']} 数据获取失败: {e}")
return {
"source": api_config['name'],
"url": api_config['url'],
"error": str(e),
"timestamp": datetime.now().isoformat(),
"status": "error"
}
async def save_data(self, data, filename=None):
"""保存数据到文件"""
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"collected_data_{timestamp}.json"
file_path = self.output_dir / filename
async with aiofiles.open(file_path, 'w', encoding='utf-8') as f:
await f.write(json.dumps(data, ensure_ascii=False, indent=2))
print(f"数据已保存: {file_path}")
return file_path
async def collect_batch(self, api_configs, save_individual=True):
"""批量采集数据"""
print(f"开始批量采集 {len(api_configs)} 个数据源...")
start_time = time.time()
# 并发获取所有数据
tasks = [self.fetch_api_data(config) for config in api_configs]
results = await asyncio.gather(*tasks)
end_time = time.time()
# 统计结果
success_results = [r for r in results if r.get('status') == 'success']
error_results = [r for r in results if r.get('status') == 'error']
print(f"采集完成: 成功 {len(success_results)}, 失败 {len(error_results)}, 耗时 {end_time - start_time:.2f}秒")
# 保存数据
if save_individual:
# 分别保存每个数据源
save_tasks = []
for result in success_results:
filename = f"{result['source']}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
save_tasks.append(self.save_data(result, filename))
if save_tasks:
await asyncio.gather(*save_tasks)
# 保存汇总数据
summary = {
"collection_time": datetime.now().isoformat(),
"total_sources": len(api_configs),
"successful": len(success_results),
"failed": len(error_results),
"duration_seconds": end_time - start_time,
"results": results
}
await self.save_data(summary, "collection_summary.json")
return summary
# 使用数据采集器
async def use_data_collector():
"""使用数据采集器"""
print("=== 使用异步数据采集器 ===")
# 配置数据源
api_configs = [
{"name": "用户数据", "url": "https://jsonplaceholder.typicode.com/users"},
{"name": "文章数据", "url": "https://jsonplaceholder.typicode.com/posts?_limit=5"},
{"name": "评论数据", "url": "https://jsonplaceholder.typicode.com/comments?_limit=5"},
{"name": "相册数据", "url": "https://jsonplaceholder.typicode.com/albums?_limit=3"}
]
async with AsyncDataCollector() as collector:
summary = await collector.collect_batch(api_configs)
print(f"\n采集汇总:")
print(f" 总数据源: {summary['total_sources']}")
print(f" 成功: {summary['successful']}")
print(f" 失败: {summary['failed']}")
print(f" 耗时: {summary['duration_seconds']:.2f}秒")
# 运行示例
# asyncio.run(use_data_collector())
'''
    print(" 异步数据采集系统代码:")
    print(data_collector_code)
    # 3. Live demo: simulated concurrent "file creation" (no real disk I/O)
    print("\n3. 实际应用演示:")

    async def demo_file_operations():
        """Simulate creating three demo files concurrently and report their total size.

        Returns:
            A list of ``{"name": str, "size": int}`` dicts in creation order.
        """
        print("开始文件操作演示...")
        # File i gets its template line repeated (i + 1) * 10 times.
        files_to_create = [
            {"name": f"demo_{i}.txt", "content": f"演示文件 {i} 的内容\n" * (i + 1) * 10}
            for i in range(3)
        ]
        print(f"创建 {len(files_to_create)} 个演示文件...")

        async def create_demo_file(file_info):
            """Pretend to write one file; return its name and size in characters."""
            await asyncio.sleep(0.1)  # stands in for the latency of a real write
            print(f"创建文件: {file_info['name']} ({len(file_info['content'])} 字符)")
            return {"name": file_info['name'], "size": len(file_info['content'])}

        # gather runs the simulated writes concurrently and keeps input order.
        results = await asyncio.gather(*(create_demo_file(info) for info in files_to_create))
        total_size = sum(result['size'] for result in results)
        print(f"文件创建完成,总大小: {total_size} 字符")
        return results

    asyncio.run(demo_file_operations())
    print("\n ✓ 异步文件处理系统演示完成")


# Run the async file-processing system demo.
async_file_system_demo()
# 5.2 异步爬虫系统
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
from urllib.parse import urljoin, urlparse
def async_crawler_demo():
    """Print the ``AsyncWebCrawler`` example listing, then run a small live crawl.

    The live part fetches three httpbin.org pages concurrently with one shared
    ``aiohttp.ClientSession``. Each URL's failure is caught and reported as a
    result dict, so one bad URL does not abort the others.
    """
    print("=== 异步爬虫系统演示 ===")
    # 1. Basic async crawler listing. The title lookup falls back to "无标题"
    # when <title> is missing OR its .string is None (empty or nested markup);
    # otherwise None would reach result['title'][:50] and raise TypeError.
    print("\n1. 基础异步爬虫:")
    basic_crawler_code = '''
# 异步爬虫基础
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
from urllib.parse import urljoin, urlparse
class AsyncWebCrawler:
"""异步网页爬虫"""
def __init__(self, max_concurrent=10, delay=1.0):
self.max_concurrent = max_concurrent
self.delay = delay
self.session = None
self.semaphore = asyncio.Semaphore(max_concurrent)
self.visited_urls = set()
self.results = []
async def __aenter__(self):
self.session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30),
headers={'User-Agent': 'AsyncCrawler/1.0'}
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def fetch_page(self, url):
"""获取单个页面"""
async with self.semaphore:
if url in self.visited_urls:
return None
self.visited_urls.add(url)
try:
print(f"爬取: {url}")
async with self.session.get(url) as response:
if response.status == 200:
content = await response.text()
# 解析页面
soup = BeautifulSoup(content, 'html.parser')
result = {
"url": url,
"title": soup.title.string if soup.title and soup.title.string else "无标题",
"content_length": len(content),
"links": self._extract_links(soup, url),
"status": "success",
"timestamp": time.time()
}
print(f"✓ 爬取成功: {url} - {result['title']}")
# 添加延迟
await asyncio.sleep(self.delay)
return result
else:
print(f"✗ HTTP错误: {url} - {response.status}")
return {"url": url, "status": "error", "error": f"HTTP {response.status}"}
except Exception as e:
print(f"✗ 爬取失败: {url} - {e}")
return {"url": url, "status": "error", "error": str(e)}
def _extract_links(self, soup, base_url):
"""提取页面链接"""
links = []
for link in soup.find_all('a', href=True):
href = link['href']
full_url = urljoin(base_url, href)
# 只保留HTTP/HTTPS链接
if full_url.startswith(('http://', 'https://')):
links.append({
"url": full_url,
"text": link.get_text(strip=True)[:100] # 限制文本长度
})
return links[:10] # 限制链接数量
async def crawl_urls(self, urls, max_depth=1):
"""爬取URL列表"""
print(f"开始爬取 {len(urls)} 个URL,最大深度: {max_depth}")
start_time = time.time()
current_urls = list(urls)
for depth in range(max_depth):
print(f"\n=== 深度 {depth + 1} ===")
if not current_urls:
break
# 并发爬取当前层级的URL
tasks = [self.fetch_page(url) for url in current_urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
next_urls = set()
for result in results:
if isinstance(result, Exception):
print(f"异常: {result}")
continue
if result and result.get('status') == 'success':
self.results.append(result)
# 收集下一层的URL
if depth < max_depth - 1:
for link in result.get('links', []):
if link['url'] not in self.visited_urls:
next_urls.add(link['url'])
current_urls = list(next_urls)[:5] # 限制下一层URL数量
end_time = time.time()
print(f"\n爬取完成:")
print(f" 总页面: {len(self.results)}")
print(f" 总耗时: {end_time - start_time:.2f}秒")
print(f" 平均速度: {len(self.results) / (end_time - start_time):.2f} 页面/秒")
return self.results
# 使用异步爬虫
async def use_async_crawler():
"""使用异步爬虫"""
print("=== 使用异步爬虫 ===")
# 测试URL列表
test_urls = [
"https://httpbin.org/html",
"https://httpbin.org/links/5",
"https://httpbin.org/forms/post"
]
async with AsyncWebCrawler(max_concurrent=3, delay=0.5) as crawler:
results = await crawler.crawl_urls(test_urls, max_depth=2)
# 显示结果摘要
print("\n=== 爬取结果摘要 ===")
for result in results:
print(f" {result['title'][:50]}... - {result['url']}")
# 运行示例
# asyncio.run(use_async_crawler())
'''
    print(" 基础异步爬虫代码:")
    print(basic_crawler_code)
    # 2. Live demo against httpbin.org
    print("\n2. 实际应用演示:")

    async def demo_simple_crawler():
        """Fetch three pages concurrently and print the success count and total size.

        Returns:
            One dict per URL with ``status`` set to "success", "failed" (non-200)
            or "error" (exception).
        """
        print("开始简单爬虫演示...")
        urls_to_crawl = [
            "https://httpbin.org/html",
            "https://httpbin.org/json",
            "https://httpbin.org/xml"
        ]

        async def fetch_demo_page(session, url):
            """GET one URL; never raises, every outcome becomes a result dict."""
            try:
                print(f"爬取演示页面: {url}")
                async with session.get(url) as response:
                    if response.status == 200:
                        content = await response.text()
                        print(f"✓ 页面获取成功: {url} ({len(content)} 字符)")
                        return {"url": url, "size": len(content), "status": "success"}
                    print(f"✗ 页面获取失败: {url} - HTTP {response.status}")
                    return {"url": url, "status": "failed", "code": response.status}
            except Exception as e:  # includes timeouts and connection errors
                print(f"✗ 爬取异常: {url} - {e}")
                return {"url": url, "status": "error", "error": str(e)}

        # Bound the whole request so an unreachable host cannot stall this
        # import-time demo for aiohttp's default 5-minute total timeout.
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            tasks = [fetch_demo_page(session, url) for url in urls_to_crawl]
            results = await asyncio.gather(*tasks)
        success_count = sum(1 for r in results if r.get('status') == 'success')
        total_size = sum(r.get('size', 0) for r in results if r.get('status') == 'success')
        print(f"\n爬取统计:")
        print(f" 成功页面: {success_count}/{len(urls_to_crawl)}")
        print(f" 总内容大小: {total_size} 字符")
        return results

    asyncio.run(demo_simple_crawler())
    print("\n ✓ 异步爬虫系统演示完成")


# Run the async crawler demo.
async_crawler_demo()
# 6. 性能优化和最佳实践
# 6.1 性能优化技巧
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def async_performance_demo():
    """Print the concurrency-control and memory-optimization listings, then run a demo.

    Both listings are printed verbatim. The live part, run via ``asyncio.run``,
    first pushes eight 0.5 s tasks through an ``asyncio.Semaphore(3)``. It then
    processes twenty items as four concurrent batches of five.
    """
    semaphore_listing = '''
# 并发控制优化
import asyncio
import time
# 1. 使用信号量控制并发数
async def controlled_task(semaphore, task_id, duration):
"""受控制的任务"""
async with semaphore:
print(f"任务 {task_id} 开始执行")
await asyncio.sleep(duration)
print(f"任务 {task_id} 执行完成")
return f"结果_{task_id}"
async def demo_semaphore_control():
"""演示信号量控制"""
print("=== 信号量并发控制 ===")
# 限制最多3个并发任务
semaphore = asyncio.Semaphore(3)
# 创建10个任务
tasks = [
controlled_task(semaphore, i, 1.0)
for i in range(10)
]
start_time = time.time()
results = await asyncio.gather(*tasks)
end_time = time.time()
print(f"完成 {len(results)} 个任务,耗时: {end_time - start_time:.2f}秒")
return results
# 2. 任务分批处理
async def batch_process_tasks(tasks, batch_size=5):
"""分批处理任务"""
print(f"分批处理 {len(tasks)} 个任务,批大小: {batch_size}")
results = []
for i in range(0, len(tasks), batch_size):
batch = tasks[i:i + batch_size]
print(f"处理批次 {i//batch_size + 1}: {len(batch)} 个任务")
batch_results = await asyncio.gather(*batch)
results.extend(batch_results)
# 批次间添加小延迟
if i + batch_size < len(tasks):
await asyncio.sleep(0.1)
print(f"所有批次处理完成,总结果: {len(results)}")
return results
# 3. 连接池优化
class AsyncConnectionPool:
"""异步连接池"""
def __init__(self, max_connections=10):
self.max_connections = max_connections
self.available_connections = asyncio.Queue(maxsize=max_connections)
self.total_connections = 0
async def get_connection(self):
"""获取连接"""
try:
# 尝试从池中获取现有连接
connection = self.available_connections.get_nowait()
print(f"复用连接: {connection}")
return connection
except asyncio.QueueEmpty:
# 创建新连接
if self.total_connections < self.max_connections:
connection = f"conn_{self.total_connections}"
self.total_connections += 1
print(f"创建新连接: {connection}")
return connection
else:
# 等待可用连接
print("等待可用连接...")
return await self.available_connections.get()
async def return_connection(self, connection):
"""归还连接"""
try:
self.available_connections.put_nowait(connection)
print(f"归还连接: {connection}")
except asyncio.QueueFull:
print(f"连接池已满,丢弃连接: {connection}")
# 运行并发控制演示
# asyncio.run(demo_semaphore_control())
'''
    memory_listing = '''
# 内存优化技巧
import asyncio
import weakref
from typing import AsyncIterator
# 1. 使用异步生成器减少内存占用
async def memory_efficient_data_processor(data_source):
"""内存高效的数据处理器"""
async for item in data_source:
# 处理单个数据项
processed_item = await process_single_item(item)
# 立即yield,不在内存中累积
yield processed_item
# 可选:触发垃圾回收
if processed_item['id'] % 100 == 0:
import gc
gc.collect()
async def process_single_item(item):
"""处理单个数据项"""
await asyncio.sleep(0.01) # 模拟处理时间
return {
"id": item.get("id"),
"processed": True,
"result": f"processed_{item.get('value', 'unknown')}"
}
# 2. 弱引用管理连接
class WeakConnectionManager:
"""使用弱引用管理连接"""
def __init__(self):
self._connections = weakref.WeakSet()
def add_connection(self, conn):
"""添加连接"""
self._connections.add(conn)
print(f"添加连接,当前连接数: {len(self._connections)}")
def get_connection_count(self):
"""获取当前连接数"""
return len(self._connections)
async def broadcast_to_all(self, message):
"""向所有连接广播消息"""
if self._connections:
tasks = []
for conn in self._connections:
tasks.append(conn.send_message(message))
await asyncio.gather(*tasks, return_exceptions=True)
# 3. 流式处理大数据
async def stream_large_dataset(dataset_size=10000):
"""流式处理大数据集"""
print(f"开始流式处理 {dataset_size} 条数据...")
processed_count = 0
async def data_generator():
"""数据生成器"""
for i in range(dataset_size):
yield {"id": i, "value": f"data_{i}"}
# 每1000条数据暂停一下
if i % 1000 == 0:
await asyncio.sleep(0.01)
# 流式处理
async for processed_item in memory_efficient_data_processor(data_generator()):
processed_count += 1
# 定期报告进度
if processed_count % 1000 == 0:
print(f"已处理: {processed_count}/{dataset_size}")
print(f"流式处理完成,总计: {processed_count} 条数据")
return processed_count
# 运行内存优化演示
# asyncio.run(stream_large_dataset(5000))
'''
    print("=== 异步性能优化演示 ===")
    # Each section prints its heading, a caption, then the listing itself.
    for heading, caption, listing in (
        ("\n1. 并发控制:", " 并发控制代码:", semaphore_listing),
        ("\n2. 内存优化:", " 内存优化代码:", memory_listing),
    ):
        print(heading)
        print(caption)
        print(listing)
    print("\n3. 实际应用演示:")

    async def demo_performance_optimization():
        """Run the throttled-task and batch demos; return how many results each produced."""
        print("开始性能优化演示...")
        gate = asyncio.Semaphore(3)  # at most three demo tasks sleep at once

        async def demo_task(task_id):
            async with gate:
                print(f"执行任务 {task_id}")
                await asyncio.sleep(0.5)
                return f"任务{task_id}完成"

        began = time.time()
        throttled = await asyncio.gather(*(demo_task(n) for n in range(8)))
        elapsed = time.time() - began
        print(f"并发控制演示完成: {len(throttled)} 个任务,耗时 {elapsed:.2f}秒")

        print("\n批处理演示:")

        async def batch_task(batch_id, items):
            print(f"处理批次 {batch_id}: {len(items)} 个项目")
            await asyncio.sleep(0.3)
            return f"批次{batch_id}处理完成"

        items = list(range(20))
        size = 5
        chunks = [items[start:start + size] for start in range(0, len(items), size)]
        batch_outcomes = await asyncio.gather(
            *(batch_task(index, chunk) for index, chunk in enumerate(chunks))
        )
        print(f"批处理完成: {len(batch_outcomes)} 个批次")
        return {"concurrent_results": len(throttled), "batch_results": len(batch_outcomes)}

    asyncio.run(demo_performance_optimization())
    print("\n ✓ 性能优化演示完成")


# Run the performance-optimization demo.
async_performance_demo()
# 6.2 最佳实践
def async_best_practices_demo():
    """Print the error-handling and resource-management listings, then run a live demo.

    The live part shows two patterns. Per-task timeouts use ``asyncio.wait_for``
    with a 2 s limit. Graceful shutdown has a background task wait on an
    ``asyncio.Event`` and exit its loop once the event is set.
    """
    print("=== 异步编程最佳实践演示 ===")
    # 1. Error-handling listing (printed, not executed)
    print("\n1. 错误处理最佳实践:")
    error_handling_code = '''
# 异步错误处理最佳实践
import asyncio
import logging
from typing import List, Optional
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AsyncTaskManager:
"""异步任务管理器"""
def __init__(self):
self.failed_tasks = []
self.successful_tasks = []
async def execute_task_safely(self, task_func, *args, **kwargs):
"""安全执行任务"""
try:
result = await task_func(*args, **kwargs)
self.successful_tasks.append({
"task": task_func.__name__,
"result": result,
"status": "success"
})
return result
except asyncio.TimeoutError:
error_msg = f"任务超时: {task_func.__name__}"
logger.error(error_msg)
self.failed_tasks.append({
"task": task_func.__name__,
"error": "timeout",
"status": "failed"
})
return None
except Exception as e:
error_msg = f"任务异常: {task_func.__name__} - {e}"
logger.error(error_msg)
self.failed_tasks.append({
"task": task_func.__name__,
"error": str(e),
"status": "failed"
})
return None
async def execute_tasks_with_retry(self, tasks, max_retries=3):
"""带重试的任务执行"""
results = []
for task_info in tasks:
task_func = task_info["func"]
args = task_info.get("args", [])
kwargs = task_info.get("kwargs", {})
for attempt in range(max_retries + 1):
try:
if attempt > 0:
logger.info(f"重试任务 {task_func.__name__} (第{attempt}次)")
await asyncio.sleep(2 ** attempt) # 指数退避
result = await asyncio.wait_for(
task_func(*args, **kwargs),
timeout=10.0
)
results.append(result)
logger.info(f"任务成功: {task_func.__name__}")
break
except Exception as e:
if attempt == max_retries:
logger.error(f"任务最终失败: {task_func.__name__} - {e}")
results.append(None)
else:
logger.warning(f"任务失败,将重试: {task_func.__name__} - {e}")
return results
def get_summary(self):
"""获取执行摘要"""
return {
"successful": len(self.successful_tasks),
"failed": len(self.failed_tasks),
"total": len(self.successful_tasks) + len(self.failed_tasks),
"success_rate": len(self.successful_tasks) / max(1, len(self.successful_tasks) + len(self.failed_tasks))
}
# 示例任务函数
async def reliable_task(task_id, success_rate=0.8):
"""可靠性测试任务"""
import random
await asyncio.sleep(0.1) # 模拟工作
if random.random() < success_rate:
return f"任务{task_id}成功"
else:
raise Exception(f"任务{task_id}随机失败")
async def timeout_task(duration):
"""可能超时的任务"""
await asyncio.sleep(duration)
return f"任务完成,耗时{duration}秒"
# 使用示例
async def demo_error_handling():
"""演示错误处理"""
manager = AsyncTaskManager()
# 1. 安全执行任务
print("=== 安全任务执行 ===")
tasks = [
manager.execute_task_safely(reliable_task, i, 0.7)
for i in range(5)
]
results = await asyncio.gather(*tasks)
print(f"安全执行结果: {[r for r in results if r is not None]}")
# 2. 带重试的任务执行
print("\n=== 带重试的任务执行 ===")
retry_tasks = [
{"func": reliable_task, "args": [i], "kwargs": {"success_rate": 0.3}}
for i in range(3)
]
retry_results = await manager.execute_tasks_with_retry(retry_tasks, max_retries=2)
print(f"重试执行结果: {retry_results}")
# 3. 显示摘要
summary = manager.get_summary()
print(f"\n执行摘要: {summary}")
# 运行错误处理演示
# asyncio.run(demo_error_handling())
'''
    print(" 错误处理最佳实践代码:")
    print(error_handling_code)
    # 2. Resource-management listing (printed, not executed)
    print("\n2. 资源管理最佳实践:")
    resource_management_code = '''
# 资源管理最佳实践
import asyncio
from contextlib import asynccontextmanager
import aiohttp
import aiofiles
class AsyncResourceManager:
"""异步资源管理器"""
def __init__(self):
self.active_resources = set()
self.resource_stats = {
"created": 0,
"destroyed": 0,
"active": 0
}
@asynccontextmanager
async def managed_http_session(self, **kwargs):
"""管理HTTP会话"""
session = aiohttp.ClientSession(**kwargs)
self.active_resources.add(session)
self.resource_stats["created"] += 1
self.resource_stats["active"] += 1
try:
print(f"创建HTTP会话,当前活跃资源: {self.resource_stats['active']}")
yield session
finally:
await session.close()
self.active_resources.discard(session)
self.resource_stats["destroyed"] += 1
self.resource_stats["active"] -= 1
print(f"关闭HTTP会话,当前活跃资源: {self.resource_stats['active']}")
@asynccontextmanager
async def managed_file(self, filename, mode='r', **kwargs):
"""管理文件资源"""
file_handle = await aiofiles.open(filename, mode, **kwargs)
self.active_resources.add(file_handle)
self.resource_stats["created"] += 1
self.resource_stats["active"] += 1
try:
print(f"打开文件 {filename},当前活跃资源: {self.resource_stats['active']}")
yield file_handle
finally:
await file_handle.close()
self.active_resources.discard(file_handle)
self.resource_stats["destroyed"] += 1
self.resource_stats["active"] -= 1
print(f"关闭文件 {filename},当前活跃资源: {self.resource_stats['active']}")
async def cleanup_all_resources(self):
"""清理所有资源"""
print(f"开始清理 {len(self.active_resources)} 个活跃资源...")
cleanup_tasks = []
for resource in list(self.active_resources):
if hasattr(resource, 'close'):
cleanup_tasks.append(resource.close())
if cleanup_tasks:
await asyncio.gather(*cleanup_tasks, return_exceptions=True)
self.active_resources.clear()
self.resource_stats["active"] = 0
print("所有资源已清理")
def get_resource_stats(self):
"""获取资源统计"""
return self.resource_stats.copy()
# 使用示例
async def demo_resource_management():
"""演示资源管理"""
manager = AsyncResourceManager()
try:
# 1. HTTP会话管理
print("=== HTTP会话管理 ===")
async with manager.managed_http_session() as session:
async with session.get('https://httpbin.org/json') as response:
data = await response.json()
print(f"获取数据: {len(str(data))} 字符")
# 2. 文件资源管理
print("\n=== 文件资源管理 ===")
# 创建测试文件
async with manager.managed_file('test_resource.txt', 'w') as f:
await f.write("测试资源管理\n" * 100)
# 读取测试文件
async with manager.managed_file('test_resource.txt', 'r') as f:
content = await f.read()
print(f"读取文件内容: {len(content)} 字符")
# 3. 并发资源使用
print("\n=== 并发资源使用 ===")
async def concurrent_http_task(task_id):
async with manager.managed_http_session() as session:
async with session.get(f'https://httpbin.org/delay/{task_id % 3 + 1}') as response:
return await response.json()
# 并发执行多个HTTP任务
tasks = [concurrent_http_task(i) for i in range(3)]
results = await asyncio.gather(*tasks, return_exceptions=True)
print(f"并发任务完成: {len([r for r in results if not isinstance(r, Exception)])} 个成功")
finally:
# 确保清理所有资源
await manager.cleanup_all_resources()
# 显示资源统计
stats = manager.get_resource_stats()
print(f"\n资源统计: {stats}")
# 运行资源管理演示
# asyncio.run(demo_resource_management())
'''
    print(" 资源管理最佳实践代码:")
    print(resource_management_code)
    # 3. Live demo
    print("\n3. 实际应用演示:")

    async def demo_best_practices():
        """Run the timeout demo and the event-driven shutdown demo.

        Returns:
            ``{"timeout_tests": int, "long_task_cycles": int}``.
        """
        print("开始最佳实践演示...")

        # 1. Per-task timeout: anything longer than 2 s is cancelled by wait_for.
        async def timeout_controlled_task(duration):
            try:
                await asyncio.wait_for(asyncio.sleep(duration), timeout=2.0)
                return f"任务完成: {duration}秒"
            except asyncio.TimeoutError:
                return f"任务超时: {duration}秒"

        durations = [1, 3, 0.5, 4]
        tasks = [timeout_controlled_task(d) for d in durations]
        results = await asyncio.gather(*tasks)
        print("超时控制结果:")
        for duration, result in zip(durations, results):
            print(f" {duration}秒任务: {result}")

        # 2. Graceful shutdown driven by an asyncio.Event
        print("\n优雅关闭演示:")
        shutdown_event = asyncio.Event()

        async def long_running_task():
            """Loop until shutdown_event is set; return the number of iterations."""
            count = 0
            while not shutdown_event.is_set():
                count += 1
                print(f"长期任务运行中... {count}")
                try:
                    # Wait on the event itself, so the task wakes as soon as shutdown
                    # is requested. The 0.5 s timeout only paces the loop. The
                    # previous wait_for(sleep(1), timeout=0.5) could never finish,
                    # so it always timed out and only polled the event.
                    await asyncio.wait_for(shutdown_event.wait(), timeout=0.5)
                except asyncio.TimeoutError:
                    continue
            print("长期任务优雅退出")
            return count

        task = asyncio.create_task(long_running_task())
        await asyncio.sleep(2.5)  # let the background task run for a while
        shutdown_event.set()
        cycles = await task
        print(f"长期任务执行了 {cycles} 次循环")
        return {"timeout_tests": len(results), "long_task_cycles": cycles}

    asyncio.run(demo_best_practices())
    print("\n ✓ 最佳实践演示完成")


# Run the best-practices demo.
async_best_practices_demo()
# 7. 学习建议和总结
# 7.1 学习路径
def async_learning_guide():
    """Print a four-stage async-Python learning roadmap and a list of practice projects."""
    print("=== 异步编程学习指南 ===")
    stages = (
        ("初级阶段", (
            "理解同步vs异步的概念",
            "掌握async/await语法",
            "学习asyncio.run()和基础事件循环",
            "练习简单的协程函数",
            "理解并发vs并行的区别",
        )),
        ("中级阶段", (
            "掌握asyncio.gather()和asyncio.create_task()",
            "学习异步上下文管理器",
            "理解异步迭代器和生成器",
            "掌握信号量和锁等同步原语",
            "学习异步HTTP客户端(aiohttp)",
        )),
        ("高级阶段", (
            "掌握异步服务器编程",
            "学习WebSocket编程",
            "理解事件循环的内部机制",
            "掌握性能优化技巧",
            "学习异步测试和调试",
        )),
        ("专家阶段", (
            "设计复杂的异步系统架构",
            "掌握异步框架的源码",
            "优化异步应用的性能",
            "处理大规模并发场景",
            "贡献开源异步项目",
        )),
    )
    projects = (
        "异步文件下载器",
        "网页爬虫系统",
        "实时聊天服务器",
        "API数据聚合器",
        "异步任务队列",
        "实时数据监控系统",
    )

    def numbered(entries):
        # " 1. first", " 2. second", ...
        return [f" {position}. {entry}" for position, entry in enumerate(entries, start=1)]

    lines = ["\n推荐学习路径:"]
    for stage_name, topics in stages:
        lines.append(f"\n{stage_name}:")
        lines.extend(numbered(topics))
    lines.append("\n推荐实践项目:")
    lines.extend(numbered(projects))
    print("\n".join(lines))


# Run the learning guide.
async_learning_guide()
# 7.2 常见陷阱和解决方案
def async_common_pitfalls():
    """Print common asyncio pitfalls (problem / wrong / right / fix) and debugging tips.

    The debug-mode tip recommends ``asyncio.run(..., debug=True)`` or
    ``PYTHONASYNCIODEBUG=1``. ``asyncio.get_event_loop()`` without a running
    loop is deprecated and should not be taught. The ``asyncio.run()`` remedy
    notes that it cannot be used while an event loop is already running.
    """
    print("=== 异步编程常见陷阱和解决方案 ===")
    pitfalls = {
        "忘记使用await": {
            "问题": "调用异步函数时忘记使用await关键字",
            "错误示例": "result = async_function() # 返回协程对象,不是结果",
            "正确示例": "result = await async_function() # 正确获取结果",
            "解决方案": "始终在调用异步函数时使用await,或使用asyncio.create_task()"
        },
        "在同步函数中调用异步函数": {
            "问题": "在普通函数中直接调用异步函数",
            "错误示例": "def sync_func(): return async_func() # 错误",
            "正确示例": "def sync_func(): return asyncio.run(async_func()) # 正确",
            # asyncio.run() raises RuntimeError if a loop is already running in this thread.
            "解决方案": "使用asyncio.run()或确保调用者也是异步函数(注意:asyncio.run()不能在已运行的事件循环中调用)"
        },
        "阻塞事件循环": {
            "问题": "在异步函数中使用阻塞操作",
            "错误示例": "time.sleep(1) # 阻塞整个事件循环",
            "正确示例": "await asyncio.sleep(1) # 非阻塞等待",
            "解决方案": "使用异步版本的操作,或使用run_in_executor()"
        },
        "资源泄漏": {
            "问题": "未正确关闭异步资源",
            "错误示例": "session = aiohttp.ClientSession() # 未关闭",
            "正确示例": "async with aiohttp.ClientSession() as session: # 自动关闭",
            "解决方案": "使用异步上下文管理器或确保在finally块中关闭资源"
        },
        "过度并发": {
            "问题": "创建过多并发任务导致资源耗尽",
            "错误示例": "tasks = [fetch(url) for url in huge_url_list] # 可能创建数千个任务",
            "正确示例": "使用Semaphore或分批处理限制并发数",
            "解决方案": "使用信号量、连接池或任务队列控制并发"
        }
    }
    print("\n常见陷阱详解:")
    for pitfall, details in pitfalls.items():
        print(f"\n{pitfall}:")
        for field in ("问题", "错误示例", "正确示例", "解决方案"):
            print(f" {field}: {details[field]}")
    debugging_tips = [
        # Supported ways to enable asyncio debug mode (see "Developing with asyncio").
        "使用asyncio.run(main(), debug=True)或设置环境变量PYTHONASYNCIODEBUG=1开启调试模式",
        "使用logging记录异步操作的执行流程",
        "使用asyncio.current_task()和asyncio.all_tasks()监控任务",
        "使用pytest-asyncio进行异步代码测试",
        "使用aiomonitor监控异步应用的运行状态"
    ]
    print("\n调试技巧:")
    for i, tip in enumerate(debugging_tips, 1):
        print(f" {i}. {tip}")


# Run the common-pitfalls guide.
async_common_pitfalls()
# 7.3 本章总结
def chapter_summary():
    """Print the day-22 recap: bullet points per category, next steps, and a closing note."""
    print("=== 第22天:异步IO - 学习总结 ===")
    recap = (
        ("核心概念", (
            "异步编程基础:协程、事件循环、任务",
            "async/await语法的使用",
            "并发vs并行的区别",
            "异步IO的优势和适用场景",
        )),
        ("重要模块", (
            "asyncio:Python异步编程的核心模块",
            "aiohttp:异步HTTP客户端和服务器",
            "aiofiles:异步文件操作",
            "contextlib.asynccontextmanager:异步上下文管理器",
        )),
        ("关键技能", (
            "编写和调用异步函数",
            "使用asyncio.gather()进行并发执行",
            "实现异步上下文管理器和迭代器",
            "构建异步HTTP客户端和服务器",
            "处理异步编程中的错误和异常",
        )),
        ("实际应用", (
            "异步网络编程:HTTP客户端、WebSocket",
            "异步文件处理:大文件读写、批量处理",
            "异步数据采集:网页爬虫、API聚合",
            "异步服务器:Web服务、实时通信",
        )),
        ("最佳实践", (
            "合理控制并发数量",
            "正确处理异步资源的生命周期",
            "使用适当的错误处理和重试机制",
            "优化异步应用的性能和内存使用",
        )),
    )
    follow_ups = (
        "深入学习特定的异步框架(如FastAPI、Tornado)",
        "探索异步数据库操作(如asyncpg、motor)",
        "学习异步消息队列和任务调度",
        "研究微服务架构中的异步通信",
        "掌握异步应用的部署和监控",
    )
    output = ["\n学习要点总结:"]
    for heading, bullets in recap:
        output.append(f"\n{heading}:")
        output.extend(f" • {bullet}" for bullet in bullets)
    output.append("\n下一步学习建议:")
    output.extend(f" {n}. {step}" for n, step in enumerate(follow_ups, 1))
    output += [
        "\n🎉 恭喜完成第22天的学习!",
        "异步IO是现代Python开发的重要技能,",
        "掌握它将大大提升你处理并发任务的能力。",
        "继续练习和探索,你将成为异步编程的专家!",
    ]
    # One print call, one line per entry, same output as separate prints.
    print(*output, sep="\n")


# Run the chapter summary.
chapter_summary()
# 实践练习
基础练习:
- 编写一个异步函数,模拟网络请求的延迟
- 使用asyncio.gather()并发执行多个异步任务
- 实现一个简单的异步上下文管理器
进阶练习:
- 构建一个异步文件下载器
- 实现一个简单的异步Web爬虫
- 创建一个基于WebSocket的实时聊天系统
项目练习:
- 开发一个异步API数据聚合服务
- 构建一个异步任务队列系统
- 实现一个异步日志收集和分析工具
通过本章的学习,你已经掌握了Python异步IO编程的核心概念和实践技能。异步编程是现代高性能应用开发的关键技术,继续深入学习和实践将帮助你构建更加高效和可扩展的应用程序!