第11天-错误和调试

2023/6/15

# 第11天-错误和调试

# 学习目标

通过本章学习,你将掌握:

  • Python中常见的错误类型和异常处理
  • 使用try-except语句处理异常
  • 自定义异常类
  • 调试技巧和工具
  • 日志记录和错误追踪
  • 单元测试和测试驱动开发
  • 性能分析和优化

# 一、Python异常处理基础

# 1.1 什么是异常?

异常是程序运行时发生的错误,它会中断程序的正常执行流程。Python使用异常机制来处理运行时错误。

# 常见的异常示例

# 1. ZeroDivisionError - 除零错误
try:
    result = 10 / 0
except ZeroDivisionError:
    print("不能除以零!")

# 2. IndexError - 索引错误
try:
    my_list = [1, 2, 3]
    print(my_list[10])  # 索引超出范围
except IndexError:
    print("列表索引超出范围!")

# 3. KeyError - 键错误
try:
    my_dict = {'name': '张三', 'age': 25}
    print(my_dict['salary'])  # 键不存在
except KeyError:
    print("字典中不存在该键!")

# 4. TypeError - 类型错误
try:
    result = "hello" + 5  # 字符串和数字不能直接相加
except TypeError:
    print("类型不匹配!")

# 5. ValueError - 值错误
try:
    number = int("abc")  # 无法将字符串转换为整数
except ValueError:
    print("值转换错误!")

# 6. FileNotFoundError - 文件未找到错误
try:
    with open("不存在的文件.txt", "r") as file:
        content = file.read()
except FileNotFoundError:
    print("文件未找到!")

# 1.2 异常处理语法

# 基本的try-except语法
try:
    # 可能出现异常的代码
    risky_code()
except ExceptionType:
    # 处理特定类型的异常
    handle_exception()

# 完整的异常处理语法
try:
    # 可能出现异常的代码
    risky_code()
except SpecificException as e:
    # 处理特定异常
    print(f"发生特定异常: {e}")
except (Exception1, Exception2) as e:
    # 处理多种异常
    print(f"发生多种异常之一: {e}")
except Exception as e:
    # 处理所有其他异常
    print(f"发生未知异常: {e}")
else:
    # 没有异常时执行
    print("代码执行成功")
finally:
    # 无论是否有异常都会执行
    print("清理资源")

# 1.3 实际应用示例

def safe_divide(a, b):
    """安全的除法函数"""
    try:
        result = a / b
        return result
    except ZeroDivisionError:
        print("错误:除数不能为零")
        return None
    except TypeError:
        print("错误:参数必须是数字")
        return None
    except Exception as e:
        print(f"未知错误:{e}")
        return None

def safe_file_read(filename):
    """安全的文件读取函数"""
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            content = file.read()
            return content
    except FileNotFoundError:
        print(f"文件 {filename} 不存在")
        return None
    except PermissionError:
        print(f"没有权限读取文件 {filename}")
        return None
    except UnicodeDecodeError:
        print(f"文件 {filename} 编码格式不正确")
        return None
    except Exception as e:
        print(f"读取文件时发生未知错误:{e}")
        return None
    finally:
        print(f"文件读取操作完成")

def safe_user_input():
    """安全的用户输入处理"""
    while True:
        try:
            age = int(input("请输入您的年龄:"))
            if age < 0:
                raise ValueError("年龄不能为负数")
            if age > 150:
                raise ValueError("年龄不能超过150岁")
            return age
        except ValueError as e:
            if "invalid literal" in str(e):
                print("请输入有效的数字")
            else:
                print(f"输入错误:{e}")
        except KeyboardInterrupt:
            print("\n用户取消输入")
            return None

# 使用示例
print("=== 异常处理示例 ===")

# 测试安全除法
print("\n--- 安全除法测试 ---")
print(f"10 / 2 = {safe_divide(10, 2)}")
print(f"10 / 0 = {safe_divide(10, 0)}")
print(f"'10' / 2 = {safe_divide('10', 2)}")

# 测试安全文件读取
print("\n--- 安全文件读取测试 ---")
content = safe_file_read("test.txt")
if content:
    print(f"文件内容:{content[:50]}...")

# 测试安全用户输入
print("\n--- 安全用户输入测试 ---")
# age = safe_user_input()  # 取消注释以测试
# if age is not None:
#     print(f"您的年龄是:{age}岁")

# 二、自定义异常

# 2.1 创建自定义异常类

# 基本自定义异常
class CustomError(Exception):
    """自定义异常基类"""
    pass

class ValidationError(CustomError):
    """数据验证异常"""
    def __init__(self, message, field=None):
        super().__init__(message)
        self.field = field
        self.message = message
    
    def __str__(self):
        if self.field:
            return f"验证错误 [{self.field}]: {self.message}"
        return f"验证错误: {self.message}"

class BusinessLogicError(CustomError):
    """业务逻辑异常"""
    def __init__(self, message, error_code=None):
        super().__init__(message)
        self.error_code = error_code
        self.message = message
    
    def __str__(self):
        if self.error_code:
            return f"业务错误 [{self.error_code}]: {self.message}"
        return f"业务错误: {self.message}"

class DatabaseError(CustomError):
    """数据库操作异常"""
    def __init__(self, message, query=None, params=None):
        super().__init__(message)
        self.query = query
        self.params = params
        self.message = message
    
    def __str__(self):
        base_msg = f"数据库错误: {self.message}"
        if self.query:
            base_msg += f"\n查询语句: {self.query}"
        if self.params:
            base_msg += f"\n参数: {self.params}"
        return base_msg

# 2.2 使用自定义异常的实际案例

class User:
    """用户类示例"""
    
    def __init__(self, username, email, age):
        self.username = self._validate_username(username)
        self.email = self._validate_email(email)
        self.age = self._validate_age(age)
    
    def _validate_username(self, username):
        """验证用户名"""
        if not username:
            raise ValidationError("用户名不能为空", "username")
        if len(username) < 3:
            raise ValidationError("用户名长度不能少于3个字符", "username")
        if len(username) > 20:
            raise ValidationError("用户名长度不能超过20个字符", "username")
        if not username.isalnum():
            raise ValidationError("用户名只能包含字母和数字", "username")
        return username
    
    def _validate_email(self, email):
        """验证邮箱"""
        import re
        if not email:
            raise ValidationError("邮箱不能为空", "email")
        
        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if not re.match(email_pattern, email):
            raise ValidationError("邮箱格式不正确", "email")
        return email
    
    def _validate_age(self, age):
        """验证年龄"""
        if not isinstance(age, int):
            raise ValidationError("年龄必须是整数", "age")
        if age < 0:
            raise ValidationError("年龄不能为负数", "age")
        if age > 150:
            raise ValidationError("年龄不能超过150岁", "age")
        return age
    
    def update_age(self, new_age):
        """更新年龄"""
        old_age = self.age
        try:
            self.age = self._validate_age(new_age)
            print(f"年龄已从 {old_age} 更新为 {new_age}")
        except ValidationError as e:
            print(f"年龄更新失败:{e}")
            # 保持原来的年龄不变
    
    def __str__(self):
        return f"User(username='{self.username}', email='{self.email}', age={self.age})"

