第18天-网络编程

2023/6/15

# 第18天-网络编程

# 1. 网络编程基础

# 1.1 网络编程概述

# 网络编程基础概念演示

def network_programming_basics():
    """网络编程基础概念"""
    print("=== 网络编程基础概念 ===")
    
    # 1. 网络编程定义
    print("\n1. 网络编程定义:")
    print("   - 通过网络协议实现不同计算机间的通信")
    print("   - 包括客户端和服务器端编程")
    print("   - 支持数据传输、远程调用、分布式计算等")
    
    # 2. 网络协议层次
    print("\n2. 网络协议层次:")
    protocols = {
        "应用层": ["HTTP", "HTTPS", "FTP", "SMTP", "POP3", "IMAP"],
        "传输层": ["TCP", "UDP"],
        "网络层": ["IP", "ICMP", "ARP"],
        "数据链路层": ["Ethernet", "WiFi"],
        "物理层": ["光纤", "双绞线", "无线"]
    }
    
    for layer, items in protocols.items():
        print(f"   {layer}: {', '.join(items)}")
    
    # 3. TCP vs UDP
    print("\n3. TCP vs UDP:")
    comparison = {
        "特性": ["连接性", "可靠性", "速度", "开销", "应用场景"],
        "TCP": ["面向连接", "可靠传输", "较慢", "较高", "文件传输、网页浏览"],
        "UDP": ["无连接", "不可靠", "较快", "较低", "视频直播、游戏"]
    }
    
    for i, feature in enumerate(comparison["特性"]):
        print(f"   {feature}: TCP({comparison['TCP'][i]}) vs UDP({comparison['UDP'][i]})")
    
    # 4. 常用端口
    print("\n4. 常用端口:")
    common_ports = {
        "HTTP": 80,
        "HTTPS": 443,
        "FTP": 21,
        "SSH": 22,
        "Telnet": 23,
        "SMTP": 25,
        "DNS": 53,
        "POP3": 110,
        "IMAP": 143
    }
    
    for service, port in common_ports.items():
        print(f"   {service}: {port}")

# 运行网络编程基础演示
network_programming_basics()

# 1.2 Python网络编程模块

import socket
import urllib.request
import urllib.parse
import http.client
import json
from datetime import datetime

def python_network_modules():
    """Python网络编程模块介绍"""
    print("=== Python网络编程模块 ===")
    
    # 1. socket模块
    print("\n1. socket模块:")
    print("   - 底层网络编程接口")
    print("   - 支持TCP和UDP协议")
    print("   - 提供客户端和服务器端功能")
    print("   - 跨平台支持")
    
    # 2. urllib模块
    print("\n2. urllib模块:")
    print("   - 高级URL处理库")
    print("   - 支持HTTP/HTTPS请求")
    print("   - 包含request、parse、error等子模块")
    print("   - 适合简单的网络请求")
    
    # 3. http.client模块
    print("\n3. http.client模块:")
    print("   - 低级HTTP客户端接口")
    print("   - 更精细的HTTP控制")
    print("   - 支持HTTP/1.1协议")
    print("   - 适合复杂的HTTP操作")
    
    # 4. 第三方库
    print("\n4. 常用第三方库:")
    third_party = {
        "requests": "简洁易用的HTTP库",
        "aiohttp": "异步HTTP客户端/服务器",
        "tornado": "异步网络库和Web框架",
        "twisted": "事件驱动网络引擎",
        "paramiko": "SSH客户端库",
        "ftplib": "FTP客户端库"
    }
    
    for lib, desc in third_party.items():
        print(f"   {lib}: {desc}")

# 运行模块介绍
python_network_modules()

# 2. Socket编程

# 2.1 TCP Socket编程

import socket
import threading
import time

def tcp_server_demo():
    """TCP服务器演示"""
    print("=== TCP服务器演示 ===")
    
    def handle_client(client_socket, client_address):
        """处理客户端连接"""
        print(f"客户端 {client_address} 已连接")
        
        try:
            while True:
                # 接收数据
                data = client_socket.recv(1024)
                if not data:
                    break
                
                message = data.decode('utf-8')
                print(f"收到来自 {client_address} 的消息: {message}")
                
                # 处理特殊命令
                if message.lower() == 'quit':
                    break
                elif message.lower() == 'time':
                    response = f"服务器时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
                elif message.lower().startswith('echo '):
                    response = f"回声: {message[5:]}"
                else:
                    response = f"服务器收到: {message}"
                
                # 发送响应
                client_socket.send(response.encode('utf-8'))
                
        except Exception as e:
            print(f"处理客户端 {client_address} 时出错: {e}")
        finally:
            client_socket.close()
            print(f"客户端 {client_address} 已断开连接")
    
    # 创建服务器socket
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # 设置socket选项,允许地址重用
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    
    # 绑定地址和端口
    host = 'localhost'
    port = 12345
    server_socket.bind((host, port))
    
    # 开始监听
    server_socket.listen(5)
    print(f"TCP服务器启动,监听 {host}:{port}")
    print("等待客户端连接...")
    
    try:
        while True:
            # 接受客户端连接
            client_socket, client_address = server_socket.accept()
            
            # 为每个客户端创建新线程
            client_thread = threading.Thread(
                target=handle_client,
                args=(client_socket, client_address)
            )
            client_thread.daemon = True
            client_thread.start()
            
    except KeyboardInterrupt:
        print("\n服务器正在关闭...")
    finally:
        server_socket.close()
        print("服务器已关闭")

def tcp_client_demo():
    """TCP客户端演示"""
    print("=== TCP客户端演示 ===")
    
    # 创建客户端socket
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    try:
        # 连接服务器
        host = 'localhost'
        port = 12345
        client_socket.connect((host, port))
        print(f"已连接到服务器 {host}:{port}")
        
        # 发送和接收数据
        messages = [
            "Hello, Server!",
            "time",
            "echo Python网络编程",
            "这是一条测试消息",
            "quit"
        ]
        
        for message in messages:
            print(f"发送: {message}")
            client_socket.send(message.encode('utf-8'))
            
            if message.lower() == 'quit':
                break
            
            # 接收响应
            response = client_socket.recv(1024)
            print(f"收到: {response.decode('utf-8')}")
            print("-" * 40)
            
            time.sleep(1)  # 延迟1秒
            
    except Exception as e:
        print(f"客户端错误: {e}")
    finally:
        client_socket.close()
        print("客户端已断开连接")

# 注意:实际运行时需要先启动服务器,再启动客户端
print("TCP Socket编程示例")
print("请先运行 tcp_server_demo() 启动服务器")
print("然后运行 tcp_client_demo() 启动客户端")

# 2.2 UDP Socket编程

def udp_server_demo():
    """UDP服务器演示"""
    print("=== UDP服务器演示 ===")
    
    # 创建UDP socket
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    # 绑定地址和端口
    host = 'localhost'
    port = 12346
    server_socket.bind((host, port))
    
    print(f"UDP服务器启动,监听 {host}:{port}")
    print("等待数据包...")
    
    try:
        while True:
            # 接收数据
            data, client_address = server_socket.recvfrom(1024)
            message = data.decode('utf-8')
            
            print(f"收到来自 {client_address} 的数据: {message}")
            
            # 处理数据并发送响应
            if message.lower() == 'quit':
                response = "服务器正在关闭"
                server_socket.sendto(response.encode('utf-8'), client_address)
                break
            elif message.lower() == 'time':
                response = f"服务器时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
            else:
                response = f"UDP服务器收到: {message}"
            
            # 发送响应
            server_socket.sendto(response.encode('utf-8'), client_address)
            
    except KeyboardInterrupt:
        print("\n服务器正在关闭...")
    finally:
        server_socket.close()
        print("UDP服务器已关闭")

