第22天-异步IO

2023/6/15

# 第22天-异步IO

# 1. 异步编程基础

# 1.1 异步编程概念

def async_programming_concepts():
    """异步编程概念演示"""
    print("=== 异步编程概念演示 ===")
    
    # 1. 同步vs异步对比
    print("\n1. 同步vs异步编程对比:")
    
    sync_async_comparison = '''
# 同步编程特点:
# - 代码按顺序执行,一行执行完才执行下一行
# - 遇到耗时操作(如网络请求、文件读写)会阻塞
# - 简单直观,但效率较低

import time
import requests

def sync_example():
    """同步编程示例"""
    print("开始同步任务")
    
    # 模拟耗时操作
    time.sleep(2)
    print("任务1完成")
    
    time.sleep(2)
    print("任务2完成")
    
    time.sleep(2)
    print("任务3完成")
    
    print("所有同步任务完成")

# 异步编程特点:
# - 可以在等待耗时操作时执行其他任务
# - 不会阻塞程序执行
# - 提高程序效率,特别是I/O密集型任务
# - 代码相对复杂,需要理解事件循环概念

import asyncio

async def async_example():
    """异步编程示例"""
    print("开始异步任务")
    
    # 创建异步任务
    task1 = asyncio.create_task(async_task("任务1", 2))
    task2 = asyncio.create_task(async_task("任务2", 2))
    task3 = asyncio.create_task(async_task("任务3", 2))
    
    # 并发执行任务
    await asyncio.gather(task1, task2, task3)
    
    print("所有异步任务完成")

async def async_task(name, delay):
    """异步任务"""
    await asyncio.sleep(delay)
    print(f"{name}完成")
    return name

# 运行异步示例
# asyncio.run(async_example())
'''
    
    print("   同步vs异步对比:")
    print(sync_async_comparison)
    
    # 2. 异步编程的优势
    print("\n2. 异步编程的优势:")
    
    advantages = '''
异步编程的主要优势:

1. 提高并发性能:
   - 在等待I/O操作时可以执行其他任务
   - 单线程处理大量并发请求
   - 避免线程切换开销

2. 资源利用率高:
   - 减少内存占用(相比多线程)
   - 降低CPU上下文切换成本
   - 更好的可扩展性

3. 适用场景:
   - 网络编程(Web服务器、爬虫)
   - 文件I/O操作
   - 数据库访问
   - 实时通信应用

4. 性能对比示例:
   - 同步:3个2秒任务 = 6秒总时间
   - 异步:3个2秒任务 = 2秒总时间(并发执行)
'''
    
    print("   异步编程优势:")
    print(advantages)
    
    # 3. 异步编程的挑战
    print("\n3. 异步编程的挑战:")
    
    challenges = '''
异步编程的挑战:

1. 学习曲线陡峭:
   - 需要理解事件循环、协程等概念
   - 调试相对困难
   - 错误处理更复杂

2. 代码复杂性:
   - 需要使用async/await语法
   - 回调地狱问题
   - 状态管理困难

3. 生态系统要求:
   - 需要异步版本的库
   - 不是所有库都支持异步
   - 混合同步/异步代码的复杂性

4. 性能陷阱:
   - CPU密集型任务不适合异步
   - 不当使用可能降低性能
   - 需要合理设计异步架构
'''
    
    print("   异步编程挑战:")
    print(challenges)
    
    print("\n   ✓ 异步编程概念演示完成")

# 运行异步编程概念演示
async_programming_concepts()

# 1.2 Python异步编程发展历程

def python_async_history():
    """Python异步编程发展历程"""
    print("=== Python异步编程发展历程 ===")
    
    # 1. 发展历程
    print("\n1. Python异步编程发展历程:")
    
    history = '''
Python异步编程发展历程:

1. Python 2.5 (2006年):
   - 引入生成器的send()和throw()方法
   - 为协程奠定基础

2. Python 3.3 (2012年):
   - 引入yield from语法
   - 简化生成器委托

3. Python 3.4 (2014年):
   - 引入asyncio模块
   - 提供事件循环和协程支持
   - 使用@asyncio.coroutine装饰器

4. Python 3.5 (2015年):
   - 引入async/await语法
   - 原生协程支持
   - 异步编程成为语言特性

5. Python 3.6+ (2016年至今):
   - 持续改进asyncio性能
   - 增加新的异步特性
   - 生态系统不断完善
'''
    
    print("   发展历程:")
    print(history)
    
    # 2. 核心概念
    print("\n2. 异步编程核心概念:")
    
    core_concepts = '''
异步编程核心概念:

1. 事件循环 (Event Loop):
   - 异步程序的核心调度器
   - 管理和执行异步任务
   - 处理I/O事件和回调

2. 协程 (Coroutine):
   - 可以暂停和恢复的函数
   - 使用async def定义
   - 通过await调用

3. 任务 (Task):
   - 协程的包装器
   - 可以并发执行
   - 提供状态管理

4. Future:
   - 表示异步操作的结果
   - 可以设置回调
   - 任务的基类

5. 异步上下文管理器:
   - 支持async with语法
   - 异步资源管理

6. 异步迭代器:
   - 支持async for语法
   - 异步数据流处理
'''
    
    print("   核心概念:")
    print(core_concepts)
    
    print("\n   ✓ Python异步编程发展历程演示完成")

# 运行Python异步编程发展历程演示
python_async_history()

# 2. asyncio基础

# 2.1 事件循环和协程

import asyncio
import time
from datetime import datetime

def asyncio_basics_demo():
    """asyncio基础演示"""
    print("=== asyncio基础演示 ===")
    
    # 1. 事件循环基础
    print("\n1. 事件循环基础:")
    
    event_loop_code = '''
# 事件循环基础操作
import asyncio

# 方法1: 使用asyncio.run()(推荐)
async def main():
    print("Hello, asyncio!")
    await asyncio.sleep(1)
    print("Goodbye, asyncio!")

# 运行协程
asyncio.run(main())

# 方法2: 手动管理事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main())
finally:
    loop.close()

# 方法3: 获取当前事件循环
async def get_loop_info():
    loop = asyncio.get_running_loop()
    print(f"当前事件循环: {loop}")
    print(f"循环是否运行: {loop.is_running()}")
    
    # 事件循环调试信息
    print(f"循环调试模式: {loop.get_debug()}")
    
asyncio.run(get_loop_info())
'''
    
    print("   事件循环基础代码:")
    print(event_loop_code)
    
    # 2. 协程基础
    print("\n2. 协程基础:")
    
    coroutine_code = '''
# 协程定义和使用
import asyncio
import time

# 定义协程函数
async def simple_coroutine(name, delay):
    """简单协程示例"""
    print(f"{name} 开始执行")
    await asyncio.sleep(delay)  # 异步等待
    print(f"{name} 执行完成")
    return f"{name} 的结果"

# 协程的不同调用方式
async def coroutine_examples():
    print("=== 协程调用示例 ===")
    
    # 1. 直接await
    print("\n1. 直接await调用:")
    result = await simple_coroutine("任务1", 1)
    print(f"结果: {result}")
    
    # 2. 并发执行多个协程
    print("\n2. 并发执行:")
    start_time = time.time()
    
    # 使用asyncio.gather()
    results = await asyncio.gather(
        simple_coroutine("任务A", 1),
        simple_coroutine("任务B", 2),
        simple_coroutine("任务C", 1.5)
    )
    
    end_time = time.time()
    print(f"并发执行结果: {results}")
    print(f"总耗时: {end_time - start_time:.2f}秒")
    
    # 3. 使用create_task()
    print("\n3. 使用create_task():")
    task1 = asyncio.create_task(simple_coroutine("任务X", 1))
    task2 = asyncio.create_task(simple_coroutine("任务Y", 1))
    
    # 等待任务完成
    result1 = await task1
    result2 = await task2
    
    print(f"任务结果: {result1}, {result2}")

# 运行协程示例
asyncio.run(coroutine_examples())
'''
    
    print("   协程基础代码:")
    print(coroutine_code)
    
    # 3. 实际运行示例
    print("\n3. 实际运行示例:")
    
    async def demo_coroutine(name, delay):
        """演示协程"""
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {name} 开始")
        await asyncio.sleep(delay)
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {name} 完成")
        return f"{name}_result"
    
    async def run_demo():
        """运行演示"""
        print("开始异步任务演示...")
        
        # 记录开始时间
        start = time.time()
        
        # 并发执行三个任务
        results = await asyncio.gather(
            demo_coroutine("下载文件", 2),
            demo_coroutine("处理数据", 1.5),
            demo_coroutine("发送邮件", 1)
        )
        
        # 记录结束时间
        end = time.time()
        
        print(f"\n任务结果: {results}")
        print(f"总耗时: {end - start:.2f}秒")
        print("如果是同步执行,需要4.5秒")
    
    # 运行演示
    asyncio.run(run_demo())
    
    print("\n   ✓ asyncio基础演示完成")

# 运行asyncio基础演示
asyncio_basics_demo()

# 2.2 任务管理和并发控制

import asyncio
import random
import time
from concurrent.futures import ThreadPoolExecutor

