第16天-常用第三方模块
哪吒 2023/6/15
# 第16天-常用第三方模块
# 学习目标
通过本章学习,你将掌握:
- 第三方模块的安装和管理
- requests模块进行HTTP请求
- pandas进行数据分析和处理
- numpy进行数值计算
- matplotlib进行数据可视化
- 其他常用第三方模块的使用
- 虚拟环境的创建和管理
- 包管理的最佳实践
# 一、第三方模块概述
# 1.1 什么是第三方模块
def third_party_modules_intro():
    """Print an overview of third-party modules: definition, traits, benefits and categories."""
    print("=== 第三方模块概述 ===")
    info = {
        "定义": "由Python社区开发的扩展库,不包含在Python标准库中",
        "特点": [
            "功能强大且专业化",
            "活跃的社区支持",
            "持续更新和维护",
            "丰富的文档和示例"
        ],
        "优势": [
            "避免重复造轮子",
            "提高开发效率",
            "获得专业级功能",
            "学习最佳实践"
        ],
        "分类": {
            "网络请求": ["requests", "urllib3", "httpx"],
            "数据分析": ["pandas", "numpy", "scipy"],
            "数据可视化": ["matplotlib", "seaborn", "plotly"],
            "Web框架": ["Django", "Flask", "FastAPI"],
            "机器学习": ["scikit-learn", "tensorflow", "pytorch"],
            "图像处理": ["Pillow", "opencv-python", "imageio"],
            "数据库": ["SQLAlchemy", "pymongo", "redis"],
            "测试工具": ["pytest", "unittest2", "mock"]
        }
    }
    print(f"\n定义: {info['定义']}")
    # The two bulleted list sections share the same layout, so print them in one pass.
    for section in ("特点", "优势"):
        print(f"\n{section}:")
        for entry in info[section]:
            print(f" • {entry}")
    print("\n常用分类:")
    for group, mods in info['分类'].items():
        print(f" {group}: {', '.join(mods)}")
# Run the third-party-module overview demo (prints at import time)
third_party_modules_intro()
# 1.2 包管理工具
def package_management_demo():
    """Demonstrate pip usage, a requirements.txt example and virtual-env management (print-only)."""
    print("=== 包管理工具 ===")
    # Common pip commands, grouped by task; the strings are data to print, not executed.
    pip_commands = {
        "安装包": [
            "pip install package_name",
            "pip install package_name==1.0.0 # 指定版本",
            "pip install package_name>=1.0.0 # 最低版本",
            "pip install -r requirements.txt # 从文件安装"
        ],
        "查看包": [
            "pip list # 列出所有已安装的包",
            "pip show package_name # 显示包详细信息",
            "pip search package_name # 搜索包(已废弃)"
        ],
        "升级包": [
            "pip install --upgrade package_name",
            "pip install -U package_name # 简写",
            "pip list --outdated # 查看过期包"
        ],
        "卸载包": [
            "pip uninstall package_name",
            "pip uninstall -r requirements.txt"
        ],
        "导出依赖": [
            "pip freeze > requirements.txt",
            "pip freeze --local > requirements.txt # 只导出本地包"
        ]
    }
    for category, commands in pip_commands.items():
        print(f"\n{category}:")
        for cmd in commands:
            print(f" {cmd}")
    # Example requirements.txt content (verbatim; it is printed, never parsed).
    print("\nrequirements.txt示例:")
    requirements_example = """
# Web开发
Django==4.2.0
Flask>=2.0.0
requests==2.31.0
# 数据分析
pandas>=1.5.0
numpy>=1.24.0
matplotlib>=3.6.0
# 机器学习
scikit-learn>=1.2.0
tensorflow>=2.12.0
# 开发工具
pytest>=7.0.0
black>=23.0.0
flake8>=6.0.0
"""
    print(requirements_example)
    # venv creation / activation / removal, covering both Windows and POSIX shells.
    print("\n虚拟环境管理:")
    venv_commands = [
        "# 创建虚拟环境",
        "python -m venv myenv",
        "python -m venv --system-site-packages myenv # 继承系统包",
        "",
        "# 激活虚拟环境",
        "# Windows:",
        "myenv\\Scripts\\activate",
        "# Linux/Mac:",
        "source myenv/bin/activate",
        "",
        "# 停用虚拟环境",
        "deactivate",
        "",
        "# 删除虚拟环境",
        "rm -rf myenv # Linux/Mac",
        "rmdir /s myenv # Windows"
    ]
    for cmd in venv_commands:
        print(f" {cmd}")
# Run the package-management demo
package_management_demo()
# 二、requests模块 - HTTP请求
# 2.1 基本HTTP请求
# 首先需要安装: pip install requests
import requests
import json
from urllib.parse import urljoin
def requests_basic_demo():
    """Demonstrate basic HTTP verbs with requests against httpbin.org (needs network access).

    Each section is wrapped in try/except so a network failure only prints a message.
    """
    print("=== requests基本HTTP请求 ===")
    # 1. GET requests
    print("\n1. GET请求:")
    try:
        # Plain GET; inspect status, (truncated) headers and content type.
        response = requests.get('https://httpbin.org/get')
        print(f" 状态码: {response.status_code}")
        print(f" 响应头: {dict(list(response.headers.items())[:3])}...")
        print(f" 响应内容类型: {response.headers.get('content-type')}")
        # GET with query parameters (requests URL-encodes them automatically).
        params = {
            'name': '张三',
            'age': 25,
            'city': '北京'
        }
        response = requests.get('https://httpbin.org/get', params=params)
        data = response.json()
        print(f" 请求URL: {data['url']}")
        print(f" 查询参数: {data['args']}")
    except requests.exceptions.RequestException as e:
        print(f" 请求失败: {e}")
    # 2. POST requests
    print("\n2. POST请求:")
    try:
        # JSON body via the json= keyword (serialized by requests).
        json_data = {
            'username': 'testuser',
            'password': 'testpass',
            'email': 'test@example.com'
        }
        response = requests.post(
            'https://httpbin.org/post',
            json=json_data,
            headers={'Content-Type': 'application/json'}
        )
        result = response.json()
        print(f" 发送的JSON: {result['json']}")
        print(f" 请求头: {result['headers']['Content-Type']}")
        # Form-encoded body via the data= keyword.
        form_data = {
            'name': '李四',
            'message': '这是一条测试消息'
        }
        response = requests.post(
            'https://httpbin.org/post',
            data=form_data
        )
        result = response.json()
        print(f" 表单数据: {result['form']}")
    except requests.exceptions.RequestException as e:
        print(f" POST请求失败: {e}")
    # 3. Other HTTP methods, dispatched through a name -> callable table.
    print("\n3. 其他HTTP方法:")
    methods = {
        'PUT': lambda: requests.put('https://httpbin.org/put', json={'data': 'updated'}),
        'DELETE': lambda: requests.delete('https://httpbin.org/delete'),
        'PATCH': lambda: requests.patch('https://httpbin.org/patch', json={'field': 'patched'}),
        'HEAD': lambda: requests.head('https://httpbin.org/get'),
        'OPTIONS': lambda: requests.options('https://httpbin.org/get')
    }
    for method_name, method_func in methods.items():
        try:
            response = method_func()
            print(f" {method_name}: 状态码 {response.status_code}")
        except requests.exceptions.RequestException as e:
            print(f" {method_name}: 请求失败 {e}")