def udp_client_demo():
    """UDP客户端演示"""
    print("=== UDP客户端演示 ===")
    
    # 创建UDP socket
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    try:
        # 服务器地址
        server_address = ('localhost', 12346)
        
        # 发送数据
        messages = [
            "Hello, UDP Server!",
            "time",
            "这是UDP消息",
            "quit"
        ]
        
        for message in messages:
            print(f"发送: {message}")
            client_socket.sendto(message.encode('utf-8'), server_address)
            
            # 接收响应
            response, server_addr = client_socket.recvfrom(1024)
            print(f"收到: {response.decode('utf-8')}")
            print("-" * 40)
            
            if message.lower() == 'quit':
                break
                
            time.sleep(1)
            
    except Exception as e:
        print(f"UDP客户端错误: {e}")
    finally:
        client_socket.close()
        print("UDP客户端已关闭")

# UDP编程示例
print("\nUDP Socket编程示例")
print("请先运行 udp_server_demo() 启动服务器")
print("然后运行 udp_client_demo() 启动客户端")

# 2.3 Socket编程进阶

import select
import errno

def advanced_socket_demo():
    """高级Socket编程演示"""
    print("=== 高级Socket编程 ===")
    
    # 1. 非阻塞Socket
    print("\n1. 非阻塞Socket:")
    
    def non_blocking_server():
        """非阻塞服务器示例"""
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind(('localhost', 12347))
        server_socket.listen(5)
        
        # 设置为非阻塞模式
        server_socket.setblocking(False)
        
        print("非阻塞服务器启动")
        
        sockets_list = [server_socket]
        clients = {}
        
        try:
            while True:
                # 使用select监控socket
                ready_to_read, _, exception_sockets = select.select(
                    sockets_list, [], sockets_list, 1
                )
                
                for notified_socket in ready_to_read:
                    if notified_socket == server_socket:
                        # 新的客户端连接
                        try:
                            client_socket, client_address = server_socket.accept()
                            client_socket.setblocking(False)
                            sockets_list.append(client_socket)
                            clients[client_socket] = client_address
                            print(f"新客户端连接: {client_address}")
                        except:
                            pass
                    else:
                        # 现有客户端发送数据
                        try:
                            data = notified_socket.recv(1024)
                            if data:
                                message = data.decode('utf-8')
                                client_addr = clients[notified_socket]
                                print(f"收到来自 {client_addr} 的消息: {message}")
                                
                                # 回声响应
                                response = f"回声: {message}"
                                notified_socket.send(response.encode('utf-8'))
                            else:
                                # 客户端断开连接
                                client_addr = clients[notified_socket]
                                print(f"客户端 {client_addr} 断开连接")
                                sockets_list.remove(notified_socket)
                                del clients[notified_socket]
                                notified_socket.close()
                        except:
                            # 连接错误
                            if notified_socket in clients:
                                client_addr = clients[notified_socket]
                                print(f"客户端 {client_addr} 连接错误")
                                sockets_list.remove(notified_socket)
                                del clients[notified_socket]
                                notified_socket.close()
                
                # 处理异常socket
                for notified_socket in exception_sockets:
                    if notified_socket in clients:
                        client_addr = clients[notified_socket]
                        print(f"客户端 {client_addr} 异常")
                        sockets_list.remove(notified_socket)
                        del clients[notified_socket]
                        notified_socket.close()
                        
        except KeyboardInterrupt:
            print("\n服务器关闭")
        finally:
            server_socket.close()
    
    # 2. Socket选项设置
    print("\n2. Socket选项设置:")
    
    def socket_options_demo():
        """Socket选项演示"""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        
        # 常用选项
        options = {
            "SO_REUSEADDR": "允许地址重用",
            "SO_KEEPALIVE": "启用保活机制",
            "SO_RCVBUF": "设置接收缓冲区大小",
            "SO_SNDBUF": "设置发送缓冲区大小",
            "TCP_NODELAY": "禁用Nagle算法"
        }
        
        for option, desc in options.items():
            print(f"   {option}: {desc}")
        
        # 设置选项示例
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        
        # 获取选项值
        reuse_addr = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
        print(f"   SO_REUSEADDR 当前值: {reuse_addr}")
        
        sock.close()
    
    # 3. 超时设置
    print("\n3. 超时设置:")
    
    def timeout_demo():
        """超时设置演示"""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        
        # 设置连接超时
        sock.settimeout(5.0)  # 5秒超时
        
        try:
            # 尝试连接到不存在的服务器
            sock.connect(('192.168.1.999', 80))
        except socket.timeout:
            print("   连接超时")
        except Exception as e:
            print(f"   连接错误: {e}")
        finally:
            sock.close()
    
    socket_options_demo()
    timeout_demo()

# 运行高级Socket演示
advanced_socket_demo()

# 3. HTTP编程

# 3.1 urllib模块

import urllib.request
import urllib.parse
import urllib.error
import json