def task_management_demo():
    """任务管理和并发控制演示"""
    print("=== 任务管理和并发控制演示 ===")
    
    # 1. 任务创建和管理
    print("\n1. 任务创建和管理:")
    
    task_creation_code = '''
# 任务创建和管理
import asyncio
import random

async def worker_task(worker_id, work_time):
    """工作任务"""
    print(f"工作者 {worker_id} 开始工作")
    await asyncio.sleep(work_time)
    
    # 模拟随机失败
    if random.random() < 0.2:  # 20%失败率
        raise Exception(f"工作者 {worker_id} 工作失败")
    
    print(f"工作者 {worker_id} 完成工作")
    return f"工作者{worker_id}的结果"

async def task_management_example():
    """任务管理示例"""
    print("=== 任务管理示例 ===")
    
    # 1. 创建多个任务
    tasks = []
    for i in range(5):
        work_time = random.uniform(1, 3)
        task = asyncio.create_task(
            worker_task(i, work_time),
            name=f"worker_{i}"  # 给任务命名
        )
        tasks.append(task)
    
    # 2. 监控任务状态
    print("\n任务状态监控:")
    for task in tasks:
        print(f"任务 {task.get_name()}: {task.done()}")
    
    # 3. 等待所有任务完成(处理异常)
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # 4. 处理结果
    print("\n任务结果:")
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务 {i} 失败: {result}")
        else:
            print(f"任务 {i} 成功: {result}")
    
    # 5. 检查任务最终状态
    print("\n最终任务状态:")
    for task in tasks:
        status = "完成" if task.done() else "运行中"
        exception = task.exception() if task.done() else None
        print(f"任务 {task.get_name()}: {status}")
        if exception:
            print(f"  异常: {exception}")

# 运行任务管理示例
asyncio.run(task_management_example())
'''
    
    print("   任务创建和管理代码:")
    print(task_creation_code)
    
    # 2. 并发控制
    print("\n2. 并发控制:")
    
    concurrency_control_code = '''
# 并发控制示例
import asyncio
import time

# 信号量控制并发数
async def limited_worker(semaphore, worker_id, work_time):
    """受限制的工作任务"""
    async with semaphore:  # 获取信号量
        print(f"工作者 {worker_id} 开始工作 (当前时间: {time.strftime('%H:%M:%S')})")
        await asyncio.sleep(work_time)
        print(f"工作者 {worker_id} 完成工作 (当前时间: {time.strftime('%H:%M:%S')})")
        return f"结果_{worker_id}"

async def concurrency_control_example():
    """并发控制示例"""
    print("=== 并发控制示例 ===")
    
    # 创建信号量,限制同时运行的任务数为3
    semaphore = asyncio.Semaphore(3)
    
    # 创建10个任务
    tasks = []
    for i in range(10):
        task = asyncio.create_task(
            limited_worker(semaphore, i, random.uniform(1, 2))
        )
        tasks.append(task)
    
    print(f"创建了 {len(tasks)} 个任务,但同时只能运行3个")
    
    # 等待所有任务完成
    start_time = time.time()
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    
    print(f"\n所有任务完成,耗时: {end_time - start_time:.2f}秒")
    print(f"结果数量: {len(results)}")

# 运行并发控制示例
asyncio.run(concurrency_control_example())
'''
    
    print("   并发控制代码:")
    print(concurrency_control_code)
    
    # 3. 任务取消和超时
    print("\n3. 任务取消和超时:")
    
    cancellation_code = '''
# 任务取消和超时控制
import asyncio

async def long_running_task(task_id, duration):
    """长时间运行的任务"""
    try:
        print(f"任务 {task_id} 开始执行")
        await asyncio.sleep(duration)
        print(f"任务 {task_id} 正常完成")
        return f"任务{task_id}完成"
    except asyncio.CancelledError:
        print(f"任务 {task_id} 被取消")
        # 清理资源
        await asyncio.sleep(0.1)  # 模拟清理时间
        print(f"任务 {task_id} 清理完成")
        raise  # 重新抛出取消异常

async def cancellation_example():
    """任务取消示例"""
    print("=== 任务取消示例 ===")
    
    # 1. 手动取消任务
    print("\n1. 手动取消任务:")
    task = asyncio.create_task(long_running_task(1, 5))
    
    # 等待1秒后取消任务
    await asyncio.sleep(1)
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("任务已被取消")
    
    # 2. 超时控制
    print("\n2. 超时控制:")
    try:
        # 设置3秒超时
        result = await asyncio.wait_for(
            long_running_task(2, 5), 
            timeout=3.0
        )
        print(f"任务结果: {result}")
    except asyncio.TimeoutError:
        print("任务超时")
    
    # 3. 批量任务超时控制
    print("\n3. 批量任务超时控制:")
    tasks = [
        asyncio.create_task(long_running_task(i, random.uniform(1, 4)))
        for i in range(3, 6)
    ]
    
    try:
        # 等待所有任务完成,但设置总超时时间
        results = await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True),
            timeout=2.5
        )
        print(f"批量任务结果: {results}")
    except asyncio.TimeoutError:
        print("批量任务超时,取消所有未完成的任务")
        for task in tasks:
            if not task.done():
                task.cancel()
        
        # 等待取消完成
        await asyncio.gather(*tasks, return_exceptions=True)

# 运行任务取消示例
asyncio.run(cancellation_example())
'''
    
    print("   任务取消和超时代码:")
    print(cancellation_code)
    
    # 4. 实际运行示例
    print("\n4. 实际运行示例:")
    
    async def demo_worker(worker_id, duration):
        """演示工作任务"""
        print(f"[{time.strftime('%H:%M:%S')}] 工作者 {worker_id} 开始")
        await asyncio.sleep(duration)
        print(f"[{time.strftime('%H:%M:%S')}] 工作者 {worker_id} 完成")
        return f"工作者{worker_id}结果"
    
    async def run_concurrency_demo():
        """运行并发控制演示"""
        print("开始并发控制演示...")
        
        # 创建信号量,限制并发数为2
        semaphore = asyncio.Semaphore(2)
        
        async def controlled_worker(worker_id):
            async with semaphore:
                return await demo_worker(worker_id, random.uniform(1, 2))
        
        # 创建5个任务
        tasks = [
            asyncio.create_task(controlled_worker(i))
            for i in range(5)
        ]
        
        # 等待所有任务完成
        start = time.time()
        results = await asyncio.gather(*tasks)
        end = time.time()
        
        print(f"\n所有任务完成: {results}")
        print(f"总耗时: {end - start:.2f}秒")
    
    # 运行演示
    asyncio.run(run_concurrency_demo())
    
    print("\n   ✓ 任务管理和并发控制演示完成")

# 运行任务管理和并发控制演示
task_management_demo()

# 3. 异步上下文管理器和迭代器

# 3.1 异步上下文管理器

import asyncio
import aiofiles
import aiohttp
from contextlib import asynccontextmanager

def async_context_manager_demo():
    """异步上下文管理器演示"""
    print("=== 异步上下文管理器演示 ===")
    
    # 1. 基础异步上下文管理器
    print("\n1. 基础异步上下文管理器:")
    
    basic_context_code = '''
# 异步上下文管理器基础
import asyncio

class AsyncResource:
    """异步资源管理器"""
    
    def __init__(self, name):
        self.name = name
        self.is_open = False
    
    async def __aenter__(self):
        """异步进入上下文"""
        print(f"正在打开资源: {self.name}")
        await asyncio.sleep(0.1)  # 模拟异步操作
        self.is_open = True
        print(f"资源 {self.name} 已打开")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """异步退出上下文"""
        print(f"正在关闭资源: {self.name}")
        await asyncio.sleep(0.1)  # 模拟异步操作
        self.is_open = False
        print(f"资源 {self.name} 已关闭")
        
        # 处理异常
        if exc_type:
            print(f"处理异常: {exc_type.__name__}: {exc_val}")
            return False  # 不抑制异常
    
    async def do_work(self):
        """执行工作"""
        if not self.is_open:
            raise RuntimeError("资源未打开")
        print(f"使用资源 {self.name} 执行工作")
        await asyncio.sleep(0.5)
        return f"工作结果来自 {self.name}"

# 使用异步上下文管理器
async def use_async_context_manager():
    """使用异步上下文管理器"""
    print("=== 使用异步上下文管理器 ===")
    
    # 正常使用
    async with AsyncResource("数据库连接") as resource:
        result = await resource.do_work()
        print(f"工作结果: {result}")
    
    print("\n--- 异常处理 ---")
    
    # 异常处理
    try:
        async with AsyncResource("文件句柄") as resource:
            await resource.do_work()
            raise ValueError("模拟异常")
    except ValueError as e:
        print(f"捕获异常: {e}")

# 运行示例
asyncio.run(use_async_context_manager())
'''
    
    print("   基础异步上下文管理器代码:")
    print(basic_context_code)
    
    # 2. 异步上下文管理器装饰器
    print("\n2. 异步上下文管理器装饰器:")
    
    decorator_code = '''
# 使用@asynccontextmanager装饰器
from contextlib import asynccontextmanager
import asyncio

@asynccontextmanager
async def async_database_connection(db_url):
    """异步数据库连接上下文管理器"""
    print(f"连接到数据库: {db_url}")
    connection = None
    
    try:
        # 模拟建立连接
        await asyncio.sleep(0.2)
        connection = {"url": db_url, "connected": True}
        print("数据库连接已建立")
        
        yield connection  # 返回连接对象
        
    except Exception as e:
        print(f"数据库操作异常: {e}")
        raise
    finally:
        # 清理资源
        if connection:
            print("关闭数据库连接")
            await asyncio.sleep(0.1)
            connection["connected"] = False
            print("数据库连接已关闭")

@asynccontextmanager
async def async_file_manager(filename, mode='r'):
    """异步文件管理器"""
    print(f"打开文件: {filename} (模式: {mode})")
    
    try:
        # 模拟异步文件操作
        await asyncio.sleep(0.1)
        file_obj = {"name": filename, "mode": mode, "content": ""}
        print(f"文件 {filename} 已打开")
        
        yield file_obj
        
    except Exception as e:
        print(f"文件操作异常: {e}")
        raise
    finally:
        print(f"关闭文件: {filename}")
        await asyncio.sleep(0.1)
        print(f"文件 {filename} 已关闭")

# 使用装饰器创建的上下文管理器
async def use_decorator_context_managers():
    """使用装饰器创建的上下文管理器"""
    print("=== 使用装饰器上下文管理器 ===")
    
    # 使用数据库连接
    async with async_database_connection("postgresql://localhost/mydb") as db:
        print(f"执行数据库查询: {db}")
        await asyncio.sleep(0.3)
        print("查询完成")
    
    print("\n--- 文件操作 ---")
    
    # 使用文件管理器
    async with async_file_manager("data.txt", "w") as file:
        print(f"写入文件: {file}")
        file["content"] = "Hello, async world!"
        await asyncio.sleep(0.2)
        print("文件写入完成")

# 运行示例
asyncio.run(use_decorator_context_managers())
'''
    
    print("   装饰器上下文管理器代码:")
    print(decorator_code)
    
    # 3. 实际应用示例
    print("\n3. 实际应用示例:")
    
    @asynccontextmanager
    async def demo_connection_pool(pool_size=3):
        """演示连接池上下文管理器"""
        print(f"创建连接池 (大小: {pool_size})")
        
        # 模拟创建连接池
        pool = {
            "connections": [f"conn_{i}" for i in range(pool_size)],
            "available": list(range(pool_size)),
            "in_use": []
        }
        
        try:
            await asyncio.sleep(0.1)
            print(f"连接池创建完成: {pool['connections']}")
            yield pool
        finally:
            print("清理连接池")
            await asyncio.sleep(0.1)
            print("连接池已清理")
    
    async def run_context_demo():
        """运行上下文管理器演示"""
        print("开始上下文管理器演示...")
        
        async with demo_connection_pool(2) as pool:
            print(f"使用连接池: {pool['connections']}")
            
            # 模拟使用连接
            if pool['available']:
                conn_id = pool['available'].pop(0)
                pool['in_use'].append(conn_id)
                print(f"获取连接: {pool['connections'][conn_id]}")
                
                await asyncio.sleep(1)
                
                # 释放连接
                pool['in_use'].remove(conn_id)
                pool['available'].append(conn_id)
                print(f"释放连接: {pool['connections'][conn_id]}")
    
    # 运行演示
    asyncio.run(run_context_demo())
    
    print("\n   ✓ 异步上下文管理器演示完成")