class BankAccount:
    """银行账户类示例"""
    
    def __init__(self, account_number, initial_balance=0):
        self.account_number = account_number
        self.balance = initial_balance
        self.transaction_history = []
    
    def deposit(self, amount):
        """存款"""
        try:
            if amount <= 0:
                raise BusinessLogicError("存款金额必须大于0", "INVALID_AMOUNT")
            
            self.balance += amount
            self.transaction_history.append(f"存款: +{amount}")
            print(f"存款成功,当前余额:{self.balance}")
            
        except BusinessLogicError as e:
            print(f"存款失败:{e}")
    
    def withdraw(self, amount):
        """取款"""
        try:
            if amount <= 0:
                raise BusinessLogicError("取款金额必须大于0", "INVALID_AMOUNT")
            
            if amount > self.balance:
                raise BusinessLogicError(
                    f"余额不足,当前余额:{self.balance},尝试取款:{amount}", 
                    "INSUFFICIENT_FUNDS"
                )
            
            if amount > 10000:
                raise BusinessLogicError(
                    "单次取款金额不能超过10000元", 
                    "AMOUNT_LIMIT_EXCEEDED"
                )
            
            self.balance -= amount
            self.transaction_history.append(f"取款: -{amount}")
            print(f"取款成功,当前余额:{self.balance}")
            
        except BusinessLogicError as e:
            print(f"取款失败:{e}")
    
    def transfer(self, target_account, amount):
        """转账"""
        try:
            if not isinstance(target_account, BankAccount):
                raise BusinessLogicError("目标账户无效", "INVALID_TARGET")
            
            if target_account.account_number == self.account_number:
                raise BusinessLogicError("不能向自己转账", "SELF_TRANSFER")
            
            # 先从当前账户取款
            if amount > self.balance:
                raise BusinessLogicError(
                    f"余额不足,无法转账 {amount} 元", 
                    "INSUFFICIENT_FUNDS"
                )
            
            # 执行转账
            self.balance -= amount
            target_account.balance += amount
            
            # 记录交易历史
            self.transaction_history.append(f"转出: -{amount} (到账户 {target_account.account_number})")
            target_account.transaction_history.append(f"转入: +{amount} (从账户 {self.account_number})")
            
            print(f"转账成功:{amount} 元已转至账户 {target_account.account_number}")
            
        except BusinessLogicError as e:
            print(f"转账失败:{e}")
    
    def get_balance(self):
        """获取余额"""
        return self.balance
    
    def get_transaction_history(self):
        """获取交易历史"""
        return self.transaction_history.copy()

def custom_exception_examples():
    """自定义异常示例"""
    print("=== 自定义异常示例 ===")
    
    # 用户验证示例
    print("\n--- 用户验证示例 ---")
    
    # 正确的用户创建
    try:
        user1 = User("zhangsan", "zhangsan@example.com", 25)
        print(f"用户创建成功:{user1}")
    except ValidationError as e:
        print(f"用户创建失败:{e}")
    
    # 错误的用户创建
    test_cases = [
        ("", "test@example.com", 25),  # 空用户名
        ("ab", "test@example.com", 25),  # 用户名太短
        ("user@123", "test@example.com", 25),  # 用户名包含特殊字符
        ("validuser", "invalid-email", 25),  # 邮箱格式错误
        ("validuser", "test@example.com", -5),  # 年龄为负数
        ("validuser", "test@example.com", 200),  # 年龄过大
    ]
    
    for username, email, age in test_cases:
        try:
            user = User(username, email, age)
            print(f"用户创建成功:{user}")
        except ValidationError as e:
            print(f"用户创建失败:{e}")
    
    # 银行账户示例
    print("\n--- 银行账户示例 ---")
    
    # 创建账户
    account1 = BankAccount("123456", 1000)
    account2 = BankAccount("789012", 500)
    
    print(f"账户1余额:{account1.get_balance()}")
    print(f"账户2余额:{account2.get_balance()}")
    
    # 测试各种操作
    account1.deposit(500)  # 正常存款
    account1.deposit(-100)  # 错误存款
    
    account1.withdraw(200)  # 正常取款
    account1.withdraw(5000)  # 余额不足
    account1.withdraw(15000)  # 超过限额
    
    account1.transfer(account2, 300)  # 正常转账
    account1.transfer(account1, 100)  # 向自己转账
    account1.transfer(account2, 2000)  # 余额不足转账
    
    print(f"\n最终余额:")
    print(f"账户1:{account1.get_balance()}")
    print(f"账户2:{account2.get_balance()}")
    
    print(f"\n账户1交易历史:")
    for transaction in account1.get_transaction_history():
        print(f"  {transaction}")

# 运行示例
if __name__ == "__main__":
    custom_exception_examples()

# 三、调试技巧和工具

# 3.1 使用print调试

def debug_with_print():
    """使用print进行调试"""
    print("=== 使用print调试 ===")
    
    def calculate_average(numbers):
        """计算平均值"""
        print(f"DEBUG: 输入的数字列表: {numbers}")  # 调试信息
        
        if not numbers:
            print("DEBUG: 列表为空,返回0")  # 调试信息
            return 0
        
        total = sum(numbers)
        count = len(numbers)
        
        print(f"DEBUG: 总和 = {total}, 数量 = {count}")  # 调试信息
        
        average = total / count
        print(f"DEBUG: 平均值 = {average}")  # 调试信息
        
        return average
    
    # 测试函数
    test_data = [
        [1, 2, 3, 4, 5],
        [],
        [10, 20, 30]
    ]
    
    for data in test_data:
        print(f"\n测试数据: {data}")
        result = calculate_average(data)
        print(f"结果: {result}")
        print("-" * 30)

def debug_with_assert():
    """使用断言进行调试"""
    print("\n=== 使用断言调试 ===")
    
    def factorial(n):
        """计算阶乘"""
        # 前置条件断言
        assert isinstance(n, int), f"参数必须是整数,实际类型: {type(n)}"
        assert n >= 0, f"参数必须是非负数,实际值: {n}"
        
        if n == 0 or n == 1:
            result = 1
        else:
            result = n * factorial(n - 1)
        
        # 后置条件断言
        assert result > 0, f"阶乘结果必须为正数,实际值: {result}"
        
        return result
    
    # 测试断言
    test_cases = [5, 0, 1, -1, "abc"]
    
    for test_case in test_cases:
        try:
            result = factorial(test_case)
            print(f"{test_case}! = {result}")
        except AssertionError as e:
            print(f"断言失败: {e}")
        except Exception as e:
            print(f"其他错误: {e}")

