第20天-访问数据库

2023/6/15

# 第20天-访问数据库

# 1. 数据库基础

# 1.1 数据库概述

数据库是存储和管理数据的系统,Python提供了多种方式来访问不同类型的数据库。

def database_overview_demo():
    """数据库概述演示"""
    print("=== 数据库概述演示 ===")
    
    # 1. 数据库类型
    print("\n1. 常见数据库类型:")
    
    database_types = {
        "关系型数据库": {
            "SQLite": "轻量级文件数据库,无需服务器",
            "MySQL": "开源关系型数据库管理系统",
            "PostgreSQL": "功能强大的开源对象关系数据库",
            "Oracle": "企业级商业数据库",
            "SQL Server": "微软的关系型数据库"
        },
        "NoSQL数据库": {
            "MongoDB": "文档型数据库",
            "Redis": "内存键值数据库",
            "Cassandra": "分布式列族数据库",
            "Neo4j": "图形数据库"
        }
    }
    
    for category, databases in database_types.items():
        print(f"\n   {category}:")
        for name, description in databases.items():
            print(f"     • {name}: {description}")
    
    # 2. Python数据库API规范
    print("\n2. Python数据库API规范 (DB-API 2.0):")
    
    api_components = {
        "连接对象 (Connection)": "表示数据库连接",
        "游标对象 (Cursor)": "执行SQL语句和获取结果",
        "异常类型": "处理数据库相关错误",
        "类型构造器": "处理特殊数据类型"
    }
    
    for component, description in api_components.items():
        print(f"   • {component}: {description}")
    
    # 3. 常用Python数据库模块
    print("\n3. 常用Python数据库模块:")
    
    python_modules = {
        "sqlite3": "Python内置SQLite模块",
        "pymysql": "纯Python MySQL客户端",
        "psycopg2": "PostgreSQL适配器",
        "cx_Oracle": "Oracle数据库接口",
        "pymongo": "MongoDB Python驱动",
        "redis-py": "Redis Python客户端"
    }
    
    for module, description in python_modules.items():
        print(f"   • {module}: {description}")
    
    # 4. 数据库操作基本流程
    print("\n4. 数据库操作基本流程:")
    
    basic_flow = [
        "1. 导入数据库模块",
        "2. 建立数据库连接",
        "3. 创建游标对象",
        "4. 执行SQL语句",
        "5. 处理查询结果",
        "6. 提交事务(如需要)",
        "7. 关闭游标和连接"
    ]
    
    for step in basic_flow:
        print(f"   {step}")
    
    # 5. 基本SQL语句回顾
    print("\n5. 基本SQL语句回顾:")
    
    sql_examples = {
        "创建表": "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)",
        "插入数据": "INSERT INTO users (name, email) VALUES ('张三', 'zhang@example.com')",
        "查询数据": "SELECT * FROM users WHERE name = '张三'",
        "更新数据": "UPDATE users SET email = 'new@example.com' WHERE id = 1",
        "删除数据": "DELETE FROM users WHERE id = 1"
    }
    
    for operation, sql in sql_examples.items():
        print(f"   {operation}:")
        print(f"     {sql}")

# 运行数据库概述演示
database_overview_demo()

# 2. SQLite数据库

# 2.1 SQLite基础操作

import sqlite3
import os
from datetime import datetime

def sqlite_basic_demo():
    """SQLite基础操作演示"""
    print("=== SQLite基础操作演示 ===")
    
    # 1. 连接数据库
    print("\n1. 连接SQLite数据库:")
    
    # 连接到数据库文件(如果不存在会自动创建)
    db_path = 'example.db'
    conn = sqlite3.connect(db_path)
    print(f"   ✓ 连接到数据库: {db_path}")
    
    # 创建游标
    cursor = conn.cursor()
    print("   ✓ 创建游标对象")
    
    # 2. 创建表
    print("\n2. 创建数据表:")
    
    # 用户表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL UNIQUE,
            email TEXT NOT NULL,
            password TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            is_active BOOLEAN DEFAULT 1
        )
    ''')
    print("   ✓ 创建用户表")
    
    # 文章表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS articles (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            content TEXT,
            author_id INTEGER,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (author_id) REFERENCES users (id)
        )
    ''')
    print("   ✓ 创建文章表")
    
    # 3. 插入数据
    print("\n3. 插入数据:")
    
    # 插入单条记录
    try:
        cursor.execute(
            "INSERT INTO users (username, email, password) VALUES (?, ?, ?)",
            ('张三', 'zhangsan@example.com', 'password123')
        )
        print("   ✓ 插入用户: 张三")
    except sqlite3.IntegrityError as e:
        print(f"   用户已存在: {e}")
    
    # 插入多条记录
    users_data = [
        ('李四', 'lisi@example.com', 'password456'),
        ('王五', 'wangwu@example.com', 'password789'),
        ('赵六', 'zhaoliu@example.com', 'passwordabc')
    ]
    
    try:
        cursor.executemany(
            "INSERT INTO users (username, email, password) VALUES (?, ?, ?)",
            users_data
        )
        print(f"   ✓ 批量插入 {len(users_data)} 个用户")
    except sqlite3.IntegrityError as e:
        print(f"   部分用户已存在: {e}")
    
    # 提交事务
    conn.commit()
    print("   ✓ 提交事务")
    
    # 4. 查询数据
    print("\n4. 查询数据:")
    
    # 查询所有用户
    cursor.execute("SELECT * FROM users")
    users = cursor.fetchall()
    print(f"   查询到 {len(users)} 个用户:")
    for user in users:
        print(f"     ID: {user[0]}, 用户名: {user[1]}, 邮箱: {user[2]}")
    
    # 条件查询
    cursor.execute("SELECT id, username FROM users WHERE username LIKE ?", ('%张%',))
    filtered_users = cursor.fetchall()
    print(f"\n   姓张的用户 ({len(filtered_users)} 个):")
    for user in filtered_users:
        print(f"     ID: {user[0]}, 用户名: {user[1]}")
    
    # 使用fetchone获取单条记录
    cursor.execute("SELECT * FROM users WHERE username = ?", ('李四',))
    user = cursor.fetchone()
    if user:
        print(f"\n   找到用户: {user[1]} ({user[2]})")
    
    # 5. 更新数据
    print("\n5. 更新数据:")
    
    cursor.execute(
        "UPDATE users SET email = ? WHERE username = ?",
        ('zhangsan_new@example.com', '张三')
    )
    affected_rows = cursor.rowcount
    print(f"   ✓ 更新了 {affected_rows} 条记录")
    
    conn.commit()
    
    # 6. 删除数据
    print("\n6. 删除数据:")
    
    cursor.execute("DELETE FROM users WHERE username = ?", ('赵六',))
    deleted_rows = cursor.rowcount
    print(f"   ✓ 删除了 {deleted_rows} 条记录")
    
    conn.commit()
    
    # 7. 查看表结构
    print("\n7. 查看表结构:")
    
    cursor.execute("PRAGMA table_info(users)")
    columns = cursor.fetchall()
    print("   users表结构:")
    for col in columns:
        print(f"     {col[1]} {col[2]} {'NOT NULL' if col[3] else 'NULL'}")
    
    # 8. 关闭连接
    cursor.close()
    conn.close()
    print("\n   ✓ 关闭数据库连接")
    
    # 清理测试文件
    if os.path.exists(db_path):
        os.remove(db_path)
        print(f"   ✓ 清理测试文件: {db_path}")

# 运行SQLite基础演示
sqlite_basic_demo()

# 2.2 SQLite高级功能

import sqlite3
import json
from contextlib import contextmanager