# 运行异步上下文管理器演示
async_context_manager_demo()

# 3.2 异步迭代器

import asyncio
from typing import AsyncIterator

def async_iterator_demo():
    """异步迭代器演示"""
    print("=== 异步迭代器演示 ===")
    
    # 1. 基础异步迭代器
    print("\n1. 基础异步迭代器:")
    
    basic_iterator_code = '''
# 异步迭代器基础
import asyncio

class AsyncRange:
    """异步范围迭代器"""
    
    def __init__(self, start, stop, step=1, delay=0.1):
        self.start = start
        self.stop = stop
        self.step = step
        self.delay = delay
        self.current = start
    
    def __aiter__(self):
        """返回异步迭代器对象"""
        return self
    
    async def __anext__(self):
        """返回下一个值"""
        if self.current >= self.stop:
            raise StopAsyncIteration
        
        # 模拟异步操作
        await asyncio.sleep(self.delay)
        
        value = self.current
        self.current += self.step
        print(f"生成值: {value}")
        return value

# 使用异步迭代器
async def use_async_iterator():
    """使用异步迭代器"""
    print("=== 使用异步迭代器 ===")
    
    # 使用async for循环
    async for value in AsyncRange(0, 5):
        print(f"处理值: {value}")
        await asyncio.sleep(0.1)  # 模拟处理时间
    
    print("异步迭代完成")

# 运行示例
asyncio.run(use_async_iterator())
'''
    
    print("   基础异步迭代器代码:")
    print(basic_iterator_code)
    
    # 2. 异步生成器
    print("\n2. 异步生成器:")
    
    async_generator_code = '''
# 异步生成器
import asyncio
import random

async def async_data_generator(count, delay_range=(0.1, 0.5)):
    """异步数据生成器"""
    for i in range(count):
        # 模拟异步数据获取
        delay = random.uniform(*delay_range)
        await asyncio.sleep(delay)
        
        # 生成数据
        data = {
            "id": i,
            "value": random.randint(1, 100),
            "timestamp": asyncio.get_event_loop().time()
        }
        
        print(f"生成数据: {data}")
        yield data

async def async_file_reader(filename, chunk_size=1024):
    """异步文件读取生成器"""
    print(f"开始读取文件: {filename}")
    
    # 模拟文件内容
    content = "这是一个很长的文件内容," * 20
    
    for i in range(0, len(content), chunk_size):
        # 模拟异步读取
        await asyncio.sleep(0.1)
        
        chunk = content[i:i + chunk_size]
        print(f"读取块 {i//chunk_size + 1}: {len(chunk)} 字符")
        yield chunk
    
    print("文件读取完成")

# 使用异步生成器
async def use_async_generators():
    """使用异步生成器"""
    print("=== 使用异步生成器 ===")
    
    # 1. 数据生成器
    print("\n1. 异步数据生成:")
    data_count = 0
    async for data in async_data_generator(3):
        print(f"处理数据: {data['id']} -> {data['value']}")
        data_count += 1
    
    print(f"总共处理了 {data_count} 条数据")
    
    # 2. 文件读取生成器
    print("\n2. 异步文件读取:")
    chunks = []
    async for chunk in async_file_reader("example.txt", 50):
        chunks.append(chunk)
        print(f"累积内容长度: {sum(len(c) for c in chunks)}")
    
    print(f"文件读取完成,总共 {len(chunks)} 个块")

# 运行示例
asyncio.run(use_async_generators())
'''
    
    print("   异步生成器代码:")
    print(async_generator_code)
    
    # 3. 实际应用示例
    print("\n3. 实际应用示例:")
    
    async def demo_data_stream(count=3):
        """演示数据流生成器"""
        for i in range(count):
            await asyncio.sleep(0.5)
            data = {
                "id": i,
                "message": f"数据项 {i}",
                "timestamp": asyncio.get_event_loop().time()
            }
            print(f"[生成器] 产生数据: {data['message']}")
            yield data
    
    async def demo_data_processor():
        """演示数据处理器"""
        processed_count = 0
        
        async for item in demo_data_stream(5):
            print(f"[处理器] 处理: {item['message']}")
            await asyncio.sleep(0.2)  # 模拟处理时间
            processed_count += 1
            print(f"[处理器] 已处理 {processed_count} 项")
        
        print(f"数据处理完成,总计: {processed_count} 项")
    
    # 运行演示
    asyncio.run(demo_data_processor())
    
    print("\n   ✓ 异步迭代器演示完成")

# 运行异步迭代器演示
async_iterator_demo()

# 4. 异步网络编程

# 4.1 异步HTTP客户端

import asyncio
import aiohttp
import time
from typing import List, Dict

def async_http_client_demo():
    """异步HTTP客户端演示"""
    print("=== 异步HTTP客户端演示 ===")
    
    # 1. 基础HTTP请求
    print("\n1. 基础HTTP请求:")
    
    basic_http_code = '''
# 异步HTTP客户端基础
import asyncio
import aiohttp
import time

async def fetch_url(session, url):
    """获取单个URL"""
    try:
        print(f"开始请求: {url}")
        async with session.get(url) as response:
            content = await response.text()
            print(f"完成请求: {url} - 状态码: {response.status}")
            return {
                "url": url,
                "status": response.status,
                "content_length": len(content),
                "headers": dict(response.headers)
            }
    except Exception as e:
        print(f"请求失败: {url} - 错误: {e}")
        return {"url": url, "error": str(e)}

async def fetch_multiple_urls(urls):
    """并发获取多个URL"""
    print(f"=== 并发请求 {len(urls)} 个URL ===")
    
    start_time = time.time()
    
    # 创建HTTP会话
    async with aiohttp.ClientSession() as session:
        # 创建任务列表
        tasks = [fetch_url(session, url) for url in urls]
        
        # 并发执行所有请求
        results = await asyncio.gather(*tasks, return_exceptions=True)
    
    end_time = time.time()
    
    print(f"\n所有请求完成,耗时: {end_time - start_time:.2f}秒")
    
    # 处理结果
    success_count = 0
    error_count = 0
    
    for result in results:
        if isinstance(result, Exception):
            print(f"异常: {result}")
            error_count += 1
        elif "error" in result:
            print(f"错误: {result['url']} - {result['error']}")
            error_count += 1
        else:
            print(f"成功: {result['url']} - {result['status']} - {result['content_length']} 字符")
            success_count += 1
    
    print(f"\n统计: 成功 {success_count}, 失败 {error_count}")
    return results

# 示例URL列表
test_urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/status/200",
    "https://httpbin.org/json",
    "https://httpbin.org/headers"
]

# 运行并发请求
# asyncio.run(fetch_multiple_urls(test_urls))
'''
    
    print("   基础HTTP请求代码:")
    print(basic_http_code)
    
    # 2. 高级HTTP功能
    print("\n2. 高级HTTP功能:")
    
    advanced_http_code = '''
# 高级HTTP功能
import asyncio
import aiohttp
import json

class AsyncHTTPClient:
    """异步HTTP客户端类"""
    
    def __init__(self, timeout=10, max_connections=100):
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.connector = aiohttp.TCPConnector(limit=max_connections)
        self.session = None
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            timeout=self.timeout,
            connector=self.connector
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def get(self, url, headers=None, params=None):
        """GET请求"""
        async with self.session.get(url, headers=headers, params=params) as response:
            return await self._process_response(response)
    
    async def post(self, url, data=None, json_data=None, headers=None):
        """POST请求"""
        kwargs = {"headers": headers}
        if json_data:
            kwargs["json"] = json_data
        elif data:
            kwargs["data"] = data
        
        async with self.session.post(url, **kwargs) as response:
            return await self._process_response(response)
    
    async def _process_response(self, response):
        """处理响应"""
        content_type = response.headers.get('content-type', '')
        
        if 'application/json' in content_type:
            content = await response.json()
        else:
            content = await response.text()
        
        return {
            "status": response.status,
            "headers": dict(response.headers),
            "content": content,
            "url": str(response.url)
        }
    
    async def download_file(self, url, filename):
        """下载文件"""
        print(f"开始下载: {url} -> {filename}")
        
        async with self.session.get(url) as response:
            if response.status == 200:
                total_size = int(response.headers.get('content-length', 0))
                downloaded = 0
                
                # 模拟文件写入(实际应用中使用aiofiles)
                content = b""
                async for chunk in response.content.iter_chunked(8192):
                    content += chunk
                    downloaded += len(chunk)
                    
                    if total_size > 0:
                        progress = (downloaded / total_size) * 100
                        print(f"下载进度: {progress:.1f}% ({downloaded}/{total_size})")
                
                print(f"下载完成: {filename} ({len(content)} 字节)")
                return {"filename": filename, "size": len(content)}
            else:
                raise Exception(f"下载失败: HTTP {response.status}")

# 使用高级HTTP客户端
async def use_advanced_http_client():
    """使用高级HTTP客户端"""
    print("=== 使用高级HTTP客户端 ===")
    
    async with AsyncHTTPClient(timeout=15) as client:
        # GET请求
        print("\n1. GET请求:")
        result = await client.get("https://httpbin.org/get", 
                                params={"key": "value", "test": "async"})
        print(f"GET结果: {result['status']} - {len(str(result['content']))} 字符")
        
        # POST请求
        print("\n2. POST请求:")
        post_data = {"name": "异步测试", "type": "demo"}
        result = await client.post("https://httpbin.org/post", json_data=post_data)
        print(f"POST结果: {result['status']}")
        
        # 文件下载
        print("\n3. 文件下载:")
        try:
            download_result = await client.download_file(
                "https://httpbin.org/bytes/1024", 
                "test_file.bin"
            )
            print(f"下载结果: {download_result}")
        except Exception as e:
            print(f"下载失败: {e}")

# 运行高级HTTP客户端示例
# asyncio.run(use_advanced_http_client())
'''
    
    print("   高级HTTP功能代码:")
    print(advanced_http_code)
    
    # 3. 实际应用示例
    print("\n3. 实际应用示例:")
    
    async def demo_http_requests():
        """演示HTTP请求"""
        print("开始HTTP请求演示...")
        
        # 模拟多个API端点
        endpoints = [
            {"name": "用户信息", "url": "https://jsonplaceholder.typicode.com/users/1"},
            {"name": "文章列表", "url": "https://jsonplaceholder.typicode.com/posts?_limit=3"},
            {"name": "评论数据", "url": "https://jsonplaceholder.typicode.com/comments?_limit=3"}
        ]
        
        async def fetch_endpoint(session, endpoint):
            try:
                print(f"请求 {endpoint['name']}...")
                async with session.get(endpoint['url']) as response:
                    if response.status == 200:
                        data = await response.json()
                        print(f"✓ {endpoint['name']} 获取成功")
                        return {"name": endpoint['name'], "data": data, "status": "success"}
                    else:
                        print(f"✗ {endpoint['name']} 请求失败: {response.status}")
                        return {"name": endpoint['name'], "status": "failed", "code": response.status}
            except Exception as e:
                print(f"✗ {endpoint['name']} 异常: {e}")
                return {"name": endpoint['name'], "status": "error", "error": str(e)}
        
        # 并发请求所有端点
        start_time = time.time()
        
        async with aiohttp.ClientSession() as session:
            tasks = [fetch_endpoint(session, ep) for ep in endpoints]
            results = await asyncio.gather(*tasks)
        
        end_time = time.time()
        
        # 处理结果
        print(f"\n请求完成,总耗时: {end_time - start_time:.2f}秒")
        
        for result in results:
            if result['status'] == 'success':
                data_info = f"数据项: {len(result['data']) if isinstance(result['data'], list) else 1}"
                print(f"✓ {result['name']}: {data_info}")
            else:
                print(f"✗ {result['name']}: {result['status']}")
    
    # 运行演示
    asyncio.run(demo_http_requests())
    
    print("\n   ✓ 异步HTTP客户端演示完成")