# 3.2 使用logging模块

import logging
from datetime import datetime

# 配置日志
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('debug.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

def debug_with_logging():
    """使用logging进行调试"""
    print("\n=== 使用logging调试 ===")
    
    def process_user_data(user_data):
        """处理用户数据"""
        logger.info(f"开始处理用户数据: {user_data}")
        
        try:
            # 验证必需字段
            required_fields = ['name', 'email', 'age']
            for field in required_fields:
                if field not in user_data:
                    logger.error(f"缺少必需字段: {field}")
                    raise ValueError(f"缺少必需字段: {field}")
                
                logger.debug(f"字段 {field} 验证通过: {user_data[field]}")
            
            # 验证数据类型
            if not isinstance(user_data['age'], int):
                logger.warning(f"年龄字段类型不正确,尝试转换: {user_data['age']}")
                user_data['age'] = int(user_data['age'])
                logger.info(f"年龄转换成功: {user_data['age']}")
            
            # 验证数据范围
            if user_data['age'] < 0 or user_data['age'] > 150:
                logger.error(f"年龄超出有效范围: {user_data['age']}")
                raise ValueError(f"年龄超出有效范围: {user_data['age']}")
            
            # 处理成功
            logger.info(f"用户数据处理成功: {user_data['name']}")
            return {
                'status': 'success',
                'data': user_data,
                'processed_at': datetime.now().isoformat()
            }
            
        except ValueError as e:
            logger.error(f"数据验证失败: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'processed_at': datetime.now().isoformat()
            }
        except Exception as e:
            logger.critical(f"处理用户数据时发生未知错误: {e}")
            return {
                'status': 'critical_error',
                'error': str(e),
                'processed_at': datetime.now().isoformat()
            }
    
    # 测试数据
    test_users = [
        {'name': '张三', 'email': 'zhangsan@example.com', 'age': 25},
        {'name': '李四', 'email': 'lisi@example.com'},  # 缺少age字段
        {'name': '王五', 'email': 'wangwu@example.com', 'age': '30'},  # age是字符串
        {'name': '赵六', 'email': 'zhaoliu@example.com', 'age': -5},  # age为负数
        {'name': '钱七', 'email': 'qianqi@example.com', 'age': 200},  # age过大
    ]
    
    for user in test_users:
        print(f"\n处理用户: {user}")
        result = process_user_data(user)
        print(f"处理结果: {result['status']}")
        if result['status'] != 'success':
            print(f"错误信息: {result.get('error', 'Unknown error')}")

def create_custom_logger():
    """创建自定义日志记录器"""
    print("\n=== 自定义日志记录器 ===")
    
    # 创建自定义格式化器
    class ColoredFormatter(logging.Formatter):
        """彩色日志格式化器"""
        
        # ANSI颜色代码
        COLORS = {
            'DEBUG': '\033[36m',    # 青色
            'INFO': '\033[32m',     # 绿色
            'WARNING': '\033[33m',  # 黄色
            'ERROR': '\033[31m',    # 红色
            'CRITICAL': '\033[35m', # 紫色
            'RESET': '\033[0m'      # 重置
        }
        
        def format(self, record):
            # 添加颜色
            color = self.COLORS.get(record.levelname, self.COLORS['RESET'])
            record.levelname = f"{color}{record.levelname}{self.COLORS['RESET']}"
            return super().format(record)
    
    # 创建自定义日志记录器
    custom_logger = logging.getLogger('custom_app')
    custom_logger.setLevel(logging.DEBUG)
    
    # 清除现有处理器
    custom_logger.handlers.clear()
    
    # 创建控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_formatter = ColoredFormatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    console_handler.setFormatter(console_formatter)
    
    # 创建文件处理器
    file_handler = logging.FileHandler('app.log', encoding='utf-8')
    file_handler.setLevel(logging.DEBUG)
    file_formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
    )
    file_handler.setFormatter(file_formatter)
    
    # 添加处理器
    custom_logger.addHandler(console_handler)
    custom_logger.addHandler(file_handler)
    
    # 测试自定义日志记录器
    custom_logger.debug("这是调试信息")
    custom_logger.info("这是普通信息")
    custom_logger.warning("这是警告信息")
    custom_logger.error("这是错误信息")
    custom_logger.critical("这是严重错误信息")
    
    return custom_logger

# 3.3 使用pdb调试器

import pdb

def debug_with_pdb():
    """使用pdb调试器"""
    print("\n=== 使用pdb调试器 ===")
    
    def complex_calculation(data):
        """复杂计算函数"""
        result = []
        
        for i, item in enumerate(data):
            # 设置断点 - 取消注释以启用
            # pdb.set_trace()
            
            if isinstance(item, (int, float)):
                # 计算平方
                square = item ** 2
                result.append(square)
            elif isinstance(item, str):
                # 计算字符串长度
                length = len(item)
                result.append(length)
            else:
                # 其他类型设为0
                result.append(0)
        
        return result
    
    # 测试数据
    test_data = [1, 2, "hello", 3.5, [1, 2, 3], "world", None, 4]
    
    print(f"输入数据: {test_data}")
    result = complex_calculation(test_data)
    print(f"计算结果: {result}")
    
    # pdb调试器常用命令说明
    print("\npdb调试器常用命令:")
    print("  l(ist) - 显示当前代码")
    print("  n(ext) - 执行下一行")
    print("  s(tep) - 进入函数内部")
    print("  c(ontinue) - 继续执行")
    print("  p <变量名> - 打印变量值")
    print("  pp <变量名> - 美化打印变量值")
    print("  w(here) - 显示调用栈")
    print("  u(p) - 上移一层调用栈")
    print("  d(own) - 下移一层调用栈")
    print("  q(uit) - 退出调试器")

def debug_with_breakpoint():
    """使用breakpoint()函数调试(Python 3.7+)"""
    print("\n=== 使用breakpoint()调试 ===")
    
    def fibonacci(n):
        """计算斐波那契数列"""
        if n <= 0:
            return []
        elif n == 1:
            return [0]
        elif n == 2:
            return [0, 1]
        
        fib_sequence = [0, 1]
        
        for i in range(2, n):
            # 使用breakpoint()设置断点 - 取消注释以启用
            # breakpoint()
            
            next_fib = fib_sequence[i-1] + fib_sequence[i-2]
            fib_sequence.append(next_fib)
        
        return fib_sequence
    
    # 测试斐波那契函数
    n = 10
    print(f"计算前{n}个斐波那契数:")
    result = fibonacci(n)
    print(f"结果: {result}")

