第12天-IO编程

2023/6/15

# 第12天-IO编程

# 一、IO编程概述

# 1.1 什么是IO编程

IO(Input/Output)编程是指程序与外部世界进行数据交换的编程技术。在Python中,IO编程主要包括:

  • 文件IO:读写文件
  • 网络IO:网络通信
  • 标准IO:控制台输入输出
  • 内存IO:内存中的数据流操作

# 1.2 IO编程的重要性

# IO编程的应用场景
print("=== IO编程的应用场景 ===")

# 1. 数据持久化
print("1. 数据持久化:将程序数据保存到文件")
print("   - 配置文件读写")
print("   - 日志记录")
print("   - 数据备份")

# 2. 数据交换
print("\n2. 数据交换:与其他程序或系统交换数据")
print("   - 网络通信")
print("   - API调用")
print("   - 数据库操作")

# 3. 用户交互
print("\n3. 用户交互:与用户进行输入输出")
print("   - 命令行界面")
print("   - 图形界面")
print("   - Web界面")

# 4. 系统集成
print("\n4. 系统集成:与操作系统和其他程序集成")
print("   - 进程间通信")
print("   - 系统调用")
print("   - 外部程序调用")

# 二、文件IO操作

# 2.1 文件操作基础

import os
import shutil
from pathlib import Path

def file_operations_demo():
    """文件操作基础演示"""
    print("=== 文件操作基础 ===")
    
    # 1. 创建文件
    print("\n1. 创建和写入文件")
    
    # 使用open函数创建文件
    with open('demo.txt', 'w', encoding='utf-8') as f:
        f.write('Hello, Python IO!\n')
        f.write('这是第二行\n')
        f.write('这是第三行\n')
    print("文件 demo.txt 已创建")
    
    # 2. 读取文件
    print("\n2. 读取文件内容")
    
    # 读取全部内容
    with open('demo.txt', 'r', encoding='utf-8') as f:
        content = f.read()
        print("全部内容:")
        print(content)
    
    # 按行读取
    with open('demo.txt', 'r', encoding='utf-8') as f:
        print("按行读取:")
        for line_num, line in enumerate(f, 1):
            print(f"第{line_num}行: {line.strip()}")
    
    # 3. 追加内容
    print("\n3. 追加内容")
    with open('demo.txt', 'a', encoding='utf-8') as f:
        f.write('这是追加的内容\n')
    
    # 验证追加结果
    with open('demo.txt', 'r', encoding='utf-8') as f:
        print("追加后的内容:")
        print(f.read())
    
    # 4. 文件信息
    print("\n4. 文件信息")
    file_path = Path('demo.txt')
    if file_path.exists():
        stat = file_path.stat()
        print(f"文件大小: {stat.st_size} 字节")
        print(f"创建时间: {stat.st_ctime}")
        print(f"修改时间: {stat.st_mtime}")
        print(f"是否为文件: {file_path.is_file()}")
        print(f"是否为目录: {file_path.is_dir()}")
    
    # 5. 清理
    if file_path.exists():
        file_path.unlink()
        print("\n文件已删除")

# 运行演示
file_operations_demo()

# 2.2 文件读写模式详解

def file_modes_demo():
    """文件读写模式演示"""
    print("=== 文件读写模式详解 ===")
    
    # 准备测试数据
    test_data = "Hello, World!\n这是测试数据\n第三行内容"
    
    # 1. 文本模式
    print("\n1. 文本模式操作")
    
    # 写入模式 'w' - 覆盖写入
    with open('test_w.txt', 'w', encoding='utf-8') as f:
        f.write(test_data)
    print("'w' 模式:覆盖写入完成")
    
    # 读取模式 'r' - 只读
    with open('test_w.txt', 'r', encoding='utf-8') as f:
        content = f.read()
        print(f"'r' 模式读取:\n{content}")
    
    # 追加模式 'a' - 追加写入
    with open('test_w.txt', 'a', encoding='utf-8') as f:
        f.write("\n追加的内容")
    print("'a' 模式:追加写入完成")
    
    # 读写模式 'r+' - 读写
    with open('test_w.txt', 'r+', encoding='utf-8') as f:
        content = f.read()
        print(f"'r+' 模式读取:\n{content}")
        f.write("\n通过r+模式追加")
    
    # 2. 二进制模式
    print("\n2. 二进制模式操作")
    
    # 二进制写入
    binary_data = b"\x48\x65\x6c\x6c\x6f"  # "Hello" 的字节表示
    with open('test_binary.bin', 'wb') as f:
        f.write(binary_data)
    print("二进制数据写入完成")
    
    # 二进制读取
    with open('test_binary.bin', 'rb') as f:
        data = f.read()
        print(f"二进制数据读取: {data}")
        print(f"转换为字符串: {data.decode('utf-8')}")
    
    # 3. 文件模式组合
    print("\n3. 常用文件模式总结")
    modes = {
        'r': '只读模式(默认)',
        'w': '写入模式(覆盖)',
        'a': '追加模式',
        'r+': '读写模式',
        'w+': '写读模式(覆盖)',
        'a+': '追加读写模式',
        'rb': '二进制只读',
        'wb': '二进制写入',
        'ab': '二进制追加',
        'rb+': '二进制读写',
        'wb+': '二进制写读',
        'ab+': '二进制追加读写'
    }
    
    for mode, description in modes.items():
        print(f"  {mode:4s}: {description}")
    
    # 清理文件
    for filename in ['test_w.txt', 'test_binary.bin']:
        if os.path.exists(filename):
            os.remove(filename)
    print("\n测试文件已清理")

# 运行演示
file_modes_demo()

# 2.3 高级文件操作

import json
import csv
import pickle
from datetime import datetime