# 运行异步HTTP客户端演示
async_http_client_demo()

# 4.2 异步服务器编程

import asyncio
from aiohttp import web, WSMsgType
import json
import time

def async_server_demo():
    """异步服务器编程演示"""
    print("=== 异步服务器编程演示 ===")
    
    # 1. 基础HTTP服务器
    print("\n1. 基础HTTP服务器:")
    
    basic_server_code = '''
# 异步HTTP服务器基础
import asyncio
from aiohttp import web
import json
import time

# 全局数据存储
server_data = {
    "users": [
        {"id": 1, "name": "张三", "email": "zhangsan@example.com"},
        {"id": 2, "name": "李四", "email": "lisi@example.com"}
    ],
    "request_count": 0
}

async def hello_handler(request):
    """Hello处理器"""
    server_data["request_count"] += 1
    name = request.query.get('name', 'World')
    
    response_data = {
        "message": f"Hello, {name}!",
        "timestamp": time.time(),
        "request_count": server_data["request_count"]
    }
    
    return web.json_response(response_data)

async def users_handler(request):
    """用户列表处理器"""
    server_data["request_count"] += 1
    
    if request.method == 'GET':
        # 获取用户列表
        return web.json_response({
            "users": server_data["users"],
            "total": len(server_data["users"])
        })
    
    elif request.method == 'POST':
        # 创建新用户
        try:
            data = await request.json()
            new_user = {
                "id": len(server_data["users"]) + 1,
                "name": data.get("name"),
                "email": data.get("email")
            }
            server_data["users"].append(new_user)
            
            return web.json_response(new_user, status=201)
        except Exception as e:
            return web.json_response(
                {"error": str(e)}, 
                status=400
            )

async def user_detail_handler(request):
    """用户详情处理器"""
    server_data["request_count"] += 1
    user_id = int(request.match_info['user_id'])
    
    # 查找用户
    user = next((u for u in server_data["users"] if u["id"] == user_id), None)
    
    if user:
        return web.json_response(user)
    else:
        return web.json_response(
            {"error": "User not found"}, 
            status=404
        )

async def stats_handler(request):
    """统计信息处理器"""
    return web.json_response({
        "total_users": len(server_data["users"]),
        "total_requests": server_data["request_count"],
        "server_uptime": time.time() - start_time
    })

# 中间件
async def logging_middleware(request, handler):
    """日志中间件"""
    start = time.time()
    print(f"[{time.strftime('%H:%M:%S')}] {request.method} {request.path}")
    
    response = await handler(request)
    
    process_time = time.time() - start
    print(f"[{time.strftime('%H:%M:%S')}] 响应: {response.status} ({process_time:.3f}s)")
    
    return response

# 创建应用
def create_app():
    """创建Web应用"""
    app = web.Application(middlewares=[logging_middleware])
    
    # 添加路由
    app.router.add_get('/', hello_handler)
    app.router.add_get('/hello', hello_handler)
    app.router.add_get('/users', users_handler)
    app.router.add_post('/users', users_handler)
    app.router.add_get('/users/{user_id}', user_detail_handler)
    app.router.add_get('/stats', stats_handler)
    
    return app

# 启动服务器
if __name__ == '__main__':
    start_time = time.time()
    app = create_app()
    
    print("启动异步HTTP服务器...")
    print("访问 http://localhost:8080")
    print("API端点:")
    print("  GET  /hello?name=YourName")
    print("  GET  /users")
    print("  POST /users")
    print("  GET  /users/{id}")
    print("  GET  /stats")
    
    web.run_app(app, host='localhost', port=8080)
'''
    
    print("   基础HTTP服务器代码:")
    print(basic_server_code)
    
    # 2. WebSocket服务器
    print("\n2. WebSocket服务器:")
    
    websocket_server_code = '''
# WebSocket服务器
import asyncio
from aiohttp import web, WSMsgType
import json
import time
import weakref

# WebSocket连接管理
class WebSocketManager:
    """WebSocket连接管理器"""
    
    def __init__(self):
        self.connections = weakref.WeakSet()
        self.rooms = {}  # 房间管理
    
    def add_connection(self, ws, room=None):
        """添加连接"""
        self.connections.add(ws)
        if room:
            if room not in self.rooms:
                self.rooms[room] = weakref.WeakSet()
            self.rooms[room].add(ws)
        print(f"新连接加入,当前连接数: {len(self.connections)}")
    
    def remove_connection(self, ws, room=None):
        """移除连接"""
        if ws in self.connections:
            self.connections.discard(ws)
        if room and room in self.rooms:
            self.rooms[room].discard(ws)
        print(f"连接断开,当前连接数: {len(self.connections)}")
    
    async def broadcast(self, message, room=None):
        """广播消息"""
        if room and room in self.rooms:
            connections = self.rooms[room]
        else:
            connections = self.connections
        
        if connections:
            await asyncio.gather(
                *[ws.send_str(json.dumps(message)) for ws in connections],
                return_exceptions=True
            )

# 全局WebSocket管理器
ws_manager = WebSocketManager()

async def websocket_handler(request):
    """WebSocket处理器"""
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    
    # 获取房间信息
    room = request.query.get('room', 'default')
    user_name = request.query.get('name', f'用户{int(time.time()) % 1000}')
    
    # 添加连接
    ws_manager.add_connection(ws, room)
    
    # 发送欢迎消息
    welcome_msg = {
        "type": "welcome",
        "message": f"欢迎 {user_name} 加入房间 {room}",
        "user": user_name,
        "room": room,
        "timestamp": time.time()
    }
    await ws.send_str(json.dumps(welcome_msg))
    
    # 通知其他用户
    join_msg = {
        "type": "user_join",
        "message": f"{user_name} 加入了房间",
        "user": user_name,
        "room": room,
        "timestamp": time.time()
    }
    await ws_manager.broadcast(join_msg, room)
    
    try:
        # 处理消息
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                    
                    # 处理不同类型的消息
                    if data.get('type') == 'chat':
                        # 聊天消息
                        chat_msg = {
                            "type": "chat",
                            "user": user_name,
                            "message": data.get('message', ''),
                            "room": room,
                            "timestamp": time.time()
                        }
                        await ws_manager.broadcast(chat_msg, room)
                    
                    elif data.get('type') == 'ping':
                        # 心跳检测
                        pong_msg = {
                            "type": "pong",
                            "timestamp": time.time()
                        }
                        await ws.send_str(json.dumps(pong_msg))
                    
                except json.JSONDecodeError:
                    error_msg = {
                        "type": "error",
                        "message": "无效的JSON格式"
                    }
                    await ws.send_str(json.dumps(error_msg))
            
            elif msg.type == WSMsgType.ERROR:
                print(f'WebSocket错误: {ws.exception()}')
                break
    
    except Exception as e:
        print(f"WebSocket处理异常: {e}")
    
    finally:
        # 清理连接
        ws_manager.remove_connection(ws, room)
        
        # 通知其他用户
        leave_msg = {
            "type": "user_leave",
            "message": f"{user_name} 离开了房间",
            "user": user_name,
            "room": room,
            "timestamp": time.time()
        }
        await ws_manager.broadcast(leave_msg, room)
    
    return ws

async def websocket_stats_handler(request):
    """WebSocket统计信息"""
    stats = {
        "total_connections": len(ws_manager.connections),
        "rooms": {room: len(connections) for room, connections in ws_manager.rooms.items()},
        "timestamp": time.time()
    }
    return web.json_response(stats)

# WebSocket客户端示例页面
async def websocket_client_page(request):
    """WebSocket客户端页面"""
    html = '''
<!DOCTYPE html>
<html>
<head>
    <title>WebSocket聊天室</title>
    <meta charset="utf-8">
</head>
<body>
    <div id="messages"></div>
    <input type="text" id="messageInput" placeholder="输入消息...">
    <button onclick="sendMessage()">发送</button>
    <button onclick="sendPing()">Ping</button>
    
    <script>
        const ws = new WebSocket('ws://localhost:8080/ws?room=test&name=测试用户');
        const messages = document.getElementById('messages');
        
        ws.onmessage = function(event) {
            const data = JSON.parse(event.data);
            const div = document.createElement('div');
            div.innerHTML = `[${data.type}] ${data.message || data.user + ': ' + (data.message || '')}`;
            messages.appendChild(div);
            messages.scrollTop = messages.scrollHeight;
        };
        
        function sendMessage() {
            const input = document.getElementById('messageInput');
            if (input.value) {
                ws.send(JSON.stringify({
                    type: 'chat',
                    message: input.value
                }));
                input.value = '';
            }
        }
        
        function sendPing() {
            ws.send(JSON.stringify({type: 'ping'}));
        }
        
        document.getElementById('messageInput').addEventListener('keypress', function(e) {
            if (e.key === 'Enter') {
                sendMessage();
            }
        });
    </script>