# 四、日志记录和错误追踪

# 4.1 高级日志配置

import logging
import logging.config
import json
from datetime import datetime
import traceback
import sys

# 日志配置字典
LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
        },
        'detailed': {
            'format': '%(asctime)s [%(levelname)s] %(name)s:%(funcName)s:%(lineno)d: %(message)s'
        },
        'json': {
            'format': '%(asctime)s %(name)s %(levelname)s %(message)s',
            'class': 'pythonjsonlogger.jsonlogger.JsonFormatter'
        }
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'standard',
            'stream': 'ext://sys.stdout'
        },
        'file': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'formatter': 'detailed',
            'filename': 'app.log',
            'maxBytes': 1024*1024*5,  # 5MB
            'backupCount': 3,
            'encoding': 'utf-8'
        },
        'error_file': {
            'level': 'ERROR',
            'class': 'logging.FileHandler',
            'formatter': 'detailed',
            'filename': 'error.log',
            'encoding': 'utf-8'
        }
    },
    'loggers': {
        '': {  # root logger
            'handlers': ['console', 'file', 'error_file'],
            'level': 'DEBUG',
            'propagate': False
        }
    }
}

def setup_logging():
    """设置日志配置"""
    logging.config.dictConfig(LOGGING_CONFIG)
    return logging.getLogger(__name__)

class ErrorTracker:
    """错误追踪器"""
    
    def __init__(self, logger):
        self.logger = logger
        self.error_count = 0
        self.error_history = []
    
    def log_error(self, error, context=None):
        """记录错误"""
        self.error_count += 1
        
        error_info = {
            'timestamp': datetime.now().isoformat(),
            'error_type': type(error).__name__,
            'error_message': str(error),
            'traceback': traceback.format_exc(),
            'context': context or {},
            'error_id': self.error_count
        }
        
        self.error_history.append(error_info)
        
        # 记录到日志
        self.logger.error(
            f"错误 #{self.error_count}: {error_info['error_type']} - {error_info['error_message']}",
            extra={'error_info': error_info}
        )
        
        return error_info['error_id']
    
    def get_error_summary(self):
        """获取错误摘要"""
        if not self.error_history:
            return "没有记录到错误"
        
        error_types = {}
        for error in self.error_history:
            error_type = error['error_type']
            error_types[error_type] = error_types.get(error_type, 0) + 1
        
        summary = f"总错误数: {self.error_count}\n"
        summary += "错误类型分布:\n"
        for error_type, count in error_types.items():
            summary += f"  {error_type}: {count}\n"
        
        return summary
    
    def get_recent_errors(self, count=5):
        """获取最近的错误"""
        return self.error_history[-count:]

def advanced_logging_example():
    """高级日志记录示例"""
    print("=== 高级日志记录示例 ===")
    
    # 设置日志
    logger = setup_logging()
    error_tracker = ErrorTracker(logger)
    
    # 模拟应用程序
    class DataProcessor:
        """数据处理器"""
        
        def __init__(self, logger, error_tracker):
            self.logger = logger
            self.error_tracker = error_tracker
        
        def process_data(self, data):
            """处理数据"""
            self.logger.info(f"开始处理数据,数据量: {len(data)}")
            
            processed_items = []
            failed_items = []
            
            for i, item in enumerate(data):
                try:
                    self.logger.debug(f"处理第 {i+1} 项: {item}")
                    
                    # 模拟数据处理
                    if item is None:
                        raise ValueError("数据项不能为None")
                    
                    if isinstance(item, str) and len(item) == 0:
                        raise ValueError("字符串不能为空")
                    
                    if isinstance(item, (int, float)) and item < 0:
                        raise ValueError("数字不能为负数")
                    
                    # 处理成功
                    processed_item = self._transform_item(item)
                    processed_items.append(processed_item)
                    
                    self.logger.debug(f"第 {i+1} 项处理成功: {item} -> {processed_item}")
                    
                except Exception as e:
                    error_id = self.error_tracker.log_error(
                        e, 
                        context={
                            'item_index': i,
                            'item_value': item,
                            'function': 'process_data'
                        }
                    )
                    
                    failed_items.append({
                        'index': i,
                        'item': item,
                        'error_id': error_id,
                        'error': str(e)
                    })
            
            # 记录处理结果
            self.logger.info(
                f"数据处理完成 - 成功: {len(processed_items)}, 失败: {len(failed_items)}"
            )
            
            if failed_items:
                self.logger.warning(f"有 {len(failed_items)} 项数据处理失败")
            
            return {
                'processed': processed_items,
                'failed': failed_items,
                'summary': {
                    'total': len(data),
                    'success': len(processed_items),
                    'failed': len(failed_items)
                }
            }
        
        def _transform_item(self, item):
            """转换数据项"""
            if isinstance(item, str):
                return item.upper()
            elif isinstance(item, (int, float)):
                return item * 2
            elif isinstance(item, list):
                return len(item)
            else:
                return str(item)
    
    # 测试数据处理器
    processor = DataProcessor(logger, error_tracker)
    
    test_data = [
        "hello",
        42,
        None,  # 会引发错误
        "",    # 会引发错误
        -10,   # 会引发错误
        [1, 2, 3],
        3.14,
        "world"
    ]
    
    logger.info("开始数据处理任务")
    result = processor.process_data(test_data)
    
    print(f"\n处理结果:")
    print(f"成功处理: {result['summary']['success']} 项")
    print(f"处理失败: {result['summary']['failed']} 项")
    
    if result['failed']:
        print("\n失败的项目:")
        for failed in result['failed']:
            print(f"  索引 {failed['index']}: {failed['item']} - {failed['error']}")
    
    print(f"\n错误追踪摘要:")
    print(error_tracker.get_error_summary())
    
    logger.info("数据处理任务完成")

# 4.2 性能监控和分析

import time
import functools
import cProfile
import pstats
import io
from memory_profiler import profile as memory_profile

