第18天-网络编程
哪吒 2023/6/15
# 第18天-网络编程
# 1. 网络编程基础
# 1.1 网络编程概述
# 网络编程基础概念演示
def network_programming_basics():
"""网络编程基础概念"""
print("=== 网络编程基础概念 ===")
# 1. 网络编程定义
print("\n1. 网络编程定义:")
print(" - 通过网络协议实现不同计算机间的通信")
print(" - 包括客户端和服务器端编程")
print(" - 支持数据传输、远程调用、分布式计算等")
# 2. 网络协议层次
print("\n2. 网络协议层次:")
protocols = {
"应用层": ["HTTP", "HTTPS", "FTP", "SMTP", "POP3", "IMAP"],
"传输层": ["TCP", "UDP"],
"网络层": ["IP", "ICMP", "ARP"],
"数据链路层": ["Ethernet", "WiFi"],
"物理层": ["光纤", "双绞线", "无线"]
}
for layer, items in protocols.items():
print(f" {layer}: {', '.join(items)}")
# 3. TCP vs UDP
print("\n3. TCP vs UDP:")
comparison = {
"特性": ["连接性", "可靠性", "速度", "开销", "应用场景"],
"TCP": ["面向连接", "可靠传输", "较慢", "较高", "文件传输、网页浏览"],
"UDP": ["无连接", "不可靠", "较快", "较低", "视频直播、游戏"]
}
for i, feature in enumerate(comparison["特性"]):
print(f" {feature}: TCP({comparison['TCP'][i]}) vs UDP({comparison['UDP'][i]})")
# 4. 常用端口
print("\n4. 常用端口:")
common_ports = {
"HTTP": 80,
"HTTPS": 443,
"FTP": 21,
"SSH": 22,
"Telnet": 23,
"SMTP": 25,
"DNS": 53,
"POP3": 110,
"IMAP": 143
}
for service, port in common_ports.items():
print(f" {service}: {port}")
# 运行网络编程基础演示
network_programming_basics()
# 1.2 Python网络编程模块
import socket
import urllib.request
import urllib.parse
import http.client
import json
from datetime import datetime
def python_network_modules():
"""Python网络编程模块介绍"""
print("=== Python网络编程模块 ===")
# 1. socket模块
print("\n1. socket模块:")
print(" - 底层网络编程接口")
print(" - 支持TCP和UDP协议")
print(" - 提供客户端和服务器端功能")
print(" - 跨平台支持")
# 2. urllib模块
print("\n2. urllib模块:")
print(" - 高级URL处理库")
print(" - 支持HTTP/HTTPS请求")
print(" - 包含request、parse、error等子模块")
print(" - 适合简单的网络请求")
# 3. http.client模块
print("\n3. http.client模块:")
print(" - 低级HTTP客户端接口")
print(" - 更精细的HTTP控制")
print(" - 支持HTTP/1.1协议")
print(" - 适合复杂的HTTP操作")
# 4. 第三方库
print("\n4. 常用第三方库:")
third_party = {
"requests": "简洁易用的HTTP库",
"aiohttp": "异步HTTP客户端/服务器",
"tornado": "异步网络库和Web框架",
"twisted": "事件驱动网络引擎",
"paramiko": "SSH客户端库",
"ftplib": "FTP客户端库"
}
for lib, desc in third_party.items():
print(f" {lib}: {desc}")
# 运行模块介绍
python_network_modules()
# 2. Socket编程
# 2.1 TCP Socket编程
import socket
import threading
import time
def tcp_server_demo():
"""TCP服务器演示"""
print("=== TCP服务器演示 ===")
def handle_client(client_socket, client_address):
"""处理客户端连接"""
print(f"客户端 {client_address} 已连接")
try:
while True:
# 接收数据
data = client_socket.recv(1024)
if not data:
break
message = data.decode('utf-8')
print(f"收到来自 {client_address} 的消息: {message}")
# 处理特殊命令
if message.lower() == 'quit':
break
elif message.lower() == 'time':
response = f"服务器时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
elif message.lower().startswith('echo '):
response = f"回声: {message[5:]}"
else:
response = f"服务器收到: {message}"
# 发送响应
client_socket.send(response.encode('utf-8'))
except Exception as e:
print(f"处理客户端 {client_address} 时出错: {e}")
finally:
client_socket.close()
print(f"客户端 {client_address} 已断开连接")
# 创建服务器socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置socket选项,允许地址重用
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定地址和端口
host = 'localhost'
port = 12345
server_socket.bind((host, port))
# 开始监听
server_socket.listen(5)
print(f"TCP服务器启动,监听 {host}:{port}")
print("等待客户端连接...")
try:
while True:
# 接受客户端连接
client_socket, client_address = server_socket.accept()
# 为每个客户端创建新线程
client_thread = threading.Thread(
target=handle_client,
args=(client_socket, client_address)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("\n服务器正在关闭...")
finally:
server_socket.close()
print("服务器已关闭")
def tcp_client_demo():
"""TCP客户端演示"""
print("=== TCP客户端演示 ===")
# 创建客户端socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# 连接服务器
host = 'localhost'
port = 12345
client_socket.connect((host, port))
print(f"已连接到服务器 {host}:{port}")
# 发送和接收数据
messages = [
"Hello, Server!",
"time",
"echo Python网络编程",
"这是一条测试消息",
"quit"
]
for message in messages:
print(f"发送: {message}")
client_socket.send(message.encode('utf-8'))
if message.lower() == 'quit':
break
# 接收响应
response = client_socket.recv(1024)
print(f"收到: {response.decode('utf-8')}")
print("-" * 40)
time.sleep(1) # 延迟1秒
except Exception as e:
print(f"客户端错误: {e}")
finally:
client_socket.close()
print("客户端已断开连接")
# 注意:实际运行时需要先启动服务器,再启动客户端
print("TCP Socket编程示例")
print("请先运行 tcp_server_demo() 启动服务器")
print("然后运行 tcp_client_demo() 启动客户端")
# 2.2 UDP Socket编程
def udp_server_demo():
"""UDP服务器演示"""
print("=== UDP服务器演示 ===")
# 创建UDP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 绑定地址和端口
host = 'localhost'
port = 12346
server_socket.bind((host, port))
print(f"UDP服务器启动,监听 {host}:{port}")
print("等待数据包...")
try:
while True:
# 接收数据
data, client_address = server_socket.recvfrom(1024)
message = data.decode('utf-8')
print(f"收到来自 {client_address} 的数据: {message}")
# 处理数据并发送响应
if message.lower() == 'quit':
response = "服务器正在关闭"
server_socket.sendto(response.encode('utf-8'), client_address)
break
elif message.lower() == 'time':
response = f"服务器时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
else:
response = f"UDP服务器收到: {message}"
# 发送响应
server_socket.sendto(response.encode('utf-8'), client_address)
except KeyboardInterrupt:
print("\n服务器正在关闭...")
finally:
server_socket.close()
print("UDP服务器已关闭")
def udp_client_demo():
"""UDP客户端演示"""
print("=== UDP客户端演示 ===")
# 创建UDP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# 服务器地址
server_address = ('localhost', 12346)
# 发送数据
messages = [
"Hello, UDP Server!",
"time",
"这是UDP消息",
"quit"
]
for message in messages:
print(f"发送: {message}")
client_socket.sendto(message.encode('utf-8'), server_address)
# 接收响应
response, server_addr = client_socket.recvfrom(1024)
print(f"收到: {response.decode('utf-8')}")
print("-" * 40)
if message.lower() == 'quit':
break
time.sleep(1)
except Exception as e:
print(f"UDP客户端错误: {e}")
finally:
client_socket.close()
print("UDP客户端已关闭")
# UDP编程示例
print("\nUDP Socket编程示例")
print("请先运行 udp_server_demo() 启动服务器")
print("然后运行 udp_client_demo() 启动客户端")
# 2.3 Socket编程进阶
import select
import errno
def advanced_socket_demo():
"""高级Socket编程演示"""
print("=== 高级Socket编程 ===")
# 1. 非阻塞Socket
print("\n1. 非阻塞Socket:")
def non_blocking_server():
"""非阻塞服务器示例"""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('localhost', 12347))
server_socket.listen(5)
# 设置为非阻塞模式
server_socket.setblocking(False)
print("非阻塞服务器启动")
sockets_list = [server_socket]
clients = {}
try:
while True:
# 使用select监控socket
ready_to_read, _, exception_sockets = select.select(
sockets_list, [], sockets_list, 1
)
for notified_socket in ready_to_read:
if notified_socket == server_socket:
# 新的客户端连接
try:
client_socket, client_address = server_socket.accept()
client_socket.setblocking(False)
sockets_list.append(client_socket)
clients[client_socket] = client_address
print(f"新客户端连接: {client_address}")
except:
pass
else:
# 现有客户端发送数据
try:
data = notified_socket.recv(1024)
if data:
message = data.decode('utf-8')
client_addr = clients[notified_socket]
print(f"收到来自 {client_addr} 的消息: {message}")
# 回声响应
response = f"回声: {message}"
notified_socket.send(response.encode('utf-8'))
else:
# 客户端断开连接
client_addr = clients[notified_socket]
print(f"客户端 {client_addr} 断开连接")
sockets_list.remove(notified_socket)
del clients[notified_socket]
notified_socket.close()
except:
# 连接错误
if notified_socket in clients:
client_addr = clients[notified_socket]
print(f"客户端 {client_addr} 连接错误")
sockets_list.remove(notified_socket)
del clients[notified_socket]
notified_socket.close()
# 处理异常socket
for notified_socket in exception_sockets:
if notified_socket in clients:
client_addr = clients[notified_socket]
print(f"客户端 {client_addr} 异常")
sockets_list.remove(notified_socket)
del clients[notified_socket]
notified_socket.close()
except KeyboardInterrupt:
print("\n服务器关闭")
finally:
server_socket.close()
# 2. Socket选项设置
print("\n2. Socket选项设置:")
def socket_options_demo():
"""Socket选项演示"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 常用选项
options = {
"SO_REUSEADDR": "允许地址重用",
"SO_KEEPALIVE": "启用保活机制",
"SO_RCVBUF": "设置接收缓冲区大小",
"SO_SNDBUF": "设置发送缓冲区大小",
"TCP_NODELAY": "禁用Nagle算法"
}
for option, desc in options.items():
print(f" {option}: {desc}")
# 设置选项示例
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# 获取选项值
reuse_addr = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
print(f" SO_REUSEADDR 当前值: {reuse_addr}")
sock.close()
# 3. 超时设置
print("\n3. 超时设置:")
def timeout_demo():
"""超时设置演示"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置连接超时
sock.settimeout(5.0) # 5秒超时
try:
# 尝试连接到不存在的服务器
sock.connect(('192.168.1.999', 80))
except socket.timeout:
print(" 连接超时")
except Exception as e:
print(f" 连接错误: {e}")
finally:
sock.close()
socket_options_demo()
timeout_demo()
# 运行高级Socket演示
advanced_socket_demo()
# 3. HTTP编程
# 3.1 urllib模块
import urllib.request
import urllib.parse
import urllib.error
import json
def urllib_demo():
"""urllib模块演示"""
print("=== urllib模块演示 ===")
# 1. 基本GET请求
print("\n1. 基本GET请求:")
def simple_get_request():
"""简单GET请求"""
try:
# 发送GET请求
url = "https://httpbin.org/get"
response = urllib.request.urlopen(url)
# 获取响应信息
print(f" 状态码: {response.getcode()}")
print(f" 响应头: {dict(response.headers)}")
# 读取响应内容
content = response.read().decode('utf-8')
data = json.loads(content)
print(f" 响应数据: {data['url']}")
except urllib.error.URLError as e:
print(f" 请求错误: {e}")
# 2. 带参数的GET请求
print("\n2. 带参数的GET请求:")
def get_with_params():
"""带参数的GET请求"""
try:
# 构建URL参数
base_url = "https://httpbin.org/get"
params = {
'name': 'Python',
'version': '3.9',
'type': 'programming'
}
# 编码参数
query_string = urllib.parse.urlencode(params)
url = f"{base_url}?{query_string}"
print(f" 请求URL: {url}")
response = urllib.request.urlopen(url)
content = response.read().decode('utf-8')
data = json.loads(content)
print(f" 服务器收到的参数: {data['args']}")
except Exception as e:
print(f" 请求错误: {e}")
# 3. POST请求
print("\n3. POST请求:")
def post_request():
"""POST请求"""
try:
url = "https://httpbin.org/post"
# 准备POST数据
post_data = {
'username': 'python_user',
'password': 'secret123',
'email': 'user@example.com'
}
# 编码POST数据
data = urllib.parse.urlencode(post_data).encode('utf-8')
# 创建请求对象
request = urllib.request.Request(url, data=data)
request.add_header('Content-Type', 'application/x-www-form-urlencoded')
# 发送请求
response = urllib.request.urlopen(request)
content = response.read().decode('utf-8')
result = json.loads(content)
print(f" POST数据: {result['form']}")
print(f" 请求头: {result['headers']}")
except Exception as e:
print(f" POST请求错误: {e}")
# 4. JSON数据请求
print("\n4. JSON数据请求:")
def json_request():
"""发送JSON数据"""
try:
url = "https://httpbin.org/post"
# 准备JSON数据
json_data = {
'name': 'Python网络编程',
'author': '哪吒',
'topics': ['socket', 'http', 'urllib'],
'difficulty': 'intermediate'
}
# 转换为JSON字符串并编码
data = json.dumps(json_data).encode('utf-8')
# 创建请求
request = urllib.request.Request(url, data=data)
request.add_header('Content-Type', 'application/json')
# 发送请求
response = urllib.request.urlopen(request)
content = response.read().decode('utf-8')
result = json.loads(content)
print(f" 发送的JSON: {result['json']}")
except Exception as e:
print(f" JSON请求错误: {e}")
# 5. 自定义请求头
print("\n5. 自定义请求头:")
def custom_headers():
"""自定义请求头"""
try:
url = "https://httpbin.org/headers"
# 创建请求对象
request = urllib.request.Request(url)
# 添加自定义头
request.add_header('User-Agent', 'Python-urllib/3.9')
request.add_header('Accept', 'application/json')
request.add_header('X-Custom-Header', 'Python网络编程示例')
response = urllib.request.urlopen(request)
content = response.read().decode('utf-8')
result = json.loads(content)
print(f" 请求头: {result['headers']}")
except Exception as e:
print(f" 自定义头请求错误: {e}")
# 运行所有示例
simple_get_request()
get_with_params()
post_request()
json_request()
custom_headers()
# 运行urllib演示
urllib_demo()
# 3.2 http.client模块
import http.client
import json
def http_client_demo():
"""http.client模块演示"""
print("=== http.client模块演示 ===")
# 1. 基本HTTP连接
print("\n1. 基本HTTP连接:")
def basic_http_connection():
"""基本HTTP连接"""
try:
# 创建HTTP连接
conn = http.client.HTTPSConnection("httpbin.org")
# 发送GET请求
conn.request("GET", "/get")
# 获取响应
response = conn.getresponse()
print(f" 状态码: {response.status}")
print(f" 状态信息: {response.reason}")
print(f" HTTP版本: {response.version}")
# 读取响应数据
data = response.read().decode('utf-8')
result = json.loads(data)
print(f" 响应URL: {result['url']}")
conn.close()
except Exception as e:
print(f" HTTP连接错误: {e}")
# 2. 带参数的请求
print("\n2. 带参数的请求:")
def http_with_params():
"""带参数的HTTP请求"""
try:
conn = http.client.HTTPSConnection("httpbin.org")
# 构建查询参数
params = urllib.parse.urlencode({
'param1': 'value1',
'param2': 'value2',
'message': 'Hello from http.client'
})
# 发送请求
conn.request("GET", f"/get?{params}")
response = conn.getresponse()
data = response.read().decode('utf-8')
result = json.loads(data)
print(f" 查询参数: {result['args']}")
conn.close()
except Exception as e:
print(f" 参数请求错误: {e}")
# 3. POST请求
print("\n3. POST请求:")
def http_post_request():
"""HTTP POST请求"""
try:
conn = http.client.HTTPSConnection("httpbin.org")
# 准备POST数据
post_data = json.dumps({
'title': 'Python网络编程',
'content': '使用http.client发送POST请求',
'tags': ['python', 'network', 'http']
})
# 设置请求头
headers = {
'Content-Type': 'application/json',
'Content-Length': str(len(post_data)),
'User-Agent': 'Python-http.client'
}
# 发送POST请求
conn.request("POST", "/post", post_data, headers)
response = conn.getresponse()
print(f" 状态码: {response.status}")
data = response.read().decode('utf-8')
result = json.loads(data)
print(f" 发送的数据: {result['json']}")
print(f" 请求头: {result['headers']}")
conn.close()
except Exception as e:
print(f" POST请求错误: {e}")
# 4. 响应头处理
print("\n4. 响应头处理:")
def response_headers():
"""处理响应头"""
try:
conn = http.client.HTTPSConnection("httpbin.org")
conn.request("GET", "/response-headers?Content-Type=application/json&Server=Python-Demo")
response = conn.getresponse()
# 获取所有响应头
print(" 响应头:")
for header, value in response.getheaders():
print(f" {header}: {value}")
# 获取特定响应头
content_type = response.getheader('Content-Type')
print(f" Content-Type: {content_type}")
conn.close()
except Exception as e:
print(f" 响应头处理错误: {e}")
# 运行所有示例
basic_http_connection()
http_with_params()
http_post_request()
response_headers()
# 运行http.client演示
http_client_demo()
# 3.3 Web爬虫基础
import urllib.request
import urllib.parse
import re
import time
from urllib.robotparser import RobotFileParser
def web_scraping_demo():
"""Web爬虫基础演示"""
print("=== Web爬虫基础演示 ===")
# 1. 基本网页抓取
print("\n1. 基本网页抓取:")
def fetch_webpage():
"""抓取网页内容"""
try:
# 设置User-Agent避免被拒绝
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
url = "https://httpbin.org/html"
request = urllib.request.Request(url, headers=headers)
response = urllib.request.urlopen(request)
# 获取网页内容
html_content = response.read().decode('utf-8')
# 简单的HTML解析
title_match = re.search(r'<title>(.*?)</title>', html_content, re.IGNORECASE)
if title_match:
title = title_match.group(1)
print(f" 网页标题: {title}")
# 提取链接
links = re.findall(r'<a[^>]+href=["\']([^"\'>]+)["\'][^>]*>(.*?)</a>', html_content, re.IGNORECASE)
print(f" 找到 {len(links)} 个链接")
for href, text in links[:3]: # 只显示前3个
print(f" {text.strip()}: {href}")
except Exception as e:
print(f" 网页抓取错误: {e}")
# 2. 处理表单和Cookie
print("\n2. 处理表单和Cookie:")
def handle_forms_cookies():
"""处理表单和Cookie"""
try:
# 创建Cookie处理器
import http.cookiejar
cookie_jar = http.cookiejar.CookieJar()
cookie_processor = urllib.request.HTTPCookieProcessor(cookie_jar)
opener = urllib.request.build_opener(cookie_processor)
# 设置为全局opener
urllib.request.install_opener(opener)
# 访问设置Cookie的页面
response = opener.open("https://httpbin.org/cookies/set/session_id/abc123")
print(f" 设置Cookie后的状态码: {response.getcode()}")
# 查看Cookie
for cookie in cookie_jar:
print(f" Cookie: {cookie.name}={cookie.value}")
# 访问需要Cookie的页面
response = opener.open("https://httpbin.org/cookies")
content = response.read().decode('utf-8')
result = json.loads(content)
print(f" 服务器收到的Cookie: {result['cookies']}")
except Exception as e:
print(f" Cookie处理错误: {e}")
# 3. 处理重定向
print("\n3. 处理重定向:")
def handle_redirects():
"""处理HTTP重定向"""
try:
# 自动处理重定向(默认行为)
url = "https://httpbin.org/redirect/3" # 重定向3次
response = urllib.request.urlopen(url)
print(f" 最终URL: {response.url}")
print(f" 状态码: {response.getcode()}")
# 禁用自动重定向
class NoRedirectHandler(urllib.request.HTTPRedirectHandler):
def redirect_request(self, req, fp, code, msg, headers, newurl):
return None
opener = urllib.request.build_opener(NoRedirectHandler)
try:
response = opener.open("https://httpbin.org/redirect/1")
except urllib.error.HTTPError as e:
print(f" 重定向状态码: {e.code}")
print(f" 重定向位置: {e.headers.get('Location')}")
except Exception as e:
print(f" 重定向处理错误: {e}")
# 4. 爬虫礼仪
print("\n4. 爬虫礼仪:")
def crawler_etiquette():
"""爬虫礼仪演示"""
print(" 爬虫最佳实践:")
print(" - 遵守robots.txt")
print(" - 设置合理的请求间隔")
print(" - 使用适当的User-Agent")
print(" - 避免过度请求")
print(" - 尊重网站的服务条款")
# robots.txt检查示例
def check_robots_txt(url):
"""检查robots.txt"""
try:
rp = RobotFileParser()
rp.set_url(f"{url}/robots.txt")
rp.read()
# 检查是否允许抓取
user_agent = "*"
test_url = f"{url}/get"
if rp.can_fetch(user_agent, test_url):
print(f" 允许抓取: {test_url}")
else:
print(f" 禁止抓取: {test_url}")
except Exception as e:
print(f" robots.txt检查错误: {e}")
check_robots_txt("https://httpbin.org")
# 请求间隔示例
print(" 实施请求间隔...")
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1"
]
for i, url in enumerate(urls, 1):
try:
start_time = time.time()
response = urllib.request.urlopen(url)
end_time = time.time()
print(f" 请求 {i}: 状态码 {response.getcode()}, 耗时 {end_time - start_time:.2f}秒")
# 请求间隔
if i < len(urls):
time.sleep(1) # 1秒间隔
except Exception as e:
print(f" 请求 {i} 错误: {e}")
# 运行所有示例
fetch_webpage()
handle_forms_cookies()
handle_redirects()
crawler_etiquette()
# 运行Web爬虫演示
web_scraping_demo()
# 4. 异步网络编程
# 4.1 asyncio基础
import asyncio
import aiohttp
import time
def asyncio_networking_demo():
"""异步网络编程演示"""
print("=== 异步网络编程演示 ===")
# 1. 异步HTTP客户端
print("\n1. 异步HTTP客户端:")
async def fetch_url(session, url):
"""异步获取URL内容"""
try:
async with session.get(url) as response:
content = await response.text()
return {
'url': url,
'status': response.status,
'length': len(content)
}
except Exception as e:
return {
'url': url,
'error': str(e)
}
async def fetch_multiple_urls():
"""并发获取多个URL"""
urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/2',
'https://httpbin.org/delay/1',
'https://httpbin.org/get',
'https://httpbin.org/json'
]
start_time = time.time()
async with aiohttp.ClientSession() as session:
# 并发执行所有请求
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
end_time = time.time()
print(f" 并发请求完成,总耗时: {end_time - start_time:.2f}秒")
for result in results:
if 'error' in result:
print(f" {result['url']}: 错误 - {result['error']}")
else:
print(f" {result['url']}: 状态码 {result['status']}, 长度 {result['length']}")
# 2. 异步TCP服务器
print("\n2. 异步TCP服务器:")
async def handle_client_async(reader, writer):
"""异步处理客户端连接"""
client_address = writer.get_extra_info('peername')
print(f" 客户端 {client_address} 已连接")
try:
while True:
# 异步读取数据
data = await reader.read(1024)
if not data:
break
message = data.decode('utf-8').strip()
print(f" 收到来自 {client_address} 的消息: {message}")
if message.lower() == 'quit':
break
elif message.lower() == 'time':
response = f"服务器时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
else:
response = f"异步服务器收到: {message}\n"
# 异步发送响应
writer.write(response.encode('utf-8'))
await writer.drain()
except Exception as e:
print(f" 处理客户端 {client_address} 时出错: {e}")
finally:
writer.close()
await writer.wait_closed()
print(f" 客户端 {client_address} 已断开连接")
async def start_async_server():
"""启动异步TCP服务器"""
server = await asyncio.start_server(
handle_client_async,
'localhost',
12348
)
addr = server.sockets[0].getsockname()
print(f" 异步TCP服务器启动,监听 {addr[0]}:{addr[1]}")
async with server:
await server.serve_forever()
# 3. 异步HTTP服务器
print("\n3. 异步HTTP服务器:")
async def handle_http_request(request):
"""处理HTTP请求"""
path = request.path
method = request.method
print(f" 收到 {method} 请求: {path}")
if path == '/':
return aiohttp.web.Response(text="欢迎访问异步HTTP服务器!")
elif path == '/api/time':
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return aiohttp.web.json_response({'time': current_time})
elif path == '/api/echo' and method == 'POST':
data = await request.json()
return aiohttp.web.json_response({'echo': data})
else:
return aiohttp.web.Response(text="页面未找到", status=404)
async def start_http_server():
"""启动异步HTTP服务器"""
app = aiohttp.web.Application()
app.router.add_get('/', handle_http_request)
app.router.add_get('/api/time', handle_http_request)
app.router.add_post('/api/echo', handle_http_request)
runner = aiohttp.web.AppRunner(app)
await runner.setup()
site = aiohttp.web.TCPSite(runner, 'localhost', 8080)
await site.start()
print(" 异步HTTP服务器启动,监听 localhost:8080")
print(" 访问 http://localhost:8080 查看效果")
# 保持服务器运行
try:
await asyncio.Future() # 永远等待
except KeyboardInterrupt:
print("\n HTTP服务器正在关闭...")
finally:
await runner.cleanup()
# 运行示例(注意:实际使用时需要在异步环境中运行)
print("\n异步网络编程示例:")
print("请在异步环境中运行以下函数:")
print("- asyncio.run(fetch_multiple_urls())")
print("- asyncio.run(start_async_server())")
print("- asyncio.run(start_http_server())")
# 运行异步网络编程演示
asyncio_networking_demo()
# 4.2 WebSocket编程
import asyncio
import websockets
import json
def websocket_demo():
"""WebSocket编程演示"""
print("=== WebSocket编程演示 ===")
# 1. WebSocket服务器
print("\n1. WebSocket服务器:")
class WebSocketServer:
def __init__(self):
self.clients = set()
self.rooms = {}
async def register_client(self, websocket, path):
"""注册新客户端"""
self.clients.add(websocket)
client_id = id(websocket)
print(f" 客户端 {client_id} 已连接")
try:
await self.handle_client(websocket)
except websockets.exceptions.ConnectionClosed:
print(f" 客户端 {client_id} 连接已关闭")
finally:
self.clients.remove(websocket)
# 从所有房间中移除客户端
for room_clients in self.rooms.values():
room_clients.discard(websocket)
async def handle_client(self, websocket):
"""处理客户端消息"""
async for message in websocket:
try:
data = json.loads(message)
await self.process_message(websocket, data)
except json.JSONDecodeError:
await websocket.send(json.dumps({
'type': 'error',
'message': '无效的JSON格式'
}))
async def process_message(self, websocket, data):
"""处理消息"""
message_type = data.get('type')
if message_type == 'join_room':
room = data.get('room', 'default')
if room not in self.rooms:
self.rooms[room] = set()
self.rooms[room].add(websocket)
await websocket.send(json.dumps({
'type': 'joined',
'room': room,
'message': f'已加入房间 {room}'
}))
# 通知房间内其他用户
await self.broadcast_to_room(room, {
'type': 'user_joined',
'message': '新用户加入房间'
}, exclude=websocket)
elif message_type == 'chat':
room = data.get('room', 'default')
content = data.get('content', '')
username = data.get('username', '匿名用户')
# 广播消息到房间
await self.broadcast_to_room(room, {
'type': 'chat',
'username': username,
'content': content,
'timestamp': datetime.now().isoformat()
})
elif message_type == 'ping':
await websocket.send(json.dumps({
'type': 'pong',
'timestamp': datetime.now().isoformat()
}))
async def broadcast_to_room(self, room, message, exclude=None):
"""向房间广播消息"""
if room in self.rooms:
message_str = json.dumps(message)
disconnected = set()
for client in self.rooms[room]:
if client != exclude:
try:
await client.send(message_str)
except websockets.exceptions.ConnectionClosed:
disconnected.add(client)
# 清理断开的连接
self.rooms[room] -= disconnected
async def start_server(self, host='localhost', port=8765):
"""启动WebSocket服务器"""
print(f" WebSocket服务器启动,监听 {host}:{port}")
async with websockets.serve(self.register_client, host, port):
await asyncio.Future() # 永远运行
# 2. WebSocket客户端
print("\n2. WebSocket客户端:")
class WebSocketClient:
def __init__(self, uri):
self.uri = uri
self.websocket = None
async def connect(self):
"""连接到WebSocket服务器"""
self.websocket = await websockets.connect(self.uri)
print(f" 已连接到 {self.uri}")
async def send_message(self, message):
"""发送消息"""
if self.websocket:
await self.websocket.send(json.dumps(message))
async def listen_messages(self):
"""监听消息"""
if self.websocket:
async for message in self.websocket:
data = json.loads(message)
await self.handle_message(data)
async def handle_message(self, data):
"""处理收到的消息"""
message_type = data.get('type')
if message_type == 'chat':
username = data.get('username', '未知用户')
content = data.get('content', '')
timestamp = data.get('timestamp', '')
print(f" [{timestamp}] {username}: {content}")
elif message_type == 'joined':
room = data.get('room')
print(f" 成功加入房间: {room}")
elif message_type == 'user_joined':
print(f" {data.get('message')}")
elif message_type == 'pong':
print(f" 收到pong: {data.get('timestamp')}")
async def close(self):
"""关闭连接"""
if self.websocket:
await self.websocket.close()
# 3. 使用示例
print("\n3. 使用示例:")
async def websocket_server_example():
"""WebSocket服务器示例"""
server = WebSocketServer()
await server.start_server()
async def websocket_client_example():
"""WebSocket客户端示例"""
client = WebSocketClient('ws://localhost:8765')
try:
await client.connect()
# 加入房间
await client.send_message({
'type': 'join_room',
'room': 'python_chat'
})
# 发送聊天消息
await client.send_message({
'type': 'chat',
'room': 'python_chat',
'username': 'Python用户',
'content': '大家好!'
})
# 发送ping
await client.send_message({
'type': 'ping'
})
# 监听消息(这里只监听5秒作为示例)
await asyncio.wait_for(client.listen_messages(), timeout=5.0)
except asyncio.TimeoutError:
print(" 客户端示例完成")
finally:
await client.close()
print("\nWebSocket编程示例:")
print("请在异步环境中运行以下函数:")
print("- 服务器: asyncio.run(websocket_server_example())")
print("- 客户端: asyncio.run(websocket_client_example())")
# 运行WebSocket演示
websocket_demo()
# 5. 网络安全
# 5.1 SSL/TLS编程
import ssl
import socket
import urllib.request
import urllib.parse
def ssl_tls_demo():
"""SSL/TLS编程演示"""
print("=== SSL/TLS编程演示 ===")
# 1. SSL上下文配置
print("\n1. SSL上下文配置:")
def ssl_context_demo():
"""SSL上下文演示"""
# 创建SSL上下文
context = ssl.create_default_context()
# 配置SSL选项
print(" SSL上下文配置:")
print(f" - 协议版本: {context.protocol}")
print(f" - 验证模式: {context.verify_mode}")
print(f" - 检查主机名: {context.check_hostname}")
# 自定义SSL上下文
custom_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
custom_context.check_hostname = False
custom_context.verify_mode = ssl.CERT_NONE
print("\n 自定义SSL上下文(仅用于测试):")
print(f" - 验证模式: {custom_context.verify_mode}")
print(f" - 检查主机名: {custom_context.check_hostname}")
# 加载证书和密钥(示例)
try:
# custom_context.load_cert_chain('client.crt', 'client.key')
# custom_context.load_verify_locations('ca.crt')
print(" - 证书链加载: 需要实际证书文件")
except Exception as e:
print(f" - 证书加载错误: {e}")
# 2. HTTPS请求
print("\n2. HTTPS请求:")
def https_request_demo():
"""HTTPS请求演示"""
try:
# 标准HTTPS请求
url = "https://httpbin.org/get"
response = urllib.request.urlopen(url)
print(f" HTTPS请求成功: {response.getcode()}")
# 获取SSL信息
if hasattr(response, 'fp') and hasattr(response.fp, 'raw'):
sock = response.fp.raw._sock
if isinstance(sock, ssl.SSLSocket):
print(f" SSL版本: {sock.version()}")
print(f" 加密套件: {sock.cipher()}")
# 自定义SSL上下文的HTTPS请求
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
https_handler = urllib.request.HTTPSHandler(context=context)
opener = urllib.request.build_opener(https_handler)
response = opener.open("https://httpbin.org/get")
print(f" 自定义SSL上下文请求: {response.getcode()}")
except Exception as e:
print(f" HTTPS请求错误: {e}")
# 3. SSL Socket编程
print("\n3. SSL Socket编程:")
def ssl_socket_demo():
"""SSL Socket演示"""
try:
# 创建SSL套接字
context = ssl.create_default_context()
# 连接到HTTPS服务器
with socket.create_connection(('httpbin.org', 443)) as sock:
with context.wrap_socket(sock, server_hostname='httpbin.org') as ssock:
print(f" SSL连接建立: {ssock.version()}")
print(f" 服务器证书: {ssock.getpeercert()['subject']}")
# 发送HTTP请求
request = b"GET /get HTTP/1.1\r\nHost: httpbin.org\r\nConnection: close\r\n\r\n"
ssock.send(request)
# 接收响应
response = ssock.recv(4096)
response_str = response.decode('utf-8')
# 提取状态行
status_line = response_str.split('\r\n')[0]
print(f" HTTP响应: {status_line}")
except Exception as e:
print(f" SSL Socket错误: {e}")
# 4. 证书验证
print("\n4. 证书验证:")
def certificate_verification():
"""证书验证演示"""
try:
# 获取服务器证书
context = ssl.create_default_context()
with socket.create_connection(('httpbin.org', 443)) as sock:
with context.wrap_socket(sock, server_hostname='httpbin.org') as ssock:
cert = ssock.getpeercert()
print(" 服务器证书信息:")
print(f" - 主题: {cert.get('subject')}")
print(f" - 颁发者: {cert.get('issuer')}")
print(f" - 版本: {cert.get('version')}")
print(f" - 序列号: {cert.get('serialNumber')}")
print(f" - 有效期从: {cert.get('notBefore')}")
print(f" - 有效期到: {cert.get('notAfter')}")
# 检查证书有效性
import datetime
not_after = datetime.datetime.strptime(
cert['notAfter'], '%b %d %H:%M:%S %Y %Z'
)
if not_after > datetime.datetime.now():
print(" - 证书状态: 有效")
else:
print(" - 证书状态: 已过期")
except Exception as e:
print(f" 证书验证错误: {e}")
# 运行所有示例
ssl_context_demo()
https_request_demo()
ssl_socket_demo()
certificate_verification()
# 运行SSL/TLS演示
ssl_tls_demo()
# 5.2 网络安全最佳实践
import hashlib
import hmac
import secrets
import base64
from urllib.parse import quote, unquote
def network_security_demo():
"""网络安全最佳实践演示"""
print("=== 网络安全最佳实践 ===")
# 1. 数据加密和哈希
print("\n1. 数据加密和哈希:")
def encryption_hashing_demo():
"""加密和哈希演示"""
# 生成安全随机数
random_token = secrets.token_hex(16)
print(f" 随机令牌: {random_token}")
# 密码哈希
password = "my_secure_password"
salt = secrets.token_hex(16)
# 使用PBKDF2进行密码哈希
password_hash = hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
salt.encode('utf-8'),
100000 # 迭代次数
)
print(f" 密码盐值: {salt}")
print(f" 密码哈希: {password_hash.hex()}")
# HMAC签名
secret_key = secrets.token_bytes(32)
message = "重要的数据"
signature = hmac.new(
secret_key,
message.encode('utf-8'),
hashlib.sha256
).hexdigest()
print(f" HMAC签名: {signature}")
# 验证HMAC签名
def verify_hmac(key, message, signature):
expected = hmac.new(
key,
message.encode('utf-8'),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected, signature)
is_valid = verify_hmac(secret_key, message, signature)
print(f" 签名验证: {'有效' if is_valid else '无效'}")
# 2. 输入验证和清理
print("\n2. 输入验证和清理:")
def input_validation_demo():
"""输入验证演示"""
import re
# URL编码/解码
unsafe_input = "<script>alert('XSS')</script>"
encoded_input = quote(unsafe_input)
decoded_input = unquote(encoded_input)
print(f" 原始输入: {unsafe_input}")
print(f" URL编码: {encoded_input}")
print(f" URL解码: {decoded_input}")
# HTML转义
def html_escape(text):
"""HTML转义"""
escape_chars = {
'&': '&',
'<': '<',
'>': '>',
'"': '"',
"'": '''
}
for char, escape in escape_chars.items():
text = text.replace(char, escape)
return text
escaped_html = html_escape(unsafe_input)
print(f" HTML转义: {escaped_html}")
# 输入验证函数
def validate_email(email):
"""验证邮箱格式"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
def validate_ip(ip):
"""验证IP地址"""
parts = ip.split('.')
if len(parts) != 4:
return False
try:
return all(0 <= int(part) <= 255 for part in parts)
except ValueError:
return False
# 测试验证函数
test_emails = ['user@example.com', 'invalid-email', 'test@domain']
test_ips = ['192.168.1.1', '256.1.1.1', '192.168.1']
print("\n 邮箱验证:")
for email in test_emails:
is_valid = validate_email(email)
print(f" {email}: {'有效' if is_valid else '无效'}")
print("\n IP地址验证:")
for ip in test_ips:
is_valid = validate_ip(ip)
print(f" {ip}: {'有效' if is_valid else '无效'}")
# 3. 安全的HTTP请求
print("\n3. 安全的HTTP请求:")
def secure_http_demo():
"""安全HTTP请求演示"""
# 安全请求头
secure_headers = {
'User-Agent': 'Python-Security-Demo/1.0',
'Accept': 'application/json',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Cache-Control': 'no-cache',
'Pragma': 'no-cache',
'X-Requested-With': 'XMLHttpRequest'
}
print(" 安全请求头:")
for header, value in secure_headers.items():
print(f" {header}: {value}")
# 请求超时设置
timeout_settings = {
'connect_timeout': 10, # 连接超时
'read_timeout': 30, # 读取超时
'total_timeout': 60 # 总超时
}
print("\n 超时设置:")
for setting, value in timeout_settings.items():
print(f" {setting}: {value}秒")
# 代理设置示例
proxy_config = {
'http': 'http://proxy.example.com:8080',
'https': 'https://proxy.example.com:8080'
}
print("\n 代理配置示例:")
for protocol, proxy in proxy_config.items():
print(f" {protocol}: {proxy}")
# 4. 错误处理和日志
print("\n4. 错误处理和日志:")
def security_logging_demo():
"""安全日志演示"""
import logging
from datetime import datetime
# 配置安全日志
security_logger = logging.getLogger('security')
security_logger.setLevel(logging.INFO)
# 创建格式化器
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
security_logger.addHandler(console_handler)
# 安全事件记录
def log_security_event(event_type, details, severity='INFO'):
"""记录安全事件"""
timestamp = datetime.now().isoformat()
log_message = f"[{event_type}] {details} - {timestamp}"
if severity == 'CRITICAL':
security_logger.critical(log_message)
elif severity == 'ERROR':
security_logger.error(log_message)
elif severity == 'WARNING':
security_logger.warning(log_message)
else:
security_logger.info(log_message)
# 示例安全事件
log_security_event('LOGIN_ATTEMPT', '用户尝试登录: user@example.com')
log_security_event('INVALID_INPUT', '检测到可疑输入', 'WARNING')
log_security_event('RATE_LIMIT', '请求频率超限', 'ERROR')
log_security_event('SECURITY_BREACH', '检测到安全漏洞', 'CRITICAL')
print("\n 安全最佳实践:")
best_practices = [
"始终使用HTTPS进行敏感数据传输",
"验证和清理所有用户输入",
"使用强密码和多因素认证",
"定期更新依赖库和系统",
"实施适当的访问控制",
"记录和监控安全事件",
"使用安全的随机数生成器",
"避免在日志中记录敏感信息",
"实施请求频率限制",
"使用安全的会话管理"
]
for i, practice in enumerate(best_practices, 1):
print(f" {i}. {practice}")
# 运行所有示例
encryption_hashing_demo()
input_validation_demo()
secure_http_demo()
security_logging_demo()
# 运行网络安全演示
network_security_demo()
# 6. 实际应用案例
# 6.1 简单的Web API客户端
import urllib.request
import urllib.parse
import json
import time
from datetime import datetime
class APIClient:
"""简单的Web API客户端"""
def __init__(self, base_url, api_key=None, timeout=30):
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.timeout = timeout
self.session_headers = {
'User-Agent': 'Python-API-Client/1.0',
'Accept': 'application/json',
'Content-Type': 'application/json'
}
if api_key:
self.session_headers['Authorization'] = f'Bearer {api_key}'
def _make_request(self, method, endpoint, data=None, params=None):
"""发送HTTP请求"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
# 添加查询参数
if params:
query_string = urllib.parse.urlencode(params)
url = f"{url}?{query_string}"
# 准备请求数据
request_data = None
if data:
request_data = json.dumps(data).encode('utf-8')
# 创建请求
request = urllib.request.Request(
url,
data=request_data,
headers=self.session_headers,
method=method
)
try:
with urllib.request.urlopen(request, timeout=self.timeout) as response:
content = response.read().decode('utf-8')
return {
'status_code': response.getcode(),
'headers': dict(response.headers),
'data': json.loads(content) if content else None
}
except urllib.error.HTTPError as e:
error_content = e.read().decode('utf-8')
return {
'status_code': e.code,
'headers': dict(e.headers),
'error': error_content
}
except Exception as e:
return {
'status_code': 0,
'error': str(e)
}
def get(self, endpoint, params=None):
"""GET请求"""
return self._make_request('GET', endpoint, params=params)
def post(self, endpoint, data=None):
"""POST请求"""
return self._make_request('POST', endpoint, data=data)
def put(self, endpoint, data=None):
"""PUT请求"""
return self._make_request('PUT', endpoint, data=data)
def delete(self, endpoint):
"""DELETE请求"""
return self._make_request('DELETE', endpoint)
def api_client_demo():
"""API客户端演示"""
print("=== Web API客户端演示 ===")
# 创建API客户端
client = APIClient('https://httpbin.org')
# GET请求
print("\n1. GET请求:")
response = client.get('/get', params={'key': 'value', 'test': 'python'})
if response['status_code'] == 200:
print(f" 状态码: {response['status_code']}")
print(f" 参数: {response['data']['args']}")
else:
print(f" 请求失败: {response.get('error')}")
# POST请求
print("\n2. POST请求:")
post_data = {
'name': 'Python网络编程',
'type': 'tutorial',
'timestamp': datetime.now().isoformat()
}
response = client.post('/post', data=post_data)
if response['status_code'] == 200:
print(f" 状态码: {response['status_code']}")
print(f" 发送的数据: {response['data']['json']}")
else:
print(f" 请求失败: {response.get('error')}")
# 错误处理
print("\n3. 错误处理:")
response = client.get('/status/404')
print(f" 状态码: {response['status_code']}")
if 'error' in response:
print(f" 错误信息: {response['error']}")
# 运行API客户端演示
api_client_demo()
# 6.2 文件下载器
import urllib.request
import urllib.parse
import os
import time
from pathlib import Path
class FileDownloader:
"""文件下载器"""
def __init__(self, download_dir='downloads', chunk_size=8192):
self.download_dir = Path(download_dir)
self.download_dir.mkdir(exist_ok=True)
self.chunk_size = chunk_size
def download_file(self, url, filename=None, progress_callback=None):
"""下载文件"""
try:
# 获取文件名
if not filename:
parsed_url = urllib.parse.urlparse(url)
filename = os.path.basename(parsed_url.path) or 'download'
filepath = self.download_dir / filename
# 创建请求
request = urllib.request.Request(url)
request.add_header('User-Agent', 'Python-Downloader/1.0')
with urllib.request.urlopen(request) as response:
# 获取文件大小
content_length = response.headers.get('Content-Length')
total_size = int(content_length) if content_length else None
print(f"开始下载: {filename}")
if total_size:
print(f"文件大小: {self._format_size(total_size)}")
downloaded = 0
start_time = time.time()
with open(filepath, 'wb') as f:
while True:
chunk = response.read(self.chunk_size)
if not chunk:
break
f.write(chunk)
downloaded += len(chunk)
# 调用进度回调
if progress_callback:
progress_callback(downloaded, total_size)
# 显示进度
if total_size:
progress = (downloaded / total_size) * 100
speed = downloaded / (time.time() - start_time)
print(f"\r进度: {progress:.1f}% ({self._format_size(downloaded)}/{self._format_size(total_size)}) - {self._format_size(speed)}/s", end='')
print(f"\n下载完成: {filepath}")
return str(filepath)
except Exception as e:
print(f"下载失败: {e}")
return None
def download_multiple(self, urls, max_concurrent=3):
"""下载多个文件"""
import threading
from queue import Queue
def worker():
while True:
url = url_queue.get()
if url is None:
break
filename = os.path.basename(urllib.parse.urlparse(url).path)
print(f"\n[线程 {threading.current_thread().name}] 开始下载: {filename}")
self.download_file(url)
url_queue.task_done()
# 创建队列和线程
url_queue = Queue()
threads = []
# 启动工作线程
for i in range(min(max_concurrent, len(urls))):
t = threading.Thread(target=worker, name=f'Worker-{i+1}')
t.start()
threads.append(t)
# 添加URL到队列
for url in urls:
url_queue.put(url)
# 等待所有下载完成
url_queue.join()
# 停止工作线程
for _ in threads:
url_queue.put(None)
for t in threads:
t.join()
print("\n所有文件下载完成")
def _format_size(self, size):
"""格式化文件大小"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size < 1024:
return f"{size:.1f} {unit}"
size /= 1024
return f"{size:.1f} TB"
def file_downloader_demo():
"""文件下载器演示"""
print("=== 文件下载器演示 ===")
downloader = FileDownloader()
# 单文件下载
print("\n1. 单文件下载:")
test_url = "https://httpbin.org/json"
downloader.download_file(test_url, "test.json")
# 多文件下载
print("\n2. 多文件下载:")
test_urls = [
"https://httpbin.org/json",
"https://httpbin.org/xml",
"https://httpbin.org/html"
]
downloader.download_multiple(test_urls)
# 运行文件下载器演示
file_downloader_demo()
# 7. 学习建议和总结
# 7.1 学习路径
def learning_path():
"""网络编程学习路径"""
print("=== 网络编程学习路径 ===")
learning_stages = {
"基础阶段": [
"理解网络协议基础(TCP/IP、HTTP)",
"掌握socket编程基础",
"学习urllib模块使用",
"了解客户端-服务器架构"
],
"进阶阶段": [
"掌握异步网络编程(asyncio)",
"学习WebSocket编程",
"了解网络安全基础",
"掌握SSL/TLS编程"
],
"高级阶段": [
"学习网络框架(如aiohttp、tornado)",
"掌握微服务架构",
"了解负载均衡和集群",
"学习网络监控和调试"
],
"实战阶段": [
"开发Web API客户端",
"构建网络爬虫",
"实现聊天应用",
"开发分布式系统"
]
}
for stage, topics in learning_stages.items():
print(f"\n{stage}:")
for i, topic in enumerate(topics, 1):
print(f" {i}. {topic}")
def best_practices():
"""网络编程最佳实践"""
print("\n=== 网络编程最佳实践 ===")
practices = {
"性能优化": [
"使用连接池复用连接",
"实施适当的超时设置",
"使用异步编程提高并发性",
"合理设置缓冲区大小",
"避免阻塞操作"
],
"错误处理": [
"捕获和处理网络异常",
"实施重试机制",
"记录详细的错误日志",
"提供友好的错误信息",
"实施熔断器模式"
],
"安全考虑": [
"始终使用HTTPS传输敏感数据",
"验证和清理用户输入",
"实施认证和授权",
"使用安全的随机数",
"定期更新依赖库"
],
"代码质量": [
"编写清晰的文档",
"使用类型提示",
"编写单元测试",
"遵循编码规范",
"进行代码审查"
]
}
for category, items in practices.items():
print(f"\n{category}:")
for i, item in enumerate(items, 1):
print(f" {i}. {item}")
def common_pitfalls():
"""常见陷阱和解决方案"""
print("\n=== 常见陷阱和解决方案 ===")
pitfalls = {
"阻塞操作": {
"问题": "同步网络操作阻塞程序执行",
"解决方案": "使用异步编程或多线程"
},
"资源泄漏": {
"问题": "未正确关闭网络连接",
"解决方案": "使用上下文管理器或try-finally"
},
"超时设置": {
"问题": "未设置适当的超时时间",
"解决方案": "根据应用场景设置合理超时"
},
"错误处理": {
"问题": "未处理网络异常",
"解决方案": "捕获并适当处理各种网络异常"
},
"安全漏洞": {
"问题": "未验证用户输入或使用不安全协议",
"解决方案": "实施输入验证和使用安全协议"
}
}
for pitfall, details in pitfalls.items():
print(f"\n{pitfall}:")
print(f" 问题: {details['问题']}")
print(f" 解决方案: {details['解决方案']}")
def chapter_summary():
"""本章总结"""
print("\n=== 本章总结 ===")
summary_points = [
"网络编程是现代应用开发的重要技能",
"Python提供了丰富的网络编程库和工具",
"Socket编程是网络编程的基础",
"HTTP编程适用于Web应用开发",
"异步编程可以提高网络应用的性能",
"网络安全是网络编程中的重要考虑因素",
"实际项目中需要综合考虑性能、安全和可维护性",
"持续学习和实践是掌握网络编程的关键"
]
for i, point in enumerate(summary_points, 1):
print(f"{i}. {point}")
print("\n通过本章的学习,你应该能够:")
skills = [
"理解网络编程的基本概念和原理",
"使用socket进行TCP和UDP编程",
"使用urllib和http.client进行HTTP编程",
"开发简单的网络应用和爬虫",
"了解异步网络编程的基础",
"实施基本的网络安全措施",
"调试和优化网络应用"
]
for i, skill in enumerate(skills, 1):
print(f" {i}. {skill}")
# 运行学习建议和总结
learning_path()
best_practices()
common_pitfalls()
chapter_summary()