</body>
</html>
    '''
    return web.Response(text=html, content_type='text/html')

# 创建WebSocket应用
def create_websocket_app():
    """创建WebSocket应用"""
    app = web.Application()
    
    # 添加路由
    app.router.add_get('/ws', websocket_handler)
    app.router.add_get('/ws/stats', websocket_stats_handler)
    app.router.add_get('/chat', websocket_client_page)
    
    return app

# 启动WebSocket服务器
if __name__ == '__main__':
    app = create_websocket_app()
    
    print("启动WebSocket服务器...")
    print("访问 http://localhost:8080/chat 测试聊天室")
    print("WebSocket端点: ws://localhost:8080/ws")
    print("统计信息: http://localhost:8080/ws/stats")
    
    web.run_app(app, host='localhost', port=8080)
'''
    
    print("   WebSocket服务器代码:")
    print(websocket_server_code)
    
    # 3. 实际应用示例
    print("\n3. 实际应用示例:")
    
    async def demo_simple_server():
        """演示简单服务器"""
        print("创建演示服务器...")
        
        async def demo_handler(request):
            return web.json_response({
                "message": "Hello from async server!",
                "path": request.path,
                "method": request.method,
                "timestamp": time.time()
            })
        
        app = web.Application()
        app.router.add_get('/', demo_handler)
        app.router.add_get('/demo', demo_handler)
        
        print("演示服务器配置完成")
        print("在实际应用中,可以通过 web.run_app(app, host='localhost', port=8080) 启动")
        
        return app
    
    # 运行演示
    asyncio.run(demo_simple_server())
    
    print("\n   ✓ 异步服务器编程演示完成")

# 运行异步服务器演示
async_server_demo()

# 5. 实际应用案例

# 5.1 异步文件处理系统

import asyncio
import aiofiles
import aiohttp
import hashlib
import os
from pathlib import Path
from typing import List, Dict

def async_file_system_demo():
    """异步文件处理系统演示"""
    print("=== 异步文件处理系统演示 ===")
    
    # 1. 异步文件操作
    print("\n1. 异步文件操作:")
    
    file_operations_code = '''
# 异步文件操作
import asyncio
import aiofiles
import hashlib
import os
from pathlib import Path

class AsyncFileProcessor:
    """异步文件处理器"""
    
    def __init__(self, base_dir="./async_files"):
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(exist_ok=True)
    
    async def create_file(self, filename, content):
        """创建文件"""
        file_path = self.base_dir / filename
        
        async with aiofiles.open(file_path, 'w', encoding='utf-8') as f:
            await f.write(content)
        
        print(f"文件已创建: {file_path}")
        return file_path
    
    async def read_file(self, filename):
        """读取文件"""
        file_path = self.base_dir / filename
        
        if not file_path.exists():
            raise FileNotFoundError(f"文件不存在: {file_path}")
        
        async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
            content = await f.read()
        
        print(f"文件已读取: {file_path} ({len(content)} 字符)")
        return content
    
    async def copy_file(self, src_filename, dst_filename):
        """复制文件"""
        src_path = self.base_dir / src_filename
        dst_path = self.base_dir / dst_filename
        
        async with aiofiles.open(src_path, 'rb') as src:
            async with aiofiles.open(dst_path, 'wb') as dst:
                while True:
                    chunk = await src.read(8192)
                    if not chunk:
                        break
                    await dst.write(chunk)
        
        print(f"文件已复制: {src_path} -> {dst_path}")
        return dst_path
    
    async def calculate_hash(self, filename):
        """计算文件哈希"""
        file_path = self.base_dir / filename
        hash_md5 = hashlib.md5()
        
        async with aiofiles.open(file_path, 'rb') as f:
            while True:
                chunk = await f.read(8192)
                if not chunk:
                    break
                hash_md5.update(chunk)
        
        file_hash = hash_md5.hexdigest()
        print(f"文件哈希: {filename} -> {file_hash}")
        return file_hash
    
    async def process_multiple_files(self, file_list):
        """并发处理多个文件"""
        print(f"开始并发处理 {len(file_list)} 个文件...")
        
        tasks = []
        for file_info in file_list:
            if file_info['action'] == 'create':
                task = self.create_file(file_info['name'], file_info['content'])
            elif file_info['action'] == 'read':
                task = self.read_file(file_info['name'])
            elif file_info['action'] == 'hash':
                task = self.calculate_hash(file_info['name'])
            else:
                continue
            
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 处理结果
        success_count = 0
        error_count = 0
        
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"任务 {i} 失败: {result}")
                error_count += 1
            else:
                success_count += 1
        
        print(f"处理完成: 成功 {success_count}, 失败 {error_count}")
        return results

# 使用异步文件处理器
async def use_async_file_processor():
    """使用异步文件处理器"""
    print("=== 使用异步文件处理器 ===")
    
    processor = AsyncFileProcessor()
    
    # 1. 创建测试文件
    test_files = [
        {"action": "create", "name": "test1.txt", "content": "这是测试文件1的内容\n" * 100},
        {"action": "create", "name": "test2.txt", "content": "这是测试文件2的内容\n" * 200},
        {"action": "create", "name": "test3.txt", "content": "这是测试文件3的内容\n" * 150}
    ]
    
    await processor.process_multiple_files(test_files)
    
    # 2. 并发读取和计算哈希
    read_tasks = [
        {"action": "read", "name": "test1.txt"},
        {"action": "hash", "name": "test2.txt"},
        {"action": "hash", "name": "test3.txt"}
    ]
    
    await processor.process_multiple_files(read_tasks)
    
    # 3. 复制文件
    await processor.copy_file("test1.txt", "test1_copy.txt")
    
    print("文件处理演示完成")

# 运行示例
# asyncio.run(use_async_file_processor())
'''
    
    print("   异步文件操作代码:")
    print(file_operations_code)
    
    # 2. 异步数据采集系统
    print("\n2. 异步数据采集系统:")
    
    data_collector_code = '''
# 异步数据采集系统
import asyncio
import aiohttp
import aiofiles
import json
import time
from datetime import datetime

class AsyncDataCollector:
    """异步数据采集器"""
    
    def __init__(self, output_dir="./collected_data"):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        self.session = None
        self.collected_data = []
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def fetch_api_data(self, api_config):
        """获取API数据"""
        try:
            print(f"获取数据: {api_config['name']}")
            
            async with self.session.get(api_config['url']) as response:
                if response.status == 200:
                    if 'application/json' in response.headers.get('content-type', ''):
                        data = await response.json()
                    else:
                        data = await response.text()
                    
                    result = {
                        "source": api_config['name'],
                        "url": api_config['url'],
                        "data": data,
                        "timestamp": datetime.now().isoformat(),
                        "status": "success"
                    }
                    
                    print(f"✓ {api_config['name']} 数据获取成功")
                    return result
                else:
                    raise Exception(f"HTTP {response.status}")
        
        except Exception as e:
            print(f"✗ {api_config['name']} 数据获取失败: {e}")
            return {
                "source": api_config['name'],
                "url": api_config['url'],
                "error": str(e),
                "timestamp": datetime.now().isoformat(),
                "status": "error"
            }
    
    async def save_data(self, data, filename=None):
        """保存数据到文件"""
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"collected_data_{timestamp}.json"
        
        file_path = self.output_dir / filename
        
        async with aiofiles.open(file_path, 'w', encoding='utf-8') as f:
            await f.write(json.dumps(data, ensure_ascii=False, indent=2))
        
        print(f"数据已保存: {file_path}")
        return file_path
    
    async def collect_batch(self, api_configs, save_individual=True):
        """批量采集数据"""
        print(f"开始批量采集 {len(api_configs)} 个数据源...")
        
        start_time = time.time()
        
        # 并发获取所有数据
        tasks = [self.fetch_api_data(config) for config in api_configs]
        results = await asyncio.gather(*tasks)
        
        end_time = time.time()
        
        # 统计结果
        success_results = [r for r in results if r.get('status') == 'success']
        error_results = [r for r in results if r.get('status') == 'error']
        
        print(f"采集完成: 成功 {len(success_results)}, 失败 {len(error_results)}, 耗时 {end_time - start_time:.2f}秒")
        
        # 保存数据
        if save_individual:
            # 分别保存每个数据源
            save_tasks = []
            for result in success_results:
                filename = f"{result['source']}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
                save_tasks.append(self.save_data(result, filename))
            
            if save_tasks:
                await asyncio.gather(*save_tasks)
        
        # 保存汇总数据
        summary = {
            "collection_time": datetime.now().isoformat(),
            "total_sources": len(api_configs),
            "successful": len(success_results),
            "failed": len(error_results),
            "duration_seconds": end_time - start_time,
            "results": results
        }
        
        await self.save_data(summary, "collection_summary.json")
        
        return summary

# 使用数据采集器
async def use_data_collector():
    """使用数据采集器"""
    print("=== 使用异步数据采集器 ===")
    
    # 配置数据源
    api_configs = [
        {"name": "用户数据", "url": "https://jsonplaceholder.typicode.com/users"},
        {"name": "文章数据", "url": "https://jsonplaceholder.typicode.com/posts?_limit=5"},
        {"name": "评论数据", "url": "https://jsonplaceholder.typicode.com/comments?_limit=5"},
        {"name": "相册数据", "url": "https://jsonplaceholder.typicode.com/albums?_limit=3"}
    ]
    
    async with AsyncDataCollector() as collector:
        summary = await collector.collect_batch(api_configs)
        
        print(f"\n采集汇总:")
        print(f"  总数据源: {summary['total_sources']}")
        print(f"  成功: {summary['successful']}")
        print(f"  失败: {summary['failed']}")
        print(f"  耗时: {summary['duration_seconds']:.2f}秒")

# 运行示例
# asyncio.run(use_data_collector())
'''
    
    print("   异步数据采集系统代码:")
    print(data_collector_code)
    
    # 3. 实际应用演示
    print("\n3. 实际应用演示:")
    
    async def demo_file_operations():
        """演示文件操作"""
        print("开始文件操作演示...")
        
        # 模拟创建多个文件
        files_to_create = [
            {"name": f"demo_{i}.txt", "content": f"演示文件 {i} 的内容\n" * (i + 1) * 10}
            for i in range(3)
        ]
        
        print(f"创建 {len(files_to_create)} 个演示文件...")
        
        # 模拟并发文件操作
        async def create_demo_file(file_info):
            await asyncio.sleep(0.1)  # 模拟文件创建时间
            print(f"创建文件: {file_info['name']} ({len(file_info['content'])} 字符)")
            return {"name": file_info['name'], "size": len(file_info['content'])}
        
        # 并发创建文件
        tasks = [create_demo_file(file_info) for file_info in files_to_create]
        results = await asyncio.gather(*tasks)
        
        total_size = sum(result['size'] for result in results)
        print(f"文件创建完成,总大小: {total_size} 字符")
        
        return results
    
    # 运行演示
    asyncio.run(demo_file_operations())
    
    print("\n   ✓ 异步文件处理系统演示完成")