def performance_monitor(func):
    """性能监控装饰器"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # 记录开始时间
        start_time = time.time()
        start_cpu = time.process_time()
        
        try:
            # 执行函数
            result = func(*args, **kwargs)
            
            # 记录结束时间
            end_time = time.time()
            end_cpu = time.process_time()
            
            # 计算性能指标
            wall_time = end_time - start_time
            cpu_time = end_cpu - start_cpu
            
            # 记录性能信息
            logger = logging.getLogger(__name__)
            logger.info(
                f"函数 {func.__name__} 性能统计 - "
                f"墙钟时间: {wall_time:.4f}s, "
                f"CPU时间: {cpu_time:.4f}s"
            )
            
            return result
            
        except Exception as e:
            # 记录异常和性能信息
            end_time = time.time()
            wall_time = end_time - start_time
            
            logger = logging.getLogger(__name__)
            logger.error(
                f"函数 {func.__name__} 执行失败 - "
                f"执行时间: {wall_time:.4f}s, "
                f"错误: {e}"
            )
            raise
    
    return wrapper

def profile_code(func):
    """代码性能分析装饰器"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        pr = cProfile.Profile()
        pr.enable()
        
        result = func(*args, **kwargs)
        
        pr.disable()
        
        # 创建统计报告
        s = io.StringIO()
        ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
        ps.print_stats(10)  # 显示前10个最耗时的函数
        
        print(f"\n=== {func.__name__} 性能分析报告 ===")
        print(s.getvalue())
        
        return result
    
    return wrapper

@performance_monitor
@profile_code
def cpu_intensive_task(n):
    """CPU密集型任务"""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

@performance_monitor
def io_intensive_task():
    """IO密集型任务模拟"""
    import tempfile
    import os
    
    # 创建临时文件
    with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
        temp_filename = f.name
        
        # 写入大量数据
        for i in range(10000):
            f.write(f"这是第 {i} 行数据\n")
    
    # 读取数据
    line_count = 0
    with open(temp_filename, 'r') as f:
        for line in f:
            line_count += 1
    
    # 清理临时文件
    os.unlink(temp_filename)
    
    return line_count

# @memory_profile  # 取消注释以启用内存分析
def memory_intensive_task():
    """内存密集型任务"""
    # 创建大量数据
    data = []
    for i in range(100000):
        data.append([j for j in range(100)])
    
    # 处理数据
    processed = []
    for item in data:
        processed.append(sum(item))
    
    return len(processed)

def performance_analysis_examples():
    """性能分析示例"""
    print("=== 性能分析示例 ===")
    
    # 设置日志
    logger = setup_logging()
    
    # 测试CPU密集型任务
    print("\n--- CPU密集型任务测试 ---")
    result1 = cpu_intensive_task(100000)
    print(f"CPU任务结果: {result1}")
    
    # 测试IO密集型任务
    print("\n--- IO密集型任务测试 ---")
    result2 = io_intensive_task()
    print(f"IO任务结果: {result2} 行")
    
    # 测试内存密集型任务
    print("\n--- 内存密集型任务测试 ---")
    result3 = memory_intensive_task()
    print(f"内存任务结果: {result3} 项")

# 运行示例
if __name__ == "__main__":
    debug_with_print()
    debug_with_assert()
    debug_with_logging()
    create_custom_logger()
    debug_with_pdb()
    debug_with_breakpoint()
    advanced_logging_example()
    performance_analysis_examples()

# 五、单元测试和测试驱动开发

# 5.1 使用unittest模块

import unittest
from unittest.mock import Mock, patch, MagicMock
import tempfile
import os

# 被测试的类和函数
class Calculator:
    """计算器类"""
    
    def add(self, a, b):
        """加法"""
        return a + b
    
    def subtract(self, a, b):
        """减法"""
        return a - b
    
    def multiply(self, a, b):
        """乘法"""
        return a * b
    
    def divide(self, a, b):
        """除法"""
        if b == 0:
            raise ValueError("除数不能为零")
        return a / b
    
    def power(self, base, exponent):
        """幂运算"""
        if not isinstance(base, (int, float)) or not isinstance(exponent, (int, float)):
            raise TypeError("参数必须是数字")
        return base ** exponent

class FileManager:
    """文件管理器类"""
    
    def read_file(self, filename):
        """读取文件"""
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                return f.read()
        except FileNotFoundError:
            raise FileNotFoundError(f"文件 {filename} 不存在")
    
    def write_file(self, filename, content):
        """写入文件"""
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(content)
    
    def file_exists(self, filename):
        """检查文件是否存在"""
        return os.path.exists(filename)
    
    def get_file_size(self, filename):
        """获取文件大小"""
        if not self.file_exists(filename):
            raise FileNotFoundError(f"文件 {filename} 不存在")
        return os.path.getsize(filename)

# 测试类
class TestCalculator(unittest.TestCase):
    """计算器测试类"""
    
    def setUp(self):
        """测试前的设置"""
        self.calc = Calculator()
    
    def tearDown(self):
        """测试后的清理"""
        # 这里可以进行清理工作
        pass
    
    def test_add(self):
        """测试加法"""
        self.assertEqual(self.calc.add(2, 3), 5)
        self.assertEqual(self.calc.add(-1, 1), 0)
        self.assertEqual(self.calc.add(0, 0), 0)
        self.assertEqual(self.calc.add(1.5, 2.5), 4.0)
    
    def test_subtract(self):
        """测试减法"""
        self.assertEqual(self.calc.subtract(5, 3), 2)
        self.assertEqual(self.calc.subtract(1, 1), 0)
        self.assertEqual(self.calc.subtract(-1, -1), 0)
        self.assertEqual(self.calc.subtract(3.5, 1.5), 2.0)
    
    def test_multiply(self):
        """测试乘法"""
        self.assertEqual(self.calc.multiply(3, 4), 12)
        self.assertEqual(self.calc.multiply(0, 5), 0)
        self.assertEqual(self.calc.multiply(-2, 3), -6)
        self.assertAlmostEqual(self.calc.multiply(1.5, 2.0), 3.0)
    
    def test_divide(self):
        """测试除法"""
        self.assertEqual(self.calc.divide(10, 2), 5)
        self.assertEqual(self.calc.divide(7, 2), 3.5)
        self.assertAlmostEqual(self.calc.divide(1, 3), 0.3333333333333333)
        
        # 测试除零异常
        with self.assertRaises(ValueError):
            self.calc.divide(10, 0)
        
        with self.assertRaisesRegex(ValueError, "除数不能为零"):
            self.calc.divide(5, 0)
    
    def test_power(self):
        """测试幂运算"""
        self.assertEqual(self.calc.power(2, 3), 8)
        self.assertEqual(self.calc.power(5, 0), 1)
        self.assertEqual(self.calc.power(4, 0.5), 2.0)
        
        # 测试类型错误
        with self.assertRaises(TypeError):
            self.calc.power("2", 3)
        
        with self.assertRaises(TypeError):
            self.calc.power(2, "3")
    
    def test_edge_cases(self):
        """测试边界情况"""
        # 测试大数
        large_num = 10**10
        self.assertEqual(self.calc.add(large_num, 1), large_num + 1)
        
        # 测试小数精度
        result = self.calc.add(0.1, 0.2)
        self.assertAlmostEqual(result, 0.3, places=7)