def urllib_demo():
    """urllib模块演示"""
    print("=== urllib模块演示 ===")
    
    # 1. 基本GET请求
    print("\n1. 基本GET请求:")
    
    def simple_get_request():
        """简单GET请求"""
        try:
            # 发送GET请求
            url = "https://httpbin.org/get"
            response = urllib.request.urlopen(url)
            
            # 获取响应信息
            print(f"   状态码: {response.getcode()}")
            print(f"   响应头: {dict(response.headers)}")
            
            # 读取响应内容
            content = response.read().decode('utf-8')
            data = json.loads(content)
            print(f"   响应数据: {data['url']}")
            
        except urllib.error.URLError as e:
            print(f"   请求错误: {e}")
    
    # 2. 带参数的GET请求
    print("\n2. 带参数的GET请求:")
    
    def get_with_params():
        """带参数的GET请求"""
        try:
            # 构建URL参数
            base_url = "https://httpbin.org/get"
            params = {
                'name': 'Python',
                'version': '3.9',
                'type': 'programming'
            }
            
            # 编码参数
            query_string = urllib.parse.urlencode(params)
            url = f"{base_url}?{query_string}"
            
            print(f"   请求URL: {url}")
            
            response = urllib.request.urlopen(url)
            content = response.read().decode('utf-8')
            data = json.loads(content)
            
            print(f"   服务器收到的参数: {data['args']}")
            
        except Exception as e:
            print(f"   请求错误: {e}")
    
    # 3. POST请求
    print("\n3. POST请求:")
    
    def post_request():
        """POST请求"""
        try:
            url = "https://httpbin.org/post"
            
            # 准备POST数据
            post_data = {
                'username': 'python_user',
                'password': 'secret123',
                'email': 'user@example.com'
            }
            
            # 编码POST数据
            data = urllib.parse.urlencode(post_data).encode('utf-8')
            
            # 创建请求对象
            request = urllib.request.Request(url, data=data)
            request.add_header('Content-Type', 'application/x-www-form-urlencoded')
            
            # 发送请求
            response = urllib.request.urlopen(request)
            content = response.read().decode('utf-8')
            result = json.loads(content)
            
            print(f"   POST数据: {result['form']}")
            print(f"   请求头: {result['headers']}")
            
        except Exception as e:
            print(f"   POST请求错误: {e}")
    
    # 4. JSON数据请求
    print("\n4. JSON数据请求:")
    
    def json_request():
        """发送JSON数据"""
        try:
            url = "https://httpbin.org/post"
            
            # 准备JSON数据
            json_data = {
                'name': 'Python网络编程',
                'author': '哪吒',
                'topics': ['socket', 'http', 'urllib'],
                'difficulty': 'intermediate'
            }
            
            # 转换为JSON字符串并编码
            data = json.dumps(json_data).encode('utf-8')
            
            # 创建请求
            request = urllib.request.Request(url, data=data)
            request.add_header('Content-Type', 'application/json')
            
            # 发送请求
            response = urllib.request.urlopen(request)
            content = response.read().decode('utf-8')
            result = json.loads(content)
            
            print(f"   发送的JSON: {result['json']}")
            
        except Exception as e:
            print(f"   JSON请求错误: {e}")
    
    # 5. 自定义请求头
    print("\n5. 自定义请求头:")
    
    def custom_headers():
        """自定义请求头"""
        try:
            url = "https://httpbin.org/headers"
            
            # 创建请求对象
            request = urllib.request.Request(url)
            
            # 添加自定义头
            request.add_header('User-Agent', 'Python-urllib/3.9')
            request.add_header('Accept', 'application/json')
            request.add_header('X-Custom-Header', 'Python网络编程示例')
            
            response = urllib.request.urlopen(request)
            content = response.read().decode('utf-8')
            result = json.loads(content)
            
            print(f"   请求头: {result['headers']}")
            
        except Exception as e:
            print(f"   自定义头请求错误: {e}")
    
    # 运行所有示例
    simple_get_request()
    get_with_params()
    post_request()
    json_request()
    custom_headers()

# 运行urllib演示
urllib_demo()

# 3.2 http.client模块

import http.client
import json

def http_client_demo():
    """http.client模块演示"""
    print("=== http.client模块演示 ===")
    
    # 1. 基本HTTP连接
    print("\n1. 基本HTTP连接:")
    
    def basic_http_connection():
        """基本HTTP连接"""
        try:
            # 创建HTTP连接
            conn = http.client.HTTPSConnection("httpbin.org")
            
            # 发送GET请求
            conn.request("GET", "/get")
            
            # 获取响应
            response = conn.getresponse()
            
            print(f"   状态码: {response.status}")
            print(f"   状态信息: {response.reason}")
            print(f"   HTTP版本: {response.version}")
            
            # 读取响应数据
            data = response.read().decode('utf-8')
            result = json.loads(data)
            print(f"   响应URL: {result['url']}")
            
            conn.close()
            
        except Exception as e:
            print(f"   HTTP连接错误: {e}")
    
    # 2. 带参数的请求
    print("\n2. 带参数的请求:")
    
    def http_with_params():
        """带参数的HTTP请求"""
        try:
            conn = http.client.HTTPSConnection("httpbin.org")
            
            # 构建查询参数
            params = urllib.parse.urlencode({
                'param1': 'value1',
                'param2': 'value2',
                'message': 'Hello from http.client'
            })
            
            # 发送请求
            conn.request("GET", f"/get?{params}")
            response = conn.getresponse()
            
            data = response.read().decode('utf-8')
            result = json.loads(data)
            
            print(f"   查询参数: {result['args']}")
            
            conn.close()
            
        except Exception as e:
            print(f"   参数请求错误: {e}")
    
    # 3. POST请求
    print("\n3. POST请求:")
    
    def http_post_request():
        """HTTP POST请求"""
        try:
            conn = http.client.HTTPSConnection("httpbin.org")
            
            # 准备POST数据
            post_data = json.dumps({
                'title': 'Python网络编程',
                'content': '使用http.client发送POST请求',
                'tags': ['python', 'network', 'http']
            })
            
            # 设置请求头
            headers = {
                'Content-Type': 'application/json',
                'Content-Length': str(len(post_data)),
                'User-Agent': 'Python-http.client'
            }
            
            # 发送POST请求
            conn.request("POST", "/post", post_data, headers)
            response = conn.getresponse()
            
            print(f"   状态码: {response.status}")
            
            data = response.read().decode('utf-8')
            result = json.loads(data)
            
            print(f"   发送的数据: {result['json']}")
            print(f"   请求头: {result['headers']}")
            
            conn.close()
            
        except Exception as e:
            print(f"   POST请求错误: {e}")
    
    # 4. 响应头处理
    print("\n4. 响应头处理:")
    
    def response_headers():
        """处理响应头"""
        try:
            conn = http.client.HTTPSConnection("httpbin.org")
            conn.request("GET", "/response-headers?Content-Type=application/json&Server=Python-Demo")
            
            response = conn.getresponse()
            
            # 获取所有响应头
            print("   响应头:")
            for header, value in response.getheaders():
                print(f"     {header}: {value}")
            
            # 获取特定响应头
            content_type = response.getheader('Content-Type')
            print(f"   Content-Type: {content_type}")
            
            conn.close()
            
        except Exception as e:
            print(f"   响应头处理错误: {e}")
    
    # 运行所有示例
    basic_http_connection()
    http_with_params()
    http_post_request()
    response_headers()

# 运行http.client演示
http_client_demo()

# 3.3 Web爬虫基础

import urllib.request
import urllib.parse
import re
import time
from urllib.robotparser import RobotFileParser