# 运行异步文件处理系统演示
async_file_system_demo()

# 5.2 异步爬虫系统

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
from urllib.parse import urljoin, urlparse

def async_crawler_demo():
    """异步爬虫系统演示"""
    print("=== 异步爬虫系统演示 ===")
    
    # 1. 基础异步爬虫
    print("\n1. 基础异步爬虫:")
    
    basic_crawler_code = '''
# 异步爬虫基础
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
from urllib.parse import urljoin, urlparse

class AsyncWebCrawler:
    """异步网页爬虫"""
    
    def __init__(self, max_concurrent=10, delay=1.0):
        self.max_concurrent = max_concurrent
        self.delay = delay
        self.session = None
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.visited_urls = set()
        self.results = []
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30),
            headers={'User-Agent': 'AsyncCrawler/1.0'}
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def fetch_page(self, url):
        """获取单个页面"""
        async with self.semaphore:
            if url in self.visited_urls:
                return None
            
            self.visited_urls.add(url)
            
            try:
                print(f"爬取: {url}")
                
                async with self.session.get(url) as response:
                    if response.status == 200:
                        content = await response.text()
                        
                        # 解析页面
                        soup = BeautifulSoup(content, 'html.parser')
                        
                        result = {
                            "url": url,
                            "title": soup.title.string if soup.title else "无标题",
                            "content_length": len(content),
                            "links": self._extract_links(soup, url),
                            "status": "success",
                            "timestamp": time.time()
                        }
                        
                        print(f"✓ 爬取成功: {url} - {result['title']}")
                        
                        # 添加延迟
                        await asyncio.sleep(self.delay)
                        
                        return result
                    else:
                        print(f"✗ HTTP错误: {url} - {response.status}")
                        return {"url": url, "status": "error", "error": f"HTTP {response.status}"}
            
            except Exception as e:
                print(f"✗ 爬取失败: {url} - {e}")
                return {"url": url, "status": "error", "error": str(e)}
    
    def _extract_links(self, soup, base_url):
        """提取页面链接"""
        links = []
        for link in soup.find_all('a', href=True):
            href = link['href']
            full_url = urljoin(base_url, href)
            
            # 只保留HTTP/HTTPS链接
            if full_url.startswith(('http://', 'https://')):
                links.append({
                    "url": full_url,
                    "text": link.get_text(strip=True)[:100]  # 限制文本长度
                })
        
        return links[:10]  # 限制链接数量
    
    async def crawl_urls(self, urls, max_depth=1):
        """爬取URL列表"""
        print(f"开始爬取 {len(urls)} 个URL,最大深度: {max_depth}")
        
        start_time = time.time()
        current_urls = list(urls)
        
        for depth in range(max_depth):
            print(f"\n=== 深度 {depth + 1} ===")
            
            if not current_urls:
                break
            
            # 并发爬取当前层级的URL
            tasks = [self.fetch_page(url) for url in current_urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # 处理结果
            next_urls = set()
            for result in results:
                if isinstance(result, Exception):
                    print(f"异常: {result}")
                    continue
                
                if result and result.get('status') == 'success':
                    self.results.append(result)
                    
                    # 收集下一层的URL
                    if depth < max_depth - 1:
                        for link in result.get('links', []):
                            if link['url'] not in self.visited_urls:
                                next_urls.add(link['url'])
            
            current_urls = list(next_urls)[:5]  # 限制下一层URL数量
        
        end_time = time.time()
        
        print(f"\n爬取完成:")
        print(f"  总页面: {len(self.results)}")
        print(f"  总耗时: {end_time - start_time:.2f}秒")
        print(f"  平均速度: {len(self.results) / (end_time - start_time):.2f} 页面/秒")
        
        return self.results

# 使用异步爬虫
async def use_async_crawler():
    """使用异步爬虫"""
    print("=== 使用异步爬虫 ===")
    
    # 测试URL列表
    test_urls = [
        "https://httpbin.org/html",
        "https://httpbin.org/links/5",
        "https://httpbin.org/forms/post"
    ]
    
    async with AsyncWebCrawler(max_concurrent=3, delay=0.5) as crawler:
        results = await crawler.crawl_urls(test_urls, max_depth=2)
        
        # 显示结果摘要
        print("\n=== 爬取结果摘要 ===")
        for result in results:
            print(f"  {result['title'][:50]}... - {result['url']}")

# 运行示例
# asyncio.run(use_async_crawler())
'''
    
    print("   基础异步爬虫代码:")
    print(basic_crawler_code)
    
    # 2. 实际应用演示
    print("\n2. 实际应用演示:")
    
    async def demo_simple_crawler():
        """演示简单爬虫"""
        print("开始简单爬虫演示...")
        
        # 模拟爬取多个页面
        urls_to_crawl = [
            "https://httpbin.org/html",
            "https://httpbin.org/json",
            "https://httpbin.org/xml"
        ]
        
        async def fetch_demo_page(session, url):
            try:
                print(f"爬取演示页面: {url}")
                async with session.get(url) as response:
                    if response.status == 200:
                        content = await response.text()
                        print(f"✓ 页面获取成功: {url} ({len(content)} 字符)")
                        return {"url": url, "size": len(content), "status": "success"}
                    else:
                        print(f"✗ 页面获取失败: {url} - HTTP {response.status}")
                        return {"url": url, "status": "failed", "code": response.status}
            except Exception as e:
                print(f"✗ 爬取异常: {url} - {e}")
                return {"url": url, "status": "error", "error": str(e)}
        
        # 并发爬取
        async with aiohttp.ClientSession() as session:
            tasks = [fetch_demo_page(session, url) for url in urls_to_crawl]
            results = await asyncio.gather(*tasks)
        
        # 统计结果
        success_count = sum(1 for r in results if r.get('status') == 'success')
        total_size = sum(r.get('size', 0) for r in results if r.get('status') == 'success')
        
        print(f"\n爬取统计:")
        print(f"  成功页面: {success_count}/{len(urls_to_crawl)}")
        print(f"  总内容大小: {total_size} 字符")
        
        return results
    
    # 运行演示
    asyncio.run(demo_simple_crawler())
    
    print("\n   ✓ 异步爬虫系统演示完成")

# 运行异步爬虫演示
async_crawler_demo()

# 6. 性能优化和最佳实践