class TestFileManager(unittest.TestCase):
    """文件管理器测试类"""
    
    def setUp(self):
        """测试前的设置"""
        self.file_manager = FileManager()
        self.test_dir = tempfile.mkdtemp()
        self.test_file = os.path.join(self.test_dir, "test.txt")
    
    def tearDown(self):
        """测试后的清理"""
        # 清理测试文件
        if os.path.exists(self.test_file):
            os.remove(self.test_file)
        os.rmdir(self.test_dir)
    
    def test_write_and_read_file(self):
        """测试文件写入和读取"""
        content = "这是测试内容\n第二行"
        
        # 写入文件
        self.file_manager.write_file(self.test_file, content)
        
        # 读取文件
        read_content = self.file_manager.read_file(self.test_file)
        
        self.assertEqual(content, read_content)
    
    def test_read_nonexistent_file(self):
        """测试读取不存在的文件"""
        nonexistent_file = os.path.join(self.test_dir, "nonexistent.txt")
        
        with self.assertRaises(FileNotFoundError):
            self.file_manager.read_file(nonexistent_file)
    
    def test_file_exists(self):
        """测试文件存在检查"""
        # 文件不存在
        self.assertFalse(self.file_manager.file_exists(self.test_file))
        
        # 创建文件
        self.file_manager.write_file(self.test_file, "test")
        
        # 文件存在
        self.assertTrue(self.file_manager.file_exists(self.test_file))
    
    def test_get_file_size(self):
        """测试获取文件大小"""
        content = "Hello, World!"
        self.file_manager.write_file(self.test_file, content)
        
        size = self.file_manager.get_file_size(self.test_file)
        expected_size = len(content.encode('utf-8'))
        
        self.assertEqual(size, expected_size)
    
    def test_get_size_nonexistent_file(self):
        """测试获取不存在文件的大小"""
        nonexistent_file = os.path.join(self.test_dir, "nonexistent.txt")
        
        with self.assertRaises(FileNotFoundError):
            self.file_manager.get_file_size(nonexistent_file)

class TestWithMocks(unittest.TestCase):
    """使用Mock的测试类"""
    
    def test_mock_basic(self):
        """基本Mock使用"""
        # 创建Mock对象
        mock_obj = Mock()
        
        # 设置返回值
        mock_obj.method.return_value = "mocked result"
        
        # 调用方法
        result = mock_obj.method()
        
        # 验证结果
        self.assertEqual(result, "mocked result")
        
        # 验证方法被调用
        mock_obj.method.assert_called_once()
    
    def test_mock_with_side_effect(self):
        """使用side_effect的Mock"""
        mock_obj = Mock()
        
        # 设置副作用(异常)
        mock_obj.method.side_effect = ValueError("模拟错误")
        
        # 验证异常
        with self.assertRaises(ValueError):
            mock_obj.method()
    
    @patch('builtins.open', new_callable=unittest.mock.mock_open, read_data="file content")
    def test_file_reading_with_patch(self, mock_file):
        """使用patch模拟文件操作"""
        file_manager = FileManager()
        
        # 调用读取文件方法
        content = file_manager.read_file("any_file.txt")
        
        # 验证结果
        self.assertEqual(content, "file content")
        
        # 验证文件被正确打开
        mock_file.assert_called_once_with("any_file.txt", 'r', encoding='utf-8')
    
    @patch('os.path.exists')
    def test_file_exists_with_patch(self, mock_exists):
        """使用patch模拟os.path.exists"""
        # 设置Mock返回值
        mock_exists.return_value = True
        
        file_manager = FileManager()
        result = file_manager.file_exists("any_file.txt")
        
        # 验证结果
        self.assertTrue(result)
        
        # 验证os.path.exists被调用
        mock_exists.assert_called_once_with("any_file.txt")

def run_tests():
    """运行测试"""
    print("=== 运行单元测试 ===")
    
    # 创建测试套件
    test_suite = unittest.TestSuite()
    
    # 添加测试类
    test_suite.addTest(unittest.makeSuite(TestCalculator))
    test_suite.addTest(unittest.makeSuite(TestFileManager))
    test_suite.addTest(unittest.makeSuite(TestWithMocks))
    
    # 运行测试
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(test_suite)
    
    # 打印测试结果摘要
    print(f"\n=== 测试结果摘要 ===")
    print(f"运行测试数: {result.testsRun}")
    print(f"失败数: {len(result.failures)}")
    print(f"错误数: {len(result.errors)}")
    print(f"跳过数: {len(result.skipped)}")
    
    if result.failures:
        print("\n失败的测试:")
        for test, traceback in result.failures:
            print(f"  {test}: {traceback}")
    
    if result.errors:
         print("\n错误的测试:")
         for test, traceback in result.errors:
             print(f"  {test}: {traceback}")

def test_driven_development_example():
    """测试驱动开发示例"""
    print("\n=== 测试驱动开发示例 ===")
    
    # 第一步:编写失败的测试
    class TestStringUtils(unittest.TestCase):
        """字符串工具测试类"""
        
        def test_reverse_string(self):
            """测试字符串反转"""
            from string_utils import reverse_string
            
            self.assertEqual(reverse_string("hello"), "olleh")
            self.assertEqual(reverse_string(""), "")
            self.assertEqual(reverse_string("a"), "a")
            self.assertEqual(reverse_string("12345"), "54321")
        
        def test_is_palindrome(self):
            """测试回文检查"""
            from string_utils import is_palindrome
            
            self.assertTrue(is_palindrome("racecar"))
            self.assertTrue(is_palindrome("A man a plan a canal Panama"))
            self.assertTrue(is_palindrome(""))
            self.assertFalse(is_palindrome("hello"))
            self.assertFalse(is_palindrome("Python"))
        
        def test_word_count(self):
            """测试单词计数"""
            from string_utils import word_count
            
            self.assertEqual(word_count("hello world"), 2)
            self.assertEqual(word_count(""), 0)
            self.assertEqual(word_count("   "), 0)
            self.assertEqual(word_count("Python is awesome"), 3)
            self.assertEqual(word_count("one"), 1)
    
    # 第二步:实现功能使测试通过
    class StringUtils:
        """字符串工具类"""
        
        @staticmethod
        def reverse_string(s):
            """反转字符串"""
            return s[::-1]
        
        @staticmethod
        def is_palindrome(s):
            """检查是否为回文"""
            # 移除空格并转换为小写
            cleaned = ''.join(s.split()).lower()
            return cleaned == cleaned[::-1]
        
        @staticmethod
        def word_count(s):
            """计算单词数量"""
            if not s or s.isspace():
                return 0
            return len(s.split())
    
    # 模拟string_utils模块
    import sys
    import types
    
    string_utils_module = types.ModuleType('string_utils')
    string_utils_module.reverse_string = StringUtils.reverse_string
    string_utils_module.is_palindrome = StringUtils.is_palindrome
    string_utils_module.word_count = StringUtils.word_count
    sys.modules['string_utils'] = string_utils_module
    
    # 运行测试
    test_suite = unittest.TestSuite()
    test_suite.addTest(unittest.makeSuite(TestStringUtils))
    
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(test_suite)
    
    print(f"\nTDD测试结果: {result.testsRun} 个测试,{len(result.failures)} 个失败,{len(result.errors)} 个错误")