def advanced_file_operations():
    """高级文件操作演示"""
    print("=== 高级文件操作 ===")
    
    # 1. JSON文件操作
    print("\n1. JSON文件操作")
    
    # 准备JSON数据
    data = {
        'name': '张三',
        'age': 25,
        'skills': ['Python', 'JavaScript', 'SQL'],
        'address': {
            'city': '北京',
            'district': '朝阳区'
        },
        'timestamp': datetime.now().isoformat()
    }
    
    # 写入JSON文件
    with open('data.json', 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    print("JSON数据已写入 data.json")
    
    # 读取JSON文件
    with open('data.json', 'r', encoding='utf-8') as f:
        loaded_data = json.load(f)
        print(f"读取的JSON数据: {loaded_data}")
    
    # 2. CSV文件操作
    print("\n2. CSV文件操作")
    
    # 写入CSV文件
    csv_data = [
        ['姓名', '年龄', '城市', '薪资'],
        ['张三', 25, '北京', 8000],
        ['李四', 30, '上海', 12000],
        ['王五', 28, '深圳', 10000],
        ['赵六', 32, '广州', 9500]
    ]
    
    with open('employees.csv', 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerows(csv_data)
    print("CSV数据已写入 employees.csv")
    
    # 读取CSV文件
    with open('employees.csv', 'r', encoding='utf-8') as f:
        reader = csv.reader(f)
        print("CSV数据读取:")
        for row_num, row in enumerate(reader):
            print(f"  第{row_num + 1}行: {row}")
    
    # 使用DictReader读取CSV
    with open('employees.csv', 'r', encoding='utf-8') as f:
        dict_reader = csv.DictReader(f)
        print("\n使用DictReader读取:")
        for row in dict_reader:
            print(f"  {row}")
    
    # 3. Pickle序列化
    print("\n3. Pickle序列化操作")
    
    # 复杂数据结构
    complex_data = {
        'list': [1, 2, 3, [4, 5]],
        'dict': {'a': 1, 'b': 2},
        'tuple': (1, 2, 3),
        'set': {1, 2, 3},
        'datetime': datetime.now(),
        'function': lambda x: x * 2
    }
    
    # 序列化到文件
    with open('data.pickle', 'wb') as f:
        pickle.dump(complex_data, f)
    print("复杂数据已序列化到 data.pickle")
    
    # 从文件反序列化
    with open('data.pickle', 'rb') as f:
        loaded_complex_data = pickle.load(f)
        print(f"反序列化的数据: {loaded_complex_data}")
        # 测试函数是否正常
        func = loaded_complex_data['function']
        print(f"函数测试: func(5) = {func(5)}")
    
    # 4. 大文件处理
    print("\n4. 大文件处理技巧")
    
    # 创建一个较大的测试文件
    with open('large_file.txt', 'w', encoding='utf-8') as f:
        for i in range(10000):
            f.write(f"这是第{i+1}行数据,包含一些测试内容\n")
    print("大文件 large_file.txt 已创建")
    
    # 逐行读取大文件(内存友好)
    line_count = 0
    with open('large_file.txt', 'r', encoding='utf-8') as f:
        for line in f:
            line_count += 1
            if line_count <= 5:  # 只显示前5行
                print(f"  {line.strip()}")
    print(f"大文件总行数: {line_count}")
    
    # 分块读取大文件
    chunk_size = 1024  # 1KB
    with open('large_file.txt', 'r', encoding='utf-8') as f:
        chunk_count = 0
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            chunk_count += 1
            if chunk_count == 1:  # 只显示第一个块的部分内容
                print(f"第一个块的前100个字符: {chunk[:100]}...")
    print(f"文件被分为 {chunk_count} 个块读取")
    
    # 清理文件
    files_to_clean = ['data.json', 'employees.csv', 'data.pickle', 'large_file.txt']
    for filename in files_to_clean:
        if os.path.exists(filename):
            os.remove(filename)
    print("\n测试文件已清理")

# 运行演示
advanced_file_operations()

# 三、目录操作

# 3.1 目录基础操作

import os
import shutil
from pathlib import Path
import tempfile

def directory_operations_demo():
    """目录操作演示"""
    print("=== 目录操作演示 ===")
    
    # 1. 获取当前目录信息
    print("\n1. 当前目录信息")
    current_dir = os.getcwd()
    print(f"当前工作目录: {current_dir}")
    print(f"目录内容: {os.listdir('.')}")
    
    # 使用pathlib
    current_path = Path.cwd()
    print(f"使用pathlib获取当前目录: {current_path}")
    
    # 2. 创建目录
    print("\n2. 创建目录")
    
    # 创建单个目录
    test_dir = Path('test_directory')
    test_dir.mkdir(exist_ok=True)
    print(f"目录 {test_dir} 已创建")
    
    # 创建多级目录
    nested_dir = Path('parent/child/grandchild')
    nested_dir.mkdir(parents=True, exist_ok=True)
    print(f"多级目录 {nested_dir} 已创建")
    
    # 3. 遍历目录
    print("\n3. 遍历目录")
    
    # 创建一些测试文件
    (test_dir / 'file1.txt').write_text('内容1', encoding='utf-8')
    (test_dir / 'file2.txt').write_text('内容2', encoding='utf-8')
    (test_dir / 'subdir').mkdir(exist_ok=True)
    (test_dir / 'subdir' / 'file3.txt').write_text('内容3', encoding='utf-8')
    
    # 使用os.walk遍历
    print("使用os.walk遍历:")
    for root, dirs, files in os.walk(test_dir):
        level = root.replace(str(test_dir), '').count(os.sep)
        indent = ' ' * 2 * level
        print(f"{indent}{os.path.basename(root)}/")
        subindent = ' ' * 2 * (level + 1)
        for file in files:
            print(f"{subindent}{file}")
    
    # 使用pathlib遍历
    print("\n使用pathlib遍历:")
    for item in test_dir.rglob('*'):
        if item.is_file():
            print(f"文件: {item}")
        elif item.is_dir():
            print(f"目录: {item}")
    
    # 4. 目录信息
    print("\n4. 目录信息")
    for item in test_dir.iterdir():
        stat = item.stat()
        item_type = "目录" if item.is_dir() else "文件"
        print(f"{item_type}: {item.name}, 大小: {stat.st_size} 字节")
    
    # 5. 复制和移动目录
    print("\n5. 复制和移动目录")
    
    # 复制目录
    backup_dir = Path('test_directory_backup')
    if backup_dir.exists():
        shutil.rmtree(backup_dir)
    shutil.copytree(test_dir, backup_dir)
    print(f"目录已复制到 {backup_dir}")
    
    # 移动目录
    moved_dir = Path('moved_directory')
    if moved_dir.exists():
        shutil.rmtree(moved_dir)
    shutil.move(str(backup_dir), str(moved_dir))
    print(f"目录已移动到 {moved_dir}")
    
    # 6. 删除目录
    print("\n6. 删除目录")
    
    # 删除空目录
    empty_dir = Path('empty_dir')
    empty_dir.mkdir(exist_ok=True)
    empty_dir.rmdir()
    print("空目录已删除")
    
    # 删除非空目录
    shutil.rmtree(test_dir)
    shutil.rmtree(moved_dir)
    shutil.rmtree(nested_dir.parent.parent)  # 删除parent目录
    print("非空目录已删除")

# 运行演示
directory_operations_demo()

# 3.2 路径操作

from pathlib import Path
import os

def path_operations_demo():
    """路径操作演示"""
    print("=== 路径操作演示 ===")
    
    # 1. 路径构建
    print("\n1. 路径构建")
    
    # 使用pathlib构建路径
    path1 = Path('documents') / 'projects' / 'python' / 'main.py'
    print(f"pathlib构建路径: {path1}")
    
    # 使用os.path构建路径
    path2 = os.path.join('documents', 'projects', 'python', 'main.py')
    print(f"os.path构建路径: {path2}")
    
    # 2. 路径解析
    print("\n2. 路径解析")
    
    sample_path = Path('/home/user/documents/project/main.py')
    print(f"完整路径: {sample_path}")
    print(f"父目录: {sample_path.parent}")
    print(f"文件名: {sample_path.name}")
    print(f"文件名(无扩展名): {sample_path.stem}")
    print(f"扩展名: {sample_path.suffix}")
    print(f"所有扩展名: {sample_path.suffixes}")
    print(f"路径部分: {sample_path.parts}")
    
    # 3. 路径判断
    print("\n3. 路径判断")
    
    current_file = Path(__file__) if '__file__' in globals() else Path('demo.py')
    print(f"测试路径: {current_file}")
    print(f"是否存在: {current_file.exists()}")
    print(f"是否为文件: {current_file.is_file()}")
    print(f"是否为目录: {current_file.is_dir()}")
    print(f"是否为绝对路径: {current_file.is_absolute()}")
    
    # 4. 路径转换
    print("\n4. 路径转换")
    
    relative_path = Path('documents/project/main.py')
    print(f"相对路径: {relative_path}")
    
    # 转换为绝对路径
    absolute_path = relative_path.resolve()
    print(f"绝对路径: {absolute_path}")
    
    # 获取相对路径
    try:
        current_dir = Path.cwd()
        relative_to_current = absolute_path.relative_to(current_dir)
        print(f"相对于当前目录: {relative_to_current}")
    except ValueError:
        print("路径不在当前目录下")
    
    # 5. 路径匹配
    print("\n5. 路径匹配")
    
    test_paths = [
        Path('documents/project1/main.py'),
        Path('documents/project2/test.py'),
        Path('documents/project1/utils.py'),
        Path('images/photo.jpg'),
        Path('documents/readme.txt')
    ]
    
    print("Python文件:")
    for path in test_paths:
        if path.match('*.py'):
            print(f"  {path}")
    
    print("\nproject1目录下的文件:")
    for path in test_paths:
        if path.match('*/project1/*'):
            print(f"  {path}")
    
    print("\ndocuments目录下的所有文件:")
    for path in test_paths:
        if path.match('documents/**/*'):
            print(f"  {path}")
    
    # 6. 路径操作实用函数
    print("\n6. 路径操作实用函数")
    
    def safe_path_join(*parts):
        """安全的路径连接"""
        return Path(*parts)
    
    def get_file_info(file_path):
        """获取文件信息"""
        path = Path(file_path)
        if not path.exists():
            return None
        
        stat = path.stat()
        return {
            'name': path.name,
            'size': stat.st_size,
            'modified': stat.st_mtime,
            'is_file': path.is_file(),
            'is_dir': path.is_dir(),
            'parent': str(path.parent),
            'extension': path.suffix
        }
    
    def find_files_by_extension(directory, extension):
        """按扩展名查找文件"""
        path = Path(directory)
        if not path.exists() or not path.is_dir():
            return []
        
        pattern = f"**/*{extension}"
        return list(path.glob(pattern))
    
    # 测试实用函数
    test_path = safe_path_join('documents', 'project', 'main.py')
    print(f"安全路径连接: {test_path}")
    
    # 创建测试文件来演示
    demo_file = Path('demo_file.txt')
    demo_file.write_text('测试内容', encoding='utf-8')
    
    file_info = get_file_info(demo_file)
    if file_info:
        print(f"文件信息: {file_info}")
    
    # 清理
    demo_file.unlink()
    print("\n演示文件已清理")

# 运行演示
path_operations_demo()

# 四、标准输入输出

# 4.1 控制台输入输出

import sys
import getpass
from datetime import datetime

def console_io_demo():
    """控制台输入输出演示"""
    print("=== 控制台输入输出演示 ===")
    
    # 1. 基本输出
    print("\n1. 基本输出方式")
    
    # 普通输出
    print("这是普通输出")
    
    # 格式化输出
    name = "张三"
    age = 25
    print(f"姓名: {name}, 年龄: {age}")
    print("姓名: {}, 年龄: {}".format(name, age))
    print("姓名: %s, 年龄: %d" % (name, age))
    
    # 输出到不同流
    print("正常信息", file=sys.stdout)
    print("错误信息", file=sys.stderr)
    
    # 控制输出结束符
    print("第一部分", end=" ")
    print("第二部分", end=" ")
    print("第三部分")  # 默认换行
    
    # 2. 高级输出格式
    print("\n2. 高级输出格式")
    
    # 表格输出
    data = [
        ['姓名', '年龄', '城市'],
        ['张三', 25, '北京'],
        ['李四', 30, '上海'],
        ['王五', 28, '深圳']
    ]
    
    print("表格输出:")
    for row in data:
        print(f"{row[0]:8s} {row[1]:>3} {row[2]:6s}")
    
    # 进度条输出
    print("\n进度条演示:")
    import time
    for i in range(21):
        percent = i * 5
        bar = '█' * i + '░' * (20 - i)
        print(f"\r进度: [{bar}] {percent}%", end='', flush=True)
        time.sleep(0.1)
    print()  # 换行
    
    # 3. 彩色输出
    print("\n3. 彩色输出")
    
    # ANSI颜色代码
    colors = {
        'red': '\033[31m',
        'green': '\033[32m',
        'yellow': '\033[33m',
        'blue': '\033[34m',
        'magenta': '\033[35m',
        'cyan': '\033[36m',
        'white': '\033[37m',
        'reset': '\033[0m'
    }
    
    for color_name, color_code in colors.items():
        if color_name != 'reset':
            print(f"{color_code}这是{color_name}颜色的文本{colors['reset']}")
    
    # 4. 输入演示(注释掉以避免阻塞)
    print("\n4. 输入方式(演示代码)")
    
    # 基本输入
    print("# 基本输入")
    print("# user_input = input('请输入您的姓名: ')")
    print("# print(f'您好, {user_input}!')")
    
    # 数字输入
    print("\n# 数字输入")
    print("# try:")
    print("#     age = int(input('请输入您的年龄: '))")
    print("#     print(f'您的年龄是: {age}')")
    print("# except ValueError:")
    print("#     print('请输入有效的数字')")
    
    # 密码输入
    print("\n# 密码输入")
    print("# password = getpass.getpass('请输入密码: ')")
    print("# print('密码已输入(不显示)')")
    
    # 5. 命令行参数
    print("\n5. 命令行参数")
    print(f"脚本名称: {sys.argv[0] if sys.argv else 'unknown'}")
    print(f"参数列表: {sys.argv[1:] if len(sys.argv) > 1 else '无参数'}")
    print(f"参数数量: {len(sys.argv) - 1}")

def interactive_menu_demo():
    """交互式菜单演示"""
    print("\n=== 交互式菜单演示 ===")
    
    def show_menu():
        """显示菜单"""
        print("\n" + "="*30)
        print("      主菜单")
        print("="*30)
        print("1. 查看当前时间")
        print("2. 计算器")
        print("3. 文件操作")
        print("4. 系统信息")
        print("0. 退出")
        print("="*30)
    
    def get_current_time():
        """获取当前时间"""
        now = datetime.now()
        print(f"当前时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")
    
    def calculator():
        """简单计算器"""
        print("简单计算器(输入 'q' 退出)")
        while True:
            try:
                expr = input("请输入表达式: ").strip()
                if expr.lower() == 'q':
                    break
                result = eval(expr)  # 注意:实际应用中应该使用更安全的方法
                print(f"结果: {result}")
            except Exception as e:
                print(f"错误: {e}")
    
    def file_operations():
        """文件操作"""
        print("当前目录文件列表:")
        import os
        files = os.listdir('.')
        for i, file in enumerate(files[:10], 1):  # 只显示前10个
            print(f"  {i}. {file}")
        if len(files) > 10:
            print(f"  ... 还有 {len(files) - 10} 个文件")
    
    def system_info():
        """系统信息"""
        import platform
        print(f"操作系统: {platform.system()}")
        print(f"系统版本: {platform.version()}")
        print(f"处理器: {platform.processor()}")
        print(f"Python版本: {platform.python_version()}")
    
    # 菜单功能映射
    menu_functions = {
        '1': get_current_time,
        '2': calculator,
        '3': file_operations,
        '4': system_info
    }
    
    print("交互式菜单演示(模拟)")
    print("在实际应用中,这里会有真正的用户交互")
    
    # 模拟用户选择
    demo_choices = ['1', '3', '4']
    for choice in demo_choices:
        print(f"\n模拟用户选择: {choice}")
        if choice in menu_functions:
            menu_functions[choice]()
        elif choice == '0':
            print("退出程序")
            break
        else:
            print("无效选择")

# 运行演示
console_io_demo()
interactive_menu_demo()

# 4.2 格式化输出进阶

from datetime import datetime
import locale

def advanced_formatting_demo():
    """高级格式化输出演示"""
    print("=== 高级格式化输出演示 ===")
    
    # 1. 数字格式化
    print("\n1. 数字格式化")
    
    number = 1234567.89
    print(f"原始数字: {number}")
    print(f"千分位分隔: {number:,}")
    print(f"保留2位小数: {number:.2f}")
    print(f"科学计数法: {number:.2e}")
    print(f"百分比: {0.1234:.2%}")
    print(f"填充零: {42:08d}")
    print(f"右对齐: {42:>10d}")
    print(f"左对齐: {42:<10d}")
    print(f"居中对齐: {42:^10d}")
    
    # 2. 字符串格式化
    print("\n2. 字符串格式化")
    
    text = "Python"
    print(f"原始字符串: '{text}'")
    print(f"右对齐(15): '{text:>15s}'")
    print(f"左对齐(15): '{text:<15s}'")
    print(f"居中对齐(15): '{text:^15s}'")
    print(f"填充字符: '{text:*^15s}'")
    print(f"截断: '{text:.3s}'")
    
    # 3. 日期时间格式化
    print("\n3. 日期时间格式化")
    
    now = datetime.now()
    print(f"当前时间: {now}")
    print(f"日期: {now:%Y-%m-%d}")
    print(f"时间: {now:%H:%M:%S}")
    print(f"完整格式: {now:%Y年%m月%d日 %H时%M分%S秒}")
    print(f"星期: {now:%A}")
    print(f"月份: {now:%B}")
    
    # 4. 自定义格式化类
    print("\n4. 自定义格式化类")
    
    class Person:
        def __init__(self, name, age, salary):
            self.name = name
            self.age = age
            self.salary = salary
        
        def __format__(self, format_spec):
            if format_spec == 'short':
                return f"{self.name}({self.age})"
            elif format_spec == 'long':
                return f"{self.name}, {self.age}岁, 薪资{self.salary:,}元"
            elif format_spec == 'salary':
                return f"{self.salary:,.2f}"
            else:
                return str(self)
        
        def __str__(self):
            return f"Person(name='{self.name}', age={self.age}, salary={self.salary})"
    
    person = Person("张三", 30, 8500.50)
    print(f"默认格式: {person}")
    print(f"短格式: {person:short}")
    print(f"长格式: {person:long}")
    print(f"薪资格式: {person:salary}")
    
    # 5. 表格格式化
    print("\n5. 表格格式化")
    
    def print_table(data, headers, widths=None):
        """打印格式化表格"""
        if widths is None:
            widths = [max(len(str(row[i])) for row in [headers] + data) + 2 
                     for i in range(len(headers))]
        
        # 打印分隔线
        separator = '+' + '+'.join('-' * width for width in widths) + '+'
        print(separator)
        
        # 打印表头
        header_row = '|' + '|'.join(f"{headers[i]:^{widths[i]}}" for i in range(len(headers))) + '|'
        print(header_row)
        print(separator)
        
        # 打印数据行
        for row in data:
            data_row = '|' + '|'.join(f"{str(row[i]):^{widths[i]}}" for i in range(len(row))) + '|'
            print(data_row)
        
        print(separator)
    
    # 示例数据
    headers = ['姓名', '年龄', '城市', '薪资']
    table_data = [
        ['张三', 25, '北京', '8,500'],
        ['李四', 30, '上海', '12,000'],
        ['王五', 28, '深圳', '10,500'],
        ['赵六', 32, '广州', '9,800']
    ]
    
    print_table(table_data, headers)
    
    # 6. 进度条和状态显示
    print("\n6. 进度条和状态显示")
    
    def progress_bar(current, total, width=50, prefix='进度', suffix='完成'):
        """显示进度条"""
        percent = current / total
        filled_width = int(width * percent)
        bar = '█' * filled_width + '░' * (width - filled_width)
        return f"{prefix}: [{bar}] {percent:.1%} {suffix}"
    
    # 模拟进度
    import time
    total_steps = 20
    for i in range(total_steps + 1):
        progress = progress_bar(i, total_steps)
        print(f"\r{progress}", end='', flush=True)
        time.sleep(0.05)
    print()  # 换行
    
    # 7. 多行格式化
    print("\n7. 多行格式化")
    
    template = """
    ╔══════════════════════════════════════╗
    ║              用户信息                ║
    ╠══════════════════════════════════════╣
    ║ 姓名: {name:<30} ║
    ║ 年龄: {age:<30} ║
    ║ 邮箱: {email:<30} ║
    ║ 注册时间: {reg_time:<26} ║
    ╚══════════════════════════════════════╝
    """
    
    user_info = {
        'name': '张三',
        'age': 25,
        'email': 'zhangsan@example.com',
        'reg_time': '2023-01-15 10:30:00'
    }
    
    print(template.format(**user_info))

# 运行演示
advanced_formatting_demo()

# 五、内存IO操作

# 5.1 StringIO和BytesIO

from io import StringIO, BytesIO
import json

def memory_io_demo():
    """内存IO操作演示"""
    print("=== 内存IO操作演示 ===")
    
    # 1. StringIO - 字符串流
    print("\n1. StringIO操作")
    
    # 创建StringIO对象
    string_buffer = StringIO()
    
    # 写入数据
    string_buffer.write("Hello, ")
    string_buffer.write("StringIO!\n")
    string_buffer.write("这是第二行\n")
    
    # 获取当前位置
    print(f"当前位置: {string_buffer.tell()}")
    
    # 获取全部内容
    content = string_buffer.getvalue()
    print(f"StringIO内容:\n{content}")
    
    # 重置位置并读取
    string_buffer.seek(0)
    line1 = string_buffer.readline()
    line2 = string_buffer.readline()
    print(f"第一行: {line1.strip()}")
    print(f"第二行: {line2.strip()}")
    
    # 关闭StringIO
    string_buffer.close()
    
    # 2. BytesIO - 字节流
    print("\n2. BytesIO操作")
    
    # 创建BytesIO对象
    bytes_buffer = BytesIO()
    
    # 写入字节数据
    bytes_buffer.write(b"Hello, ")
    bytes_buffer.write(b"BytesIO!\n")
    bytes_buffer.write("这是中文".encode('utf-8'))
    
    # 获取字节内容
    byte_content = bytes_buffer.getvalue()
    print(f"BytesIO内容: {byte_content}")
    print(f"解码后: {byte_content.decode('utf-8')}")
    
    # 重置位置并读取
    bytes_buffer.seek(0)
    chunk1 = bytes_buffer.read(7)
    chunk2 = bytes_buffer.read(8)
    print(f"第一块: {chunk1}")
    print(f"第二块: {chunk2}")
    
    bytes_buffer.close()
    
    # 3. 实际应用示例
    print("\n3. 实际应用示例")
    
    def create_csv_in_memory(data):
        """在内存中创建CSV"""
        import csv
        
        output = StringIO()
        writer = csv.writer(output)
        
        # 写入表头
        if data:
            writer.writerow(data[0].keys())
            
            # 写入数据
            for row in data:
                writer.writerow(row.values())
        
        csv_content = output.getvalue()
        output.close()
        return csv_content
    
    # 测试数据
    sample_data = [
        {'name': '张三', 'age': 25, 'city': '北京'},
        {'name': '李四', 'age': 30, 'city': '上海'},
        {'name': '王五', 'age': 28, 'city': '深圳'}
    ]
    
    csv_result = create_csv_in_memory(sample_data)
    print("内存中生成的CSV:")
    print(csv_result)
    
    def process_json_in_memory(data):
        """在内存中处理JSON"""
        # 序列化到内存
        json_buffer = StringIO()
        json.dump(data, json_buffer, ensure_ascii=False, indent=2)
        
        # 获取JSON字符串
        json_str = json_buffer.getvalue()
        json_buffer.close()
        
        # 从字符串反序列化
        json_input = StringIO(json_str)
        loaded_data = json.load(json_input)
        json_input.close()
        
        return json_str, loaded_data
    
    json_str, loaded_data = process_json_in_memory(sample_data)
    print("\n内存中处理的JSON:")
    print(json_str)
    print(f"\n反序列化结果: {loaded_data}")

# 运行演示
memory_io_demo()

# 5.2 临时文件操作

import tempfile
import os
from pathlib import Path

def temp_file_demo():
    """临时文件操作演示"""
    print("=== 临时文件操作演示 ===")
    
    # 1. 临时文件
    print("\n1. 临时文件操作")
    
    # 创建临时文件
    with tempfile.NamedTemporaryFile(mode='w+', encoding='utf-8', delete=False) as temp_file:
        temp_file_path = temp_file.name
        print(f"临时文件路径: {temp_file_path}")
        
        # 写入数据
        temp_file.write("这是临时文件的内容\n")
        temp_file.write("第二行内容\n")
        
        # 重置位置并读取
        temp_file.seek(0)
        content = temp_file.read()
        print(f"临时文件内容:\n{content}")
    
    # 手动删除临时文件
    os.unlink(temp_file_path)
    print("临时文件已删除")
    
    # 2. 自动删除的临时文件
    print("\n2. 自动删除的临时文件")
    
    with tempfile.NamedTemporaryFile(mode='w+', encoding='utf-8') as auto_temp:
        print(f"自动删除临时文件: {auto_temp.name}")
        auto_temp.write("这个文件会自动删除")
        auto_temp.seek(0)
        print(f"内容: {auto_temp.read()}")
    # 文件在这里自动删除
    print("临时文件已自动删除")
    
    # 3. 临时目录
    print("\n3. 临时目录操作")
    
    with tempfile.TemporaryDirectory() as temp_dir:
        print(f"临时目录: {temp_dir}")
        
        # 在临时目录中创建文件
        temp_file_path = Path(temp_dir) / 'test.txt'
        temp_file_path.write_text('临时目录中的文件', encoding='utf-8')
        
        # 创建子目录
        sub_dir = Path(temp_dir) / 'subdir'
        sub_dir.mkdir()
        (sub_dir / 'subfile.txt').write_text('子目录文件', encoding='utf-8')
        
        # 列出临时目录内容
        print("临时目录内容:")
        for item in Path(temp_dir).rglob('*'):
            print(f"  {item}")
    # 临时目录及其内容在这里自动删除
    print("临时目录已自动删除")
    
    # 4. 自定义临时文件
    print("\n4. 自定义临时文件")
    
    # 指定前缀和后缀
    with tempfile.NamedTemporaryFile(
        mode='w+', 
        prefix='myapp_', 
        suffix='.log', 
        encoding='utf-8'
    ) as custom_temp:
        print(f"自定义临时文件: {custom_temp.name}")
        custom_temp.write("自定义前缀和后缀的临时文件")
    
    # 5. 获取临时目录信息
    print("\n5. 临时目录信息")
    
    temp_dir = tempfile.gettempdir()
    print(f"系统临时目录: {temp_dir}")
    
    # 创建唯一的临时文件名
    temp_name = tempfile.mktemp(suffix='.txt', prefix='unique_')
    print(f"唯一临时文件名: {temp_name}")
    
    # 注意:mktemp只生成名称,不创建文件,需要手动创建和删除
    
    # 6. 临时文件的实际应用
    print("\n6. 临时文件实际应用")
    
    def process_large_data_with_temp(data_generator):
        """使用临时文件处理大量数据"""
        with tempfile.NamedTemporaryFile(mode='w+', encoding='utf-8') as temp_file:
            # 写入大量数据到临时文件
            for i, data in enumerate(data_generator):
                temp_file.write(f"{i}: {data}\n")
            
            # 重置位置并处理数据
            temp_file.seek(0)
            line_count = 0
            total_length = 0
            
            for line in temp_file:
                line_count += 1
                total_length += len(line)
            
            return {
                'lines': line_count,
                'total_chars': total_length,
                'temp_file': temp_file.name
            }
    
    # 模拟大量数据
    def data_generator():
        for i in range(1000):
            yield f"数据项 {i} - 这是一些测试数据"
    
    result = process_large_data_with_temp(data_generator())
    print(f"处理结果: {result}")
    
    def create_temp_config(config_data):
        """创建临时配置文件"""
        import json
        
        with tempfile.NamedTemporaryFile(
            mode='w', 
            suffix='.json', 
            delete=False, 
            encoding='utf-8'
        ) as config_file:
            json.dump(config_data, config_file, ensure_ascii=False, indent=2)
            return config_file.name
    
    # 创建临时配置
    config = {
        'database': {
            'host': 'localhost',
            'port': 5432,
            'name': 'testdb'
        },
        'logging': {
            'level': 'INFO',
            'file': '/tmp/app.log'
        }
    }
    
    config_file_path = create_temp_config(config)
    print(f"\n临时配置文件: {config_file_path}")
    
    # 读取配置文件
    with open(config_file_path, 'r', encoding='utf-8') as f:
        loaded_config = json.load(f)
        print(f"加载的配置: {loaded_config}")
    
    # 清理临时配置文件
    os.unlink(config_file_path)
    print("临时配置文件已删除")

# 运行演示
temp_file_demo()

# 六、实战练习:文件管理系统

# 6.1 构建一个完整的文件管理系统

import os
import shutil
import json
import hashlib
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
import tempfile

@dataclass
class FileInfo:
    """文件信息数据类"""
    path: str
    name: str
    size: int
    modified: float
    created: float
    is_directory: bool
    extension: str
    hash_md5: Optional[str] = None

class FileManager:
    """文件管理系统"""
    
    def __init__(self, base_directory: str = None):
        self.base_directory = Path(base_directory) if base_directory else Path.cwd()
        self.file_index: Dict[str, FileInfo] = {}
        self.operation_log: List[Dict] = []
        
    def scan_directory(self, directory: str = None) -> List[FileInfo]:
        """扫描目录并建立文件索引"""
        scan_dir = Path(directory) if directory else self.base_directory
        
        if not scan_dir.exists() or not scan_dir.is_dir():
            raise ValueError(f"目录不存在或不是有效目录: {scan_dir}")
        
        file_list = []
        
        for item in scan_dir.rglob('*'):
            try:
                stat = item.stat()
                
                file_info = FileInfo(
                    path=str(item),
                    name=item.name,
                    size=stat.st_size,
                    modified=stat.st_mtime,
                    created=stat.st_ctime,
                    is_directory=item.is_dir(),
                    extension=item.suffix.lower() if item.suffix else ''
                )
                
                # 为文件计算MD5哈希
                if not file_info.is_directory and file_info.size < 10 * 1024 * 1024:  # 小于10MB
                    try:
                        file_info.hash_md5 = self._calculate_md5(item)
                    except Exception:
                        pass  # 忽略无法读取的文件
                
                file_list.append(file_info)
                self.file_index[str(item)] = file_info
                
            except (OSError, PermissionError):
                continue  # 跳过无法访问的文件
        
        self._log_operation('scan_directory', {'directory': str(scan_dir), 'files_found': len(file_list)})
        return file_list
    
    def _calculate_md5(self, file_path: Path) -> str:
        """计算文件MD5哈希值"""
        hash_md5 = hashlib.md5()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
    
    def find_files(self, **criteria) -> List[FileInfo]:
        """根据条件查找文件"""
        results = []
        
        for file_info in self.file_index.values():
            match = True
            
            # 按名称模式匹配
            if 'name_pattern' in criteria:
                import fnmatch
                if not fnmatch.fnmatch(file_info.name.lower(), criteria['name_pattern'].lower()):
                    match = False
            
            # 按扩展名匹配
            if 'extension' in criteria:
                if file_info.extension != criteria['extension'].lower():
                    match = False
            
            # 按大小范围匹配
            if 'min_size' in criteria and file_info.size < criteria['min_size']:
                match = False
            if 'max_size' in criteria and file_info.size > criteria['max_size']:
                match = False
            
            # 按修改时间匹配
            if 'modified_after' in criteria and file_info.modified < criteria['modified_after']:
                match = False
            if 'modified_before' in criteria and file_info.modified > criteria['modified_before']:
                match = False
            
            # 按文件类型匹配
            if 'is_directory' in criteria and file_info.is_directory != criteria['is_directory']:
                match = False
            
            if match:
                results.append(file_info)
        
        self._log_operation('find_files', {'criteria': criteria, 'results_count': len(results)})
        return results
    
    def find_duplicates(self) -> Dict[str, List[FileInfo]]:
        """查找重复文件"""
        hash_groups = {}
        
        for file_info in self.file_index.values():
            if not file_info.is_directory and file_info.hash_md5:
                if file_info.hash_md5 not in hash_groups:
                    hash_groups[file_info.hash_md5] = []
                hash_groups[file_info.hash_md5].append(file_info)
        
        # 只返回有重复的组
        duplicates = {hash_val: files for hash_val, files in hash_groups.items() if len(files) > 1}
        
        self._log_operation('find_duplicates', {'duplicate_groups': len(duplicates)})
        return duplicates
    
    def organize_files(self, source_dir: str, target_dir: str, organize_by: str = 'extension'):
        """按指定规则整理文件"""
        source_path = Path(source_dir)
        target_path = Path(target_dir)
        
        if not source_path.exists():
            raise ValueError(f"源目录不存在: {source_path}")
        
        target_path.mkdir(parents=True, exist_ok=True)
        moved_files = 0
        
        for file_path in source_path.rglob('*'):
            if file_path.is_file():
                # 确定目标子目录
                if organize_by == 'extension':
                    ext = file_path.suffix.lower() or 'no_extension'
                    sub_dir = target_path / ext.lstrip('.')
                elif organize_by == 'date':
                    modified_date = datetime.fromtimestamp(file_path.stat().st_mtime)
                    sub_dir = target_path / modified_date.strftime('%Y') / modified_date.strftime('%m')
                elif organize_by == 'size':
                    size = file_path.stat().st_size
                    if size < 1024 * 1024:  # < 1MB
                        sub_dir = target_path / 'small'
                    elif size < 10 * 1024 * 1024:  # < 10MB
                        sub_dir = target_path / 'medium'
                    else:
                        sub_dir = target_path / 'large'
                else:
                    sub_dir = target_path / 'others'
                
                # 创建目标目录
                sub_dir.mkdir(parents=True, exist_ok=True)
                
                # 移动文件
                target_file = sub_dir / file_path.name
                
                # 处理文件名冲突
                counter = 1
                while target_file.exists():
                    stem = file_path.stem
                    suffix = file_path.suffix
                    target_file = sub_dir / f"{stem}_{counter}{suffix}"
                    counter += 1
                
                shutil.move(str(file_path), str(target_file))
                moved_files += 1
        
        self._log_operation('organize_files', {
            'source': str(source_path),
            'target': str(target_path),
            'organize_by': organize_by,
            'moved_files': moved_files
        })
        
        return moved_files
    
    def backup_files(self, source_paths: List[str], backup_dir: str, 
                    compression: bool = True) -> str:
        """备份文件"""
        backup_path = Path(backup_dir)
        backup_path.mkdir(parents=True, exist_ok=True)
        
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        
        if compression:
            import zipfile
            backup_file = backup_path / f"backup_{timestamp}.zip"
            
            with zipfile.ZipFile(backup_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for source_path in source_paths:
                    source = Path(source_path)
                    if source.exists():
                        if source.is_file():
                            zipf.write(source, source.name)
                        else:
                            for file_path in source.rglob('*'):
                                if file_path.is_file():
                                    arcname = file_path.relative_to(source.parent)
                                    zipf.write(file_path, arcname)
        else:
            backup_file = backup_path / f"backup_{timestamp}"
            backup_file.mkdir(exist_ok=True)
            
            for source_path in source_paths:
                source = Path(source_path)
                if source.exists():
                    target = backup_file / source.name
                    if source.is_file():
                        shutil.copy2(source, target)
                    else:
                        shutil.copytree(source, target, dirs_exist_ok=True)
        
        self._log_operation('backup_files', {
            'sources': source_paths,
            'backup_file': str(backup_file),
            'compression': compression
        })
        
        return str(backup_file)
    
    def clean_empty_directories(self, directory: str = None) -> int:
        """清理空目录"""
        clean_dir = Path(directory) if directory else self.base_directory
        removed_count = 0
        
        # 从最深层开始清理
        for dir_path in sorted(clean_dir.rglob('*'), key=lambda p: len(p.parts), reverse=True):
            if dir_path.is_dir():
                try:
                    if not any(dir_path.iterdir()):  # 目录为空
                        dir_path.rmdir()
                        removed_count += 1
                except OSError:
                    continue  # 跳过无法删除的目录
        
        self._log_operation('clean_empty_directories', {
            'directory': str(clean_dir),
            'removed_count': removed_count
        })
        
        return removed_count
    
    def get_directory_stats(self, directory: str = None) -> Dict:
        """获取目录统计信息"""
        stats_dir = Path(directory) if directory else self.base_directory
        
        stats = {
            'total_files': 0,
            'total_directories': 0,
            'total_size': 0,
            'file_types': {},
            'size_distribution': {'small': 0, 'medium': 0, 'large': 0},
            'largest_files': [],
            'newest_files': [],
            'oldest_files': []
        }
        
        all_files = []
        
        for item in stats_dir.rglob('*'):
            try:
                if item.is_file():
                    stats['total_files'] += 1
                    size = item.stat().st_size
                    stats['total_size'] += size
                    
                    # 文件类型统计
                    ext = item.suffix.lower() or 'no_extension'
                    stats['file_types'][ext] = stats['file_types'].get(ext, 0) + 1
                    
                    # 大小分布
                    if size < 1024 * 1024:  # < 1MB
                        stats['size_distribution']['small'] += 1
                    elif size < 10 * 1024 * 1024:  # < 10MB
                        stats['size_distribution']['medium'] += 1
                    else:
                        stats['size_distribution']['large'] += 1
                    
                    # 收集文件信息用于排序
                    file_stat = item.stat()
                    all_files.append({
                        'path': str(item),
                        'size': size,
                        'modified': file_stat.st_mtime
                    })
                    
                elif item.is_dir():
                    stats['total_directories'] += 1
                    
            except (OSError, PermissionError):
                continue
        
        # 最大文件(前10个)
        stats['largest_files'] = sorted(all_files, key=lambda x: x['size'], reverse=True)[:10]
        
        # 最新文件(前10个)
        stats['newest_files'] = sorted(all_files, key=lambda x: x['modified'], reverse=True)[:10]
        
        # 最旧文件(前10个)
        stats['oldest_files'] = sorted(all_files, key=lambda x: x['modified'])[:10]
        
        return stats
    
    def _log_operation(self, operation: str, details: Dict):
        """记录操作日志"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'operation': operation,
            'details': details
        }
        self.operation_log.append(log_entry)
    
    def export_index(self, filename: str):
        """导出文件索引"""
        export_data = {
            'timestamp': datetime.now().isoformat(),
            'base_directory': str(self.base_directory),
            'file_count': len(self.file_index),
            'files': [asdict(file_info) for file_info in self.file_index.values()]
        }
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, ensure_ascii=False, indent=2)
        
        self._log_operation('export_index', {'filename': filename})
    
    def get_operation_log(self) -> List[Dict]:
        """获取操作日志"""
        return self.operation_log.copy()

def file_manager_demo():
    """文件管理系统演示"""
    print("=== 文件管理系统演示 ===")
    
    # 创建临时测试环境
    with tempfile.TemporaryDirectory() as temp_dir:
        print(f"\n使用临时目录: {temp_dir}")
        
        # 创建测试文件结构
        test_dir = Path(temp_dir)
        
        # 创建不同类型的文件
        (test_dir / 'documents').mkdir()
        (test_dir / 'images').mkdir()
        (test_dir / 'code').mkdir()
        (test_dir / 'archive').mkdir()
        
        # 创建测试文件
        test_files = [
            ('documents/report.txt', '这是一个报告文件'),
            ('documents/notes.md', '# 笔记\n\n这是一些笔记'),
            ('images/photo1.jpg', b'\xFF\xD8\xFF\xE0'),  # JPEG文件头
            ('images/photo2.png', b'\x89PNG\r\n\x1a\n'),  # PNG文件头
            ('code/main.py', 'print("Hello, World!")'),
            ('code/utils.py', 'def helper(): pass'),
            ('archive/data.zip', b'PK\x03\x04'),  # ZIP文件头
            ('readme.txt', '项目说明文件'),
        ]
        
        for file_path, content in test_files:
            full_path = test_dir / file_path
            if isinstance(content, str):
                full_path.write_text(content, encoding='utf-8')
            else:
                full_path.write_bytes(content)
        
        # 创建一些重复文件
        (test_dir / 'documents' / 'copy_report.txt').write_text('这是一个报告文件', encoding='utf-8')
        
        print("测试文件结构已创建")
        
        # 初始化文件管理器
        fm = FileManager(temp_dir)
        
        # 1. 扫描目录
        print("\n1. 扫描目录")
        files = fm.scan_directory()
        print(f"发现 {len(files)} 个文件和目录")
        
        # 2. 查找文件
        print("\n2. 查找文件")
        
        # 查找Python文件
        python_files = fm.find_files(extension='.py')
        print(f"Python文件: {len(python_files)} 个")
        for file_info in python_files:
            print(f"  {file_info.name} ({file_info.size} 字节)")
        
        # 查找文本文件
        text_files = fm.find_files(name_pattern='*.txt')
        print(f"\n文本文件: {len(text_files)} 个")
        for file_info in text_files:
            print(f"  {file_info.name}")
        
        # 3. 查找重复文件
        print("\n3. 查找重复文件")
        duplicates = fm.find_duplicates()
        if duplicates:
            for hash_val, files in duplicates.items():
                print(f"重复文件组 (MD5: {hash_val[:8]}...):")
                for file_info in files:
                    print(f"  {file_info.path}")
        else:
            print("未发现重复文件")
        
        # 4. 获取目录统计
        print("\n4. 目录统计")
        stats = fm.get_directory_stats()
        print(f"总文件数: {stats['total_files']}")
        print(f"总目录数: {stats['total_directories']}")
        print(f"总大小: {stats['total_size']} 字节")
        print("文件类型分布:")
        for ext, count in stats['file_types'].items():
            print(f"  {ext}: {count} 个")
        
        # 5. 整理文件
        print("\n5. 整理文件")
        organize_source = test_dir / 'documents'
        organize_target = test_dir / 'organized'
        
        moved_count = fm.organize_files(
            str(organize_source), 
            str(organize_target), 
            organize_by='extension'
        )
        print(f"已整理 {moved_count} 个文件")
        
        # 6. 备份文件
        print("\n6. 备份文件")
        backup_sources = [str(test_dir / 'code'), str(test_dir / 'readme.txt')]
        backup_dir = test_dir / 'backups'
        
        backup_file = fm.backup_files(backup_sources, str(backup_dir), compression=True)
        print(f"备份已创建: {backup_file}")
        
        # 7. 清理空目录
        print("\n7. 清理空目录")
        removed_count = fm.clean_empty_directories()
        print(f"已删除 {removed_count} 个空目录")
        
        # 8. 导出索引
        print("\n8. 导出文件索引")
        index_file = test_dir / 'file_index.json'
        fm.export_index(str(index_file))
        print(f"文件索引已导出到: {index_file}")
        
        # 9. 查看操作日志
        print("\n9. 操作日志")
        log = fm.get_operation_log()
        print(f"共记录 {len(log)} 个操作:")
        for entry in log[-3:]:  # 显示最后3个操作
            print(f"  {entry['timestamp']}: {entry['operation']}")
        
        print("\n文件管理系统演示完成")

# 运行演示
file_manager_demo()

# 七、总结和最佳实践

# 7.1 IO编程最佳实践

  1. 使用上下文管理器

    • 始终使用with语句处理文件操作
    • 确保资源正确释放
  2. 选择合适的文件模式

    • 明确区分文本模式和二进制模式
    • 根据需求选择读写模式
  3. 处理编码问题

    • 明确指定文件编码(通常使用UTF-8)
    • 处理编码错误
  4. 异常处理

    • 捕获和处理IO相关异常
    • 提供有意义的错误信息
  5. 性能优化

    • 对大文件使用分块读取
    • 使用缓冲区提高效率
    • 考虑使用内存映射文件

# 7.2 文件操作安全性

  1. 路径安全

    • 验证文件路径的合法性
    • 防止路径遍历攻击
  2. 权限检查

    • 检查文件读写权限
    • 处理权限不足的情况
  3. 数据完整性

    • 使用校验和验证文件完整性
    • 实现原子操作避免数据损坏

# 7.3 下一步学习方向

  1. 网络IO编程

    • 学习socket编程
    • 掌握HTTP客户端和服务器
  2. 异步IO

    • 学习asyncio模块
    • 掌握异步文件操作
  3. 数据库IO

    • 学习数据库连接和操作
    • 掌握ORM框架
  4. 高级文件格式

    • 学习处理Excel、PDF等格式
    • 掌握图像和音视频文件处理

# 7.4 练习建议

  1. 创建文件处理工具,实践各种文件操作
  2. 编写日志系统,掌握文件写入和轮转
  3. 实现配置管理,练习JSON/YAML文件处理
  4. 开发备份工具,综合运用文件和目录操作
  5. 构建文件监控系统,学习文件系统事件处理

通过本章的学习,你应该能够:

  • 熟练进行各种文件IO操作
  • 掌握目录和路径操作技巧
  • 理解标准输入输出的使用
  • 运用内存IO和临时文件
  • 构建完整的文件管理系统
  • 遵循IO编程的最佳实践

IO编程是Python应用开发的基础技能,需要在实践中不断提高。记住,良好的IO处理不仅能提高程序性能,还能增强程序的稳定性和用户体验。