# 6.1 性能优化技巧

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def async_performance_demo():
    """异步性能优化演示"""
    print("=== 异步性能优化演示 ===")
    
    # 1. 并发控制
    print("\n1. 并发控制:")
    
    concurrency_control_code = '''
# 并发控制优化
import asyncio
import time

# 1. 使用信号量控制并发数
async def controlled_task(semaphore, task_id, duration):
    """受控制的任务"""
    async with semaphore:
        print(f"任务 {task_id} 开始执行")
        await asyncio.sleep(duration)
        print(f"任务 {task_id} 执行完成")
        return f"结果_{task_id}"

async def demo_semaphore_control():
    """演示信号量控制"""
    print("=== 信号量并发控制 ===")
    
    # 限制最多3个并发任务
    semaphore = asyncio.Semaphore(3)
    
    # 创建10个任务
    tasks = [
        controlled_task(semaphore, i, 1.0)
        for i in range(10)
    ]
    
    start_time = time.time()
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    
    print(f"完成 {len(results)} 个任务,耗时: {end_time - start_time:.2f}秒")
    return results

# 2. 任务分批处理
async def batch_process_tasks(tasks, batch_size=5):
    """分批处理任务"""
    print(f"分批处理 {len(tasks)} 个任务,批大小: {batch_size}")
    
    results = []
    
    for i in range(0, len(tasks), batch_size):
        batch = tasks[i:i + batch_size]
        print(f"处理批次 {i//batch_size + 1}: {len(batch)} 个任务")
        
        batch_results = await asyncio.gather(*batch)
        results.extend(batch_results)
        
        # 批次间添加小延迟
        if i + batch_size < len(tasks):
            await asyncio.sleep(0.1)
    
    print(f"所有批次处理完成,总结果: {len(results)}")
    return results

# 3. 连接池优化
class AsyncConnectionPool:
    """异步连接池"""
    
    def __init__(self, max_connections=10):
        self.max_connections = max_connections
        self.available_connections = asyncio.Queue(maxsize=max_connections)
        self.total_connections = 0
    
    async def get_connection(self):
        """获取连接"""
        try:
            # 尝试从池中获取现有连接
            connection = self.available_connections.get_nowait()
            print(f"复用连接: {connection}")
            return connection
        except asyncio.QueueEmpty:
            # 创建新连接
            if self.total_connections < self.max_connections:
                connection = f"conn_{self.total_connections}"
                self.total_connections += 1
                print(f"创建新连接: {connection}")
                return connection
            else:
                # 等待可用连接
                print("等待可用连接...")
                return await self.available_connections.get()
    
    async def return_connection(self, connection):
        """归还连接"""
        try:
            self.available_connections.put_nowait(connection)
            print(f"归还连接: {connection}")
        except asyncio.QueueFull:
            print(f"连接池已满,丢弃连接: {connection}")

# 运行并发控制演示
# asyncio.run(demo_semaphore_control())
'''
    
    print("   并发控制代码:")
    print(concurrency_control_code)
    
    # 2. 内存优化
    print("\n2. 内存优化:")
    
    memory_optimization_code = '''
# 内存优化技巧
import asyncio
import weakref
from typing import AsyncIterator

# 1. 使用异步生成器减少内存占用
async def memory_efficient_data_processor(data_source):
    """内存高效的数据处理器"""
    async for item in data_source:
        # 处理单个数据项
        processed_item = await process_single_item(item)
        
        # 立即yield,不在内存中累积
        yield processed_item
        
        # 可选:触发垃圾回收
        if processed_item['id'] % 100 == 0:
            import gc
            gc.collect()

async def process_single_item(item):
    """处理单个数据项"""
    await asyncio.sleep(0.01)  # 模拟处理时间
    return {
        "id": item.get("id"),
        "processed": True,
        "result": f"processed_{item.get('value', 'unknown')}"
    }

# 2. 弱引用管理连接
class WeakConnectionManager:
    """使用弱引用管理连接"""
    
    def __init__(self):
        self._connections = weakref.WeakSet()
    
    def add_connection(self, conn):
        """添加连接"""
        self._connections.add(conn)
        print(f"添加连接,当前连接数: {len(self._connections)}")
    
    def get_connection_count(self):
        """获取当前连接数"""
        return len(self._connections)
    
    async def broadcast_to_all(self, message):
        """向所有连接广播消息"""
        if self._connections:
            tasks = []
            for conn in self._connections:
                tasks.append(conn.send_message(message))
            
            await asyncio.gather(*tasks, return_exceptions=True)

# 3. 流式处理大数据
async def stream_large_dataset(dataset_size=10000):
    """流式处理大数据集"""
    print(f"开始流式处理 {dataset_size} 条数据...")
    
    processed_count = 0
    
    async def data_generator():
        """数据生成器"""
        for i in range(dataset_size):
            yield {"id": i, "value": f"data_{i}"}
            
            # 每1000条数据暂停一下
            if i % 1000 == 0:
                await asyncio.sleep(0.01)
    
    # 流式处理
    async for processed_item in memory_efficient_data_processor(data_generator()):
        processed_count += 1
        
        # 定期报告进度
        if processed_count % 1000 == 0:
            print(f"已处理: {processed_count}/{dataset_size}")
    
    print(f"流式处理完成,总计: {processed_count} 条数据")
    return processed_count

# 运行内存优化演示
# asyncio.run(stream_large_dataset(5000))
'''
    
    print("   内存优化代码:")
    print(memory_optimization_code)
    
    # 3. 实际应用演示
    print("\n3. 实际应用演示:")
    
    async def demo_performance_optimization():
        """演示性能优化"""
        print("开始性能优化演示...")
        
        # 1. 并发控制演示
        semaphore = asyncio.Semaphore(3)
        
        async def demo_task(task_id):
            async with semaphore:
                print(f"执行任务 {task_id}")
                await asyncio.sleep(0.5)
                return f"任务{task_id}完成"
        
        # 创建多个任务
        tasks = [demo_task(i) for i in range(8)]
        
        start_time = time.time()
        results = await asyncio.gather(*tasks)
        end_time = time.time()
        
        print(f"并发控制演示完成: {len(results)} 个任务,耗时 {end_time - start_time:.2f}秒")
        
        # 2. 批处理演示
        print("\n批处理演示:")
        
        async def batch_task(batch_id, items):
            print(f"处理批次 {batch_id}: {len(items)} 个项目")
            await asyncio.sleep(0.3)
            return f"批次{batch_id}处理完成"
        
        # 分批处理
        all_items = list(range(20))
        batch_size = 5
        batch_tasks = []
        
        for i in range(0, len(all_items), batch_size):
            batch = all_items[i:i + batch_size]
            batch_tasks.append(batch_task(i // batch_size, batch))
        
        batch_results = await asyncio.gather(*batch_tasks)
        print(f"批处理完成: {len(batch_results)} 个批次")
        
        return {"concurrent_results": len(results), "batch_results": len(batch_results)}
    
    # 运行演示
    asyncio.run(demo_performance_optimization())
    
    print("\n   ✓ 性能优化演示完成")

# 运行性能优化演示
async_performance_demo()

# 6.2 最佳实践

def async_best_practices_demo():
    """异步编程最佳实践演示"""
    print("=== 异步编程最佳实践演示 ===")
    
    # 1. 错误处理最佳实践
    print("\n1. 错误处理最佳实践:")
    
    error_handling_code = '''
# 异步错误处理最佳实践
import asyncio
import logging
from typing import List, Optional

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AsyncTaskManager:
    """异步任务管理器"""
    
    def __init__(self):
        self.failed_tasks = []
        self.successful_tasks = []
    
    async def execute_task_safely(self, task_func, *args, **kwargs):
        """安全执行任务"""
        try:
            result = await task_func(*args, **kwargs)
            self.successful_tasks.append({
                "task": task_func.__name__,
                "result": result,
                "status": "success"
            })
            return result
        
        except asyncio.TimeoutError:
            error_msg = f"任务超时: {task_func.__name__}"
            logger.error(error_msg)
            self.failed_tasks.append({
                "task": task_func.__name__,
                "error": "timeout",
                "status": "failed"
            })
            return None
        
        except Exception as e:
            error_msg = f"任务异常: {task_func.__name__} - {e}"
            logger.error(error_msg)
            self.failed_tasks.append({
                "task": task_func.__name__,
                "error": str(e),
                "status": "failed"
            })
            return None
    
    async def execute_tasks_with_retry(self, tasks, max_retries=3):
        """带重试的任务执行"""
        results = []
        
        for task_info in tasks:
            task_func = task_info["func"]
            args = task_info.get("args", [])
            kwargs = task_info.get("kwargs", {})
            
            for attempt in range(max_retries + 1):
                try:
                    if attempt > 0:
                        logger.info(f"重试任务 {task_func.__name__} (第{attempt}次)")
                        await asyncio.sleep(2 ** attempt)  # 指数退避
                    
                    result = await asyncio.wait_for(
                        task_func(*args, **kwargs),
                        timeout=10.0
                    )
                    
                    results.append(result)
                    logger.info(f"任务成功: {task_func.__name__}")
                    break
                
                except Exception as e:
                    if attempt == max_retries:
                        logger.error(f"任务最终失败: {task_func.__name__} - {e}")
                        results.append(None)
                    else:
                        logger.warning(f"任务失败,将重试: {task_func.__name__} - {e}")
        
        return results
    
    def get_summary(self):
        """获取执行摘要"""
        return {
            "successful": len(self.successful_tasks),
            "failed": len(self.failed_tasks),
            "total": len(self.successful_tasks) + len(self.failed_tasks),
            "success_rate": len(self.successful_tasks) / max(1, len(self.successful_tasks) + len(self.failed_tasks))
        }

# 示例任务函数
async def reliable_task(task_id, success_rate=0.8):
    """可靠性测试任务"""
    import random
    
    await asyncio.sleep(0.1)  # 模拟工作
    
    if random.random() < success_rate:
        return f"任务{task_id}成功"
    else:
        raise Exception(f"任务{task_id}随机失败")

async def timeout_task(duration):
    """可能超时的任务"""
    await asyncio.sleep(duration)
    return f"任务完成,耗时{duration}秒"

# 使用示例
async def demo_error_handling():
    """演示错误处理"""
    manager = AsyncTaskManager()
    
    # 1. 安全执行任务
    print("=== 安全任务执行 ===")
    
    tasks = [
        manager.execute_task_safely(reliable_task, i, 0.7)
        for i in range(5)
    ]
    
    results = await asyncio.gather(*tasks)
    print(f"安全执行结果: {[r for r in results if r is not None]}")
    
    # 2. 带重试的任务执行
    print("\n=== 带重试的任务执行 ===")
    
    retry_tasks = [
        {"func": reliable_task, "args": [i], "kwargs": {"success_rate": 0.3}}
        for i in range(3)
    ]
    
    retry_results = await manager.execute_tasks_with_retry(retry_tasks, max_retries=2)
    print(f"重试执行结果: {retry_results}")
    
    # 3. 显示摘要
    summary = manager.get_summary()
    print(f"\n执行摘要: {summary}")

# 运行错误处理演示
# asyncio.run(demo_error_handling())
'''
    
    print("   错误处理最佳实践代码:")
    print(error_handling_code)
    
    # 2. 资源管理最佳实践
    print("\n2. 资源管理最佳实践:")
    
    resource_management_code = '''
# 资源管理最佳实践
import asyncio
from contextlib import asynccontextmanager
import aiohttp
import aiofiles

class AsyncResourceManager:
    """异步资源管理器"""
    
    def __init__(self):
        self.active_resources = set()
        self.resource_stats = {
            "created": 0,
            "destroyed": 0,
            "active": 0
        }
    
    @asynccontextmanager
    async def managed_http_session(self, **kwargs):
        """管理HTTP会话"""
        session = aiohttp.ClientSession(**kwargs)
        self.active_resources.add(session)
        self.resource_stats["created"] += 1
        self.resource_stats["active"] += 1
        
        try:
            print(f"创建HTTP会话,当前活跃资源: {self.resource_stats['active']}")
            yield session
        finally:
            await session.close()
            self.active_resources.discard(session)
            self.resource_stats["destroyed"] += 1
            self.resource_stats["active"] -= 1
            print(f"关闭HTTP会话,当前活跃资源: {self.resource_stats['active']}")
    
    @asynccontextmanager
    async def managed_file(self, filename, mode='r', **kwargs):
        """管理文件资源"""
        file_handle = await aiofiles.open(filename, mode, **kwargs)
        self.active_resources.add(file_handle)
        self.resource_stats["created"] += 1
        self.resource_stats["active"] += 1
        
        try:
            print(f"打开文件 {filename},当前活跃资源: {self.resource_stats['active']}")
            yield file_handle
        finally:
            await file_handle.close()
            self.active_resources.discard(file_handle)
            self.resource_stats["destroyed"] += 1
            self.resource_stats["active"] -= 1
            print(f"关闭文件 {filename},当前活跃资源: {self.resource_stats['active']}")
    
    async def cleanup_all_resources(self):
        """清理所有资源"""
        print(f"开始清理 {len(self.active_resources)} 个活跃资源...")
        
        cleanup_tasks = []
        for resource in list(self.active_resources):
            if hasattr(resource, 'close'):
                cleanup_tasks.append(resource.close())
        
        if cleanup_tasks:
            await asyncio.gather(*cleanup_tasks, return_exceptions=True)
        
        self.active_resources.clear()
        self.resource_stats["active"] = 0
        print("所有资源已清理")
    
    def get_resource_stats(self):
        """获取资源统计"""
        return self.resource_stats.copy()

# 使用示例
async def demo_resource_management():
    """演示资源管理"""
    manager = AsyncResourceManager()
    
    try:
        # 1. HTTP会话管理
        print("=== HTTP会话管理 ===")
        
        async with manager.managed_http_session() as session:
            async with session.get('https://httpbin.org/json') as response:
                data = await response.json()
                print(f"获取数据: {len(str(data))} 字符")
        
        # 2. 文件资源管理
        print("\n=== 文件资源管理 ===")
        
        # 创建测试文件
        async with manager.managed_file('test_resource.txt', 'w') as f:
            await f.write("测试资源管理\n" * 100)
        
        # 读取测试文件
        async with manager.managed_file('test_resource.txt', 'r') as f:
            content = await f.read()
            print(f"读取文件内容: {len(content)} 字符")
        
        # 3. 并发资源使用
        print("\n=== 并发资源使用 ===")
        
        async def concurrent_http_task(task_id):
            async with manager.managed_http_session() as session:
                async with session.get(f'https://httpbin.org/delay/{task_id % 3 + 1}') as response:
                    return await response.json()
        
        # 并发执行多个HTTP任务
        tasks = [concurrent_http_task(i) for i in range(3)]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        print(f"并发任务完成: {len([r for r in results if not isinstance(r, Exception)])} 个成功")
    
    finally:
        # 确保清理所有资源
        await manager.cleanup_all_resources()
        
        # 显示资源统计
        stats = manager.get_resource_stats()
        print(f"\n资源统计: {stats}")

# 运行资源管理演示
# asyncio.run(demo_resource_management())
'''
    
    print("   资源管理最佳实践代码:")
    print(resource_management_code)
    
    # 3. 实际应用演示
    print("\n3. 实际应用演示:")
    
    async def demo_best_practices():
        """演示最佳实践"""
        print("开始最佳实践演示...")
        
        # 1. 任务超时控制
        async def timeout_controlled_task(duration):
            try:
                result = await asyncio.wait_for(
                    asyncio.sleep(duration),
                    timeout=2.0
                )
                return f"任务完成: {duration}秒"
            except asyncio.TimeoutError:
                return f"任务超时: {duration}秒"
        
        # 测试不同持续时间的任务
        durations = [1, 3, 0.5, 4]
        tasks = [timeout_controlled_task(d) for d in durations]
        results = await asyncio.gather(*tasks)
        
        print("超时控制结果:")
        for duration, result in zip(durations, results):
            print(f"  {duration}秒任务: {result}")
        
        # 2. 优雅关闭演示
        print("\n优雅关闭演示:")
        
        shutdown_event = asyncio.Event()
        
        async def long_running_task():
            count = 0
            while not shutdown_event.is_set():
                count += 1
                print(f"长期任务运行中... {count}")
                try:
                    await asyncio.wait_for(asyncio.sleep(1), timeout=0.5)
                except asyncio.TimeoutError:
                    continue
            print("长期任务优雅退出")
            return count
        
        # 启动长期任务
        task = asyncio.create_task(long_running_task())
        
        # 运行一段时间后关闭
        await asyncio.sleep(2.5)
        shutdown_event.set()
        
        result = await task
        print(f"长期任务执行了 {result} 次循环")
        
        return {"timeout_tests": len(results), "long_task_cycles": result}
    
    # 运行演示
    asyncio.run(demo_best_practices())
    
    print("\n   ✓ 最佳实践演示完成")

# 运行最佳实践演示
async_best_practices_demo()

# 7. 学习建议和总结

# 7.1 学习路径

def async_learning_guide():
    """异步编程学习指南"""
    print("=== 异步编程学习指南 ===")
    
    learning_path = {
        "初级阶段": [
            "理解同步vs异步的概念",
            "掌握async/await语法",
            "学习asyncio.run()和基础事件循环",
            "练习简单的协程函数",
            "理解并发vs并行的区别"
        ],
        
        "中级阶段": [
            "掌握asyncio.gather()和asyncio.create_task()",
            "学习异步上下文管理器",
            "理解异步迭代器和生成器",
            "掌握信号量和锁等同步原语",
            "学习异步HTTP客户端(aiohttp)"
        ],
        
        "高级阶段": [
            "掌握异步服务器编程",
            "学习WebSocket编程",
            "理解事件循环的内部机制",
            "掌握性能优化技巧",
            "学习异步测试和调试"
        ],
        
        "专家阶段": [
            "设计复杂的异步系统架构",
            "掌握异步框架的源码",
            "优化异步应用的性能",
            "处理大规模并发场景",
            "贡献开源异步项目"
        ]
    }
    
    print("\n推荐学习路径:")
    for stage, topics in learning_path.items():
        print(f"\n{stage}:")
        for i, topic in enumerate(topics, 1):
            print(f"  {i}. {topic}")
    
    # 实践项目建议
    practice_projects = [
        "异步文件下载器",
        "网页爬虫系统",
        "实时聊天服务器",
        "API数据聚合器",
        "异步任务队列",
        "实时数据监控系统"
    ]
    
    print("\n推荐实践项目:")
    for i, project in enumerate(practice_projects, 1):
        print(f"  {i}. {project}")

# 运行学习指南
async_learning_guide()

# 7.2 常见陷阱和解决方案

def async_common_pitfalls():
    """异步编程常见陷阱"""
    print("=== 异步编程常见陷阱和解决方案 ===")
    
    pitfalls = {
        "忘记使用await": {
            "问题": "调用异步函数时忘记使用await关键字",
            "错误示例": "result = async_function()  # 返回协程对象,不是结果",
            "正确示例": "result = await async_function()  # 正确获取结果",
            "解决方案": "始终在调用异步函数时使用await,或使用asyncio.create_task()"
        },
        
        "在同步函数中调用异步函数": {
            "问题": "在普通函数中直接调用异步函数",
            "错误示例": "def sync_func(): return async_func()  # 错误",
            "正确示例": "def sync_func(): return asyncio.run(async_func())  # 正确",
            "解决方案": "使用asyncio.run()或确保调用者也是异步函数"
        },
        
        "阻塞事件循环": {
            "问题": "在异步函数中使用阻塞操作",
            "错误示例": "time.sleep(1)  # 阻塞整个事件循环",
            "正确示例": "await asyncio.sleep(1)  # 非阻塞等待",
            "解决方案": "使用异步版本的操作,或使用run_in_executor()"
        },
        
        "资源泄漏": {
            "问题": "未正确关闭异步资源",
            "错误示例": "session = aiohttp.ClientSession()  # 未关闭",
            "正确示例": "async with aiohttp.ClientSession() as session:  # 自动关闭",
            "解决方案": "使用异步上下文管理器或确保在finally块中关闭资源"
        },
        
        "过度并发": {
            "问题": "创建过多并发任务导致资源耗尽",
            "错误示例": "tasks = [fetch(url) for url in huge_url_list]  # 可能创建数千个任务",
            "正确示例": "使用Semaphore或分批处理限制并发数",
            "解决方案": "使用信号量、连接池或任务队列控制并发"
        }
    }
    
    print("\n常见陷阱详解:")
    for pitfall, details in pitfalls.items():
        print(f"\n{pitfall}:")
        print(f"  问题: {details['问题']}")
        print(f"  错误示例: {details['错误示例']}")
        print(f"  正确示例: {details['正确示例']}")
        print(f"  解决方案: {details['解决方案']}")
    
    # 调试技巧
    debugging_tips = [
        "使用asyncio.get_event_loop().set_debug(True)开启调试模式",
        "使用logging记录异步操作的执行流程",
        "使用asyncio.current_task()和asyncio.all_tasks()监控任务",
        "使用pytest-asyncio进行异步代码测试",
        "使用aiomonitor监控异步应用的运行状态"
    ]
    
    print("\n调试技巧:")
    for i, tip in enumerate(debugging_tips, 1):
        print(f"  {i}. {tip}")

# 运行常见陷阱指南
async_common_pitfalls()

# 7.3 本章总结

def chapter_summary():
    """第22天学习总结"""
    print("=== 第22天:异步IO - 学习总结 ===")
    
    summary_points = {
        "核心概念": [
            "异步编程基础:协程、事件循环、任务",
            "async/await语法的使用",
            "并发vs并行的区别",
            "异步IO的优势和适用场景"
        ],
        
        "重要模块": [
            "asyncio:Python异步编程的核心模块",
            "aiohttp:异步HTTP客户端和服务器",
            "aiofiles:异步文件操作",
            "contextlib.asynccontextmanager:异步上下文管理器"
        ],
        
        "关键技能": [
            "编写和调用异步函数",
            "使用asyncio.gather()进行并发执行",
            "实现异步上下文管理器和迭代器",
            "构建异步HTTP客户端和服务器",
            "处理异步编程中的错误和异常"
        ],
        
        "实际应用": [
            "异步网络编程:HTTP客户端、WebSocket",
            "异步文件处理:大文件读写、批量处理",
            "异步数据采集:网页爬虫、API聚合",
            "异步服务器:Web服务、实时通信"
        ],
        
        "最佳实践": [
            "合理控制并发数量",
            "正确处理异步资源的生命周期",
            "使用适当的错误处理和重试机制",
            "优化异步应用的性能和内存使用"
        ]
    }
    
    print("\n学习要点总结:")
    for category, points in summary_points.items():
        print(f"\n{category}:")
        for point in points:
            print(f"  • {point}")
    
    # 下一步学习建议
    next_steps = [
        "深入学习特定的异步框架(如FastAPI、Tornado)",
        "探索异步数据库操作(如asyncpg、motor)",
        "学习异步消息队列和任务调度",
        "研究微服务架构中的异步通信",
        "掌握异步应用的部署和监控"
    ]
    
    print("\n下一步学习建议:")
    for i, step in enumerate(next_steps, 1):
        print(f"  {i}. {step}")
    
    print("\n🎉 恭喜完成第22天的学习!")
    print("异步IO是现代Python开发的重要技能,")
    print("掌握它将大大提升你处理并发任务的能力。")
    print("继续练习和探索,你将成为异步编程的专家!")

# 运行章节总结
chapter_summary()

# 实践练习

  1. 基础练习

    • 编写一个异步函数,模拟网络请求的延迟
    • 使用asyncio.gather()并发执行多个异步任务
    • 实现一个简单的异步上下文管理器
  2. 进阶练习

    • 构建一个异步文件下载器
    • 实现一个简单的异步Web爬虫
    • 创建一个基于WebSocket的实时聊天系统
  3. 项目练习

    • 开发一个异步API数据聚合服务
    • 构建一个异步任务队列系统
    • 实现一个异步日志收集和分析工具

通过本章的学习,你已经掌握了Python异步IO编程的核心概念和实践技能。异步编程是现代高性能应用开发的关键技术,继续深入学习和实践将帮助你构建更加高效和可扩展的应用程序!