# 运行示例
if __name__ == "__main__":
    run_tests()
    test_driven_development_example()

# 六、实战练习:错误处理和调试系统

# 6.1 构建一个完整的错误处理系统

import logging
import traceback
import json
import time
from datetime import datetime
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum

class ErrorLevel(Enum):
    """错误级别枚举"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class ErrorReport:
    """错误报告数据类"""
    error_id: str
    timestamp: str
    error_type: str
    error_message: str
    level: ErrorLevel
    context: Dict[str, Any]
    traceback_info: str
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    resolved: bool = False
    resolution_notes: Optional[str] = None

class ErrorHandlingSystem:
    """错误处理系统"""
    
    def __init__(self, log_file="error_system.log"):
        self.error_reports: List[ErrorReport] = []
        self.error_count = 0
        self.setup_logging(log_file)
        
    def setup_logging(self, log_file):
        """设置日志系统"""
        self.logger = logging.getLogger("ErrorHandlingSystem")
        self.logger.setLevel(logging.DEBUG)
        
        # 清除现有处理器
        self.logger.handlers.clear()
        
        # 文件处理器
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setLevel(logging.DEBUG)
        
        # 控制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        
        # 格式化器
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        
        # 添加处理器
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)
    
    def generate_error_id(self) -> str:
        """生成错误ID"""
        self.error_count += 1
        timestamp = int(time.time() * 1000)
        return f"ERR_{timestamp}_{self.error_count:04d}"
    
    def determine_error_level(self, error: Exception) -> ErrorLevel:
        """确定错误级别"""
        if isinstance(error, (SystemExit, KeyboardInterrupt)):
            return ErrorLevel.CRITICAL
        elif isinstance(error, (MemoryError, OSError)):
            return ErrorLevel.HIGH
        elif isinstance(error, (ValueError, TypeError, AttributeError)):
            return ErrorLevel.MEDIUM
        else:
            return ErrorLevel.LOW
    
    def capture_error(self, error: Exception, context: Dict[str, Any] = None, 
                     user_id: str = None, session_id: str = None) -> str:
        """捕获错误"""
        error_id = self.generate_error_id()
        level = self.determine_error_level(error)
        
        error_report = ErrorReport(
            error_id=error_id,
            timestamp=datetime.now().isoformat(),
            error_type=type(error).__name__,
            error_message=str(error),
            level=level,
            context=context or {},
            traceback_info=traceback.format_exc(),
            user_id=user_id,
            session_id=session_id
        )
        
        self.error_reports.append(error_report)
        
        # 记录到日志
        log_level = {
            ErrorLevel.LOW: logging.INFO,
            ErrorLevel.MEDIUM: logging.WARNING,
            ErrorLevel.HIGH: logging.ERROR,
            ErrorLevel.CRITICAL: logging.CRITICAL
        }[level]
        
        self.logger.log(
            log_level,
            f"错误捕获 [{error_id}] {error_report.error_type}: {error_report.error_message}"
        )
        
        # 如果是严重错误,立即通知
        if level == ErrorLevel.CRITICAL:
            self._send_critical_alert(error_report)
        
        return error_id
    
    def _send_critical_alert(self, error_report: ErrorReport):
        """发送严重错误警报"""
        print(f"\n🚨 严重错误警报 🚨")
        print(f"错误ID: {error_report.error_id}")
        print(f"错误类型: {error_report.error_type}")
        print(f"错误信息: {error_report.error_message}")
        print(f"时间: {error_report.timestamp}")
        if error_report.user_id:
            print(f"用户ID: {error_report.user_id}")
        print("请立即处理!\n")
    
    def get_error_report(self, error_id: str) -> Optional[ErrorReport]:
        """获取错误报告"""
        for report in self.error_reports:
            if report.error_id == error_id:
                return report
        return None
    
    def resolve_error(self, error_id: str, resolution_notes: str) -> bool:
        """解决错误"""
        report = self.get_error_report(error_id)
        if report:
            report.resolved = True
            report.resolution_notes = resolution_notes
            self.logger.info(f"错误 {error_id} 已解决: {resolution_notes}")
            return True
        return False
    
    def get_error_statistics(self) -> Dict[str, Any]:
        """获取错误统计"""
        total_errors = len(self.error_reports)
        if total_errors == 0:
            return {"total_errors": 0}
        
        # 按级别统计
        level_stats = {}
        for level in ErrorLevel:
            level_stats[level.value] = sum(
                1 for report in self.error_reports 
                if report.level == level
            )
        
        # 按类型统计
        type_stats = {}
        for report in self.error_reports:
            error_type = report.error_type
            type_stats[error_type] = type_stats.get(error_type, 0) + 1
        
        # 解决率
        resolved_count = sum(1 for report in self.error_reports if report.resolved)
        resolution_rate = (resolved_count / total_errors) * 100
        
        return {
            "total_errors": total_errors,
            "resolved_errors": resolved_count,
            "unresolved_errors": total_errors - resolved_count,
            "resolution_rate": f"{resolution_rate:.1f}%",
            "level_distribution": level_stats,
            "type_distribution": type_stats
        }
    
    def export_error_reports(self, filename: str = None) -> str:
        """导出错误报告"""
        if filename is None:
            filename = f"error_reports_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        
        reports_data = [asdict(report) for report in self.error_reports]
        
        # 转换枚举为字符串
        for report_data in reports_data:
            report_data['level'] = report_data['level'].value
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(reports_data, f, ensure_ascii=False, indent=2)
        
        self.logger.info(f"错误报告已导出到 {filename}")
        return filename
    
    def get_recent_errors(self, count: int = 10) -> List[ErrorReport]:
        """获取最近的错误"""
        return self.error_reports[-count:]
    
    def get_unresolved_errors(self) -> List[ErrorReport]:
        """获取未解决的错误"""
        return [report for report in self.error_reports if not report.resolved]

class MonitoredApplication:
    """被监控的应用程序示例"""
    
    def __init__(self, error_system: ErrorHandlingSystem):
        self.error_system = error_system
        self.user_sessions = {}
    
    def safe_execute(self, func, *args, user_id=None, session_id=None, **kwargs):
        """安全执行函数"""
        try:
            return func(*args, **kwargs)
        except Exception as e:
            context = {
                'function_name': func.__name__,
                'args': str(args),
                'kwargs': str(kwargs)
            }
            
            error_id = self.error_system.capture_error(
                e, context=context, user_id=user_id, session_id=session_id
            )
            
            return {"error": True, "error_id": error_id, "message": str(e)}
    
    def divide_numbers(self, a, b):
        """除法运算"""
        if b == 0:
            raise ZeroDivisionError("除数不能为零")
        return a / b
    
    def access_list_item(self, lst, index):
        """访问列表项"""
        if not isinstance(lst, list):
            raise TypeError("参数必须是列表")
        return lst[index]
    
    def process_user_data(self, user_data):
        """处理用户数据"""
        if not isinstance(user_data, dict):
            raise TypeError("用户数据必须是字典")
        
        required_fields = ['name', 'email']
        for field in required_fields:
            if field not in user_data:
                raise ValueError(f"缺少必需字段: {field}")
        
        return {"status": "success", "processed_data": user_data}
    
    def simulate_memory_error(self):
        """模拟内存错误"""
        # 这只是模拟,实际不会真的耗尽内存
        raise MemoryError("模拟内存不足错误")
    
    def simulate_critical_error(self):
        """模拟严重错误"""
        raise SystemExit("模拟系统退出错误")

def error_handling_system_demo():
    """错误处理系统演示"""
    print("=== 错误处理系统演示 ===")
    
    # 创建错误处理系统
    error_system = ErrorHandlingSystem()
    app = MonitoredApplication(error_system)
    
    # 模拟各种错误场景
    test_scenarios = [
        # 正常操作
        ("正常除法", lambda: app.divide_numbers(10, 2), "user1", "session1"),
        
        # 除零错误
        ("除零错误", lambda: app.divide_numbers(10, 0), "user1", "session1"),
        
        # 索引错误
        ("索引错误", lambda: app.access_list_item([1, 2, 3], 10), "user2", "session2"),
        
        # 类型错误
        ("类型错误", lambda: app.access_list_item("not a list", 0), "user2", "session2"),
        
        # 数据验证错误
        ("数据验证错误", lambda: app.process_user_data({"name": "张三"}), "user3", "session3"),
        
        # 内存错误
        ("内存错误", lambda: app.simulate_memory_error(), "user4", "session4"),
        
        # 严重错误
        ("严重错误", lambda: app.simulate_critical_error(), "user5", "session5"),
    ]
    
    error_ids = []
    
    for scenario_name, operation, user_id, session_id in test_scenarios:
        print(f"\n--- 测试场景: {scenario_name} ---")
        
        result = app.safe_execute(
            operation, 
            user_id=user_id, 
            session_id=session_id
        )
        
        if isinstance(result, dict) and result.get("error"):
            error_ids.append(result["error_id"])
            print(f"错误已捕获,错误ID: {result['error_id']}")
        else:
            print(f"操作成功,结果: {result}")
    
    # 解决一些错误
    print("\n--- 解决错误 ---")
    if error_ids:
        # 解决第一个错误
        error_system.resolve_error(
            error_ids[0], 
            "已修复除零检查逻辑,添加了输入验证"
        )
        
        # 解决第二个错误
        if len(error_ids) > 1:
            error_system.resolve_error(
                error_ids[1], 
                "已添加边界检查,防止索引越界"
            )
    
    # 显示错误统计
    print("\n--- 错误统计 ---")
    stats = error_system.get_error_statistics()
    print(json.dumps(stats, ensure_ascii=False, indent=2))
    
    # 显示未解决的错误
    print("\n--- 未解决的错误 ---")
    unresolved = error_system.get_unresolved_errors()
    for report in unresolved:
        print(f"错误ID: {report.error_id}")
        print(f"类型: {report.error_type}")
        print(f"级别: {report.level.value}")
        print(f"消息: {report.error_message}")
        print("-" * 40)
    
    # 导出错误报告
    print("\n--- 导出错误报告 ---")
    export_file = error_system.export_error_reports()
    print(f"错误报告已导出到: {export_file}")

# 运行演示
if __name__ == "__main__":
    error_handling_system_demo()

# 七、总结和最佳实践

# 7.1 错误处理最佳实践

  1. 具体化异常处理

    • 捕获具体的异常类型,而不是使用通用的Exception
    • 为不同的错误情况提供不同的处理逻辑
  2. 提供有意义的错误信息

    • 错误信息应该清晰地描述问题
    • 包含足够的上下文信息帮助调试
  3. 使用自定义异常

    • 为业务逻辑创建专门的异常类
    • 异常类应该包含相关的错误信息和上下文
  4. 记录错误日志

    • 使用logging模块记录错误信息
    • 包含时间戳、错误级别、上下文信息
  5. 优雅降级

    • 当发生错误时,程序应该能够优雅地处理
    • 提供备选方案或默认行为

# 7.2 调试技巧总结

  1. 分层调试

    • 从简单的print语句开始
    • 使用logging进行结构化日志记录
    • 使用调试器进行深入分析
  2. 测试驱动调试

    • 编写测试用例重现问题
    • 使用单元测试验证修复
  3. 性能分析

    • 使用性能分析工具识别瓶颈
    • 监控内存使用和CPU时间

# 7.3 下一步学习方向

  1. 高级调试工具

    • 学习使用IDE的调试功能
    • 掌握远程调试技术
  2. 监控和告警

    • 学习应用程序监控
    • 设置错误告警系统
  3. 错误追踪服务

    • 了解Sentry等错误追踪服务
    • 学习分布式系统的错误追踪

# 7.4 练习建议

  1. 创建一个个人项目,实践本章学到的错误处理技巧
  2. 为现有代码添加异常处理,提高代码的健壮性
  3. 编写单元测试,覆盖各种错误场景
  4. 设置日志系统,记录应用程序的运行状态
  5. 模拟各种错误情况,测试错误处理的有效性

通过本章的学习,你应该能够:

  • 理解Python的异常处理机制
  • 创建和使用自定义异常
  • 使用各种调试技巧和工具
  • 建立完整的错误处理和日志系统
  • 编写健壮的、易于调试的Python代码

错误处理和调试是编程中的重要技能,需要在实践中不断提高。记住,好的错误处理不仅能让程序更稳定,还能大大提高开发和维护的效率。