def sqlite_advanced_demo():
    """SQLite高级功能演示"""
    print("=== SQLite高级功能演示 ===")
    
    # 1. 连接管理和上下文管理器
    print("\n1. 连接管理:")
    
    @contextmanager
    def get_db_connection(db_path):
        """数据库连接上下文管理器"""
        conn = None
        try:
            conn = sqlite3.connect(db_path)
            # 设置行工厂,使查询结果可以像字典一样访问
            conn.row_factory = sqlite3.Row
            yield conn
        except Exception as e:
            if conn:
                conn.rollback()
            raise e
        finally:
            if conn:
                conn.close()
    
    db_path = 'advanced_example.db'
    
    # 使用上下文管理器
    with get_db_connection(db_path) as conn:
        cursor = conn.cursor()
        
        # 创建测试表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                price REAL NOT NULL,
                category TEXT,
                metadata TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        print("   ✓ 使用上下文管理器创建连接")
    
    # 2. 事务处理
    print("\n2. 事务处理:")
    
    def transfer_money_demo():
        """转账事务演示"""
        with get_db_connection(db_path) as conn:
            cursor = conn.cursor()
            
            # 创建账户表
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS accounts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    balance REAL NOT NULL DEFAULT 0
                )
            ''')
            
            # 插入测试账户
            cursor.execute("INSERT OR REPLACE INTO accounts (id, name, balance) VALUES (1, '账户A', 1000)")
            cursor.execute("INSERT OR REPLACE INTO accounts (id, name, balance) VALUES (2, '账户B', 500)")
            conn.commit()
            
            try:
                # 开始事务
                cursor.execute("BEGIN TRANSACTION")
                
                # 从账户A扣款
                cursor.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (200, 1))
                
                # 检查余额是否足够
                cursor.execute("SELECT balance FROM accounts WHERE id = ?", (1,))
                balance = cursor.fetchone()[0]
                
                if balance < 0:
                    raise ValueError("余额不足")
                
                # 向账户B转账
                cursor.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (200, 2))
                
                # 提交事务
                conn.commit()
                print("   ✓ 转账成功")
                
                # 查看结果
                cursor.execute("SELECT name, balance FROM accounts")
                accounts = cursor.fetchall()
                for account in accounts:
                    print(f"     {account['name']}: {account['balance']} 元")
                
            except Exception as e:
                # 回滚事务
                conn.rollback()
                print(f"   ✗ 转账失败,已回滚: {e}")
    
    transfer_money_demo()
    
    # 3. 索引和性能优化
    print("\n3. 索引和性能优化:")
    
    with get_db_connection(db_path) as conn:
        cursor = conn.cursor()
        
        # 创建索引
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_products_category ON products(category)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_products_price ON products(price)")
        print("   ✓ 创建索引")
        
        # 插入测试数据
        products_data = [
            ('笔记本电脑', 5999.99, '电子产品', json.dumps({'brand': 'Dell', 'model': 'XPS13'})),
            ('无线鼠标', 199.99, '电子产品', json.dumps({'brand': 'Logitech', 'wireless': True})),
            ('办公椅', 899.99, '家具', json.dumps({'material': '皮革', 'adjustable': True})),
            ('台灯', 299.99, '家具', json.dumps({'type': 'LED', 'dimmable': True}))
        ]
        
        cursor.executemany(
            "INSERT OR REPLACE INTO products (name, price, category, metadata) VALUES (?, ?, ?, ?)",
            products_data
        )
        conn.commit()
        print(f"   ✓ 插入 {len(products_data)} 个产品")
        
        # 使用EXPLAIN QUERY PLAN查看查询计划
        cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM products WHERE category = '电子产品'")
        plan = cursor.fetchall()
        print("   查询计划:")
        for step in plan:
            print(f"     {step[3]}")
    
    # 4. JSON数据处理
    print("\n4. JSON数据处理:")
    
    with get_db_connection(db_path) as conn:
        cursor = conn.cursor()
        
        # 查询并解析JSON数据
        cursor.execute("SELECT name, metadata FROM products WHERE category = '电子产品'")
        products = cursor.fetchall()
        
        print("   电子产品详情:")
        for product in products:
            metadata = json.loads(product['metadata'])
            print(f"     {product['name']}:")
            for key, value in metadata.items():
                print(f"       {key}: {value}")
    
    # 5. 自定义函数
    print("\n5. 自定义函数:")
    
    def calculate_discount(price, discount_rate):
        """计算折扣价格"""
        return price * (1 - discount_rate / 100)
    
    with get_db_connection(db_path) as conn:
        # 注册自定义函数
        conn.create_function("calculate_discount", 2, calculate_discount)
        
        cursor = conn.cursor()
        
        # 使用自定义函数
        cursor.execute(
            "SELECT name, price, calculate_discount(price, 20) as discounted_price FROM products"
        )
        products = cursor.fetchall()
        
        print("   产品价格(8折优惠):")
        for product in products:
            print(f"     {product['name']}: {product['price']:.2f}{product['discounted_price']:.2f}")
    
    # 6. 备份和恢复
    print("\n6. 数据库备份:")
    
    def backup_database(source_db, backup_db):
        """备份数据库"""
        with sqlite3.connect(source_db) as source:
            with sqlite3.connect(backup_db) as backup:
                source.backup(backup)
        print(f"   ✓ 备份完成: {source_db}{backup_db}")
    
    backup_path = 'backup_example.db'
    backup_database(db_path, backup_path)
    
    # 验证备份
    with get_db_connection(backup_path) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM products")
        count = cursor.fetchone()[0]
        print(f"   ✓ 备份验证: 产品数量 {count}")
    
    # 清理文件
    import os
    for file_path in [db_path, backup_path]:
        if os.path.exists(file_path):
            os.remove(file_path)
    print("   ✓ 清理测试文件")

# 运行SQLite高级演示
sqlite_advanced_demo()

# 3. MySQL数据库

# 3.1 MySQL连接和基本操作

# 注意:需要安装 pymysql: pip install pymysql
try:
    import pymysql
    PYMYSQL_AVAILABLE = True
except ImportError:
    PYMYSQL_AVAILABLE = False
    print("PyMySQL未安装,请运行: pip install pymysql")

def mysql_basic_demo():
    """MySQL基础操作演示"""
    print("=== MySQL基础操作演示 ===")
    
    if not PYMYSQL_AVAILABLE:
        print("   ⚠️ PyMySQL模块未安装,跳过MySQL演示")
        return
    
    # 1. 连接配置
    print("\n1. MySQL连接配置:")
    
    # 数据库连接配置
    config = {
        'host': 'localhost',
        'port': 3306,
        'user': 'root',
        'password': 'password',  # 请替换为实际密码
        'database': 'test_db',
        'charset': 'utf8mb4',
        'autocommit': False
    }
    
    print("   连接配置:")
    for key, value in config.items():
        if key != 'password':
            print(f"     {key}: {value}")
        else:
            print(f"     {key}: {'*' * len(str(value))}")
    
    # 2. 连接管理类
    print("\n2. MySQL连接管理:")
    
    class MySQLManager:
        """MySQL连接管理器"""
        
        def __init__(self, config):
            self.config = config
            self.connection = None
        
        def connect(self):
            """建立连接"""
            try:
                self.connection = pymysql.connect(**self.config)
                print("   ✓ 连接MySQL成功")
                return True
            except pymysql.Error as e:
                print(f"   ✗ 连接失败: {e}")
                return False
        
        def disconnect(self):
            """关闭连接"""
            if self.connection:
                self.connection.close()
                print("   ✓ 关闭MySQL连接")
        
        def execute_query(self, sql, params=None):
            """执行查询"""
            try:
                with self.connection.cursor() as cursor:
                    cursor.execute(sql, params or ())
                    return cursor.fetchall()
            except pymysql.Error as e:
                print(f"   ✗ 查询失败: {e}")
                return None
        
        def execute_update(self, sql, params=None):
            """执行更新"""
            try:
                with self.connection.cursor() as cursor:
                    affected_rows = cursor.execute(sql, params or ())
                    self.connection.commit()
                    return affected_rows
            except pymysql.Error as e:
                print(f"   ✗ 更新失败: {e}")
                self.connection.rollback()
                return 0
        
        def execute_many(self, sql, params_list):
            """批量执行"""
            try:
                with self.connection.cursor() as cursor:
                    affected_rows = cursor.executemany(sql, params_list)
                    self.connection.commit()
                    return affected_rows
            except pymysql.Error as e:
                print(f"   ✗ 批量执行失败: {e}")
                self.connection.rollback()
                return 0
    
    # 3. 模拟MySQL操作(如果无法连接真实数据库)
    print("\n3. MySQL操作演示:")
    
    def simulate_mysql_operations():
        """模拟MySQL操作"""
        print("   模拟MySQL数据库操作:")
        
        # 模拟创建表
        create_table_sql = '''
            CREATE TABLE IF NOT EXISTS employees (
                id INT AUTO_INCREMENT PRIMARY KEY,
                name VARCHAR(100) NOT NULL,
                email VARCHAR(100) UNIQUE NOT NULL,
                department VARCHAR(50),
                salary DECIMAL(10, 2),
                hire_date DATE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        '''
        print("     ✓ 创建员工表")
        
        # 模拟插入数据
        employees_data = [
            ('张三', 'zhangsan@company.com', '技术部', 8000.00, '2023-01-15'),
            ('李四', 'lisi@company.com', '销售部', 6000.00, '2023-02-20'),
            ('王五', 'wangwu@company.com', '人事部', 7000.00, '2023-03-10'),
            ('赵六', 'zhaoliu@company.com', '技术部', 9000.00, '2023-04-05')
        ]
        
        insert_sql = '''
            INSERT INTO employees (name, email, department, salary, hire_date)
            VALUES (%s, %s, %s, %s, %s)
        '''
        print(f"     ✓ 插入 {len(employees_data)} 个员工")
        
        # 模拟查询操作
        queries = {
            "查询所有员工": "SELECT * FROM employees",
            "按部门查询": "SELECT * FROM employees WHERE department = '技术部'",
            "薪资统计": "SELECT department, AVG(salary) as avg_salary FROM employees GROUP BY department",
            "高薪员工": "SELECT name, salary FROM employees WHERE salary > 7000 ORDER BY salary DESC"
        }
        
        for desc, sql in queries.items():
            print(f"     • {desc}: {sql[:50]}...")
        
        # 模拟更新操作
        update_sql = "UPDATE employees SET salary = salary * 1.1 WHERE department = '技术部'"
        print("     ✓ 技术部员工加薪10%")
        
        # 模拟删除操作
        delete_sql = "DELETE FROM employees WHERE hire_date < '2023-02-01'"
        print("     ✓ 删除早期员工记录")
    
    # 尝试连接真实数据库,如果失败则模拟操作
    mysql_manager = MySQLManager(config)
    if mysql_manager.connect():
        # 真实数据库操作
        try:
            # 创建数据库(如果不存在)
            mysql_manager.execute_update("CREATE DATABASE IF NOT EXISTS test_db")
            mysql_manager.execute_update("USE test_db")
            
            # 执行实际操作...
            print("   执行真实MySQL操作")
            
        except Exception as e:
            print(f"   操作过程中出错: {e}")
        finally:
            mysql_manager.disconnect()
    else:
        # 模拟操作
        simulate_mysql_operations()

# 运行MySQL基础演示
mysql_basic_demo()

# 3.2 MySQL连接池和高级功能

# 连接池需要额外安装: pip install DBUtils
try:
    from DBUtils.PooledDB import PooledDB
    DBUTILS_AVAILABLE = True
except ImportError:
    DBUTILS_AVAILABLE = False

def mysql_advanced_demo():
    """MySQL高级功能演示"""
    print("=== MySQL高级功能演示 ===")
    
    # 1. 连接池管理
    print("\n1. 连接池管理:")
    
    class MySQLConnectionPool:
        """MySQL连接池管理器"""
        
        def __init__(self, config, pool_size=5):
            self.config = config
            self.pool_size = pool_size
            self.pool = None
            self._init_pool()
        
        def _init_pool(self):
            """初始化连接池"""
            if DBUTILS_AVAILABLE and PYMYSQL_AVAILABLE:
                try:
                    self.pool = PooledDB(
                        creator=pymysql,
                        maxconnections=self.pool_size,
                        mincached=2,
                        maxcached=5,
                        maxshared=3,
                        blocking=True,
                        maxusage=None,
                        setsession=[],
                        ping=0,
                        **self.config
                    )
                    print(f"   ✓ 初始化连接池,大小: {self.pool_size}")
                except Exception as e:
                    print(f"   ✗ 连接池初始化失败: {e}")
            else:
                print("   ⚠️ 缺少依赖,使用模拟连接池")
                self.pool = None
        
        def get_connection(self):
            """从连接池获取连接"""
            if self.pool:
                return self.pool.connection()
            else:
                # 模拟连接
                print("   获取模拟连接")
                return None
        
        def execute_transaction(self, operations):
            """执行事务"""
            conn = self.get_connection()
            if not conn:
                print("   ⚠️ 无法获取连接,模拟事务执行")
                return
            
            try:
                cursor = conn.cursor()
                
                # 开始事务
                conn.begin()
                
                for operation in operations:
                    sql = operation['sql']
                    params = operation.get('params', ())
                    cursor.execute(sql, params)
                
                # 提交事务
                conn.commit()
                print(f"   ✓ 事务执行成功,包含 {len(operations)} 个操作")
                
            except Exception as e:
                conn.rollback()
                print(f"   ✗ 事务执行失败,已回滚: {e}")
            finally:
                cursor.close()
                conn.close()
    
    # 2. 数据访问对象 (DAO) 模式
    print("\n2. 数据访问对象模式:")
    
    class UserDAO:
        """用户数据访问对象"""
        
        def __init__(self, connection_pool):
            self.pool = connection_pool
        
        def create_user(self, username, email, password):
            """创建用户"""
            sql = "INSERT INTO users (username, email, password) VALUES (%s, %s, %s)"
            params = (username, email, password)
            
            # 模拟执行
            print(f"   ✓ 创建用户: {username} ({email})")
            return True
        
        def get_user_by_id(self, user_id):
            """根据ID获取用户"""
            sql = "SELECT * FROM users WHERE id = %s"
            params = (user_id,)
            
            # 模拟返回数据
            user_data = {
                'id': user_id,
                'username': f'user_{user_id}',
                'email': f'user_{user_id}@example.com',
                'created_at': '2023-01-01 00:00:00'
            }
            
            print(f"   ✓ 获取用户: {user_data['username']}")
            return user_data
        
        def update_user(self, user_id, **kwargs):
            """更新用户信息"""
            if not kwargs:
                return False
            
            set_clause = ', '.join([f"{key} = %s" for key in kwargs.keys()])
            sql = f"UPDATE users SET {set_clause} WHERE id = %s"
            params = list(kwargs.values()) + [user_id]
            
            print(f"   ✓ 更新用户 {user_id}: {list(kwargs.keys())}")
            return True
        
        def delete_user(self, user_id):
            """删除用户"""
            sql = "DELETE FROM users WHERE id = %s"
            params = (user_id,)
            
            print(f"   ✓ 删除用户: {user_id}")
            return True
        
        def search_users(self, keyword, limit=10):
            """搜索用户"""
            sql = "SELECT * FROM users WHERE username LIKE %s OR email LIKE %s LIMIT %s"
            params = (f'%{keyword}%', f'%{keyword}%', limit)
            
            # 模拟搜索结果
            results = [
                {'id': i, 'username': f'{keyword}_user_{i}', 'email': f'{keyword}_{i}@example.com'}
                for i in range(1, min(limit + 1, 4))
            ]
            
            print(f"   ✓ 搜索用户 '{keyword}': 找到 {len(results)} 个结果")
            return results
    
    # 3. 测试连接池和DAO
    print("\n3. 测试连接池和DAO:")
    
    # 创建连接池
    config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'password',
        'database': 'test_db',
        'charset': 'utf8mb4'
    }
    
    pool = MySQLConnectionPool(config, pool_size=10)
    
    # 创建DAO实例
    user_dao = UserDAO(pool)
    
    # 测试CRUD操作
    print("\n   CRUD操作测试:")
    
    # 创建用户
    user_dao.create_user('张三', 'zhangsan@example.com', 'password123')
    user_dao.create_user('李四', 'lisi@example.com', 'password456')
    
    # 查询用户
    user = user_dao.get_user_by_id(1)
    
    # 更新用户
    user_dao.update_user(1, email='zhangsan_new@example.com', username='张三_new')
    
    # 搜索用户
    results = user_dao.search_users('张')
    
    # 删除用户
    user_dao.delete_user(2)
    
    # 4. 批量操作和性能优化
    print("\n4. 批量操作和性能优化:")
    
    def batch_operations_demo():
        """批量操作演示"""
        
        # 批量插入
        batch_data = [
            ('用户1', 'user1@example.com', 'pass1'),
            ('用户2', 'user2@example.com', 'pass2'),
            ('用户3', 'user3@example.com', 'pass3'),
            ('用户4', 'user4@example.com', 'pass4'),
            ('用户5', 'user5@example.com', 'pass5')
        ]
        
        print(f"   ✓ 批量插入 {len(batch_data)} 个用户")
        
        # 分页查询
        page_size = 10
        page_num = 1
        offset = (page_num - 1) * page_size
        
        pagination_sql = f"SELECT * FROM users LIMIT {page_size} OFFSET {offset}"
        print(f"   ✓ 分页查询: 第{page_num}页,每页{page_size}条")
        
        # 索引优化建议
        index_suggestions = [
            "CREATE INDEX idx_users_email ON users(email)",
            "CREATE INDEX idx_users_username ON users(username)",
            "CREATE INDEX idx_users_created_at ON users(created_at)"
        ]
        
        print("   索引优化建议:")
        for suggestion in index_suggestions:
            print(f"     {suggestion}")
    
    batch_operations_demo()
    
    # 5. 数据库监控和统计
    print("\n5. 数据库监控:")
    
    def database_monitoring_demo():
        """数据库监控演示"""
        
        monitoring_queries = {
            "连接数统计": "SHOW STATUS LIKE 'Threads_connected'",
            "查询缓存命中率": "SHOW STATUS LIKE 'Qcache_hits'",
            "慢查询数量": "SHOW STATUS LIKE 'Slow_queries'",
            "表锁等待": "SHOW STATUS LIKE 'Table_locks_waited'",
            "InnoDB缓冲池": "SHOW STATUS LIKE 'Innodb_buffer_pool_read_requests'"
        }
        
        print("   监控查询:")
        for desc, query in monitoring_queries.items():
            print(f"     {desc}: {query}")
        
        # 性能分析
        performance_tips = [
            "使用EXPLAIN分析查询计划",
            "合理创建索引,避免过多索引",
            "使用连接池减少连接开销",
            "定期分析表统计信息",
            "监控慢查询日志",
            "优化数据库配置参数"
        ]
        
        print("\n   性能优化建议:")
        for tip in performance_tips:
            print(f"     • {tip}")
    
    database_monitoring_demo()

# 运行MySQL高级演示
mysql_advanced_demo()

# 4. ORM框架

# 4.1 SQLAlchemy基础

# 注意:需要安装 SQLAlchemy: pip install sqlalchemy
try:
    from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, Text
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker, relationship
    from datetime import datetime
    SQLALCHEMY_AVAILABLE = True
except ImportError:
    SQLALCHEMY_AVAILABLE = False
    print("SQLAlchemy未安装,请运行: pip install sqlalchemy")

def sqlalchemy_basic_demo():
    """SQLAlchemy基础演示"""
    print("=== SQLAlchemy基础演示 ===")
    
    if not SQLALCHEMY_AVAILABLE:
        print("   ⚠️ SQLAlchemy模块未安装,跳过ORM演示")
        return
    
    # 1. 创建数据库引擎
    print("\n1. 创建数据库引擎:")
    
    # 使用SQLite内存数据库进行演示
    engine = create_engine('sqlite:///:memory:', echo=False)
    print("   ✓ 创建SQLite内存数据库引擎")
    
    # 2. 定义模型
    print("\n2. 定义ORM模型:")
    
    Base = declarative_base()
    
    class User(Base):
        """用户模型"""
        __tablename__ = 'users'
        
        id = Column(Integer, primary_key=True)
        username = Column(String(50), unique=True, nullable=False)
        email = Column(String(100), unique=True, nullable=False)
        created_at = Column(DateTime, default=datetime.utcnow)
        
        # 关系
        posts = relationship("Post", back_populates="author")
        
        def __repr__(self):
            return f"<User(username='{self.username}', email='{self.email}')>"
    
    class Post(Base):
        """文章模型"""
        __tablename__ = 'posts'
        
        id = Column(Integer, primary_key=True)
        title = Column(String(200), nullable=False)
        content = Column(Text)
        created_at = Column(DateTime, default=datetime.utcnow)
        author_id = Column(Integer, ForeignKey('users.id'))
        
        # 关系
        author = relationship("User", back_populates="posts")
        
        def __repr__(self):
            return f"<Post(title='{self.title}')>"
    
    print("   ✓ 定义User和Post模型")
    
    # 3. 创建表
    print("\n3. 创建数据表:")
    
    Base.metadata.create_all(engine)
    print("   ✓ 创建所有表")
    
    # 4. 创建会话
    print("\n4. 创建数据库会话:")
    
    Session = sessionmaker(bind=engine)
    session = Session()
    print("   ✓ 创建数据库会话")
    
    # 5. 插入数据
    print("\n5. 插入数据:")
    
    # 创建用户
    user1 = User(username='张三', email='zhangsan@example.com')
    user2 = User(username='李四', email='lisi@example.com')
    
    session.add(user1)
    session.add(user2)
    session.commit()
    print("   ✓ 创建用户")
    
    # 创建文章
    post1 = Post(title='Python学习笔记', content='今天学习了Python基础语法...', author=user1)
    post2 = Post(title='数据库设计', content='数据库设计的基本原则...', author=user1)
    post3 = Post(title='Web开发入门', content='Web开发的基础知识...', author=user2)
    
    session.add_all([post1, post2, post3])
    session.commit()
    print("   ✓ 创建文章")
    
    # 6. 查询数据
    print("\n6. 查询数据:")
    
    # 查询所有用户
    users = session.query(User).all()
    print(f"   所有用户 ({len(users)} 个):")
    for user in users:
        print(f"     {user}")
    
    # 条件查询
    user = session.query(User).filter(User.username == '张三').first()
    if user:
        print(f"\n   找到用户: {user}")
        print(f"   用户文章数: {len(user.posts)}")
        for post in user.posts:
            print(f"     • {post.title}")
    
    # 联表查询
    posts_with_authors = session.query(Post).join(User).all()
    print(f"\n   文章及作者 ({len(posts_with_authors)} 篇):")
    for post in posts_with_authors:
        print(f"     《{post.title}》 - {post.author.username}")
    
    # 7. 更新数据
    print("\n7. 更新数据:")
    
    user = session.query(User).filter(User.username == '张三').first()
    if user:
        user.email = 'zhangsan_new@example.com'
        session.commit()
        print(f"   ✓ 更新用户邮箱: {user.email}")
    
    # 8. 删除数据
    print("\n8. 删除数据:")
    
    post_to_delete = session.query(Post).filter(Post.title == 'Web开发入门').first()
    if post_to_delete:
        session.delete(post_to_delete)
        session.commit()
        print("   ✓ 删除文章: Web开发入门")
    
    # 9. 高级查询
    print("\n9. 高级查询:")
    
    # 聚合查询
    from sqlalchemy import func
    
    user_post_counts = session.query(
        User.username,
        func.count(Post.id).label('post_count')
    ).join(Post).group_by(User.id).all()
    
    print("   用户文章统计:")
    for username, count in user_post_counts:
        print(f"     {username}: {count} 篇文章")
    
    # 排序和限制
    recent_posts = session.query(Post).order_by(Post.created_at.desc()).limit(2).all()
    print(f"\n   最新文章 ({len(recent_posts)} 篇):")
    for post in recent_posts:
        print(f"     {post.title}")
    
    # 10. 关闭会话
    session.close()
    print("\n   ✓ 关闭数据库会话")

# 运行SQLAlchemy基础演示
sqlalchemy_basic_demo()

# 4.2 SQLAlchemy高级功能

def sqlalchemy_advanced_demo():
    """SQLAlchemy高级功能演示"""
    print("=== SQLAlchemy高级功能演示 ===")
    
    if not SQLALCHEMY_AVAILABLE:
        print("   ⚠️ SQLAlchemy模块未安装,跳过高级ORM演示")
        return
    
    # 1. 数据库连接池配置
    print("\n1. 连接池配置:")
    
    from sqlalchemy import create_engine
    from sqlalchemy.pool import QueuePool
    
    # 配置连接池
    engine = create_engine(
        'sqlite:///:memory:',
        poolclass=QueuePool,
        pool_size=10,
        max_overflow=20,
        pool_pre_ping=True,
        echo=False
    )
    print("   ✓ 配置连接池 (大小: 10, 最大溢出: 20)")
    
    # 2. 模型继承和混入
    print("\n2. 模型继承和混入:")
    
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, DateTime, Boolean
    from datetime import datetime
    
    Base = declarative_base()
    
    class TimestampMixin:
        """时间戳混入类"""
        created_at = Column(DateTime, default=datetime.utcnow)
        updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    class SoftDeleteMixin:
        """软删除混入类"""
        is_deleted = Column(Boolean, default=False)
        deleted_at = Column(DateTime)
    
    class BaseModel(Base, TimestampMixin, SoftDeleteMixin):
        """基础模型"""
        __abstract__ = True
        
        id = Column(Integer, primary_key=True)
    
    class Product(BaseModel):
        """产品模型"""
        __tablename__ = 'products'
        
        name = Column(String(100), nullable=False)
        price = Column(Integer)  # 以分为单位存储价格
        description = Column(String(500))
        
        def __repr__(self):
            return f"<Product(name='{self.name}', price={self.price})>"
    
    print("   ✓ 定义带混入的Product模型")
    
    # 3. 自定义查询类
    print("\n3. 自定义查询类:")
    
    from sqlalchemy.orm import Query
    
    class SoftDeleteQuery(Query):
        """支持软删除的查询类"""
        
        def filter_active(self):
            """过滤未删除的记录"""
            return self.filter(self.column_descriptions[0]['type'].is_deleted == False)
        
        def filter_deleted(self):
            """过滤已删除的记录"""
            return self.filter(self.column_descriptions[0]['type'].is_deleted == True)
    
    # 4. 事务管理
    print("\n4. 事务管理:")
    
    from sqlalchemy.orm import sessionmaker
    from contextlib import contextmanager
    
    Session = sessionmaker(bind=engine)
    
    @contextmanager
    def get_db_session():
        """数据库会话上下文管理器"""
        session = Session()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            raise e
        finally:
            session.close()
    
    # 创建表
    Base.metadata.create_all(engine)
    
    # 使用事务
    try:
        with get_db_session() as session:
            # 批量创建产品
            products = [
                Product(name='笔记本电脑', price=599999, description='高性能笔记本'),
                Product(name='无线鼠标', price=19999, description='人体工学设计'),
                Product(name='机械键盘', price=39999, description='青轴机械键盘')
            ]
            
            session.add_all(products)
            print("   ✓ 批量创建产品(事务中)")
            
            # 模拟可能的错误
            # raise Exception("模拟错误")
            
    except Exception as e:
        print(f"   ✗ 事务失败: {e}")
    
    # 5. 查询优化
    print("\n5. 查询优化:")
    
    with get_db_session() as session:
        # 延迟加载 vs 立即加载
        from sqlalchemy.orm import joinedload, selectinload
        
        # 查询产品数量
        product_count = session.query(Product).filter(Product.is_deleted == False).count()
        print(f"   活跃产品数量: {product_count}")
        
        # 分页查询
        page_size = 2
        page_num = 1
        offset = (page_num - 1) * page_size
        
        products = session.query(Product).filter(
            Product.is_deleted == False
        ).offset(offset).limit(page_size).all()
        
        print(f"   分页查询结果 (第{page_num}页):")
        for product in products:
            print(f"     {product.name}: ¥{product.price/100:.2f}")
    
    # 6. 原生SQL查询
    print("\n6. 原生SQL查询:")
    
    with get_db_session() as session:
        # 执行原生SQL
        result = session.execute(
            "SELECT name, price FROM products WHERE is_deleted = 0 ORDER BY price DESC"
        )
        
        print("   价格排序(原生SQL):")
        for row in result:
            print(f"     {row[0]}: ¥{row[1]/100:.2f}")
    
    # 7. 数据验证和约束
    print("\n7. 数据验证:")
    
    from sqlalchemy.orm import validates
    
    class ValidatedProduct(BaseModel):
        """带验证的产品模型"""
        __tablename__ = 'validated_products'
        
        name = Column(String(100), nullable=False)
        price = Column(Integer)
        
        @validates('price')
        def validate_price(self, key, price):
            """价格验证"""
            if price is not None and price < 0:
                raise ValueError("价格不能为负数")
            return price
        
        @validates('name')
        def validate_name(self, key, name):
            """名称验证"""
            if not name or len(name.strip()) == 0:
                raise ValueError("产品名称不能为空")
            return name.strip()
    
    print("   ✓ 定义带验证的产品模型")
    
    # 8. 性能监控
    print("\n8. 性能监控:")
    
    import time
    
    class QueryTimer:
        """查询计时器"""
        
        def __init__(self, description):
            self.description = description
            self.start_time = None
        
        def __enter__(self):
            self.start_time = time.time()
            return self
        
        def __exit__(self, exc_type, exc_val, exc_tb):
            elapsed = time.time() - self.start_time
            print(f"   {self.description}: {elapsed*1000:.2f}ms")
    
    with get_db_session() as session:
        with QueryTimer("查询所有产品"):
            products = session.query(Product).all()
        
        with QueryTimer("统计产品数量"):
            count = session.query(Product).count()
    
    print("\n   ✓ SQLAlchemy高级功能演示完成")

# 运行SQLAlchemy高级演示
sqlalchemy_advanced_demo()

# 5. 数据库设计最佳实践

# 5.1 数据库设计原则

def database_design_demo():
    """数据库设计最佳实践演示"""
    print("=== 数据库设计最佳实践 ===")
    
    # 1. 数据库设计原则
    print("\n1. 数据库设计原则:")
    
    design_principles = {
        "第一范式 (1NF)": {
            "定义": "每个字段都是原子性的,不可再分",
            "示例": "将'姓名'字段拆分为'姓'和'名'",
            "好处": "避免数据冗余,便于查询和维护"
        },
        "第二范式 (2NF)": {
            "定义": "满足1NF,且非主键字段完全依赖于主键",
            "示例": "订单表中商品信息应该单独建表",
            "好处": "减少数据冗余,提高数据一致性"
        },
        "第三范式 (3NF)": {
            "定义": "满足2NF,且非主键字段不依赖于其他非主键字段",
            "示例": "员工表中部门信息应该单独建表",
            "好处": "进一步减少冗余,提高数据完整性"
        }
    }
    
    for principle, details in design_principles.items():
        print(f"\n   {principle}:")
        for key, value in details.items():
            print(f"     {key}: {value}")
    
    # 2. 命名规范
    print("\n2. 命名规范:")
    
    naming_conventions = {
        "表名": {
            "规则": "使用复数形式,小写字母,下划线分隔",
            "示例": "users, user_profiles, order_items",
            "避免": "User, userProfile, OrderItem"
        },
        "字段名": {
            "规则": "使用小写字母,下划线分隔,见名知意",
            "示例": "user_id, created_at, email_address",
            "避免": "userId, createdAt, email"
        },
        "索引名": {
            "规则": "idx_表名_字段名",
            "示例": "idx_users_email, idx_orders_created_at",
            "避免": "index1, user_index"
        },
        "外键名": {
            "规则": "fk_表名_引用表名",
            "示例": "fk_orders_users, fk_order_items_products",
            "避免": "foreign_key1, user_fk"
        }
    }
    
    for category, details in naming_conventions.items():
        print(f"\n   {category}:")
        for key, value in details.items():
            print(f"     {key}: {value}")
    
    # 3. 数据类型选择
    print("\n3. 数据类型选择:")
    
    data_type_guidelines = {
        "整数类型": {
            "TINYINT": "0-255,适用于状态、类型等小范围值",
            "INT": "适用于ID、数量等常规整数",
            "BIGINT": "适用于大数值,如时间戳、大ID"
        },
        "字符串类型": {
            "CHAR": "固定长度,适用于长度固定的字段如手机号",
            "VARCHAR": "可变长度,适用于姓名、邮箱等",
            "TEXT": "大文本,适用于文章内容、描述等"
        },
        "时间类型": {
            "DATE": "仅日期,如生日",
            "DATETIME": "日期和时间,如创建时间",
            "TIMESTAMP": "时间戳,自动更新"
        },
        "数值类型": {
            "DECIMAL": "精确小数,适用于金额",
            "FLOAT/DOUBLE": "浮点数,适用于科学计算"
        }
    }
    
    for category, types in data_type_guidelines.items():
        print(f"\n   {category}:")
        for type_name, usage in types.items():
            print(f"     {type_name}: {usage}")
    
    # 4. 索引设计
    print("\n4. 索引设计原则:")
    
    index_guidelines = [
        "为经常用于WHERE条件的字段创建索引",
        "为经常用于JOIN的字段创建索引",
        "为经常用于ORDER BY的字段创建索引",
        "避免在小表上创建过多索引",
        "避免在频繁更新的字段上创建索引",
        "考虑创建复合索引来优化多字段查询",
        "定期分析和优化索引使用情况"
    ]
    
    for i, guideline in enumerate(index_guidelines, 1):
        print(f"   {i}. {guideline}")
    
    # 5. 示例:电商系统数据库设计
    print("\n5. 电商系统数据库设计示例:")
    
    ecommerce_tables = {
        "users": {
            "字段": ["id", "username", "email", "password_hash", "phone", "created_at", "updated_at"],
            "索引": ["idx_users_email", "idx_users_username"],
            "说明": "用户基本信息表"
        },
        "user_profiles": {
            "字段": ["user_id", "first_name", "last_name", "birth_date", "gender", "avatar_url"],
            "索引": ["idx_user_profiles_user_id"],
            "说明": "用户详细信息表"
        },
        "categories": {
            "字段": ["id", "name", "parent_id", "sort_order", "is_active"],
            "索引": ["idx_categories_parent_id"],
            "说明": "商品分类表(支持层级)"
        },
        "products": {
            "字段": ["id", "name", "description", "price", "stock", "category_id", "created_at"],
            "索引": ["idx_products_category_id", "idx_products_price"],
            "说明": "商品信息表"
        },
        "orders": {
            "字段": ["id", "user_id", "total_amount", "status", "created_at", "updated_at"],
            "索引": ["idx_orders_user_id", "idx_orders_status", "idx_orders_created_at"],
            "说明": "订单主表"
        },
        "order_items": {
            "字段": ["id", "order_id", "product_id", "quantity", "price", "subtotal"],
            "索引": ["idx_order_items_order_id", "idx_order_items_product_id"],
            "说明": "订单明细表"
        }
    }
    
    for table_name, details in ecommerce_tables.items():
        print(f"\n   {table_name}:")
        print(f"     说明: {details['说明']}")
        print(f"     字段: {', '.join(details['字段'])}")
        print(f"     索引: {', '.join(details['索引'])}")

# 运行数据库设计演示
database_design_demo()

# 5.2 性能优化策略

def database_performance_demo():
    """数据库性能优化演示"""
    print("=== 数据库性能优化策略 ===")
    
    # 1. 查询优化
    print("\n1. 查询优化策略:")
    
    query_optimization = {
        "使用索引": {
            "原则": "为WHERE、JOIN、ORDER BY字段创建合适的索引",
            "示例": "CREATE INDEX idx_users_email ON users(email)",
            "注意": "避免过多索引,影响写入性能"
        },
        "避免SELECT *": {
            "原则": "只查询需要的字段",
            "好例子": "SELECT id, name FROM users",
            "坏例子": "SELECT * FROM users"
        },
        "使用LIMIT": {
            "原则": "限制查询结果数量",
            "示例": "SELECT * FROM products LIMIT 10 OFFSET 20",
            "好处": "减少内存使用和网络传输"
        },
        "优化JOIN": {
            "原则": "使用合适的JOIN类型,确保JOIN字段有索引",
            "示例": "SELECT u.name, p.title FROM users u INNER JOIN posts p ON u.id = p.user_id",
            "注意": "避免不必要的JOIN操作"
        }
    }
    
    for strategy, details in query_optimization.items():
        print(f"\n   {strategy}:")
        for key, value in details.items():
            print(f"     {key}: {value}")
    
    # 2. 索引优化
    print("\n2. 索引优化技巧:")
    
    index_optimization = [
        "复合索引:将多个常一起查询的字段组合成复合索引",
        "前缀索引:对于长字符串字段,使用前缀索引节省空间",
        "覆盖索引:让索引包含查询所需的所有字段",
        "部分索引:只为满足特定条件的行创建索引",
        "定期维护:使用ANALYZE TABLE更新索引统计信息"
    ]
    
    for i, tip in enumerate(index_optimization, 1):
        print(f"   {i}. {tip}")
    
    # 3. 连接池优化
    print("\n3. 连接池优化:")
    
    connection_pool_tips = {
        "合理设置池大小": "根据应用并发量设置合适的连接池大小",
        "连接超时设置": "设置合理的连接超时和空闲超时时间",
        "连接验证": "启用连接验证,确保连接可用性",
        "监控连接使用": "监控连接池使用情况,及时调整配置"
    }
    
    for tip, description in connection_pool_tips.items():
        print(f"   • {tip}: {description}")
    
    # 4. 缓存策略
    print("\n4. 缓存策略:")
    
    caching_strategies = {
        "查询结果缓存": {
            "适用场景": "频繁查询且数据变化不频繁",
            "实现方式": "Redis、Memcached",
            "注意事项": "设置合理的过期时间"
        },
        "对象缓存": {
            "适用场景": "复杂对象的构建成本较高",
            "实现方式": "应用层缓存",
            "注意事项": "注意缓存一致性"
        },
        "页面缓存": {
            "适用场景": "静态或半静态页面",
            "实现方式": "CDN、反向代理",
            "注意事项": "处理动态内容"
        }
    }
    
    for strategy, details in caching_strategies.items():
        print(f"\n   {strategy}:")
        for key, value in details.items():
            print(f"     {key}: {value}")
    
    # 5. 分库分表策略
    print("\n5. 分库分表策略:")
    
    sharding_strategies = {
        "垂直分库": "按业务模块分离数据库",
        "水平分库": "按数据量分离数据库",
        "垂直分表": "按字段使用频率分离表",
        "水平分表": "按数据量或时间分离表"
    }
    
    for strategy, description in sharding_strategies.items():
        print(f"   • {strategy}: {description}")
    
    # 6. 监控和诊断
    print("\n6. 性能监控指标:")
    
    monitoring_metrics = [
        "查询响应时间",
        "慢查询日志",
        "连接数使用情况",
        "缓存命中率",
        "磁盘I/O使用率",
        "CPU和内存使用率",
        "锁等待时间",
        "死锁检测"
    ]
    
    for metric in monitoring_metrics:
        print(f"   • {metric}")
    
    # 7. 性能测试示例
    print("\n7. 性能测试示例:")
    
    performance_test_example = '''
    # 使用Python进行简单的性能测试
    import time
    import sqlite3
    
    def performance_test():
        conn = sqlite3.connect(':memory:')
        cursor = conn.cursor()
        
        # 创建测试表
        cursor.execute('''
            CREATE TABLE test_table (
                id INTEGER PRIMARY KEY,
                name TEXT,
                value INTEGER
            )
        ''')
        
        # 测试插入性能
        start_time = time.time()
        for i in range(10000):
            cursor.execute("INSERT INTO test_table (name, value) VALUES (?, ?)", 
                         (f'name_{i}', i))
        conn.commit()
        insert_time = time.time() - start_time
        
        # 测试查询性能
        start_time = time.time()
        cursor.execute("SELECT * FROM test_table WHERE value > 5000")
        results = cursor.fetchall()
        query_time = time.time() - start_time
        
        print(f"插入10000条记录耗时: {insert_time:.3f}秒")
        print(f"查询耗时: {query_time:.3f}秒")
        print(f"查询结果数量: {len(results)}")
        
        conn.close()
    '''
    
    print("   性能测试代码示例:")
    print(performance_test_example)

# 运行数据库性能优化演示
database_performance_demo()

# 6. 实际应用案例

# 6.1 用户管理系统

def user_management_system_demo():
    """用户管理系统演示"""
    print("=== 用户管理系统演示 ===")
    
    import sqlite3
    import hashlib
    import datetime
    import json
    
    # 1. 数据库初始化
    print("\n1. 初始化用户管理数据库:")
    
    conn = sqlite3.connect(':memory:')
    cursor = conn.cursor()
    
    # 创建用户表
    cursor.execute('''
        CREATE TABLE users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username VARCHAR(50) UNIQUE NOT NULL,
            email VARCHAR(100) UNIQUE NOT NULL,
            password_hash VARCHAR(64) NOT NULL,
            salt VARCHAR(32) NOT NULL,
            status INTEGER DEFAULT 1,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            last_login DATETIME,
            login_count INTEGER DEFAULT 0
        )
    ''')
    
    # 创建用户资料表
    cursor.execute('''
        CREATE TABLE user_profiles (
            user_id INTEGER PRIMARY KEY,
            first_name VARCHAR(50),
            last_name VARCHAR(50),
            phone VARCHAR(20),
            birth_date DATE,
            avatar_url VARCHAR(200),
            bio TEXT,
            FOREIGN KEY (user_id) REFERENCES users(id)
        )
    ''')
    
    # 创建登录日志表
    cursor.execute('''
        CREATE TABLE login_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER,
            ip_address VARCHAR(45),
            user_agent TEXT,
            login_time DATETIME DEFAULT CURRENT_TIMESTAMP,
            success INTEGER DEFAULT 1,
            FOREIGN KEY (user_id) REFERENCES users(id)
        )
    ''')
    
    # 创建索引
    cursor.execute('CREATE INDEX idx_users_email ON users(email)')
    cursor.execute('CREATE INDEX idx_users_username ON users(username)')
    cursor.execute('CREATE INDEX idx_login_logs_user_id ON login_logs(user_id)')
    cursor.execute('CREATE INDEX idx_login_logs_time ON login_logs(login_time)')
    
    print("   ✓ 创建用户管理相关表和索引")
    
    # 2. 用户注册功能
    print("\n2. 用户注册功能:")
    
    def register_user(username, email, password, first_name=None, last_name=None):
        """用户注册"""
        try:
            # 生成盐值和密码哈希
            import secrets
            salt = secrets.token_hex(16)
            password_hash = hashlib.sha256((password + salt).encode()).hexdigest()
            
            # 插入用户基本信息
            cursor.execute('''
                INSERT INTO users (username, email, password_hash, salt)
                VALUES (?, ?, ?, ?)
            ''', (username, email, password_hash, salt))
            
            user_id = cursor.lastrowid
            
            # 插入用户资料
            cursor.execute('''
                INSERT INTO user_profiles (user_id, first_name, last_name)
                VALUES (?, ?, ?)
            ''', (user_id, first_name, last_name))
            
            conn.commit()
            return {'success': True, 'user_id': user_id, 'message': '注册成功'}
            
        except sqlite3.IntegrityError as e:
            conn.rollback()
            if 'username' in str(e):
                return {'success': False, 'message': '用户名已存在'}
            elif 'email' in str(e):
                return {'success': False, 'message': '邮箱已被注册'}
            else:
                return {'success': False, 'message': '注册失败'}
        except Exception as e:
            conn.rollback()
            return {'success': False, 'message': f'系统错误: {str(e)}'}
    
    # 注册测试用户
    users_to_register = [
        ('张三', 'zhangsan@example.com', 'password123', '三', '张'),
        ('李四', 'lisi@example.com', 'password456', '四', '李'),
        ('王五', 'wangwu@example.com', 'password789', '五', '王')
    ]
    
    for username, email, password, first_name, last_name in users_to_register:
        result = register_user(username, email, password, first_name, last_name)
        print(f"   注册用户 {username}: {result['message']}")
    
    # 3. 用户登录功能
    print("\n3. 用户登录功能:")
    
    def login_user(username_or_email, password, ip_address='127.0.0.1', user_agent='Python Client'):
        """用户登录"""
        try:
            # 查找用户
            cursor.execute('''
                SELECT id, username, email, password_hash, salt, status
                FROM users
                WHERE username = ? OR email = ?
            ''', (username_or_email, username_or_email))
            
            user = cursor.fetchone()
            
            if not user:
                # 记录失败登录
                cursor.execute('''
                    INSERT INTO login_logs (user_id, ip_address, user_agent, success)
                    VALUES (NULL, ?, ?, 0)
                ''', (ip_address, user_agent))
                conn.commit()
                return {'success': False, 'message': '用户不存在'}
            
            user_id, username, email, stored_hash, salt, status = user
            
            # 检查用户状态
            if status != 1:
                return {'success': False, 'message': '账户已被禁用'}
            
            # 验证密码
            password_hash = hashlib.sha256((password + salt).encode()).hexdigest()
            
            if password_hash != stored_hash:
                # 记录失败登录
                cursor.execute('''
                    INSERT INTO login_logs (user_id, ip_address, user_agent, success)
                    VALUES (?, ?, ?, 0)
                ''', (user_id, ip_address, user_agent))
                conn.commit()
                return {'success': False, 'message': '密码错误'}
            
            # 更新用户登录信息
            cursor.execute('''
                UPDATE users
                SET last_login = CURRENT_TIMESTAMP,
                    login_count = login_count + 1,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
            ''', (user_id,))
            
            # 记录成功登录
            cursor.execute('''
                INSERT INTO login_logs (user_id, ip_address, user_agent, success)
                VALUES (?, ?, ?, 1)
            ''', (user_id, ip_address, user_agent))
            
            conn.commit()
            
            return {
                'success': True,
                'message': '登录成功',
                'user': {
                    'id': user_id,
                    'username': username,
                    'email': email
                }
            }
            
        except Exception as e:
            conn.rollback()
            return {'success': False, 'message': f'登录失败: {str(e)}'}
    
    # 测试登录
    login_tests = [
        ('张三', 'password123'),
        ('lisi@example.com', 'password456'),
        ('王五', 'wrongpassword'),
        ('nonexistent', 'password')
    ]
    
    for username, password in login_tests:
        result = login_user(username, password)
        print(f"   登录测试 {username}: {result['message']}")
    
    # 4. 用户信息管理
    print("\n4. 用户信息管理:")
    
    def get_user_profile(user_id):
        """获取用户完整信息"""
        cursor.execute('''
            SELECT u.id, u.username, u.email, u.status, u.created_at,
                   u.last_login, u.login_count,
                   p.first_name, p.last_name, p.phone, p.birth_date, p.bio
            FROM users u
            LEFT JOIN user_profiles p ON u.id = p.user_id
            WHERE u.id = ?
        ''', (user_id,))
        
        result = cursor.fetchone()
        if result:
            return {
                'id': result[0],
                'username': result[1],
                'email': result[2],
                'status': result[3],
                'created_at': result[4],
                'last_login': result[5],
                'login_count': result[6],
                'first_name': result[7],
                'last_name': result[8],
                'phone': result[9],
                'birth_date': result[10],
                'bio': result[11]
            }
        return None
    
    def update_user_profile(user_id, **kwargs):
        """更新用户资料"""
        try:
            # 更新用户基本信息
            if 'email' in kwargs:
                cursor.execute('''
                    UPDATE users SET email = ?, updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                ''', (kwargs['email'], user_id))
            
            # 更新用户详细资料
            profile_fields = ['first_name', 'last_name', 'phone', 'birth_date', 'bio']
            profile_updates = {k: v for k, v in kwargs.items() if k in profile_fields}
            
            if profile_updates:
                set_clause = ', '.join([f'{k} = ?' for k in profile_updates.keys()])
                values = list(profile_updates.values()) + [user_id]
                
                cursor.execute(f'''
                    UPDATE user_profiles SET {set_clause}
                    WHERE user_id = ?
                ''', values)
            
            conn.commit()
            return {'success': True, 'message': '资料更新成功'}
            
        except Exception as e:
            conn.rollback()
            return {'success': False, 'message': f'更新失败: {str(e)}'}
    
    # 测试用户信息管理
    user_profile = get_user_profile(1)
    if user_profile:
        print(f"   用户信息: {user_profile['username']} ({user_profile['email']})")
        print(f"   登录次数: {user_profile['login_count']}")
    
    # 更新用户资料
    update_result = update_user_profile(1, phone='13800138000', bio='Python开发者')
    print(f"   更新资料: {update_result['message']}")
    
    # 5. 统计分析
    print("\n5. 用户统计分析:")
    
    def get_user_statistics():
        """获取用户统计信息"""
        stats = {}
        
        # 总用户数
        cursor.execute('SELECT COUNT(*) FROM users')
        stats['total_users'] = cursor.fetchone()[0]
        
        # 活跃用户数(状态为1)
        cursor.execute('SELECT COUNT(*) FROM users WHERE status = 1')
        stats['active_users'] = cursor.fetchone()[0]
        
        # 今日登录用户数
        cursor.execute('''
            SELECT COUNT(DISTINCT user_id)
            FROM login_logs
            WHERE DATE(login_time) = DATE('now') AND success = 1
        ''')
        stats['today_logins'] = cursor.fetchone()[0]
        
        # 平均登录次数
        cursor.execute('SELECT AVG(login_count) FROM users WHERE login_count > 0')
        avg_logins = cursor.fetchone()[0]
        stats['avg_login_count'] = round(avg_logins, 2) if avg_logins else 0
        
        # 最近注册的用户
        cursor.execute('''
            SELECT username, created_at
            FROM users
            ORDER BY created_at DESC
            LIMIT 3
        ''')
        stats['recent_users'] = cursor.fetchall()
        
        return stats
    
    stats = get_user_statistics()
    print(f"   总用户数: {stats['total_users']}")
    print(f"   活跃用户数: {stats['active_users']}")
    print(f"   今日登录: {stats['today_logins']}")
    print(f"   平均登录次数: {stats['avg_login_count']}")
    print("   最近注册用户:")
    for username, created_at in stats['recent_users']:
        print(f"     {username} - {created_at}")
    
    # 6. 清理资源
    conn.close()
    print("\n   ✓ 用户管理系统演示完成")

# 运行用户管理系统演示
user_management_system_demo()

# 6.2 数据分析系统

def data_analysis_system_demo():
    """数据分析系统演示"""
    print("=== 数据分析系统演示 ===")
    
    import sqlite3
    import random
    import datetime
    from datetime import timedelta
    
    # 1. 创建销售数据分析数据库
    print("\n1. 创建销售数据分析数据库:")
    
    conn = sqlite3.connect(':memory:')
    cursor = conn.cursor()
    
    # 创建产品表
    cursor.execute('''
        CREATE TABLE products (
            id INTEGER PRIMARY KEY,
            name VARCHAR(100) NOT NULL,
            category VARCHAR(50),
            price DECIMAL(10,2),
            cost DECIMAL(10,2),
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    
    # 创建销售记录表
    cursor.execute('''
        CREATE TABLE sales (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id INTEGER,
            quantity INTEGER,
            unit_price DECIMAL(10,2),
            total_amount DECIMAL(10,2),
            sale_date DATE,
            customer_id INTEGER,
            region VARCHAR(50),
            FOREIGN KEY (product_id) REFERENCES products(id)
        )
    ''')
    
    # 创建客户表
    cursor.execute('''
        CREATE TABLE customers (
            id INTEGER PRIMARY KEY,
            name VARCHAR(100),
            email VARCHAR(100),
            region VARCHAR(50),
            registration_date DATE
        )
    ''')
    
    # 创建索引
    cursor.execute('CREATE INDEX idx_sales_date ON sales(sale_date)')
    cursor.execute('CREATE INDEX idx_sales_product ON sales(product_id)')
    cursor.execute('CREATE INDEX idx_sales_customer ON sales(customer_id)')
    cursor.execute('CREATE INDEX idx_sales_region ON sales(region)')
    
    print("   ✓ 创建销售分析相关表和索引")
    
    # 2. 生成测试数据
    print("\n2. 生成测试数据:")
    
    # 插入产品数据
    products = [
        (1, '笔记本电脑', '电子产品', 5999.00, 4500.00),
        (2, '无线鼠标', '电子产品', 199.00, 120.00),
        (3, '机械键盘', '电子产品', 399.00, 250.00),
        (4, '显示器', '电子产品', 1299.00, 900.00),
        (5, '办公椅', '办公用品', 899.00, 600.00),
        (6, '办公桌', '办公用品', 1599.00, 1000.00)
    ]
    
    cursor.executemany('''
        INSERT INTO products (id, name, category, price, cost)
        VALUES (?, ?, ?, ?, ?)
    ''', products)
    
    # 插入客户数据
    customers = [
        (1, '张三公司', 'zhangsan@company.com', '北京', '2023-01-15'),
        (2, '李四企业', 'lisi@enterprise.com', '上海', '2023-02-20'),
        (3, '王五集团', 'wangwu@group.com', '广州', '2023-03-10'),
        (4, '赵六科技', 'zhaoliu@tech.com', '深圳', '2023-04-05'),
        (5, '钱七贸易', 'qianqi@trade.com', '杭州', '2023-05-12')
    ]
    
    cursor.executemany('''
        INSERT INTO customers (id, name, email, region, registration_date)
        VALUES (?, ?, ?, ?, ?)
    ''', customers)
    
    # 生成销售数据
    regions = ['北京', '上海', '广州', '深圳', '杭州']
    start_date = datetime.date(2023, 1, 1)
    end_date = datetime.date(2023, 12, 31)
    
    sales_data = []
    for _ in range(500):  # 生成500条销售记录
        product_id = random.randint(1, 6)
        customer_id = random.randint(1, 5)
        quantity = random.randint(1, 10)
        
        # 获取产品价格
        cursor.execute('SELECT price FROM products WHERE id = ?', (product_id,))
        unit_price = cursor.fetchone()[0]
        
        # 添加一些价格波动
        unit_price = float(unit_price) * random.uniform(0.9, 1.1)
        total_amount = unit_price * quantity
        
        # 随机日期
        days_diff = (end_date - start_date).days
        random_days = random.randint(0, days_diff)
        sale_date = start_date + timedelta(days=random_days)
        
        region = random.choice(regions)
        
        sales_data.append((
            product_id, quantity, unit_price, total_amount,
            sale_date.strftime('%Y-%m-%d'), customer_id, region
        ))
    
    cursor.executemany('''
        INSERT INTO sales (product_id, quantity, unit_price, total_amount, sale_date, customer_id, region)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    ''', sales_data)
    
    conn.commit()
    print(f"   ✓ 生成 {len(products)} 个产品, {len(customers)} 个客户, {len(sales_data)} 条销售记录")
    
    # 3. 销售数据分析
    print("\n3. 销售数据分析:")
    
    # 总销售额
    cursor.execute('SELECT SUM(total_amount) FROM sales')
    total_sales = cursor.fetchone()[0]
    print(f"   总销售额: ¥{total_sales:,.2f}")
    
    # 按月份统计销售额
    cursor.execute('''
        SELECT strftime('%Y-%m', sale_date) as month,
               SUM(total_amount) as monthly_sales,
               COUNT(*) as order_count
        FROM sales
        GROUP BY strftime('%Y-%m', sale_date)
        ORDER BY month
    ''')
    
    print("\n   月度销售统计:")
    monthly_data = cursor.fetchall()
    for month, sales, orders in monthly_data[:6]:  # 显示前6个月
        print(f"     {month}: ¥{sales:,.2f} ({orders} 笔订单)")
    
    # 按产品类别统计
    cursor.execute('''
        SELECT p.category,
               SUM(s.total_amount) as category_sales,
               SUM(s.quantity) as total_quantity,
               COUNT(s.id) as order_count
        FROM sales s
        JOIN products p ON s.product_id = p.id
        GROUP BY p.category
        ORDER BY category_sales DESC
    ''')
    
    print("\n   产品类别销售统计:")
    for category, sales, quantity, orders in cursor.fetchall():
        print(f"     {category}: ¥{sales:,.2f} (数量: {quantity}, 订单: {orders})")
    
    # 按地区统计
    cursor.execute('''
        SELECT region,
               SUM(total_amount) as region_sales,
               COUNT(*) as order_count,
               AVG(total_amount) as avg_order_value
        FROM sales
        GROUP BY region
        ORDER BY region_sales DESC
    ''')
    
    print("\n   地区销售统计:")
    for region, sales, orders, avg_value in cursor.fetchall():
        print(f"     {region}: ¥{sales:,.2f} ({orders} 笔, 平均: ¥{avg_value:.2f})")
    
    # 4. 高级分析查询
    print("\n4. 高级分析查询:")
    
    # 最畅销产品
    cursor.execute('''
        SELECT p.name,
               SUM(s.quantity) as total_sold,
               SUM(s.total_amount) as total_revenue,
               COUNT(s.id) as order_count
        FROM sales s
        JOIN products p ON s.product_id = p.id
        GROUP BY p.id, p.name
        ORDER BY total_sold DESC
        LIMIT 3
    ''')
    
    print("   最畅销产品 TOP 3:")
    for name, sold, revenue, orders in cursor.fetchall():
        print(f"     {name}: 销量 {sold}, 收入 ¥{revenue:,.2f}, 订单 {orders}")
    
    # 客户价值分析
    cursor.execute('''
        SELECT c.name,
               SUM(s.total_amount) as customer_value,
               COUNT(s.id) as order_count,
               AVG(s.total_amount) as avg_order_value,
               MAX(s.sale_date) as last_purchase
        FROM sales s
        JOIN customers c ON s.customer_id = c.id
        GROUP BY c.id, c.name
        ORDER BY customer_value DESC
        LIMIT 3
    ''')
    
    print("\n   高价值客户 TOP 3:")
    for name, value, orders, avg_value, last_purchase in cursor.fetchall():
        print(f"     {name}: 总价值 ¥{value:,.2f}, {orders} 笔订单, 最后购买 {last_purchase}")
    
    # 季度趋势分析
    cursor.execute('''
        SELECT 
            CASE 
                WHEN strftime('%m', sale_date) IN ('01','02','03') THEN 'Q1'
                WHEN strftime('%m', sale_date) IN ('04','05','06') THEN 'Q2'
                WHEN strftime('%m', sale_date) IN ('07','08','09') THEN 'Q3'
                ELSE 'Q4'
            END as quarter,
            SUM(total_amount) as quarterly_sales,
            COUNT(*) as order_count
        FROM sales
        WHERE strftime('%Y', sale_date) = '2023'
        GROUP BY quarter
        ORDER BY quarter
    ''')
    
    print("\n   2023年季度销售趋势:")
    for quarter, sales, orders in cursor.fetchall():
        print(f"     {quarter}: ¥{sales:,.2f} ({orders} 笔订单)")
    
    # 5. 利润分析
    print("\n5. 利润分析:")
    
    cursor.execute('''
        SELECT p.name,
               SUM(s.quantity) as total_sold,
               SUM(s.total_amount) as revenue,
               SUM(s.quantity * p.cost) as total_cost,
               SUM(s.total_amount) - SUM(s.quantity * p.cost) as profit,
               (SUM(s.total_amount) - SUM(s.quantity * p.cost)) / SUM(s.total_amount) * 100 as profit_margin
        FROM sales s
        JOIN products p ON s.product_id = p.id
        GROUP BY p.id, p.name
        ORDER BY profit DESC
    ''')
    
    print("   产品利润分析:")
    for name, sold, revenue, cost, profit, margin in cursor.fetchall():
        print(f"     {name}: 利润 ¥{profit:,.2f}, 利润率 {margin:.1f}%")
    
    # 6. 清理资源
    conn.close()
    print("\n   ✓ 数据分析系统演示完成")

# 运行数据分析系统演示
data_analysis_system_demo()

# 7. 最佳实践和安全

# 7.1 数据库安全最佳实践

def database_security_demo():
    """数据库安全最佳实践演示"""
    print("=== 数据库安全最佳实践 ===")
    
    # 1. SQL注入防护
    print("\n1. SQL注入防护:")
    
    import sqlite3
    
    conn = sqlite3.connect(':memory:')
    cursor = conn.cursor()
    
    # 创建测试表
    cursor.execute('''
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            username VARCHAR(50),
            password VARCHAR(100)
        )
    ''')
    
    cursor.execute("INSERT INTO users (username, password) VALUES ('admin', 'secret123')")
    cursor.execute("INSERT INTO users (username, password) VALUES ('user1', 'password456')")
    conn.commit()
    
    # 错误的做法(容易SQL注入)
    def unsafe_login(username, password):
        """不安全的登录方法(仅用于演示)"""
        query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
        print(f"   ⚠️ 不安全的查询: {query}")
        # 注意:这里只是演示,实际不执行
        return "不安全的查询方式"
    
    # 正确的做法(使用参数化查询)
    def safe_login(username, password):
        """安全的登录方法"""
        cursor.execute("SELECT * FROM users WHERE username = ? AND password = ?", (username, password))
        result = cursor.fetchone()
        return result is not None
    
    print("   安全登录演示:")
    
    # 正常登录
    if safe_login('admin', 'secret123'):
        print("     ✓ 正常登录成功")
    
    # 模拟SQL注入尝试
    malicious_input = "admin'; DROP TABLE users; --"
    unsafe_login(malicious_input, "anything")
    print("     ✓ 参数化查询防止了SQL注入")
    
    # 2. 密码安全
    print("\n2. 密码安全处理:")
    
    import hashlib
    import secrets
    import bcrypt
    
    def hash_password_simple(password):
        """简单的密码哈希(不推荐用于生产)"""
        salt = secrets.token_hex(16)
        password_hash = hashlib.sha256((password + salt).encode()).hexdigest()
        return salt, password_hash
    
    def hash_password_bcrypt(password):
        """使用bcrypt哈希密码(推荐)"""
        try:
            # 生成盐并哈希密码
            salt = bcrypt.gensalt()
            password_hash = bcrypt.hashpw(password.encode('utf-8'), salt)
            return password_hash.decode('utf-8')
        except ImportError:
            print("     ⚠️ bcrypt未安装,使用简单哈希方法")
            return hash_password_simple(password)
    
    def verify_password_bcrypt(password, hashed):
        """验证bcrypt哈希密码"""
        try:
            return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
        except ImportError:
            return False
    
    # 演示密码哈希
    test_password = "mySecurePassword123!"
    
    # 简单哈希方法
    salt, simple_hash = hash_password_simple(test_password)
    print(f"   简单哈希: {simple_hash[:20]}...")
    
    # bcrypt方法(如果可用)
    try:
        bcrypt_hash = hash_password_bcrypt(test_password)
        print(f"   bcrypt哈希: {bcrypt_hash[:20]}...")
        
        # 验证密码
        if verify_password_bcrypt(test_password, bcrypt_hash):
            print("     ✓ 密码验证成功")
    except:
        print("     ⚠️ bcrypt不可用,建议安装: pip install bcrypt")
    
    # 3. 数据库连接安全
    print("\n3. 数据库连接安全:")
    
    connection_security_tips = {
        "使用SSL/TLS": "确保数据库连接使用加密传输",
        "最小权限原则": "为应用程序创建专用数据库用户,只授予必要权限",
        "连接字符串保护": "不要在代码中硬编码数据库凭据",
        "连接池配置": "合理配置连接池,避免连接泄露",
        "网络隔离": "将数据库服务器放在私有网络中",
        "定期更新": "保持数据库软件和驱动程序最新"
    }
    
    for tip, description in connection_security_tips.items():
        print(f"   • {tip}: {description}")
    
    # 4. 数据加密
    print("\n4. 敏感数据加密:")
    
    from cryptography.fernet import Fernet
    
    try:
        # 生成加密密钥
        key = Fernet.generate_key()
        cipher_suite = Fernet(key)
        
        # 加密敏感数据
        sensitive_data = "用户身份证号: 123456789012345678"
        encrypted_data = cipher_suite.encrypt(sensitive_data.encode())
        
        print(f"   原始数据: {sensitive_data}")
        print(f"   加密数据: {encrypted_data[:30]}...")
        
        # 解密数据
        decrypted_data = cipher_suite.decrypt(encrypted_data).decode()
        print(f"   解密数据: {decrypted_data}")
        print("     ✓ 数据加密/解密成功")
        
    except ImportError:
        print("     ⚠️ cryptography库未安装,建议安装: pip install cryptography")
        
        # 使用简单的base64编码作为演示(不安全)
        import base64
        sensitive_data = "演示数据"
        encoded = base64.b64encode(sensitive_data.encode()).decode()
        decoded = base64.b64decode(encoded).decode()
        print(f"   Base64编码演示: {encoded}")
        print("     ⚠️ Base64不是加密,仅用于演示")
    
    # 5. 审计和日志
    print("\n5. 数据库审计和日志:")
    
    import logging
    from datetime import datetime
    
    # 配置日志
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    
    def log_database_operation(operation, table, user_id=None, details=None):
        """记录数据库操作日志"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'operation': operation,
            'table': table,
            'user_id': user_id,
            'details': details
        }
        logging.info(f"DB Operation: {log_entry}")
    
    # 演示日志记录
    log_database_operation('SELECT', 'users', user_id=1, details='Login attempt')
    log_database_operation('UPDATE', 'user_profiles', user_id=1, details='Profile update')
    log_database_operation('DELETE', 'sessions', user_id=1, details='Logout')
    
    print("     ✓ 数据库操作日志记录完成")
    
    # 6. 备份和恢复
    print("\n6. 数据备份策略:")
    
    backup_strategies = {
        "定期备份": "设置自动化的定期备份任务",
        "增量备份": "结合全量备份和增量备份",
        "异地备份": "将备份存储在不同的地理位置",
        "备份测试": "定期测试备份的完整性和可恢复性",
        "版本控制": "保留多个备份版本",
        "加密备份": "对备份文件进行加密保护"
    }
    
    for strategy, description in backup_strategies.items():
        print(f"   • {strategy}: {description}")
    
    # 简单的SQLite备份演示
    def backup_sqlite_database(source_db, backup_path):
        """SQLite数据库备份"""
        try:
            import shutil
            shutil.copy2(source_db, backup_path)
            return True
        except Exception as e:
            print(f"备份失败: {e}")
            return False
    
    print("     ✓ 数据库备份策略说明完成")
    
    # 清理资源
    conn.close()
    print("\n   ✓ 数据库安全最佳实践演示完成")

# 运行数据库安全演示
database_security_demo()

# 8. 学习建议和总结

# 8.1 学习路径

def learning_path_guide():
    """数据库学习路径指南"""
    print("=== Python数据库编程学习路径 ===")
    
    learning_stages = {
        "初级阶段 (1-2周)": {
            "目标": "掌握基础数据库操作",
            "内容": [
                "理解数据库基本概念",
                "学习SQL基础语法",
                "掌握Python DB-API 2.0规范",
                "练习SQLite基础操作",
                "学习CRUD操作"
            ],
            "实践项目": "简单的联系人管理系统"
        },
        "中级阶段 (2-3周)": {
            "目标": "掌握高级数据库功能",
            "内容": [
                "学习事务处理",
                "掌握连接池使用",
                "理解索引和查询优化",
                "学习MySQL/PostgreSQL操作",
                "掌握数据库设计原则"
            ],
            "实践项目": "博客系统或电商系统"
        },
        "高级阶段 (3-4周)": {
            "目标": "掌握ORM和高级特性",
            "内容": [
                "学习SQLAlchemy ORM",
                "掌握数据库迁移",
                "学习性能优化技巧",
                "理解数据库安全",
                "掌握分布式数据库概念"
            ],
            "实践项目": "完整的Web应用后端"
        },
        "专家阶段 (持续学习)": {
            "目标": "深入理解数据库架构",
            "内容": [
                "学习数据库内部原理",
                "掌握分库分表策略",
                "学习NoSQL数据库",
                "理解大数据处理",
                "掌握数据库运维"
            ],
            "实践项目": "高并发分布式系统"
        }
    }
    
    for stage, details in learning_stages.items():
        print(f"\n{stage}:")
        print(f"   目标: {details['目标']}")
        print("   学习内容:")
        for content in details['内容']:
            print(f"     • {content}")
        print(f"   实践项目: {details['实践项目']}")

# 运行学习路径指南
learning_path_guide()

# 8.2 最佳实践总结

def best_practices_summary():
    """数据库编程最佳实践总结"""
    print("=== 数据库编程最佳实践总结 ===")
    
    # 1. 代码组织
    print("\n1. 代码组织最佳实践:")
    
    code_organization = [
        "使用数据访问对象(DAO)模式分离数据访问逻辑",
        "创建数据库连接管理器统一管理连接",
        "使用配置文件管理数据库连接参数",
        "实现统一的异常处理机制",
        "编写可重用的数据库操作工具函数",
        "使用类型提示提高代码可读性",
        "编写完整的文档和注释"
    ]
    
    for i, practice in enumerate(code_organization, 1):
        print(f"   {i}. {practice}")
    
    # 2. 性能优化
    print("\n2. 性能优化最佳实践:")
    
    performance_tips = [
        "使用连接池避免频繁创建连接",
        "合理使用索引优化查询性能",
        "避免N+1查询问题",
        "使用批量操作处理大量数据",
        "实现查询结果缓存",
        "定期分析和优化慢查询",
        "使用分页避免大结果集"
    ]
    
    for i, tip in enumerate(performance_tips, 1):
        print(f"   {i}. {tip}")
    
    # 3. 安全实践
    print("\n3. 安全最佳实践:")
    
    security_practices = [
        "始终使用参数化查询防止SQL注入",
        "对敏感数据进行加密存储",
        "使用强密码哈希算法",
        "实施最小权限原则",
        "启用数据库连接加密",
        "定期备份和测试恢复",
        "记录和监控数据库访问日志"
    ]
    
    for i, practice in enumerate(security_practices, 1):
        print(f"   {i}. {practice}")
    
    # 4. 错误处理
    print("\n4. 错误处理最佳实践:")
    
    error_handling = [
        "使用try-catch块处理数据库异常",
        "实现事务回滚机制",
        "提供有意义的错误消息",
        "记录详细的错误日志",
        "实现重试机制处理临时故障",
        "优雅地处理连接超时",
        "避免向用户暴露敏感错误信息"
    ]
    
    for i, practice in enumerate(error_handling, 1):
        print(f"   {i}. {practice}")

# 运行最佳实践总结
best_practices_summary()

# 8.3 常见陷阱和解决方案

def common_pitfalls_and_solutions():
    """常见陷阱和解决方案"""
    print("=== 常见陷阱和解决方案 ===")
    
    pitfalls = {
        "连接泄露": {
            "问题": "忘记关闭数据库连接导致连接池耗尽",
            "解决方案": "使用with语句或try-finally确保连接关闭",
            "示例": "with get_connection() as conn: # 自动关闭连接"
        },
        "SQL注入": {
            "问题": "直接拼接SQL字符串导致安全漏洞",
            "解决方案": "使用参数化查询或ORM",
            "示例": "cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))"
        },
        "N+1查询": {
            "问题": "在循环中执行查询导致性能问题",
            "解决方案": "使用JOIN查询或预加载",
            "示例": "使用SQLAlchemy的joinedload或selectinload"
        },
        "事务管理": {
            "问题": "忘记提交事务或处理回滚",
            "解决方案": "使用上下文管理器自动管理事务",
            "示例": "使用@contextmanager装饰器创建事务管理器"
        },
        "字符编码": {
            "问题": "中文字符显示乱码",
            "解决方案": "确保数据库、连接和Python使用相同编码",
            "示例": "连接字符串中指定charset=utf8mb4"
        },
        "时区问题": {
            "问题": "时间数据在不同时区显示错误",
            "解决方案": "统一使用UTC时间或明确指定时区",
            "示例": "使用datetime.utcnow()和时区转换"
        }
    }
    
    for pitfall, details in pitfalls.items():
        print(f"\n{pitfall}:")
        print(f"   问题: {details['问题']}")
        print(f"   解决方案: {details['解决方案']}")
        print(f"   示例: {details['示例']}")

# 运行常见陷阱和解决方案
common_pitfalls_and_solutions()

# 8.4 本章总结

def chapter_summary():
    """第20天学习总结"""
    print("=== 第20天 - Python数据库访问学习总结 ===")
    
    summary_points = {
        "核心概念": [
            "数据库基础知识和DB-API 2.0规范",
            "SQLite、MySQL等数据库的连接和操作",
            "SQL语句的执行和结果处理",
            "事务管理和错误处理机制"
        ],
        "重要技能": [
            "使用sqlite3模块进行SQLite操作",
            "使用mysql-connector-python操作MySQL",
            "掌握SQLAlchemy ORM框架",
            "实现数据库连接池和性能优化"
        ],
        "实践应用": [
            "用户管理系统的设计和实现",
            "销售数据分析系统的构建",
            "数据库安全和最佳实践",
            "性能优化和监控策略"
        ],
        "进阶方向": [
            "学习NoSQL数据库(MongoDB, Redis)",
            "掌握数据库集群和分布式架构",
            "深入理解数据库内部原理",
            "学习大数据处理技术"
        ]
    }
    
    for category, points in summary_points.items():
        print(f"\n{category}:")
        for point in points:
            print(f"   • {point}")
    
    print("\n学习成果:")
    achievements = [
        "✓ 掌握了Python数据库编程的基础知识",
        "✓ 学会了使用多种数据库和ORM框架",
        "✓ 理解了数据库设计和性能优化原则",
        "✓ 具备了构建实际数据库应用的能力",
        "✓ 了解了数据库安全和最佳实践"
    ]
    
    for achievement in achievements:
        print(f"   {achievement}")
    
    print("\n下一步学习建议:")
    next_steps = [
        "深入学习特定数据库的高级特性",
        "实践更复杂的数据库应用项目",
        "学习数据库运维和监控技能",
        "探索大数据和分布式数据库技术",
        "关注数据库技术的最新发展趋势"
    ]
    
    for step in next_steps:
        print(f"   • {step}")
    
    print("\n恭喜你完成了Python数据库访问的学习!")
    print("数据库是现代应用的核心,继续实践和深入学习将让你成为更优秀的开发者。")

# 运行本章总结
chapter_summary()

# 总结

通过第20天的学习,我们全面掌握了Python数据库访问的各个方面:

  1. 数据库基础 - 理解了数据库概念、DB-API规范和SQL基础
  2. SQLite操作 - 掌握了轻量级数据库的使用和高级功能
  3. MySQL操作 - 学会了企业级数据库的连接和操作
  4. ORM框架 - 深入学习了SQLAlchemy的使用和高级特性
  5. 设计最佳实践 - 了解了数据库设计原则和性能优化策略
  6. 实际应用 - 通过用户管理和数据分析系统掌握了实践技能
  7. 安全实践 - 学习了数据库安全和最佳实践

数据库编程是后端开发的核心技能,掌握这些知识将为你的Python开发之路奠定坚实的基础。继续实践和深入学习,你将能够构建更加复杂和高效的数据库应用!