2023/6/15

# 第16天-常用第三方模块

# 学习目标

通过本章学习,你将掌握:

  • 第三方模块的安装和管理
  • requests模块进行HTTP请求
  • pandas进行数据分析和处理
  • numpy进行数值计算
  • matplotlib进行数据可视化
  • 其他常用第三方模块的使用
  • 虚拟环境的创建和管理
  • 包管理的最佳实践

# 一、第三方模块概述

# 1.1 什么是第三方模块

def third_party_modules_intro():
    """第三方模块介绍"""
    print("=== 第三方模块概述 ===")
    
    concepts = {
        "定义": "由Python社区开发的扩展库,不包含在Python标准库中",
        "特点": [
            "功能强大且专业化",
            "活跃的社区支持",
            "持续更新和维护",
            "丰富的文档和示例"
        ],
        "优势": [
            "避免重复造轮子",
            "提高开发效率",
            "获得专业级功能",
            "学习最佳实践"
        ],
        "分类": {
            "网络请求": ["requests", "urllib3", "httpx"],
            "数据分析": ["pandas", "numpy", "scipy"],
            "数据可视化": ["matplotlib", "seaborn", "plotly"],
            "Web框架": ["Django", "Flask", "FastAPI"],
            "机器学习": ["scikit-learn", "tensorflow", "pytorch"],
            "图像处理": ["Pillow", "opencv-python", "imageio"],
            "数据库": ["SQLAlchemy", "pymongo", "redis"],
            "测试工具": ["pytest", "unittest2", "mock"]
        }
    }
    
    print(f"\n定义: {concepts['定义']}")
    
    print("\n特点:")
    for feature in concepts['特点']:
        print(f"  • {feature}")
    
    print("\n优势:")
    for advantage in concepts['优势']:
        print(f"  • {advantage}")
    
    print("\n常用分类:")
    for category, modules in concepts['分类'].items():
        print(f"  {category}: {', '.join(modules)}")

# 运行第三方模块介绍
third_party_modules_intro()
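
下面补充一个小示例(基于标准库 importlib,Python 3.8+ 可用),演示如何在代码中检查某个第三方模块是否已安装以及它的版本号;模块名列表仅作演示,可按需替换:

# 检查第三方模块是否已安装(标准库实现,仅作参考)
import importlib.util
from importlib import metadata

def check_module(name):
    """检查模块能否导入,并尝试获取其安装版本"""
    if importlib.util.find_spec(name) is None:
        print(f"  {name}: 未安装")
        return False
    try:
        version = metadata.version(name)
    except metadata.PackageNotFoundError:
        # 注意: 导入名和发行包名可能不同(如 cv2 对应 opencv-python)
        version = "版本未知"
    print(f"  {name}: 已安装 (版本 {version})")
    return True

for module_name in ["requests", "pandas", "numpy", "some_missing_module"]:
    check_module(module_name)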

# 1.2 包管理工具

def package_management_demo():
    """包管理工具演示"""
    print("=== 包管理工具 ===")
    
    # pip基本命令
    pip_commands = {
        "安装包": [
            "pip install package_name",
            "pip install package_name==1.0.0  # 指定版本",
            "pip install package_name>=1.0.0  # 最低版本",
            "pip install -r requirements.txt  # 从文件安装"
        ],
        "查看包": [
            "pip list  # 列出所有已安装的包",
            "pip show package_name  # 显示包详细信息",
            "pip search package_name  # 搜索包(已废弃)"
        ],
        "升级包": [
            "pip install --upgrade package_name",
            "pip install -U package_name  # 简写",
            "pip list --outdated  # 查看过期包"
        ],
        "卸载包": [
            "pip uninstall package_name",
            "pip uninstall -r requirements.txt"
        ],
        "导出依赖": [
            "pip freeze > requirements.txt",
            "pip freeze --local > requirements.txt  # 只导出本地包"
        ]
    }
    
    for category, commands in pip_commands.items():
        print(f"\n{category}:")
        for cmd in commands:
            print(f"  {cmd}")
    
    # requirements.txt示例
    print("\nrequirements.txt示例:")
    requirements_example = """
# Web开发
Django==4.2.0
Flask>=2.0.0
requests==2.31.0

# 数据分析
pandas>=1.5.0
numpy>=1.24.0
matplotlib>=3.6.0

# 机器学习
scikit-learn>=1.2.0
tensorflow>=2.12.0

# 开发工具
pytest>=7.0.0
black>=23.0.0
flake8>=6.0.0
    """
    print(requirements_example)
    
    # 虚拟环境管理
    print("\n虚拟环境管理:")
    venv_commands = [
        "# 创建虚拟环境",
        "python -m venv myenv",
        "python -m venv --system-site-packages myenv  # 继承系统包",
        "",
        "# 激活虚拟环境",
        "# Windows:",
        "myenv\\Scripts\\activate",
        "# Linux/Mac:",
        "source myenv/bin/activate",
        "",
        "# 停用虚拟环境",
        "deactivate",
        "",
        "# 删除虚拟环境",
        "rm -rf myenv  # Linux/Mac",
        "rmdir /s myenv  # Windows"
    ]
    
    for cmd in venv_commands:
        print(f"  {cmd}")

# 运行包管理演示
package_management_demo()
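
除了在命令行中手动执行 pip 命令,也可以在脚本里通过当前解释器调用 pip。下面是一个简单的参考写法(假设当前环境自带 pip):

# 在脚本中调用pip列出已安装的包(参考写法)
import json
import subprocess
import sys

def list_installed_packages():
    """执行 python -m pip list --format=json 并解析结果"""
    result = subprocess.run(
        [sys.executable, "-m", "pip", "list", "--format=json"],
        capture_output=True, text=True, check=True
    )
    return json.loads(result.stdout)

packages = list_installed_packages()
print(f"当前环境共安装 {len(packages)} 个包, 前5个:")
for pkg in packages[:5]:
    print(f"  {pkg['name']}=={pkg['version']}")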

# 二、requests模块 - HTTP请求

# 2.1 基本HTTP请求

# 首先需要安装: pip install requests
import requests
import json
from urllib.parse import urljoin

def requests_basic_demo():
    """requests基本使用演示"""
    print("=== requests基本HTTP请求 ===")
    
    # 1. GET请求
    print("\n1. GET请求:")
    
    try:
        # 基本GET请求
        response = requests.get('https://httpbin.org/get')
        print(f"  状态码: {response.status_code}")
        print(f"  响应头: {dict(list(response.headers.items())[:3])}...")
        print(f"  响应内容类型: {response.headers.get('content-type')}")
        
        # 带参数的GET请求
        params = {
            'name': '张三',
            'age': 25,
            'city': '北京'
        }
        response = requests.get('https://httpbin.org/get', params=params)
        data = response.json()
        print(f"  请求URL: {data['url']}")
        print(f"  查询参数: {data['args']}")
        
    except requests.exceptions.RequestException as e:
        print(f"  请求失败: {e}")
    
    # 2. POST请求
    print("\n2. POST请求:")
    
    try:
        # 发送JSON数据
        json_data = {
            'username': 'testuser',
            'password': 'testpass',
            'email': 'test@example.com'
        }
        
        response = requests.post(
            'https://httpbin.org/post',
            json=json_data,
            headers={'Content-Type': 'application/json'}
        )
        
        result = response.json()
        print(f"  发送的JSON: {result['json']}")
        print(f"  请求头: {result['headers']['Content-Type']}")
        
        # 发送表单数据
        form_data = {
            'name': '李四',
            'message': '这是一条测试消息'
        }
        
        response = requests.post(
            'https://httpbin.org/post',
            data=form_data
        )
        
        result = response.json()
        print(f"  表单数据: {result['form']}")
        
    except requests.exceptions.RequestException as e:
        print(f"  POST请求失败: {e}")
    
    # 3. 其他HTTP方法
    print("\n3. 其他HTTP方法:")
    
    methods = {
        'PUT': lambda: requests.put('https://httpbin.org/put', json={'data': 'updated'}),
        'DELETE': lambda: requests.delete('https://httpbin.org/delete'),
        'PATCH': lambda: requests.patch('https://httpbin.org/patch', json={'field': 'patched'}),
        'HEAD': lambda: requests.head('https://httpbin.org/get'),
        'OPTIONS': lambda: requests.options('https://httpbin.org/get')
    }
    
    for method_name, method_func in methods.items():
        try:
            response = method_func()
            print(f"  {method_name}: 状态码 {response.status_code}")
        except requests.exceptions.RequestException as e:
            print(f"  {method_name}: 请求失败 {e}")

# 运行requests基本演示
requests_basic_demo()
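
上面的代码导入了 urljoin 但还没有用到,这里补充一个小示例,演示用 urljoin 把基础地址和接口路径拼成完整 URL(接口路径和参数仅作演示):

# 用urljoin拼接请求地址(参考写法)
from urllib.parse import urljoin

import requests

BASE_URL = 'https://httpbin.org/'

def get_json(path, **params):
    """拼接完整URL并发送GET请求,成功时返回JSON,失败时返回None"""
    url = urljoin(BASE_URL, path)
    try:
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"  请求 {url} 失败: {e}")
        return None

data = get_json('get', keyword='python', page=1)
if data:
    print(f"  实际请求URL: {data['url']}")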

# 2.2 高级功能

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry  # 新版requests不再保证requests.packages路径可用,直接从urllib3导入
import time

def requests_advanced_demo():
    """requests高级功能演示"""
    print("=== requests高级功能 ===")
    
    # 1. 会话管理
    print("\n1. 会话管理:")
    
    # 创建会话
    session = requests.Session()
    
    # 设置默认头部
    session.headers.update({
        'User-Agent': 'MyApp/1.0',
        'Accept': 'application/json'
    })
    
    try:
        # 使用会话发送请求
        response = session.get('https://httpbin.org/headers')
        headers_info = response.json()
        print(f"  会话头部: {headers_info['headers']['User-Agent']}")
        
        # 会话中的Cookie会自动保持
        session.get('https://httpbin.org/cookies/set/session_id/12345')
        response = session.get('https://httpbin.org/cookies')
        cookies_info = response.json()
        print(f"  会话Cookie: {cookies_info['cookies']}")
        
    except requests.exceptions.RequestException as e:
        print(f"  会话请求失败: {e}")
    finally:
        session.close()
    
    # 2. 超时和重试
    print("\n2. 超时和重试:")
    
    # 设置超时
    try:
        # 连接超时5秒,读取超时10秒
        response = requests.get(
            'https://httpbin.org/delay/2',
            timeout=(5, 10)
        )
        print(f"  超时请求成功: {response.status_code}")
    except requests.exceptions.Timeout:
        print("  请求超时")
    except requests.exceptions.RequestException as e:
        print(f"  请求失败: {e}")
    
    # 配置重试策略
    def create_session_with_retry():
        session = requests.Session()
        
        # 重试策略
        retry_strategy = Retry(
            total=3,  # 总重试次数
            backoff_factor=1,  # 重试间隔
            status_forcelist=[429, 500, 502, 503, 504],  # 需要重试的状态码
        )
        
        # 添加重试适配器
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        
        return session
    
    retry_session = create_session_with_retry()
    
    try:
        response = retry_session.get(
            'https://httpbin.org/status/500',
            timeout=10
        )
        print(f"  重试请求结果: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"  重试后仍失败: {e}")
    finally:
        retry_session.close()
    
    # 3. 文件上传和下载
    print("\n3. 文件上传和下载:")
    
    # 模拟文件上传
    try:
        # 创建测试文件内容
        files = {
            'file': ('test.txt', 'Hello, World!', 'text/plain')
        }
        
        response = requests.post(
            'https://httpbin.org/post',
            files=files
        )
        
        result = response.json()
        print(f"  上传文件信息: {result['files']}")
        
    except requests.exceptions.RequestException as e:
        print(f"  文件上传失败: {e}")
    
    # 流式下载
    def download_file_stream(url, filename):
        """流式下载文件"""
        try:
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                
                total_size = int(response.headers.get('content-length', 0))
                downloaded = 0
                
                with open(filename, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                            downloaded += len(chunk)
                            
                            if total_size > 0:
                                progress = (downloaded / total_size) * 100
                                print(f"\r  下载进度: {progress:.1f}%", end='')
                
                print(f"\n  文件下载完成: {filename}")
                return True
                
        except requests.exceptions.RequestException as e:
            print(f"  下载失败: {e}")
            return False
    
    # 示例:下载小文件
    print("\n  开始下载示例文件...")
    success = download_file_stream(
        'https://httpbin.org/json',
        'example.json'
    )
    
    if success:
        try:
            with open('example.json', 'r') as f:
                content = f.read()
                print(f"  下载内容预览: {content[:100]}...")
        except Exception as e:
            print(f"  读取文件失败: {e}")
    
    # 4. 认证和代理
    print("\n4. 认证和代理:")
    
    # HTTP基本认证
    try:
        response = requests.get(
            'https://httpbin.org/basic-auth/user/pass',
            auth=('user', 'pass')
        )
        print(f"  基本认证: {response.status_code}")
        
        auth_info = response.json()
        print(f"  认证用户: {auth_info['user']}")
        
    except requests.exceptions.RequestException as e:
        print(f"  认证失败: {e}")
    
    # 代理设置示例(不实际使用)
    proxy_config = {
        'http': 'http://proxy.example.com:8080',
        'https': 'https://proxy.example.com:8080'
    }
    
    print(f"  代理配置示例: {proxy_config}")
    
    # 5. 错误处理
    print("\n5. 错误处理:")
    
    def safe_request(url, **kwargs):
        """安全的请求函数"""
        try:
            response = requests.get(url, **kwargs)
            response.raise_for_status()  # 检查HTTP错误
            return response
            
        except requests.exceptions.ConnectionError:
            print(f"  连接错误: 无法连接到 {url}")
        except requests.exceptions.Timeout:
            print(f"  超时错误: 请求 {url} 超时")
        except requests.exceptions.HTTPError as e:
            print(f"  HTTP错误: {e}")
        except requests.exceptions.RequestException as e:
            print(f"  请求异常: {e}")
        
        return None
    
    # 测试错误处理
    test_urls = [
        'https://httpbin.org/status/404',  # 404错误
        'https://httpbin.org/delay/1',     # 正常请求
        'https://nonexistent.example.com'  # 连接错误
    ]
    
    for url in test_urls:
        print(f"\n  测试URL: {url}")
        response = safe_request(url, timeout=5)
        if response:
            print(f"    成功: 状态码 {response.status_code}")
        else:
            print(f"    失败")

# 运行requests高级演示
requests_advanced_demo()
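
补充一点:上面的示例在 finally 中手动调用 session.close(),requests.Session 本身也支持 with 语句,离开代码块时会自动关闭连接。下面是一个极简的参考写法:

# 用with语句管理Session(参考写法)
import requests

def fetch_request_headers():
    """在with块中使用会话,退出时自动close"""
    with requests.Session() as session:
        session.headers.update({'User-Agent': 'MyApp/1.0'})
        response = session.get('https://httpbin.org/headers', timeout=10)
        response.raise_for_status()
        return response.json()['headers']

try:
    headers = fetch_request_headers()
    print(f"  服务端收到的User-Agent: {headers.get('User-Agent')}")
except requests.exceptions.RequestException as e:
    print(f"  请求失败: {e}")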

# 2.3 实际应用示例

import requests
import json
import time
from datetime import datetime

class APIClient:
    """API客户端封装"""
    
    def __init__(self, base_url, api_key=None, timeout=30):
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        self.timeout = timeout
        
        # 设置默认头部
        self.session.headers.update({
            'User-Agent': 'APIClient/1.0',
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        })
        
        # 设置API密钥
        if api_key:
            self.session.headers['Authorization'] = f'Bearer {api_key}'
    
    def _make_request(self, method, endpoint, **kwargs):
        """发送请求的内部方法"""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        
        # 设置默认超时
        kwargs.setdefault('timeout', self.timeout)
        
        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            
            # 记录请求日志
            print(f"[{datetime.now()}] {method} {url} -> {response.status_code}")
            
            return response
            
        except requests.exceptions.RequestException as e:
            print(f"[{datetime.now()}] 请求失败: {method} {url} -> {e}")
            raise
    
    def get(self, endpoint, params=None):
        """GET请求"""
        return self._make_request('GET', endpoint, params=params)
    
    def post(self, endpoint, data=None, json_data=None):
        """POST请求"""
        kwargs = {}
        if json_data:
            kwargs['json'] = json_data
        elif data:
            kwargs['data'] = data
        
        return self._make_request('POST', endpoint, **kwargs)
    
    def put(self, endpoint, data=None, json_data=None):
        """PUT请求"""
        kwargs = {}
        if json_data:
            kwargs['json'] = json_data
        elif data:
            kwargs['data'] = data
        
        return self._make_request('PUT', endpoint, **kwargs)
    
    def delete(self, endpoint):
        """DELETE请求"""
        return self._make_request('DELETE', endpoint)
    
    def close(self):
        """关闭会话"""
        self.session.close()
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

def api_client_demo():
    """API客户端演示"""
    print("=== API客户端应用示例 ===")
    
    # 使用上下文管理器
    with APIClient('https://httpbin.org') as client:
        
        # 1. 获取数据
        print("\n1. 获取数据:")
        try:
            response = client.get('/get', params={'page': 1, 'limit': 10})
            data = response.json()
            print(f"  请求参数: {data['args']}")
        except Exception as e:
            print(f"  获取数据失败: {e}")
        
        # 2. 提交数据
        print("\n2. 提交数据:")
        try:
            user_data = {
                'name': '张三',
                'email': 'zhangsan@example.com',
                'age': 25
            }
            
            response = client.post('/post', json_data=user_data)
            result = response.json()
            print(f"  提交的数据: {result['json']}")
        except Exception as e:
            print(f"  提交数据失败: {e}")
        
        # 3. 更新数据
        print("\n3. 更新数据:")
        try:
            update_data = {
                'name': '张三',
                'age': 26  # 更新年龄
            }
            
            response = client.put('/put', json_data=update_data)
            result = response.json()
            print(f"  更新的数据: {result['json']}")
        except Exception as e:
            print(f"  更新数据失败: {e}")
        
        # 4. 删除数据
        print("\n4. 删除数据:")
        try:
            response = client.delete('/delete')
            print(f"  删除操作状态: {response.status_code}")
        except Exception as e:
            print(f"  删除数据失败: {e}")

# 运行API客户端演示
api_client_demo()

# 三、pandas模块 - 数据分析

# 3.1 基础数据结构

# 首先需要安装: pip install pandas numpy
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

def pandas_basic_demo():
    """pandas基础数据结构演示"""
    print("=== pandas基础数据结构 ===")
    
    # 1. Series - 一维数据
    print("\n1. Series - 一维数据:")
    
    # 创建Series
    numbers = pd.Series([1, 2, 3, 4, 5])
    print(f"  数字Series:\n{numbers}")
    
    # 带索引的Series
    scores = pd.Series(
        [85, 92, 78, 95, 88],
        index=['数学', '英语', '物理', '化学', '生物']
    )
    print(f"\n  成绩Series:\n{scores}")
    
    # 从字典创建Series
    student_info = pd.Series({
        '姓名': '张三',
        '年龄': 20,
        '专业': '计算机科学',
        '年级': '大二'
    })
    print(f"\n  学生信息Series:\n{student_info}")
    
    # Series基本操作
    print(f"\n  Series基本操作:")
    print(f"    数据类型: {scores.dtype}")
    print(f"    形状: {scores.shape}")
    print(f"    大小: {scores.size}")
    print(f"    索引: {list(scores.index)}")
    print(f"    值: {list(scores.values)}")
    print(f"    最大值: {scores.max()}")
    print(f"    最小值: {scores.min()}")
    print(f"    平均值: {scores.mean():.2f}")
    print(f"    标准差: {scores.std():.2f}")
    
    # 2. DataFrame - 二维数据
    print("\n2. DataFrame - 二维数据:")
    
    # 从字典创建DataFrame
    students_data = {
        '姓名': ['张三', '李四', '王五', '赵六', '钱七'],
        '年龄': [20, 21, 19, 22, 20],
        '专业': ['计算机', '数学', '物理', '化学', '生物'],
        '成绩': [85, 92, 78, 95, 88]
    }
    
    df = pd.DataFrame(students_data)
    print(f"  学生DataFrame:\n{df}")
    
    # DataFrame基本信息
    print(f"\n  DataFrame基本信息:")
    print(f"    形状: {df.shape}")
    print(f"    列名: {list(df.columns)}")
    print(f"    索引: {list(df.index)}")
    print(f"    数据类型:\n{df.dtypes}")
    
    # 查看数据概览
    print(f"\n  数据概览:")
    print(f"    前3行:\n{df.head(3)}")
    print(f"\n    后2行:\n{df.tail(2)}")
    print(f"\n    统计信息:\n{df.describe()}")
    print(f"\n    基本信息:")
    df.info()
    
    # 3. 索引和选择
    print("\n3. 索引和选择:")
    
    # 选择列
    print(f"  选择单列 - 姓名:\n{df['姓名']}")
    print(f"\n  选择多列:\n{df[['姓名', '成绩']]}")
    
    # 选择行
    print(f"\n  选择行 - 第2行:\n{df.iloc[1]}")
    print(f"\n  选择多行:\n{df.iloc[1:4]}")
    
    # 条件选择
    high_scores = df[df['成绩'] >= 90]
    print(f"\n  高分学生 (成绩>=90):\n{high_scores}")
    
    # 复合条件
    young_high_scores = df[(df['年龄'] <= 20) & (df['成绩'] >= 85)]
    print(f"\n  年轻高分学生:\n{young_high_scores}")
    
    # 4. 数据操作
    print("\n4. 数据操作:")
    
    # 添加新列
    df['等级'] = df['成绩'].apply(lambda x: 'A' if x >= 90 else 'B' if x >= 80 else 'C')
    print(f"  添加等级列:\n{df}")
    
    # 排序
    df_sorted = df.sort_values('成绩', ascending=False)
    print(f"\n  按成绩降序排列:\n{df_sorted}")
    
    # 分组统计
    grade_stats = df.groupby('等级')['成绩'].agg(['count', 'mean', 'min', 'max'])
    print(f"\n  按等级分组统计:\n{grade_stats}")

# 运行pandas基础演示
pandas_basic_demo()
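
进入数据清洗之前,先补充一个常用操作:把 DataFrame 写成 CSV 再读回来。文件名 students.csv 仅作演示,可换成任意路径:

# DataFrame的CSV读写(文件名仅作演示)
import pandas as pd

df_demo = pd.DataFrame({
    '姓名': ['张三', '李四', '王五'],
    '成绩': [85, 92, 78]
})

# 写出CSV: 不保存行索引, utf-8-sig编码便于Excel正确显示中文
df_demo.to_csv('students.csv', index=False, encoding='utf-8-sig')

# 读回CSV
df_loaded = pd.read_csv('students.csv', encoding='utf-8-sig')
print(f"读回的数据:\n{df_loaded}")
print(f"内容是否一致: {df_demo.equals(df_loaded)}")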

# 3.2 数据处理和清洗

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

def pandas_data_processing_demo():
    """pandas数据处理和清洗演示"""
    print("=== pandas数据处理和清洗 ===")
    
    # 创建包含缺失值和异常值的示例数据
    np.random.seed(42)
    
    data = {
        '日期': pd.date_range('2023-01-01', periods=100, freq='D'),
        '销售额': np.random.normal(1000, 200, 100),
        '客户数': np.random.poisson(50, 100),
        '地区': np.random.choice(['北京', '上海', '广州', '深圳'], 100),
        '产品': np.random.choice(['A', 'B', 'C'], 100)
    }
    
    df = pd.DataFrame(data)
    
    # 人为添加一些缺失值和异常值
    df.loc[5:10, '销售额'] = np.nan
    df.loc[15, '客户数'] = -5  # 异常值
    df.loc[25, '销售额'] = 10000  # 异常值
    
    print(f"原始数据形状: {df.shape}")
    print(f"前5行数据:\n{df.head()}")
    
    # 1. 缺失值处理
    print("\n1. 缺失值处理:")
    
    # 检查缺失值
    missing_info = df.isnull().sum()
    print(f"  各列缺失值数量:\n{missing_info}")
    
    # 缺失值比例
    missing_percent = (df.isnull().sum() / len(df)) * 100
    print(f"\n  各列缺失值比例:\n{missing_percent}")
    
    # 处理缺失值的不同方法
    df_processed = df.copy()
    
    # 删除包含缺失值的行
    df_dropna = df.dropna()
    print(f"\n  删除缺失值后形状: {df_dropna.shape}")
    
    # 用均值填充数值列的缺失值(直接赋值回原列,避免对列链式使用inplace)
    df_processed['销售额'] = df_processed['销售额'].fillna(df_processed['销售额'].mean())
    
    # 其他常用填充方式(新版pandas推荐用ffill()/bfill()代替fillna(method=...)):
    # df_processed['销售额'] = df_processed['销售额'].ffill()   # 用前一个值填充
    # df_processed['销售额'] = df_processed['销售额'].bfill()   # 用后一个值填充
    
    print(f"  填充后缺失值数量: {df_processed.isnull().sum().sum()}")
    
    # 2. 异常值检测和处理
    print("\n2. 异常值检测和处理:")
    
    # 使用IQR方法检测异常值
    def detect_outliers_iqr(series):
        Q1 = series.quantile(0.25)
        Q3 = series.quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        return (series < lower_bound) | (series > upper_bound)
    
    # 检测销售额异常值
    sales_outliers = detect_outliers_iqr(df_processed['销售额'])
    print(f"  销售额异常值数量: {sales_outliers.sum()}")
    print(f"  异常值: {df_processed.loc[sales_outliers, '销售额'].values}")
    
    # 检测客户数异常值(负值)
    customer_outliers = df_processed['客户数'] < 0
    print(f"  客户数异常值数量: {customer_outliers.sum()}")
    
    # 处理异常值
    # 方法1: 删除异常值
    df_no_outliers = df_processed[~(sales_outliers | customer_outliers)]
    print(f"  删除异常值后形状: {df_no_outliers.shape}")
    
    # 方法2: 用边界值替换异常值
    df_capped = df_processed.copy()
    
    # 销售额异常值用95%分位数替换
    sales_95th = df_capped['销售额'].quantile(0.95)
    df_capped.loc[df_capped['销售额'] > sales_95th, '销售额'] = sales_95th
    
    # 客户数负值用0替换
    df_capped.loc[df_capped['客户数'] < 0, '客户数'] = 0
    
    print(f"  替换异常值后统计:\n{df_capped[['销售额', '客户数']].describe()}")
    
    # 3. 数据类型转换
    print("\n3. 数据类型转换:")
    
    print(f"  原始数据类型:\n{df_processed.dtypes}")
    
    # 转换数据类型
    df_converted = df_processed.copy()
    df_converted['客户数'] = df_converted['客户数'].astype('int32')
    df_converted['地区'] = df_converted['地区'].astype('category')
    df_converted['产品'] = df_converted['产品'].astype('category')
    
    print(f"\n  转换后数据类型:\n{df_converted.dtypes}")
    
    # 内存使用对比
    print(f"\n  内存使用对比:")
    print(f"    原始: {df_processed.memory_usage(deep=True).sum() / 1024:.2f} KB")
    print(f"    转换后: {df_converted.memory_usage(deep=True).sum() / 1024:.2f} KB")
    
    # 4. 数据重塑
    print("\n4. 数据重塑:")
    
    # 透视表
    pivot_table = df_converted.pivot_table(
        values='销售额',
        index='地区',
        columns='产品',
        aggfunc=['mean', 'sum'],
        fill_value=0
    )
    print(f"  透视表:\n{pivot_table}")
    
    # 分组聚合
    grouped_stats = df_converted.groupby(['地区', '产品']).agg({
        '销售额': ['count', 'mean', 'sum', 'std'],
        '客户数': ['mean', 'sum']
    }).round(2)
    print(f"\n  分组统计:\n{grouped_stats.head(10)}")
    
    # 5. 时间序列处理
    print("\n5. 时间序列处理:")
    
    # 设置日期为索引
    df_ts = df_converted.set_index('日期')
    
    # 重采样 - 按周汇总(Series.agg不支持字典式重命名,先聚合再重命名列)
    weekly_sales = df_ts['销售额'].resample('W').agg(['sum', 'mean', 'max'])
    weekly_sales.columns = ['总销售额', '平均销售额', '最大销售额']
    print(f"  周度销售统计:\n{weekly_sales.head()}")
    
    # 滚动窗口计算
    df_ts['销售额_7日均值'] = df_ts['销售额'].rolling(window=7).mean()
    df_ts['销售额_7日标准差'] = df_ts['销售额'].rolling(window=7).std()
    
    print(f"\n  滚动统计示例:\n{df_ts[['销售额', '销售额_7日均值', '销售额_7日标准差']].head(10)}")
    
    # 6. 数据合并
    print("\n6. 数据合并:")
    
    # 创建额外的数据表
    region_info = pd.DataFrame({
        '地区': ['北京', '上海', '广州', '深圳'],
        '人口': [2154, 2424, 1530, 1756],  # 万人
        'GDP': [4.0, 4.3, 2.9, 3.2]  # 万亿元
    })
    
    # 合并数据
    df_merged = df_converted.merge(region_info, on='地区', how='left')
    print(f"  合并后数据:\n{df_merged.head()}")
    
    # 计算人均销售额
    df_merged['人均销售额'] = df_merged['销售额'] / df_merged['人口']
    
    # 按地区统计
    region_summary = df_merged.groupby('地区').agg({
        '销售额': 'sum',
        '客户数': 'sum',
        '人口': 'first',
        'GDP': 'first',
        '人均销售额': 'mean'
    }).round(2)
    
    print(f"\n  地区汇总统计:\n{region_summary}")

# 运行pandas数据处理演示
pandas_data_processing_demo()

# 3.3 数据分析实例

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

class SalesAnalyzer:
    """销售数据分析器"""
    
    def __init__(self, data_file=None):
        self.df = None
        if data_file:
            self.load_data(data_file)
        else:
            self.generate_sample_data()
    
    def generate_sample_data(self, n_records=1000):
        """生成示例销售数据"""
        np.random.seed(42)
        
        # 生成日期范围
        start_date = datetime(2023, 1, 1)
        dates = [start_date + timedelta(days=x) for x in range(365)]
        
        data = []
        for i in range(n_records):
            record = {
                '订单ID': f'ORD{i+1:06d}',
                '日期': np.random.choice(dates),
                '客户ID': f'CUST{np.random.randint(1, 201):04d}',
                '产品类别': np.random.choice(['电子产品', '服装', '家居', '图书', '食品'], p=[0.3, 0.25, 0.2, 0.15, 0.1]),
                '产品名称': f'产品{np.random.randint(1, 101):03d}',
                '数量': np.random.randint(1, 11),
                '单价': np.random.uniform(10, 1000),
                '地区': np.random.choice(['华北', '华东', '华南', '华中', '西南', '西北', '东北']),
                '销售员': f'员工{np.random.randint(1, 21):02d}',
                '渠道': np.random.choice(['线上', '线下'], p=[0.6, 0.4])
            }
            data.append(record)
        
        self.df = pd.DataFrame(data)
        self.df['销售额'] = self.df['数量'] * self.df['单价']
        self.df['日期'] = pd.to_datetime(self.df['日期'])
        
        print(f"生成了 {len(self.df)} 条销售记录")
    
    def load_data(self, file_path):
        """加载数据文件"""
        try:
            if file_path.endswith('.csv'):
                self.df = pd.read_csv(file_path)
            elif file_path.endswith('.xlsx'):
                self.df = pd.read_excel(file_path)
            else:
                raise ValueError("不支持的文件格式")
            
            # 确保日期列是datetime类型
            if '日期' in self.df.columns:
                self.df['日期'] = pd.to_datetime(self.df['日期'])
            
            print(f"成功加载 {len(self.df)} 条记录")
            
        except Exception as e:
            print(f"加载数据失败: {e}")
    
    def basic_analysis(self):
        """基础分析"""
        print("=== 基础销售分析 ===")
        
        if self.df is None:
            print("没有数据可分析")
            return
        
        # 基本统计
        print(f"\n数据概览:")
        print(f"  记录数: {len(self.df):,}")
        print(f"  时间范围: {self.df['日期'].min()}{self.df['日期'].max()}")
        print(f"  总销售额: ¥{self.df['销售额'].sum():,.2f}")
        print(f"  平均订单金额: ¥{self.df['销售额'].mean():.2f}")
        print(f"  客户数量: {self.df['客户ID'].nunique():,}")
        print(f"  产品数量: {self.df['产品名称'].nunique():,}")
        
        # 销售额分布
        print(f"\n销售额统计:")
        sales_stats = self.df['销售额'].describe()
        for stat, value in sales_stats.items():
            print(f"  {stat}: ¥{value:.2f}")
    
    def time_analysis(self):
        """时间维度分析"""
        print("\n=== 时间维度分析 ===")
        
        # 按月统计
        monthly_sales = self.df.groupby(self.df['日期'].dt.to_period('M')).agg({
            '销售额': 'sum',
            '订单ID': 'count',
            '客户ID': 'nunique'
        }).round(2)
        monthly_sales.columns = ['月销售额', '订单数', '客户数']
        
        print(f"\n月度销售统计:")
        print(monthly_sales.head(10))
        
        # 按星期几统计
        self.df['星期几'] = self.df['日期'].dt.day_name()
        weekday_sales = self.df.groupby('星期几')['销售额'].agg(['sum', 'mean', 'count']).round(2)
        weekday_sales.columns = ['总销售额', '平均销售额', '订单数']
        
        print(f"\n星期销售统计:")
        print(weekday_sales)
        
        # 销售趋势
        daily_sales = self.df.groupby('日期')['销售额'].sum()
        
        # 计算移动平均
        daily_sales_ma7 = daily_sales.rolling(window=7).mean()
        daily_sales_ma30 = daily_sales.rolling(window=30).mean()
        
        print(f"\n销售趋势分析:")
        print(f"  最高单日销售额: ¥{daily_sales.max():.2f} ({daily_sales.idxmax()})")
        print(f"  最低单日销售额: ¥{daily_sales.min():.2f} ({daily_sales.idxmin()})")
        print(f"  7日移动平均: ¥{daily_sales_ma7.iloc[-1]:.2f}")
        print(f"  30日移动平均: ¥{daily_sales_ma30.iloc[-1]:.2f}")
    
    def product_analysis(self):
        """产品维度分析"""
        print("\n=== 产品维度分析 ===")
        
        # 按产品类别统计
        category_stats = self.df.groupby('产品类别').agg({
            '销售额': ['sum', 'mean', 'count'],
            '数量': 'sum',
            '客户ID': 'nunique'
        }).round(2)
        
        category_stats.columns = ['总销售额', '平均销售额', '订单数', '总数量', '客户数']
        category_stats = category_stats.sort_values('总销售额', ascending=False)
        
        print(f"\n产品类别统计:")
        print(category_stats)
        
        # 计算类别占比
        category_stats['销售额占比'] = (category_stats['总销售额'] / category_stats['总销售额'].sum() * 100).round(2)
        print(f"\n产品类别销售额占比:")
        for category, row in category_stats.iterrows():
            print(f"  {category}: {row['销售额占比']}%")
        
        # 热销产品TOP10
        top_products = self.df.groupby('产品名称').agg({
            '销售额': 'sum',
            '数量': 'sum',
            '订单ID': 'count'
        }).round(2)
        top_products.columns = ['总销售额', '总数量', '订单数']
        top_products = top_products.sort_values('总销售额', ascending=False).head(10)
        
        print(f"\n热销产品TOP10:")
        print(top_products)
    
    def customer_analysis(self):
        """客户维度分析"""
        print("\n=== 客户维度分析 ===")
        
        # 客户价值分析
        customer_stats = self.df.groupby('客户ID').agg({
            '销售额': 'sum',
            '订单ID': 'count',
            '日期': ['min', 'max']
        }).round(2)
        
        customer_stats.columns = ['总消费额', '订单数', '首次购买', '最后购买']
        customer_stats['平均订单金额'] = (customer_stats['总消费额'] / customer_stats['订单数']).round(2)
        
        # 计算客户活跃天数
        customer_stats['活跃天数'] = (customer_stats['最后购买'] - customer_stats['首次购买']).dt.days + 1
        
        print(f"\n客户统计概览:")
        print(f"  总客户数: {len(customer_stats):,}")
        print(f"  平均客户价值: ¥{customer_stats['总消费额'].mean():.2f}")
        print(f"  平均订单数: {customer_stats['订单数'].mean():.2f}")
        print(f"  平均订单金额: ¥{customer_stats['平均订单金额'].mean():.2f}")
        
        # 客户分层(RFM简化版)
        # R: Recency (最近购买时间)
        # F: Frequency (购买频率)
        # M: Monetary (消费金额)
        
        latest_date = self.df['日期'].max()
        customer_stats['最近购买天数'] = (latest_date - customer_stats['最后购买']).dt.days
        
        # 客户分层
        def customer_segment(row):
            if row['总消费额'] >= customer_stats['总消费额'].quantile(0.8):
                if row['最近购买天数'] <= 30:
                    return '高价值活跃客户'
                else:
                    return '高价值沉睡客户'
            elif row['总消费额'] >= customer_stats['总消费额'].quantile(0.5):
                if row['最近购买天数'] <= 60:
                    return '中价值活跃客户'
                else:
                    return '中价值沉睡客户'
            else:
                if row['最近购买天数'] <= 90:
                    return '低价值活跃客户'
                else:
                    return '低价值沉睡客户'
        
        customer_stats['客户分层'] = customer_stats.apply(customer_segment, axis=1)
        
        segment_stats = customer_stats.groupby('客户分层').agg({
            '总消费额': ['count', 'sum', 'mean'],
            '订单数': 'mean',
            '平均订单金额': 'mean'
        }).round(2)
        
        print(f"\n客户分层统计:")
        print(segment_stats)
        
        # TOP客户
        top_customers = customer_stats.sort_values('总消费额', ascending=False).head(10)
        print(f"\nTOP10客户:")
        print(top_customers[['总消费额', '订单数', '平均订单金额', '客户分层']])
    
    def regional_analysis(self):
        """地区维度分析"""
        print("\n=== 地区维度分析 ===")
        
        # 地区销售统计
        region_stats = self.df.groupby('地区').agg({
            '销售额': ['sum', 'mean', 'count'],
            '客户ID': 'nunique',
            '销售员': 'nunique'
        }).round(2)
        
        region_stats.columns = ['总销售额', '平均销售额', '订单数', '客户数', '销售员数']
        region_stats = region_stats.sort_values('总销售额', ascending=False)
        
        # 计算地区占比
        region_stats['销售额占比'] = (region_stats['总销售额'] / region_stats['总销售额'].sum() * 100).round(2)
        region_stats['人均销售额'] = (region_stats['总销售额'] / region_stats['客户数']).round(2)
        
        print(f"\n地区销售统计:")
        print(region_stats)
        
        # 渠道分析
        channel_stats = self.df.groupby(['地区', '渠道'])['销售额'].sum().unstack(fill_value=0)
        channel_stats['总计'] = channel_stats.sum(axis=1)
        channel_stats = channel_stats.sort_values('总计', ascending=False)
        
        print(f"\n地区渠道分析:")
        print(channel_stats)
    
    def sales_performance_analysis(self):
        """销售员绩效分析"""
        print("\n=== 销售员绩效分析 ===")
        
        # 销售员统计
        salesperson_stats = self.df.groupby('销售员').agg({
            '销售额': ['sum', 'mean', 'count'],
            '客户ID': 'nunique',
            '产品类别': 'nunique'
        }).round(2)
        
        salesperson_stats.columns = ['总销售额', '平均订单金额', '订单数', '客户数', '产品类别数']
        salesperson_stats['客户平均价值'] = (salesperson_stats['总销售额'] / salesperson_stats['客户数']).round(2)
        salesperson_stats = salesperson_stats.sort_values('总销售额', ascending=False)
        
        print(f"\n销售员绩效统计:")
        print(salesperson_stats.head(10))
        
        # 绩效分级
        performance_threshold = salesperson_stats['总销售额'].quantile([0.2, 0.8])
        
        def performance_level(sales):
            if sales >= performance_threshold[0.8]:
                return '优秀'
            elif sales >= performance_threshold[0.2]:
                return '良好'
            else:
                return '待提升'
        
        salesperson_stats['绩效等级'] = salesperson_stats['总销售额'].apply(performance_level)
        
        performance_summary = salesperson_stats.groupby('绩效等级').agg({
            '总销售额': ['count', 'sum', 'mean'],
            '客户数': 'mean',
            '订单数': 'mean'
        }).round(2)
        
        print(f"\n绩效等级分布:")
        print(performance_summary)
    
    def generate_report(self):
        """生成完整分析报告"""
        print("\n" + "="*50)
        print("           销售数据分析报告")
        print("="*50)
        
        self.basic_analysis()
        self.time_analysis()
        self.product_analysis()
        self.customer_analysis()
        self.regional_analysis()
        self.sales_performance_analysis()
        
        print("\n" + "="*50)
        print("           报告生成完成")
        print("="*50)

def pandas_analysis_demo():
    """pandas数据分析实例演示"""
    print("=== pandas数据分析实例 ===")
    
    # 创建销售分析器
    analyzer = SalesAnalyzer()
    
    # 生成完整分析报告
    analyzer.generate_report()

# 运行pandas分析演示
pandas_analysis_demo()

# 四、numpy模块 - 数值计算

# 4.1 基础数组操作

# 首先需要安装: pip install numpy
import numpy as np
import time

def numpy_basic_demo():
    """numpy基础操作演示"""
    print("=== numpy基础数组操作 ===")
    
    # 1. 创建数组
    print("\n1. 创建数组:")
    
    # 从列表创建
    arr1 = np.array([1, 2, 3, 4, 5])
    print(f"  一维数组: {arr1}")
    print(f"  数据类型: {arr1.dtype}")
    print(f"  形状: {arr1.shape}")
    print(f"  维度: {arr1.ndim}")
    
    # 二维数组
    arr2 = np.array([[1, 2, 3], [4, 5, 6]])
    print(f"\n  二维数组:\n{arr2}")
    print(f"  形状: {arr2.shape}")
    print(f"  大小: {arr2.size}")
    
    # 指定数据类型
    arr3 = np.array([1, 2, 3], dtype=np.float64)
    print(f"\n  指定类型数组: {arr3}")
    print(f"  数据类型: {arr3.dtype}")
    
    # 2. 特殊数组创建
    print("\n2. 特殊数组创建:")
    
    # 零数组
    zeros = np.zeros((3, 4))
    print(f"  零数组:\n{zeros}")
    
    # 一数组
    ones = np.ones((2, 3), dtype=int)
    print(f"\n  一数组:\n{ones}")
    
    # 单位矩阵
    identity = np.eye(3)
    print(f"\n  单位矩阵:\n{identity}")
    
    # 等差数列(linspace: 指定元素个数)
    linspace = np.linspace(0, 10, 5)
    print(f"\n  linspace等差数列: {linspace}")
    
    # 等差数列(arange: 指定步长,并非等比数列)
    arange = np.arange(0, 10, 2)
    print(f"  arange等差数列: {arange}")
    
    # 随机数组
    np.random.seed(42)
    random_arr = np.random.random((2, 3))
    print(f"\n  随机数组:\n{random_arr}")
    
    # 正态分布随机数
    normal_arr = np.random.normal(0, 1, (2, 3))
    print(f"\n  正态分布随机数:\n{normal_arr}")
    
    # 3. 数组索引和切片
    print("\n3. 数组索引和切片:")
    
    arr = np.arange(12).reshape(3, 4)
    print(f"  原数组:\n{arr}")
    
    # 基本索引
    print(f"  元素[1,2]: {arr[1, 2]}")
    print(f"  第一行: {arr[0]}")
    print(f"  第一列: {arr[:, 0]}")
    
    # 切片
    print(f"  前两行:\n{arr[:2]}")
    print(f"  后两列:\n{arr[:, -2:]}")
    
    # 布尔索引
    mask = arr > 5
    print(f"\n  大于5的元素: {arr[mask]}")
    
    # 花式索引
    indices = np.array([0, 2])
    print(f"  选择第0和第2行:\n{arr[indices]}")
    
    # 4. 数组形状操作
    print("\n4. 数组形状操作:")
    
    original = np.arange(12)
    print(f"  原数组: {original}")
    
    # 重塑
    reshaped = original.reshape(3, 4)
    print(f"  重塑为3x4:\n{reshaped}")
    
    # 转置
    transposed = reshaped.T
    print(f"  转置:\n{transposed}")
    
    # 展平
    flattened = reshaped.flatten()
    print(f"  展平: {flattened}")
    
    # 添加维度
    expanded = np.expand_dims(original, axis=0)
    print(f"  添加维度: {expanded.shape}")
    
    # 压缩维度
    squeezed = np.squeeze(expanded)
    print(f"  压缩维度: {squeezed.shape}")

# 运行numpy基础演示
numpy_basic_demo()
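
作为形状操作的补充,下面用几行代码演示常见的数组拼接方式(concatenate、vstack、hstack),仅作参考:

# 数组拼接示例(与形状操作配合使用)
import numpy as np

a = np.arange(6).reshape(2, 3)
b = np.arange(6, 12).reshape(2, 3)

print(f"沿行方向拼接 axis=0:\n{np.concatenate([a, b], axis=0)}")
print(f"沿列方向拼接 axis=1:\n{np.concatenate([a, b], axis=1)}")
print(f"vstack(等价于axis=0):\n{np.vstack([a, b])}")
print(f"hstack(等价于axis=1):\n{np.hstack([a, b])}")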

# 4.2 数学运算和统计

import numpy as np

def numpy_math_demo():
    """numpy数学运算演示"""
    print("=== numpy数学运算和统计 ===")
    
    # 创建测试数据
    np.random.seed(42)
    arr1 = np.random.randint(1, 10, (3, 4))
    arr2 = np.random.randint(1, 10, (3, 4))
    
    print(f"数组1:\n{arr1}")
    print(f"数组2:\n{arr2}")
    
    # 1. 基本数学运算
    print("\n1. 基本数学运算:")
    
    # 元素级运算
    print(f"  加法:\n{arr1 + arr2}")
    print(f"\n  减法:\n{arr1 - arr2}")
    print(f"\n  乘法:\n{arr1 * arr2}")
    print(f"\n  除法:\n{arr1 / arr2}")
    print(f"\n  幂运算:\n{arr1 ** 2}")
    
    # 标量运算
    print(f"\n  标量加法:\n{arr1 + 10}")
    print(f"\n  标量乘法:\n{arr1 * 2}")
    
    # 2. 数学函数
    print("\n2. 数学函数:")
    
    # 三角函数
    angles = np.array([0, np.pi/6, np.pi/4, np.pi/3, np.pi/2])
    print(f"  角度: {angles}")
    print(f"  sin值: {np.sin(angles)}")
    print(f"  cos值: {np.cos(angles)}")
    print(f"  tan值: {np.tan(angles)}")
    
    # 指数和对数
    values = np.array([1, 2, 3, 4, 5])
    print(f"\n  原值: {values}")
    print(f"  指数: {np.exp(values)}")
    print(f"  自然对数: {np.log(values)}")
    print(f"  以10为底: {np.log10(values)}")
    print(f"  平方根: {np.sqrt(values)}")
    
    # 取整函数
    decimals = np.array([1.2, 2.7, -1.5, -2.8])
    print(f"\n  小数: {decimals}")
    print(f"  向上取整: {np.ceil(decimals)}")
    print(f"  向下取整: {np.floor(decimals)}")
    print(f"  四舍五入: {np.round(decimals)}")
    print(f"  截断: {np.trunc(decimals)}")
    
    # 3. 统计函数
    print("\n3. 统计函数:")
    
    data = np.random.normal(50, 15, (5, 6))
    print(f"  测试数据:\n{data.round(2)}")
    
    # 基本统计
    print(f"\n  最大值: {np.max(data):.2f}")
    print(f"  最小值: {np.min(data):.2f}")
    print(f"  均值: {np.mean(data):.2f}")
    print(f"  中位数: {np.median(data):.2f}")
    print(f"  标准差: {np.std(data):.2f}")
    print(f"  方差: {np.var(data):.2f}")
    print(f"  总和: {np.sum(data):.2f}")
    
    # 按轴统计
    print(f"\n  按行统计 (axis=1):")
    print(f"    行均值: {np.mean(data, axis=1).round(2)}")
    print(f"    行最大值: {np.max(data, axis=1).round(2)}")
    
    print(f"\n  按列统计 (axis=0):")
    print(f"    列均值: {np.mean(data, axis=0).round(2)}")
    print(f"    列最小值: {np.min(data, axis=0).round(2)}")
    
    # 分位数
    percentiles = [25, 50, 75, 90, 95]
    print(f"\n  分位数:")
    for p in percentiles:
        value = np.percentile(data, p)
        print(f"    {p}%分位数: {value:.2f}")
    
    # 4. 线性代数
    print("\n4. 线性代数:")
    
    # 矩阵乘法
    A = np.array([[1, 2], [3, 4]])
    B = np.array([[5, 6], [7, 8]])
    
    print(f"  矩阵A:\n{A}")
    print(f"  矩阵B:\n{B}")
    print(f"  矩阵乘法 A@B:\n{A @ B}")
    print(f"  矩阵乘法 np.dot(A,B):\n{np.dot(A, B)}")
    
    # 矩阵属性
    print(f"\n  矩阵A的行列式: {np.linalg.det(A):.2f}")
    print(f"  矩阵A的迹: {np.trace(A)}")
    
    # 特征值和特征向量
    eigenvalues, eigenvectors = np.linalg.eig(A)
    print(f"  特征值: {eigenvalues}")
    print(f"  特征向量:\n{eigenvectors}")
    
    # 矩阵求逆
    try:
        A_inv = np.linalg.inv(A)
        print(f"  矩阵A的逆:\n{A_inv}")
        print(f"  验证 A * A_inv:\n{(A @ A_inv).round(10)}")
    except np.linalg.LinAlgError:
        print("  矩阵不可逆")
    
    # 5. 数组比较和逻辑运算
    print("\n5. 数组比较和逻辑运算:")
    
    x = np.array([1, 2, 3, 4, 5])
    y = np.array([1, 3, 2, 4, 6])
    
    print(f"  数组x: {x}")
    print(f"  数组y: {y}")
    print(f"  x == y: {x == y}")
    print(f"  x > y: {x > y}")
    print(f"  x >= 3: {x >= 3}")
    
    # 逻辑运算
    condition1 = x > 2
    condition2 = x < 5
    print(f"\n  x > 2: {condition1}")
    print(f"  x < 5: {condition2}")
    print(f"  (x > 2) & (x < 5): {condition1 & condition2}")
    print(f"  (x > 2) | (x < 5): {condition1 | condition2}")
    
    # 条件选择
    result = np.where(x > 3, x, 0)
    print(f"  条件选择 (x>3则保留,否则为0): {result}")

# 运行numpy数学演示
numpy_math_demo()

# 4.3 性能优化和实际应用

import numpy as np
import time

def numpy_performance_demo():
    """numpy性能优化演示"""
    print("=== numpy性能优化和实际应用 ===")
    
    # 1. 性能对比
    print("\n1. 性能对比:")
    
    # 创建大数组
    size = 1000000
    python_list = list(range(size))
    numpy_array = np.arange(size)
    
    # Python列表求和
    start_time = time.time()
    python_sum = sum(python_list)
    python_time = time.time() - start_time
    
    # NumPy数组求和
    start_time = time.time()
    numpy_sum = np.sum(numpy_array)
    numpy_time = time.time() - start_time
    
    print(f"  数组大小: {size:,}")
    print(f"  Python列表求和: {python_time:.6f}秒")
    print(f"  NumPy数组求和: {numpy_time:.6f}秒")
    print(f"  性能提升: {python_time/numpy_time:.1f}倍")
    
    # 2. 向量化操作
    print("\n2. 向量化操作:")
    
    # 非向量化方式
    def python_operation(arr):
        result = []
        for x in arr:
            result.append(x**2 + 2*x + 1)
        return result
    
    # 向量化方式
    def numpy_operation(arr):
        return arr**2 + 2*arr + 1
    
    test_data = list(range(100000))
    numpy_data = np.array(test_data)
    
    # 性能测试
    start_time = time.time()
    python_result = python_operation(test_data)
    python_time = time.time() - start_time
    
    start_time = time.time()
    numpy_result = numpy_operation(numpy_data)
    numpy_time = time.time() - start_time
    
    print(f"  Python循环: {python_time:.6f}秒")
    print(f"  NumPy向量化: {numpy_time:.6f}秒")
    print(f"  性能提升: {python_time/numpy_time:.1f}倍")
    
    # 3. 广播机制
    print("\n3. 广播机制:")
    
    # 不同形状数组的运算
    a = np.array([[1, 2, 3],
                  [4, 5, 6]])
    b = np.array([10, 20, 30])
    c = np.array([[100],
                  [200]])
    
    print(f"  数组a (2x3):\n{a}")
    print(f"  数组b (3,): {b}")
    print(f"  数组c (2x1):\n{c}")
    
    print(f"\n  a + b (广播):\n{a + b}")
    print(f"  a + c (广播):\n{a + c}")
    print(f"  a + b + c (广播):\n{a + b + c}")
    
    # 4. 内存优化
    print("\n4. 内存优化:")
    
    # 数据类型优化
    large_array = np.random.randint(0, 100, 1000000)
    
    # 不同数据类型的内存使用
    int64_size = large_array.astype(np.int64).nbytes
    int32_size = large_array.astype(np.int32).nbytes
    int16_size = large_array.astype(np.int16).nbytes
    int8_size = large_array.astype(np.int8).nbytes
    
    print(f"  数组大小: {len(large_array):,}")
    print(f"  int64内存: {int64_size/1024/1024:.2f} MB")
    print(f"  int32内存: {int32_size/1024/1024:.2f} MB")
    print(f"  int16内存: {int16_size/1024/1024:.2f} MB")
    print(f"  int8内存: {int8_size/1024/1024:.2f} MB")
    
    # 就地操作
    arr = np.random.random(1000000)
    arr_copy = arr.copy()
    
    # 创建新数组
    start_time = time.time()
    result1 = arr * 2 + 1
    time1 = time.time() - start_time
    
    # 就地操作
    start_time = time.time()
    arr_copy *= 2
    arr_copy += 1
    time2 = time.time() - start_time
    
    print(f"\n  创建新数组: {time1:.6f}秒")
    print(f"  就地操作: {time2:.6f}秒")
    print(f"  性能提升: {time1/time2:.1f}倍")
    
    # 5. 实际应用示例
    print("\n5. 实际应用示例:")
    
    # 图像处理模拟
    def image_processing_demo():
        # 模拟RGB图像 (高度, 宽度, 通道)
        height, width = 100, 100
        image = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
        
        print(f"  原图像形状: {image.shape}")
        print(f"  原图像数据类型: {image.dtype}")
        
        # 转换为灰度图
        # 灰度 = 0.299*R + 0.587*G + 0.114*B
        weights = np.array([0.299, 0.587, 0.114])
        gray_image = np.dot(image, weights).astype(np.uint8)
        
        print(f"  灰度图形状: {gray_image.shape}")
        print(f"  像素值范围: {gray_image.min()} - {gray_image.max()}")
        
        # 图像增强 - 对比度调整
        enhanced = np.clip(gray_image * 1.5, 0, 255).astype(np.uint8)
        print(f"  增强后范围: {enhanced.min()} - {enhanced.max()}")
        
        # 边缘检测模拟 (简单差分)
        edges_x = np.abs(np.diff(gray_image, axis=1))
        edges_y = np.abs(np.diff(gray_image, axis=0))
        
        print(f"  水平边缘形状: {edges_x.shape}")
        print(f"  垂直边缘形状: {edges_y.shape}")
        
        return gray_image, enhanced, edges_x, edges_y
    
    # 信号处理模拟
    def signal_processing_demo():
        # 生成信号
        t = np.linspace(0, 1, 1000)
        frequency1, frequency2 = 5, 20
        signal = np.sin(2 * np.pi * frequency1 * t) + 0.5 * np.sin(2 * np.pi * frequency2 * t)
        
        # 添加噪声
        noise = np.random.normal(0, 0.1, len(signal))
        noisy_signal = signal + noise
        
        print(f"  信号长度: {len(signal)}")
        print(f"  信号范围: {signal.min():.3f} - {signal.max():.3f}")
        print(f"  噪声信号范围: {noisy_signal.min():.3f} - {noisy_signal.max():.3f}")
        
        # 简单滤波 (移动平均)
        window_size = 10
        filtered_signal = np.convolve(noisy_signal, np.ones(window_size)/window_size, mode='same')
        
        print(f"  滤波后范围: {filtered_signal.min():.3f} - {filtered_signal.max():.3f}")
        
        # 统计分析
        print(f"  原信号标准差: {np.std(signal):.3f}")
        print(f"  噪声信号标准差: {np.std(noisy_signal):.3f}")
        print(f"  滤波信号标准差: {np.std(filtered_signal):.3f}")
        
        return signal, noisy_signal, filtered_signal
    
    # 数据分析模拟
    def data_analysis_demo():
        # 生成销售数据
        np.random.seed(42)
        days = 365
        base_sales = 1000
        trend = np.linspace(0, 200, days)  # 增长趋势
        seasonal = 100 * np.sin(2 * np.pi * np.arange(days) / 365.25 * 4)  # 季节性
        noise = np.random.normal(0, 50, days)  # 随机噪声
        
        sales = base_sales + trend + seasonal + noise
        sales = np.maximum(sales, 0)  # 确保非负
        
        print(f"  销售数据天数: {len(sales)}")
        print(f"  平均日销售额: ¥{np.mean(sales):.2f}")
        print(f"  销售额标准差: ¥{np.std(sales):.2f}")
        print(f"  最高日销售额: ¥{np.max(sales):.2f}")
        print(f"  最低日销售额: ¥{np.min(sales):.2f}")
        
        # 移动平均分析
        window_sizes = [7, 30, 90]
        for window in window_sizes:
            ma = np.convolve(sales, np.ones(window)/window, mode='valid')
            print(f"  {window}日移动平均: ¥{ma[-1]:.2f}")
        
        # 同比分析 (简化)
        if len(sales) >= 365:
            yoy_growth = (sales[-1] - sales[-365]) / sales[-365] * 100
            print(f"  年同比增长: {yoy_growth:.1f}%")
        
        return sales
    
    print("\n  图像处理演示:")
    image_processing_demo()
    
    print("\n  信号处理演示:")
    signal_processing_demo()
    
    print("\n  数据分析演示:")
    data_analysis_demo()

# 运行numpy性能演示
numpy_performance_demo()

# 五、matplotlib模块 - 数据可视化

# 5.1 基础绘图

# 首先需要安装: pip install matplotlib
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime, timedelta

def matplotlib_basic_demo():
    """matplotlib基础绘图演示"""
    print("=== matplotlib基础绘图 ===")
    
    # 设置中文字体支持
    plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
    plt.rcParams['axes.unicode_minus'] = False
    
    # 1. 线图
    print("\n1. 线图演示")
    
    # 生成数据
    x = np.linspace(0, 10, 100)
    y1 = np.sin(x)
    y2 = np.cos(x)
    y3 = np.sin(x) * np.exp(-x/5)
    
    # 创建图形
    plt.figure(figsize=(12, 8))
    
    # 第一个子图
    plt.subplot(2, 2, 1)
    plt.plot(x, y1, label='sin(x)', linewidth=2, color='blue')
    plt.plot(x, y2, label='cos(x)', linewidth=2, color='red', linestyle='--')
    plt.title('三角函数')
    plt.xlabel('x')
    plt.ylabel('y')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    # 第二个子图 - 散点图
    plt.subplot(2, 2, 2)
    np.random.seed(42)
    x_scatter = np.random.randn(50)
    y_scatter = 2 * x_scatter + np.random.randn(50)
    plt.scatter(x_scatter, y_scatter, alpha=0.6, c=y_scatter, cmap='viridis')
    plt.title('散点图')
    plt.xlabel('X值')
    plt.ylabel('Y值')
    plt.colorbar(label='Y值')
    
    # 第三个子图 - 柱状图
    plt.subplot(2, 2, 3)
    categories = ['A', 'B', 'C', 'D', 'E']
    values = [23, 45, 56, 78, 32]
    colors = ['red', 'green', 'blue', 'orange', 'purple']
    bars = plt.bar(categories, values, color=colors, alpha=0.7)
    plt.title('柱状图')
    plt.xlabel('类别')
    plt.ylabel('数值')
    
    # 在柱子上添加数值标签
    for bar, value in zip(bars, values):
        plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 1,
                str(value), ha='center', va='bottom')
    
    # 第四个子图 - 饼图
    plt.subplot(2, 2, 4)
    sizes = [30, 25, 20, 15, 10]
    labels = ['产品A', '产品B', '产品C', '产品D', '产品E']
    explode = (0.1, 0, 0, 0, 0)  # 突出第一个扇形
    
    plt.pie(sizes, labels=labels, explode=explode, autopct='%1.1f%%',
            shadow=True, startangle=90)
    plt.title('饼图')
    
    plt.tight_layout()
    plt.savefig('basic_plots.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    print("  基础图形已保存为 'basic_plots.png'")
    
    # 2. 高级线图
    print("\n2. 高级线图演示")
    
    plt.figure(figsize=(12, 6))
    
    # 生成时间序列数据
    dates = [datetime(2023, 1, 1) + timedelta(days=i) for i in range(365)]
    np.random.seed(42)
    
    # 模拟股价数据
    price = 100
    prices = [price]
    for _ in range(364):
        change = np.random.normal(0, 2)
        price = max(price + change, 10)  # 确保价格不为负
        prices.append(price)
    
    # 计算移动平均
    ma_20 = []
    ma_50 = []
    
    for i in range(len(prices)):
        if i >= 19:
            ma_20.append(np.mean(prices[i-19:i+1]))
        else:
            ma_20.append(np.nan)
        
        if i >= 49:
            ma_50.append(np.mean(prices[i-49:i+1]))
        else:
            ma_50.append(np.nan)
    
    # 绘制股价图
    plt.plot(dates, prices, label='股价', linewidth=1, alpha=0.7)
    plt.plot(dates, ma_20, label='20日均线', linewidth=2, color='orange')
    plt.plot(dates, ma_50, label='50日均线', linewidth=2, color='red')
    
    plt.title('股价走势图', fontsize=16)
    plt.xlabel('日期', fontsize=12)
    plt.ylabel('价格 (元)', fontsize=12)
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    # 格式化x轴日期
    plt.xticks(rotation=45)
    
    # 添加注释
    max_price_idx = np.argmax(prices)
    max_price = prices[max_price_idx]
    max_date = dates[max_price_idx]
    
    plt.annotate(f'最高点\n{max_price:.2f}元',
                xy=(max_date, max_price),
                xytext=(max_date, max_price + 10),
                arrowprops=dict(arrowstyle='->', color='red'),
                fontsize=10, ha='center')
    
    plt.tight_layout()
    plt.savefig('stock_chart.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    print("  股价图已保存为 'stock_chart.png'")

# 运行matplotlib基础演示
matplotlib_basic_demo()
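
上面通过 plt.rcParams['font.sans-serif'] 指定中文字体。如果运行后中文仍显示为方框,可以先用下面的小片段查看本机实际可用的字体名称(输出因系统而异),再把合适的名称填入 font.sans-serif:

# 查看matplotlib可识别的本机字体(输出因系统而异)
from matplotlib import font_manager

available_fonts = sorted({f.name for f in font_manager.fontManager.ttflist})
print(f"本机共检测到 {len(available_fonts)} 种字体, 部分列表:")
for name in available_fonts[:20]:
    print(f"  {name}")

# 常见中文字体: SimHei(Windows)、PingFang SC(macOS)、WenQuanYi Micro Hei(Linux)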

# 5.2 高级可视化

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.patches import Rectangle
from matplotlib.collections import PatchCollection

def matplotlib_advanced_demo():
    """matplotlib高级可视化演示"""
    print("=== matplotlib高级可视化 ===")
    
    # 设置样式
    plt.style.use('seaborn-v0_8')
    plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
    plt.rcParams['axes.unicode_minus'] = False
    
    # 1. 热力图
    print("\n1. 热力图演示")
    
    # 生成相关性矩阵数据
    np.random.seed(42)
    variables = ['销售额', '广告费', '客户数', '产品数', '员工数']
    n_vars = len(variables)
    
    # 生成随机相关性矩阵
    correlation_matrix = np.random.rand(n_vars, n_vars)
    correlation_matrix = (correlation_matrix + correlation_matrix.T) / 2  # 对称化
    np.fill_diagonal(correlation_matrix, 1)  # 对角线为1
    
    plt.figure(figsize=(10, 8))
    
    # 创建热力图
    im = plt.imshow(correlation_matrix, cmap='coolwarm', aspect='auto', vmin=-1, vmax=1)
    
    # 设置刻度和标签
    plt.xticks(range(n_vars), variables, rotation=45)
    plt.yticks(range(n_vars), variables)
    
    # 添加数值标签
    for i in range(n_vars):
        for j in range(n_vars):
            plt.text(j, i, f'{correlation_matrix[i, j]:.2f}',
                    ha='center', va='center',
                    color='white' if abs(correlation_matrix[i, j]) > 0.5 else 'black')
    
    plt.colorbar(im, label='相关系数')
    plt.title('变量相关性热力图', fontsize=16, pad=20)
    plt.tight_layout()
    plt.savefig('heatmap.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    # 2. 箱线图
    print("\n2. 箱线图演示")
    
    # 生成不同组的数据
    np.random.seed(42)
    group_data = {
        '组A': np.random.normal(100, 15, 100),
        '组B': np.random.normal(110, 20, 100),
        '组C': np.random.normal(95, 10, 100),
        '组D': np.random.normal(105, 25, 100)
    }
    
    plt.figure(figsize=(12, 6))
    
    # 左侧:传统箱线图
    plt.subplot(1, 2, 1)
    data_values = list(group_data.values())
    labels = list(group_data.keys())
    
    box_plot = plt.boxplot(data_values, labels=labels, patch_artist=True)
    
    # 自定义箱线图颜色
    colors = ['lightblue', 'lightgreen', 'lightcoral', 'lightyellow']
    for patch, color in zip(box_plot['boxes'], colors):
        patch.set_facecolor(color)
        patch.set_alpha(0.7)
    
    plt.title('箱线图')
    plt.ylabel('数值')
    plt.grid(True, alpha=0.3)
    
    # 右侧:小提琴图
    plt.subplot(1, 2, 2)
    violin_plot = plt.violinplot(data_values, positions=range(1, len(labels)+1))
    
    # 自定义小提琴图颜色
    for pc, color in zip(violin_plot['bodies'], colors):
        pc.set_facecolor(color)
        pc.set_alpha(0.7)
    
    plt.xticks(range(1, len(labels)+1), labels)
    plt.title('小提琴图')
    plt.ylabel('数值')
    plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('box_violin_plots.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    # 3. 多轴图
    print("\n3. 多轴图演示")
    
    # 生成数据
    months = ['1月', '2月', '3月', '4月', '5月', '6月',
              '7月', '8月', '9月', '10月', '11月', '12月']
    sales = [120, 135, 158, 142, 168, 195, 210, 198, 175, 162, 148, 155]
    profit_rate = [8.5, 9.2, 10.1, 8.8, 11.2, 12.5, 13.1, 12.8, 11.5, 10.8, 9.9, 10.3]
    
    fig, ax1 = plt.subplots(figsize=(12, 6))
    
    # 第一个y轴 - 销售额
    color1 = 'tab:blue'
    ax1.set_xlabel('月份')
    ax1.set_ylabel('销售额 (万元)', color=color1)
    line1 = ax1.plot(months, sales, color=color1, marker='o', linewidth=2, label='销售额')
    ax1.tick_params(axis='y', labelcolor=color1)
    ax1.grid(True, alpha=0.3)
    
    # 第二个y轴 - 利润率
    ax2 = ax1.twinx()
    color2 = 'tab:red'
    ax2.set_ylabel('利润率 (%)', color=color2)
    line2 = ax2.plot(months, profit_rate, color=color2, marker='s', linewidth=2, label='利润率')
    ax2.tick_params(axis='y', labelcolor=color2)
    
    # 添加图例
    lines = line1 + line2
    labels = [l.get_label() for l in lines]
    ax1.legend(lines, labels, loc='upper left')
    
    plt.title('销售额与利润率趋势', fontsize=16)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig('dual_axis_plot.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    # 4. 3D图形
    print("\n4. 3D图形演示")
    
    from mpl_toolkits.mplot3d import Axes3D
    
    fig = plt.figure(figsize=(15, 5))
    
    # 3D散点图
    ax1 = fig.add_subplot(131, projection='3d')
    
    np.random.seed(42)
    n_points = 100
    x = np.random.randn(n_points)
    y = np.random.randn(n_points)
    z = x**2 + y**2 + np.random.randn(n_points) * 0.1
    colors = z
    
    scatter = ax1.scatter(x, y, z, c=colors, cmap='viridis', alpha=0.6)
    ax1.set_xlabel('X轴')
    ax1.set_ylabel('Y轴')
    ax1.set_zlabel('Z轴')
    ax1.set_title('3D散点图')
    
    # 3D表面图
    ax2 = fig.add_subplot(132, projection='3d')
    
    x_surf = np.linspace(-2, 2, 30)
    y_surf = np.linspace(-2, 2, 30)
    X, Y = np.meshgrid(x_surf, y_surf)
    Z = np.sin(np.sqrt(X**2 + Y**2))
    
    surface = ax2.plot_surface(X, Y, Z, cmap='coolwarm', alpha=0.8)
    ax2.set_xlabel('X轴')
    ax2.set_ylabel('Y轴')
    ax2.set_zlabel('Z轴')
    ax2.set_title('3D表面图')
    
    # 3D线框图
    ax3 = fig.add_subplot(133, projection='3d')
    
    wireframe = ax3.plot_wireframe(X, Y, Z, alpha=0.6)
    ax3.set_xlabel('X轴')
    ax3.set_ylabel('Y轴')
    ax3.set_zlabel('Z轴')
    ax3.set_title('3D线框图')
    
    plt.tight_layout()
    plt.savefig('3d_plots.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    # 5. 动画图(静态展示)
    print("\n5. 动画效果演示(静态帧)")
    
    # 创建动画的几个关键帧
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    axes = axes.flatten()
    
    t_values = np.linspace(0, 2*np.pi, 6)
    x_base = np.linspace(0, 4*np.pi, 100)
    
    for i, t in enumerate(t_values):
        ax = axes[i]
        y = np.sin(x_base + t)
        ax.plot(x_base, y, 'b-', linewidth=2)
        ax.set_ylim(-1.5, 1.5)
        ax.set_title(f'帧 {i+1}: t={t:.2f}')
        ax.grid(True, alpha=0.3)
    
    plt.suptitle('正弦波动画效果(静态帧展示)', fontsize=16)
    plt.tight_layout()
    plt.savefig('animation_frames.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    print("  所有高级图形已保存")

# 运行matplotlib高级演示
matplotlib_advanced_demo()
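
上面的动画演示只展示了静态帧。如果需要生成真正的动画,可以使用matplotlib.animation.FuncAnimation。下面是一个最小示意(假设已安装pillow以便保存GIF,文件名和函数名均为示例):

# 补充示例(示意): 用FuncAnimation生成并保存正弦波动画
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

def sine_wave_animation_demo():
    """正弦波动画示例(保存为GIF,需要pillow)"""
    fig, ax = plt.subplots(figsize=(8, 4))
    x = np.linspace(0, 4 * np.pi, 200)
    line, = ax.plot(x, np.sin(x), 'b-', linewidth=2)
    ax.set_ylim(-1.5, 1.5)
    ax.set_title('正弦波动画')
    
    def update(frame):
        # 每一帧根据相位偏移更新曲线数据
        line.set_ydata(np.sin(x + frame * 0.2))
        return line,
    
    anim = FuncAnimation(fig, update, frames=60, interval=50, blit=True)
    anim.save('sine_wave.gif', writer='pillow', fps=20)
    plt.close(fig)
    print("动画已保存为 sine_wave.gif")

# sine_wave_animation_demo()  # 取消注释即可生成GIF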

# 六、scikit-learn模块 - 机器学习

# 6.1 基础机器学习

# 首先需要安装: pip install scikit-learn
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, mean_squared_error, classification_report
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd

def sklearn_basic_demo():
    """scikit-learn基础机器学习演示"""
    print("=== scikit-learn基础机器学习 ===")
    
    # 1. 线性回归
    print("\n1. 线性回归演示:")
    
    # 生成回归数据
    X, y = datasets.make_regression(n_samples=100, n_features=1, noise=10, random_state=42)
    
    # 分割数据
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # 创建和训练模型
    lr_model = LinearRegression()
    lr_model.fit(X_train, y_train)
    
    # 预测
    y_pred = lr_model.predict(X_test)
    
    # 评估
    mse = mean_squared_error(y_test, y_pred)
    print(f"  训练样本数: {len(X_train)}")
    print(f"  测试样本数: {len(X_test)}")
    print(f"  均方误差: {mse:.2f}")
    print(f"  模型系数: {lr_model.coef_[0]:.2f}")
    print(f"  模型截距: {lr_model.intercept_:.2f}")
    
    # 2. 分类任务
    print("\n2. 分类任务演示:")
    
    # 使用鸢尾花数据集
    iris = datasets.load_iris()
    X_iris, y_iris = iris.data, iris.target
    
    # 分割数据
    X_train, X_test, y_train, y_test = train_test_split(
        X_iris, y_iris, test_size=0.3, random_state=42
    )
    
    # 数据标准化
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
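    # 注意: 标准化器只在训练集上fit,测试集只做transform,避免测试集信息泄漏到训练过程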
    
    # 逻辑回归
    log_reg = LogisticRegression(random_state=42)
    log_reg.fit(X_train_scaled, y_train)
    
    # 随机森林
    rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
    rf_model.fit(X_train, y_train)
    
    # 预测和评估
    log_pred = log_reg.predict(X_test_scaled)
    rf_pred = rf_model.predict(X_test)
    
    log_accuracy = accuracy_score(y_test, log_pred)
    rf_accuracy = accuracy_score(y_test, rf_pred)
    
    print(f"  数据集: 鸢尾花数据集")
    print(f"  特征数: {X_iris.shape[1]}")
    print(f"  类别数: {len(np.unique(y_iris))}")
    print(f"  逻辑回归准确率: {log_accuracy:.3f}")
    print(f"  随机森林准确率: {rf_accuracy:.3f}")
    
    # 特征重要性
    feature_importance = rf_model.feature_importances_
    feature_names = iris.feature_names
    
    print(f"\n  特征重要性:")
    for name, importance in zip(feature_names, feature_importance):
        print(f"    {name}: {importance:.3f}")
    
    # 3. 聚类分析
    print("\n3. 聚类分析演示:")
    
    from sklearn.cluster import KMeans
    from sklearn.metrics import silhouette_score
    
    # 生成聚类数据
    X_cluster, _ = datasets.make_blobs(n_samples=300, centers=4, n_features=2, 
                                      random_state=42, cluster_std=1.5)
    
    # K-means聚类
    kmeans = KMeans(n_clusters=4, random_state=42, n_init=10)
    cluster_labels = kmeans.fit_predict(X_cluster)
    
    # 评估聚类效果
    silhouette_avg = silhouette_score(X_cluster, cluster_labels)
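    # 轮廓系数取值范围为[-1, 1],越接近1说明簇内越紧密、簇间区分越明显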
    
    print(f"  样本数: {len(X_cluster)}")
    print(f"  聚类数: 4")
    print(f"  轮廓系数: {silhouette_avg:.3f}")
    print(f"  聚类中心:")
    for i, center in enumerate(kmeans.cluster_centers_):
        print(f"    簇{i+1}: ({center[0]:.2f}, {center[1]:.2f})")
    
    # 4. 模型评估和交叉验证
    print("\n4. 模型评估和交叉验证:")
    
    from sklearn.model_selection import cross_val_score, GridSearchCV
    from sklearn.svm import SVC
    
    # 使用SVM进行交叉验证
    svm_model = SVC(random_state=42)
    cv_scores = cross_val_score(svm_model, X_train_scaled, y_train, cv=5)
    
    print(f"  5折交叉验证结果:")
    print(f"    各折得分: {cv_scores}")
    print(f"    平均得分: {cv_scores.mean():.3f} (+/- {cv_scores.std() * 2:.3f})")
    
    # 网格搜索调参
    param_grid = {
        'C': [0.1, 1, 10],
        'gamma': ['scale', 'auto', 0.1, 1]
    }
    
    grid_search = GridSearchCV(SVC(random_state=42), param_grid, cv=3, scoring='accuracy')
    grid_search.fit(X_train_scaled, y_train)
    
    print(f"\n  网格搜索最佳参数: {grid_search.best_params_}")
    print(f"  最佳交叉验证得分: {grid_search.best_score_:.3f}")
    
    # 最佳模型在测试集上的表现
    best_model = grid_search.best_estimator_
    test_score = best_model.score(X_test_scaled, y_test)
    print(f"  测试集得分: {test_score:.3f}")

# 运行scikit-learn基础演示
sklearn_basic_demo()
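
在实际项目中,预处理和模型通常会封装进Pipeline,训练好的模型也需要持久化保存。下面用Pipeline和joblib给出一个最小示意(joblib一般随scikit-learn一起安装,文件名为示例):

# 补充示例(示意): 用Pipeline封装预处理和模型,并用joblib保存/加载
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
import joblib

def sklearn_pipeline_demo():
    """Pipeline与模型持久化示例"""
    iris = datasets.load_iris()
    X_train, X_test, y_train, y_test = train_test_split(
        iris.data, iris.target, test_size=0.3, random_state=42
    )
    
    # Pipeline保证训练和预测阶段使用完全一致的预处理步骤
    pipe = Pipeline([
        ('scaler', StandardScaler()),
        ('clf', LogisticRegression(max_iter=1000, random_state=42))
    ])
    pipe.fit(X_train, y_train)
    print(f"Pipeline测试集准确率: {pipe.score(X_test, y_test):.3f}")
    
    # 保存并重新加载模型
    joblib.dump(pipe, 'iris_pipeline.joblib')
    loaded = joblib.load('iris_pipeline.joblib')
    print(f"加载后模型的预测示例: {loaded.predict(X_test[:3])}")

sklearn_pipeline_demo()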

# 七、BeautifulSoup模块 - 网页解析

# 7.1 HTML解析基础

# 首先需要安装: pip install beautifulsoup4 lxml
from bs4 import BeautifulSoup
import requests
from urllib.parse import urljoin, urlparse
import re
import time

def beautifulsoup_demo():
    """BeautifulSoup网页解析演示"""
    print("=== BeautifulSoup网页解析 ===")
    
    # 1. 基础HTML解析
    print("\n1. 基础HTML解析:")
    
    # 示例HTML内容
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>示例网页</title>
        <meta charset="UTF-8">
    </head>
    <body>
        <div class="header">
            <h1 id="main-title">欢迎来到我的网站</h1>
            <nav>
                <ul>
                    <li><a href="/home">首页</a></li>
                    <li><a href="/about">关于</a></li>
                    <li><a href="/contact">联系</a></li>
                </ul>
            </nav>
        </div>
        <div class="content">
            <article class="post" data-id="1">
                <h2>第一篇文章</h2>
                <p class="meta">发布时间: 2023-01-01</p>
                <p>这是第一篇文章的内容...</p>
                <div class="tags">
                    <span class="tag">Python</span>
                    <span class="tag">编程</span>
                </div>
            </article>
            <article class="post" data-id="2">
                <h2>第二篇文章</h2>
                <p class="meta">发布时间: 2023-01-02</p>
                <p>这是第二篇文章的内容...</p>
                <div class="tags">
                    <span class="tag">Web开发</span>
                    <span class="tag">HTML</span>
                </div>
            </article>
        </div>
        <footer>
            <p>&copy; 2023 我的网站. 保留所有权利.</p>
        </footer>
    </body>
    </html>
    """
    
    # 创建BeautifulSoup对象
    soup = BeautifulSoup(html_content, 'html.parser')
    
    # 基本信息提取
    print(f"  网页标题: {soup.title.string}")
    print(f"  主标题: {soup.find('h1').string}")
    
    # 2. 元素查找
    print("\n2. 元素查找方法:")
    
    # 按标签查找
    all_links = soup.find_all('a')
    print(f"  所有链接数量: {len(all_links)}")
    for link in all_links:
        print(f"    链接文本: '{link.string}', 地址: '{link.get('href')}'")
    
    # 按类名查找
    posts = soup.find_all('article', class_='post')
    print(f"\n  文章数量: {len(posts)}")
    for i, post in enumerate(posts, 1):
        title = post.find('h2').string
        meta = post.find('p', class_='meta').string
        data_id = post.get('data-id')
        print(f"    文章{i}: {title} (ID: {data_id})")
        print(f"      {meta}")
    
    # 按ID查找
    main_title = soup.find('h1', id='main-title')
    print(f"\n  主标题元素: {main_title.string}")
    
    # CSS选择器
    print("\n3. CSS选择器:")
    
    # 选择所有标签
    tags = soup.select('.tag')
    print(f"  所有标签: {[tag.string for tag in tags]}")
    
    # 选择特定文章的标签
    first_post_tags = soup.select('article[data-id="1"] .tag')
    print(f"  第一篇文章标签: {[tag.string for tag in first_post_tags]}")
    
    # 选择导航链接
    nav_links = soup.select('nav ul li a')
    print(f"  导航链接: {[link.string for link in nav_links]}")
    
    # 4. 文本提取和处理
    print("\n4. 文本提取和处理:")
    
    # 提取纯文本
    content_div = soup.find('div', class_='content')
    content_text = content_div.get_text(strip=True)
    print(f"  内容区域文本长度: {len(content_text)}字符")
    
    # 提取特定格式的文本
    for post in posts:
        title = post.find('h2').get_text(strip=True)
        content = post.find_all('p')[-1].get_text(strip=True)  # 最后一个p标签
        print(f"    {title}: {content[:20]}...")
    
    # 5. 属性操作
    print("\n5. 属性操作:")
    
    # 获取和修改属性
    first_link = soup.find('a')
    print(f"  第一个链接原始href: {first_link.get('href')}")
    
    # 修改属性
    first_link['href'] = 'https://example.com/home'
    first_link['target'] = '_blank'
    print(f"  修改后的链接: {first_link}")
    
    # 添加新属性
    for post in soup.find_all('article'):
        post['class'] = post.get('class', []) + ['processed']
    
    print(f"  第一篇文章的class: {soup.find('article').get('class')}")

# 运行BeautifulSoup演示
beautifulsoup_demo()
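
上面示例中的链接都是相对路径。实际解析网页时,常用urllib.parse.urljoin把相对链接补全为绝对链接。下面是一个最小示意(base_url为假设的示例地址):

# 补充示例(示意): 用urljoin把解析出的相对链接补全为绝对链接
from bs4 import BeautifulSoup
from urllib.parse import urljoin

def resolve_links_demo():
    """相对链接补全示例"""
    html = '<ul><li><a href="/home">首页</a></li><li><a href="about.html">关于</a></li></ul>'
    base_url = 'https://example.com/blog/'  # 假设的页面地址
    
    soup = BeautifulSoup(html, 'html.parser')
    for a in soup.find_all('a'):
        absolute = urljoin(base_url, a.get('href'))
        print(f"  {a.get_text(strip=True)}: {absolute}")

resolve_links_demo()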

# 7.2 实际网页爬取

import requests
from bs4 import BeautifulSoup
import time
import csv
from urllib.parse import urljoin, urlparse
import os

def web_scraping_demo():
    """实际网页爬取演示"""
    print("=== 实际网页爬取演示 ===")
    
    # 1. 基础网页请求和解析
    print("\n1. 基础网页请求和解析:")
    
    def safe_request(url, headers=None, timeout=10):
        """安全的网页请求函数"""
        default_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        if headers:
            default_headers.update(headers)
        
        try:
            response = requests.get(url, headers=default_headers, timeout=timeout)
            response.raise_for_status()
            return response
        except requests.RequestException as e:
            print(f"    请求失败: {e}")
            return None
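    # 说明: safe_request用于真实网页请求;本演示为避免网络依赖,下面使用模拟HTML内容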
    
    # 示例:解析一个简单的HTML页面(模拟)
    def parse_example_page():
        """解析示例页面"""
        # 模拟HTML内容(实际应用中这里会是真实的网页请求)
        sample_html = """
        <html>
        <head><title>新闻网站</title></head>
        <body>
            <div class="news-list">
                <article class="news-item">
                    <h3><a href="/news/1">Python 3.12 发布新特性</a></h3>
                    <p class="summary">Python 3.12 带来了许多新特性和改进...</p>
                    <span class="date">2023-10-01</span>
                    <span class="author">张三</span>
                </article>
                <article class="news-item">
                    <h3><a href="/news/2">机器学习最新进展</a></h3>
                    <p class="summary">最新的机器学习算法在各个领域都有突破...</p>
                    <span class="date">2023-10-02</span>
                    <span class="author">李四</span>
                </article>
                <article class="news-item">
                    <h3><a href="/news/3">Web开发趋势分析</a></h3>
                    <p class="summary">2023年Web开发的主要趋势和技术栈...</p>
                    <span class="date">2023-10-03</span>
                    <span class="author">王五</span>
                </article>
            </div>
        </body>
        </html>
        """
        
        soup = BeautifulSoup(sample_html, 'html.parser')
        
        # 提取新闻信息
        news_items = soup.find_all('article', class_='news-item')
        news_data = []
        
        for item in news_items:
            title_link = item.find('h3').find('a')
            title = title_link.get_text(strip=True)
            link = title_link.get('href')
            summary = item.find('p', class_='summary').get_text(strip=True)
            date = item.find('span', class_='date').get_text(strip=True)
            author = item.find('span', class_='author').get_text(strip=True)
            
            news_data.append({
                'title': title,
                'link': link,
                'summary': summary,
                'date': date,
                'author': author
            })
        
        return news_data
    
    # 解析示例页面
    news_list = parse_example_page()
    print(f"  提取到 {len(news_list)} 条新闻:")
    for i, news in enumerate(news_list, 1):
        print(f"    {i}. {news['title']}")
        print(f"       作者: {news['author']}, 日期: {news['date']}")
        print(f"       摘要: {news['summary'][:30]}...")
        print()
    
    # 2. 数据清洗和处理
    print("\n2. 数据清洗和处理:")
    
    def clean_text(text):
        """清洗文本数据"""
        if not text:
            return ""
        
        # 去除多余空白
        text = re.sub(r'\s+', ' ', text.strip())
        
        # 去除特殊字符
        text = re.sub(r'[\r\n\t]', '', text)
        
        # 去除HTML实体
        text = text.replace('&nbsp;', ' ').replace('&amp;', '&')
        
        return text
    
    def extract_numbers(text):
        """从文本中提取数字"""
        numbers = re.findall(r'\d+(?:\.\d+)?', text)
        return [float(num) for num in numbers]
    
    def extract_dates(text):
        """从文本中提取日期"""
        date_patterns = [
            r'\d{4}-\d{2}-\d{2}',  # YYYY-MM-DD
            r'\d{2}/\d{2}/\d{4}',  # MM/DD/YYYY
            r'\d{1,2}月\d{1,2}日'   # 中文日期
        ]
        
        dates = []
        for pattern in date_patterns:
            dates.extend(re.findall(pattern, text))
        
        return dates
    
    # 示例文本清洗
    sample_text = "   这是一个\n\t包含多余空白的文本   2023-10-01  价格:99.99元   "
    cleaned = clean_text(sample_text)
    numbers = extract_numbers(sample_text)
    dates = extract_dates(sample_text)
    
    print(f"  原始文本: '{sample_text}'")
    print(f"  清洗后: '{cleaned}'")
    print(f"  提取的数字: {numbers}")
    print(f"  提取的日期: {dates}")
    
    # 3. 数据存储
    print("\n3. 数据存储:")
    
    def save_to_csv(data, filename):
        """保存数据到CSV文件"""
        if not data:
            return
        
        fieldnames = data[0].keys()
        
        with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)
        
        print(f"    数据已保存到 {filename}")
    
    def save_to_json(data, filename):
        """保存数据到JSON文件"""
        import json
        
        with open(filename, 'w', encoding='utf-8') as jsonfile:
            json.dump(data, jsonfile, ensure_ascii=False, indent=2)
        
        print(f"    数据已保存到 {filename}")
    
    # 保存新闻数据
    save_to_csv(news_list, 'news_data.csv')
    save_to_json(news_list, 'news_data.json')
    
    # 4. 爬虫最佳实践
    print("\n4. 爬虫最佳实践:")
    
    class WebScraper:
        """网页爬虫类"""
        
        def __init__(self, delay=1, max_retries=3):
            self.delay = delay
            self.max_retries = max_retries
            self.session = requests.Session()
            self.session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            })
        
        def get_page(self, url):
            """获取网页内容"""
            for attempt in range(self.max_retries):
                try:
                    response = self.session.get(url, timeout=10)
                    response.raise_for_status()
                    
                    # 添加延迟,避免过于频繁的请求
                    time.sleep(self.delay)
                    
                    return response
                
                except requests.RequestException as e:
                    print(f"    尝试 {attempt + 1} 失败: {e}")
                    if attempt < self.max_retries - 1:
                        time.sleep(2 ** attempt)  # 指数退避
            
            return None
        
        def parse_page(self, html_content, selectors):
            """解析网页内容"""
            soup = BeautifulSoup(html_content, 'html.parser')
            results = {}
            
            for key, selector in selectors.items():
                elements = soup.select(selector)
                if elements:
                    if len(elements) == 1:
                        results[key] = elements[0].get_text(strip=True)
                    else:
                        results[key] = [elem.get_text(strip=True) for elem in elements]
                else:
                    results[key] = None
            
            return results
        
        def scrape_multiple_pages(self, urls, selectors):
            """爬取多个页面"""
            results = []
            
            for i, url in enumerate(urls, 1):
                print(f"    正在爬取第 {i}/{len(urls)} 个页面...")
                
                response = self.get_page(url)
                if response:
                    data = self.parse_page(response.text, selectors)
                    data['url'] = url
                    results.append(data)
                else:
                    print(f"    跳过页面: {url}")
            
            return results
    
    # 示例使用
    scraper = WebScraper(delay=0.5)
    
    # 定义选择器
    selectors = {
        'title': 'h1, h2, h3',
        'content': 'p',
        'links': 'a'
    }
    
    print(f"    爬虫配置:")
    print(f"      延迟: {scraper.delay}秒")
    print(f"      最大重试: {scraper.max_retries}次")
    print(f"      选择器: {list(selectors.keys())}")
    
    # 5. 错误处理和日志
    print("\n5. 错误处理和日志:")
    
    import logging
    
    # 配置日志
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('scraper.log', encoding='utf-8'),
            logging.StreamHandler()
        ]
    )
    
    logger = logging.getLogger(__name__)
    
    def robust_scrape(url, max_attempts=3):
        """健壮的爬取函数"""
        for attempt in range(max_attempts):
            try:
                logger.info(f"尝试爬取: {url} (第{attempt+1}次)")
                
                # 模拟可能的错误
                if attempt == 0:
                    raise requests.ConnectionError("模拟连接错误")
                elif attempt == 1:
                    raise requests.Timeout("模拟超时错误")
                else:
                    logger.info("爬取成功")
                    return "成功获取页面内容"
                
            except requests.RequestException as e:
                logger.warning(f"爬取失败: {e}")
                if attempt < max_attempts - 1:
                    wait_time = 2 ** attempt
                    logger.info(f"等待 {wait_time} 秒后重试...")
                    time.sleep(wait_time)
                else:
                    logger.error(f"所有尝试都失败了: {url}")
                    return None
    
    # 测试健壮爬取
    result = robust_scrape("https://example.com")
    print(f"    爬取结果: {result}")
    
    print(f"\n  爬虫演示完成,相关文件已生成")

# 运行网页爬取演示
web_scraping_demo()
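
爬虫最佳实践还包括遵守目标站点的robots.txt协议。下面用标准库urllib.robotparser给出一个最小示意(示例URL为假设地址,实际使用时替换为目标站点):

# 补充示例(示意): 用urllib.robotparser检查robots.txt是否允许抓取
from urllib.robotparser import RobotFileParser
from urllib.parse import urljoin

def can_fetch_demo(base_url='https://example.com', path='/news/1',
                   user_agent='MyScraper/1.0'):
    """检查指定路径是否允许被指定User-Agent抓取"""
    rp = RobotFileParser()
    rp.set_url(urljoin(base_url, '/robots.txt'))
    try:
        rp.read()  # 下载并解析robots.txt
    except Exception as e:
        print(f"  读取robots.txt失败: {e}")
        return True  # 读取失败时的策略按需决定,这里默认放行
    allowed = rp.can_fetch(user_agent, urljoin(base_url, path))
    print(f"  {path} 允许抓取: {allowed}")
    return allowed

# can_fetch_demo()  # 需要网络环境,取消注释后运行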

# 八、模块选择和最佳实践

# 8.1 模块选择指南

def module_selection_guide():
    """第三方模块选择指南"""
    print("=== 第三方模块选择指南 ===")
    
    # 1. 按应用场景分类
    print("\n1. 按应用场景选择模块:")
    
    scenarios = {
        "Web开发": {
            "框架": ["Django", "Flask", "FastAPI"],
            "HTTP客户端": ["requests", "httpx", "aiohttp"],
            "模板引擎": ["Jinja2", "Django Templates"],
            "数据库ORM": ["SQLAlchemy", "Django ORM", "Peewee"]
        },
        "数据科学": {
            "数据处理": ["pandas", "numpy", "polars"],
            "可视化": ["matplotlib", "seaborn", "plotly", "bokeh"],
            "机器学习": ["scikit-learn", "tensorflow", "pytorch"],
            "统计分析": ["scipy", "statsmodels"]
        },
        "网络爬虫": {
            "HTML解析": ["BeautifulSoup", "lxml", "html.parser"],
            "浏览器自动化": ["selenium", "playwright", "pyppeteer"],
            "异步爬虫": ["scrapy", "aiohttp", "asyncio"]
        },
        "图像处理": {
            "基础处理": ["Pillow", "opencv-python"],
            "深度学习": ["tensorflow", "pytorch", "keras"],
            "计算机视觉": ["opencv-python", "scikit-image"]
        },
        "自动化运维": {
            "系统管理": ["psutil", "paramiko", "fabric"],
            "配置管理": ["ansible", "saltstack"],
            "监控": ["prometheus_client", "psutil"]
        }
    }
    
    for scenario, categories in scenarios.items():
        print(f"\n  {scenario}:")
        for category, modules in categories.items():
            print(f"    {category}: {', '.join(modules)}")
    
    # 2. 性能对比
    print("\n2. 常见模块性能对比:")
    
    performance_comparison = {
        "HTTP请求库": {
            "requests": {"易用性": "★★★★★", "性能": "★★★☆☆", "功能": "★★★★☆"},
            "httpx": {"易用性": "★★★★☆", "性能": "★★★★☆", "功能": "★★★★★"},
            "aiohttp": {"易用性": "★★★☆☆", "性能": "★★★★★", "功能": "★★★★☆"}
        },
        "数据处理库": {
            "pandas": {"易用性": "★★★★★", "性能": "★★★☆☆", "内存效率": "★★☆☆☆"},
            "polars": {"易用性": "★★★☆☆", "性能": "★★★★★", "内存效率": "★★★★★"},
            "numpy": {"易用性": "★★★☆☆", "性能": "★★★★★", "内存效率": "★★★★☆"}
        },
        "Web框架": {
            "Django": {"学习曲线": "★★☆☆☆", "功能完整性": "★★★★★", "性能": "★★★☆☆"},
            "Flask": {"学习曲线": "★★★★☆", "功能完整性": "★★★☆☆", "性能": "★★★★☆"},
            "FastAPI": {"学习曲线": "★★★★☆", "功能完整性": "★★★★☆", "性能": "★★★★★"}
        }
    }
    
    for category, modules in performance_comparison.items():
        print(f"\n  {category}:")
        for module, metrics in modules.items():
            print(f"    {module}:")
            for metric, rating in metrics.items():
                print(f"      {metric}: {rating}")
    
    # 3. 选择建议
    print("\n3. 选择建议:")
    
    recommendations = {
        "初学者": {
            "推荐模块": ["requests", "pandas", "matplotlib", "BeautifulSoup"],
            "原因": "文档完善,社区活跃,学习资源丰富",
            "避免": ["复杂的异步库", "底层系统库"]
        },
        "数据分析师": {
            "推荐模块": ["pandas", "numpy", "matplotlib", "seaborn", "scikit-learn"],
            "原因": "专为数据分析设计,功能强大",
            "避免": ["Web开发框架", "底层网络库"]
        },
        "Web开发者": {
            "推荐模块": ["Django/Flask", "requests", "SQLAlchemy", "Celery"],
            "原因": "Web开发生态完整,部署方便",
            "避免": ["科学计算库", "图像处理库"]
        },
        "性能优化者": {
            "推荐模块": ["numpy", "numba", "cython", "asyncio"],
            "原因": "高性能,支持并行和异步",
            "避免": ["纯Python实现的库", "功能过重的框架"]
        }
    }
    
    for user_type, info in recommendations.items():
        print(f"\n  {user_type}:")
        print(f"    推荐: {', '.join(info['推荐模块'])}")
        print(f"    原因: {info['原因']}")
        print(f"    避免: {', '.join(info['避免'])}")

# 运行模块选择指南
module_selection_guide()
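
在选型或排查环境问题时,可以先确认候选模块是否已安装以及具体版本。下面用标准库importlib.metadata(Python 3.8+)给出一个最小示意:

# 补充示例(示意): 检查常用第三方模块的安装状态和版本号
from importlib.metadata import version, PackageNotFoundError

def check_installed_versions(packages):
    """打印每个包的安装状态和版本号"""
    for name in packages:
        try:
            print(f"  {name}: {version(name)}")
        except PackageNotFoundError:
            print(f"  {name}: 未安装")

check_installed_versions(['requests', 'pandas', 'numpy', 'matplotlib', 'scikit-learn'])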

# 8.2 最佳实践和总结

def best_practices_summary():
    """第三方模块最佳实践和总结"""
    print("=== 第三方模块最佳实践和总结 ===")
    
    # 1. 安装和管理最佳实践
    print("\n1. 安装和管理最佳实践:")
    
    practices = {
        "虚拟环境": {
            "重要性": "★★★★★",
            "工具": ["venv", "conda", "pipenv", "poetry"],
            "好处": ["隔离依赖", "避免冲突", "便于部署", "版本管理"]
        },
        "依赖管理": {
            "重要性": "★★★★☆",
            "文件": ["requirements.txt", "Pipfile", "pyproject.toml"],
            "好处": ["可重现环境", "团队协作", "自动化部署"]
        },
        "版本固定": {
            "重要性": "★★★★☆",
            "策略": ["精确版本", "兼容版本", "最小版本"],
            "示例": ["requests==2.28.1", "pandas>=1.5.0,<2.0.0"]
        }
    }
    
    for practice, details in practices.items():
        print(f"\n  {practice} (重要性: {details['重要性']}):")
        for key, values in details.items():
            if key != "重要性":
                if isinstance(values, list):
                    print(f"    {key}: {', '.join(values)}")
                else:
                    print(f"    {key}: {values}")
    
    # 2. 代码质量最佳实践
    print("\n2. 代码质量最佳实践:")
    
    code_quality_tips = [
        "导入规范: 标准库 -> 第三方库 -> 本地模块",
        "异常处理: 捕获具体异常,提供有意义的错误信息",
        "文档字符串: 为复杂函数添加详细说明",
        "类型提示: 使用typing模块提高代码可读性",
        "单元测试: 为关键功能编写测试用例",
        "代码格式: 使用black、flake8等工具保持一致性"
    ]
    
    for i, tip in enumerate(code_quality_tips, 1):
        print(f"    {i}. {tip}")
    
    # 3. 性能优化建议
    print("\n3. 性能优化建议:")
    
    performance_tips = {
        "数据处理": [
            "优先使用numpy和pandas的向量化操作",
            "避免在循环中重复创建对象",
            "使用适当的数据类型减少内存占用",
            "考虑使用numba加速数值计算"
        ],
        "网络请求": [
            "使用连接池复用连接",
            "设置合理的超时时间",
            "使用异步请求处理并发",
            "实现请求重试和错误处理"
        ],
        "文件操作": [
            "使用上下文管理器确保资源释放",
            "批量处理减少I/O操作",
            "选择合适的文件格式(CSV vs JSON vs Parquet)",
            "考虑使用内存映射处理大文件"
        ]
    }
    
    for category, tips in performance_tips.items():
        print(f"\n  {category}:")
        for tip in tips:
            print(f"    • {tip}")
    
    # 4. 常见陷阱和解决方案
    print("\n4. 常见陷阱和解决方案:")
    
    common_pitfalls = {
        "版本冲突": {
            "问题": "不同模块要求不兼容的依赖版本",
            "解决": "使用虚拟环境,检查依赖树,选择兼容版本"
        },
        "内存泄漏": {
            "问题": "大数据处理时内存不断增长",
            "解决": "及时释放变量,使用生成器,分批处理数据"
        },
        "编码问题": {
            "问题": "处理中文或特殊字符时出现乱码",
            "解决": "明确指定编码格式,使用UTF-8"
        },
        "网络超时": {
            "问题": "网络请求经常超时失败",
            "解决": "设置重试机制,使用指数退避,检查网络状况"
        },
        "路径问题": {
            "问题": "跨平台路径分隔符不一致",
            "解决": "使用pathlib或os.path处理路径"
        }
    }
    
    for pitfall, details in common_pitfalls.items():
        print(f"\n  {pitfall}:")
        print(f"    问题: {details['问题']}")
        print(f"    解决: {details['解决']}")
    
    # 5. 学习建议
    print("\n5. 学习建议:")
    
    learning_path = {
        "基础阶段": {
            "重点模块": ["requests", "json", "csv"],
            "学习目标": "掌握基本的数据获取和处理",
            "项目建议": "简单的API调用和数据保存"
        },
        "进阶阶段": {
            "重点模块": ["pandas", "matplotlib", "BeautifulSoup"],
            "学习目标": "数据分析和可视化能力",
            "项目建议": "网页数据爬取和分析报告"
        },
        "高级阶段": {
            "重点模块": ["numpy", "scikit-learn", "asyncio"],
            "学习目标": "高性能计算和机器学习",
            "项目建议": "完整的数据科学项目"
        },
        "专业阶段": {
            "重点模块": ["tensorflow", "django", "celery"],
            "学习目标": "专业领域深度应用",
            "项目建议": "生产级应用开发"
        }
    }
    
    for stage, details in learning_path.items():
        print(f"\n  {stage}:")
        print(f"    重点模块: {', '.join(details['重点模块'])}")
        print(f"    学习目标: {details['学习目标']}")
        print(f"    项目建议: {details['项目建议']}")
    
    # 6. 总结
    print("\n6. 总结:")
    
    summary_points = [
        "第三方模块是Python生态系统的重要组成部分",
        "选择合适的模块比重复造轮子更高效",
        "虚拟环境和依赖管理是专业开发的基础",
        "性能优化要基于实际测量,避免过早优化",
        "持续学习新模块,跟上技术发展趋势",
        "实践项目是掌握模块使用的最佳方式"
    ]
    
    for i, point in enumerate(summary_points, 1):
        print(f"    {i}. {point}")
    
    print("\n恭喜你完成了第16天的学习!")
    print("你已经掌握了Python常用第三方模块的使用方法。")
    print("建议继续通过实际项目来深化理解和应用这些知识。")

# 运行最佳实践总结
best_practices_summary()
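
上面的代码质量建议可以落实到一个很小的函数上。下面的示意同时体现导入顺序、类型提示、文档字符串和针对性的异常处理(URL仅为假设示例):

# 补充示例(示意): 标准库在前、第三方库在后,配合类型提示和具体异常处理
import logging
from typing import Optional

import requests

logger = logging.getLogger(__name__)

def fetch_json(url: str, timeout: float = 10.0) -> Optional[dict]:
    """请求URL并返回JSON数据,失败时返回None。

    Args:
        url: 目标地址
        timeout: 超时时间(秒)
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except requests.Timeout:
        logger.warning("请求超时: %s", url)
    except requests.RequestException as e:
        logger.warning("请求失败: %s (%s)", url, e)
    except ValueError:
        logger.warning("响应不是有效的JSON: %s", url)
    return None

# 用法示例(假设地址): data = fetch_json('https://api.example.com/info')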

# 学习总结

通过第16天的学习,我们深入了解了Python常用第三方模块的使用方法:

# 主要收获

  1. 包管理基础 - 掌握了pip和虚拟环境的使用
  2. 网络请求 - 学会使用requests进行HTTP通信
  3. 数据处理 - 熟练运用pandas进行数据分析
  4. 数值计算 - 理解numpy的强大数值计算能力
  5. 数据可视化 - 掌握matplotlib创建各种图表
  6. 机器学习 - 了解scikit-learn的基础应用
  7. 网页解析 - 学会使用BeautifulSoup处理HTML
  8. 最佳实践 - 掌握模块选择和代码质量标准

# 实践建议

  1. 动手实践 - 通过实际项目巩固所学知识
  2. 持续学习 - 关注新模块和技术发展
  3. 社区参与 - 积极参与开源项目和技术讨论
  4. 文档阅读 - 养成阅读官方文档的习惯

# 下一步学习

  • 深入学习特定领域的专业模块
  • 了解异步编程和高性能计算
  • 学习Web开发框架如Django或Flask
  • 探索深度学习框架如TensorFlow或PyTorch

继续保持学习的热情,Python的世界还有更多精彩等待你去探索!