第20天-访问数据库
哪吒 2023/6/15
# 第20天-访问数据库
# 1. 数据库基础
# 1.1 数据库概述
数据库是存储和管理数据的系统,Python提供了多种方式来访问不同类型的数据库。
def database_overview_demo():
"""数据库概述演示"""
print("=== 数据库概述演示 ===")
# 1. 数据库类型
print("\n1. 常见数据库类型:")
database_types = {
"关系型数据库": {
"SQLite": "轻量级文件数据库,无需服务器",
"MySQL": "开源关系型数据库管理系统",
"PostgreSQL": "功能强大的开源对象关系数据库",
"Oracle": "企业级商业数据库",
"SQL Server": "微软的关系型数据库"
},
"NoSQL数据库": {
"MongoDB": "文档型数据库",
"Redis": "内存键值数据库",
"Cassandra": "分布式列族数据库",
"Neo4j": "图形数据库"
}
}
for category, databases in database_types.items():
print(f"\n {category}:")
for name, description in databases.items():
print(f" • {name}: {description}")
# 2. Python数据库API规范
print("\n2. Python数据库API规范 (DB-API 2.0):")
api_components = {
"连接对象 (Connection)": "表示数据库连接",
"游标对象 (Cursor)": "执行SQL语句和获取结果",
"异常类型": "处理数据库相关错误",
"类型构造器": "处理特殊数据类型"
}
for component, description in api_components.items():
print(f" • {component}: {description}")
# 3. 常用Python数据库模块
print("\n3. 常用Python数据库模块:")
python_modules = {
"sqlite3": "Python内置SQLite模块",
"pymysql": "纯Python MySQL客户端",
"psycopg2": "PostgreSQL适配器",
"cx_Oracle": "Oracle数据库接口",
"pymongo": "MongoDB Python驱动",
"redis-py": "Redis Python客户端"
}
for module, description in python_modules.items():
print(f" • {module}: {description}")
# 4. 数据库操作基本流程
print("\n4. 数据库操作基本流程:")
basic_flow = [
"1. 导入数据库模块",
"2. 建立数据库连接",
"3. 创建游标对象",
"4. 执行SQL语句",
"5. 处理查询结果",
"6. 提交事务(如需要)",
"7. 关闭游标和连接"
]
for step in basic_flow:
print(f" {step}")
# 5. 基本SQL语句回顾
print("\n5. 基本SQL语句回顾:")
sql_examples = {
"创建表": "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)",
"插入数据": "INSERT INTO users (name, email) VALUES ('张三', 'zhang@example.com')",
"查询数据": "SELECT * FROM users WHERE name = '张三'",
"更新数据": "UPDATE users SET email = 'new@example.com' WHERE id = 1",
"删除数据": "DELETE FROM users WHERE id = 1"
}
for operation, sql in sql_examples.items():
print(f" {operation}:")
print(f" {sql}")
# 运行数据库概述演示
database_overview_demo()
# 2. SQLite数据库
# 2.1 SQLite基础操作
import sqlite3
import os
from datetime import datetime
def sqlite_basic_demo():
"""SQLite基础操作演示"""
print("=== SQLite基础操作演示 ===")
# 1. 连接数据库
print("\n1. 连接SQLite数据库:")
# 连接到数据库文件(如果不存在会自动创建)
db_path = 'example.db'
conn = sqlite3.connect(db_path)
print(f" ✓ 连接到数据库: {db_path}")
# 创建游标
cursor = conn.cursor()
print(" ✓ 创建游标对象")
# 2. 创建表
print("\n2. 创建数据表:")
# 用户表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL,
password TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT 1
)
''')
print(" ✓ 创建用户表")
# 文章表
cursor.execute('''
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
content TEXT,
author_id INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (author_id) REFERENCES users (id)
)
''')
print(" ✓ 创建文章表")
# 3. 插入数据
print("\n3. 插入数据:")
# 插入单条记录
try:
cursor.execute(
"INSERT INTO users (username, email, password) VALUES (?, ?, ?)",
('张三', 'zhangsan@example.com', 'password123')
)
print(" ✓ 插入用户: 张三")
except sqlite3.IntegrityError as e:
print(f" 用户已存在: {e}")
# 插入多条记录
users_data = [
('李四', 'lisi@example.com', 'password456'),
('王五', 'wangwu@example.com', 'password789'),
('赵六', 'zhaoliu@example.com', 'passwordabc')
]
try:
cursor.executemany(
"INSERT INTO users (username, email, password) VALUES (?, ?, ?)",
users_data
)
print(f" ✓ 批量插入 {len(users_data)} 个用户")
except sqlite3.IntegrityError as e:
print(f" 部分用户已存在: {e}")
# 提交事务
conn.commit()
print(" ✓ 提交事务")
# 4. 查询数据
print("\n4. 查询数据:")
# 查询所有用户
cursor.execute("SELECT * FROM users")
users = cursor.fetchall()
print(f" 查询到 {len(users)} 个用户:")
for user in users:
print(f" ID: {user[0]}, 用户名: {user[1]}, 邮箱: {user[2]}")
# 条件查询
cursor.execute("SELECT id, username FROM users WHERE username LIKE ?", ('%张%',))
filtered_users = cursor.fetchall()
print(f"\n 姓张的用户 ({len(filtered_users)} 个):")
for user in filtered_users:
print(f" ID: {user[0]}, 用户名: {user[1]}")
# 使用fetchone获取单条记录
cursor.execute("SELECT * FROM users WHERE username = ?", ('李四',))
user = cursor.fetchone()
if user:
print(f"\n 找到用户: {user[1]} ({user[2]})")
# 5. 更新数据
print("\n5. 更新数据:")
cursor.execute(
"UPDATE users SET email = ? WHERE username = ?",
('zhangsan_new@example.com', '张三')
)
affected_rows = cursor.rowcount
print(f" ✓ 更新了 {affected_rows} 条记录")
conn.commit()
# 6. 删除数据
print("\n6. 删除数据:")
cursor.execute("DELETE FROM users WHERE username = ?", ('赵六',))
deleted_rows = cursor.rowcount
print(f" ✓ 删除了 {deleted_rows} 条记录")
conn.commit()
# 7. 查看表结构
print("\n7. 查看表结构:")
cursor.execute("PRAGMA table_info(users)")
columns = cursor.fetchall()
print(" users表结构:")
for col in columns:
print(f" {col[1]} {col[2]} {'NOT NULL' if col[3] else 'NULL'}")
# 8. 关闭连接
cursor.close()
conn.close()
print("\n ✓ 关闭数据库连接")
# 清理测试文件
if os.path.exists(db_path):
os.remove(db_path)
print(f" ✓ 清理测试文件: {db_path}")
# 运行SQLite基础演示
sqlite_basic_demo()
# 2.2 SQLite高级功能
import sqlite3
import json
from contextlib import contextmanager
def sqlite_advanced_demo():
"""SQLite高级功能演示"""
print("=== SQLite高级功能演示 ===")
# 1. 连接管理和上下文管理器
print("\n1. 连接管理:")
@contextmanager
def get_db_connection(db_path):
"""数据库连接上下文管理器"""
conn = None
try:
conn = sqlite3.connect(db_path)
# 设置行工厂,使查询结果可以像字典一样访问
conn.row_factory = sqlite3.Row
yield conn
except Exception as e:
if conn:
conn.rollback()
raise e
finally:
if conn:
conn.close()
db_path = 'advanced_example.db'
# 使用上下文管理器
with get_db_connection(db_path) as conn:
cursor = conn.cursor()
# 创建测试表
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
price REAL NOT NULL,
category TEXT,
metadata TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
print(" ✓ 使用上下文管理器创建连接")
# 2. 事务处理
print("\n2. 事务处理:")
def transfer_money_demo():
"""转账事务演示"""
with get_db_connection(db_path) as conn:
cursor = conn.cursor()
# 创建账户表
cursor.execute('''
CREATE TABLE IF NOT EXISTS accounts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
balance REAL NOT NULL DEFAULT 0
)
''')
# 插入测试账户
cursor.execute("INSERT OR REPLACE INTO accounts (id, name, balance) VALUES (1, '账户A', 1000)")
cursor.execute("INSERT OR REPLACE INTO accounts (id, name, balance) VALUES (2, '账户B', 500)")
conn.commit()
try:
# 开始事务
cursor.execute("BEGIN TRANSACTION")
# 从账户A扣款
cursor.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (200, 1))
# 检查余额是否足够
cursor.execute("SELECT balance FROM accounts WHERE id = ?", (1,))
balance = cursor.fetchone()[0]
if balance < 0:
raise ValueError("余额不足")
# 向账户B转账
cursor.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (200, 2))
# 提交事务
conn.commit()
print(" ✓ 转账成功")
# 查看结果
cursor.execute("SELECT name, balance FROM accounts")
accounts = cursor.fetchall()
for account in accounts:
print(f" {account['name']}: {account['balance']} 元")
except Exception as e:
# 回滚事务
conn.rollback()
print(f" ✗ 转账失败,已回滚: {e}")
transfer_money_demo()
# 3. 索引和性能优化
print("\n3. 索引和性能优化:")
with get_db_connection(db_path) as conn:
cursor = conn.cursor()
# 创建索引
cursor.execute("CREATE INDEX IF NOT EXISTS idx_products_category ON products(category)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_products_price ON products(price)")
print(" ✓ 创建索引")
# 插入测试数据
products_data = [
('笔记本电脑', 5999.99, '电子产品', json.dumps({'brand': 'Dell', 'model': 'XPS13'})),
('无线鼠标', 199.99, '电子产品', json.dumps({'brand': 'Logitech', 'wireless': True})),
('办公椅', 899.99, '家具', json.dumps({'material': '皮革', 'adjustable': True})),
('台灯', 299.99, '家具', json.dumps({'type': 'LED', 'dimmable': True}))
]
cursor.executemany(
"INSERT OR REPLACE INTO products (name, price, category, metadata) VALUES (?, ?, ?, ?)",
products_data
)
conn.commit()
print(f" ✓ 插入 {len(products_data)} 个产品")
# 使用EXPLAIN QUERY PLAN查看查询计划
cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM products WHERE category = '电子产品'")
plan = cursor.fetchall()
print(" 查询计划:")
for step in plan:
print(f" {step[3]}")
# 4. JSON数据处理
print("\n4. JSON数据处理:")
with get_db_connection(db_path) as conn:
cursor = conn.cursor()
# 查询并解析JSON数据
cursor.execute("SELECT name, metadata FROM products WHERE category = '电子产品'")
products = cursor.fetchall()
print(" 电子产品详情:")
for product in products:
metadata = json.loads(product['metadata'])
print(f" {product['name']}:")
for key, value in metadata.items():
print(f" {key}: {value}")
# 5. 自定义函数
print("\n5. 自定义函数:")
def calculate_discount(price, discount_rate):
"""计算折扣价格"""
return price * (1 - discount_rate / 100)
with get_db_connection(db_path) as conn:
# 注册自定义函数
conn.create_function("calculate_discount", 2, calculate_discount)
cursor = conn.cursor()
# 使用自定义函数
cursor.execute(
"SELECT name, price, calculate_discount(price, 20) as discounted_price FROM products"
)
products = cursor.fetchall()
print(" 产品价格(8折优惠):")
for product in products:
print(f" {product['name']}: {product['price']:.2f} → {product['discounted_price']:.2f}")
# 6. 备份和恢复
print("\n6. 数据库备份:")
def backup_database(source_db, backup_db):
"""备份数据库"""
with sqlite3.connect(source_db) as source:
with sqlite3.connect(backup_db) as backup:
source.backup(backup)
print(f" ✓ 备份完成: {source_db} → {backup_db}")
backup_path = 'backup_example.db'
backup_database(db_path, backup_path)
# 验证备份
with get_db_connection(backup_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM products")
count = cursor.fetchone()[0]
print(f" ✓ 备份验证: 产品数量 {count}")
# 清理文件
import os
for file_path in [db_path, backup_path]:
if os.path.exists(file_path):
os.remove(file_path)
print(" ✓ 清理测试文件")
# 运行SQLite高级演示
sqlite_advanced_demo()
# 3. MySQL数据库
# 3.1 MySQL连接和基本操作
# 注意:需要安装 pymysql: pip install pymysql
try:
import pymysql
PYMYSQL_AVAILABLE = True
except ImportError:
PYMYSQL_AVAILABLE = False
print("PyMySQL未安装,请运行: pip install pymysql")
def mysql_basic_demo():
"""MySQL基础操作演示"""
print("=== MySQL基础操作演示 ===")
if not PYMYSQL_AVAILABLE:
print(" ⚠️ PyMySQL模块未安装,跳过MySQL演示")
return
# 1. 连接配置
print("\n1. MySQL连接配置:")
# 数据库连接配置
config = {
'host': 'localhost',
'port': 3306,
'user': 'root',
'password': 'password', # 请替换为实际密码
'database': 'test_db',
'charset': 'utf8mb4',
'autocommit': False
}
print(" 连接配置:")
for key, value in config.items():
if key != 'password':
print(f" {key}: {value}")
else:
print(f" {key}: {'*' * len(str(value))}")
# 2. 连接管理类
print("\n2. MySQL连接管理:")
class MySQLManager:
"""MySQL连接管理器"""
def __init__(self, config):
self.config = config
self.connection = None
def connect(self):
"""建立连接"""
try:
self.connection = pymysql.connect(**self.config)
print(" ✓ 连接MySQL成功")
return True
except pymysql.Error as e:
print(f" ✗ 连接失败: {e}")
return False
def disconnect(self):
"""关闭连接"""
if self.connection:
self.connection.close()
print(" ✓ 关闭MySQL连接")
def execute_query(self, sql, params=None):
"""执行查询"""
try:
with self.connection.cursor() as cursor:
cursor.execute(sql, params or ())
return cursor.fetchall()
except pymysql.Error as e:
print(f" ✗ 查询失败: {e}")
return None
def execute_update(self, sql, params=None):
"""执行更新"""
try:
with self.connection.cursor() as cursor:
affected_rows = cursor.execute(sql, params or ())
self.connection.commit()
return affected_rows
except pymysql.Error as e:
print(f" ✗ 更新失败: {e}")
self.connection.rollback()
return 0
def execute_many(self, sql, params_list):
"""批量执行"""
try:
with self.connection.cursor() as cursor:
affected_rows = cursor.executemany(sql, params_list)
self.connection.commit()
return affected_rows
except pymysql.Error as e:
print(f" ✗ 批量执行失败: {e}")
self.connection.rollback()
return 0
# 3. 模拟MySQL操作(如果无法连接真实数据库)
print("\n3. MySQL操作演示:")
def simulate_mysql_operations():
"""模拟MySQL操作"""
print(" 模拟MySQL数据库操作:")
# 模拟创建表
create_table_sql = '''
CREATE TABLE IF NOT EXISTS employees (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
department VARCHAR(50),
salary DECIMAL(10, 2),
hire_date DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
'''
print(" ✓ 创建员工表")
# 模拟插入数据
employees_data = [
('张三', 'zhangsan@company.com', '技术部', 8000.00, '2023-01-15'),
('李四', 'lisi@company.com', '销售部', 6000.00, '2023-02-20'),
('王五', 'wangwu@company.com', '人事部', 7000.00, '2023-03-10'),
('赵六', 'zhaoliu@company.com', '技术部', 9000.00, '2023-04-05')
]
insert_sql = '''
INSERT INTO employees (name, email, department, salary, hire_date)
VALUES (%s, %s, %s, %s, %s)
'''
print(f" ✓ 插入 {len(employees_data)} 个员工")
# 模拟查询操作
queries = {
"查询所有员工": "SELECT * FROM employees",
"按部门查询": "SELECT * FROM employees WHERE department = '技术部'",
"薪资统计": "SELECT department, AVG(salary) as avg_salary FROM employees GROUP BY department",
"高薪员工": "SELECT name, salary FROM employees WHERE salary > 7000 ORDER BY salary DESC"
}
for desc, sql in queries.items():
print(f" • {desc}: {sql[:50]}...")
# 模拟更新操作
update_sql = "UPDATE employees SET salary = salary * 1.1 WHERE department = '技术部'"
print(" ✓ 技术部员工加薪10%")
# 模拟删除操作
delete_sql = "DELETE FROM employees WHERE hire_date < '2023-02-01'"
print(" ✓ 删除早期员工记录")
# 尝试连接真实数据库,如果失败则模拟操作
mysql_manager = MySQLManager(config)
if mysql_manager.connect():
# 真实数据库操作
try:
# 创建数据库(如果不存在)
mysql_manager.execute_update("CREATE DATABASE IF NOT EXISTS test_db")
mysql_manager.execute_update("USE test_db")
# 执行实际操作...
print(" 执行真实MySQL操作")
except Exception as e:
print(f" 操作过程中出错: {e}")
finally:
mysql_manager.disconnect()
else:
# 模拟操作
simulate_mysql_operations()
# 运行MySQL基础演示
mysql_basic_demo()
# 3.2 MySQL连接池和高级功能
# 连接池需要额外安装: pip install DBUtils
try:
from DBUtils.PooledDB import PooledDB
DBUTILS_AVAILABLE = True
except ImportError:
DBUTILS_AVAILABLE = False
def mysql_advanced_demo():
"""MySQL高级功能演示"""
print("=== MySQL高级功能演示 ===")
# 1. 连接池管理
print("\n1. 连接池管理:")
class MySQLConnectionPool:
"""MySQL连接池管理器"""
def __init__(self, config, pool_size=5):
self.config = config
self.pool_size = pool_size
self.pool = None
self._init_pool()
def _init_pool(self):
"""初始化连接池"""
if DBUTILS_AVAILABLE and PYMYSQL_AVAILABLE:
try:
self.pool = PooledDB(
creator=pymysql,
maxconnections=self.pool_size,
mincached=2,
maxcached=5,
maxshared=3,
blocking=True,
maxusage=None,
setsession=[],
ping=0,
**self.config
)
print(f" ✓ 初始化连接池,大小: {self.pool_size}")
except Exception as e:
print(f" ✗ 连接池初始化失败: {e}")
else:
print(" ⚠️ 缺少依赖,使用模拟连接池")
self.pool = None
def get_connection(self):
"""从连接池获取连接"""
if self.pool:
return self.pool.connection()
else:
# 模拟连接
print(" 获取模拟连接")
return None
def execute_transaction(self, operations):
"""执行事务"""
conn = self.get_connection()
if not conn:
print(" ⚠️ 无法获取连接,模拟事务执行")
return
try:
cursor = conn.cursor()
# 开始事务
conn.begin()
for operation in operations:
sql = operation['sql']
params = operation.get('params', ())
cursor.execute(sql, params)
# 提交事务
conn.commit()
print(f" ✓ 事务执行成功,包含 {len(operations)} 个操作")
except Exception as e:
conn.rollback()
print(f" ✗ 事务执行失败,已回滚: {e}")
finally:
cursor.close()
conn.close()
# 2. 数据访问对象 (DAO) 模式
print("\n2. 数据访问对象模式:")
class UserDAO:
"""用户数据访问对象"""
def __init__(self, connection_pool):
self.pool = connection_pool
def create_user(self, username, email, password):
"""创建用户"""
sql = "INSERT INTO users (username, email, password) VALUES (%s, %s, %s)"
params = (username, email, password)
# 模拟执行
print(f" ✓ 创建用户: {username} ({email})")
return True
def get_user_by_id(self, user_id):
"""根据ID获取用户"""
sql = "SELECT * FROM users WHERE id = %s"
params = (user_id,)
# 模拟返回数据
user_data = {
'id': user_id,
'username': f'user_{user_id}',
'email': f'user_{user_id}@example.com',
'created_at': '2023-01-01 00:00:00'
}
print(f" ✓ 获取用户: {user_data['username']}")
return user_data
def update_user(self, user_id, **kwargs):
"""更新用户信息"""
if not kwargs:
return False
set_clause = ', '.join([f"{key} = %s" for key in kwargs.keys()])
sql = f"UPDATE users SET {set_clause} WHERE id = %s"
params = list(kwargs.values()) + [user_id]
print(f" ✓ 更新用户 {user_id}: {list(kwargs.keys())}")
return True
def delete_user(self, user_id):
"""删除用户"""
sql = "DELETE FROM users WHERE id = %s"
params = (user_id,)
print(f" ✓ 删除用户: {user_id}")
return True
def search_users(self, keyword, limit=10):
"""搜索用户"""
sql = "SELECT * FROM users WHERE username LIKE %s OR email LIKE %s LIMIT %s"
params = (f'%{keyword}%', f'%{keyword}%', limit)
# 模拟搜索结果
results = [
{'id': i, 'username': f'{keyword}_user_{i}', 'email': f'{keyword}_{i}@example.com'}
for i in range(1, min(limit + 1, 4))
]
print(f" ✓ 搜索用户 '{keyword}': 找到 {len(results)} 个结果")
return results
# 3. 测试连接池和DAO
print("\n3. 测试连接池和DAO:")
# 创建连接池
config = {
'host': 'localhost',
'user': 'root',
'password': 'password',
'database': 'test_db',
'charset': 'utf8mb4'
}
pool = MySQLConnectionPool(config, pool_size=10)
# 创建DAO实例
user_dao = UserDAO(pool)
# 测试CRUD操作
print("\n CRUD操作测试:")
# 创建用户
user_dao.create_user('张三', 'zhangsan@example.com', 'password123')
user_dao.create_user('李四', 'lisi@example.com', 'password456')
# 查询用户
user = user_dao.get_user_by_id(1)
# 更新用户
user_dao.update_user(1, email='zhangsan_new@example.com', username='张三_new')
# 搜索用户
results = user_dao.search_users('张')
# 删除用户
user_dao.delete_user(2)
# 4. 批量操作和性能优化
print("\n4. 批量操作和性能优化:")
def batch_operations_demo():
"""批量操作演示"""
# 批量插入
batch_data = [
('用户1', 'user1@example.com', 'pass1'),
('用户2', 'user2@example.com', 'pass2'),
('用户3', 'user3@example.com', 'pass3'),
('用户4', 'user4@example.com', 'pass4'),
('用户5', 'user5@example.com', 'pass5')
]
print(f" ✓ 批量插入 {len(batch_data)} 个用户")
# 分页查询
page_size = 10
page_num = 1
offset = (page_num - 1) * page_size
pagination_sql = f"SELECT * FROM users LIMIT {page_size} OFFSET {offset}"
print(f" ✓ 分页查询: 第{page_num}页,每页{page_size}条")
# 索引优化建议
index_suggestions = [
"CREATE INDEX idx_users_email ON users(email)",
"CREATE INDEX idx_users_username ON users(username)",
"CREATE INDEX idx_users_created_at ON users(created_at)"
]
print(" 索引优化建议:")
for suggestion in index_suggestions:
print(f" {suggestion}")
batch_operations_demo()
# 5. 数据库监控和统计
print("\n5. 数据库监控:")
def database_monitoring_demo():
"""数据库监控演示"""
monitoring_queries = {
"连接数统计": "SHOW STATUS LIKE 'Threads_connected'",
"查询缓存命中率": "SHOW STATUS LIKE 'Qcache_hits'",
"慢查询数量": "SHOW STATUS LIKE 'Slow_queries'",
"表锁等待": "SHOW STATUS LIKE 'Table_locks_waited'",
"InnoDB缓冲池": "SHOW STATUS LIKE 'Innodb_buffer_pool_read_requests'"
}
print(" 监控查询:")
for desc, query in monitoring_queries.items():
print(f" {desc}: {query}")
# 性能分析
performance_tips = [
"使用EXPLAIN分析查询计划",
"合理创建索引,避免过多索引",
"使用连接池减少连接开销",
"定期分析表统计信息",
"监控慢查询日志",
"优化数据库配置参数"
]
print("\n 性能优化建议:")
for tip in performance_tips:
print(f" • {tip}")
database_monitoring_demo()
# 运行MySQL高级演示
mysql_advanced_demo()
# 4. ORM框架
# 4.1 SQLAlchemy基础
# 注意:需要安装 SQLAlchemy: pip install sqlalchemy
try:
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime
SQLALCHEMY_AVAILABLE = True
except ImportError:
SQLALCHEMY_AVAILABLE = False
print("SQLAlchemy未安装,请运行: pip install sqlalchemy")
def sqlalchemy_basic_demo():
"""SQLAlchemy基础演示"""
print("=== SQLAlchemy基础演示 ===")
if not SQLALCHEMY_AVAILABLE:
print(" ⚠️ SQLAlchemy模块未安装,跳过ORM演示")
return
# 1. 创建数据库引擎
print("\n1. 创建数据库引擎:")
# 使用SQLite内存数据库进行演示
engine = create_engine('sqlite:///:memory:', echo=False)
print(" ✓ 创建SQLite内存数据库引擎")
# 2. 定义模型
print("\n2. 定义ORM模型:")
Base = declarative_base()
class User(Base):
"""用户模型"""
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
# 关系
posts = relationship("Post", back_populates="author")
def __repr__(self):
return f"<User(username='{self.username}', email='{self.email}')>"
class Post(Base):
"""文章模型"""
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
content = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow)
author_id = Column(Integer, ForeignKey('users.id'))
# 关系
author = relationship("User", back_populates="posts")
def __repr__(self):
return f"<Post(title='{self.title}')>"
print(" ✓ 定义User和Post模型")
# 3. 创建表
print("\n3. 创建数据表:")
Base.metadata.create_all(engine)
print(" ✓ 创建所有表")
# 4. 创建会话
print("\n4. 创建数据库会话:")
Session = sessionmaker(bind=engine)
session = Session()
print(" ✓ 创建数据库会话")
# 5. 插入数据
print("\n5. 插入数据:")
# 创建用户
user1 = User(username='张三', email='zhangsan@example.com')
user2 = User(username='李四', email='lisi@example.com')
session.add(user1)
session.add(user2)
session.commit()
print(" ✓ 创建用户")
# 创建文章
post1 = Post(title='Python学习笔记', content='今天学习了Python基础语法...', author=user1)
post2 = Post(title='数据库设计', content='数据库设计的基本原则...', author=user1)
post3 = Post(title='Web开发入门', content='Web开发的基础知识...', author=user2)
session.add_all([post1, post2, post3])
session.commit()
print(" ✓ 创建文章")
# 6. 查询数据
print("\n6. 查询数据:")
# 查询所有用户
users = session.query(User).all()
print(f" 所有用户 ({len(users)} 个):")
for user in users:
print(f" {user}")
# 条件查询
user = session.query(User).filter(User.username == '张三').first()
if user:
print(f"\n 找到用户: {user}")
print(f" 用户文章数: {len(user.posts)}")
for post in user.posts:
print(f" • {post.title}")
# 联表查询
posts_with_authors = session.query(Post).join(User).all()
print(f"\n 文章及作者 ({len(posts_with_authors)} 篇):")
for post in posts_with_authors:
print(f" 《{post.title}》 - {post.author.username}")
# 7. 更新数据
print("\n7. 更新数据:")
user = session.query(User).filter(User.username == '张三').first()
if user:
user.email = 'zhangsan_new@example.com'
session.commit()
print(f" ✓ 更新用户邮箱: {user.email}")
# 8. 删除数据
print("\n8. 删除数据:")
post_to_delete = session.query(Post).filter(Post.title == 'Web开发入门').first()
if post_to_delete:
session.delete(post_to_delete)
session.commit()
print(" ✓ 删除文章: Web开发入门")
# 9. 高级查询
print("\n9. 高级查询:")
# 聚合查询
from sqlalchemy import func
user_post_counts = session.query(
User.username,
func.count(Post.id).label('post_count')
).join(Post).group_by(User.id).all()
print(" 用户文章统计:")
for username, count in user_post_counts:
print(f" {username}: {count} 篇文章")
# 排序和限制
recent_posts = session.query(Post).order_by(Post.created_at.desc()).limit(2).all()
print(f"\n 最新文章 ({len(recent_posts)} 篇):")
for post in recent_posts:
print(f" {post.title}")
# 10. 关闭会话
session.close()
print("\n ✓ 关闭数据库会话")
# 运行SQLAlchemy基础演示
sqlalchemy_basic_demo()
# 4.2 SQLAlchemy高级功能
def sqlalchemy_advanced_demo():
"""SQLAlchemy高级功能演示"""
print("=== SQLAlchemy高级功能演示 ===")
if not SQLALCHEMY_AVAILABLE:
print(" ⚠️ SQLAlchemy模块未安装,跳过高级ORM演示")
return
# 1. 数据库连接池配置
print("\n1. 连接池配置:")
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
# 配置连接池
engine = create_engine(
'sqlite:///:memory:',
poolclass=QueuePool,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
echo=False
)
print(" ✓ 配置连接池 (大小: 10, 最大溢出: 20)")
# 2. 模型继承和混入
print("\n2. 模型继承和混入:")
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Boolean
from datetime import datetime
Base = declarative_base()
class TimestampMixin:
"""时间戳混入类"""
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class SoftDeleteMixin:
"""软删除混入类"""
is_deleted = Column(Boolean, default=False)
deleted_at = Column(DateTime)
class BaseModel(Base, TimestampMixin, SoftDeleteMixin):
"""基础模型"""
__abstract__ = True
id = Column(Integer, primary_key=True)
class Product(BaseModel):
"""产品模型"""
__tablename__ = 'products'
name = Column(String(100), nullable=False)
price = Column(Integer) # 以分为单位存储价格
description = Column(String(500))
def __repr__(self):
return f"<Product(name='{self.name}', price={self.price})>"
print(" ✓ 定义带混入的Product模型")
# 3. 自定义查询类
print("\n3. 自定义查询类:")
from sqlalchemy.orm import Query
class SoftDeleteQuery(Query):
"""支持软删除的查询类"""
def filter_active(self):
"""过滤未删除的记录"""
return self.filter(self.column_descriptions[0]['type'].is_deleted == False)
def filter_deleted(self):
"""过滤已删除的记录"""
return self.filter(self.column_descriptions[0]['type'].is_deleted == True)
# 4. 事务管理
print("\n4. 事务管理:")
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
Session = sessionmaker(bind=engine)
@contextmanager
def get_db_session():
"""数据库会话上下文管理器"""
session = Session()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
raise e
finally:
session.close()
# 创建表
Base.metadata.create_all(engine)
# 使用事务
try:
with get_db_session() as session:
# 批量创建产品
products = [
Product(name='笔记本电脑', price=599999, description='高性能笔记本'),
Product(name='无线鼠标', price=19999, description='人体工学设计'),
Product(name='机械键盘', price=39999, description='青轴机械键盘')
]
session.add_all(products)
print(" ✓ 批量创建产品(事务中)")
# 模拟可能的错误
# raise Exception("模拟错误")
except Exception as e:
print(f" ✗ 事务失败: {e}")
# 5. 查询优化
print("\n5. 查询优化:")
with get_db_session() as session:
# 延迟加载 vs 立即加载
from sqlalchemy.orm import joinedload, selectinload
# 查询产品数量
product_count = session.query(Product).filter(Product.is_deleted == False).count()
print(f" 活跃产品数量: {product_count}")
# 分页查询
page_size = 2
page_num = 1
offset = (page_num - 1) * page_size
products = session.query(Product).filter(
Product.is_deleted == False
).offset(offset).limit(page_size).all()
print(f" 分页查询结果 (第{page_num}页):")
for product in products:
print(f" {product.name}: ¥{product.price/100:.2f}")
# 6. 原生SQL查询
print("\n6. 原生SQL查询:")
with get_db_session() as session:
# 执行原生SQL
result = session.execute(
"SELECT name, price FROM products WHERE is_deleted = 0 ORDER BY price DESC"
)
print(" 价格排序(原生SQL):")
for row in result:
print(f" {row[0]}: ¥{row[1]/100:.2f}")
# 7. 数据验证和约束
print("\n7. 数据验证:")
from sqlalchemy.orm import validates
class ValidatedProduct(BaseModel):
"""带验证的产品模型"""
__tablename__ = 'validated_products'
name = Column(String(100), nullable=False)
price = Column(Integer)
@validates('price')
def validate_price(self, key, price):
"""价格验证"""
if price is not None and price < 0:
raise ValueError("价格不能为负数")
return price
@validates('name')
def validate_name(self, key, name):
"""名称验证"""
if not name or len(name.strip()) == 0:
raise ValueError("产品名称不能为空")
return name.strip()
print(" ✓ 定义带验证的产品模型")
# 8. 性能监控
print("\n8. 性能监控:")
import time
class QueryTimer:
"""查询计时器"""
def __init__(self, description):
self.description = description
self.start_time = None
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
elapsed = time.time() - self.start_time
print(f" {self.description}: {elapsed*1000:.2f}ms")
with get_db_session() as session:
with QueryTimer("查询所有产品"):
products = session.query(Product).all()
with QueryTimer("统计产品数量"):
count = session.query(Product).count()
print("\n ✓ SQLAlchemy高级功能演示完成")
# 运行SQLAlchemy高级演示
sqlalchemy_advanced_demo()
# 5. 数据库设计最佳实践
# 5.1 数据库设计原则
def database_design_demo():
"""数据库设计最佳实践演示"""
print("=== 数据库设计最佳实践 ===")
# 1. 数据库设计原则
print("\n1. 数据库设计原则:")
design_principles = {
"第一范式 (1NF)": {
"定义": "每个字段都是原子性的,不可再分",
"示例": "将'姓名'字段拆分为'姓'和'名'",
"好处": "避免数据冗余,便于查询和维护"
},
"第二范式 (2NF)": {
"定义": "满足1NF,且非主键字段完全依赖于主键",
"示例": "订单表中商品信息应该单独建表",
"好处": "减少数据冗余,提高数据一致性"
},
"第三范式 (3NF)": {
"定义": "满足2NF,且非主键字段不依赖于其他非主键字段",
"示例": "员工表中部门信息应该单独建表",
"好处": "进一步减少冗余,提高数据完整性"
}
}
for principle, details in design_principles.items():
print(f"\n {principle}:")
for key, value in details.items():
print(f" {key}: {value}")
# 2. 命名规范
print("\n2. 命名规范:")
naming_conventions = {
"表名": {
"规则": "使用复数形式,小写字母,下划线分隔",
"示例": "users, user_profiles, order_items",
"避免": "User, userProfile, OrderItem"
},
"字段名": {
"规则": "使用小写字母,下划线分隔,见名知意",
"示例": "user_id, created_at, email_address",
"避免": "userId, createdAt, email"
},
"索引名": {
"规则": "idx_表名_字段名",
"示例": "idx_users_email, idx_orders_created_at",
"避免": "index1, user_index"
},
"外键名": {
"规则": "fk_表名_引用表名",
"示例": "fk_orders_users, fk_order_items_products",
"避免": "foreign_key1, user_fk"
}
}
for category, details in naming_conventions.items():
print(f"\n {category}:")
for key, value in details.items():
print(f" {key}: {value}")
# 3. 数据类型选择
print("\n3. 数据类型选择:")
data_type_guidelines = {
"整数类型": {
"TINYINT": "0-255,适用于状态、类型等小范围值",
"INT": "适用于ID、数量等常规整数",
"BIGINT": "适用于大数值,如时间戳、大ID"
},
"字符串类型": {
"CHAR": "固定长度,适用于长度固定的字段如手机号",
"VARCHAR": "可变长度,适用于姓名、邮箱等",
"TEXT": "大文本,适用于文章内容、描述等"
},
"时间类型": {
"DATE": "仅日期,如生日",
"DATETIME": "日期和时间,如创建时间",
"TIMESTAMP": "时间戳,自动更新"
},
"数值类型": {
"DECIMAL": "精确小数,适用于金额",
"FLOAT/DOUBLE": "浮点数,适用于科学计算"
}
}
for category, types in data_type_guidelines.items():
print(f"\n {category}:")
for type_name, usage in types.items():
print(f" {type_name}: {usage}")
# 4. 索引设计
print("\n4. 索引设计原则:")
index_guidelines = [
"为经常用于WHERE条件的字段创建索引",
"为经常用于JOIN的字段创建索引",
"为经常用于ORDER BY的字段创建索引",
"避免在小表上创建过多索引",
"避免在频繁更新的字段上创建索引",
"考虑创建复合索引来优化多字段查询",
"定期分析和优化索引使用情况"
]
for i, guideline in enumerate(index_guidelines, 1):
print(f" {i}. {guideline}")
# 5. 示例:电商系统数据库设计
print("\n5. 电商系统数据库设计示例:")
ecommerce_tables = {
"users": {
"字段": ["id", "username", "email", "password_hash", "phone", "created_at", "updated_at"],
"索引": ["idx_users_email", "idx_users_username"],
"说明": "用户基本信息表"
},
"user_profiles": {
"字段": ["user_id", "first_name", "last_name", "birth_date", "gender", "avatar_url"],
"索引": ["idx_user_profiles_user_id"],
"说明": "用户详细信息表"
},
"categories": {
"字段": ["id", "name", "parent_id", "sort_order", "is_active"],
"索引": ["idx_categories_parent_id"],
"说明": "商品分类表(支持层级)"
},
"products": {
"字段": ["id", "name", "description", "price", "stock", "category_id", "created_at"],
"索引": ["idx_products_category_id", "idx_products_price"],
"说明": "商品信息表"
},
"orders": {
"字段": ["id", "user_id", "total_amount", "status", "created_at", "updated_at"],
"索引": ["idx_orders_user_id", "idx_orders_status", "idx_orders_created_at"],
"说明": "订单主表"
},
"order_items": {
"字段": ["id", "order_id", "product_id", "quantity", "price", "subtotal"],
"索引": ["idx_order_items_order_id", "idx_order_items_product_id"],
"说明": "订单明细表"
}
}
for table_name, details in ecommerce_tables.items():
print(f"\n {table_name}:")
print(f" 说明: {details['说明']}")
print(f" 字段: {', '.join(details['字段'])}")
print(f" 索引: {', '.join(details['索引'])}")
# 运行数据库设计演示
database_design_demo()
# 5.2 性能优化策略
def database_performance_demo():
"""数据库性能优化演示"""
print("=== 数据库性能优化策略 ===")
# 1. 查询优化
print("\n1. 查询优化策略:")
query_optimization = {
"使用索引": {
"原则": "为WHERE、JOIN、ORDER BY字段创建合适的索引",
"示例": "CREATE INDEX idx_users_email ON users(email)",
"注意": "避免过多索引,影响写入性能"
},
"避免SELECT *": {
"原则": "只查询需要的字段",
"好例子": "SELECT id, name FROM users",
"坏例子": "SELECT * FROM users"
},
"使用LIMIT": {
"原则": "限制查询结果数量",
"示例": "SELECT * FROM products LIMIT 10 OFFSET 20",
"好处": "减少内存使用和网络传输"
},
"优化JOIN": {
"原则": "使用合适的JOIN类型,确保JOIN字段有索引",
"示例": "SELECT u.name, p.title FROM users u INNER JOIN posts p ON u.id = p.user_id",
"注意": "避免不必要的JOIN操作"
}
}
for strategy, details in query_optimization.items():
print(f"\n {strategy}:")
for key, value in details.items():
print(f" {key}: {value}")
# 2. 索引优化
print("\n2. 索引优化技巧:")
index_optimization = [
"复合索引:将多个常一起查询的字段组合成复合索引",
"前缀索引:对于长字符串字段,使用前缀索引节省空间",
"覆盖索引:让索引包含查询所需的所有字段",
"部分索引:只为满足特定条件的行创建索引",
"定期维护:使用ANALYZE TABLE更新索引统计信息"
]
for i, tip in enumerate(index_optimization, 1):
print(f" {i}. {tip}")
# 3. 连接池优化
print("\n3. 连接池优化:")
connection_pool_tips = {
"合理设置池大小": "根据应用并发量设置合适的连接池大小",
"连接超时设置": "设置合理的连接超时和空闲超时时间",
"连接验证": "启用连接验证,确保连接可用性",
"监控连接使用": "监控连接池使用情况,及时调整配置"
}
for tip, description in connection_pool_tips.items():
print(f" • {tip}: {description}")
# 4. 缓存策略
print("\n4. 缓存策略:")
caching_strategies = {
"查询结果缓存": {
"适用场景": "频繁查询且数据变化不频繁",
"实现方式": "Redis、Memcached",
"注意事项": "设置合理的过期时间"
},
"对象缓存": {
"适用场景": "复杂对象的构建成本较高",
"实现方式": "应用层缓存",
"注意事项": "注意缓存一致性"
},
"页面缓存": {
"适用场景": "静态或半静态页面",
"实现方式": "CDN、反向代理",
"注意事项": "处理动态内容"
}
}
for strategy, details in caching_strategies.items():
print(f"\n {strategy}:")
for key, value in details.items():
print(f" {key}: {value}")
# 5. 分库分表策略
print("\n5. 分库分表策略:")
sharding_strategies = {
"垂直分库": "按业务模块分离数据库",
"水平分库": "按数据量分离数据库",
"垂直分表": "按字段使用频率分离表",
"水平分表": "按数据量或时间分离表"
}
for strategy, description in sharding_strategies.items():
print(f" • {strategy}: {description}")
# 6. 监控和诊断
print("\n6. 性能监控指标:")
monitoring_metrics = [
"查询响应时间",
"慢查询日志",
"连接数使用情况",
"缓存命中率",
"磁盘I/O使用率",
"CPU和内存使用率",
"锁等待时间",
"死锁检测"
]
for metric in monitoring_metrics:
print(f" • {metric}")
# 7. 性能测试示例
print("\n7. 性能测试示例:")
performance_test_example = '''
# 使用Python进行简单的性能测试
import time
import sqlite3
def performance_test():
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()
# 创建测试表
cursor.execute('''
CREATE TABLE test_table (
id INTEGER PRIMARY KEY,
name TEXT,
value INTEGER
)
''')
# 测试插入性能
start_time = time.time()
for i in range(10000):
cursor.execute("INSERT INTO test_table (name, value) VALUES (?, ?)",
(f'name_{i}', i))
conn.commit()
insert_time = time.time() - start_time
# 测试查询性能
start_time = time.time()
cursor.execute("SELECT * FROM test_table WHERE value > 5000")
results = cursor.fetchall()
query_time = time.time() - start_time
print(f"插入10000条记录耗时: {insert_time:.3f}秒")
print(f"查询耗时: {query_time:.3f}秒")
print(f"查询结果数量: {len(results)}")
conn.close()
'''
print(" 性能测试代码示例:")
print(performance_test_example)
# 运行数据库性能优化演示
database_performance_demo()
# 6. 实际应用案例
# 6.1 用户管理系统
def user_management_system_demo():
"""用户管理系统演示"""
print("=== 用户管理系统演示 ===")
import sqlite3
import hashlib
import datetime
import json
# 1. 数据库初始化
print("\n1. 初始化用户管理数据库:")
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()
# 创建用户表
cursor.execute('''
CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(64) NOT NULL,
salt VARCHAR(32) NOT NULL,
status INTEGER DEFAULT 1,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
last_login DATETIME,
login_count INTEGER DEFAULT 0
)
''')
# 创建用户资料表
cursor.execute('''
CREATE TABLE user_profiles (
user_id INTEGER PRIMARY KEY,
first_name VARCHAR(50),
last_name VARCHAR(50),
phone VARCHAR(20),
birth_date DATE,
avatar_url VARCHAR(200),
bio TEXT,
FOREIGN KEY (user_id) REFERENCES users(id)
)
''')
# 创建登录日志表
cursor.execute('''
CREATE TABLE login_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
ip_address VARCHAR(45),
user_agent TEXT,
login_time DATETIME DEFAULT CURRENT_TIMESTAMP,
success INTEGER DEFAULT 1,
FOREIGN KEY (user_id) REFERENCES users(id)
)
''')
# 创建索引
cursor.execute('CREATE INDEX idx_users_email ON users(email)')
cursor.execute('CREATE INDEX idx_users_username ON users(username)')
cursor.execute('CREATE INDEX idx_login_logs_user_id ON login_logs(user_id)')
cursor.execute('CREATE INDEX idx_login_logs_time ON login_logs(login_time)')
print(" ✓ 创建用户管理相关表和索引")
# 2. 用户注册功能
print("\n2. 用户注册功能:")
def register_user(username, email, password, first_name=None, last_name=None):
"""用户注册"""
try:
# 生成盐值和密码哈希
import secrets
salt = secrets.token_hex(16)
password_hash = hashlib.sha256((password + salt).encode()).hexdigest()
# 插入用户基本信息
cursor.execute('''
INSERT INTO users (username, email, password_hash, salt)
VALUES (?, ?, ?, ?)
''', (username, email, password_hash, salt))
user_id = cursor.lastrowid
# 插入用户资料
cursor.execute('''
INSERT INTO user_profiles (user_id, first_name, last_name)
VALUES (?, ?, ?)
''', (user_id, first_name, last_name))
conn.commit()
return {'success': True, 'user_id': user_id, 'message': '注册成功'}
except sqlite3.IntegrityError as e:
conn.rollback()
if 'username' in str(e):
return {'success': False, 'message': '用户名已存在'}
elif 'email' in str(e):
return {'success': False, 'message': '邮箱已被注册'}
else:
return {'success': False, 'message': '注册失败'}
except Exception as e:
conn.rollback()
return {'success': False, 'message': f'系统错误: {str(e)}'}
# 注册测试用户
users_to_register = [
('张三', 'zhangsan@example.com', 'password123', '三', '张'),
('李四', 'lisi@example.com', 'password456', '四', '李'),
('王五', 'wangwu@example.com', 'password789', '五', '王')
]
for username, email, password, first_name, last_name in users_to_register:
result = register_user(username, email, password, first_name, last_name)
print(f" 注册用户 {username}: {result['message']}")
# 3. 用户登录功能
print("\n3. 用户登录功能:")
def login_user(username_or_email, password, ip_address='127.0.0.1', user_agent='Python Client'):
"""用户登录"""
try:
# 查找用户
cursor.execute('''
SELECT id, username, email, password_hash, salt, status
FROM users
WHERE username = ? OR email = ?
''', (username_or_email, username_or_email))
user = cursor.fetchone()
if not user:
# 记录失败登录
cursor.execute('''
INSERT INTO login_logs (user_id, ip_address, user_agent, success)
VALUES (NULL, ?, ?, 0)
''', (ip_address, user_agent))
conn.commit()
return {'success': False, 'message': '用户不存在'}
user_id, username, email, stored_hash, salt, status = user
# 检查用户状态
if status != 1:
return {'success': False, 'message': '账户已被禁用'}
# 验证密码
password_hash = hashlib.sha256((password + salt).encode()).hexdigest()
if password_hash != stored_hash:
# 记录失败登录
cursor.execute('''
INSERT INTO login_logs (user_id, ip_address, user_agent, success)
VALUES (?, ?, ?, 0)
''', (user_id, ip_address, user_agent))
conn.commit()
return {'success': False, 'message': '密码错误'}
# 更新用户登录信息
cursor.execute('''
UPDATE users
SET last_login = CURRENT_TIMESTAMP,
login_count = login_count + 1,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (user_id,))
# 记录成功登录
cursor.execute('''
INSERT INTO login_logs (user_id, ip_address, user_agent, success)
VALUES (?, ?, ?, 1)
''', (user_id, ip_address, user_agent))
conn.commit()
return {
'success': True,
'message': '登录成功',
'user': {
'id': user_id,
'username': username,
'email': email
}
}
except Exception as e:
conn.rollback()
return {'success': False, 'message': f'登录失败: {str(e)}'}
# 测试登录
login_tests = [
('张三', 'password123'),
('lisi@example.com', 'password456'),
('王五', 'wrongpassword'),
('nonexistent', 'password')
]
for username, password in login_tests:
result = login_user(username, password)
print(f" 登录测试 {username}: {result['message']}")
# 4. 用户信息管理
print("\n4. 用户信息管理:")
def get_user_profile(user_id):
"""获取用户完整信息"""
cursor.execute('''
SELECT u.id, u.username, u.email, u.status, u.created_at,
u.last_login, u.login_count,
p.first_name, p.last_name, p.phone, p.birth_date, p.bio
FROM users u
LEFT JOIN user_profiles p ON u.id = p.user_id
WHERE u.id = ?
''', (user_id,))
result = cursor.fetchone()
if result:
return {
'id': result[0],
'username': result[1],
'email': result[2],
'status': result[3],
'created_at': result[4],
'last_login': result[5],
'login_count': result[6],
'first_name': result[7],
'last_name': result[8],
'phone': result[9],
'birth_date': result[10],
'bio': result[11]
}
return None
def update_user_profile(user_id, **kwargs):
"""更新用户资料"""
try:
# 更新用户基本信息
if 'email' in kwargs:
cursor.execute('''
UPDATE users SET email = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (kwargs['email'], user_id))
# 更新用户详细资料
profile_fields = ['first_name', 'last_name', 'phone', 'birth_date', 'bio']
profile_updates = {k: v for k, v in kwargs.items() if k in profile_fields}
if profile_updates:
set_clause = ', '.join([f'{k} = ?' for k in profile_updates.keys()])
values = list(profile_updates.values()) + [user_id]
cursor.execute(f'''
UPDATE user_profiles SET {set_clause}
WHERE user_id = ?
''', values)
conn.commit()
return {'success': True, 'message': '资料更新成功'}
except Exception as e:
conn.rollback()
return {'success': False, 'message': f'更新失败: {str(e)}'}
# 测试用户信息管理
user_profile = get_user_profile(1)
if user_profile:
print(f" 用户信息: {user_profile['username']} ({user_profile['email']})")
print(f" 登录次数: {user_profile['login_count']}")
# 更新用户资料
update_result = update_user_profile(1, phone='13800138000', bio='Python开发者')
print(f" 更新资料: {update_result['message']}")
# 5. 统计分析
print("\n5. 用户统计分析:")
def get_user_statistics():
"""获取用户统计信息"""
stats = {}
# 总用户数
cursor.execute('SELECT COUNT(*) FROM users')
stats['total_users'] = cursor.fetchone()[0]
# 活跃用户数(状态为1)
cursor.execute('SELECT COUNT(*) FROM users WHERE status = 1')
stats['active_users'] = cursor.fetchone()[0]
# 今日登录用户数
cursor.execute('''
SELECT COUNT(DISTINCT user_id)
FROM login_logs
WHERE DATE(login_time) = DATE('now') AND success = 1
''')
stats['today_logins'] = cursor.fetchone()[0]
# 平均登录次数
cursor.execute('SELECT AVG(login_count) FROM users WHERE login_count > 0')
avg_logins = cursor.fetchone()[0]
stats['avg_login_count'] = round(avg_logins, 2) if avg_logins else 0
# 最近注册的用户
cursor.execute('''
SELECT username, created_at
FROM users
ORDER BY created_at DESC
LIMIT 3
''')
stats['recent_users'] = cursor.fetchall()
return stats
stats = get_user_statistics()
print(f" 总用户数: {stats['total_users']}")
print(f" 活跃用户数: {stats['active_users']}")
print(f" 今日登录: {stats['today_logins']}")
print(f" 平均登录次数: {stats['avg_login_count']}")
print(" 最近注册用户:")
for username, created_at in stats['recent_users']:
print(f" {username} - {created_at}")
# 6. 清理资源
conn.close()
print("\n ✓ 用户管理系统演示完成")
# 运行用户管理系统演示
user_management_system_demo()
# 6.2 数据分析系统
def data_analysis_system_demo():
"""数据分析系统演示"""
print("=== 数据分析系统演示 ===")
import sqlite3
import random
import datetime
from datetime import timedelta
# 1. 创建销售数据分析数据库
print("\n1. 创建销售数据分析数据库:")
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()
# 创建产品表
cursor.execute('''
CREATE TABLE products (
id INTEGER PRIMARY KEY,
name VARCHAR(100) NOT NULL,
category VARCHAR(50),
price DECIMAL(10,2),
cost DECIMAL(10,2),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建销售记录表
cursor.execute('''
CREATE TABLE sales (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER,
quantity INTEGER,
unit_price DECIMAL(10,2),
total_amount DECIMAL(10,2),
sale_date DATE,
customer_id INTEGER,
region VARCHAR(50),
FOREIGN KEY (product_id) REFERENCES products(id)
)
''')
# 创建客户表
cursor.execute('''
CREATE TABLE customers (
id INTEGER PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
region VARCHAR(50),
registration_date DATE
)
''')
# 创建索引
cursor.execute('CREATE INDEX idx_sales_date ON sales(sale_date)')
cursor.execute('CREATE INDEX idx_sales_product ON sales(product_id)')
cursor.execute('CREATE INDEX idx_sales_customer ON sales(customer_id)')
cursor.execute('CREATE INDEX idx_sales_region ON sales(region)')
print(" ✓ 创建销售分析相关表和索引")
# 2. 生成测试数据
print("\n2. 生成测试数据:")
# 插入产品数据
products = [
(1, '笔记本电脑', '电子产品', 5999.00, 4500.00),
(2, '无线鼠标', '电子产品', 199.00, 120.00),
(3, '机械键盘', '电子产品', 399.00, 250.00),
(4, '显示器', '电子产品', 1299.00, 900.00),
(5, '办公椅', '办公用品', 899.00, 600.00),
(6, '办公桌', '办公用品', 1599.00, 1000.00)
]
cursor.executemany('''
INSERT INTO products (id, name, category, price, cost)
VALUES (?, ?, ?, ?, ?)
''', products)
# 插入客户数据
customers = [
(1, '张三公司', 'zhangsan@company.com', '北京', '2023-01-15'),
(2, '李四企业', 'lisi@enterprise.com', '上海', '2023-02-20'),
(3, '王五集团', 'wangwu@group.com', '广州', '2023-03-10'),
(4, '赵六科技', 'zhaoliu@tech.com', '深圳', '2023-04-05'),
(5, '钱七贸易', 'qianqi@trade.com', '杭州', '2023-05-12')
]
cursor.executemany('''
INSERT INTO customers (id, name, email, region, registration_date)
VALUES (?, ?, ?, ?, ?)
''', customers)
# 生成销售数据
regions = ['北京', '上海', '广州', '深圳', '杭州']
start_date = datetime.date(2023, 1, 1)
end_date = datetime.date(2023, 12, 31)
sales_data = []
for _ in range(500): # 生成500条销售记录
product_id = random.randint(1, 6)
customer_id = random.randint(1, 5)
quantity = random.randint(1, 10)
# 获取产品价格
cursor.execute('SELECT price FROM products WHERE id = ?', (product_id,))
unit_price = cursor.fetchone()[0]
# 添加一些价格波动
unit_price = float(unit_price) * random.uniform(0.9, 1.1)
total_amount = unit_price * quantity
# 随机日期
days_diff = (end_date - start_date).days
random_days = random.randint(0, days_diff)
sale_date = start_date + timedelta(days=random_days)
region = random.choice(regions)
sales_data.append((
product_id, quantity, unit_price, total_amount,
sale_date.strftime('%Y-%m-%d'), customer_id, region
))
cursor.executemany('''
INSERT INTO sales (product_id, quantity, unit_price, total_amount, sale_date, customer_id, region)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', sales_data)
conn.commit()
print(f" ✓ 生成 {len(products)} 个产品, {len(customers)} 个客户, {len(sales_data)} 条销售记录")
# 3. 销售数据分析
print("\n3. 销售数据分析:")
# 总销售额
cursor.execute('SELECT SUM(total_amount) FROM sales')
total_sales = cursor.fetchone()[0]
print(f" 总销售额: ¥{total_sales:,.2f}")
# 按月份统计销售额
cursor.execute('''
SELECT strftime('%Y-%m', sale_date) as month,
SUM(total_amount) as monthly_sales,
COUNT(*) as order_count
FROM sales
GROUP BY strftime('%Y-%m', sale_date)
ORDER BY month
''')
print("\n 月度销售统计:")
monthly_data = cursor.fetchall()
for month, sales, orders in monthly_data[:6]: # 显示前6个月
print(f" {month}: ¥{sales:,.2f} ({orders} 笔订单)")
# 按产品类别统计
cursor.execute('''
SELECT p.category,
SUM(s.total_amount) as category_sales,
SUM(s.quantity) as total_quantity,
COUNT(s.id) as order_count
FROM sales s
JOIN products p ON s.product_id = p.id
GROUP BY p.category
ORDER BY category_sales DESC
''')
print("\n 产品类别销售统计:")
for category, sales, quantity, orders in cursor.fetchall():
print(f" {category}: ¥{sales:,.2f} (数量: {quantity}, 订单: {orders})")
# 按地区统计
cursor.execute('''
SELECT region,
SUM(total_amount) as region_sales,
COUNT(*) as order_count,
AVG(total_amount) as avg_order_value
FROM sales
GROUP BY region
ORDER BY region_sales DESC
''')
print("\n 地区销售统计:")
for region, sales, orders, avg_value in cursor.fetchall():
print(f" {region}: ¥{sales:,.2f} ({orders} 笔, 平均: ¥{avg_value:.2f})")
# 4. 高级分析查询
print("\n4. 高级分析查询:")
# 最畅销产品
cursor.execute('''
SELECT p.name,
SUM(s.quantity) as total_sold,
SUM(s.total_amount) as total_revenue,
COUNT(s.id) as order_count
FROM sales s
JOIN products p ON s.product_id = p.id
GROUP BY p.id, p.name
ORDER BY total_sold DESC
LIMIT 3
''')
print(" 最畅销产品 TOP 3:")
for name, sold, revenue, orders in cursor.fetchall():
print(f" {name}: 销量 {sold}, 收入 ¥{revenue:,.2f}, 订单 {orders}")
# 客户价值分析
cursor.execute('''
SELECT c.name,
SUM(s.total_amount) as customer_value,
COUNT(s.id) as order_count,
AVG(s.total_amount) as avg_order_value,
MAX(s.sale_date) as last_purchase
FROM sales s
JOIN customers c ON s.customer_id = c.id
GROUP BY c.id, c.name
ORDER BY customer_value DESC
LIMIT 3
''')
print("\n 高价值客户 TOP 3:")
for name, value, orders, avg_value, last_purchase in cursor.fetchall():
print(f" {name}: 总价值 ¥{value:,.2f}, {orders} 笔订单, 最后购买 {last_purchase}")
# 季度趋势分析
cursor.execute('''
SELECT
CASE
WHEN strftime('%m', sale_date) IN ('01','02','03') THEN 'Q1'
WHEN strftime('%m', sale_date) IN ('04','05','06') THEN 'Q2'
WHEN strftime('%m', sale_date) IN ('07','08','09') THEN 'Q3'
ELSE 'Q4'
END as quarter,
SUM(total_amount) as quarterly_sales,
COUNT(*) as order_count
FROM sales
WHERE strftime('%Y', sale_date) = '2023'
GROUP BY quarter
ORDER BY quarter
''')
print("\n 2023年季度销售趋势:")
for quarter, sales, orders in cursor.fetchall():
print(f" {quarter}: ¥{sales:,.2f} ({orders} 笔订单)")
# 5. 利润分析
print("\n5. 利润分析:")
cursor.execute('''
SELECT p.name,
SUM(s.quantity) as total_sold,
SUM(s.total_amount) as revenue,
SUM(s.quantity * p.cost) as total_cost,
SUM(s.total_amount) - SUM(s.quantity * p.cost) as profit,
(SUM(s.total_amount) - SUM(s.quantity * p.cost)) / SUM(s.total_amount) * 100 as profit_margin
FROM sales s
JOIN products p ON s.product_id = p.id
GROUP BY p.id, p.name
ORDER BY profit DESC
''')
print(" 产品利润分析:")
for name, sold, revenue, cost, profit, margin in cursor.fetchall():
print(f" {name}: 利润 ¥{profit:,.2f}, 利润率 {margin:.1f}%")
# 6. 清理资源
conn.close()
print("\n ✓ 数据分析系统演示完成")
# 运行数据分析系统演示
data_analysis_system_demo()
# 7. 最佳实践和安全
# 7.1 数据库安全最佳实践
def database_security_demo():
"""数据库安全最佳实践演示"""
print("=== 数据库安全最佳实践 ===")
# 1. SQL注入防护
print("\n1. SQL注入防护:")
import sqlite3
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()
# 创建测试表
cursor.execute('''
CREATE TABLE users (
id INTEGER PRIMARY KEY,
username VARCHAR(50),
password VARCHAR(100)
)
''')
cursor.execute("INSERT INTO users (username, password) VALUES ('admin', 'secret123')")
cursor.execute("INSERT INTO users (username, password) VALUES ('user1', 'password456')")
conn.commit()
# 错误的做法(容易SQL注入)
def unsafe_login(username, password):
"""不安全的登录方法(仅用于演示)"""
query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
print(f" ⚠️ 不安全的查询: {query}")
# 注意:这里只是演示,实际不执行
return "不安全的查询方式"
# 正确的做法(使用参数化查询)
def safe_login(username, password):
"""安全的登录方法"""
cursor.execute("SELECT * FROM users WHERE username = ? AND password = ?", (username, password))
result = cursor.fetchone()
return result is not None
print(" 安全登录演示:")
# 正常登录
if safe_login('admin', 'secret123'):
print(" ✓ 正常登录成功")
# 模拟SQL注入尝试
malicious_input = "admin'; DROP TABLE users; --"
unsafe_login(malicious_input, "anything")
print(" ✓ 参数化查询防止了SQL注入")
# 2. 密码安全
print("\n2. 密码安全处理:")
import hashlib
import secrets
import bcrypt
def hash_password_simple(password):
"""简单的密码哈希(不推荐用于生产)"""
salt = secrets.token_hex(16)
password_hash = hashlib.sha256((password + salt).encode()).hexdigest()
return salt, password_hash
def hash_password_bcrypt(password):
"""使用bcrypt哈希密码(推荐)"""
try:
# 生成盐并哈希密码
salt = bcrypt.gensalt()
password_hash = bcrypt.hashpw(password.encode('utf-8'), salt)
return password_hash.decode('utf-8')
except ImportError:
print(" ⚠️ bcrypt未安装,使用简单哈希方法")
return hash_password_simple(password)
def verify_password_bcrypt(password, hashed):
"""验证bcrypt哈希密码"""
try:
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
except ImportError:
return False
# 演示密码哈希
test_password = "mySecurePassword123!"
# 简单哈希方法
salt, simple_hash = hash_password_simple(test_password)
print(f" 简单哈希: {simple_hash[:20]}...")
# bcrypt方法(如果可用)
try:
bcrypt_hash = hash_password_bcrypt(test_password)
print(f" bcrypt哈希: {bcrypt_hash[:20]}...")
# 验证密码
if verify_password_bcrypt(test_password, bcrypt_hash):
print(" ✓ 密码验证成功")
except:
print(" ⚠️ bcrypt不可用,建议安装: pip install bcrypt")
# 3. 数据库连接安全
print("\n3. 数据库连接安全:")
connection_security_tips = {
"使用SSL/TLS": "确保数据库连接使用加密传输",
"最小权限原则": "为应用程序创建专用数据库用户,只授予必要权限",
"连接字符串保护": "不要在代码中硬编码数据库凭据",
"连接池配置": "合理配置连接池,避免连接泄露",
"网络隔离": "将数据库服务器放在私有网络中",
"定期更新": "保持数据库软件和驱动程序最新"
}
for tip, description in connection_security_tips.items():
print(f" • {tip}: {description}")
# 4. 数据加密
print("\n4. 敏感数据加密:")
from cryptography.fernet import Fernet
try:
# 生成加密密钥
key = Fernet.generate_key()
cipher_suite = Fernet(key)
# 加密敏感数据
sensitive_data = "用户身份证号: 123456789012345678"
encrypted_data = cipher_suite.encrypt(sensitive_data.encode())
print(f" 原始数据: {sensitive_data}")
print(f" 加密数据: {encrypted_data[:30]}...")
# 解密数据
decrypted_data = cipher_suite.decrypt(encrypted_data).decode()
print(f" 解密数据: {decrypted_data}")
print(" ✓ 数据加密/解密成功")
except ImportError:
print(" ⚠️ cryptography库未安装,建议安装: pip install cryptography")
# 使用简单的base64编码作为演示(不安全)
import base64
sensitive_data = "演示数据"
encoded = base64.b64encode(sensitive_data.encode()).decode()
decoded = base64.b64decode(encoded).decode()
print(f" Base64编码演示: {encoded}")
print(" ⚠️ Base64不是加密,仅用于演示")
# 5. 审计和日志
print("\n5. 数据库审计和日志:")
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def log_database_operation(operation, table, user_id=None, details=None):
"""记录数据库操作日志"""
log_entry = {
'timestamp': datetime.now().isoformat(),
'operation': operation,
'table': table,
'user_id': user_id,
'details': details
}
logging.info(f"DB Operation: {log_entry}")
# 演示日志记录
log_database_operation('SELECT', 'users', user_id=1, details='Login attempt')
log_database_operation('UPDATE', 'user_profiles', user_id=1, details='Profile update')
log_database_operation('DELETE', 'sessions', user_id=1, details='Logout')
print(" ✓ 数据库操作日志记录完成")
# 6. 备份和恢复
print("\n6. 数据备份策略:")
backup_strategies = {
"定期备份": "设置自动化的定期备份任务",
"增量备份": "结合全量备份和增量备份",
"异地备份": "将备份存储在不同的地理位置",
"备份测试": "定期测试备份的完整性和可恢复性",
"版本控制": "保留多个备份版本",
"加密备份": "对备份文件进行加密保护"
}
for strategy, description in backup_strategies.items():
print(f" • {strategy}: {description}")
# 简单的SQLite备份演示
def backup_sqlite_database(source_db, backup_path):
"""SQLite数据库备份"""
try:
import shutil
shutil.copy2(source_db, backup_path)
return True
except Exception as e:
print(f"备份失败: {e}")
return False
print(" ✓ 数据库备份策略说明完成")
# 清理资源
conn.close()
print("\n ✓ 数据库安全最佳实践演示完成")
# 运行数据库安全演示
database_security_demo()
# 8. 学习建议和总结
# 8.1 学习路径
def learning_path_guide():
"""数据库学习路径指南"""
print("=== Python数据库编程学习路径 ===")
learning_stages = {
"初级阶段 (1-2周)": {
"目标": "掌握基础数据库操作",
"内容": [
"理解数据库基本概念",
"学习SQL基础语法",
"掌握Python DB-API 2.0规范",
"练习SQLite基础操作",
"学习CRUD操作"
],
"实践项目": "简单的联系人管理系统"
},
"中级阶段 (2-3周)": {
"目标": "掌握高级数据库功能",
"内容": [
"学习事务处理",
"掌握连接池使用",
"理解索引和查询优化",
"学习MySQL/PostgreSQL操作",
"掌握数据库设计原则"
],
"实践项目": "博客系统或电商系统"
},
"高级阶段 (3-4周)": {
"目标": "掌握ORM和高级特性",
"内容": [
"学习SQLAlchemy ORM",
"掌握数据库迁移",
"学习性能优化技巧",
"理解数据库安全",
"掌握分布式数据库概念"
],
"实践项目": "完整的Web应用后端"
},
"专家阶段 (持续学习)": {
"目标": "深入理解数据库架构",
"内容": [
"学习数据库内部原理",
"掌握分库分表策略",
"学习NoSQL数据库",
"理解大数据处理",
"掌握数据库运维"
],
"实践项目": "高并发分布式系统"
}
}
for stage, details in learning_stages.items():
print(f"\n{stage}:")
print(f" 目标: {details['目标']}")
print(" 学习内容:")
for content in details['内容']:
print(f" • {content}")
print(f" 实践项目: {details['实践项目']}")
# 运行学习路径指南
learning_path_guide()
# 8.2 最佳实践总结
def best_practices_summary():
"""数据库编程最佳实践总结"""
print("=== 数据库编程最佳实践总结 ===")
# 1. 代码组织
print("\n1. 代码组织最佳实践:")
code_organization = [
"使用数据访问对象(DAO)模式分离数据访问逻辑",
"创建数据库连接管理器统一管理连接",
"使用配置文件管理数据库连接参数",
"实现统一的异常处理机制",
"编写可重用的数据库操作工具函数",
"使用类型提示提高代码可读性",
"编写完整的文档和注释"
]
for i, practice in enumerate(code_organization, 1):
print(f" {i}. {practice}")
# 2. 性能优化
print("\n2. 性能优化最佳实践:")
performance_tips = [
"使用连接池避免频繁创建连接",
"合理使用索引优化查询性能",
"避免N+1查询问题",
"使用批量操作处理大量数据",
"实现查询结果缓存",
"定期分析和优化慢查询",
"使用分页避免大结果集"
]
for i, tip in enumerate(performance_tips, 1):
print(f" {i}. {tip}")
# 3. 安全实践
print("\n3. 安全最佳实践:")
security_practices = [
"始终使用参数化查询防止SQL注入",
"对敏感数据进行加密存储",
"使用强密码哈希算法",
"实施最小权限原则",
"启用数据库连接加密",
"定期备份和测试恢复",
"记录和监控数据库访问日志"
]
for i, practice in enumerate(security_practices, 1):
print(f" {i}. {practice}")
# 4. 错误处理
print("\n4. 错误处理最佳实践:")
error_handling = [
"使用try-catch块处理数据库异常",
"实现事务回滚机制",
"提供有意义的错误消息",
"记录详细的错误日志",
"实现重试机制处理临时故障",
"优雅地处理连接超时",
"避免向用户暴露敏感错误信息"
]
for i, practice in enumerate(error_handling, 1):
print(f" {i}. {practice}")
# 运行最佳实践总结
best_practices_summary()
# 8.3 常见陷阱和解决方案
def common_pitfalls_and_solutions():
"""常见陷阱和解决方案"""
print("=== 常见陷阱和解决方案 ===")
pitfalls = {
"连接泄露": {
"问题": "忘记关闭数据库连接导致连接池耗尽",
"解决方案": "使用with语句或try-finally确保连接关闭",
"示例": "with get_connection() as conn: # 自动关闭连接"
},
"SQL注入": {
"问题": "直接拼接SQL字符串导致安全漏洞",
"解决方案": "使用参数化查询或ORM",
"示例": "cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))"
},
"N+1查询": {
"问题": "在循环中执行查询导致性能问题",
"解决方案": "使用JOIN查询或预加载",
"示例": "使用SQLAlchemy的joinedload或selectinload"
},
"事务管理": {
"问题": "忘记提交事务或处理回滚",
"解决方案": "使用上下文管理器自动管理事务",
"示例": "使用@contextmanager装饰器创建事务管理器"
},
"字符编码": {
"问题": "中文字符显示乱码",
"解决方案": "确保数据库、连接和Python使用相同编码",
"示例": "连接字符串中指定charset=utf8mb4"
},
"时区问题": {
"问题": "时间数据在不同时区显示错误",
"解决方案": "统一使用UTC时间或明确指定时区",
"示例": "使用datetime.utcnow()和时区转换"
}
}
for pitfall, details in pitfalls.items():
print(f"\n{pitfall}:")
print(f" 问题: {details['问题']}")
print(f" 解决方案: {details['解决方案']}")
print(f" 示例: {details['示例']}")
# 运行常见陷阱和解决方案
common_pitfalls_and_solutions()
# 8.4 本章总结
def chapter_summary():
"""第20天学习总结"""
print("=== 第20天 - Python数据库访问学习总结 ===")
summary_points = {
"核心概念": [
"数据库基础知识和DB-API 2.0规范",
"SQLite、MySQL等数据库的连接和操作",
"SQL语句的执行和结果处理",
"事务管理和错误处理机制"
],
"重要技能": [
"使用sqlite3模块进行SQLite操作",
"使用mysql-connector-python操作MySQL",
"掌握SQLAlchemy ORM框架",
"实现数据库连接池和性能优化"
],
"实践应用": [
"用户管理系统的设计和实现",
"销售数据分析系统的构建",
"数据库安全和最佳实践",
"性能优化和监控策略"
],
"进阶方向": [
"学习NoSQL数据库(MongoDB, Redis)",
"掌握数据库集群和分布式架构",
"深入理解数据库内部原理",
"学习大数据处理技术"
]
}
for category, points in summary_points.items():
print(f"\n{category}:")
for point in points:
print(f" • {point}")
print("\n学习成果:")
achievements = [
"✓ 掌握了Python数据库编程的基础知识",
"✓ 学会了使用多种数据库和ORM框架",
"✓ 理解了数据库设计和性能优化原则",
"✓ 具备了构建实际数据库应用的能力",
"✓ 了解了数据库安全和最佳实践"
]
for achievement in achievements:
print(f" {achievement}")
print("\n下一步学习建议:")
next_steps = [
"深入学习特定数据库的高级特性",
"实践更复杂的数据库应用项目",
"学习数据库运维和监控技能",
"探索大数据和分布式数据库技术",
"关注数据库技术的最新发展趋势"
]
for step in next_steps:
print(f" • {step}")
print("\n恭喜你完成了Python数据库访问的学习!")
print("数据库是现代应用的核心,继续实践和深入学习将让你成为更优秀的开发者。")
# 运行本章总结
chapter_summary()
# 总结
通过第20天的学习,我们全面掌握了Python数据库访问的各个方面:
- 数据库基础 - 理解了数据库概念、DB-API规范和SQL基础
- SQLite操作 - 掌握了轻量级数据库的使用和高级功能
- MySQL操作 - 学会了企业级数据库的连接和操作
- ORM框架 - 深入学习了SQLAlchemy的使用和高级特性
- 设计最佳实践 - 了解了数据库设计原则和性能优化策略
- 实际应用 - 通过用户管理和数据分析系统掌握了实践技能
- 安全实践 - 学习了数据库安全和最佳实践
数据库编程是后端开发的核心技能,掌握这些知识将为你的Python开发之路奠定坚实的基础。继续实践和深入学习,你将能够构建更加复杂和高效的数据库应用!