# Run the basic requests demo (performs live HTTP calls)
requests_basic_demo()
# 2.2 高级功能
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import time
def requests_advanced_demo():
    """Demonstrate advanced requests features: sessions, timeouts/retries,
    upload and streaming download, auth/proxies, and robust error handling.

    Requires network access to httpbin.org; every call is wrapped so a
    failure only prints a message.

    Fix vs. the original: the download-complete message now interpolates the
    actual ``filename`` (the placeholder had been lost in the f-string).
    """
    print("=== requests高级功能 ===")
    # 1. Session management
    print("\n1. 会话管理:")
    session = requests.Session()
    # Default headers sent on every request made through this session.
    session.headers.update({
        'User-Agent': 'MyApp/1.0',
        'Accept': 'application/json'
    })
    try:
        response = session.get('https://httpbin.org/headers')
        headers_info = response.json()
        print(f" 会话头部: {headers_info['headers']['User-Agent']}")
        # Cookies set by the server persist on the session automatically.
        session.get('https://httpbin.org/cookies/set/session_id/12345')
        response = session.get('https://httpbin.org/cookies')
        cookies_info = response.json()
        print(f" 会话Cookie: {cookies_info['cookies']}")
    except requests.exceptions.RequestException as e:
        print(f" 会话请求失败: {e}")
    finally:
        session.close()
    # 2. Timeouts and retries
    print("\n2. 超时和重试:")
    try:
        # (connect timeout, read timeout) in seconds.
        response = requests.get(
            'https://httpbin.org/delay/2',
            timeout=(5, 10)
        )
        print(f" 超时请求成功: {response.status_code}")
    except requests.exceptions.Timeout:
        print(" 请求超时")
    except requests.exceptions.RequestException as e:
        print(f" 请求失败: {e}")

    def create_session_with_retry():
        """Build a Session that retries transient 429/5xx responses."""
        session = requests.Session()
        retry_strategy = Retry(
            total=3,  # total retry attempts
            backoff_factor=1,  # exponential backoff base (seconds)
            status_forcelist=[429, 500, 502, 503, 504],  # retryable statuses
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        return session

    retry_session = create_session_with_retry()
    try:
        response = retry_session.get(
            'https://httpbin.org/status/500',
            timeout=10
        )
        print(f" 重试请求结果: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f" 重试后仍失败: {e}")
    finally:
        retry_session.close()
    # 3. File upload and streaming download
    print("\n3. 文件上传和下载:")
    try:
        # (filename, content, content-type) tuple form of a multipart upload.
        files = {
            'file': ('test.txt', 'Hello, World!', 'text/plain')
        }
        response = requests.post(
            'https://httpbin.org/post',
            files=files
        )
        result = response.json()
        print(f" 上传文件信息: {result['files']}")
    except requests.exceptions.RequestException as e:
        print(f" 文件上传失败: {e}")

    def download_file_stream(url, filename):
        """Stream ``url`` to ``filename`` in 8 KiB chunks, printing progress.

        Returns True on success, False on any request error.
        """
        try:
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                total_size = int(response.headers.get('content-length', 0))
                downloaded = 0
                with open(filename, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                            downloaded += len(chunk)
                            if total_size > 0:
                                progress = (downloaded / total_size) * 100
                                print(f"\r 下载进度: {progress:.1f}%", end='')
            # BUG FIX: interpolate the real filename (placeholder was lost).
            print(f"\n 文件下载完成: {filename}")
            return True
        except requests.exceptions.RequestException as e:
            print(f" 下载失败: {e}")
            return False

    # Example: download a small JSON document and preview it.
    print("\n 开始下载示例文件...")
    success = download_file_stream(
        'https://httpbin.org/json',
        'example.json'
    )
    if success:
        try:
            with open('example.json', 'r') as f:
                content = f.read()
            print(f" 下载内容预览: {content[:100]}...")
        except Exception as e:
            print(f" 读取文件失败: {e}")
    # 4. Authentication and proxies
    print("\n4. 认证和代理:")
    # HTTP basic auth via the auth= tuple.
    try:
        response = requests.get(
            'https://httpbin.org/basic-auth/user/pass',
            auth=('user', 'pass')
        )
        print(f" 基本认证: {response.status_code}")
        auth_info = response.json()
        print(f" 认证用户: {auth_info['user']}")
    except requests.exceptions.RequestException as e:
        print(f" 认证失败: {e}")
    # Proxy configuration shape (shown only, not actually used).
    proxy_config = {
        'http': 'http://proxy.example.com:8080',
        'https': 'https://proxy.example.com:8080'
    }
    print(f" 代理配置示例: {proxy_config}")
    # 5. Error handling
    print("\n5. 错误处理:")

    def safe_request(url, **kwargs):
        """GET ``url``, mapping each failure class to a message; None on failure."""
        try:
            response = requests.get(url, **kwargs)
            response.raise_for_status()  # turn 4xx/5xx into HTTPError
            return response
        except requests.exceptions.ConnectionError:
            print(f" 连接错误: 无法连接到 {url}")
        except requests.exceptions.Timeout:
            print(f" 超时错误: 请求 {url} 超时")
        except requests.exceptions.HTTPError as e:
            print(f" HTTP错误: {e}")
        except requests.exceptions.RequestException as e:
            print(f" 请求异常: {e}")
        return None

    test_urls = [
        'https://httpbin.org/status/404',  # HTTP error
        'https://httpbin.org/delay/1',  # normal request
        'https://nonexistent.example.com'  # connection error
    ]
    for url in test_urls:
        print(f"\n 测试URL: {url}")
        response = safe_request(url, timeout=5)
        if response:
            print(f" 成功: 状态码 {response.status_code}")
        else:
            print(f" 失败")
# Run the advanced requests demo (performs live HTTP calls)
requests_advanced_demo()
# 2.3 实际应用示例
import requests
import json
import time
from datetime import datetime
class APIClient:
    """Minimal JSON REST client wrapping a persistent ``requests.Session``.

    Applies default headers (and an optional Bearer token), prefixes every
    endpoint with ``base_url``, uses a default timeout, and logs each request.
    Usable as a context manager so the session is always closed.
    """
    def __init__(self, base_url, api_key=None, timeout=30):
        # Normalize so endpoint joining always yields a single slash.
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        self.timeout = timeout  # default per-request timeout in seconds
        self.session.headers.update({
            'User-Agent': 'APIClient/1.0',
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        })
        if api_key:
            self.session.headers['Authorization'] = f'Bearer {api_key}'
    def _make_request(self, method, endpoint, **kwargs):
        """Send one request; log the outcome; re-raise on transport/HTTP errors."""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        # Apply the default timeout unless the caller supplied one.
        kwargs.setdefault('timeout', self.timeout)
        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            print(f"[{datetime.now()}] {method} {url} -> {response.status_code}")
            return response
        except requests.exceptions.RequestException as e:
            print(f"[{datetime.now()}] 请求失败: {method} {url} -> {e}")
            raise
    def get(self, endpoint, params=None):
        """GET ``endpoint`` with optional query parameters."""
        return self._make_request('GET', endpoint, params=params)
    def post(self, endpoint, data=None, json_data=None):
        """POST ``endpoint`` with a JSON body (``json_data``) or form body (``data``).

        BUG FIX: compare against None instead of truthiness so falsy-but-valid
        payloads ({}, [], 0, "") are still transmitted.
        """
        kwargs = {}
        if json_data is not None:
            kwargs['json'] = json_data
        elif data is not None:
            kwargs['data'] = data
        return self._make_request('POST', endpoint, **kwargs)
    def put(self, endpoint, data=None, json_data=None):
        """PUT ``endpoint`` with a JSON body (``json_data``) or form body (``data``)."""
        kwargs = {}
        if json_data is not None:
            kwargs['json'] = json_data
        elif data is not None:
            kwargs['data'] = data
        return self._make_request('PUT', endpoint, **kwargs)
    def delete(self, endpoint):
        """DELETE ``endpoint``."""
        return self._make_request('DELETE', endpoint)
    def close(self):
        """Release the underlying session's connections."""
        self.session.close()
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always close the session when leaving the ``with`` block.
        self.close()
def api_client_demo():
    """Exercise APIClient CRUD calls against httpbin.org (needs network access)."""
    print("=== API客户端应用示例 ===")
    # The context manager guarantees the session is closed afterwards.
    with APIClient('https://httpbin.org') as client:
        # 1. Read
        print("\n1. 获取数据:")
        try:
            response = client.get('/get', params={'page': 1, 'limit': 10})
            data = response.json()
            print(f" 请求参数: {data['args']}")
        except Exception as e:
            print(f" 获取数据失败: {e}")
        # 2. Create
        print("\n2. 提交数据:")
        try:
            user_data = {
                'name': '张三',
                'email': 'zhangsan@example.com',
                'age': 25
            }
            response = client.post('/post', json_data=user_data)
            result = response.json()
            print(f" 提交的数据: {result['json']}")
        except Exception as e:
            print(f" 提交数据失败: {e}")
        # 3. Update
        print("\n3. 更新数据:")
        try:
            update_data = {
                'name': '张三',
                'age': 26  # updated age
            }
            response = client.put('/put', json_data=update_data)
            result = response.json()
            print(f" 更新的数据: {result['json']}")
        except Exception as e:
            print(f" 更新数据失败: {e}")
        # 4. Delete
        print("\n4. 删除数据:")
        try:
            response = client.delete('/delete')
            print(f" 删除操作状态: {response.status_code}")
        except Exception as e:
            print(f" 删除数据失败: {e}")
# Run the API client demo (performs live HTTP calls)
api_client_demo()
# 三、pandas模块 - 数据分析
# 3.1 基础数据结构
# 首先需要安装: pip install pandas numpy
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
def pandas_basic_demo():
    """Walk through pandas basics: Series/DataFrame creation, inspection, selection, mutation."""
    print("=== pandas基础数据结构 ===")
    # 1. Series — one-dimensional labelled data
    print("\n1. Series - 一维数据:")
    # From a plain list (gets a default RangeIndex).
    numbers = pd.Series([1, 2, 3, 4, 5])
    print(f" 数字Series:\n{numbers}")
    # With an explicit string index.
    scores = pd.Series(
        [85, 92, 78, 95, 88],
        index=['数学', '英语', '物理', '化学', '生物']
    )
    print(f"\n 成绩Series:\n{scores}")
    # From a dict (keys become the index).
    student_info = pd.Series({
        '姓名': '张三',
        '年龄': 20,
        '专业': '计算机科学',
        '年级': '大二'
    })
    print(f"\n 学生信息Series:\n{student_info}")
    # Basic attributes and aggregations.
    print(f"\n Series基本操作:")
    print(f" 数据类型: {scores.dtype}")
    print(f" 形状: {scores.shape}")
    print(f" 大小: {scores.size}")
    print(f" 索引: {list(scores.index)}")
    print(f" 值: {list(scores.values)}")
    print(f" 最大值: {scores.max()}")
    print(f" 最小值: {scores.min()}")
    print(f" 平均值: {scores.mean():.2f}")
    print(f" 标准差: {scores.std():.2f}")
    # 2. DataFrame — two-dimensional labelled data
    print("\n2. DataFrame - 二维数据:")
    # From a dict of equal-length columns.
    students_data = {
        '姓名': ['张三', '李四', '王五', '赵六', '钱七'],
        '年龄': [20, 21, 19, 22, 20],
        '专业': ['计算机', '数学', '物理', '化学', '生物'],
        '成绩': [85, 92, 78, 95, 88]
    }
    df = pd.DataFrame(students_data)
    print(f" 学生DataFrame:\n{df}")
    # Shape, columns, index and per-column dtypes.
    print(f"\n DataFrame基本信息:")
    print(f" 形状: {df.shape}")
    print(f" 列名: {list(df.columns)}")
    print(f" 索引: {list(df.index)}")
    print(f" 数据类型:\n{df.dtypes}")
    # Quick looks at the data.
    print(f"\n 数据概览:")
    print(f" 前3行:\n{df.head(3)}")
    print(f"\n 后2行:\n{df.tail(2)}")
    print(f"\n 统计信息:\n{df.describe()}")
    print(f"\n 基本信息:")
    df.info()  # info() prints directly and returns None
    # 3. Indexing and selection
    print("\n3. 索引和选择:")
    # Column selection: single column -> Series, list of columns -> DataFrame.
    print(f" 选择单列 - 姓名:\n{df['姓名']}")
    print(f"\n 选择多列:\n{df[['姓名', '成绩']]}")
    # Row selection by integer position.
    print(f"\n 选择行 - 第2行:\n{df.iloc[1]}")
    print(f"\n 选择多行:\n{df.iloc[1:4]}")
    # Boolean-mask selection.
    high_scores = df[df['成绩'] >= 90]
    print(f"\n 高分学生 (成绩>=90):\n{high_scores}")
    # Compound condition (operands of & must be parenthesized).
    young_high_scores = df[(df['年龄'] <= 20) & (df['成绩'] >= 85)]
    print(f"\n 年轻高分学生:\n{young_high_scores}")
    # 4. Data mutation
    print("\n4. 数据操作:")
    # Derive a letter grade per row with a nested conditional expression.
    df['等级'] = df['成绩'].apply(lambda x: 'A' if x >= 90 else 'B' if x >= 80 else 'C')
    print(f" 添加等级列:\n{df}")
    # Sort by score, descending.
    df_sorted = df.sort_values('成绩', ascending=False)
    print(f"\n 按成绩降序排列:\n{df_sorted}")
    # Group-by aggregation over the derived grade column.
    grade_stats = df.groupby('等级')['成绩'].agg(['count', 'mean', 'min', 'max'])
    print(f"\n 按等级分组统计:\n{grade_stats}")
# Run the pandas basics demo
pandas_basic_demo()
# 3.2 数据处理和清洗
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
def pandas_data_processing_demo():
    """Demonstrate data cleaning with pandas: missing values, outliers,
    dtype conversion, reshaping, time series and merging.

    Fixes vs. the original:
    - ``Series.fillna(method=..., inplace=True)`` on a column selection is
      deprecated chained assignment; replaced with explicit reassignment via
      ``.fillna()`` / ``.ffill()`` / ``.bfill()``.
    - ``resample('W').agg({name: func})`` dict-renaming raises
      ``SpecificationError`` on pandas >= 1.0; replaced with a list of
      functions plus an explicit column rename.
    """
    print("=== pandas数据处理和清洗 ===")
    # Synthetic daily sales data; seeded RNG keeps the output reproducible.
    np.random.seed(42)
    data = {
        '日期': pd.date_range('2023-01-01', periods=100, freq='D'),
        '销售额': np.random.normal(1000, 200, 100),
        '客户数': np.random.poisson(50, 100),
        '地区': np.random.choice(['北京', '上海', '广州', '深圳'], 100),
        '产品': np.random.choice(['A', 'B', 'C'], 100)
    }
    df = pd.DataFrame(data)
    # Deliberately inject missing values and outliers for the cleaning demo.
    df.loc[5:10, '销售额'] = np.nan
    df.loc[15, '客户数'] = -5  # impossible negative count
    df.loc[25, '销售额'] = 10000  # extreme sales value
    print(f"原始数据形状: {df.shape}")
    print(f"前5行数据:\n{df.head()}")
    # 1. Missing values
    print("\n1. 缺失值处理:")
    missing_info = df.isnull().sum()
    print(f" 各列缺失值数量:\n{missing_info}")
    missing_percent = (df.isnull().sum() / len(df)) * 100
    print(f"\n 各列缺失值比例:\n{missing_percent}")
    df_processed = df.copy()
    # Option A: drop rows containing any NaN.
    df_dropna = df.dropna()
    print(f"\n 删除缺失值后形状: {df_dropna.shape}")
    # Option B: fill with the column mean (reassignment avoids deprecated
    # chained inplace fillna).
    df_processed['销售额'] = df_processed['销售额'].fillna(df_processed['销售额'].mean())
    # ffill/bfill shown for completeness (no NaN remains after the mean fill);
    # fillna(method=...) is deprecated — use .ffill()/.bfill() directly.
    df_processed['销售额'] = df_processed['销售额'].ffill()
    df_processed['销售额'] = df_processed['销售额'].bfill()
    print(f" 填充后缺失值数量: {df_processed.isnull().sum().sum()}")
    # 2. Outlier detection and treatment
    print("\n2. 异常值检测和处理:")

    def detect_outliers_iqr(series):
        """Boolean mask of values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]."""
        Q1 = series.quantile(0.25)
        Q3 = series.quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        return (series < lower_bound) | (series > upper_bound)

    sales_outliers = detect_outliers_iqr(df_processed['销售额'])
    print(f" 销售额异常值数量: {sales_outliers.sum()}")
    print(f" 异常值: {df_processed.loc[sales_outliers, '销售额'].values}")
    # Negative customer counts are outliers by definition.
    customer_outliers = df_processed['客户数'] < 0
    print(f" 客户数异常值数量: {customer_outliers.sum()}")
    # Treatment 1: drop outlier rows entirely.
    df_no_outliers = df_processed[~(sales_outliers | customer_outliers)]
    print(f" 删除异常值后形状: {df_no_outliers.shape}")
    # Treatment 2: cap outliers at a boundary value.
    df_capped = df_processed.copy()
    # Cap extreme sales at the 95th percentile.
    sales_95th = df_capped['销售额'].quantile(0.95)
    df_capped.loc[df_capped['销售额'] > sales_95th, '销售额'] = sales_95th
    # Replace negative customer counts with zero.
    df_capped.loc[df_capped['客户数'] < 0, '客户数'] = 0
    print(f" 替换异常值后统计:\n{df_capped[['销售额', '客户数']].describe()}")
    # 3. Dtype conversion
    print("\n3. 数据类型转换:")
    print(f" 原始数据类型:\n{df_processed.dtypes}")
    df_converted = df_processed.copy()
    # Smaller int and category dtypes reduce memory for repetitive data.
    df_converted['客户数'] = df_converted['客户数'].astype('int32')
    df_converted['地区'] = df_converted['地区'].astype('category')
    df_converted['产品'] = df_converted['产品'].astype('category')
    print(f"\n 转换后数据类型:\n{df_converted.dtypes}")
    print(f"\n 内存使用对比:")
    print(f" 原始: {df_processed.memory_usage(deep=True).sum() / 1024:.2f} KB")
    print(f" 转换后: {df_converted.memory_usage(deep=True).sum() / 1024:.2f} KB")
    # 4. Reshaping
    print("\n4. 数据重塑:")
    # Pivot table: mean and sum of sales by region x product.
    pivot_table = df_converted.pivot_table(
        values='销售额',
        index='地区',
        columns='产品',
        aggfunc=['mean', 'sum'],
        fill_value=0
    )
    print(f" 透视表:\n{pivot_table}")
    # Group-by aggregation with multiple functions per column.
    grouped_stats = df_converted.groupby(['地区', '产品']).agg({
        '销售额': ['count', 'mean', 'sum', 'std'],
        '客户数': ['mean', 'sum']
    }).round(2)
    print(f"\n 分组统计:\n{grouped_stats.head(10)}")
    # 5. Time series
    print("\n5. 时间序列处理:")
    df_ts = df_converted.set_index('日期')
    # Weekly resample. Dict-based renaming agg is unsupported on a Series,
    # so aggregate with a list and rename the resulting columns explicitly.
    weekly_sales = df_ts['销售额'].resample('W').agg(['sum', 'mean', 'max'])
    weekly_sales.columns = ['总销售额', '平均销售额', '最大销售额']
    print(f" 周度销售统计:\n{weekly_sales.head()}")
    # Rolling-window statistics (first 6 rows are NaN until the window fills).
    df_ts['销售额_7日均值'] = df_ts['销售额'].rolling(window=7).mean()
    df_ts['销售额_7日标准差'] = df_ts['销售额'].rolling(window=7).std()
    print(f"\n 滚动统计示例:\n{df_ts[['销售额', '销售额_7日均值', '销售额_7日标准差']].head(10)}")
    # 6. Merging
    print("\n6. 数据合并:")
    # Auxiliary per-region reference table.
    region_info = pd.DataFrame({
        '地区': ['北京', '上海', '广州', '深圳'],
        '人口': [2154, 2424, 1530, 1756],  # unit: 10k people
        'GDP': [4.0, 4.3, 2.9, 3.2]  # unit: trillion CNY
    })
    # Left join keeps every sales row and attaches region attributes.
    df_merged = df_converted.merge(region_info, on='地区', how='left')
    print(f" 合并后数据:\n{df_merged.head()}")
    df_merged['人均销售额'] = df_merged['销售额'] / df_merged['人口']
    region_summary = df_merged.groupby('地区').agg({
        '销售额': 'sum',
        '客户数': 'sum',
        '人口': 'first',
        'GDP': 'first',
        '人均销售额': 'mean'
    }).round(2)
    print(f"\n 地区汇总统计:\n{region_summary}")
# Run the pandas data-cleaning demo
pandas_data_processing_demo()
# 3.3 数据分析实例
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class SalesAnalyzer:
    """Sales-data analyzer.

    Loads order records from CSV/XLSX (or synthesizes them) and prints basic,
    time, product, customer, regional and salesperson analyses.
    """
    def __init__(self, data_file=None):
        # self.df holds the order table; filled from file or synthesized.
        self.df = None
        if data_file:
            self.load_data(data_file)
        else:
            self.generate_sample_data()
    def generate_sample_data(self, n_records=1000):
        """Generate n_records random 2023 orders (seeded, so reproducible)."""
        np.random.seed(42)
        # All order dates fall within calendar year 2023.
        start_date = datetime(2023, 1, 1)
        dates = [start_date + timedelta(days=x) for x in range(365)]
        data = []
        for _ in range(n_records):
            # NOTE: the loop variable ``_`` is reused as the sequential order number.
            record = {
                '订单ID': f'ORD{_+1:06d}',
                '日期': np.random.choice(dates),
                '客户ID': f'CUST{np.random.randint(1, 201):04d}',
                '产品类别': np.random.choice(['电子产品', '服装', '家居', '图书', '食品'], p=[0.3, 0.25, 0.2, 0.15, 0.1]),
                '产品名称': f'产品{np.random.randint(1, 101):03d}',
                '数量': np.random.randint(1, 11),
                '单价': np.random.uniform(10, 1000),
                '地区': np.random.choice(['华北', '华东', '华南', '华中', '西南', '西北', '东北']),
                '销售员': f'员工{np.random.randint(1, 21):02d}',
                '渠道': np.random.choice(['线上', '线下'], p=[0.6, 0.4])
            }
            data.append(record)
        self.df = pd.DataFrame(data)
        # Derived revenue column and a proper datetime dtype for .dt accessors.
        self.df['销售额'] = self.df['数量'] * self.df['单价']
        self.df['日期'] = pd.to_datetime(self.df['日期'])
        print(f"生成了 {len(self.df)} 条销售记录")
    def load_data(self, file_path):
        """Load orders from a .csv or .xlsx file; coerce the date column."""
        try:
            if file_path.endswith('.csv'):
                self.df = pd.read_csv(file_path)
            elif file_path.endswith('.xlsx'):
                self.df = pd.read_excel(file_path)
            else:
                raise ValueError("不支持的文件格式")
            # Ensure the date column is datetime for the .dt accessors below.
            if '日期' in self.df.columns:
                self.df['日期'] = pd.to_datetime(self.df['日期'])
            print(f"成功加载 {len(self.df)} 条记录")
        except Exception as e:
            print(f"加载数据失败: {e}")
    def basic_analysis(self):
        """Print headline totals and the sales-amount distribution."""
        print("=== 基础销售分析 ===")
        if self.df is None:
            print("没有数据可分析")
            return
        # Headline numbers.
        print(f"\n数据概览:")
        print(f" 记录数: {len(self.df):,}")
        print(f" 时间范围: {self.df['日期'].min()} 到 {self.df['日期'].max()}")
        print(f" 总销售额: ¥{self.df['销售额'].sum():,.2f}")
        print(f" 平均订单金额: ¥{self.df['销售额'].mean():.2f}")
        print(f" 客户数量: {self.df['客户ID'].nunique():,}")
        print(f" 产品数量: {self.df['产品名称'].nunique():,}")
        # Distribution of the order amount.
        print(f"\n销售额统计:")
        sales_stats = self.df['销售额'].describe()
        for stat, value in sales_stats.items():
            print(f" {stat}: ¥{value:.2f}")
    def time_analysis(self):
        """Monthly / weekday aggregates plus daily trend with moving averages."""
        print("\n=== 时间维度分析 ===")
        # Monthly totals keyed by year-month period.
        monthly_sales = self.df.groupby(self.df['日期'].dt.to_period('M')).agg({
            '销售额': 'sum',
            '订单ID': 'count',
            '客户ID': 'nunique'
        }).round(2)
        monthly_sales.columns = ['月销售额', '订单数', '客户数']
        print(f"\n月度销售统计:")
        print(monthly_sales.head(10))
        # Aggregate by English weekday name (side effect: adds the 星期几 column).
        self.df['星期几'] = self.df['日期'].dt.day_name()
        weekday_sales = self.df.groupby('星期几')['销售额'].agg(['sum', 'mean', 'count']).round(2)
        weekday_sales.columns = ['总销售额', '平均销售额', '订单数']
        print(f"\n星期销售统计:")
        print(weekday_sales)
        # Daily totals and 7/30-day moving averages of the trend.
        daily_sales = self.df.groupby('日期')['销售额'].sum()
        daily_sales_ma7 = daily_sales.rolling(window=7).mean()
        daily_sales_ma30 = daily_sales.rolling(window=30).mean()
        print(f"\n销售趋势分析:")
        print(f" 最高单日销售额: ¥{daily_sales.max():.2f} ({daily_sales.idxmax()})")
        print(f" 最低单日销售额: ¥{daily_sales.min():.2f} ({daily_sales.idxmin()})")
        print(f" 7日移动平均: ¥{daily_sales_ma7.iloc[-1]:.2f}")
        print(f" 30日移动平均: ¥{daily_sales_ma30.iloc[-1]:.2f}")
    def product_analysis(self):
        """Category-level stats, revenue share per category and top-10 products."""
        print("\n=== 产品维度分析 ===")
        category_stats = self.df.groupby('产品类别').agg({
            '销售额': ['sum', 'mean', 'count'],
            '数量': 'sum',
            '客户ID': 'nunique'
        }).round(2)
        category_stats.columns = ['总销售额', '平均销售额', '订单数', '总数量', '客户数']
        category_stats = category_stats.sort_values('总销售额', ascending=False)
        print(f"\n产品类别统计:")
        print(category_stats)
        # Revenue share per category (percent of grand total).
        category_stats['销售额占比'] = (category_stats['总销售额'] / category_stats['总销售额'].sum() * 100).round(2)
        print(f"\n产品类别销售额占比:")
        for category, row in category_stats.iterrows():
            print(f" {category}: {row['销售额占比']}%")
        # Top 10 products by total revenue.
        top_products = self.df.groupby('产品名称').agg({
            '销售额': 'sum',
            '数量': 'sum',
            '订单ID': 'count'
        }).round(2)
        top_products.columns = ['总销售额', '总数量', '订单数']
        top_products = top_products.sort_values('总销售额', ascending=False).head(10)
        print(f"\n热销产品TOP10:")
        print(top_products)
    def customer_analysis(self):
        """Per-customer value statistics plus a simplified RFM segmentation."""
        print("\n=== 客户维度分析 ===")
        customer_stats = self.df.groupby('客户ID').agg({
            '销售额': 'sum',
            '订单ID': 'count',
            '日期': ['min', 'max']
        }).round(2)
        customer_stats.columns = ['总消费额', '订单数', '首次购买', '最后购买']
        customer_stats['平均订单金额'] = (customer_stats['总消费额'] / customer_stats['订单数']).round(2)
        # Days between first and last purchase, inclusive.
        customer_stats['活跃天数'] = (customer_stats['最后购买'] - customer_stats['首次购买']).dt.days + 1
        print(f"\n客户统计概览:")
        print(f" 总客户数: {len(customer_stats):,}")
        print(f" 平均客户价值: ¥{customer_stats['总消费额'].mean():.2f}")
        print(f" 平均订单数: {customer_stats['订单数'].mean():.2f}")
        print(f" 平均订单金额: ¥{customer_stats['平均订单金额'].mean():.2f}")
        # Simplified RFM segmentation:
        # R: recency relative to the latest order date in the data set
        # F: frequency (folded into the labels via order counts above)
        # M: monetary, split at the 80th / 50th spend percentiles
        latest_date = self.df['日期'].max()
        customer_stats['最近购买天数'] = (latest_date - customer_stats['最后购买']).dt.days
        def customer_segment(row):
            # Label = spend tier x recency window.
            if row['总消费额'] >= customer_stats['总消费额'].quantile(0.8):
                if row['最近购买天数'] <= 30:
                    return '高价值活跃客户'
                else:
                    return '高价值沉睡客户'
            elif row['总消费额'] >= customer_stats['总消费额'].quantile(0.5):
                if row['最近购买天数'] <= 60:
                    return '中价值活跃客户'
                else:
                    return '中价值沉睡客户'
            else:
                if row['最近购买天数'] <= 90:
                    return '低价值活跃客户'
                else:
                    return '低价值沉睡客户'
        customer_stats['客户分层'] = customer_stats.apply(customer_segment, axis=1)
        segment_stats = customer_stats.groupby('客户分层').agg({
            '总消费额': ['count', 'sum', 'mean'],
            '订单数': 'mean',
            '平均订单金额': 'mean'
        }).round(2)
        print(f"\n客户分层统计:")
        print(segment_stats)
        # Highest-spending customers.
        top_customers = customer_stats.sort_values('总消费额', ascending=False).head(10)
        print(f"\nTOP10客户:")
        print(top_customers[['总消费额', '订单数', '平均订单金额', '客户分层']])
    def regional_analysis(self):
        """Regional totals, revenue share and online/offline channel split."""
        print("\n=== 地区维度分析 ===")
        region_stats = self.df.groupby('地区').agg({
            '销售额': ['sum', 'mean', 'count'],
            '客户ID': 'nunique',
            '销售员': 'nunique'
        }).round(2)
        region_stats.columns = ['总销售额', '平均销售额', '订单数', '客户数', '销售员数']
        region_stats = region_stats.sort_values('总销售额', ascending=False)
        # Revenue share and per-customer revenue by region.
        region_stats['销售额占比'] = (region_stats['总销售额'] / region_stats['总销售额'].sum() * 100).round(2)
        region_stats['人均销售额'] = (region_stats['总销售额'] / region_stats['客户数']).round(2)
        print(f"\n地区销售统计:")
        print(region_stats)
        # Channel revenue per region, one column per channel via unstack.
        channel_stats = self.df.groupby(['地区', '渠道'])['销售额'].sum().unstack(fill_value=0)
        channel_stats['总计'] = channel_stats.sum(axis=1)
        channel_stats = channel_stats.sort_values('总计', ascending=False)
        print(f"\n地区渠道分析:")
        print(channel_stats)
    def sales_performance_analysis(self):
        """Per-salesperson stats and a 3-tier performance grading."""
        print("\n=== 销售员绩效分析 ===")
        salesperson_stats = self.df.groupby('销售员').agg({
            '销售额': ['sum', 'mean', 'count'],
            '客户ID': 'nunique',
            '产品类别': 'nunique'
        }).round(2)
        salesperson_stats.columns = ['总销售额', '平均订单金额', '订单数', '客户数', '产品类别数']
        salesperson_stats['客户平均价值'] = (salesperson_stats['总销售额'] / salesperson_stats['客户数']).round(2)
        salesperson_stats = salesperson_stats.sort_values('总销售额', ascending=False)
        print(f"\n销售员绩效统计:")
        print(salesperson_stats.head(10))
        # Grade by the 20th/80th revenue percentiles.
        performance_threshold = salesperson_stats['总销售额'].quantile([0.2, 0.8])
        def performance_level(sales):
            # Threshold Series is indexed by the quantile levels (0.2, 0.8).
            if sales >= performance_threshold[0.8]:
                return '优秀'
            elif sales >= performance_threshold[0.2]:
                return '良好'
            else:
                return '待提升'
        salesperson_stats['绩效等级'] = salesperson_stats['总销售额'].apply(performance_level)
        performance_summary = salesperson_stats.groupby('绩效等级').agg({
            '总销售额': ['count', 'sum', 'mean'],
            '客户数': 'mean',
            '订单数': 'mean'
        }).round(2)
        print(f"\n绩效等级分布:")
        print(performance_summary)
    def generate_report(self):
        """Run every analysis section in order, framed as one report."""
        print("\n" + "="*50)
        print(" 销售数据分析报告")
        print("="*50)
        self.basic_analysis()
        self.time_analysis()
        self.product_analysis()
        self.customer_analysis()
        self.regional_analysis()
        self.sales_performance_analysis()
        print("\n" + "="*50)
        print(" 报告生成完成")
        print("="*50)
def pandas_analysis_demo():
    """Run the full sales-analysis report on freshly generated sample data."""
    print("=== pandas数据分析实例 ===")
    # SalesAnalyzer() with no file argument synthesizes its own data set.
    report_source = SalesAnalyzer()
    report_source.generate_report()
# Run the pandas analysis demo
pandas_analysis_demo()
# 四、numpy模块 - 数值计算
# 4.1 基础数组操作
# 首先需要安装: pip install numpy
import numpy as np
import time
def numpy_basic_demo():
    """Tour of ndarray creation, indexing/slicing and shape manipulation."""
    print("=== numpy基础数组操作 ===")
    # 1. Creating arrays
    print("\n1. 创建数组:")
    # From a Python list.
    arr1 = np.array([1, 2, 3, 4, 5])
    print(f" 一维数组: {arr1}")
    print(f" 数据类型: {arr1.dtype}")
    print(f" 形状: {arr1.shape}")
    print(f" 维度: {arr1.ndim}")
    # Two-dimensional array.
    arr2 = np.array([[1, 2, 3], [4, 5, 6]])
    print(f"\n 二维数组:\n{arr2}")
    print(f" 形状: {arr2.shape}")
    print(f" 大小: {arr2.size}")
    # Explicit dtype.
    arr3 = np.array([1, 2, 3], dtype=np.float64)
    print(f"\n 指定类型数组: {arr3}")
    print(f" 数据类型: {arr3.dtype}")
    # 2. Special constructors
    print("\n2. 特殊数组创建:")
    # All-zeros array.
    zeros = np.zeros((3, 4))
    print(f" 零数组:\n{zeros}")
    # All-ones array with integer dtype.
    ones = np.ones((2, 3), dtype=int)
    print(f"\n 一数组:\n{ones}")
    # Identity matrix.
    identity = np.eye(3)
    print(f"\n 单位矩阵:\n{identity}")
    # Evenly spaced values, endpoints inclusive.
    linspace = np.linspace(0, 10, 5)
    print(f"\n 等差数列: {linspace}")
    # Fixed-step arithmetic sequence (note: arange is arithmetic, not geometric).
    arange = np.arange(0, 10, 2)
    print(f" 等差序列: {arange}")
    # Uniform random values in [0, 1); seeded for reproducibility.
    np.random.seed(42)
    random_arr = np.random.random((2, 3))
    print(f"\n 随机数组:\n{random_arr}")
    # Standard-normal samples.
    normal_arr = np.random.normal(0, 1, (2, 3))
    print(f"\n 正态分布随机数:\n{normal_arr}")
    # 3. Indexing and slicing
    print("\n3. 数组索引和切片:")
    arr = np.arange(12).reshape(3, 4)
    print(f" 原数组:\n{arr}")
    # Scalar / row / column access.
    print(f" 元素[1,2]: {arr[1, 2]}")
    print(f" 第一行: {arr[0]}")
    print(f" 第一列: {arr[:, 0]}")
    # Slicing.
    print(f" 前两行:\n{arr[:2]}")
    print(f" 后两列:\n{arr[:, -2:]}")
    # Boolean-mask indexing.
    mask = arr > 5
    print(f"\n 大于5的元素: {arr[mask]}")
    # Fancy (integer-array) indexing.
    indices = np.array([0, 2])
    print(f" 选择第0和第2行:\n{arr[indices]}")
    # 4. Shape manipulation
    print("\n4. 数组形状操作:")
    original = np.arange(12)
    print(f" 原数组: {original}")
    # Reshape (view with new dimensions).
    reshaped = original.reshape(3, 4)
    print(f" 重塑为3x4:\n{reshaped}")
    # Transpose.
    transposed = reshaped.T
    print(f" 转置:\n{transposed}")
    # Flatten back to 1-D (copy).
    flattened = reshaped.flatten()
    print(f" 展平: {flattened}")
    # Add a leading axis.
    expanded = np.expand_dims(original, axis=0)
    print(f" 添加维度: {expanded.shape}")
    # Remove length-1 axes again.
    squeezed = np.squeeze(expanded)
    print(f" 压缩维度: {squeezed.shape}")
# Run the numpy basics demo
numpy_basic_demo()
# 4.2 数学运算和统计
import numpy as np
def numpy_math_demo():
    """Element-wise math, ufuncs, statistics, linear algebra and comparisons."""
    print("=== numpy数学运算和统计 ===")
    # Reproducible random test matrices.
    np.random.seed(42)
    arr1 = np.random.randint(1, 10, (3, 4))
    arr2 = np.random.randint(1, 10, (3, 4))
    print(f"数组1:\n{arr1}")
    print(f"数组2:\n{arr2}")
    # 1. Element-wise arithmetic
    print("\n1. 基本数学运算:")
    print(f" 加法:\n{arr1 + arr2}")
    print(f"\n 减法:\n{arr1 - arr2}")
    print(f"\n 乘法:\n{arr1 * arr2}")
    print(f"\n 除法:\n{arr1 / arr2}")
    print(f"\n 幂运算:\n{arr1 ** 2}")
    # Scalar broadcasting.
    print(f"\n 标量加法:\n{arr1 + 10}")
    print(f"\n 标量乘法:\n{arr1 * 2}")
    # 2. Universal functions (ufuncs)
    print("\n2. 数学函数:")
    # Trigonometric functions over notable angles.
    angles = np.array([0, np.pi/6, np.pi/4, np.pi/3, np.pi/2])
    print(f" 角度: {angles}")
    print(f" sin值: {np.sin(angles)}")
    print(f" cos值: {np.cos(angles)}")
    print(f" tan值: {np.tan(angles)}")
    # Exponentials, logarithms and roots.
    values = np.array([1, 2, 3, 4, 5])
    print(f"\n 原值: {values}")
    print(f" 指数: {np.exp(values)}")
    print(f" 自然对数: {np.log(values)}")
    print(f" 以10为底: {np.log10(values)}")
    print(f" 平方根: {np.sqrt(values)}")
    # Rounding family (note round() uses banker's rounding on .5).
    decimals = np.array([1.2, 2.7, -1.5, -2.8])
    print(f"\n 小数: {decimals}")
    print(f" 向上取整: {np.ceil(decimals)}")
    print(f" 向下取整: {np.floor(decimals)}")
    print(f" 四舍五入: {np.round(decimals)}")
    print(f" 截断: {np.trunc(decimals)}")
    # 3. Statistics
    print("\n3. 统计函数:")
    data = np.random.normal(50, 15, (5, 6))
    print(f" 测试数据:\n{data.round(2)}")
    # Whole-array reductions.
    print(f"\n 最大值: {np.max(data):.2f}")
    print(f" 最小值: {np.min(data):.2f}")
    print(f" 均值: {np.mean(data):.2f}")
    print(f" 中位数: {np.median(data):.2f}")
    print(f" 标准差: {np.std(data):.2f}")
    print(f" 方差: {np.var(data):.2f}")
    print(f" 总和: {np.sum(data):.2f}")
    # Per-axis reductions: axis=1 -> per row, axis=0 -> per column.
    print(f"\n 按行统计 (axis=1):")
    print(f" 行均值: {np.mean(data, axis=1).round(2)}")
    print(f" 行最大值: {np.max(data, axis=1).round(2)}")
    print(f"\n 按列统计 (axis=0):")
    print(f" 列均值: {np.mean(data, axis=0).round(2)}")
    print(f" 列最小值: {np.min(data, axis=0).round(2)}")
    # Percentiles.
    percentiles = [25, 50, 75, 90, 95]
    print(f"\n 分位数:")
    for p in percentiles:
        value = np.percentile(data, p)
        print(f" {p}%分位数: {value:.2f}")
    # 4. Linear algebra
    print("\n4. 线性代数:")
    # Matrix product: @ operator and np.dot are equivalent for 2-D arrays.
    A = np.array([[1, 2], [3, 4]])
    B = np.array([[5, 6], [7, 8]])
    print(f" 矩阵A:\n{A}")
    print(f" 矩阵B:\n{B}")
    print(f" 矩阵乘法 A@B:\n{A @ B}")
    print(f" 矩阵乘法 np.dot(A,B):\n{np.dot(A, B)}")
    # Determinant and trace.
    print(f"\n 矩阵A的行列式: {np.linalg.det(A):.2f}")
    print(f" 矩阵A的迹: {np.trace(A)}")
    # Eigen decomposition.
    eigenvalues, eigenvectors = np.linalg.eig(A)
    print(f" 特征值: {eigenvalues}")
    print(f" 特征向量:\n{eigenvectors}")
    # Inversion raises LinAlgError for singular matrices, hence the guard.
    try:
        A_inv = np.linalg.inv(A)
        print(f" 矩阵A的逆:\n{A_inv}")
        print(f" 验证 A * A_inv:\n{(A @ A_inv).round(10)}")
    except np.linalg.LinAlgError:
        print(" 矩阵不可逆")
    # 5. Comparison and logical operations
    print("\n5. 数组比较和逻辑运算:")
    x = np.array([1, 2, 3, 4, 5])
    y = np.array([1, 3, 2, 4, 6])
    print(f" 数组x: {x}")
    print(f" 数组y: {y}")
    # Element-wise comparisons produce boolean arrays.
    print(f" x == y: {x == y}")
    print(f" x > y: {x > y}")
    print(f" x >= 3: {x >= 3}")
    # Element-wise boolean combination uses & and |, not `and`/`or`.
    condition1 = x > 2
    condition2 = x < 5
    print(f"\n x > 2: {condition1}")
    print(f" x < 5: {condition2}")
    print(f" (x > 2) & (x < 5): {condition1 & condition2}")
    print(f" (x > 2) | (x < 5): {condition1 | condition2}")
    # Vectorized conditional select.
    result = np.where(x > 3, x, 0)
    print(f" 条件选择 (x>3则保留,否则为0): {result}")
# Run the numpy math demo
numpy_math_demo()
# 4.3 性能优化和实际应用
import numpy as np
import time
def numpy_performance_demo():
    """Demonstrate numpy performance techniques and practical applications.

    Sections: (1) raw speed comparison of built-in ``sum`` vs ``np.sum``,
    (2) vectorized vs looped arithmetic, (3) broadcasting, (4) memory
    optimization via dtype choice and in-place operations, (5) three mini
    case studies (image, signal, and sales-data processing).

    Prints all results to stdout; returns None.

    Fix: all timings now use ``time.perf_counter()`` instead of
    ``time.time()``.  ``time.time()`` has coarse resolution on some
    platforms and can return identical values around a fast operation,
    which both distorts the reported speed-ups and risks a
    ZeroDivisionError in the ratio calculations.
    """
    print("=== numpy性能优化和实际应用 ===")

    # 1. Performance comparison: summing one million numbers.
    print("\n1. 性能对比:")
    size = 1000000
    python_list = list(range(size))
    numpy_array = np.arange(size)

    # Python list sum.
    start_time = time.perf_counter()
    python_sum = sum(python_list)
    python_time = time.perf_counter() - start_time

    # NumPy array sum.
    start_time = time.perf_counter()
    numpy_sum = np.sum(numpy_array)
    numpy_time = time.perf_counter() - start_time

    print(f" 数组大小: {size:,}")
    print(f" Python列表求和: {python_time:.6f}秒")
    print(f" NumPy数组求和: {numpy_time:.6f}秒")
    print(f" 性能提升: {python_time/numpy_time:.1f}倍")

    # 2. Vectorized operations vs an explicit Python loop.
    print("\n2. 向量化操作:")

    def python_operation(arr):
        # Non-vectorized: element-by-element in pure Python.
        result = []
        for x in arr:
            result.append(x**2 + 2*x + 1)
        return result

    def numpy_operation(arr):
        # Vectorized: one expression over the whole array.
        return arr**2 + 2*arr + 1

    test_data = list(range(100000))
    numpy_data = np.array(test_data)

    start_time = time.perf_counter()
    python_result = python_operation(test_data)
    python_time = time.perf_counter() - start_time

    start_time = time.perf_counter()
    numpy_result = numpy_operation(numpy_data)
    numpy_time = time.perf_counter() - start_time

    print(f" Python循环: {python_time:.6f}秒")
    print(f" NumPy向量化: {numpy_time:.6f}秒")
    print(f" 性能提升: {python_time/numpy_time:.1f}倍")

    # 3. Broadcasting: arithmetic between differently shaped arrays.
    print("\n3. 广播机制:")
    a = np.array([[1, 2, 3],
                  [4, 5, 6]])
    b = np.array([10, 20, 30])
    c = np.array([[100],
                  [200]])
    print(f" 数组a (2x3):\n{a}")
    print(f" 数组b (3,): {b}")
    print(f" 数组c (2x1):\n{c}")
    print(f"\n a + b (广播):\n{a + b}")
    print(f" a + c (广播):\n{a + c}")
    print(f" a + b + c (广播):\n{a + b + c}")

    # 4. Memory optimization.
    print("\n4. 内存优化:")
    # Smaller integer dtypes store the same values in less memory
    # (values here fit easily in int8).
    large_array = np.random.randint(0, 100, 1000000)
    int64_size = large_array.astype(np.int64).nbytes
    int32_size = large_array.astype(np.int32).nbytes
    int16_size = large_array.astype(np.int16).nbytes
    int8_size = large_array.astype(np.int8).nbytes
    print(f" 数组大小: {len(large_array):,}")
    print(f" int64内存: {int64_size/1024/1024:.2f} MB")
    print(f" int32内存: {int32_size/1024/1024:.2f} MB")
    print(f" int16内存: {int16_size/1024/1024:.2f} MB")
    print(f" int8内存: {int8_size/1024/1024:.2f} MB")

    # In-place operations avoid allocating a new result array.
    arr = np.random.random(1000000)
    arr_copy = arr.copy()

    start_time = time.perf_counter()
    result1 = arr * 2 + 1
    time1 = time.perf_counter() - start_time

    start_time = time.perf_counter()
    arr_copy *= 2
    arr_copy += 1
    time2 = time.perf_counter() - start_time

    print(f"\n 创建新数组: {time1:.6f}秒")
    print(f" 就地操作: {time2:.6f}秒")
    print(f" 性能提升: {time1/time2:.1f}倍")

    # 5. Practical applications.
    print("\n5. 实际应用示例:")

    def image_processing_demo():
        # Simulated RGB image, shape (height, width, channels).
        height, width = 100, 100
        image = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
        print(f" 原图像形状: {image.shape}")
        print(f" 原图像数据类型: {image.dtype}")
        # Grayscale conversion: gray = 0.299*R + 0.587*G + 0.114*B.
        weights = np.array([0.299, 0.587, 0.114])
        gray_image = np.dot(image, weights).astype(np.uint8)
        print(f" 灰度图形状: {gray_image.shape}")
        print(f" 像素值范围: {gray_image.min()} - {gray_image.max()}")
        # Contrast enhancement, clipped back into the valid byte range.
        enhanced = np.clip(gray_image * 1.5, 0, 255).astype(np.uint8)
        print(f" 增强后范围: {enhanced.min()} - {enhanced.max()}")
        # Crude edge detection via first differences along each axis.
        edges_x = np.abs(np.diff(gray_image, axis=1))
        edges_y = np.abs(np.diff(gray_image, axis=0))
        print(f" 水平边缘形状: {edges_x.shape}")
        print(f" 垂直边缘形状: {edges_y.shape}")
        return gray_image, enhanced, edges_x, edges_y

    def signal_processing_demo():
        # Two-tone signal: 5 Hz plus half-amplitude 20 Hz.
        t = np.linspace(0, 1, 1000)
        frequency1, frequency2 = 5, 20
        signal = np.sin(2 * np.pi * frequency1 * t) + 0.5 * np.sin(2 * np.pi * frequency2 * t)
        # Additive Gaussian noise.
        noise = np.random.normal(0, 0.1, len(signal))
        noisy_signal = signal + noise
        print(f" 信号长度: {len(signal)}")
        print(f" 信号范围: {signal.min():.3f} - {signal.max():.3f}")
        print(f" 噪声信号范围: {noisy_signal.min():.3f} - {noisy_signal.max():.3f}")
        # Simple moving-average filter via convolution.
        window_size = 10
        filtered_signal = np.convolve(noisy_signal, np.ones(window_size)/window_size, mode='same')
        print(f" 滤波后范围: {filtered_signal.min():.3f} - {filtered_signal.max():.3f}")
        # Standard deviations before/after filtering.
        print(f" 原信号标准差: {np.std(signal):.3f}")
        print(f" 噪声信号标准差: {np.std(noisy_signal):.3f}")
        print(f" 滤波信号标准差: {np.std(filtered_signal):.3f}")
        return signal, noisy_signal, filtered_signal

    def data_analysis_demo():
        # Synthetic daily sales: base level + trend + seasonality + noise.
        np.random.seed(42)
        days = 365
        base_sales = 1000
        trend = np.linspace(0, 200, days)  # growth trend
        seasonal = 100 * np.sin(2 * np.pi * np.arange(days) / 365.25 * 4)  # seasonality
        noise = np.random.normal(0, 50, days)  # random noise
        sales = base_sales + trend + seasonal + noise
        sales = np.maximum(sales, 0)  # clamp to non-negative
        print(f" 销售数据天数: {len(sales)}")
        print(f" 平均日销售额: ¥{np.mean(sales):.2f}")
        print(f" 销售额标准差: ¥{np.std(sales):.2f}")
        print(f" 最高日销售额: ¥{np.max(sales):.2f}")
        print(f" 最低日销售额: ¥{np.min(sales):.2f}")
        # Moving averages at several window widths.
        window_sizes = [7, 30, 90]
        for window in window_sizes:
            ma = np.convolve(sales, np.ones(window)/window, mode='valid')
            print(f" {window}日移动平均: ¥{ma[-1]:.2f}")
        # Simplified year-over-year growth (first vs last day).
        if len(sales) >= 365:
            yoy_growth = (sales[-1] - sales[-365]) / sales[-365] * 100
            print(f" 年同比增长: {yoy_growth:.1f}%")
        return sales

    print("\n 图像处理演示:")
    image_processing_demo()
    print("\n 信号处理演示:")
    signal_processing_demo()
    print("\n 数据分析演示:")
    data_analysis_demo()


# Run the numpy performance demo.
numpy_performance_demo()
# 五、matplotlib模块 - 数据可视化
# 5.1 基础绘图
# 首先需要安装: pip install matplotlib
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime, timedelta
def matplotlib_basic_demo():
    """Demonstrate basic matplotlib plotting.

    Builds a 2x2 grid of charts (line, scatter, bar, pie), then a
    simulated stock-price time series with 20/50-day moving averages.
    Each figure is saved as a PNG and shown.  Returns None.

    NOTE(review): uses pyplot's implicit current-figure/current-axes
    state, so the order of plt.* calls is significant.
    """
    print("=== matplotlib基础绘图 ===")
    # Enable CJK-capable fonts and keep the minus sign renderable.
    plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
    plt.rcParams['axes.unicode_minus'] = False
    # 1. Line plots.
    print("\n1. 线图演示")
    # Sample curves (y3 is computed but not plotted below).
    x = np.linspace(0, 10, 100)
    y1 = np.sin(x)
    y2 = np.cos(x)
    y3 = np.sin(x) * np.exp(-x/5)
    # One figure holding four subplots.
    plt.figure(figsize=(12, 8))
    # Subplot 1: trigonometric functions.
    plt.subplot(2, 2, 1)
    plt.plot(x, y1, label='sin(x)', linewidth=2, color='blue')
    plt.plot(x, y2, label='cos(x)', linewidth=2, color='red', linestyle='--')
    plt.title('三角函数')
    plt.xlabel('x')
    plt.ylabel('y')
    plt.legend()
    plt.grid(True, alpha=0.3)
    # Subplot 2: scatter plot colored by the y value.
    plt.subplot(2, 2, 2)
    np.random.seed(42)
    x_scatter = np.random.randn(50)
    y_scatter = 2 * x_scatter + np.random.randn(50)
    plt.scatter(x_scatter, y_scatter, alpha=0.6, c=y_scatter, cmap='viridis')
    plt.title('散点图')
    plt.xlabel('X值')
    plt.ylabel('Y值')
    plt.colorbar(label='Y值')
    # Subplot 3: bar chart.
    plt.subplot(2, 2, 3)
    categories = ['A', 'B', 'C', 'D', 'E']
    values = [23, 45, 56, 78, 32]
    colors = ['red', 'green', 'blue', 'orange', 'purple']
    bars = plt.bar(categories, values, color=colors, alpha=0.7)
    plt.title('柱状图')
    plt.xlabel('类别')
    plt.ylabel('数值')
    # Write each bar's value just above its top edge.
    for bar, value in zip(bars, values):
        plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 1,
                 str(value), ha='center', va='bottom')
    # Subplot 4: pie chart.
    plt.subplot(2, 2, 4)
    sizes = [30, 25, 20, 15, 10]
    labels = ['产品A', '产品B', '产品C', '产品D', '产品E']
    explode = (0.1, 0, 0, 0, 0)  # pull the first wedge out for emphasis
    plt.pie(sizes, labels=labels, explode=explode, autopct='%1.1f%%',
            shadow=True, startangle=90)
    plt.title('饼图')
    plt.tight_layout()
    plt.savefig('basic_plots.png', dpi=300, bbox_inches='tight')
    plt.show()
    print(" 基础图形已保存为 'basic_plots.png'")
    # 2. Advanced line plot: simulated stock prices over one year.
    print("\n2. 高级线图演示")
    plt.figure(figsize=(12, 6))
    # Daily dates for 2023.
    dates = [datetime(2023, 1, 1) + timedelta(days=i) for i in range(365)]
    np.random.seed(42)
    # Random-walk price series, floored at 10.
    price = 100
    prices = [price]
    for _ in range(364):
        change = np.random.normal(0, 2)
        price = max(price + change, 10)  # keep the price positive
        prices.append(price)
    # 20- and 50-day moving averages; NaN until enough history exists.
    ma_20 = []
    ma_50 = []
    for i in range(len(prices)):
        if i >= 19:
            ma_20.append(np.mean(prices[i-19:i+1]))
        else:
            ma_20.append(np.nan)
        if i >= 49:
            ma_50.append(np.mean(prices[i-49:i+1]))
        else:
            ma_50.append(np.nan)
    # Price plus both moving averages on one axes.
    plt.plot(dates, prices, label='股价', linewidth=1, alpha=0.7)
    plt.plot(dates, ma_20, label='20日均线', linewidth=2, color='orange')
    plt.plot(dates, ma_50, label='50日均线', linewidth=2, color='red')
    plt.title('股价走势图', fontsize=16)
    plt.xlabel('日期', fontsize=12)
    plt.ylabel('价格 (元)', fontsize=12)
    plt.legend()
    plt.grid(True, alpha=0.3)
    # Rotate the date tick labels for readability.
    plt.xticks(rotation=45)
    # Annotate the highest price with an arrow.
    max_price_idx = np.argmax(prices)
    max_price = prices[max_price_idx]
    max_date = dates[max_price_idx]
    plt.annotate(f'最高点\n{max_price:.2f}元',
                 xy=(max_date, max_price),
                 xytext=(max_date, max_price + 10),
                 arrowprops=dict(arrowstyle='->', color='red'),
                 fontsize=10, ha='center')
    plt.tight_layout()
    plt.savefig('stock_chart.png', dpi=300, bbox_inches='tight')
    plt.show()
    print(" 股价图已保存为 'stock_chart.png'")


# Run the basic matplotlib demo.
matplotlib_basic_demo()
# 5.2 高级可视化
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.patches import Rectangle
from matplotlib.collections import PatchCollection
def matplotlib_advanced_demo():
    """Demonstrate advanced matplotlib visualizations.

    Produces a correlation heatmap, box/violin plots, a dual-y-axis
    chart, three 3D plots, and a grid of static "animation frames".
    Each figure is saved as a PNG and shown.  Returns None.

    NOTE(review): relies on pyplot's implicit figure state; call order
    matters.
    """
    print("=== matplotlib高级可视化 ===")
    # Use the seaborn-compatible style and CJK-capable fonts.
    plt.style.use('seaborn-v0_8')
    plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS', 'DejaVu Sans']
    plt.rcParams['axes.unicode_minus'] = False
    # 1. Heatmap of a (random) correlation matrix.
    print("\n1. 热力图演示")
    np.random.seed(42)
    variables = ['销售额', '广告费', '客户数', '产品数', '员工数']
    n_vars = len(variables)
    # Random matrix, symmetrized, with a unit diagonal — shaped like a
    # correlation matrix (values are random, not real correlations).
    correlation_matrix = np.random.rand(n_vars, n_vars)
    correlation_matrix = (correlation_matrix + correlation_matrix.T) / 2  # symmetrize
    np.fill_diagonal(correlation_matrix, 1)  # diagonal = 1
    plt.figure(figsize=(10, 8))
    # Render the matrix as an image.
    im = plt.imshow(correlation_matrix, cmap='coolwarm', aspect='auto', vmin=-1, vmax=1)
    # Label both axes with the variable names.
    plt.xticks(range(n_vars), variables, rotation=45)
    plt.yticks(range(n_vars), variables)
    # Print each cell's value; white text on strongly colored cells.
    for i in range(n_vars):
        for j in range(n_vars):
            plt.text(j, i, f'{correlation_matrix[i, j]:.2f}',
                     ha='center', va='center',
                     color='white' if abs(correlation_matrix[i, j]) > 0.5 else 'black')
    plt.colorbar(im, label='相关系数')
    plt.title('变量相关性热力图', fontsize=16, pad=20)
    plt.tight_layout()
    plt.savefig('heatmap.png', dpi=300, bbox_inches='tight')
    plt.show()
    # 2. Box plots and violin plots of four normal samples.
    print("\n2. 箱线图演示")
    np.random.seed(42)
    group_data = {
        '组A': np.random.normal(100, 15, 100),
        '组B': np.random.normal(110, 20, 100),
        '组C': np.random.normal(95, 10, 100),
        '组D': np.random.normal(105, 25, 100)
    }
    plt.figure(figsize=(12, 6))
    # Left: classic box plot.
    plt.subplot(1, 2, 1)
    data_values = list(group_data.values())
    labels = list(group_data.keys())
    box_plot = plt.boxplot(data_values, labels=labels, patch_artist=True)
    # Color each box individually.
    colors = ['lightblue', 'lightgreen', 'lightcoral', 'lightyellow']
    for patch, color in zip(box_plot['boxes'], colors):
        patch.set_facecolor(color)
        patch.set_alpha(0.7)
    plt.title('箱线图')
    plt.ylabel('数值')
    plt.grid(True, alpha=0.3)
    # Right: violin plot of the same data.
    plt.subplot(1, 2, 2)
    violin_plot = plt.violinplot(data_values, positions=range(1, len(labels)+1))
    # Match the violin colors to the boxes.
    for pc, color in zip(violin_plot['bodies'], colors):
        pc.set_facecolor(color)
        pc.set_alpha(0.7)
    plt.xticks(range(1, len(labels)+1), labels)
    plt.title('小提琴图')
    plt.ylabel('数值')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig('box_violin_plots.png', dpi=300, bbox_inches='tight')
    plt.show()
    # 3. Dual y-axis chart: sales and profit rate per month.
    print("\n3. 多轴图演示")
    months = ['1月', '2月', '3月', '4月', '5月', '6月',
              '7月', '8月', '9月', '10月', '11月', '12月']
    sales = [120, 135, 158, 142, 168, 195, 210, 198, 175, 162, 148, 155]
    profit_rate = [8.5, 9.2, 10.1, 8.8, 11.2, 12.5, 13.1, 12.8, 11.5, 10.8, 9.9, 10.3]
    fig, ax1 = plt.subplots(figsize=(12, 6))
    # Left y-axis: sales.
    color1 = 'tab:blue'
    ax1.set_xlabel('月份')
    ax1.set_ylabel('销售额 (万元)', color=color1)
    line1 = ax1.plot(months, sales, color=color1, marker='o', linewidth=2, label='销售额')
    ax1.tick_params(axis='y', labelcolor=color1)
    ax1.grid(True, alpha=0.3)
    # Right y-axis: profit rate, sharing the same x-axis.
    ax2 = ax1.twinx()
    color2 = 'tab:red'
    ax2.set_ylabel('利润率 (%)', color=color2)
    line2 = ax2.plot(months, profit_rate, color=color2, marker='s', linewidth=2, label='利润率')
    ax2.tick_params(axis='y', labelcolor=color2)
    # One combined legend for both axes.
    lines = line1 + line2
    labels = [l.get_label() for l in lines]
    ax1.legend(lines, labels, loc='upper left')
    plt.title('销售额与利润率趋势', fontsize=16)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig('dual_axis_plot.png', dpi=300, bbox_inches='tight')
    plt.show()
    # 4. 3D plots: scatter, surface, and wireframe.
    print("\n4. 3D图形演示")
    # Legacy import that registers the '3d' projection (kept for
    # compatibility with older matplotlib versions).
    from mpl_toolkits.mplot3d import Axes3D
    fig = plt.figure(figsize=(15, 5))
    # 3D scatter colored by z.
    ax1 = fig.add_subplot(131, projection='3d')
    np.random.seed(42)
    n_points = 100
    x = np.random.randn(n_points)
    y = np.random.randn(n_points)
    z = x**2 + y**2 + np.random.randn(n_points) * 0.1
    colors = z
    scatter = ax1.scatter(x, y, z, c=colors, cmap='viridis', alpha=0.6)
    ax1.set_xlabel('X轴')
    ax1.set_ylabel('Y轴')
    ax1.set_zlabel('Z轴')
    ax1.set_title('3D散点图')
    # 3D surface of sin(sqrt(x^2 + y^2)) over a grid.
    ax2 = fig.add_subplot(132, projection='3d')
    x_surf = np.linspace(-2, 2, 30)
    y_surf = np.linspace(-2, 2, 30)
    X, Y = np.meshgrid(x_surf, y_surf)
    Z = np.sin(np.sqrt(X**2 + Y**2))
    surface = ax2.plot_surface(X, Y, Z, cmap='coolwarm', alpha=0.8)
    ax2.set_xlabel('X轴')
    ax2.set_ylabel('Y轴')
    ax2.set_zlabel('Z轴')
    ax2.set_title('3D表面图')
    # Wireframe rendering of the same surface.
    ax3 = fig.add_subplot(133, projection='3d')
    wireframe = ax3.plot_wireframe(X, Y, Z, alpha=0.6)
    ax3.set_xlabel('X轴')
    ax3.set_ylabel('Y轴')
    ax3.set_zlabel('Z轴')
    ax3.set_title('3D线框图')
    plt.tight_layout()
    plt.savefig('3d_plots.png', dpi=300, bbox_inches='tight')
    plt.show()
    # 5. "Animation" shown as six static key frames of a shifting sine.
    print("\n5. 动画效果演示(静态帧)")
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    axes = axes.flatten()
    t_values = np.linspace(0, 2*np.pi, 6)
    x_base = np.linspace(0, 4*np.pi, 100)
    for i, t in enumerate(t_values):
        ax = axes[i]
        y = np.sin(x_base + t)  # phase-shifted sine for this frame
        ax.plot(x_base, y, 'b-', linewidth=2)
        ax.set_ylim(-1.5, 1.5)
        ax.set_title(f'帧 {i+1}: t={t:.2f}')
        ax.grid(True, alpha=0.3)
    plt.suptitle('正弦波动画效果(静态帧展示)', fontsize=16)
    plt.tight_layout()
    plt.savefig('animation_frames.png', dpi=300, bbox_inches='tight')
    plt.show()
    print(" 所有高级图形已保存")


# Run the advanced matplotlib demo.
matplotlib_advanced_demo()
# 六、scikit-learn模块 - 机器学习
# 6.1 基础机器学习
# 首先需要安装: pip install scikit-learn
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, mean_squared_error, classification_report
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
def sklearn_basic_demo():
    """Demonstrate basic machine learning with scikit-learn.

    Sections: linear regression on synthetic data, classification on the
    iris dataset (logistic regression vs random forest), K-means
    clustering, and model evaluation (cross-validation + grid search).
    Prints metrics to stdout; returns None.

    NOTE(review): X_train/X_test/y_train/y_test are deliberately
    re-bound between the regression and classification sections — the
    later sections use the iris split.
    """
    print("=== scikit-learn基础机器学习 ===")
    # 1. Linear regression on generated data.
    print("\n1. 线性回归演示:")
    # Synthetic 1-feature regression problem with noise.
    X, y = datasets.make_regression(n_samples=100, n_features=1, noise=10, random_state=42)
    # 80/20 train/test split.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    # Fit ordinary least squares.
    lr_model = LinearRegression()
    lr_model.fit(X_train, y_train)
    # Predict on the held-out set.
    y_pred = lr_model.predict(X_test)
    # Evaluate with mean squared error.
    mse = mean_squared_error(y_test, y_pred)
    print(f" 训练样本数: {len(X_train)}")
    print(f" 测试样本数: {len(X_test)}")
    print(f" 均方误差: {mse:.2f}")
    print(f" 模型系数: {lr_model.coef_[0]:.2f}")
    print(f" 模型截距: {lr_model.intercept_:.2f}")
    # 2. Classification on the iris dataset.
    print("\n2. 分类任务演示:")
    iris = datasets.load_iris()
    X_iris, y_iris = iris.data, iris.target
    # 70/30 split (re-binds the train/test names from section 1).
    X_train, X_test, y_train, y_test = train_test_split(
        X_iris, y_iris, test_size=0.3, random_state=42
    )
    # Standardize features: fit the scaler on train only, then apply
    # the same transform to test (avoids leakage).
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    # Logistic regression on the scaled features.
    log_reg = LogisticRegression(random_state=42)
    log_reg.fit(X_train_scaled, y_train)
    # Random forest on the raw features (tree models don't need scaling).
    rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
    rf_model.fit(X_train, y_train)
    # Predict and compare accuracies.
    log_pred = log_reg.predict(X_test_scaled)
    rf_pred = rf_model.predict(X_test)
    log_accuracy = accuracy_score(y_test, log_pred)
    rf_accuracy = accuracy_score(y_test, rf_pred)
    print(f" 数据集: 鸢尾花数据集")
    print(f" 特征数: {X_iris.shape[1]}")
    print(f" 类别数: {len(np.unique(y_iris))}")
    print(f" 逻辑回归准确率: {log_accuracy:.3f}")
    print(f" 随机森林准确率: {rf_accuracy:.3f}")
    # Feature importances from the fitted forest.
    feature_importance = rf_model.feature_importances_
    feature_names = iris.feature_names
    print(f"\n 特征重要性:")
    for name, importance in zip(feature_names, feature_importance):
        print(f" {name}: {importance:.3f}")
    # 3. Clustering with K-means.
    print("\n3. 聚类分析演示:")
    from sklearn.cluster import KMeans
    from sklearn.metrics import silhouette_score
    # Synthetic blobs with four true centers.
    X_cluster, _ = datasets.make_blobs(n_samples=300, centers=4, n_features=2,
                                       random_state=42, cluster_std=1.5)
    # Fit K-means with k=4.
    kmeans = KMeans(n_clusters=4, random_state=42, n_init=10)
    cluster_labels = kmeans.fit_predict(X_cluster)
    # Silhouette score: higher means better-separated clusters.
    silhouette_avg = silhouette_score(X_cluster, cluster_labels)
    print(f" 样本数: {len(X_cluster)}")
    print(f" 聚类数: 4")
    print(f" 轮廓系数: {silhouette_avg:.3f}")
    print(f" 聚类中心:")
    for i, center in enumerate(kmeans.cluster_centers_):
        print(f" 簇{i+1}: ({center[0]:.2f}, {center[1]:.2f})")
    # 4. Model evaluation: cross-validation and grid search (uses the
    # scaled iris split from section 2).
    print("\n4. 模型评估和交叉验证:")
    from sklearn.model_selection import cross_val_score, GridSearchCV
    from sklearn.svm import SVC
    # 5-fold cross-validation of an SVM.
    svm_model = SVC(random_state=42)
    cv_scores = cross_val_score(svm_model, X_train_scaled, y_train, cv=5)
    print(f" 5折交叉验证结果:")
    print(f" 各折得分: {cv_scores}")
    print(f" 平均得分: {cv_scores.mean():.3f} (+/- {cv_scores.std() * 2:.3f})")
    # Grid search over C and gamma.
    param_grid = {
        'C': [0.1, 1, 10],
        'gamma': ['scale', 'auto', 0.1, 1]
    }
    grid_search = GridSearchCV(SVC(random_state=42), param_grid, cv=3, scoring='accuracy')
    grid_search.fit(X_train_scaled, y_train)
    print(f"\n 网格搜索最佳参数: {grid_search.best_params_}")
    print(f" 最佳交叉验证得分: {grid_search.best_score_:.3f}")
    # Score the best estimator on the held-out test set.
    best_model = grid_search.best_estimator_
    test_score = best_model.score(X_test_scaled, y_test)
    print(f" 测试集得分: {test_score:.3f}")


# Run the scikit-learn basics demo.
sklearn_basic_demo()
# 七、BeautifulSoup模块 - 网页解析
# 7.1 HTML解析基础
# 首先需要安装: pip install beautifulsoup4 lxml
from bs4 import BeautifulSoup
import requests
from urllib.parse import urljoin, urlparse
import re
import time
def beautifulsoup_demo():
    """Demonstrate HTML parsing with BeautifulSoup.

    Parses an in-memory sample page and walks through element lookup
    (find/find_all, by class, by id), CSS selectors, text extraction,
    and attribute read/write.  Prints results; returns None.
    """
    print("=== BeautifulSoup网页解析 ===")
    # 1. Basic HTML parsing.
    print("\n1. 基础HTML解析:")
    # Self-contained sample document used by all sections below.
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>示例网页</title>
        <meta charset="UTF-8">
    </head>
    <body>
        <div class="header">
            <h1 id="main-title">欢迎来到我的网站</h1>
            <nav>
                <ul>
                    <li><a href="/home">首页</a></li>
                    <li><a href="/about">关于</a></li>
                    <li><a href="/contact">联系</a></li>
                </ul>
            </nav>
        </div>
        <div class="content">
            <article class="post" data-id="1">
                <h2>第一篇文章</h2>
                <p class="meta">发布时间: 2023-01-01</p>
                <p>这是第一篇文章的内容...</p>
                <div class="tags">
                    <span class="tag">Python</span>
                    <span class="tag">编程</span>
                </div>
            </article>
            <article class="post" data-id="2">
                <h2>第二篇文章</h2>
                <p class="meta">发布时间: 2023-01-02</p>
                <p>这是第二篇文章的内容...</p>
                <div class="tags">
                    <span class="tag">Web开发</span>
                    <span class="tag">HTML</span>
                </div>
            </article>
        </div>
        <footer>
            <p>© 2023 我的网站. 保留所有权利.</p>
        </footer>
    </body>
    </html>
    """
    # Parse with the stdlib html.parser backend.
    soup = BeautifulSoup(html_content, 'html.parser')
    # Basic document info.
    print(f" 网页标题: {soup.title.string}")
    print(f" 主标题: {soup.find('h1').string}")
    # 2. Element lookup methods.
    print("\n2. 元素查找方法:")
    # By tag name.
    all_links = soup.find_all('a')
    print(f" 所有链接数量: {len(all_links)}")
    for link in all_links:
        print(f" 链接文本: '{link.string}', 地址: '{link.get('href')}'")
    # By class name.
    posts = soup.find_all('article', class_='post')
    print(f"\n 文章数量: {len(posts)}")
    for i, post in enumerate(posts, 1):
        title = post.find('h2').string
        meta = post.find('p', class_='meta').string
        data_id = post.get('data-id')
        print(f" 文章{i}: {title} (ID: {data_id})")
        print(f" {meta}")
    # By element id.
    main_title = soup.find('h1', id='main-title')
    print(f"\n 主标题元素: {main_title.string}")
    # 3. CSS selectors via select().
    print("\n3. CSS选择器:")
    # All elements with class "tag".
    tags = soup.select('.tag')
    print(f" 所有标签: {[tag.string for tag in tags]}")
    # Tags scoped to the first article (attribute selector).
    first_post_tags = soup.select('article[data-id="1"] .tag')
    print(f" 第一篇文章标签: {[tag.string for tag in first_post_tags]}")
    # Descendant selector for the nav links.
    nav_links = soup.select('nav ul li a')
    print(f" 导航链接: {[link.string for link in nav_links]}")
    # 4. Text extraction and processing.
    print("\n4. 文本提取和处理:")
    # Plain text of the whole content area.
    content_div = soup.find('div', class_='content')
    content_text = content_div.get_text(strip=True)
    print(f" 内容区域文本长度: {len(content_text)}字符")
    # Per-post title and body excerpt.
    for post in posts:
        title = post.find('h2').get_text(strip=True)
        content = post.find_all('p')[-1].get_text(strip=True)  # last <p> is the body
        print(f" {title}: {content[:20]}...")
    # 5. Attribute access and mutation.
    print("\n5. 属性操作:")
    # Read an attribute.
    first_link = soup.find('a')
    print(f" 第一个链接原始href: {first_link.get('href')}")
    # Mutate attributes in place (dict-style assignment).
    first_link['href'] = 'https://example.com/home'
    first_link['target'] = '_blank'
    print(f" 修改后的链接: {first_link}")
    # Append a class to every article.
    for post in soup.find_all('article'):
        post['class'] = post.get('class', []) + ['processed']
    print(f" 第一篇文章的class: {soup.find('article').get('class')}")


# Run the BeautifulSoup demo.
beautifulsoup_demo()
# 7.2 实际网页爬取
import requests
from bs4 import BeautifulSoup
import time
import csv
from urllib.parse import urljoin, urlparse
import os
def web_scraping_demo():
    """Demonstrate practical web-scraping techniques.

    Sections: a safe HTTP request helper, parsing a sample news page,
    text-cleaning helpers, CSV/JSON persistence, a small scraper class
    with retries/backoff, and logging-based error handling.  Writes
    news_data.csv, news_data.json and scraper.log; returns None.

    Fixes in this revision:
    - ``clean_text`` now actually decodes HTML entities: the original
      ``.replace(' ', ' ').replace('&', '&')`` calls were no-ops; they
      are restored to ``'&nbsp;' -> ' '`` and ``'&amp;' -> '&'``.
    - ``save_to_csv``/``save_to_json`` now interpolate the target
      filename in their confirmation message (the f-string placeholder
      had been lost).
    """
    print("=== 实际网页爬取演示 ===")
    # 1. Basic request and parse.
    print("\n1. 基础网页请求和解析:")

    def safe_request(url, headers=None, timeout=10):
        """GET a URL with a browser-like User-Agent; return the response
        or None on any request error."""
        default_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        if headers:
            default_headers.update(headers)
        try:
            response = requests.get(url, headers=default_headers, timeout=timeout)
            response.raise_for_status()
            return response
        except requests.RequestException as e:
            print(f" 请求失败: {e}")
            return None

    def parse_example_page():
        """Parse a canned news page and return a list of article dicts
        (title/link/summary/date/author).  Uses static HTML so the demo
        needs no network access."""
        sample_html = """
        <html>
        <head><title>新闻网站</title></head>
        <body>
            <div class="news-list">
                <article class="news-item">
                    <h3><a href="/news/1">Python 3.12 发布新特性</a></h3>
                    <p class="summary">Python 3.12 带来了许多新特性和改进...</p>
                    <span class="date">2023-10-01</span>
                    <span class="author">张三</span>
                </article>
                <article class="news-item">
                    <h3><a href="/news/2">机器学习最新进展</a></h3>
                    <p class="summary">最新的机器学习算法在各个领域都有突破...</p>
                    <span class="date">2023-10-02</span>
                    <span class="author">李四</span>
                </article>
                <article class="news-item">
                    <h3><a href="/news/3">Web开发趋势分析</a></h3>
                    <p class="summary">2023年Web开发的主要趋势和技术栈...</p>
                    <span class="date">2023-10-03</span>
                    <span class="author">王五</span>
                </article>
            </div>
        </body>
        </html>
        """
        soup = BeautifulSoup(sample_html, 'html.parser')
        # Collect one dict per news item.
        news_items = soup.find_all('article', class_='news-item')
        news_data = []
        for item in news_items:
            title_link = item.find('h3').find('a')
            title = title_link.get_text(strip=True)
            link = title_link.get('href')
            summary = item.find('p', class_='summary').get_text(strip=True)
            date = item.find('span', class_='date').get_text(strip=True)
            author = item.find('span', class_='author').get_text(strip=True)
            news_data.append({
                'title': title,
                'link': link,
                'summary': summary,
                'date': date,
                'author': author
            })
        return news_data

    # Parse the sample page and report what was found.
    news_list = parse_example_page()
    print(f" 提取到 {len(news_list)} 条新闻:")
    for i, news in enumerate(news_list, 1):
        print(f" {i}. {news['title']}")
        print(f" 作者: {news['author']}, 日期: {news['date']}")
        print(f" 摘要: {news['summary'][:30]}...")
        print()

    # 2. Data cleaning helpers.
    print("\n2. 数据清洗和处理:")

    def clean_text(text):
        """Normalize scraped text: collapse whitespace, drop control
        characters, and decode common HTML entities."""
        if not text:
            return ""
        # Collapse runs of whitespace into single spaces.
        text = re.sub(r'\s+', ' ', text.strip())
        # Remove stray CR/LF/TAB characters.
        text = re.sub(r'[\r\n\t]', '', text)
        # Decode the most common HTML entities.
        # (Bug fix: the original calls replaced characters with themselves.)
        text = text.replace('&nbsp;', ' ').replace('&amp;', '&')
        return text

    def extract_numbers(text):
        """Return every integer/decimal in *text* as a float."""
        numbers = re.findall(r'\d+(?:\.\d+)?', text)
        return [float(num) for num in numbers]

    def extract_dates(text):
        """Return date-like substrings (ISO, US, and Chinese formats)."""
        date_patterns = [
            r'\d{4}-\d{2}-\d{2}',   # YYYY-MM-DD
            r'\d{2}/\d{2}/\d{4}',   # MM/DD/YYYY
            r'\d{1,2}月\d{1,2}日'   # Chinese date
        ]
        dates = []
        for pattern in date_patterns:
            dates.extend(re.findall(pattern, text))
        return dates

    # Exercise the cleaning helpers on a messy sample.
    sample_text = "  这是一个\n\t包含多余空白的文本  2023-10-01  价格:99.99元  "
    cleaned = clean_text(sample_text)
    numbers = extract_numbers(sample_text)
    dates = extract_dates(sample_text)
    print(f" 原始文本: '{sample_text}'")
    print(f" 清洗后: '{cleaned}'")
    print(f" 提取的数字: {numbers}")
    print(f" 提取的日期: {dates}")

    # 3. Persisting the scraped data.
    print("\n3. 数据存储:")

    def save_to_csv(data, filename):
        """Write a list of dicts to *filename* as UTF-8 CSV (no-op on
        empty data)."""
        if not data:
            return
        fieldnames = data[0].keys()
        with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)
        # Bug fix: report the actual filename instead of a dead literal.
        print(f" 数据已保存到 {filename}")

    def save_to_json(data, filename):
        """Write *data* to *filename* as pretty-printed UTF-8 JSON."""
        import json
        with open(filename, 'w', encoding='utf-8') as jsonfile:
            json.dump(data, jsonfile, ensure_ascii=False, indent=2)
        # Bug fix: report the actual filename instead of a dead literal.
        print(f" 数据已保存到 {filename}")

    # Save the parsed news in both formats.
    save_to_csv(news_list, 'news_data.csv')
    save_to_json(news_list, 'news_data.json')

    # 4. Scraper best practices wrapped in a small class.
    print("\n4. 爬虫最佳实践:")

    class WebScraper:
        """Polite scraper: shared session, per-request delay, and
        retries with exponential backoff."""

        def __init__(self, delay=1, max_retries=3):
            self.delay = delay              # seconds to sleep after each request
            self.max_retries = max_retries  # attempts before giving up
            self.session = requests.Session()
            self.session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            })

        def get_page(self, url):
            """Fetch *url*, retrying with exponential backoff; return the
            response or None after exhausting retries."""
            for attempt in range(self.max_retries):
                try:
                    response = self.session.get(url, timeout=10)
                    response.raise_for_status()
                    # Throttle so we don't hammer the server.
                    time.sleep(self.delay)
                    return response
                except requests.RequestException as e:
                    print(f" 尝试 {attempt + 1} 失败: {e}")
                    if attempt < self.max_retries - 1:
                        time.sleep(2 ** attempt)  # exponential backoff
            return None

        def parse_page(self, html_content, selectors):
            """Apply a {name: css_selector} mapping to *html_content*;
            single matches become strings, multiple become lists, no
            match becomes None."""
            soup = BeautifulSoup(html_content, 'html.parser')
            results = {}
            for key, selector in selectors.items():
                elements = soup.select(selector)
                if elements:
                    if len(elements) == 1:
                        results[key] = elements[0].get_text(strip=True)
                    else:
                        results[key] = [elem.get_text(strip=True) for elem in elements]
                else:
                    results[key] = None
            return results

        def scrape_multiple_pages(self, urls, selectors):
            """Fetch and parse each URL, skipping failures; each result
            dict is tagged with its source 'url'."""
            results = []
            for i, url in enumerate(urls, 1):
                print(f" 正在爬取第 {i}/{len(urls)} 个页面...")
                response = self.get_page(url)
                if response:
                    data = self.parse_page(response.text, selectors)
                    data['url'] = url
                    results.append(data)
                else:
                    print(f" 跳过页面: {url}")
            return results

    # Show the scraper's configuration (no live requests in this demo).
    scraper = WebScraper(delay=0.5)
    selectors = {
        'title': 'h1, h2, h3',
        'content': 'p',
        'links': 'a'
    }
    print(f" 爬虫配置:")
    print(f" 延迟: {scraper.delay}秒")
    print(f" 最大重试: {scraper.max_retries}次")
    print(f" 选择器: {list(selectors.keys())}")

    # 5. Error handling with logging.
    print("\n5. 错误处理和日志:")
    import logging
    # Log to both scraper.log and the console.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('scraper.log', encoding='utf-8'),
            logging.StreamHandler()
        ]
    )
    logger = logging.getLogger(__name__)

    def robust_scrape(url, max_attempts=3):
        """Simulate a scrape that fails twice (connection error, then
        timeout) before succeeding, with logged backoff between tries."""
        for attempt in range(max_attempts):
            try:
                logger.info(f"尝试爬取: {url} (第{attempt+1}次)")
                # Deliberately raise on the first two attempts.
                if attempt == 0:
                    raise requests.ConnectionError("模拟连接错误")
                elif attempt == 1:
                    raise requests.Timeout("模拟超时错误")
                else:
                    logger.info("爬取成功")
                    return "成功获取页面内容"
            except requests.RequestException as e:
                logger.warning(f"爬取失败: {e}")
                if attempt < max_attempts - 1:
                    wait_time = 2 ** attempt
                    logger.info(f"等待 {wait_time} 秒后重试...")
                    time.sleep(wait_time)
                else:
                    logger.error(f"所有尝试都失败了: {url}")
                    return None

    # Exercise the robust scraper against a dummy URL.
    result = robust_scrape("https://example.com")
    print(f" 爬取结果: {result}")
    print(f"\n 爬虫演示完成,相关文件已生成")


# Run the web scraping demo.
web_scraping_demo()
# 八、模块选择和最佳实践
# 8.1 模块选择指南
def module_selection_guide():
    """Print a guide for choosing third-party Python modules.

    Walks through three sections — modules grouped by application
    scenario, star-rating comparisons of popular libraries, and
    per-audience recommendations — writing everything to stdout.
    """
    print("=== 第三方模块选择指南 ===")

    # Section 1: recommended modules, grouped by application scenario.
    print("\n1. 按应用场景选择模块:")
    scenario_catalog = {
        "Web开发": {
            "框架": ["Django", "Flask", "FastAPI"],
            "HTTP客户端": ["requests", "httpx", "aiohttp"],
            "模板引擎": ["Jinja2", "Django Templates"],
            "数据库ORM": ["SQLAlchemy", "Django ORM", "Peewee"]
        },
        "数据科学": {
            "数据处理": ["pandas", "numpy", "polars"],
            "可视化": ["matplotlib", "seaborn", "plotly", "bokeh"],
            "机器学习": ["scikit-learn", "tensorflow", "pytorch"],
            "统计分析": ["scipy", "statsmodels"]
        },
        "网络爬虫": {
            "HTML解析": ["BeautifulSoup", "lxml", "html.parser"],
            "浏览器自动化": ["selenium", "playwright", "pyppeteer"],
            "异步爬虫": ["scrapy", "aiohttp", "asyncio"]
        },
        "图像处理": {
            "基础处理": ["Pillow", "opencv-python"],
            "深度学习": ["tensorflow", "pytorch", "keras"],
            "计算机视觉": ["opencv-python", "scikit-image"]
        },
        "自动化运维": {
            "系统管理": ["psutil", "paramiko", "fabric"],
            "配置管理": ["ansible", "saltstack"],
            "监控": ["prometheus_client", "psutil"]
        }
    }
    for scene_name, groups in scenario_catalog.items():
        print(f"\n {scene_name}:")
        for group_name, module_names in groups.items():
            joined_modules = ', '.join(module_names)
            print(f" {group_name}: {joined_modules}")

    # Section 2: star-rating comparison of popular libraries.
    print("\n2. 常见模块性能对比:")
    rating_tables = {
        "HTTP请求库": {
            "requests": {"易用性": "★★★★★", "性能": "★★★☆☆", "功能": "★★★★☆"},
            "httpx": {"易用性": "★★★★☆", "性能": "★★★★☆", "功能": "★★★★★"},
            "aiohttp": {"易用性": "★★★☆☆", "性能": "★★★★★", "功能": "★★★★☆"}
        },
        "数据处理库": {
            "pandas": {"易用性": "★★★★★", "性能": "★★★☆☆", "内存效率": "★★☆☆☆"},
            "polars": {"易用性": "★★★☆☆", "性能": "★★★★★", "内存效率": "★★★★★"},
            "numpy": {"易用性": "★★★☆☆", "性能": "★★★★★", "内存效率": "★★★★☆"}
        },
        "Web框架": {
            "Django": {"学习曲线": "★★☆☆☆", "功能完整性": "★★★★★", "性能": "★★★☆☆"},
            "Flask": {"学习曲线": "★★★★☆", "功能完整性": "★★★☆☆", "性能": "★★★★☆"},
            "FastAPI": {"学习曲线": "★★★★☆", "功能完整性": "★★★★☆", "性能": "★★★★★"}
        }
    }
    for table_name, table in rating_tables.items():
        print(f"\n {table_name}:")
        for library_name, ratings in table.items():
            print(f" {library_name}:")
            for criterion, stars in ratings.items():
                print(f" {criterion}: {stars}")

    # Section 3: recommendations tailored to different audiences.
    print("\n3. 选择建议:")
    audience_advice = {
        "初学者": {
            "推荐模块": ["requests", "pandas", "matplotlib", "BeautifulSoup"],
            "原因": "文档完善,社区活跃,学习资源丰富",
            "避免": ["复杂的异步库", "底层系统库"]
        },
        "数据分析师": {
            "推荐模块": ["pandas", "numpy", "matplotlib", "seaborn", "scikit-learn"],
            "原因": "专为数据分析设计,功能强大",
            "避免": ["Web开发框架", "底层网络库"]
        },
        "Web开发者": {
            "推荐模块": ["Django/Flask", "requests", "SQLAlchemy", "Celery"],
            "原因": "Web开发生态完整,部署方便",
            "避免": ["科学计算库", "图像处理库"]
        },
        "性能优化者": {
            "推荐模块": ["numpy", "numba", "cython", "asyncio"],
            "原因": "高性能,支持并行和异步",
            "避免": ["纯Python实现的库", "功能过重的框架"]
        }
    }
    for audience, advice in audience_advice.items():
        print(f"\n {audience}:")
        recommended = ', '.join(advice['推荐模块'])
        avoided = ', '.join(advice['避免'])
        print(f" 推荐: {recommended}")
        print(f" 原因: {advice['原因']}")
        print(f" 避免: {avoided}")


# Run the module selection guide.
module_selection_guide()
# 8.2 最佳实践和总结
def _print_numbered(items):
    """Print *items* as a 1-based numbered list, one per line."""
    for index, item in enumerate(items, 1):
        print(f" {index}. {item}")


def _print_bulleted_groups(groups):
    """Print each group title, then its entries as bullet lines."""
    for category, tips in groups.items():
        print(f"\n {category}:")
        for tip in tips:
            print(f" • {tip}")


def _print_keyed_groups(groups, keys):
    """Print each group's title, then the given *keys* in order.

    List values are joined with ', '; scalar values are printed as-is.
    """
    for title, details in groups.items():
        print(f"\n {title}:")
        for key in keys:
            value = details[key]
            if isinstance(value, list):
                print(f" {key}: {', '.join(value)}")
            else:
                print(f" {key}: {value}")


def _print_practices(practices):
    """Print each practice with its importance rating in the heading.

    The '重要性' (importance) entry goes into the heading; every other
    entry is printed below it, list values joined with ', '.
    """
    for practice, details in practices.items():
        print(f"\n {practice} (重要性: {details['重要性']}):")
        for key, values in details.items():
            if key != "重要性":
                if isinstance(values, list):
                    print(f" {key}: {', '.join(values)}")
                else:
                    # Kept for completeness: scalar values print verbatim.
                    print(f" {key}: {values}")


def best_practices_summary():
    """Print a six-part summary of third-party-module best practices.

    Sections: install/management practices, code quality, performance,
    common pitfalls, a learning path, and a closing summary. Pure
    console output; returns None.
    """
    print("=== 第三方模块最佳实践和总结 ===")

    # 1. Installation and dependency-management practices.
    print("\n1. 安装和管理最佳实践:")
    practices = {
        "虚拟环境": {
            "重要性": "★★★★★",
            "工具": ["venv", "conda", "pipenv", "poetry"],
            "好处": ["隔离依赖", "避免冲突", "便于部署", "版本管理"]
        },
        "依赖管理": {
            "重要性": "★★★★☆",
            "文件": ["requirements.txt", "Pipfile", "pyproject.toml"],
            "好处": ["可重现环境", "团队协作", "自动化部署"]
        },
        "版本固定": {
            "重要性": "★★★★☆",
            "策略": ["精确版本", "兼容版本", "最小版本"],
            "示例": ["requests==2.28.1", "pandas>=1.5.0,<2.0.0"]
        }
    }
    _print_practices(practices)

    # 2. Code-quality tips as a numbered list.
    print("\n2. 代码质量最佳实践:")
    code_quality_tips = [
        "导入规范: 标准库 -> 第三方库 -> 本地模块",
        "异常处理: 捕获具体异常,提供有意义的错误信息",
        "文档字符串: 为复杂函数添加详细说明",
        "类型提示: 使用typing模块提高代码可读性",
        "单元测试: 为关键功能编写测试用例",
        "代码格式: 使用black、flake8等工具保持一致性"
    ]
    _print_numbered(code_quality_tips)

    # 3. Performance advice grouped by workload type.
    print("\n3. 性能优化建议:")
    performance_tips = {
        "数据处理": [
            "优先使用numpy和pandas的向量化操作",
            "避免在循环中重复创建对象",
            "使用适当的数据类型减少内存占用",
            "考虑使用numba加速数值计算"
        ],
        "网络请求": [
            "使用连接池复用连接",
            "设置合理的超时时间",
            "使用异步请求处理并发",
            "实现请求重试和错误处理"
        ],
        "文件操作": [
            "使用上下文管理器确保资源释放",
            "批量处理减少I/O操作",
            "选择合适的文件格式(CSV vs JSON vs Parquet)",
            "考虑使用内存映射处理大文件"
        ]
    }
    _print_bulleted_groups(performance_tips)

    # 4. Common pitfalls: problem ('问题') and fix ('解决') per entry.
    print("\n4. 常见陷阱和解决方案:")
    common_pitfalls = {
        "版本冲突": {
            "问题": "不同模块要求不兼容的依赖版本",
            "解决": "使用虚拟环境,检查依赖树,选择兼容版本"
        },
        "内存泄漏": {
            "问题": "大数据处理时内存不断增长",
            "解决": "及时释放变量,使用生成器,分批处理数据"
        },
        "编码问题": {
            "问题": "处理中文或特殊字符时出现乱码",
            "解决": "明确指定编码格式,使用UTF-8"
        },
        "网络超时": {
            "问题": "网络请求经常超时失败",
            "解决": "设置重试机制,使用指数退避,检查网络状况"
        },
        "路径问题": {
            "问题": "跨平台路径分隔符不一致",
            "解决": "使用pathlib或os.path处理路径"
        }
    }
    _print_keyed_groups(common_pitfalls, ["问题", "解决"])

    # 5. Suggested learning path, staged from beginner to professional.
    print("\n5. 学习建议:")
    learning_path = {
        "基础阶段": {
            "重点模块": ["requests", "json", "csv"],
            "学习目标": "掌握基本的数据获取和处理",
            "项目建议": "简单的API调用和数据保存"
        },
        "进阶阶段": {
            "重点模块": ["pandas", "matplotlib", "BeautifulSoup"],
            "学习目标": "数据分析和可视化能力",
            "项目建议": "网页数据爬取和分析报告"
        },
        "高级阶段": {
            "重点模块": ["numpy", "scikit-learn", "asyncio"],
            "学习目标": "高性能计算和机器学习",
            "项目建议": "完整的数据科学项目"
        },
        "专业阶段": {
            "重点模块": ["tensorflow", "django", "celery"],
            "学习目标": "专业领域深度应用",
            "项目建议": "生产级应用开发"
        }
    }
    _print_keyed_groups(learning_path, ["重点模块", "学习目标", "项目建议"])

    # 6. Closing summary points and congratulation message.
    print("\n6. 总结:")
    summary_points = [
        "第三方模块是Python生态系统的重要组成部分",
        "选择合适的模块比重复造轮子更高效",
        "虚拟环境和依赖管理是专业开发的基础",
        "性能优化要基于实际测量,避免过早优化",
        "持续学习新模块,跟上技术发展趋势",
        "实践项目是掌握模块使用的最佳方式"
    ]
    _print_numbered(summary_points)

    print("\n恭喜你完成了第16天的学习!")
    print("你已经掌握了Python常用第三方模块的使用方法。")
    print("建议继续通过实际项目来深化理解和应用这些知识。")
# Run the best-practices summary demo defined above
best_practices_summary()
# 学习总结
通过第16天的学习,我们深入了解了Python常用第三方模块的使用方法:
# 主要收获
- 包管理基础 - 掌握了pip和虚拟环境的使用
- 网络请求 - 学会使用requests进行HTTP通信
- 数据处理 - 熟练运用pandas进行数据分析
- 数值计算 - 理解numpy的强大数值计算能力
- 数据可视化 - 掌握matplotlib创建各种图表
- 机器学习 - 了解scikit-learn的基础应用
- 网页解析 - 学会使用BeautifulSoup处理HTML
- 最佳实践 - 掌握模块选择和代码质量标准
# 实践建议
- 动手实践 - 通过实际项目巩固所学知识
- 持续学习 - 关注新模块和技术发展
- 社区参与 - 积极参与开源项目和技术讨论
- 文档阅读 - 养成阅读官方文档的习惯
# 下一步学习
- 深入学习特定领域的专业模块
- 了解异步编程和高性能计算
- 学习Web开发框架如Django或Flask
- 探索深度学习框架如TensorFlow或PyTorch
继续保持学习的热情,Python的世界还有更多精彩等待你去探索!