def web_scraping_demo():
    """Web爬虫基础演示"""
    print("=== Web爬虫基础演示 ===")
    
    # 1. 基本网页抓取
    print("\n1. 基本网页抓取:")
    
    def fetch_webpage():
        """抓取网页内容"""
        try:
            # 设置User-Agent避免被拒绝
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            
            url = "https://httpbin.org/html"
            request = urllib.request.Request(url, headers=headers)
            response = urllib.request.urlopen(request)
            
            # 获取网页内容
            html_content = response.read().decode('utf-8')
            
            # 简单的HTML解析
            title_match = re.search(r'<title>(.*?)</title>', html_content, re.IGNORECASE)
            if title_match:
                title = title_match.group(1)
                print(f"   网页标题: {title}")
            
            # 提取链接
            links = re.findall(r'<a[^>]+href=["\']([^"\'>]+)["\'][^>]*>(.*?)</a>', html_content, re.IGNORECASE)
            print(f"   找到 {len(links)} 个链接")
            for href, text in links[:3]:  # 只显示前3个
                print(f"     {text.strip()}: {href}")
                
        except Exception as e:
            print(f"   网页抓取错误: {e}")
    
    # 2. 处理表单和Cookie
    print("\n2. 处理表单和Cookie:")
    
    def handle_forms_cookies():
        """处理表单和Cookie"""
        try:
            # 创建Cookie处理器
            import http.cookiejar
            
            cookie_jar = http.cookiejar.CookieJar()
            cookie_processor = urllib.request.HTTPCookieProcessor(cookie_jar)
            opener = urllib.request.build_opener(cookie_processor)
            
            # 设置为全局opener
            urllib.request.install_opener(opener)
            
            # 访问设置Cookie的页面
            response = opener.open("https://httpbin.org/cookies/set/session_id/abc123")
            
            print(f"   设置Cookie后的状态码: {response.getcode()}")
            
            # 查看Cookie
            for cookie in cookie_jar:
                print(f"   Cookie: {cookie.name}={cookie.value}")
            
            # 访问需要Cookie的页面
            response = opener.open("https://httpbin.org/cookies")
            content = response.read().decode('utf-8')
            result = json.loads(content)
            
            print(f"   服务器收到的Cookie: {result['cookies']}")
            
        except Exception as e:
            print(f"   Cookie处理错误: {e}")
    
    # 3. 处理重定向
    print("\n3. 处理重定向:")
    
    def handle_redirects():
        """处理HTTP重定向"""
        try:
            # 自动处理重定向(默认行为)
            url = "https://httpbin.org/redirect/3"  # 重定向3次
            response = urllib.request.urlopen(url)
            
            print(f"   最终URL: {response.url}")
            print(f"   状态码: {response.getcode()}")
            
            # 禁用自动重定向
            class NoRedirectHandler(urllib.request.HTTPRedirectHandler):
                def redirect_request(self, req, fp, code, msg, headers, newurl):
                    return None
            
            opener = urllib.request.build_opener(NoRedirectHandler)
            
            try:
                response = opener.open("https://httpbin.org/redirect/1")
            except urllib.error.HTTPError as e:
                print(f"   重定向状态码: {e.code}")
                print(f"   重定向位置: {e.headers.get('Location')}")
                
        except Exception as e:
            print(f"   重定向处理错误: {e}")
    
    # 4. 爬虫礼仪
    print("\n4. 爬虫礼仪:")
    
    def crawler_etiquette():
        """爬虫礼仪演示"""
        print("   爬虫最佳实践:")
        print("   - 遵守robots.txt")
        print("   - 设置合理的请求间隔")
        print("   - 使用适当的User-Agent")
        print("   - 避免过度请求")
        print("   - 尊重网站的服务条款")
        
        # robots.txt检查示例
        def check_robots_txt(url):
            """检查robots.txt"""
            try:
                rp = RobotFileParser()
                rp.set_url(f"{url}/robots.txt")
                rp.read()
                
                # 检查是否允许抓取
                user_agent = "*"
                test_url = f"{url}/get"
                
                if rp.can_fetch(user_agent, test_url):
                    print(f"   允许抓取: {test_url}")
                else:
                    print(f"   禁止抓取: {test_url}")
                    
            except Exception as e:
                print(f"   robots.txt检查错误: {e}")
        
        check_robots_txt("https://httpbin.org")
        
        # 请求间隔示例
        print("   实施请求间隔...")
        urls = [
            "https://httpbin.org/delay/1",
            "https://httpbin.org/delay/1",
            "https://httpbin.org/delay/1"
        ]
        
        for i, url in enumerate(urls, 1):
            try:
                start_time = time.time()
                response = urllib.request.urlopen(url)
                end_time = time.time()
                
                print(f"   请求 {i}: 状态码 {response.getcode()}, 耗时 {end_time - start_time:.2f}秒")
                
                # 请求间隔
                if i < len(urls):
                    time.sleep(1)  # 1秒间隔
                    
            except Exception as e:
                print(f"   请求 {i} 错误: {e}")
    
    # 运行所有示例
    fetch_webpage()
    handle_forms_cookies()
    handle_redirects()
    crawler_etiquette()

# 运行Web爬虫演示
web_scraping_demo()

# 4. 异步网络编程

# 4.1 asyncio基础

import asyncio
import aiohttp
import time

