哪吒 2023/6/15
# Day 13: Processes and Threads
# Learning Objectives
By the end of this chapter, you will be able to:
- Understand what processes and threads are, and how they differ
- Write multithreaded Python programs
- Use multiple processes for CPU-bound tasks
- Understand the impact of the GIL (Global Interpreter Lock)
- Apply thread synchronization and inter-process communication
- Use thread pools and process pools
- Understand the basics of asynchronous programming
# 1. Process and Thread Fundamentals
# 1.1 Basic Concepts
```python
import threading
import multiprocessing
import time
import os

def concept_demo():
    """Demonstrate process and thread concepts."""
    print("=== Process and thread concepts ===")

    # 1. Process information
    print("\n1. Process information")
    print(f"Current process ID: {os.getpid()}")
    print(f"Parent process ID: {os.getppid()}")
    print(f"CPU core count: {multiprocessing.cpu_count()}")

    # 2. Thread information
    print("\n2. Thread information")
    print(f"Current thread name: {threading.current_thread().name}")
    print(f"Current thread ID: {threading.get_ident()}")
    print(f"Active thread count: {threading.active_count()}")

    # 3. A simple thread example
    print("\n3. Simple thread example")

    def worker_function(name, delay):
        """Worker thread function."""
        print(f"Thread {name} starting (thread ID: {threading.get_ident()})")
        time.sleep(delay)
        print(f"Thread {name} finished")

    # Create and start the threads
    thread1 = threading.Thread(target=worker_function, args=("Worker-1", 2))
    thread2 = threading.Thread(target=worker_function, args=("Worker-2", 1))
    print("Starting threads...")
    thread1.start()
    thread2.start()

    # Wait for both threads to finish
    thread1.join()
    thread2.join()
    print("All threads finished")

    # 4. Processes vs. threads
    print("\n4. Processes vs. threads")
    print("""
Process:
- Independent memory space
- Inter-process communication needs special mechanisms
- Expensive to create
- Suited to CPU-bound tasks
- True parallel execution

Thread:
- Shares the process's memory space
- Simple communication between threads
- Cheap to create
- Suited to IO-bound tasks
- Limited by the GIL (in CPython)
""")

# Run the demo
concept_demo()
```
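One line of that comparison is worth seeing live: a thread mutates the same global the main thread sees, while a child process only changes its own copy. Below is a minimal sketch of that difference; the module-level `value` and `mutate` helper are illustrative, and the process behavior shown assumes the default start method.

```python
import threading
from multiprocessing import Process

value = 0  # module-level state, visible to every thread in this process

def mutate():
    """Set the global to 42 in whichever thread/process runs this."""
    global value
    value = 42

if __name__ == '__main__':
    t = threading.Thread(target=mutate)
    t.start()
    t.join()
    print(f"After thread: {value}")   # 42 -- threads share memory

    value = 0
    p = Process(target=mutate)
    p.start()
    p.join()
    print(f"After process: {value}")  # still 0 -- the child mutated its own copy
```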
# 1.2 The GIL (Global Interpreter Lock)
```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# The task functions must live at module level so ProcessPoolExecutor
# can pickle them and send them to its worker processes; nested
# functions would raise a pickling error.
def cpu_intensive_task(n):
    """CPU-bound task."""
    result = 0
    for i in range(n):
        result += i * i
    return result

def io_intensive_task(duration):
    """IO-bound task."""
    time.sleep(duration)
    return f"IO task done in {duration}s"

def gil_demo():
    """Demonstrate the impact of the GIL."""
    print("=== GIL impact demo ===")

    # 1. CPU-bound tasks
    print("\n1. CPU-bound tasks")

    # Single-threaded
    start_time = time.time()
    results = []
    for i in range(4):
        results.append(cpu_intensive_task(1000000))
    single_thread_time = time.time() - start_time
    print(f"Single-threaded time: {single_thread_time:.2f}s")

    # Multithreaded (limited by the GIL)
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(cpu_intensive_task, 1000000) for _ in range(4)]
        results = [future.result() for future in futures]
    multi_thread_time = time.time() - start_time
    print(f"Multithreaded time: {multi_thread_time:.2f}s")

    # Multiprocess (not limited by the GIL)
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(cpu_intensive_task, 1000000) for _ in range(4)]
        results = [future.result() for future in futures]
    multi_process_time = time.time() - start_time
    print(f"Multiprocess time: {multi_process_time:.2f}s")

    print("\nPerformance comparison:")
    print(f"Threads vs. single thread:   {single_thread_time/multi_thread_time:.2f}x")
    print(f"Processes vs. single thread: {single_thread_time/multi_process_time:.2f}x")

    # 2. IO-bound tasks
    print("\n2. IO-bound tasks")

    # Single-threaded
    start_time = time.time()
    results = []
    for i in range(4):
        results.append(io_intensive_task(1))
    single_thread_io_time = time.time() - start_time
    print(f"Single-threaded IO time: {single_thread_io_time:.2f}s")

    # Multithreaded (IO-bound work benefits from threads)
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(io_intensive_task, 1) for _ in range(4)]
        results = [future.result() for future in futures]
    multi_thread_io_time = time.time() - start_time
    print(f"Multithreaded IO time: {multi_thread_io_time:.2f}s")

    print("\nIO task comparison:")
    print(f"Threads vs. single thread: {single_thread_io_time/multi_thread_io_time:.2f}x")

    # 3. Summary of GIL effects
    print("\n3. Summary of GIL effects")
    print("""
Effects of the GIL (Global Interpreter Lock):

CPU-bound tasks:
- Multithreading does not improve performance, and may even be slower
- Multiprocessing can use every CPU core
- Prefer multiprocessing

IO-bound tasks:
- Multithreading improves performance significantly
- Threads release the GIL while waiting on IO
- Prefer threading

Mixed workloads:
- Choose a concurrency model based on where the time actually goes
- Consider asyncio-based asynchronous programming
""")

# Run the demo (the guard matters here: worker processes re-import
# this module under the spawn start method)
if __name__ == '__main__':
    gil_demo()
```
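Related background: CPython asks the running thread to release the GIL on a timer that the standard library exposes through `sys`. A small sketch for inspection only (the 0.001 value is an arbitrary experiment, not a recommendation):

```python
import sys

# How often CPython requests a GIL switch between runnable threads
print(f"GIL switch interval: {sys.getswitchinterval()}s")  # 0.005 by default

# It can be tuned, e.g. for latency experiments (rarely needed in practice)
sys.setswitchinterval(0.001)
print(f"New switch interval: {sys.getswitchinterval()}s")
```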
# 2. Multithreaded Programming
# 2.1 Thread Basics
```python
import threading
import time
from queue import Queue

def threading_basics():
    """Demonstrate basic thread operations."""
    print("=== Thread basics ===")

    # 1. Two ways to create a thread
    print("\n1. Two ways to create a thread")

    # Option 1: pass a function to Thread
    def simple_task(name, count):
        for i in range(count):
            print(f"Thread {name}: iteration {i+1}")
            time.sleep(0.5)

    thread1 = threading.Thread(target=simple_task, args=("Function-Thread", 3))

    # Option 2: subclass Thread
    class CustomThread(threading.Thread):
        def __init__(self, name, count):
            super().__init__()
            self.name = name
            self.count = count

        def run(self):
            for i in range(self.count):
                print(f"Custom thread {self.name}: iteration {i+1}")
                time.sleep(0.5)

    thread2 = CustomThread("Custom-Thread", 3)

    # Start the threads
    print("Starting threads...")
    thread1.start()
    thread2.start()

    # Wait for them to finish
    thread1.join()
    thread2.join()
    print("All threads finished")

    # 2. Thread attributes and methods
    print("\n2. Thread attributes and methods")

    def demo_thread_properties():
        current = threading.current_thread()
        print(f"Thread name: {current.name}")
        print(f"Thread ID: {current.ident}")
        print(f"Alive: {current.is_alive()}")
        print(f"Daemon: {current.daemon}")

    # A normal thread
    normal_thread = threading.Thread(target=demo_thread_properties, name="NormalThread")
    # A daemon thread (terminated automatically when the main thread exits)
    daemon_thread = threading.Thread(target=demo_thread_properties, name="DaemonThread")
    daemon_thread.daemon = True

    print("Normal thread attributes:")
    normal_thread.start()
    normal_thread.join()

    print("\nDaemon thread attributes:")
    daemon_thread.start()
    daemon_thread.join()

    # 3. Passing data between threads
    print("\n3. Passing data between threads")

    # Use Queue for thread-safe communication
    data_queue = Queue()
    result_queue = Queue()

    def producer(queue, count):
        """Producer thread."""
        for i in range(count):
            item = f"item-{i}"
            queue.put(item)
            print(f"Producer: produced {item}")
            time.sleep(0.1)
        queue.put(None)  # sentinel marking the end of the stream

    def consumer(data_queue, result_queue):
        """Consumer thread."""
        while True:
            item = data_queue.get()
            if item is None:
                break
            # Process the item
            processed = f"processed-{item}"
            result_queue.put(processed)
            print(f"Consumer: {item} -> {processed}")
            time.sleep(0.2)
            data_queue.task_done()

    # Start the producer and consumer threads
    producer_thread = threading.Thread(target=producer, args=(data_queue, 5))
    consumer_thread = threading.Thread(target=consumer, args=(data_queue, result_queue))
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    consumer_thread.join()

    # Collect the results
    print("\nResults:")
    while not result_queue.empty():
        print(f"  {result_queue.get()}")

# Run the demo
threading_basics()
```
# 2.2 Thread Synchronization
```python
import threading
import time

def thread_synchronization():
    """Demonstrate thread synchronization primitives."""
    print("=== Thread synchronization ===")

    # 1. What goes wrong without a lock
    print("\n1. Without a lock")
    shared_counter = 0

    def unsafe_increment(count):
        nonlocal shared_counter  # the counter lives in the enclosing function
        for _ in range(count):
            temp = shared_counter
            time.sleep(0.0001)  # widen the race window
            shared_counter = temp + 1

    # Several threads mutate the shared variable at the same time
    threads = []
    for i in range(5):
        threads.append(threading.Thread(target=unsafe_increment, args=(100,)))
    start_time = time.time()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"Unsafe count: {shared_counter} (expected: 500)")
    print(f"Elapsed: {time.time() - start_time:.2f}s")

    # 2. Using a Lock
    print("\n2. Using a Lock")
    shared_counter = 0
    counter_lock = threading.Lock()

    def safe_increment(count, lock):
        nonlocal shared_counter
        for _ in range(count):
            with lock:  # the with statement acquires and releases the lock
                temp = shared_counter
                time.sleep(0.0001)
                shared_counter = temp + 1

    threads = []
    for i in range(5):
        threads.append(threading.Thread(target=safe_increment, args=(100, counter_lock)))
    start_time = time.time()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print(f"Safe count: {shared_counter} (expected: 500)")
    print(f"Elapsed: {time.time() - start_time:.2f}s")

    # 3. RLock: a re-entrant lock
    print("\n3. Using an RLock")
    rlock = threading.RLock()

    def recursive_function(n, lock):
        with lock:
            print(f"Recursive call {n}")
            if n > 0:
                recursive_function(n - 1, lock)  # the same thread may re-acquire an RLock

    thread = threading.Thread(target=recursive_function, args=(3, rlock))
    thread.start()
    thread.join()

    # 4. Condition variables
    print("\n4. Using a Condition")
    condition = threading.Condition()
    items = []

    def consumer(name):
        with condition:
            while len(items) == 0:
                print(f"Consumer {name} waiting for goods...")
                condition.wait()  # block until notified
            item = items.pop(0)
            print(f"Consumer {name} consumed {item}")

    def producer():
        for i in range(3):
            with condition:
                item = f"goods-{i}"
                items.append(item)
                print(f"Producer produced {item}")
                condition.notify_all()  # wake every waiting thread
            time.sleep(1)

    # Start the consumer threads
    consumer_threads = []
    for i in range(2):
        thread = threading.Thread(target=consumer, args=(f"Consumer-{i}",))
        consumer_threads.append(thread)
        thread.start()

    # Start the producer thread
    producer_thread = threading.Thread(target=producer)
    producer_thread.start()

    # Wait for everything to finish
    producer_thread.join()
    for thread in consumer_threads:
        thread.join()

    # 5. Semaphores
    print("\n5. Using a Semaphore")
    # Limit how many threads may use a resource at once
    semaphore = threading.Semaphore(2)  # at most 2 concurrent holders

    def access_resource(name):
        with semaphore:
            print(f"{name} acquired the resource")
            time.sleep(2)  # simulate using the resource
            print(f"{name} released the resource")

    threads = []
    for i in range(5):
        thread = threading.Thread(target=access_resource, args=(f"Thread-{i}",))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()

    # 6. Events
    print("\n6. Using an Event")
    event = threading.Event()

    def waiter(name):
        print(f"{name} waiting for the event...")
        event.wait()  # block until the event is set
        print(f"{name} received the event, starting work")

    def setter():
        time.sleep(2)
        print("Setting the event")
        event.set()

    # Start the waiting threads
    waiter_threads = []
    for i in range(3):
        thread = threading.Thread(target=waiter, args=(f"Waiter-{i}",))
        waiter_threads.append(thread)
        thread.start()

    # Start the setter thread
    setter_thread = threading.Thread(target=setter)
    setter_thread.start()

    # Wait for all threads
    setter_thread.join()
    for thread in waiter_threads:
        thread.join()

# Run the demo
thread_synchronization()
```
# 2.3 Thread Pools
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
import random
from queue import Queue, Empty

def thread_pool_demo():
    """Demonstrate thread pools."""
    print("=== Thread pool demo ===")

    # 1. Basic ThreadPoolExecutor usage
    print("\n1. Basic usage")

    def worker_task(name, duration):
        print(f"Task {name} running (thread: {threading.current_thread().name})")
        time.sleep(duration)
        result = f"Task {name} done in {duration:.1f}s"
        print(result)
        return result

    with ThreadPoolExecutor(max_workers=3, thread_name_prefix="Worker") as executor:
        # Submit the tasks
        futures = []
        for i in range(5):
            futures.append(executor.submit(worker_task, f"Task-{i}", random.uniform(1, 3)))

        # Collect results as they complete
        for future in as_completed(futures):
            print(f"Got result: {future.result()}")

    # 2. Using map
    print("\n2. Using map")

    def square_number(n):
        time.sleep(0.1)  # simulate computation
        return n * n

    numbers = list(range(1, 11))
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(square_number, numbers))
    print(f"Squares: {results}")

    # 3. Concurrent network requests
    print("\n3. Concurrent network requests")

    def fetch_url(url):
        """Fetch a URL (simulated; no real network traffic)."""
        try:
            print(f"Requesting: {url}")
            time.sleep(random.uniform(0.5, 2))  # simulate network latency
            return {
                'url': url,
                'status': 200,
                'content_length': random.randint(1000, 5000)
            }
        except Exception as e:
            return {'url': url, 'error': str(e)}

    urls = [
        'http://example.com/page1',
        'http://example.com/page2',
        'http://example.com/page3',
        'http://example.com/page4',
        'http://example.com/page5'
    ]

    # Serial requests
    start_time = time.time()
    serial_results = []
    for url in urls:
        serial_results.append(fetch_url(url))
    serial_time = time.time() - start_time
    print(f"Serial requests: {serial_time:.2f}s")

    # Parallel requests
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=5) as executor:
        parallel_results = list(executor.map(fetch_url, urls))
    parallel_time = time.time() - start_time
    print(f"Parallel requests: {parallel_time:.2f}s")
    print(f"Speedup: {serial_time/parallel_time:.2f}x")

    # 4. Working off a task queue
    print("\n4. Working off a task queue")
    task_queue = Queue()
    result_queue = Queue()

    # Fill the queue with tasks
    for i in range(10):
        task_queue.put(f"task-{i}")

    def process_task_queue(task_queue, result_queue):
        """Drain the task queue."""
        while True:
            try:
                task = task_queue.get(timeout=1)
            except Empty:
                break  # queue drained, exit
            # Process the task
            time.sleep(0.5)
            result_queue.put(f"done: {task}")
            print(f"Thread {threading.current_thread().name} finished {task}")
            task_queue.task_done()

    with ThreadPoolExecutor(max_workers=3) as executor:
        # Start the workers
        futures = [executor.submit(process_task_queue, task_queue, result_queue)
                   for _ in range(3)]
        # Wait until every task has been processed
        task_queue.join()

    # Collect the results
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    print(f"\nFinished; {len(results)} results")

    # 5. Exception handling
    print("\n5. Exception handling in a pool")

    def risky_task(n):
        if n % 3 == 0:
            raise ValueError(f"task {n} failed")
        return f"task {n} succeeded"

    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(risky_task, i) for i in range(6)]
        for i, future in enumerate(futures):
            try:
                print(f"Task {i}: {future.result()}")
            except Exception as e:
                print(f"Task {i} raised: {e}")

# Run the demo
thread_pool_demo()
```
# 3. Multiprocess Programming
# 3.1 Process Basics
```python
import multiprocessing
import os
import time
from multiprocessing import Process

# Worker functions and the Process subclass are defined at module level
# so they stay picklable under the spawn start method (the default on
# Windows and macOS).
def worker_process(name, duration):
    """Worker process function."""
    print(f"Process {name} working (PID: {os.getpid()})")
    time.sleep(duration)
    print(f"Process {name} finished")

class CustomProcess(Process):
    """Option 2: subclass Process."""
    def __init__(self, name, duration):
        super().__init__()
        self.process_name = name
        self.duration = duration

    def run(self):
        print(f"Custom process {self.process_name} working (PID: {os.getpid()})")
        time.sleep(self.duration)
        print(f"Custom process {self.process_name} finished")

def demo_process_properties():
    current = multiprocessing.current_process()
    print(f"Process name: {current.name}")
    print(f"Process PID: {current.pid}")
    print(f"Alive: {current.is_alive()}")
    print(f"Daemon: {current.daemon}")

def daemon_worker():
    while True:
        print(f"Daemon process working... (PID: {os.getpid()})")
        time.sleep(1)

def process_basics():
    """Demonstrate basic process operations."""
    print("=== Process basics ===")

    # 1. Two ways to create a process
    print("\n1. Two ways to create a process")

    # Option 1: pass a function to Process
    process1 = Process(target=worker_process, args=("Process-1", 2))
    # Option 2: a Process subclass
    process2 = CustomProcess("Custom-Process", 1)

    # Start the processes
    print(f"Main process PID: {os.getpid()}")
    print("Starting child processes...")
    process1.start()
    process2.start()

    # Wait for them to finish
    process1.join()
    process2.join()
    print("All child processes finished")

    # 2. Process attributes and methods
    print("\n2. Process attributes and methods")
    process = Process(target=demo_process_properties, name="DemoProcess")
    process.start()
    process.join()

    # 3. Daemon processes
    print("\n3. Daemon processes")
    daemon_process = Process(target=daemon_worker)
    daemon_process.daemon = True  # mark as a daemon
    daemon_process.start()
    time.sleep(3)  # let the main process work for 3 seconds
    print("Main process exiting; the daemon ends with it")
    # A daemon process is terminated automatically when the main process exits

# Run the demo (the __main__ guard is required on platforms that spawn processes)
if __name__ == '__main__':
    process_basics()
```
# 3.2 Inter-Process Communication
```python
import os
import time
from multiprocessing import Process, Queue, Pipe, Value, Array, Lock, Manager

# All worker functions live at module level so they remain picklable
# under the spawn start method.

def producer(queue, name, count):
    """Producer process."""
    for i in range(count):
        item = f"{name}-item-{i}"
        queue.put(item)
        print(f"Producer {name} (PID: {os.getpid()}) produced: {item}")
        time.sleep(0.5)
    queue.put(None)  # sentinel

def consumer(queue, name):
    """Consumer process."""
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consumer {name} (PID: {os.getpid()}) consumed: {item}")
        time.sleep(0.3)

def sender(conn, messages):
    """Sender process."""
    for msg in messages:
        conn.send(msg)
        print(f"Sender (PID: {os.getpid()}) sent: {msg}")
        time.sleep(0.5)
    conn.close()

def receiver(conn):
    """Receiver process."""
    while True:
        try:
            msg = conn.recv()
            print(f"Receiver (PID: {os.getpid()}) received: {msg}")
        except EOFError:
            break
    conn.close()

def worker_with_shared_data(shared_value, shared_array, lock, worker_id):
    """Worker that mutates shared memory."""
    for i in range(5):
        with lock:
            # Update the shared value
            shared_value.value += 1
            # Update the shared array
            shared_array[worker_id] += 1
            print(f"Worker {worker_id} (PID: {os.getpid()}): "
                  f"value={shared_value.value}, array[{worker_id}]={shared_array[worker_id]}")
        time.sleep(0.1)

def worker_with_manager(shared_dict, shared_list, worker_id):
    """Worker that mutates Manager-backed structures."""
    # Update the shared dict
    shared_dict[f'worker_{worker_id}'] = os.getpid()
    # Update the shared list
    shared_list.append(f'data from process {worker_id}')
    print(f"Worker {worker_id} (PID: {os.getpid()}) updated the shared data")

def inter_process_communication():
    """Demonstrate inter-process communication."""
    print("=== Inter-process communication ===")

    # 1. Using a Queue
    print("\n1. Using a Queue")
    queue = Queue()
    producer_process = Process(target=producer, args=(queue, "Producer-1", 5))
    consumer_process = Process(target=consumer, args=(queue, "Consumer-1"))
    producer_process.start()
    consumer_process.start()
    producer_process.join()
    consumer_process.join()

    # 2. Using a Pipe
    print("\n2. Using a Pipe")
    parent_conn, child_conn = Pipe()
    messages = ["message 1", "message 2", "message 3"]
    sender_process = Process(target=sender, args=(child_conn, messages))
    receiver_process = Process(target=receiver, args=(parent_conn,))
    sender_process.start()
    receiver_process.start()
    # Close the parent's copies of both ends; otherwise the receiver
    # never gets EOFError, because this process still holds the write end open.
    child_conn.close()
    parent_conn.close()
    sender_process.join()
    receiver_process.join()

    # 3. Using shared memory
    print("\n3. Using shared memory")
    shared_value = Value('i', 0)           # a shared integer
    shared_array = Array('i', [0, 0, 0])   # a shared array
    lock = Lock()

    processes = []
    for i in range(3):
        p = Process(target=worker_with_shared_data,
                    args=(shared_value, shared_array, lock, i))
        processes.append(p)
        p.start()
    # Wait for all workers
    for p in processes:
        p.join()
    print(f"Final: value={shared_value.value}, array={list(shared_array)}")

    # 4. Using a Manager
    print("\n4. Using a Manager")
    with Manager() as manager:
        # Manager-backed shared structures
        shared_dict = manager.dict()
        shared_list = manager.list()

        processes = []
        for i in range(3):
            p = Process(target=worker_with_manager,
                        args=(shared_dict, shared_list, i))
            processes.append(p)
            p.start()
        # Wait for all workers
        for p in processes:
            p.join()

        print(f"Shared dict: {dict(shared_dict)}")
        print(f"Shared list: {list(shared_list)}")

# Run the demo
if __name__ == '__main__':
    inter_process_communication()
```
# 3.3 Process Pools
```python
from multiprocessing import Pool, cpu_count
from concurrent.futures import ThreadPoolExecutor
import time
import os

# Pool pickles the task callable and ships it to the workers, so the
# task functions must be defined at module level; a nested function here
# would raise a pickling error.

def cpu_intensive_task(n):
    """CPU-bound task."""
    print(f"Process {os.getpid()} starting task {n}")
    result = sum(i * i for i in range(n))
    print(f"Process {os.getpid()} finished task {n}, result: {result}")
    return result

def long_running_task(name, duration):
    print(f"Task {name} starting (PID: {os.getpid()})")
    time.sleep(duration)
    return f"Task {name} done in {duration}s"

def task_with_callback(x):
    if x == 3:
        raise ValueError("task 3 failed")
    return x * x

def cpu_bound_task(n):
    """CPU-bound task for the benchmark."""
    return sum(i * i for i in range(n))

def process_pool_demo():
    """Demonstrate process pools."""
    print("=== Process pool demo ===")

    # 1. Basic Pool usage
    print("\n1. Basic Pool usage")
    with Pool(processes=cpu_count()) as pool:
        tasks = [100000, 200000, 300000, 400000]
        # Option 1: map
        print("Using map:")
        start_time = time.time()
        results = pool.map(cpu_intensive_task, tasks)
        end_time = time.time()
        print(f"Results: {results}")
        print(f"Elapsed: {end_time - start_time:.2f}s")

    # 2. Asynchronous execution
    print("\n2. Asynchronous execution")
    with Pool(processes=3) as pool:
        # Submit tasks asynchronously
        async_results = []
        tasks = [("Task-A", 2), ("Task-B", 1), ("Task-C", 3), ("Task-D", 1)]
        for name, duration in tasks:
            async_results.append((name, pool.apply_async(long_running_task, (name, duration))))

        # Collect the results
        for name, async_result in async_results:
            try:
                result = async_result.get(timeout=5)  # with a timeout
                print(f"Got result: {result}")
            except Exception as e:
                print(f"Task {name} raised: {e}")

    # 3. Using callbacks
    print("\n3. Using callbacks")

    # Callbacks run in the parent process, so they may stay nested here
    def callback_function(result):
        print(f"Callback got result: {result}")

    def error_callback(error):
        print(f"Callback got error: {error}")

    with Pool(processes=2) as pool:
        for i in range(1, 5):
            pool.apply_async(
                task_with_callback,
                (i,),
                callback=callback_function,
                error_callback=error_callback
            )
        pool.close()  # stop accepting new tasks
        pool.join()   # wait for outstanding tasks

    # 4. Process pool vs. thread pool
    print("\n4. Process pool vs. thread pool")
    tasks = [500000] * 4

    # Serial execution
    start_time = time.time()
    serial_results = [cpu_bound_task(task) for task in tasks]
    serial_time = time.time() - start_time
    print(f"Serial time: {serial_time:.2f}s")

    # Process pool
    start_time = time.time()
    with Pool(processes=4) as pool:
        process_results = pool.map(cpu_bound_task, tasks)
    process_time = time.time() - start_time
    print(f"Process pool time: {process_time:.2f}s")

    # Thread pool (for comparison)
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        thread_results = list(executor.map(cpu_bound_task, tasks))
    thread_time = time.time() - start_time
    print(f"Thread pool time: {thread_time:.2f}s")

    print("\nComparison:")
    print(f"Processes vs. serial:  {serial_time/process_time:.2f}x")
    print(f"Threads vs. serial:    {serial_time/thread_time:.2f}x")
    print(f"Processes vs. threads: {thread_time/process_time:.2f}x")

# Run the demo
if __name__ == '__main__':
    process_pool_demo()
```
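Besides `map` and `apply_async`, `Pool` also offers lazy iterator variants. A short sketch of `imap_unordered`, which yields each result as soon as its worker finishes rather than in submission order, useful when task durations vary widely (the `slow_square` helper is illustrative):

```python
from multiprocessing import Pool
import random
import time

def slow_square(n):
    time.sleep(random.uniform(0, 0.5))  # tasks finish in random order
    return n, n * n

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # Results arrive in completion order, not submission order
        for n, sq in pool.imap_unordered(slow_square, range(8)):
            print(f"{n}^2 = {sq}")
```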
# 4. Asynchronous Programming Basics
# 4.1 Async Programming Concepts
```python
import asyncio
import time

async def async_programming_basics():
    """Demonstrate asynchronous programming basics."""
    print("=== Async programming basics ===")

    # 1. A basic coroutine
    print("\n1. A basic coroutine")

    async def simple_async_task(name, delay):
        print(f"Task {name} starting")
        await asyncio.sleep(delay)  # asynchronous wait
        print(f"Task {name} finished")
        return f"result of task {name}"

    # Run a single coroutine
    result = await simple_async_task("Task-1", 1)
    print(f"Result: {result}")

    # 2. Running several coroutines concurrently
    print("\n2. Running several coroutines concurrently")

    async def concurrent_tasks():
        # Create the coroutines
        tasks = [
            simple_async_task("Concurrent-A", 2),
            simple_async_task("Concurrent-B", 1),
            simple_async_task("Concurrent-C", 3)
        ]
        # Run them concurrently
        start_time = time.time()
        results = await asyncio.gather(*tasks)
        end_time = time.time()
        print(f"Concurrent results: {results}")
        print(f"Total: {end_time - start_time:.2f}s")

    await concurrent_tasks()

    # 3. Async generators
    print("\n3. Async generators")

    async def async_generator(count):
        for i in range(count):
            await asyncio.sleep(0.5)
            yield f"item-{i}"

    async def consume_async_generator():
        async for item in async_generator(5):
            print(f"Received: {item}")

    await consume_async_generator()

    # 4. Async context managers
    print("\n4. Async context managers")

    class AsyncContextManager:
        async def __aenter__(self):
            print("Entering the async context")
            await asyncio.sleep(0.1)
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            print("Exiting the async context")
            await asyncio.sleep(0.1)

    async with AsyncContextManager() as acm:
        print("Working inside the async context")
        await asyncio.sleep(0.5)

    # 5. Async iterators
    print("\n5. Async iterators")

    class AsyncIterator:
        def __init__(self, count):
            self.count = count
            self.current = 0

        def __aiter__(self):
            return self

        async def __anext__(self):
            if self.current >= self.count:
                raise StopAsyncIteration
            await asyncio.sleep(0.2)
            self.current += 1
            return f"async-item-{self.current}"

    async for item in AsyncIterator(3):
        print(f"Iterated: {item}")

# Entry point for the async demo
async def run_async_demo():
    await async_programming_basics()

# When this script is run directly
if __name__ == '__main__':
    asyncio.run(run_async_demo())
```
# 4.2 Asynchronous IO
```python
import asyncio
import time
from pathlib import Path

import aiofiles  # third-party: pip install aiofiles

async def async_io_demo():
    """Demonstrate asynchronous IO."""
    print("=== Async IO demo ===")

    # 1. Async file operations
    print("\n1. Async file operations")

    async def async_file_operations():
        # Write asynchronously
        async with aiofiles.open('async_test.txt', 'w', encoding='utf-8') as f:
            await f.write('Content written asynchronously\n')
            await f.write('A second line\n')

        # Read asynchronously
        async with aiofiles.open('async_test.txt', 'r', encoding='utf-8') as f:
            content = await f.read()
            print(f"Read asynchronously:\n{content}")

        # Clean up
        Path('async_test.txt').unlink()

    await async_file_operations()

    # 2. Simulated async network requests
    print("\n2. Simulated async network requests")

    async def fetch_data(url, delay):
        """Simulate an async network request."""
        print(f"Requesting: {url}")
        await asyncio.sleep(delay)  # simulate network latency
        return {
            'url': url,
            'status': 200,
            'data': f'data from {url}'
        }

    async def fetch_multiple_urls():
        urls = [
            ('http://api1.example.com', 1),
            ('http://api2.example.com', 2),
            ('http://api3.example.com', 1.5),
            ('http://api4.example.com', 0.5)
        ]

        # Serial requests
        start_time = time.time()
        serial_results = []
        for url, delay in urls:
            serial_results.append(await fetch_data(url, delay))
        serial_time = time.time() - start_time
        print(f"Serial requests: {serial_time:.2f}s")

        # Concurrent requests
        start_time = time.time()
        tasks = [fetch_data(url, delay) for url, delay in urls]
        concurrent_results = await asyncio.gather(*tasks)
        concurrent_time = time.time() - start_time
        print(f"Concurrent requests: {concurrent_time:.2f}s")
        print(f"Speedup: {serial_time/concurrent_time:.2f}x")
        return concurrent_results

    results = await fetch_multiple_urls()
    print(f"Number of results: {len(results)}")

    # 3. Task control
    print("\n3. Task control")

    async def task_control_demo():
        # Timeouts
        async def slow_task():
            await asyncio.sleep(3)
            return "slow task done"

        try:
            result = await asyncio.wait_for(slow_task(), timeout=2)
            print(f"Result: {result}")
        except asyncio.TimeoutError:
            print("Task timed out")

        # Cancellation
        async def cancellable_task():
            try:
                for i in range(10):
                    print(f"Cancellable task running: {i}")
                    await asyncio.sleep(0.5)
                return "task done"
            except asyncio.CancelledError:
                print("Task cancelled")
                raise

        task = asyncio.create_task(cancellable_task())
        await asyncio.sleep(2)  # let the task run for a while
        task.cancel()           # then cancel it
        try:
            await task
        except asyncio.CancelledError:
            print("Cancellation confirmed")

    await task_control_demo()

    # 4. Async queues
    print("\n4. Async queues")

    async def async_queue_demo():
        queue = asyncio.Queue(maxsize=3)

        async def producer(queue, name, count):
            for i in range(count):
                item = f"{name}-item-{i}"
                await queue.put(item)
                print(f"Producer {name} produced: {item}")
                await asyncio.sleep(0.5)

        async def consumer(queue, name):
            while True:
                try:
                    item = await asyncio.wait_for(queue.get(), timeout=3)
                    print(f"Consumer {name} consumed: {item}")
                    queue.task_done()
                    await asyncio.sleep(0.3)
                except asyncio.TimeoutError:
                    print(f"Consumer {name} timed out, exiting")
                    break

        # Create the producer and consumer tasks
        producer_task = asyncio.create_task(producer(queue, "Producer-1", 5))
        consumer_task1 = asyncio.create_task(consumer(queue, "Consumer-1"))
        consumer_task2 = asyncio.create_task(consumer(queue, "Consumer-2"))

        # Wait for the producer to finish
        await producer_task
        # Wait until the queue is fully processed
        await queue.join()
        # Cancel the consumers
        consumer_task1.cancel()
        consumer_task2.cancel()
        try:
            await asyncio.gather(consumer_task1, consumer_task2)
        except asyncio.CancelledError:
            pass

    await async_queue_demo()

# Run the async IO demo
if __name__ == '__main__':
    asyncio.run(async_io_demo())
```
# 5. Hands-On Exercise: A Concurrent Downloader
# 5.1 Building a Complete Concurrent Download System
```python
import asyncio
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
from pathlib import Path
from dataclasses import dataclass
from typing import List, Optional
import json

@dataclass
class DownloadTask:
    """A download task."""
    url: str
    filename: str
    size: Optional[int] = None
    checksum: Optional[str] = None

@dataclass
class DownloadResult:
    """The outcome of one download."""
    task: DownloadTask
    success: bool
    duration: float
    error: Optional[str] = None
    actual_size: Optional[int] = None

class ConcurrentDownloader:
    """A concurrent downloader (downloads are simulated)."""

    def __init__(self, download_dir: str = "downloads"):
        self.download_dir = Path(download_dir)
        self.download_dir.mkdir(exist_ok=True)
        self.results: List[DownloadResult] = []
        # Keep instances picklable (no locks or queue.Queue attributes):
        # ProcessPoolExecutor pickles the bound method below, which
        # includes the instance itself.

    def simulate_download(self, task: DownloadTask) -> DownloadResult:
        """Simulate downloading one file."""
        start_time = time.time()
        try:
            print(f"Downloading: {task.url} -> {task.filename}")
            # Simulate latency and transfer time from the filename length
            download_time = len(task.filename) * 0.1 + 1
            time.sleep(download_time)

            # Write a fake file, sized to roughly match the expected size
            file_path = self.download_dir / task.filename
            line = f"Simulated file content: {task.url}\n"
            repeat = max(1, (task.size or 5000) // len(line.encode('utf-8')))
            content = line * repeat
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)

            actual_size = len(content.encode('utf-8'))
            duration = time.time() - start_time

            # Validate the size when an expected size was provided
            if task.size and abs(actual_size - task.size) > 100:
                raise ValueError(f"size mismatch: expected {task.size}, got {actual_size}")

            print(f"Done: {task.filename} ({actual_size} bytes, {duration:.2f}s)")
            return DownloadResult(
                task=task,
                success=True,
                duration=duration,
                actual_size=actual_size
            )
        except Exception as e:
            duration = time.time() - start_time
            print(f"Failed: {task.filename} - {e}")
            return DownloadResult(
                task=task,
                success=False,
                duration=duration,
                error=str(e)
            )

    def download_with_threads(self, tasks: List[DownloadTask], max_workers: int = 4) -> List[DownloadResult]:
        """Download using a thread pool."""
        print(f"\n=== Thread pool download (workers: {max_workers}) ===")
        start_time = time.time()
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit every task
            future_to_task = {executor.submit(self.simulate_download, task): task for task in tasks}
            # Collect results as they complete
            for future in as_completed(future_to_task):
                results.append(future.result())
                # Report progress
                completed, total = len(results), len(tasks)
                print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%)")
        total_time = time.time() - start_time
        print(f"Thread pool finished, total: {total_time:.2f}s")
        return results

    def download_with_processes(self, tasks: List[DownloadTask], max_workers: int = None) -> List[DownloadResult]:
        """Download using a process pool."""
        if max_workers is None:
            max_workers = multiprocessing.cpu_count()
        print(f"\n=== Process pool download (workers: {max_workers}) ===")
        start_time = time.time()
        results = []
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Submit every task
            future_to_task = {executor.submit(self.simulate_download, task): task for task in tasks}
            # Collect results as they complete
            for future in as_completed(future_to_task):
                results.append(future.result())
                # Report progress
                completed, total = len(results), len(tasks)
                print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%)")
        total_time = time.time() - start_time
        print(f"Process pool finished, total: {total_time:.2f}s")
        return results

    async def download_with_asyncio(self, tasks: List[DownloadTask], max_concurrent: int = 10) -> List[DownloadResult]:
        """Download using asyncio."""
        print(f"\n=== Async download (max concurrency: {max_concurrent}) ===")

        async def async_download(task: DownloadTask) -> DownloadResult:
            """Download one file asynchronously."""
            start_time = time.time()
            try:
                print(f"Async downloading: {task.url} -> {task.filename}")
                # Simulate an async network request
                download_time = len(task.filename) * 0.1 + 1
                await asyncio.sleep(download_time)

                # Write a fake file, sized to roughly match the expected size
                file_path = self.download_dir / task.filename
                line = f"Async-downloaded file content: {task.url}\n"
                repeat = max(1, (task.size or 5000) // len(line.encode('utf-8')))
                content = line * repeat
                # The file write is kept synchronous here for simplicity
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(content)

                actual_size = len(content.encode('utf-8'))
                duration = time.time() - start_time
                print(f"Async done: {task.filename} ({actual_size} bytes, {duration:.2f}s)")
                return DownloadResult(
                    task=task,
                    success=True,
                    duration=duration,
                    actual_size=actual_size
                )
            except Exception as e:
                duration = time.time() - start_time
                print(f"Async failed: {task.filename} - {e}")
                return DownloadResult(
                    task=task,
                    success=False,
                    duration=duration,
                    error=str(e)
                )

        start_time = time.time()
        # Limit concurrency with a semaphore
        semaphore = asyncio.Semaphore(max_concurrent)

        async def download_with_semaphore(task):
            async with semaphore:
                return await async_download(task)

        # Create and run every download concurrently
        download_tasks = [download_with_semaphore(task) for task in tasks]
        results = await asyncio.gather(*download_tasks)
        total_time = time.time() - start_time
        print(f"Async download finished, total: {total_time:.2f}s")
        return results

    def generate_report(self, results: List[DownloadResult]) -> dict:
        """Build a download report."""
        successful = [r for r in results if r.success]
        failed = [r for r in results if not r.success]
        total_size = sum(r.actual_size or 0 for r in successful)
        total_time = sum(r.duration for r in results)
        avg_speed = total_size / total_time if total_time > 0 else 0
        return {
            'total_tasks': len(results),
            'successful': len(successful),
            'failed': len(failed),
            'success_rate': len(successful) / len(results) * 100 if results else 0,
            'total_size_bytes': total_size,
            'total_time_seconds': total_time,
            'average_speed_bps': avg_speed,
            'failed_tasks': [{'filename': r.task.filename, 'error': r.error} for r in failed]
        }

    def save_report(self, report: dict, filename: str = "download_report.json"):
        """Save the report to disk."""
        report_path = self.download_dir / filename
        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        print(f"Report saved to: {report_path}")

# Demo
def demo_concurrent_downloader():
    """Demonstrate the concurrent downloader."""
    print("=== Concurrent downloader demo ===")

    # The download tasks
    tasks = [
        DownloadTask("http://example.com/file1.txt", "file1.txt", 5000),
        DownloadTask("http://example.com/file2.pdf", "file2.pdf", 8000),
        DownloadTask("http://example.com/file3.jpg", "file3.jpg", 3000),
        DownloadTask("http://example.com/file4.zip", "file4.zip", 12000),
        DownloadTask("http://example.com/file5.doc", "file5.doc", 6000),
        DownloadTask("http://example.com/file6.mp3", "file6.mp3", 9000),
    ]

    downloader = ConcurrentDownloader()

    # 1. Thread pool download
    thread_results = downloader.download_with_threads(tasks, max_workers=3)
    thread_report = downloader.generate_report(thread_results)
    print("\nThread pool report:")
    print(f"Successful: {thread_report['successful']}/{thread_report['total_tasks']}")
    print(f"Success rate: {thread_report['success_rate']:.1f}%")
    print(f"Total time: {thread_report['total_time_seconds']:.2f}s")

    # 2. Process pool download
    process_results = downloader.download_with_processes(tasks, max_workers=2)
    process_report = downloader.generate_report(process_results)
    print("\nProcess pool report:")
    print(f"Successful: {process_report['successful']}/{process_report['total_tasks']}")
    print(f"Success rate: {process_report['success_rate']:.1f}%")
    print(f"Total time: {process_report['total_time_seconds']:.2f}s")

    # 3. Async download
    async def async_download_demo():
        async_results = await downloader.download_with_asyncio(tasks, max_concurrent=4)
        async_report = downloader.generate_report(async_results)
        print("\nAsync report:")
        print(f"Successful: {async_report['successful']}/{async_report['total_tasks']}")
        print(f"Success rate: {async_report['success_rate']:.1f}%")
        print(f"Total time: {async_report['total_time_seconds']:.2f}s")
        # Persist the report
        downloader.save_report(async_report, "async_download_report.json")
        return async_report

    # Run the async download
    async_report = asyncio.run(async_download_demo())

    # 4. Comparison
    print("\n=== Comparison ===")
    print(f"Thread pool total:  {thread_report['total_time_seconds']:.2f}s")
    print(f"Process pool total: {process_report['total_time_seconds']:.2f}s")
    print(f"Asyncio total:      {async_report['total_time_seconds']:.2f}s")

    # Clean up the downloaded files
    import shutil
    if downloader.download_dir.exists():
        shutil.rmtree(downloader.download_dir)
        print(f"\nCleaned up: {downloader.download_dir}")

# Run the demo
if __name__ == '__main__':
    demo_concurrent_downloader()
```
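One loose end in the asyncio path above: the file write is still a blocking `open()` call, as its comment admits. Assuming Python 3.9+, a standard-library way to keep the event loop responsive is to push the blocking call onto a worker thread with `asyncio.to_thread`; below is a minimal sketch with an illustrative `write_file` helper.

```python
import asyncio
from pathlib import Path

def write_file(path: Path, content: str) -> int:
    """Plain blocking write; returns the number of bytes written."""
    path.write_text(content, encoding='utf-8')
    return len(content.encode('utf-8'))

async def save_async(path: Path, content: str) -> int:
    # Runs the blocking write in the default thread pool, so the event
    # loop stays free to drive other downloads in the meantime
    return await asyncio.to_thread(write_file, path, content)

if __name__ == '__main__':
    size = asyncio.run(save_async(Path('demo.txt'), 'hello\n'))
    print(f"wrote {size} bytes")
    Path('demo.txt').unlink()  # clean up
```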
# 6. Summary and Best Practices
# 6.1 Choosing the Right Concurrency Model
```python
def choose_concurrency_method():
    """A guide to choosing a concurrency model."""
    print("=== Choosing a concurrency model ===")

    guidelines = {
        "CPU-bound tasks": {
            "recommended": "multiprocessing",
            "why": "Not limited by the GIL; can use every CPU core",
            "use_cases": [
                "numerical computation",
                "image processing",
                "data analysis",
                "encryption/decryption",
                "scientific computing"
            ],
            "example": """
from multiprocessing import Pool

def cpu_task(n):
    return sum(i*i for i in range(n))

with Pool() as pool:
    results = pool.map(cpu_task, [100000, 200000, 300000])
"""
        },
        "IO-bound tasks": {
            "recommended": "threading or asyncio",
            "why": "The GIL is released while waiting on IO, and thread switches are cheap",
            "use_cases": [
                "file IO",
                "network requests",
                "database access",
                "API calls",
                "web scraping"
            ],
            "example": """
# Thread-based
from concurrent.futures import ThreadPoolExecutor

def io_task(url):
    # simulate a network request
    time.sleep(1)
    return f"data from {url}"

with ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(io_task, urls)

# Async-based
import asyncio

async def async_io_task(url):
    await asyncio.sleep(1)  # simulate async IO
    return f"data from {url}"

async def main():
    tasks = [async_io_task(url) for url in urls]
    results = await asyncio.gather(*tasks)
"""
        },
        "Mixed workloads": {
            "recommended": "Choose by the dominant bottleneck, or combine models",
            "why": "Depends on where the time is actually spent",
            "use_cases": [
                "web servers",
                "data-processing pipelines",
                "real-time systems",
                "game servers"
            ],
            "example": """
# Hybrid: asyncio plus a process pool
import asyncio
from concurrent.futures import ProcessPoolExecutor

async def hybrid_task():
    # IO-bound part: async
    data = await fetch_data_async()
    # CPU-bound part: process pool
    with ProcessPoolExecutor() as executor:
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(executor, cpu_intensive_func, data)
    return result
"""
        }
    }

    for task_type, info in guidelines.items():
        print(f"\n{task_type}:")
        print(f"  Recommended: {info['recommended']}")
        print(f"  Why: {info['why']}")
        print(f"  Use cases: {', '.join(info['use_cases'])}")
        print(f"  Example:{info['example']}")

# Print the guide
choose_concurrency_method()
```
# 6.2 Best Practices
```python
def best_practices():
    """Best practices for concurrent programming."""
    print("=== Concurrency best practices ===")

    practices = {
        "1. Avoid shared state": {
            "summary": "Avoid mutable state shared across threads or processes",
            "tips": [
                "Prefer immutable data structures",
                "Communicate by message passing rather than shared memory",
                "Borrow ideas from functional programming"
            ]
        },
        "2. Use locks correctly": {
            "summary": "When state must be shared, use synchronization primitives properly",
            "tips": [
                "Manage locks with the with statement",
                "Prevent deadlock by acquiring locks in a fixed order",
                "Hold locks for as short a time as possible",
                "Consider lock-free data structures"
            ]
        },
        "3. Handle exceptions": {
            "summary": "Exception handling matters even more in concurrent code",
            "tips": [
                "Wrap every task in exception handling",
                "Log exception details",
                "Build graceful error-recovery paths",
                "Do not let one exception crash the whole program"
            ]
        },
        "4. Manage resources": {
            "summary": "Manage threads, processes, and other resources deliberately",
            "tips": [
                "Use context managers",
                "Release resources as soon as they are no longer needed",
                "Set sensible timeouts",
                "Monitor resource usage"
            ]
        },
        "5. Tune for performance": {
            "summary": "Tune concurrency parameters against real measurements",
            "tips": [
                "Benchmark different thread/process counts",
                "Watch CPU and memory utilization",
                "Use profiling tools",
                "Avoid over-parallelizing"
            ]
        }
    }

    for practice, details in practices.items():
        print(f"\n{practice}")
        print(f"  {details['summary']}")
        for tip in details['tips']:
            print(f"  • {tip}")

# Print the best practices
best_practices()
```
# 6.3 Common Pitfalls and Solutions
```python
def common_pitfalls():
    """Common pitfalls and how to fix them."""
    print("=== Common pitfalls and fixes ===")

    pitfalls = {
        "Race condition": {
            "problem": "Threads touching shared state concurrently produce unpredictable results",
            "fix": "Use locks, atomic operations, or lock-free data structures",
            "example": """
# Broken
counter = 0
def increment():
    global counter
    counter += 1  # not an atomic operation

# Fixed
import threading
counter = 0
lock = threading.Lock()
def safe_increment():
    global counter
    with lock:
        counter += 1
"""
        },
        "Deadlock": {
            "problem": "Threads wait forever on each other's locks",
            "fix": "Acquire locks in a fixed order, use timeouts, avoid nested locks",
            "example": """
# Deadlock-prone
lock1 = threading.Lock()
lock2 = threading.Lock()

def task1():
    with lock1:
        with lock2:  # may deadlock
            pass

def task2():
    with lock2:
        with lock1:  # may deadlock
            pass

# Deadlock-free
def safe_task1():
    with lock1:
        with lock2:  # fixed order
            pass

def safe_task2():
    with lock1:  # same order
        with lock2:
            pass
"""
        },
        "GIL bottleneck": {
            "problem": "Python's GIL caps multithreaded performance on CPU-bound work",
            "fix": "Use multiple processes for CPU-bound tasks",
            "example": """
# CPU-bound work belongs in processes
from multiprocessing import Pool

def cpu_task(n):
    return sum(i*i for i in range(n))

# Use a process pool, not a thread pool
with Pool() as pool:
    results = pool.map(cpu_task, [100000, 200000])
"""
        },
        "Memory/resource leaks": {
            "problem": "Threads or processes that are never cleaned up leak resources",
            "fix": "Use context managers; always join threads and processes",
            "example": """
# Broken
thread = threading.Thread(target=worker)
thread.start()
# never joined -- possible resource leak

# Fixed
with ThreadPoolExecutor() as executor:
    future = executor.submit(worker)
    result = future.result()  # cleaned up automatically
"""
        }
    }

    for pitfall, details in pitfalls.items():
        print(f"\n{pitfall}:")
        print(f"  Problem: {details['problem']}")
        print(f"  Fix: {details['fix']}")
        print(f"  Example:{details['example']}")

# Print the pitfalls
common_pitfalls()
```
# 7. Next Steps
# 7.1 Advanced Topics
- Advanced async programming: dig deeper into asyncio, aiohttp, aiofiles, and other async libraries
- Distributed computing: frameworks such as Celery and Dask
- Concurrent data structures: lock-free structures and concurrency-safe containers
- Profiling: analyze the performance of concurrent programs with cProfile, line_profiler, and similar tools (see the sketch below)
- Network programming: sockets, network protocols, and server architecture
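As a starting point for the profiling item above, a minimal `cProfile` sketch from the standard library, profiling the kind of CPU-bound helper used throughout this chapter (note that `cProfile` only measures the thread it runs in; the `cpu_task` name is illustrative):

```python
import cProfile
import pstats

def cpu_task(n):
    return sum(i * i for i in range(n))

profiler = cProfile.Profile()
profiler.enable()
for _ in range(4):
    cpu_task(500000)
profiler.disable()

# Show the 5 most expensive entries by cumulative time
stats = pstats.Stats(profiler).sort_stats('cumulative')
stats.print_stats(5)
```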
# 7.2 Suggested Practice Projects
- Web crawler: combine threads and async IO for an efficient scraper
- File-processing tool: a concurrent batch processor for large numbers of files
- API server: a high-concurrency API service built with FastAPI or aiohttp
- Data pipeline: a concurrent data-processing system for ETL workflows
- Live monitoring: a concurrent monitor for system resources and application health
# 7.3 Recommended Resources
- Official docs: the Python threading, multiprocessing, and asyncio module documentation
- Books: *Python Concurrent Programming* and *Fluent Python*
- Online: Real Python's concurrency tutorials
- Practice: open-source concurrency projects on GitHub
With this chapter behind you, you should have a working grasp of processes and threads in Python: the core concepts, how to use them, and the relevant best practices. Remember that concurrency takes sustained practice to master, so keep writing code and running experiments to sharpen these skills step by step.