2023/6/15

# Day 13 - Processes and Threads

# Learning Objectives

By the end of this chapter, you will be able to:

  • Understand what processes and threads are and how they differ
  • Write multithreaded programs in Python
  • Use multiprocessing for CPU-bound tasks
  • Understand the impact of the GIL (Global Interpreter Lock)
  • Apply thread synchronization and inter-process communication
  • Use thread pools and process pools
  • Understand the basics of asynchronous programming

# 1. Process and Thread Fundamentals

# 1.1 Basic Concepts

import threading
import multiprocessing
import time
import os

def concept_demo():
    """Demonstrate basic process and thread concepts"""
    print("=== Process and thread concepts ===")
    
    # 1. Process information
    print("\n1. Process information")
    print(f"Current process ID: {os.getpid()}")
    print(f"Parent process ID: {os.getppid()}")
    print(f"CPU core count: {multiprocessing.cpu_count()}")
    
    # 2. Thread information
    print("\n2. Thread information")
    print(f"Current thread name: {threading.current_thread().name}")
    print(f"Current thread ID: {threading.get_ident()}")
    print(f"Active thread count: {threading.active_count()}")
    
    # 3. A simple thread example
    print("\n3. Simple thread example")
    
    def worker_function(name, delay):
        """Worker thread function"""
        print(f"Thread {name} started (thread ID: {threading.get_ident()})")
        time.sleep(delay)
        print(f"Thread {name} finished")
    
    # Create and start the threads
    thread1 = threading.Thread(target=worker_function, args=("Worker-1", 2))
    thread2 = threading.Thread(target=worker_function, args=("Worker-2", 1))
    
    print("Starting threads...")
    thread1.start()
    thread2.start()
    
    # Wait for both threads to finish
    thread1.join()
    thread2.join()
    
    print("All threads finished")
    
    # 4. Processes vs. threads
    print("\n4. Processes vs. threads")
    print("""
    Process:
    - Independent memory space
    - Inter-process communication requires special mechanisms
    - Expensive to create
    - Suited to CPU-bound tasks
    - True parallel execution
    
    Thread:
    - Shares the process's memory space
    - Communication between threads is simple
    - Cheap to create
    - Suited to IO-bound tasks
    - Limited by the GIL (in CPython)
    """)

# Run the demo
concept_demo()
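
The memory-isolation point in the comparison above is easy to check empirically. Here is a minimal sketch (the names are illustrative, not from the original) showing that a thread mutates the very dictionary the main thread sees, while a child process only ever changes its own copy:

import threading
import multiprocessing

data = {"value": 0}

def mutate():
    data["value"] += 1  # runs against whichever memory space executes it

if __name__ == '__main__':
    t = threading.Thread(target=mutate)
    t.start()
    t.join()
    print(data["value"])  # 1 -- the thread shares our memory

    p = multiprocessing.Process(target=mutate)
    p.start()
    p.join()
    print(data["value"])  # still 1 -- the child changed only its own copy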

# 1.2 The GIL (Global Interpreter Lock)

import threading
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Defined at module level so ProcessPoolExecutor can pickle them;
# nested functions cannot be sent to worker processes.
def cpu_intensive_task(n):
    """CPU-bound task"""
    result = 0
    for i in range(n):
        result += i * i
    return result

def io_intensive_task(duration):
    """IO-bound task"""
    time.sleep(duration)
    return f"IO task done in {duration}s"

def gil_demo():
    """Demonstrate the impact of the GIL"""
    print("=== GIL impact demo ===")
    
    # 1. CPU-bound comparison
    print("\n1. CPU-bound task comparison")
    
    # Single-threaded
    start_time = time.time()
    results = []
    for i in range(4):
        result = cpu_intensive_task(1000000)
        results.append(result)
    single_thread_time = time.time() - start_time
    print(f"Single-threaded time: {single_thread_time:.2f}s")
    
    # Multithreaded (limited by the GIL)
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(cpu_intensive_task, 1000000) for _ in range(4)]
        results = [future.result() for future in futures]
    multi_thread_time = time.time() - start_time
    print(f"Multithreaded time: {multi_thread_time:.2f}s")
    
    # Multiprocess (not limited by the GIL)
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(cpu_intensive_task, 1000000) for _ in range(4)]
        results = [future.result() for future in futures]
    multi_process_time = time.time() - start_time
    print(f"Multiprocess time: {multi_process_time:.2f}s")
    
    print(f"\nSpeedups:")
    print(f"Threads vs. single thread: {single_thread_time/multi_thread_time:.2f}x")
    print(f"Processes vs. single thread: {single_thread_time/multi_process_time:.2f}x")
    
    # 2. IO-bound comparison
    print("\n2. IO-bound task comparison")
    
    # Single-threaded
    start_time = time.time()
    results = []
    for i in range(4):
        result = io_intensive_task(1)
        results.append(result)
    single_thread_io_time = time.time() - start_time
    print(f"Single-threaded IO time: {single_thread_io_time:.2f}s")
    
    # Multithreaded (IO-bound work benefits from threads)
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(io_intensive_task, 1) for _ in range(4)]
        results = [future.result() for future in futures]
    multi_thread_io_time = time.time() - start_time
    print(f"Multithreaded IO time: {multi_thread_io_time:.2f}s")
    
    print(f"\nIO speedup:")
    print(f"Threads vs. single thread: {single_thread_io_time/multi_thread_io_time:.2f}x")
    
    # 3. Summary of the GIL's impact
    print("\n3. GIL impact summary")
    print("""
    Impact of the GIL (Global Interpreter Lock):
    
    CPU-bound tasks:
    - Multithreading does not help and can even be slower
    - Multiprocessing can use all CPU cores
    - Prefer multiprocessing
    
    IO-bound tasks:
    - Multithreading gives a significant speedup
    - Threads release the GIL while waiting on IO
    - Prefer threading
    
    Mixed workloads:
    - Choose the concurrency model based on the dominant bottleneck
    - Consider asyncio for asynchronous programming
    """)

# Run the demo
gil_demo()

# 2. Multithreaded Programming

# 2.1 Thread Basics

import threading
import time
from queue import Queue

def threading_basics():
    """Demonstrate basic thread operations"""
    print("=== Thread basics ===")
    
    # 1. Different ways to create threads
    print("\n1. Ways to create threads")
    
    # Option 1: pass a function to Thread
    def simple_task(name, count):
        for i in range(count):
            print(f"Thread {name}: iteration {i+1}")
            time.sleep(0.5)
    
    thread1 = threading.Thread(target=simple_task, args=("Function-Thread", 3))
    
    # Option 2: subclass Thread
    class CustomThread(threading.Thread):
        def __init__(self, name, count):
            super().__init__()
            self.name = name
            self.count = count
            
        def run(self):
            for i in range(self.count):
                print(f"Custom thread {self.name}: iteration {i+1}")
                time.sleep(0.5)
    
    thread2 = CustomThread("Custom-Thread", 3)
    
    # Start the threads
    print("Starting threads...")
    thread1.start()
    thread2.start()
    
    # Wait for them to finish
    thread1.join()
    thread2.join()
    print("All threads finished")
    
    # 2. Thread attributes and methods
    print("\n2. Thread attributes and methods")
    
    def demo_thread_properties():
        current = threading.current_thread()
        print(f"Thread name: {current.name}")
        print(f"Thread ID: {current.ident}")
        print(f"Alive: {current.is_alive()}")
        print(f"Daemon: {current.daemon}")
    
    # A normal thread
    normal_thread = threading.Thread(target=demo_thread_properties, name="NormalThread")
    
    # A daemon thread
    daemon_thread = threading.Thread(target=demo_thread_properties, name="DaemonThread")
    daemon_thread.daemon = True
    
    print("Normal thread properties:")
    normal_thread.start()
    normal_thread.join()
    
    print("\nDaemon thread properties:")
    daemon_thread.start()
    daemon_thread.join()
    
    # 3. Passing data between threads
    print("\n3. Passing data between threads")
    
    # Use Queue for thread-safe communication
    data_queue = Queue()
    result_queue = Queue()
    
    def producer(queue, count):
        """Producer thread"""
        for i in range(count):
            item = f"data-{i}"
            queue.put(item)
            print(f"Producer: produced {item}")
            time.sleep(0.1)
        queue.put(None)  # sentinel marking the end
    
    def consumer(data_queue, result_queue):
        """Consumer thread"""
        while True:
            item = data_queue.get()
            if item is None:
                break
            
            # Process the item
            processed = f"processed-{item}"
            result_queue.put(processed)
            print(f"Consumer: {item} -> {processed}")
            time.sleep(0.2)
            
            data_queue.task_done()
    
    # Start the producer and consumer threads
    producer_thread = threading.Thread(target=producer, args=(data_queue, 5))
    consumer_thread = threading.Thread(target=consumer, args=(data_queue, result_queue))
    
    producer_thread.start()
    consumer_thread.start()
    
    producer_thread.join()
    consumer_thread.join()
    
    # Collect the results
    print("\nResults:")
    while not result_queue.empty():
        result = result_queue.get()
        print(f"  {result}")

# Run the demo
threading_basics()

# 2.2 Thread Synchronization

import threading
import time

def thread_synchronization():
    """Demonstrate thread synchronization"""
    print("=== Thread synchronization ===")
    
    # 1. The problem with unsynchronized access
    print("\n1. The problem without a lock")
    
    shared_counter = 0
    
    def unsafe_increment(count):
        nonlocal shared_counter  # the counter lives in the enclosing scope
        for _ in range(count):
            temp = shared_counter
            time.sleep(0.0001)  # simulate work between the read and the write
            shared_counter = temp + 1
    
    # Several threads modify the shared variable at the same time
    threads = []
    for i in range(5):
        thread = threading.Thread(target=unsafe_increment, args=(100,))
        threads.append(thread)
    
    start_time = time.time()
    for thread in threads:
        thread.start()
    
    for thread in threads:
        thread.join()
    
    print(f"Unsafe count: {shared_counter} (expected: 500)")
    print(f"Elapsed: {time.time() - start_time:.2f}s")
    
    # 2. Using a Lock
    print("\n2. Using a Lock")
    
    shared_counter = 0
    counter_lock = threading.Lock()
    
    def safe_increment(count, lock):
        nonlocal shared_counter
        for _ in range(count):
            with lock:  # the with statement acquires and releases the lock
                temp = shared_counter
                time.sleep(0.0001)
                shared_counter = temp + 1
    
    threads = []
    for i in range(5):
        thread = threading.Thread(target=safe_increment, args=(100, counter_lock))
        threads.append(thread)
    
    start_time = time.time()
    for thread in threads:
        thread.start()
    
    for thread in threads:
        thread.join()
    
    print(f"Safe count: {shared_counter} (expected: 500)")
    print(f"Elapsed: {time.time() - start_time:.2f}s")
    
    # 3. Using an RLock (re-entrant lock)
    print("\n3. Using an RLock")
    
    rlock = threading.RLock()
    
    def recursive_function(n, lock):
        with lock:
            print(f"Recursive call {n}")
            if n > 0:
                recursive_function(n-1, lock)  # the same thread may re-acquire an RLock
    
    thread = threading.Thread(target=recursive_function, args=(3, rlock))
    thread.start()
    thread.join()
    
    # 4. Using a Condition variable
    print("\n4. Using a Condition variable")
    
    condition = threading.Condition()
    items = []
    
    def consumer(name):
        with condition:
            while len(items) == 0:
                print(f"Consumer {name} waiting for items...")
                condition.wait()  # wait until notified
            
            item = items.pop(0)
            print(f"Consumer {name} consumed {item}")
    
    def producer():
        for i in range(3):
            with condition:
                item = f"item-{i}"
                items.append(item)
                print(f"Producer produced {item}")
                condition.notify_all()  # wake all waiting threads
            time.sleep(1)
    
    # Start the consumer threads
    consumer_threads = []
    for i in range(2):
        thread = threading.Thread(target=consumer, args=(f"Consumer-{i}",))
        consumer_threads.append(thread)
        thread.start()
    
    # Start the producer thread
    producer_thread = threading.Thread(target=producer)
    producer_thread.start()
    
    # Wait for all threads to finish
    producer_thread.join()
    for thread in consumer_threads:
        thread.join()
    
    # 5. Using a Semaphore
    print("\n5. Using a Semaphore")
    
    # Limit how many threads may use a resource at once
    semaphore = threading.Semaphore(2)  # at most 2 threads at a time
    
    def access_resource(name):
        with semaphore:
            print(f"{name} acquired the resource")
            time.sleep(2)  # simulate using the resource
            print(f"{name} released the resource")
    
    threads = []
    for i in range(5):
        thread = threading.Thread(target=access_resource, args=(f"Thread-{i}",))
        threads.append(thread)
        thread.start()
    
    for thread in threads:
        thread.join()
    
    # 6. Using an Event
    print("\n6. Using an Event")
    
    event = threading.Event()
    
    def waiter(name):
        print(f"{name} waiting for the event...")
        event.wait()  # block until the event is set
        print(f"{name} received the event, starting work")
    
    def setter():
        time.sleep(2)
        print("Setting the event")
        event.set()  # set the event
    
    # Start the waiting threads
    waiter_threads = []
    for i in range(3):
        thread = threading.Thread(target=waiter, args=(f"Waiter-{i}",))
        waiter_threads.append(thread)
        thread.start()
    
    # Start the setting thread
    setter_thread = threading.Thread(target=setter)
    setter_thread.start()
    
    # Wait for all threads to finish
    setter_thread.join()
    for thread in waiter_threads:
        thread.join()

# Run the demo
thread_synchronization()
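
One synchronization primitive the list above leaves out is threading.Barrier, which makes a fixed number of threads wait for each other at a common point. A minimal sketch (the names are illustrative):

import threading
import time

barrier = threading.Barrier(3)  # wait() releases once 3 threads have arrived

def phase_worker(name):
    print(f"{name} doing phase 1")
    time.sleep(0.1)
    barrier.wait()  # nobody starts phase 2 until everyone finishes phase 1
    print(f"{name} doing phase 2")

threads = [threading.Thread(target=phase_worker, args=(f"W-{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()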

# 2.3 Thread Pools

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
import random
from queue import Queue, Empty

def thread_pool_demo():
    """Demonstrate thread pools"""
    print("=== Thread pool demo ===")
    
    # 1. Basic usage
    print("\n1. Basic thread pool usage")
    
    def worker_task(name, duration):
        print(f"Task {name} started (thread: {threading.current_thread().name})")
        time.sleep(duration)
        result = f"Task {name} finished in {duration:.2f}s"
        print(result)
        return result
    
    # Use ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=3, thread_name_prefix="Worker") as executor:
        # Submit the tasks
        futures = []
        for i in range(5):
            future = executor.submit(worker_task, f"Task-{i}", random.uniform(1, 3))
            futures.append(future)
        
        # Collect results as they complete
        for future in as_completed(futures):
            result = future.result()
            print(f"Got result: {result}")
    
    # 2. Using map
    print("\n2. Using map")
    
    def square_number(n):
        time.sleep(0.1)  # simulate computation
        return n * n
    
    numbers = list(range(1, 11))
    
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(square_number, numbers))
        print(f"Squares: {results}")
    
    # 3. Concurrent network requests
    print("\n3. Concurrent network request example")
    
    def fetch_url(url):
        """Fetch the contents of a URL (simulated)"""
        try:
            # Note: the network request is simulated here
            print(f"Requesting: {url}")
            time.sleep(random.uniform(0.5, 2))  # simulate network latency
            return {
                'url': url,
                'status': 200,
                'content_length': random.randint(1000, 5000)
            }
        except Exception as e:
            return {
                'url': url,
                'error': str(e)
            }
    
    urls = [
        'http://example.com/page1',
        'http://example.com/page2',
        'http://example.com/page3',
        'http://example.com/page4',
        'http://example.com/page5'
    ]
    
    # Sequential requests
    start_time = time.time()
    serial_results = []
    for url in urls:
        result = fetch_url(url)
        serial_results.append(result)
    serial_time = time.time() - start_time
    print(f"Sequential requests took: {serial_time:.2f}s")
    
    # Concurrent requests
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=5) as executor:
        parallel_results = list(executor.map(fetch_url, urls))
    parallel_time = time.time() - start_time
    print(f"Concurrent requests took: {parallel_time:.2f}s")
    print(f"Speedup: {serial_time/parallel_time:.2f}x")
    
    # 4. Working through a task queue
    print("\n4. Working through a task queue")
    
    task_queue = Queue()
    result_queue = Queue()
    
    # Fill the queue with tasks
    for i in range(10):
        task_queue.put(f"task-{i}")
    
    def process_task_queue(task_queue, result_queue):
        """Drain the task queue"""
        while True:
            try:
                task = task_queue.get(timeout=1)
                # Process the task
                time.sleep(0.5)
                result = f"done: {task}"
                result_queue.put(result)
                print(f"Thread {threading.current_thread().name} finished {task}")
                task_queue.task_done()
            except Empty:
                break  # the queue is empty, exit
    
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Start the worker threads
        futures = []
        for i in range(3):
            future = executor.submit(process_task_queue, task_queue, result_queue)
            futures.append(future)
        
        # Wait until every task has been processed
        task_queue.join()
    
    # Collect the results
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    
    print(f"\nFinished, {len(results)} results in total")
    
    # 5. Exception handling
    print("\n5. Exception handling in thread pools")
    
    def risky_task(n):
        if n % 3 == 0:
            raise ValueError(f"Task {n} failed")
        return f"Task {n} succeeded"
    
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(risky_task, i) for i in range(6)]
        
        for i, future in enumerate(futures):
            try:
                result = future.result()
                print(f"Task {i}: {result}")
            except Exception as e:
                print(f"Task {i} raised: {e}")

# Run the demo
thread_pool_demo()
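
The fetch_url above is deliberately simulated. If you want to run the same pattern against real URLs, a sketch using the third-party requests library (the URLs here are placeholders) could look like this:

import requests  # third-party: pip install requests
from concurrent.futures import ThreadPoolExecutor

def fetch_real(url):
    try:
        resp = requests.get(url, timeout=5)
        return {'url': url, 'status': resp.status_code,
                'content_length': len(resp.content)}
    except requests.RequestException as e:
        return {'url': url, 'error': str(e)}

urls = ['https://example.com', 'https://www.python.org']  # placeholders
with ThreadPoolExecutor(max_workers=5) as executor:
    for result in executor.map(fetch_real, urls):
        print(result)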

# 3. Multiprocess Programming

# 3.1 Process Basics

import multiprocessing
import os
import time
from multiprocessing import Process

def process_basics():
    """Demonstrate basic process operations"""
    print("=== Process basics ===")
    
    # 1. Different ways to create processes
    print("\n1. Ways to create processes")
    
    def worker_process(name, duration):
        """Worker process function"""
        print(f"Process {name} started (PID: {os.getpid()})")
        time.sleep(duration)
        print(f"Process {name} finished")
    
    # Option 1: pass a function to Process
    process1 = Process(target=worker_process, args=("Process-1", 2))
    
    # Option 2: subclass Process
    class CustomProcess(Process):
        def __init__(self, name, duration):
            super().__init__()
            self.process_name = name
            self.duration = duration
        
        def run(self):
            print(f"Custom process {self.process_name} started (PID: {os.getpid()})")
            time.sleep(self.duration)
            print(f"Custom process {self.process_name} finished")
    
    process2 = CustomProcess("Custom-Process", 1)
    
    # Start the processes
    print(f"Main process PID: {os.getpid()}")
    print("Starting child processes...")
    
    process1.start()
    process2.start()
    
    # Wait for them to finish
    process1.join()
    process2.join()
    
    print("All child processes finished")
    
    # 2. Process attributes and methods
    print("\n2. Process attributes and methods")
    
    def demo_process_properties():
        current = multiprocessing.current_process()
        print(f"Process name: {current.name}")
        print(f"Process PID: {current.pid}")
        print(f"Alive: {current.is_alive()}")
        print(f"Daemon: {current.daemon}")
    
    process = Process(target=demo_process_properties, name="DemoProcess")
    process.start()
    process.join()
    
    # 3. Daemon processes
    print("\n3. Daemon processes")
    
    def daemon_worker():
        while True:
            print(f"Daemon process working... (PID: {os.getpid()})")
            time.sleep(1)
    
    daemon_process = Process(target=daemon_worker)
    daemon_process.daemon = True  # mark as a daemon process
    daemon_process.start()
    
    time.sleep(3)  # the main process works for 3 seconds
    print("Main process exiting; the daemon process exits with it")
    # A daemon process is terminated when the main process exits

# Run the demo
# Note: these nested target functions rely on the fork start method (the Linux
# default). Under spawn -- the default on Windows, and on macOS since
# Python 3.8 -- Process targets must be picklable, i.e. defined at module
# level; see the sketch below.
if __name__ == '__main__':
    process_basics()
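
As the note above says, the nested target functions only work under fork. A spawn-safe version defines the worker at module level, and can force the start method explicitly to test this; a minimal sketch (the names are illustrative):

import multiprocessing
import os

def spawn_safe_worker(name):
    # defined at module level, so the spawn start method can pickle it
    print(f"{name} running in PID {os.getpid()}")

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn', force=True)  # what Windows uses by default
    p = multiprocessing.Process(target=spawn_safe_worker, args=("worker",))
    p.start()
    p.join()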

# 3.2 Inter-Process Communication

from multiprocessing import Process, Queue, Pipe, Value, Array, Lock, Manager
import time
import os

def inter_process_communication():
    """Demonstrate inter-process communication"""
    print("=== Inter-process communication ===")
    
    # 1. Using a Queue
    print("\n1. Using a Queue")
    
    def producer(queue, name, count):
        """Producer process"""
        for i in range(count):
            item = f"{name}-data-{i}"
            queue.put(item)
            print(f"Producer {name} (PID: {os.getpid()}) produced: {item}")
            time.sleep(0.5)
        queue.put(None)  # sentinel marking the end
    
    def consumer(queue, name):
        """Consumer process"""
        while True:
            item = queue.get()
            if item is None:
                break
            print(f"Consumer {name} (PID: {os.getpid()}) consumed: {item}")
            time.sleep(0.3)
    
    # Create the queue
    queue = Queue()
    
    # Create the processes
    producer_process = Process(target=producer, args=(queue, "Producer-1", 5))
    consumer_process = Process(target=consumer, args=(queue, "Consumer-1"))
    
    # Start them
    producer_process.start()
    consumer_process.start()
    
    # Wait for both to finish
    producer_process.join()
    consumer_process.join()
    
    # 2. Using a Pipe
    print("\n2. Using a Pipe")
    
    def sender(conn, messages):
        """Sender process"""
        for msg in messages:
            conn.send(msg)
            print(f"Sender (PID: {os.getpid()}) sent: {msg}")
            time.sleep(0.5)
        conn.send(None)  # sentinel: inherited handles can keep the pipe open,
                         # so don't rely on EOFError alone to stop the receiver
        conn.close()
    
    def receiver(conn):
        """Receiver process"""
        while True:
            msg = conn.recv()
            if msg is None:
                break
            print(f"Receiver (PID: {os.getpid()}) received: {msg}")
        conn.close()
    
    # Create the pipe
    parent_conn, child_conn = Pipe()
    
    messages = ["message 1", "message 2", "message 3"]
    
    # Create the processes
    sender_process = Process(target=sender, args=(child_conn, messages))
    receiver_process = Process(target=receiver, args=(parent_conn,))
    
    # Start them
    sender_process.start()
    receiver_process.start()
    
    # Wait for both to finish
    sender_process.join()
    receiver_process.join()
    
    # 3. Using shared memory
    print("\n3. Using shared memory")
    
    def worker_with_shared_data(shared_value, shared_array, lock, worker_id):
        """Worker process using shared data"""
        for i in range(5):
            with lock:
                # Update the shared value
                shared_value.value += 1
                # Update the shared array
                shared_array[worker_id] += 1
                print(f"Worker {worker_id} (PID: {os.getpid()}): "
                      f"value={shared_value.value}, array[{worker_id}]={shared_array[worker_id]}")
            time.sleep(0.1)
    
    # Create the shared data
    shared_value = Value('i', 0)  # shared integer
    shared_array = Array('i', [0, 0, 0])  # shared array
    lock = Lock()  # lock
    
    # Create several worker processes
    processes = []
    for i in range(3):
        p = Process(target=worker_with_shared_data, 
                   args=(shared_value, shared_array, lock, i))
        processes.append(p)
        p.start()
    
    # Wait for all of them to finish
    for p in processes:
        p.join()
    
    print(f"Final result: value={shared_value.value}, array={list(shared_array)}")
    
    # 4. Using a Manager
    print("\n4. Using a Manager")
    
    def worker_with_manager(shared_dict, shared_list, worker_id):
        """Worker process using Manager proxies"""
        # Update the shared dict
        shared_dict[f'worker_{worker_id}'] = os.getpid()
        
        # Update the shared list
        shared_list.append(f'data from process {worker_id}')
        
        print(f"Worker {worker_id} (PID: {os.getpid()}) updated the shared data")
    
    # Create a Manager
    with Manager() as manager:
        # Create the shared data structures
        shared_dict = manager.dict()
        shared_list = manager.list()
        
        # Create the processes
        processes = []
        for i in range(3):
            p = Process(target=worker_with_manager, 
                       args=(shared_dict, shared_list, i))
            processes.append(p)
            p.start()
        
        # Wait for all of them to finish
        for p in processes:
            p.join()
        
        print(f"Shared dict: {dict(shared_dict)}")
        print(f"Shared list: {list(shared_list)}")

# Run the demo
if __name__ == '__main__':
    inter_process_communication()

# 3.3 Process Pools

from multiprocessing import Pool, cpu_count
import time
import os

# Functions handed to a Pool are pickled and sent to the workers,
# so they must be defined at module level, not nested inside a function.
def cpu_intensive_task(n):
    """CPU-bound task"""
    print(f"Process {os.getpid()} starting task {n}")
    result = sum(i * i for i in range(n))
    print(f"Process {os.getpid()} finished task {n}, result: {result}")
    return result

def long_running_task(name, duration):
    print(f"Task {name} started (PID: {os.getpid()})")
    time.sleep(duration)
    return f"Task {name} finished in {duration}s"

def task_with_callback(x):
    if x == 3:
        raise ValueError("Task 3 failed")
    return x * x

def cpu_bound_task(n):
    """CPU-bound task"""
    return sum(i * i for i in range(n))

def process_pool_demo():
    """Demonstrate process pools"""
    print("=== Process pool demo ===")
    
    # 1. Basic usage
    print("\n1. Basic process pool usage")
    
    # Use a process pool
    with Pool(processes=cpu_count()) as pool:
        tasks = [100000, 200000, 300000, 400000]
        
        # Option 1: map
        print("Using map:")
        start_time = time.time()
        results = pool.map(cpu_intensive_task, tasks)
        end_time = time.time()
        
        print(f"Results: {results}")
        print(f"Elapsed: {end_time - start_time:.2f}s")
    
    # 2. Asynchronous submission
    print("\n2. Asynchronous submission")
    
    with Pool(processes=3) as pool:
        # Submit the tasks asynchronously
        async_results = []
        tasks = [("Task-A", 2), ("Task-B", 1), ("Task-C", 3), ("Task-D", 1)]
        
        for name, duration in tasks:
            async_result = pool.apply_async(long_running_task, (name, duration))
            async_results.append((name, async_result))
        
        # Collect the results
        for name, async_result in async_results:
            try:
                result = async_result.get(timeout=5)  # with a timeout
                print(f"Got result: {result}")
            except Exception as e:
                print(f"Task {name} raised: {e}")
    
    # 3. Using callbacks
    print("\n3. Using callbacks")
    
    # The callbacks run in the parent process, so they may stay nested
    def callback_function(result):
        print(f"Callback got result: {result}")
    
    def error_callback(error):
        print(f"Error callback got: {error}")
    
    with Pool(processes=2) as pool:
        for i in range(1, 5):
            pool.apply_async(
                task_with_callback, 
                (i,), 
                callback=callback_function,
                error_callback=error_callback
            )
        
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for all tasks to finish
    
    # 4. Process pool vs. thread pool
    print("\n4. Process pool vs. thread pool")
    
    tasks = [500000] * 4
    
    # Sequential
    start_time = time.time()
    serial_results = [cpu_bound_task(task) for task in tasks]
    serial_time = time.time() - start_time
    print(f"Sequential time: {serial_time:.2f}s")
    
    # Process pool
    start_time = time.time()
    with Pool(processes=4) as pool:
        process_results = pool.map(cpu_bound_task, tasks)
    process_time = time.time() - start_time
    print(f"Process pool time: {process_time:.2f}s")
    
    # Thread pool (for comparison)
    from concurrent.futures import ThreadPoolExecutor
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        thread_results = list(executor.map(cpu_bound_task, tasks))
    thread_time = time.time() - start_time
    print(f"Thread pool time: {thread_time:.2f}s")
    
    print(f"\nSpeedups:")
    print(f"Process pool vs. sequential: {serial_time/process_time:.2f}x")
    print(f"Thread pool vs. sequential: {serial_time/thread_time:.2f}x")
    print(f"Process pool vs. thread pool: {thread_time/process_time:.2f}x")

# Run the demo
if __name__ == '__main__':
    process_pool_demo()
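
Besides map and apply_async, Pool also offers imap and imap_unordered, which stream results back as workers finish instead of waiting for the whole batch; this is handy for progress reporting on uneven tasks. A minimal sketch:

from multiprocessing import Pool

def slow_square(n):  # module-level so the pool can pickle it
    return n * n

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # results arrive in completion order, not input order
        for result in pool.imap_unordered(slow_square, range(10)):
            print(result)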

# 4. Asynchronous Programming Basics

# 4.1 Async Concepts

import asyncio
import time

async def async_programming_basics():
    """Demonstrate the basics of asynchronous programming"""
    print("=== Async programming basics ===")
    
    # 1. A basic coroutine
    print("\n1. A basic coroutine")
    
    async def simple_async_task(name, delay):
        print(f"Task {name} started")
        await asyncio.sleep(delay)  # asynchronous wait
        print(f"Task {name} finished")
        return f"result of task {name}"
    
    # Run a single coroutine
    result = await simple_async_task("Task-1", 1)
    print(f"Result: {result}")
    
    # 2. Running several coroutines concurrently
    print("\n2. Running several coroutines concurrently")
    
    async def concurrent_tasks():
        # Create several coroutines
        tasks = [
            simple_async_task("Concurrent-A", 2),
            simple_async_task("Concurrent-B", 1),
            simple_async_task("Concurrent-C", 3)
        ]
        
        # Run them concurrently
        start_time = time.time()
        results = await asyncio.gather(*tasks)
        end_time = time.time()
        
        print(f"Concurrent results: {results}")
        print(f"Total elapsed: {end_time - start_time:.2f}s")
    
    await concurrent_tasks()
    
    # 3. Async generators
    print("\n3. Async generators")
    
    async def async_generator(count):
        for i in range(count):
            await asyncio.sleep(0.5)
            yield f"data-{i}"
    
    async def consume_async_generator():
        async for item in async_generator(5):
            print(f"Received: {item}")
    
    await consume_async_generator()
    
    # 4. Async context managers
    print("\n4. Async context managers")
    
    class AsyncContextManager:
        async def __aenter__(self):
            print("Entering the async context")
            await asyncio.sleep(0.1)
            return self
        
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            print("Exiting the async context")
            await asyncio.sleep(0.1)
    
    async with AsyncContextManager() as acm:
        print("Working inside the async context")
        await asyncio.sleep(0.5)
    
    # 5. Async iterators
    print("\n5. Async iterators")
    
    class AsyncIterator:
        def __init__(self, count):
            self.count = count
            self.current = 0
        
        def __aiter__(self):
            return self
        
        async def __anext__(self):
            if self.current >= self.count:
                raise StopAsyncIteration
            
            await asyncio.sleep(0.2)
            self.current += 1
            return f"async item {self.current}"
    
    async for item in AsyncIterator(3):
        print(f"Iterated: {item}")

# Run the async demo
async def run_async_demo():
    await async_programming_basics()

# When run as a script
if __name__ == '__main__':
    asyncio.run(run_async_demo())
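
A common question at this point is how to call blocking code from a coroutine without stalling the event loop. Since Python 3.9, asyncio.to_thread hands the blocking call to a worker thread; a minimal sketch (the function names are illustrative):

import asyncio
import time

def blocking_io():
    time.sleep(1)  # stands in for a blocking call (file, DB, legacy API)
    return "blocking result"

async def main():
    # the event loop stays free while the worker thread sleeps
    result = await asyncio.to_thread(blocking_io)
    print(result)

if __name__ == '__main__':
    asyncio.run(main())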

# 4.2 Asynchronous IO

import asyncio
import aiofiles
import time
from pathlib import Path

async def async_io_demo():
    """Demonstrate asynchronous IO"""
    print("=== Async IO demo ===")
    
    # 1. Async file operations
    print("\n1. Async file operations")
    
    async def async_file_operations():
        # Write a file asynchronously
        async with aiofiles.open('async_test.txt', 'w', encoding='utf-8') as f:
            await f.write('This line was written asynchronously\n')
            await f.write('A second line\n')
        
        # Read the file asynchronously
        async with aiofiles.open('async_test.txt', 'r', encoding='utf-8') as f:
            content = await f.read()
            print(f"Content read asynchronously:\n{content}")
        
        # Clean up
        Path('async_test.txt').unlink()
    
    await async_file_operations()
    
    # 2. Simulated async network requests
    print("\n2. Simulated async network requests")
    
    async def fetch_data(url, delay):
        """Simulate an async network request"""
        print(f"Requesting: {url}")
        await asyncio.sleep(delay)  # simulate network latency
        return {
            'url': url,
            'status': 200,
            'data': f'data from {url}'
        }
    
    async def fetch_multiple_urls():
        urls = [
            ('http://api1.example.com', 1),
            ('http://api2.example.com', 2),
            ('http://api3.example.com', 1.5),
            ('http://api4.example.com', 0.5)
        ]
        
        # Sequential requests
        start_time = time.time()
        serial_results = []
        for url, delay in urls:
            result = await fetch_data(url, delay)
            serial_results.append(result)
        serial_time = time.time() - start_time
        print(f"Sequential requests took: {serial_time:.2f}s")
        
        # Concurrent requests
        start_time = time.time()
        tasks = [fetch_data(url, delay) for url, delay in urls]
        concurrent_results = await asyncio.gather(*tasks)
        concurrent_time = time.time() - start_time
        print(f"Concurrent requests took: {concurrent_time:.2f}s")
        print(f"Speedup: {serial_time/concurrent_time:.2f}x")
        
        return concurrent_results
    
    results = await fetch_multiple_urls()
    print(f"Number of results: {len(results)}")
    
    # 3. Controlling async tasks
    print("\n3. Controlling async tasks")
    
    async def task_control_demo():
        # Timeouts
        async def slow_task():
            await asyncio.sleep(3)
            return "slow task finished"
        
        try:
            result = await asyncio.wait_for(slow_task(), timeout=2)
            print(f"Task result: {result}")
        except asyncio.TimeoutError:
            print("Task timed out")
        
        # Cancellation
        async def cancellable_task():
            try:
                for i in range(10):
                    print(f"Cancellable task running: {i}")
                    await asyncio.sleep(0.5)
                return "task finished"
            except asyncio.CancelledError:
                print("Task was cancelled")
                raise
        
        task = asyncio.create_task(cancellable_task())
        await asyncio.sleep(2)  # let the task run for a while
        task.cancel()  # cancel it
        
        try:
            await task
        except asyncio.CancelledError:
            print("Cancellation confirmed")
    
    await task_control_demo()
    
    # 4. Async queues
    print("\n4. Async queues")
    
    async def async_queue_demo():
        queue = asyncio.Queue(maxsize=3)
        
        async def producer(queue, name, count):
            for i in range(count):
                item = f"{name}-item-{i}"
                await queue.put(item)
                print(f"Producer {name} produced: {item}")
                await asyncio.sleep(0.5)
        
        async def consumer(queue, name):
            while True:
                try:
                    item = await asyncio.wait_for(queue.get(), timeout=3)
                    print(f"Consumer {name} consumed: {item}")
                    queue.task_done()
                    await asyncio.sleep(0.3)
                except asyncio.TimeoutError:
                    print(f"Consumer {name} timed out, exiting")
                    break
        
        # Create the producer and consumer tasks
        producer_task = asyncio.create_task(producer(queue, "Producer-1", 5))
        consumer_task1 = asyncio.create_task(consumer(queue, "Consumer-1"))
        consumer_task2 = asyncio.create_task(consumer(queue, "Consumer-2"))
        
        # Wait for the producer to finish
        await producer_task
        
        # Wait until the queue is drained
        await queue.join()
        
        # Cancel the consumer tasks
        consumer_task1.cancel()
        consumer_task2.cancel()
        
        try:
            await asyncio.gather(consumer_task1, consumer_task2)
        except asyncio.CancelledError:
            pass
    
    await async_queue_demo()

# Run the async IO demo
if __name__ == '__main__':
    asyncio.run(async_io_demo())

# 5. Hands-On Exercise: A Concurrent Downloader

# 5.1 Building a Complete Concurrent Download System

import asyncio
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
from pathlib import Path
from dataclasses import dataclass
from typing import List, Optional
import json

@dataclass
class DownloadTask:
    """A download task"""
    url: str
    filename: str
    size: Optional[int] = None
    checksum: Optional[str] = None

@dataclass
class DownloadResult:
    """The result of a download"""
    task: DownloadTask
    success: bool
    duration: float
    error: Optional[str] = None
    actual_size: Optional[int] = None

class ConcurrentDownloader:
    """Concurrent downloader"""
    
    def __init__(self, download_dir: str = "downloads"):
        self.download_dir = Path(download_dir)
        self.download_dir.mkdir(exist_ok=True)
        self.results: List[DownloadResult] = []
    
    def simulate_download(self, task: DownloadTask) -> DownloadResult:
        """Simulate downloading a file"""
        start_time = time.time()
        
        try:
            # Simulate the download
            print(f"Downloading: {task.url} -> {task.filename}")
            
            # Simulate network latency and transfer time
            download_time = len(task.filename) * 0.1 + 1  # derive a fake duration from the filename length
            time.sleep(download_time)
            
            # Create a fake downloaded file
            file_path = self.download_dir / task.filename
            content = f"Simulated download content: {task.url}\n" * 100
            
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)
            
            actual_size = len(content.encode('utf-8'))
            duration = time.time() - start_time
            
            # Verify the size (if an expected size was given)
            if task.size and abs(actual_size - task.size) > 100:
                raise ValueError(f"Size mismatch: expected {task.size}, got {actual_size}")
            
            print(f"Finished: {task.filename} ({actual_size} bytes, {duration:.2f}s)")
            
            return DownloadResult(
                task=task,
                success=True,
                duration=duration,
                actual_size=actual_size
            )
            
        except Exception as e:
            duration = time.time() - start_time
            print(f"Failed: {task.filename} - {str(e)}")
            
            return DownloadResult(
                task=task,
                success=False,
                duration=duration,
                error=str(e)
            )
    
    def download_with_threads(self, tasks: List[DownloadTask], max_workers: int = 4) -> List[DownloadResult]:
        """Download with a thread pool"""
        print(f"\n=== Downloading with a thread pool ({max_workers} workers) ===")
        
        start_time = time.time()
        results = []
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit every task
            future_to_task = {executor.submit(self.simulate_download, task): task for task in tasks}
            
            # Collect the results
            for future in as_completed(future_to_task):
                result = future.result()
                results.append(result)
                
                # Report progress
                completed = len(results)
                total = len(tasks)
                print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%)")
        
        total_time = time.time() - start_time
        print(f"Thread pool download finished, wall time: {total_time:.2f}s")
        
        return results
    
    def download_with_processes(self, tasks: List[DownloadTask], max_workers: int = None) -> List[DownloadResult]:
        """Download with a process pool"""
        if max_workers is None:
            max_workers = multiprocessing.cpu_count()
        
        print(f"\n=== Downloading with a process pool ({max_workers} workers) ===")
        
        start_time = time.time()
        results = []
        
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Submit every task (self must stay picklable for this to work)
            future_to_task = {executor.submit(self.simulate_download, task): task for task in tasks}
            
            # Collect the results
            for future in as_completed(future_to_task):
                result = future.result()
                results.append(result)
                
                # Report progress
                completed = len(results)
                total = len(tasks)
                print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%)")
        
        total_time = time.time() - start_time
        print(f"Process pool download finished, wall time: {total_time:.2f}s")
        
        return results
    
    async def download_with_asyncio(self, tasks: List[DownloadTask], max_concurrent: int = 10) -> List[DownloadResult]:
        """Download with asyncio"""
        print(f"\n=== Downloading with asyncio (max concurrency: {max_concurrent}) ===")
        
        async def async_download(task: DownloadTask) -> DownloadResult:
            """Download a single file asynchronously"""
            start_time = time.time()
            
            try:
                print(f"Async downloading: {task.url} -> {task.filename}")
                
                # Simulate an async network request
                download_time = len(task.filename) * 0.1 + 1
                await asyncio.sleep(download_time)
                
                # Create a fake downloaded file
                file_path = self.download_dir / task.filename
                content = f"Async download content: {task.url}\n" * 100
                
                # Write the file (kept synchronous here for simplicity)
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(content)
                
                actual_size = len(content.encode('utf-8'))
                duration = time.time() - start_time
                
                print(f"Async finished: {task.filename} ({actual_size} bytes, {duration:.2f}s)")
                
                return DownloadResult(
                    task=task,
                    success=True,
                    duration=duration,
                    actual_size=actual_size
                )
                
            except Exception as e:
                duration = time.time() - start_time
                print(f"Async failed: {task.filename} - {str(e)}")
                
                return DownloadResult(
                    task=task,
                    success=False,
                    duration=duration,
                    error=str(e)
                )
        
        start_time = time.time()
        
        # Limit concurrency with a semaphore
        semaphore = asyncio.Semaphore(max_concurrent)
        
        async def download_with_semaphore(task):
            async with semaphore:
                return await async_download(task)
        
        # Create every task
        download_tasks = [download_with_semaphore(task) for task in tasks]
        
        # Run them all concurrently
        results = await asyncio.gather(*download_tasks)
        
        total_time = time.time() - start_time
        print(f"Async download finished, wall time: {total_time:.2f}s")
        
        return results
    
    def generate_report(self, results: List[DownloadResult]) -> dict:
        """Generate a download report"""
        successful = [r for r in results if r.success]
        failed = [r for r in results if not r.success]
        
        total_size = sum(r.actual_size or 0 for r in successful)
        # Note: this is the sum of per-task durations, not wall-clock time
        total_time = sum(r.duration for r in results)
        avg_speed = total_size / total_time if total_time > 0 else 0
        
        report = {
            'total_tasks': len(results),
            'successful': len(successful),
            'failed': len(failed),
            'success_rate': len(successful) / len(results) * 100 if results else 0,
            'total_size_bytes': total_size,
            'total_time_seconds': total_time,
            'average_speed_bps': avg_speed,
            'failed_tasks': [{'filename': r.task.filename, 'error': r.error} for r in failed]
        }
        
        return report
    
    def save_report(self, report: dict, filename: str = "download_report.json"):
        """Save the download report"""
        report_path = self.download_dir / filename
        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        print(f"Report saved to: {report_path}")

# Demo function
def demo_concurrent_downloader():
    """Demonstrate the concurrent downloader"""
    print("=== Concurrent downloader demo ===")
    
    # Create the download tasks
    tasks = [
        DownloadTask("http://example.com/file1.txt", "file1.txt", 5000),
        DownloadTask("http://example.com/file2.pdf", "file2.pdf", 8000),
        DownloadTask("http://example.com/file3.jpg", "file3.jpg", 3000),
        DownloadTask("http://example.com/file4.zip", "file4.zip", 12000),
        DownloadTask("http://example.com/file5.doc", "file5.doc", 6000),
        DownloadTask("http://example.com/file6.mp3", "file6.mp3", 9000),
    ]
    
    downloader = ConcurrentDownloader()
    
    # 1. Thread pool download
    thread_results = downloader.download_with_threads(tasks, max_workers=3)
    thread_report = downloader.generate_report(thread_results)
    print(f"\nThread pool report:")
    print(f"Successful: {thread_report['successful']}/{thread_report['total_tasks']}")
    print(f"Success rate: {thread_report['success_rate']:.1f}%")
    print(f"Cumulative task time: {thread_report['total_time_seconds']:.2f}s")
    
    # 2. Process pool download
    process_results = downloader.download_with_processes(tasks, max_workers=2)
    process_report = downloader.generate_report(process_results)
    print(f"\nProcess pool report:")
    print(f"Successful: {process_report['successful']}/{process_report['total_tasks']}")
    print(f"Success rate: {process_report['success_rate']:.1f}%")
    print(f"Cumulative task time: {process_report['total_time_seconds']:.2f}s")
    
    # 3. Async download
    async def async_download_demo():
        async_results = await downloader.download_with_asyncio(tasks, max_concurrent=4)
        async_report = downloader.generate_report(async_results)
        print(f"\nAsync report:")
        print(f"Successful: {async_report['successful']}/{async_report['total_tasks']}")
        print(f"Success rate: {async_report['success_rate']:.1f}%")
        print(f"Cumulative task time: {async_report['total_time_seconds']:.2f}s")
        
        # Save the report
        downloader.save_report(async_report, "async_download_report.json")
        
        return async_report
    
    # Run the async download
    async_report = asyncio.run(async_download_demo())
    
    # 4. Comparison (cumulative per-task time; see the wall times printed above)
    print(f"\n=== Comparison ===")
    print(f"Thread pool cumulative: {thread_report['total_time_seconds']:.2f}s")
    print(f"Process pool cumulative: {process_report['total_time_seconds']:.2f}s")
    print(f"Asyncio cumulative: {async_report['total_time_seconds']:.2f}s")
    
    # Clean up the downloaded files
    import shutil
    if downloader.download_dir.exists():
        shutil.rmtree(downloader.download_dir)
        print(f"\nRemoved download directory: {downloader.download_dir}")

# Run the demo
if __name__ == '__main__':
    demo_concurrent_downloader()

# 6. Summary and Best Practices

# 6.1 Choosing a Concurrency Model

def choose_concurrency_method():
    """A guide to choosing a concurrency model"""
    print("=== Choosing a concurrency model ===")
    
    guidelines = {
        "CPU-bound tasks": {
            "Recommended": "multiprocessing",
            "Why": "Not limited by the GIL; can use every CPU core",
            "Typical uses": [
                "numerical computation",
                "image processing",
                "data analysis",
                "encryption/decryption",
                "scientific computing"
            ],
            "Example": """
from multiprocessing import Pool

def cpu_task(n):
    return sum(i*i for i in range(n))

with Pool() as pool:
    results = pool.map(cpu_task, [100000, 200000, 300000])
            """
        },
        
        "IO-bound tasks": {
            "Recommended": "threading or asyncio",
            "Why": "The GIL is released while waiting on IO, and thread switches are cheap",
            "Typical uses": [
                "file IO",
                "network requests",
                "database access",
                "API calls",
                "web scraping"
            ],
            "Example": """
# With threads
from concurrent.futures import ThreadPoolExecutor

def io_task(url):
    # simulate a network request
    time.sleep(1)
    return f"data from {url}"

with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(io_task, urls)

# With asyncio
import asyncio

async def async_io_task(url):
    await asyncio.sleep(1)  # simulate async IO
    return f"data from {url}"

async def main():
    tasks = [async_io_task(url) for url in urls]
    results = await asyncio.gather(*tasks)
            """
        },
        
        "Mixed workloads": {
            "Recommended": "Choose by the dominant bottleneck, or combine approaches",
            "Why": "Depends on the specific workload",
            "Typical uses": [
                "web servers",
                "data processing pipelines",
                "real-time systems",
                "game servers"
            ],
            "Example": """
# Hybrid: asyncio + a process pool
import asyncio
from concurrent.futures import ProcessPoolExecutor

async def hybrid_task():
    # async for the IO-bound part
    data = await fetch_data_async()
    
    # a process pool for the CPU-bound part
    with ProcessPoolExecutor() as executor:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(executor, cpu_intensive_func, data)
    
    return result
            """
        }
    }
    
    for task_type, info in guidelines.items():
        print(f"\n{task_type}:")
        print(f"  Recommended: {info['Recommended']}")
        print(f"  Why: {info['Why']}")
        print(f"  Typical uses: {', '.join(info['Typical uses'])}")
        print(f"  Example:{info['Example']}")

# Run the guide
choose_concurrency_method()

# 6.2 Best Practices

def best_practices():
    """Best practices for concurrent programming"""
    print("=== Concurrency best practices ===")
    
    practices = {
        "1. Avoid shared state": {
            "What": "Avoid mutable state shared between threads/processes where possible",
            "How": [
                "Prefer immutable data structures",
                "Communicate by passing messages rather than sharing memory",
                "Borrow ideas from functional programming"
            ]
        },
        
        "2. Use locks correctly": {
            "What": "When state must be shared, use the synchronization primitives properly",
            "How": [
                "Use the with statement to manage locks automatically",
                "Avoid deadlock (acquire locks in a fixed order)",
                "Hold locks for as short a time as possible",
                "Consider lock-free data structures"
            ]
        },
        
        "3. Handle exceptions": {
            "What": "Exception handling matters even more in concurrent code",
            "How": [
                "Add exception handling to every task",
                "Log exception details",
                "Implement graceful error recovery",
                "Do not let one exception bring the whole program down"
            ]
        },
        
        "4. Manage resources": {
            "What": "Manage threads, processes, and other resources carefully",
            "How": [
                "Use context managers",
                "Release resources as soon as they are no longer needed",
                "Set sensible timeouts",
                "Monitor resource usage"
            ]
        },
        
        "5. Tune for performance": {
            "What": "Tune concurrency parameters against real measurements",
            "How": [
                "Experiment with different worker thread/process counts",
                "Monitor CPU and memory usage",
                "Use profiling tools",
                "Avoid excessive concurrency"
            ]
        }
    }
    
    for practice, details in practices.items():
        print(f"\n{practice}")
        print(f"  {details['What']}")
        for suggestion in details['How']:
            print(f"  • {suggestion}")

# Run the best practices
best_practices()
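
To make the lock advice concrete: besides acquiring locks in a fixed order, a thread can acquire with a timeout and back off instead of blocking forever. A minimal sketch (the names are illustrative):

import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

def careful_task():
    if lock_a.acquire(timeout=1):
        try:
            if lock_b.acquire(timeout=1):
                try:
                    pass  # critical section using both resources
                finally:
                    lock_b.release()
            else:
                print("could not get lock_b, backing off")
        finally:
            lock_a.release()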

# 6.3 Common Pitfalls and How to Avoid Them

def common_pitfalls():
    """Common pitfalls and their solutions"""
    print("=== Common pitfalls and solutions ===")
    
    pitfalls = {
        "Race condition": {
            "Problem": "Several threads access shared state at once, producing unpredictable results",
            "Solution": "Use locks, atomic operations, or lock-free data structures",
            "Example": """
# Wrong
counter = 0
def increment():
    global counter
    counter += 1  # not atomic

# Right
import threading
counter = 0
lock = threading.Lock()
def safe_increment():
    global counter
    with lock:
        counter += 1
            """
        },
        
        "Deadlock": {
            "Problem": "Threads wait on each other to release resources, forever",
            "Solution": "Acquire locks in a fixed order, use timeouts, avoid nested locks",
            "Example": """
# Code that can deadlock
lock1 = threading.Lock()
lock2 = threading.Lock()

def task1():
    with lock1:
        with lock2:  # may deadlock
            pass

def task2():
    with lock2:
        with lock1:  # may deadlock
            pass

# Deadlock avoided
def safe_task1():
    with lock1:
        with lock2:  # fixed order
            pass

def safe_task2():
    with lock1:  # same order
        with lock2:
            pass
            """
        },
        
        "GIL bottleneck": {
            "Problem": "Python's GIL limits multithreaded performance on CPU-bound work",
            "Solution": "Use multiprocessing for CPU-bound tasks",
            "Example": """
# CPU-bound work belongs in processes
from multiprocessing import Pool

def cpu_task(n):
    return sum(i*i for i in range(n))

# Use a process pool rather than a thread pool
with Pool() as pool:
    results = pool.map(cpu_task, [100000, 200000])
            """
        },
        
        "Resource leaks": {
            "Problem": "Threads or processes that are never cleaned up leak resources",
            "Solution": "Use context managers; always join threads/processes",
            "Example": """
# Wrong
thread = threading.Thread(target=worker)
thread.start()
# forgetting to join can leak resources

# Right
with ThreadPoolExecutor() as executor:
    future = executor.submit(worker)
    result = future.result()  # cleaned up automatically
            """
        }
    }
    
    for pitfall, details in pitfalls.items():
        print(f"\n{pitfall}:")
        print(f"  Problem: {details['Problem']}")
        print(f"  Solution: {details['Solution']}")
        print(f"  Example:{details['Example']}")

# Run the pitfalls overview
common_pitfalls()

# 7. Where to Go Next

# 7.1 Advanced Topics

  • Advanced async programming: dig deeper into asyncio, aiohttp, aiofiles, and other async libraries
  • Distributed computing: learn frameworks such as Celery and Dask
  • Concurrent data structures: study lock-free structures and thread-safe containers
  • Profiling: analyze concurrent programs with cProfile, line_profiler, and similar tools
  • Network programming: learn socket programming, network protocols, and server architecture

# 7.2 Suggested Practice Projects

  1. Web crawler: combine threads and async IO for an efficient scraper
  2. File processing tool: a concurrent program for batch-processing large numbers of files
  3. API server: build a high-concurrency API service with FastAPI or aiohttp
  4. Data pipeline: a concurrent ETL data-processing system
  5. Monitoring system: a concurrent program that tracks system resources and application health

# 7.3 Recommended Resources

  • Official docs: the Python threading, multiprocessing, and asyncio module documentation
  • Books: "Python Concurrent Programming" (《Python并发编程》) and "Fluent Python"
  • Online: Real Python's concurrency tutorials
  • Practice: open-source concurrency projects on GitHub

This chapter covered the core concepts of processes and threads in Python, how to use them, and the accompanying best practices. Concurrency takes real practice to master: write code, run experiments, and build up your skills step by step.