def asyncio_networking_demo():
    """异步网络编程演示"""
    print("=== 异步网络编程演示 ===")
    
    # 1. 异步HTTP客户端
    print("\n1. 异步HTTP客户端:")
    
    async def fetch_url(session, url):
        """异步获取URL内容"""
        try:
            async with session.get(url) as response:
                content = await response.text()
                return {
                    'url': url,
                    'status': response.status,
                    'length': len(content)
                }
        except Exception as e:
            return {
                'url': url,
                'error': str(e)
            }
    
    async def fetch_multiple_urls():
        """并发获取多个URL"""
        urls = [
            'https://httpbin.org/delay/1',
            'https://httpbin.org/delay/2',
            'https://httpbin.org/delay/1',
            'https://httpbin.org/get',
            'https://httpbin.org/json'
        ]
        
        start_time = time.time()
        
        async with aiohttp.ClientSession() as session:
            # 并发执行所有请求
            tasks = [fetch_url(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
            
        end_time = time.time()
        
        print(f"   并发请求完成,总耗时: {end_time - start_time:.2f}秒")
        
        for result in results:
            if 'error' in result:
                print(f"   {result['url']}: 错误 - {result['error']}")
            else:
                print(f"   {result['url']}: 状态码 {result['status']}, 长度 {result['length']}")
    
    # 2. 异步TCP服务器
    print("\n2. 异步TCP服务器:")
    
    async def handle_client_async(reader, writer):
        """异步处理客户端连接"""
        client_address = writer.get_extra_info('peername')
        print(f"   客户端 {client_address} 已连接")
        
        try:
            while True:
                # 异步读取数据
                data = await reader.read(1024)
                if not data:
                    break
                
                message = data.decode('utf-8').strip()
                print(f"   收到来自 {client_address} 的消息: {message}")
                
                if message.lower() == 'quit':
                    break
                elif message.lower() == 'time':
                    response = f"服务器时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
                else:
                    response = f"异步服务器收到: {message}\n"
                
                # 异步发送响应
                writer.write(response.encode('utf-8'))
                await writer.drain()
                
        except Exception as e:
            print(f"   处理客户端 {client_address} 时出错: {e}")
        finally:
            writer.close()
            await writer.wait_closed()
            print(f"   客户端 {client_address} 已断开连接")
    
    async def start_async_server():
        """启动异步TCP服务器"""
        server = await asyncio.start_server(
            handle_client_async,
            'localhost',
            12348
        )
        
        addr = server.sockets[0].getsockname()
        print(f"   异步TCP服务器启动,监听 {addr[0]}:{addr[1]}")
        
        async with server:
            await server.serve_forever()
    
    # 3. 异步HTTP服务器
    print("\n3. 异步HTTP服务器:")
    
    async def handle_http_request(request):
        """处理HTTP请求"""
        path = request.path
        method = request.method
        
        print(f"   收到 {method} 请求: {path}")
        
        if path == '/':
            return aiohttp.web.Response(text="欢迎访问异步HTTP服务器!")
        elif path == '/api/time':
            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            return aiohttp.web.json_response({'time': current_time})
        elif path == '/api/echo' and method == 'POST':
            data = await request.json()
            return aiohttp.web.json_response({'echo': data})
        else:
            return aiohttp.web.Response(text="页面未找到", status=404)
    
    async def start_http_server():
        """启动异步HTTP服务器"""
        app = aiohttp.web.Application()
        app.router.add_get('/', handle_http_request)
        app.router.add_get('/api/time', handle_http_request)
        app.router.add_post('/api/echo', handle_http_request)
        
        runner = aiohttp.web.AppRunner(app)
        await runner.setup()
        
        site = aiohttp.web.TCPSite(runner, 'localhost', 8080)
        await site.start()
        
        print("   异步HTTP服务器启动,监听 localhost:8080")
        print("   访问 http://localhost:8080 查看效果")
        
        # 保持服务器运行
        try:
            await asyncio.Future()  # 永远等待
        except KeyboardInterrupt:
            print("\n   HTTP服务器正在关闭...")
        finally:
            await runner.cleanup()
    
    # 运行示例(注意:实际使用时需要在异步环境中运行)
    print("\n异步网络编程示例:")
    print("请在异步环境中运行以下函数:")
    print("- asyncio.run(fetch_multiple_urls())")
    print("- asyncio.run(start_async_server())")
    print("- asyncio.run(start_http_server())")

# 运行异步网络编程演示
asyncio_networking_demo()

# 4.2 WebSocket编程

import asyncio
import websockets
import json

def websocket_demo():
    """WebSocket编程演示"""
    print("=== WebSocket编程演示 ===")
    
    # 1. WebSocket服务器
    print("\n1. WebSocket服务器:")
    
    class WebSocketServer:
        def __init__(self):
            self.clients = set()
            self.rooms = {}
        
        async def register_client(self, websocket, path):
            """注册新客户端"""
            self.clients.add(websocket)
            client_id = id(websocket)
            print(f"   客户端 {client_id} 已连接")
            
            try:
                await self.handle_client(websocket)
            except websockets.exceptions.ConnectionClosed:
                print(f"   客户端 {client_id} 连接已关闭")
            finally:
                self.clients.remove(websocket)
                # 从所有房间中移除客户端
                for room_clients in self.rooms.values():
                    room_clients.discard(websocket)
        
        async def handle_client(self, websocket):
            """处理客户端消息"""
            async for message in websocket:
                try:
                    data = json.loads(message)
                    await self.process_message(websocket, data)
                except json.JSONDecodeError:
                    await websocket.send(json.dumps({
                        'type': 'error',
                        'message': '无效的JSON格式'
                    }))
        
        async def process_message(self, websocket, data):
            """处理消息"""
            message_type = data.get('type')
            
            if message_type == 'join_room':
                room = data.get('room', 'default')
                if room not in self.rooms:
                    self.rooms[room] = set()
                self.rooms[room].add(websocket)
                
                await websocket.send(json.dumps({
                    'type': 'joined',
                    'room': room,
                    'message': f'已加入房间 {room}'
                }))
                
                # 通知房间内其他用户
                await self.broadcast_to_room(room, {
                    'type': 'user_joined',
                    'message': '新用户加入房间'
                }, exclude=websocket)
            
            elif message_type == 'chat':
                room = data.get('room', 'default')
                content = data.get('content', '')
                username = data.get('username', '匿名用户')
                
                # 广播消息到房间
                await self.broadcast_to_room(room, {
                    'type': 'chat',
                    'username': username,
                    'content': content,
                    'timestamp': datetime.now().isoformat()
                })
            
            elif message_type == 'ping':
                await websocket.send(json.dumps({
                    'type': 'pong',
                    'timestamp': datetime.now().isoformat()
                }))
        
        async def broadcast_to_room(self, room, message, exclude=None):
            """向房间广播消息"""
            if room in self.rooms:
                message_str = json.dumps(message)
                disconnected = set()
                
                for client in self.rooms[room]:
                    if client != exclude:
                        try:
                            await client.send(message_str)
                        except websockets.exceptions.ConnectionClosed:
                            disconnected.add(client)
                
                # 清理断开的连接
                self.rooms[room] -= disconnected
        
        async def start_server(self, host='localhost', port=8765):
            """启动WebSocket服务器"""
            print(f"   WebSocket服务器启动,监听 {host}:{port}")
            
            async with websockets.serve(self.register_client, host, port):
                await asyncio.Future()  # 永远运行
    
    # 2. WebSocket客户端
    print("\n2. WebSocket客户端:")
    
    class WebSocketClient:
        def __init__(self, uri):
            self.uri = uri
            self.websocket = None
        
        async def connect(self):
            """连接到WebSocket服务器"""
            self.websocket = await websockets.connect(self.uri)
            print(f"   已连接到 {self.uri}")
        
        async def send_message(self, message):
            """发送消息"""
            if self.websocket:
                await self.websocket.send(json.dumps(message))
        
        async def listen_messages(self):
            """监听消息"""
            if self.websocket:
                async for message in self.websocket:
                    data = json.loads(message)
                    await self.handle_message(data)
        
        async def handle_message(self, data):
            """处理收到的消息"""
            message_type = data.get('type')
            
            if message_type == 'chat':
                username = data.get('username', '未知用户')
                content = data.get('content', '')
                timestamp = data.get('timestamp', '')
                print(f"   [{timestamp}] {username}: {content}")
            
            elif message_type == 'joined':
                room = data.get('room')
                print(f"   成功加入房间: {room}")
            
            elif message_type == 'user_joined':
                print(f"   {data.get('message')}")
            
            elif message_type == 'pong':
                print(f"   收到pong: {data.get('timestamp')}")
        
        async def close(self):
            """关闭连接"""
            if self.websocket:
                await self.websocket.close()
    
    # 3. 使用示例
    print("\n3. 使用示例:")
    
    async def websocket_server_example():
        """WebSocket服务器示例"""
        server = WebSocketServer()
        await server.start_server()
    
    async def websocket_client_example():
        """WebSocket客户端示例"""
        client = WebSocketClient('ws://localhost:8765')
        
        try:
            await client.connect()
            
            # 加入房间
            await client.send_message({
                'type': 'join_room',
                'room': 'python_chat'
            })
            
            # 发送聊天消息
            await client.send_message({
                'type': 'chat',
                'room': 'python_chat',
                'username': 'Python用户',
                'content': '大家好!'
            })
            
            # 发送ping
            await client.send_message({
                'type': 'ping'
            })
            
            # 监听消息(这里只监听5秒作为示例)
            await asyncio.wait_for(client.listen_messages(), timeout=5.0)
            
        except asyncio.TimeoutError:
            print("   客户端示例完成")
        finally:
            await client.close()
    
    print("\nWebSocket编程示例:")
    print("请在异步环境中运行以下函数:")
    print("- 服务器: asyncio.run(websocket_server_example())")
    print("- 客户端: asyncio.run(websocket_client_example())")

# 运行WebSocket演示
websocket_demo()

# 5. 网络安全

# 5.1 SSL/TLS编程

import ssl
import socket
import urllib.request
import urllib.parse

def ssl_tls_demo():
    """SSL/TLS编程演示"""
    print("=== SSL/TLS编程演示 ===")
    
    # 1. SSL上下文配置
    print("\n1. SSL上下文配置:")
    
    def ssl_context_demo():
        """SSL上下文演示"""
        # 创建SSL上下文
        context = ssl.create_default_context()
        
        # 配置SSL选项
        print("   SSL上下文配置:")
        print(f"   - 协议版本: {context.protocol}")
        print(f"   - 验证模式: {context.verify_mode}")
        print(f"   - 检查主机名: {context.check_hostname}")
        
        # 自定义SSL上下文
        custom_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        custom_context.check_hostname = False
        custom_context.verify_mode = ssl.CERT_NONE
        
        print("\n   自定义SSL上下文(仅用于测试):")
        print(f"   - 验证模式: {custom_context.verify_mode}")
        print(f"   - 检查主机名: {custom_context.check_hostname}")
        
        # 加载证书和密钥(示例)
        try:
            # custom_context.load_cert_chain('client.crt', 'client.key')
            # custom_context.load_verify_locations('ca.crt')
            print("   - 证书链加载: 需要实际证书文件")
        except Exception as e:
            print(f"   - 证书加载错误: {e}")
    
    # 2. HTTPS请求
    print("\n2. HTTPS请求:")
    
    def https_request_demo():
        """HTTPS请求演示"""
        try:
            # 标准HTTPS请求
            url = "https://httpbin.org/get"
            response = urllib.request.urlopen(url)
            
            print(f"   HTTPS请求成功: {response.getcode()}")
            
            # 获取SSL信息
            if hasattr(response, 'fp') and hasattr(response.fp, 'raw'):
                sock = response.fp.raw._sock
                if isinstance(sock, ssl.SSLSocket):
                    print(f"   SSL版本: {sock.version()}")
                    print(f"   加密套件: {sock.cipher()}")
            
            # 自定义SSL上下文的HTTPS请求
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            
            https_handler = urllib.request.HTTPSHandler(context=context)
            opener = urllib.request.build_opener(https_handler)
            
            response = opener.open("https://httpbin.org/get")
            print(f"   自定义SSL上下文请求: {response.getcode()}")
            
        except Exception as e:
            print(f"   HTTPS请求错误: {e}")
    
    # 3. SSL Socket编程
    print("\n3. SSL Socket编程:")
    
    def ssl_socket_demo():
        """SSL Socket演示"""
        try:
            # 创建SSL套接字
            context = ssl.create_default_context()
            
            # 连接到HTTPS服务器
            with socket.create_connection(('httpbin.org', 443)) as sock:
                with context.wrap_socket(sock, server_hostname='httpbin.org') as ssock:
                    print(f"   SSL连接建立: {ssock.version()}")
                    print(f"   服务器证书: {ssock.getpeercert()['subject']}")
                    
                    # 发送HTTP请求
                    request = b"GET /get HTTP/1.1\r\nHost: httpbin.org\r\nConnection: close\r\n\r\n"
                    ssock.send(request)
                    
                    # 接收响应
                    response = ssock.recv(4096)
                    response_str = response.decode('utf-8')
                    
                    # 提取状态行
                    status_line = response_str.split('\r\n')[0]
                    print(f"   HTTP响应: {status_line}")
                    
        except Exception as e:
            print(f"   SSL Socket错误: {e}")
    
    # 4. 证书验证
    print("\n4. 证书验证:")
    
    def certificate_verification():
        """证书验证演示"""
        try:
            # 获取服务器证书
            context = ssl.create_default_context()
            
            with socket.create_connection(('httpbin.org', 443)) as sock:
                with context.wrap_socket(sock, server_hostname='httpbin.org') as ssock:
                    cert = ssock.getpeercert()
                    
                    print("   服务器证书信息:")
                    print(f"   - 主题: {cert.get('subject')}")
                    print(f"   - 颁发者: {cert.get('issuer')}")
                    print(f"   - 版本: {cert.get('version')}")
                    print(f"   - 序列号: {cert.get('serialNumber')}")
                    print(f"   - 有效期从: {cert.get('notBefore')}")
                    print(f"   - 有效期到: {cert.get('notAfter')}")
                    
                    # 检查证书有效性
                    import datetime
                    not_after = datetime.datetime.strptime(
                        cert['notAfter'], '%b %d %H:%M:%S %Y %Z'
                    )
                    
                    if not_after > datetime.datetime.now():
                        print("   - 证书状态: 有效")
                    else:
                        print("   - 证书状态: 已过期")
                        
        except Exception as e:
            print(f"   证书验证错误: {e}")
    
    # 运行所有示例
    ssl_context_demo()
    https_request_demo()
    ssl_socket_demo()
    certificate_verification()

# 运行SSL/TLS演示
ssl_tls_demo()

# 5.2 网络安全最佳实践

import hashlib
import hmac
import secrets
import base64
from urllib.parse import quote, unquote

def network_security_demo():
    """网络安全最佳实践演示"""
    print("=== 网络安全最佳实践 ===")
    
    # 1. 数据加密和哈希
    print("\n1. 数据加密和哈希:")
    
    def encryption_hashing_demo():
        """加密和哈希演示"""
        # 生成安全随机数
        random_token = secrets.token_hex(16)
        print(f"   随机令牌: {random_token}")
        
        # 密码哈希
        password = "my_secure_password"
        salt = secrets.token_hex(16)
        
        # 使用PBKDF2进行密码哈希
        password_hash = hashlib.pbkdf2_hmac(
            'sha256',
            password.encode('utf-8'),
            salt.encode('utf-8'),
            100000  # 迭代次数
        )
        
        print(f"   密码盐值: {salt}")
        print(f"   密码哈希: {password_hash.hex()}")
        
        # HMAC签名
        secret_key = secrets.token_bytes(32)
        message = "重要的数据"
        
        signature = hmac.new(
            secret_key,
            message.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
        
        print(f"   HMAC签名: {signature}")
        
        # 验证HMAC签名
        def verify_hmac(key, message, signature):
            expected = hmac.new(
                key,
                message.encode('utf-8'),
                hashlib.sha256
            ).hexdigest()
            return hmac.compare_digest(expected, signature)
        
        is_valid = verify_hmac(secret_key, message, signature)
        print(f"   签名验证: {'有效' if is_valid else '无效'}")
    
    # 2. 输入验证和清理
    print("\n2. 输入验证和清理:")
    
    def input_validation_demo():
        """输入验证演示"""
        import re
        
        # URL编码/解码
        unsafe_input = "<script>alert('XSS')</script>"
        encoded_input = quote(unsafe_input)
        decoded_input = unquote(encoded_input)
        
        print(f"   原始输入: {unsafe_input}")
        print(f"   URL编码: {encoded_input}")
        print(f"   URL解码: {decoded_input}")
        
        # HTML转义
        def html_escape(text):
            """HTML转义"""
            escape_chars = {
                '&': '&amp;',
                '<': '&lt;',
                '>': '&gt;',
                '"': '&quot;',
                "'": '&#x27;'
            }
            for char, escape in escape_chars.items():
                text = text.replace(char, escape)
            return text
        
        escaped_html = html_escape(unsafe_input)
        print(f"   HTML转义: {escaped_html}")
        
        # 输入验证函数
        def validate_email(email):
            """验证邮箱格式"""
            pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
            return re.match(pattern, email) is not None
        
        def validate_ip(ip):
            """验证IP地址"""
            parts = ip.split('.')
            if len(parts) != 4:
                return False
            try:
                return all(0 <= int(part) <= 255 for part in parts)
            except ValueError:
                return False
        
        # 测试验证函数
        test_emails = ['user@example.com', 'invalid-email', 'test@domain']
        test_ips = ['192.168.1.1', '256.1.1.1', '192.168.1']
        
        print("\n   邮箱验证:")
        for email in test_emails:
            is_valid = validate_email(email)
            print(f"     {email}: {'有效' if is_valid else '无效'}")
        
        print("\n   IP地址验证:")
        for ip in test_ips:
            is_valid = validate_ip(ip)
            print(f"     {ip}: {'有效' if is_valid else '无效'}")
    
    # 3. 安全的HTTP请求
    print("\n3. 安全的HTTP请求:")
    
    def secure_http_demo():
        """安全HTTP请求演示"""
        # 安全请求头
        secure_headers = {
            'User-Agent': 'Python-Security-Demo/1.0',
            'Accept': 'application/json',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Cache-Control': 'no-cache',
            'Pragma': 'no-cache',
            'X-Requested-With': 'XMLHttpRequest'
        }
        
        print("   安全请求头:")
        for header, value in secure_headers.items():
            print(f"     {header}: {value}")
        
        # 请求超时设置
        timeout_settings = {
            'connect_timeout': 10,  # 连接超时
            'read_timeout': 30,     # 读取超时
            'total_timeout': 60     # 总超时
        }
        
        print("\n   超时设置:")
        for setting, value in timeout_settings.items():
            print(f"     {setting}: {value}秒")
        
        # 代理设置示例
        proxy_config = {
            'http': 'http://proxy.example.com:8080',
            'https': 'https://proxy.example.com:8080'
        }
        
        print("\n   代理配置示例:")
        for protocol, proxy in proxy_config.items():
            print(f"     {protocol}: {proxy}")
    
    # 4. 错误处理和日志
    print("\n4. 错误处理和日志:")
    
    def security_logging_demo():
        """安全日志演示"""
        import logging
        from datetime import datetime
        
        # 配置安全日志
        security_logger = logging.getLogger('security')
        security_logger.setLevel(logging.INFO)
        
        # 创建格式化器
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        
        # 控制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        security_logger.addHandler(console_handler)
        
        # 安全事件记录
        def log_security_event(event_type, details, severity='INFO'):
            """记录安全事件"""
            timestamp = datetime.now().isoformat()
            log_message = f"[{event_type}] {details} - {timestamp}"
            
            if severity == 'CRITICAL':
                security_logger.critical(log_message)
            elif severity == 'ERROR':
                security_logger.error(log_message)
            elif severity == 'WARNING':
                security_logger.warning(log_message)
            else:
                security_logger.info(log_message)
        
        # 示例安全事件
        log_security_event('LOGIN_ATTEMPT', '用户尝试登录: user@example.com')
        log_security_event('INVALID_INPUT', '检测到可疑输入', 'WARNING')
        log_security_event('RATE_LIMIT', '请求频率超限', 'ERROR')
        log_security_event('SECURITY_BREACH', '检测到安全漏洞', 'CRITICAL')
        
        print("\n   安全最佳实践:")
        best_practices = [
            "始终使用HTTPS进行敏感数据传输",
            "验证和清理所有用户输入",
            "使用强密码和多因素认证",
            "定期更新依赖库和系统",
            "实施适当的访问控制",
            "记录和监控安全事件",
            "使用安全的随机数生成器",
            "避免在日志中记录敏感信息",
            "实施请求频率限制",
            "使用安全的会话管理"
        ]
        
        for i, practice in enumerate(best_practices, 1):
            print(f"     {i}. {practice}")
    
    # 运行所有示例
    encryption_hashing_demo()
    input_validation_demo()
    secure_http_demo()
    security_logging_demo()

# 运行网络安全演示
network_security_demo()

# 6. 实际应用案例

# 6.1 简单的Web API客户端

import urllib.request
import urllib.parse
import json
import time
from datetime import datetime

class APIClient:
    """简单的Web API客户端"""
    
    def __init__(self, base_url, api_key=None, timeout=30):
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.timeout = timeout
        self.session_headers = {
            'User-Agent': 'Python-API-Client/1.0',
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }
        
        if api_key:
            self.session_headers['Authorization'] = f'Bearer {api_key}'
    
    def _make_request(self, method, endpoint, data=None, params=None):
        """发送HTTP请求"""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        
        # 添加查询参数
        if params:
            query_string = urllib.parse.urlencode(params)
            url = f"{url}?{query_string}"
        
        # 准备请求数据
        request_data = None
        if data:
            request_data = json.dumps(data).encode('utf-8')
        
        # 创建请求
        request = urllib.request.Request(
            url,
            data=request_data,
            headers=self.session_headers,
            method=method
        )
        
        try:
            with urllib.request.urlopen(request, timeout=self.timeout) as response:
                content = response.read().decode('utf-8')
                
                return {
                    'status_code': response.getcode(),
                    'headers': dict(response.headers),
                    'data': json.loads(content) if content else None
                }
                
        except urllib.error.HTTPError as e:
            error_content = e.read().decode('utf-8')
            return {
                'status_code': e.code,
                'headers': dict(e.headers),
                'error': error_content
            }
        except Exception as e:
            return {
                'status_code': 0,
                'error': str(e)
            }
    
    def get(self, endpoint, params=None):
        """GET请求"""
        return self._make_request('GET', endpoint, params=params)
    
    def post(self, endpoint, data=None):
        """POST请求"""
        return self._make_request('POST', endpoint, data=data)
    
    def put(self, endpoint, data=None):
        """PUT请求"""
        return self._make_request('PUT', endpoint, data=data)
    
    def delete(self, endpoint):
        """DELETE请求"""
        return self._make_request('DELETE', endpoint)

def api_client_demo():
    """API客户端演示"""
    print("=== Web API客户端演示 ===")
    
    # 创建API客户端
    client = APIClient('https://httpbin.org')
    
    # GET请求
    print("\n1. GET请求:")
    response = client.get('/get', params={'key': 'value', 'test': 'python'})
    if response['status_code'] == 200:
        print(f"   状态码: {response['status_code']}")
        print(f"   参数: {response['data']['args']}")
    else:
        print(f"   请求失败: {response.get('error')}")
    
    # POST请求
    print("\n2. POST请求:")
    post_data = {
        'name': 'Python网络编程',
        'type': 'tutorial',
        'timestamp': datetime.now().isoformat()
    }
    
    response = client.post('/post', data=post_data)
    if response['status_code'] == 200:
        print(f"   状态码: {response['status_code']}")
        print(f"   发送的数据: {response['data']['json']}")
    else:
        print(f"   请求失败: {response.get('error')}")
    
    # 错误处理
    print("\n3. 错误处理:")
    response = client.get('/status/404')
    print(f"   状态码: {response['status_code']}")
    if 'error' in response:
        print(f"   错误信息: {response['error']}")

# 运行API客户端演示
api_client_demo()

# 6.2 文件下载器

import urllib.request
import urllib.parse
import os
import time
from pathlib import Path

class FileDownloader:
    """文件下载器"""
    
    def __init__(self, download_dir='downloads', chunk_size=8192):
        self.download_dir = Path(download_dir)
        self.download_dir.mkdir(exist_ok=True)
        self.chunk_size = chunk_size
    
    def download_file(self, url, filename=None, progress_callback=None):
        """下载文件"""
        try:
            # 获取文件名
            if not filename:
                parsed_url = urllib.parse.urlparse(url)
                filename = os.path.basename(parsed_url.path) or 'download'
            
            filepath = self.download_dir / filename
            
            # 创建请求
            request = urllib.request.Request(url)
            request.add_header('User-Agent', 'Python-Downloader/1.0')
            
            with urllib.request.urlopen(request) as response:
                # 获取文件大小
                content_length = response.headers.get('Content-Length')
                total_size = int(content_length) if content_length else None
                
                print(f"开始下载: {filename}")
                if total_size:
                    print(f"文件大小: {self._format_size(total_size)}")
                
                downloaded = 0
                start_time = time.time()
                
                with open(filepath, 'wb') as f:
                    while True:
                        chunk = response.read(self.chunk_size)
                        if not chunk:
                            break
                        
                        f.write(chunk)
                        downloaded += len(chunk)
                        
                        # 调用进度回调
                        if progress_callback:
                            progress_callback(downloaded, total_size)
                        
                        # 显示进度
                        if total_size:
                            progress = (downloaded / total_size) * 100
                            speed = downloaded / (time.time() - start_time)
                            print(f"\r进度: {progress:.1f}% ({self._format_size(downloaded)}/{self._format_size(total_size)}) - {self._format_size(speed)}/s", end='')
                
                print(f"\n下载完成: {filepath}")
                return str(filepath)
                
        except Exception as e:
            print(f"下载失败: {e}")
            return None
    
    def download_multiple(self, urls, max_concurrent=3):
        """下载多个文件"""
        import threading
        from queue import Queue
        
        def worker():
            while True:
                url = url_queue.get()
                if url is None:
                    break
                
                filename = os.path.basename(urllib.parse.urlparse(url).path)
                print(f"\n[线程 {threading.current_thread().name}] 开始下载: {filename}")
                
                self.download_file(url)
                url_queue.task_done()
        
        # 创建队列和线程
        url_queue = Queue()
        threads = []
        
        # 启动工作线程
        for i in range(min(max_concurrent, len(urls))):
            t = threading.Thread(target=worker, name=f'Worker-{i+1}')
            t.start()
            threads.append(t)
        
        # 添加URL到队列
        for url in urls:
            url_queue.put(url)
        
        # 等待所有下载完成
        url_queue.join()
        
        # 停止工作线程
        for _ in threads:
            url_queue.put(None)
        
        for t in threads:
            t.join()
        
        print("\n所有文件下载完成")
    
    def _format_size(self, size):
        """格式化文件大小"""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size < 1024:
                return f"{size:.1f} {unit}"
            size /= 1024
        return f"{size:.1f} TB"

def file_downloader_demo():
    """文件下载器演示"""
    print("=== 文件下载器演示 ===")
    
    downloader = FileDownloader()
    
    # 单文件下载
    print("\n1. 单文件下载:")
    test_url = "https://httpbin.org/json"
    downloader.download_file(test_url, "test.json")
    
    # 多文件下载
    print("\n2. 多文件下载:")
    test_urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/xml",
        "https://httpbin.org/html"
    ]
    
    downloader.download_multiple(test_urls)

# 运行文件下载器演示
file_downloader_demo()

# 7. 学习建议和总结

# 7.1 学习路径

def learning_path():
    """网络编程学习路径"""
    print("=== 网络编程学习路径 ===")
    
    learning_stages = {
        "基础阶段": [
            "理解网络协议基础(TCP/IP、HTTP)",
            "掌握socket编程基础",
            "学习urllib模块使用",
            "了解客户端-服务器架构"
        ],
        "进阶阶段": [
            "掌握异步网络编程(asyncio)",
            "学习WebSocket编程",
            "了解网络安全基础",
            "掌握SSL/TLS编程"
        ],
        "高级阶段": [
            "学习网络框架(如aiohttp、tornado)",
            "掌握微服务架构",
            "了解负载均衡和集群",
            "学习网络监控和调试"
        ],
        "实战阶段": [
            "开发Web API客户端",
            "构建网络爬虫",
            "实现聊天应用",
            "开发分布式系统"
        ]
    }
    
    for stage, topics in learning_stages.items():
        print(f"\n{stage}:")
        for i, topic in enumerate(topics, 1):
            print(f"  {i}. {topic}")

def best_practices():
    """网络编程最佳实践"""
    print("\n=== 网络编程最佳实践 ===")
    
    practices = {
        "性能优化": [
            "使用连接池复用连接",
            "实施适当的超时设置",
            "使用异步编程提高并发性",
            "合理设置缓冲区大小",
            "避免阻塞操作"
        ],
        "错误处理": [
            "捕获和处理网络异常",
            "实施重试机制",
            "记录详细的错误日志",
            "提供友好的错误信息",
            "实施熔断器模式"
        ],
        "安全考虑": [
            "始终使用HTTPS传输敏感数据",
            "验证和清理用户输入",
            "实施认证和授权",
            "使用安全的随机数",
            "定期更新依赖库"
        ],
        "代码质量": [
            "编写清晰的文档",
            "使用类型提示",
            "编写单元测试",
            "遵循编码规范",
            "进行代码审查"
        ]
    }
    
    for category, items in practices.items():
        print(f"\n{category}:")
        for i, item in enumerate(items, 1):
            print(f"  {i}. {item}")

def common_pitfalls():
    """常见陷阱和解决方案"""
    print("\n=== 常见陷阱和解决方案 ===")
    
    pitfalls = {
        "阻塞操作": {
            "问题": "同步网络操作阻塞程序执行",
            "解决方案": "使用异步编程或多线程"
        },
        "资源泄漏": {
            "问题": "未正确关闭网络连接",
            "解决方案": "使用上下文管理器或try-finally"
        },
        "超时设置": {
            "问题": "未设置适当的超时时间",
            "解决方案": "根据应用场景设置合理超时"
        },
        "错误处理": {
            "问题": "未处理网络异常",
            "解决方案": "捕获并适当处理各种网络异常"
        },
        "安全漏洞": {
            "问题": "未验证用户输入或使用不安全协议",
            "解决方案": "实施输入验证和使用安全协议"
        }
    }
    
    for pitfall, details in pitfalls.items():
        print(f"\n{pitfall}:")
        print(f"  问题: {details['问题']}")
        print(f"  解决方案: {details['解决方案']}")

def chapter_summary():
    """本章总结"""
    print("\n=== 本章总结 ===")
    
    summary_points = [
        "网络编程是现代应用开发的重要技能",
        "Python提供了丰富的网络编程库和工具",
        "Socket编程是网络编程的基础",
        "HTTP编程适用于Web应用开发",
        "异步编程可以提高网络应用的性能",
        "网络安全是网络编程中的重要考虑因素",
        "实际项目中需要综合考虑性能、安全和可维护性",
        "持续学习和实践是掌握网络编程的关键"
    ]
    
    for i, point in enumerate(summary_points, 1):
        print(f"{i}. {point}")
    
    print("\n通过本章的学习,你应该能够:")
    skills = [
        "理解网络编程的基本概念和原理",
        "使用socket进行TCP和UDP编程",
        "使用urllib和http.client进行HTTP编程",
        "开发简单的网络应用和爬虫",
        "了解异步网络编程的基础",
        "实施基本的网络安全措施",
        "调试和优化网络应用"
    ]
    
    for i, skill in enumerate(skills, 1):
        print(f"  {i}. {skill}")

# 运行学习建议和总结
learning_path()
best_practices()
common_pitfalls()
chapter_summary()