第12天-IO编程
哪吒 2023/6/15
# 第12天-IO编程
# 一、IO编程概述
# 1.1 什么是IO编程
IO(Input/Output)编程是指程序与外部世界进行数据交换的编程技术。在Python中,IO编程主要包括:
- 文件IO:读写文件
- 网络IO:网络通信
- 标准IO:控制台输入输出
- 内存IO:内存中的数据流操作
# 1.2 IO编程的重要性
# IO编程的应用场景
print("=== IO编程的应用场景 ===")
# 1. 数据持久化
print("1. 数据持久化:将程序数据保存到文件")
print(" - 配置文件读写")
print(" - 日志记录")
print(" - 数据备份")
# 2. 数据交换
print("\n2. 数据交换:与其他程序或系统交换数据")
print(" - 网络通信")
print(" - API调用")
print(" - 数据库操作")
# 3. 用户交互
print("\n3. 用户交互:与用户进行输入输出")
print(" - 命令行界面")
print(" - 图形界面")
print(" - Web界面")
# 4. 系统集成
print("\n4. 系统集成:与操作系统和其他程序集成")
print(" - 进程间通信")
print(" - 系统调用")
print(" - 外部程序调用")
# 二、文件IO操作
# 2.1 文件操作基础
import os
import shutil
from pathlib import Path
def file_operations_demo():
"""文件操作基础演示"""
print("=== 文件操作基础 ===")
# 1. 创建文件
print("\n1. 创建和写入文件")
# 使用open函数创建文件
with open('demo.txt', 'w', encoding='utf-8') as f:
f.write('Hello, Python IO!\n')
f.write('这是第二行\n')
f.write('这是第三行\n')
print("文件 demo.txt 已创建")
# 2. 读取文件
print("\n2. 读取文件内容")
# 读取全部内容
with open('demo.txt', 'r', encoding='utf-8') as f:
content = f.read()
print("全部内容:")
print(content)
# 按行读取
with open('demo.txt', 'r', encoding='utf-8') as f:
print("按行读取:")
for line_num, line in enumerate(f, 1):
print(f"第{line_num}行: {line.strip()}")
# 3. 追加内容
print("\n3. 追加内容")
with open('demo.txt', 'a', encoding='utf-8') as f:
f.write('这是追加的内容\n')
# 验证追加结果
with open('demo.txt', 'r', encoding='utf-8') as f:
print("追加后的内容:")
print(f.read())
# 4. 文件信息
print("\n4. 文件信息")
file_path = Path('demo.txt')
if file_path.exists():
stat = file_path.stat()
print(f"文件大小: {stat.st_size} 字节")
print(f"创建时间: {stat.st_ctime}")
print(f"修改时间: {stat.st_mtime}")
print(f"是否为文件: {file_path.is_file()}")
print(f"是否为目录: {file_path.is_dir()}")
# 5. 清理
if file_path.exists():
file_path.unlink()
print("\n文件已删除")
# 运行演示
file_operations_demo()
# 2.2 文件读写模式详解
def file_modes_demo():
"""文件读写模式演示"""
print("=== 文件读写模式详解 ===")
# 准备测试数据
test_data = "Hello, World!\n这是测试数据\n第三行内容"
# 1. 文本模式
print("\n1. 文本模式操作")
# 写入模式 'w' - 覆盖写入
with open('test_w.txt', 'w', encoding='utf-8') as f:
f.write(test_data)
print("'w' 模式:覆盖写入完成")
# 读取模式 'r' - 只读
with open('test_w.txt', 'r', encoding='utf-8') as f:
content = f.read()
print(f"'r' 模式读取:\n{content}")
# 追加模式 'a' - 追加写入
with open('test_w.txt', 'a', encoding='utf-8') as f:
f.write("\n追加的内容")
print("'a' 模式:追加写入完成")
# 读写模式 'r+' - 读写
with open('test_w.txt', 'r+', encoding='utf-8') as f:
content = f.read()
print(f"'r+' 模式读取:\n{content}")
f.write("\n通过r+模式追加")
# 2. 二进制模式
print("\n2. 二进制模式操作")
# 二进制写入
binary_data = b"\x48\x65\x6c\x6c\x6f" # "Hello" 的字节表示
with open('test_binary.bin', 'wb') as f:
f.write(binary_data)
print("二进制数据写入完成")
# 二进制读取
with open('test_binary.bin', 'rb') as f:
data = f.read()
print(f"二进制数据读取: {data}")
print(f"转换为字符串: {data.decode('utf-8')}")
# 3. 文件模式组合
print("\n3. 常用文件模式总结")
modes = {
'r': '只读模式(默认)',
'w': '写入模式(覆盖)',
'a': '追加模式',
'r+': '读写模式',
'w+': '写读模式(覆盖)',
'a+': '追加读写模式',
'rb': '二进制只读',
'wb': '二进制写入',
'ab': '二进制追加',
'rb+': '二进制读写',
'wb+': '二进制写读',
'ab+': '二进制追加读写'
}
for mode, description in modes.items():
print(f" {mode:4s}: {description}")
# 清理文件
for filename in ['test_w.txt', 'test_binary.bin']:
if os.path.exists(filename):
os.remove(filename)
print("\n测试文件已清理")
# 运行演示
file_modes_demo()
# 2.3 高级文件操作
import json
import csv
import pickle
from datetime import datetime
def advanced_file_operations():
"""高级文件操作演示"""
print("=== 高级文件操作 ===")
# 1. JSON文件操作
print("\n1. JSON文件操作")
# 准备JSON数据
data = {
'name': '张三',
'age': 25,
'skills': ['Python', 'JavaScript', 'SQL'],
'address': {
'city': '北京',
'district': '朝阳区'
},
'timestamp': datetime.now().isoformat()
}
# 写入JSON文件
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print("JSON数据已写入 data.json")
# 读取JSON文件
with open('data.json', 'r', encoding='utf-8') as f:
loaded_data = json.load(f)
print(f"读取的JSON数据: {loaded_data}")
# 2. CSV文件操作
print("\n2. CSV文件操作")
# 写入CSV文件
csv_data = [
['姓名', '年龄', '城市', '薪资'],
['张三', 25, '北京', 8000],
['李四', 30, '上海', 12000],
['王五', 28, '深圳', 10000],
['赵六', 32, '广州', 9500]
]
with open('employees.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerows(csv_data)
print("CSV数据已写入 employees.csv")
# 读取CSV文件
with open('employees.csv', 'r', encoding='utf-8') as f:
reader = csv.reader(f)
print("CSV数据读取:")
for row_num, row in enumerate(reader):
print(f" 第{row_num + 1}行: {row}")
# 使用DictReader读取CSV
with open('employees.csv', 'r', encoding='utf-8') as f:
dict_reader = csv.DictReader(f)
print("\n使用DictReader读取:")
for row in dict_reader:
print(f" {row}")
# 3. Pickle序列化
print("\n3. Pickle序列化操作")
# 复杂数据结构
complex_data = {
'list': [1, 2, 3, [4, 5]],
'dict': {'a': 1, 'b': 2},
'tuple': (1, 2, 3),
'set': {1, 2, 3},
'datetime': datetime.now(),
'function': lambda x: x * 2
}
# 序列化到文件
with open('data.pickle', 'wb') as f:
pickle.dump(complex_data, f)
print("复杂数据已序列化到 data.pickle")
# 从文件反序列化
with open('data.pickle', 'rb') as f:
loaded_complex_data = pickle.load(f)
print(f"反序列化的数据: {loaded_complex_data}")
# 测试函数是否正常
func = loaded_complex_data['function']
print(f"函数测试: func(5) = {func(5)}")
# 4. 大文件处理
print("\n4. 大文件处理技巧")
# 创建一个较大的测试文件
with open('large_file.txt', 'w', encoding='utf-8') as f:
for i in range(10000):
f.write(f"这是第{i+1}行数据,包含一些测试内容\n")
print("大文件 large_file.txt 已创建")
# 逐行读取大文件(内存友好)
line_count = 0
with open('large_file.txt', 'r', encoding='utf-8') as f:
for line in f:
line_count += 1
if line_count <= 5: # 只显示前5行
print(f" {line.strip()}")
print(f"大文件总行数: {line_count}")
# 分块读取大文件
chunk_size = 1024 # 1KB
with open('large_file.txt', 'r', encoding='utf-8') as f:
chunk_count = 0
while True:
chunk = f.read(chunk_size)
if not chunk:
break
chunk_count += 1
if chunk_count == 1: # 只显示第一个块的部分内容
print(f"第一个块的前100个字符: {chunk[:100]}...")
print(f"文件被分为 {chunk_count} 个块读取")
# 清理文件
files_to_clean = ['data.json', 'employees.csv', 'data.pickle', 'large_file.txt']
for filename in files_to_clean:
if os.path.exists(filename):
os.remove(filename)
print("\n测试文件已清理")
# 运行演示
advanced_file_operations()
# 三、目录操作
# 3.1 目录基础操作
import os
import shutil
from pathlib import Path
import tempfile
def directory_operations_demo():
"""目录操作演示"""
print("=== 目录操作演示 ===")
# 1. 获取当前目录信息
print("\n1. 当前目录信息")
current_dir = os.getcwd()
print(f"当前工作目录: {current_dir}")
print(f"目录内容: {os.listdir('.')}")
# 使用pathlib
current_path = Path.cwd()
print(f"使用pathlib获取当前目录: {current_path}")
# 2. 创建目录
print("\n2. 创建目录")
# 创建单个目录
test_dir = Path('test_directory')
test_dir.mkdir(exist_ok=True)
print(f"目录 {test_dir} 已创建")
# 创建多级目录
nested_dir = Path('parent/child/grandchild')
nested_dir.mkdir(parents=True, exist_ok=True)
print(f"多级目录 {nested_dir} 已创建")
# 3. 遍历目录
print("\n3. 遍历目录")
# 创建一些测试文件
(test_dir / 'file1.txt').write_text('内容1', encoding='utf-8')
(test_dir / 'file2.txt').write_text('内容2', encoding='utf-8')
(test_dir / 'subdir').mkdir(exist_ok=True)
(test_dir / 'subdir' / 'file3.txt').write_text('内容3', encoding='utf-8')
# 使用os.walk遍历
print("使用os.walk遍历:")
for root, dirs, files in os.walk(test_dir):
level = root.replace(str(test_dir), '').count(os.sep)
indent = ' ' * 2 * level
print(f"{indent}{os.path.basename(root)}/")
subindent = ' ' * 2 * (level + 1)
for file in files:
print(f"{subindent}{file}")
# 使用pathlib遍历
print("\n使用pathlib遍历:")
for item in test_dir.rglob('*'):
if item.is_file():
print(f"文件: {item}")
elif item.is_dir():
print(f"目录: {item}")
# 4. 目录信息
print("\n4. 目录信息")
for item in test_dir.iterdir():
stat = item.stat()
item_type = "目录" if item.is_dir() else "文件"
print(f"{item_type}: {item.name}, 大小: {stat.st_size} 字节")
# 5. 复制和移动目录
print("\n5. 复制和移动目录")
# 复制目录
backup_dir = Path('test_directory_backup')
if backup_dir.exists():
shutil.rmtree(backup_dir)
shutil.copytree(test_dir, backup_dir)
print(f"目录已复制到 {backup_dir}")
# 移动目录
moved_dir = Path('moved_directory')
if moved_dir.exists():
shutil.rmtree(moved_dir)
shutil.move(str(backup_dir), str(moved_dir))
print(f"目录已移动到 {moved_dir}")
# 6. 删除目录
print("\n6. 删除目录")
# 删除空目录
empty_dir = Path('empty_dir')
empty_dir.mkdir(exist_ok=True)
empty_dir.rmdir()
print("空目录已删除")
# 删除非空目录
shutil.rmtree(test_dir)
shutil.rmtree(moved_dir)
shutil.rmtree(nested_dir.parent.parent) # 删除parent目录
print("非空目录已删除")
# 运行演示
directory_operations_demo()
# 3.2 路径操作
from pathlib import Path
import os
def path_operations_demo():
"""路径操作演示"""
print("=== 路径操作演示 ===")
# 1. 路径构建
print("\n1. 路径构建")
# 使用pathlib构建路径
path1 = Path('documents') / 'projects' / 'python' / 'main.py'
print(f"pathlib构建路径: {path1}")
# 使用os.path构建路径
path2 = os.path.join('documents', 'projects', 'python', 'main.py')
print(f"os.path构建路径: {path2}")
# 2. 路径解析
print("\n2. 路径解析")
sample_path = Path('/home/user/documents/project/main.py')
print(f"完整路径: {sample_path}")
print(f"父目录: {sample_path.parent}")
print(f"文件名: {sample_path.name}")
print(f"文件名(无扩展名): {sample_path.stem}")
print(f"扩展名: {sample_path.suffix}")
print(f"所有扩展名: {sample_path.suffixes}")
print(f"路径部分: {sample_path.parts}")
# 3. 路径判断
print("\n3. 路径判断")
current_file = Path(__file__) if '__file__' in globals() else Path('demo.py')
print(f"测试路径: {current_file}")
print(f"是否存在: {current_file.exists()}")
print(f"是否为文件: {current_file.is_file()}")
print(f"是否为目录: {current_file.is_dir()}")
print(f"是否为绝对路径: {current_file.is_absolute()}")
# 4. 路径转换
print("\n4. 路径转换")
relative_path = Path('documents/project/main.py')
print(f"相对路径: {relative_path}")
# 转换为绝对路径
absolute_path = relative_path.resolve()
print(f"绝对路径: {absolute_path}")
# 获取相对路径
try:
current_dir = Path.cwd()
relative_to_current = absolute_path.relative_to(current_dir)
print(f"相对于当前目录: {relative_to_current}")
except ValueError:
print("路径不在当前目录下")
# 5. 路径匹配
print("\n5. 路径匹配")
test_paths = [
Path('documents/project1/main.py'),
Path('documents/project2/test.py'),
Path('documents/project1/utils.py'),
Path('images/photo.jpg'),
Path('documents/readme.txt')
]
print("Python文件:")
for path in test_paths:
if path.match('*.py'):
print(f" {path}")
print("\nproject1目录下的文件:")
for path in test_paths:
if path.match('*/project1/*'):
print(f" {path}")
print("\ndocuments目录下的所有文件:")
for path in test_paths:
if path.match('documents/**/*'):
print(f" {path}")
# 6. 路径操作实用函数
print("\n6. 路径操作实用函数")
def safe_path_join(*parts):
"""安全的路径连接"""
return Path(*parts)
def get_file_info(file_path):
"""获取文件信息"""
path = Path(file_path)
if not path.exists():
return None
stat = path.stat()
return {
'name': path.name,
'size': stat.st_size,
'modified': stat.st_mtime,
'is_file': path.is_file(),
'is_dir': path.is_dir(),
'parent': str(path.parent),
'extension': path.suffix
}
def find_files_by_extension(directory, extension):
"""按扩展名查找文件"""
path = Path(directory)
if not path.exists() or not path.is_dir():
return []
pattern = f"**/*{extension}"
return list(path.glob(pattern))
# 测试实用函数
test_path = safe_path_join('documents', 'project', 'main.py')
print(f"安全路径连接: {test_path}")
# 创建测试文件来演示
demo_file = Path('demo_file.txt')
demo_file.write_text('测试内容', encoding='utf-8')
file_info = get_file_info(demo_file)
if file_info:
print(f"文件信息: {file_info}")
# 清理
demo_file.unlink()
print("\n演示文件已清理")
# 运行演示
path_operations_demo()
# 四、标准输入输出
# 4.1 控制台输入输出
import sys
import getpass
from datetime import datetime
def console_io_demo():
"""控制台输入输出演示"""
print("=== 控制台输入输出演示 ===")
# 1. 基本输出
print("\n1. 基本输出方式")
# 普通输出
print("这是普通输出")
# 格式化输出
name = "张三"
age = 25
print(f"姓名: {name}, 年龄: {age}")
print("姓名: {}, 年龄: {}".format(name, age))
print("姓名: %s, 年龄: %d" % (name, age))
# 输出到不同流
print("正常信息", file=sys.stdout)
print("错误信息", file=sys.stderr)
# 控制输出结束符
print("第一部分", end=" ")
print("第二部分", end=" ")
print("第三部分") # 默认换行
# 2. 高级输出格式
print("\n2. 高级输出格式")
# 表格输出
data = [
['姓名', '年龄', '城市'],
['张三', 25, '北京'],
['李四', 30, '上海'],
['王五', 28, '深圳']
]
print("表格输出:")
for row in data:
print(f"{row[0]:8s} {row[1]:>3} {row[2]:6s}")
# 进度条输出
print("\n进度条演示:")
import time
for i in range(21):
percent = i * 5
bar = '█' * i + '░' * (20 - i)
print(f"\r进度: [{bar}] {percent}%", end='', flush=True)
time.sleep(0.1)
print() # 换行
# 3. 彩色输出
print("\n3. 彩色输出")
# ANSI颜色代码
colors = {
'red': '\033[31m',
'green': '\033[32m',
'yellow': '\033[33m',
'blue': '\033[34m',
'magenta': '\033[35m',
'cyan': '\033[36m',
'white': '\033[37m',
'reset': '\033[0m'
}
for color_name, color_code in colors.items():
if color_name != 'reset':
print(f"{color_code}这是{color_name}颜色的文本{colors['reset']}")
# 4. 输入演示(注释掉以避免阻塞)
print("\n4. 输入方式(演示代码)")
# 基本输入
print("# 基本输入")
print("# user_input = input('请输入您的姓名: ')")
print("# print(f'您好, {user_input}!')")
# 数字输入
print("\n# 数字输入")
print("# try:")
print("# age = int(input('请输入您的年龄: '))")
print("# print(f'您的年龄是: {age}')")
print("# except ValueError:")
print("# print('请输入有效的数字')")
# 密码输入
print("\n# 密码输入")
print("# password = getpass.getpass('请输入密码: ')")
print("# print('密码已输入(不显示)')")
# 5. 命令行参数
print("\n5. 命令行参数")
print(f"脚本名称: {sys.argv[0] if sys.argv else 'unknown'}")
print(f"参数列表: {sys.argv[1:] if len(sys.argv) > 1 else '无参数'}")
print(f"参数数量: {len(sys.argv) - 1}")
def interactive_menu_demo():
"""交互式菜单演示"""
print("\n=== 交互式菜单演示 ===")
def show_menu():
"""显示菜单"""
print("\n" + "="*30)
print(" 主菜单")
print("="*30)
print("1. 查看当前时间")
print("2. 计算器")
print("3. 文件操作")
print("4. 系统信息")
print("0. 退出")
print("="*30)
def get_current_time():
"""获取当前时间"""
now = datetime.now()
print(f"当前时间: {now.strftime('%Y-%m-%d %H:%M:%S')}")
def calculator():
"""简单计算器"""
print("简单计算器(输入 'q' 退出)")
while True:
try:
expr = input("请输入表达式: ").strip()
if expr.lower() == 'q':
break
result = eval(expr) # 注意:实际应用中应该使用更安全的方法
print(f"结果: {result}")
except Exception as e:
print(f"错误: {e}")
def file_operations():
"""文件操作"""
print("当前目录文件列表:")
import os
files = os.listdir('.')
for i, file in enumerate(files[:10], 1): # 只显示前10个
print(f" {i}. {file}")
if len(files) > 10:
print(f" ... 还有 {len(files) - 10} 个文件")
def system_info():
"""系统信息"""
import platform
print(f"操作系统: {platform.system()}")
print(f"系统版本: {platform.version()}")
print(f"处理器: {platform.processor()}")
print(f"Python版本: {platform.python_version()}")
# 菜单功能映射
menu_functions = {
'1': get_current_time,
'2': calculator,
'3': file_operations,
'4': system_info
}
print("交互式菜单演示(模拟)")
print("在实际应用中,这里会有真正的用户交互")
# 模拟用户选择
demo_choices = ['1', '3', '4']
for choice in demo_choices:
print(f"\n模拟用户选择: {choice}")
if choice in menu_functions:
menu_functions[choice]()
elif choice == '0':
print("退出程序")
break
else:
print("无效选择")
# 运行演示
console_io_demo()
interactive_menu_demo()
# 4.2 格式化输出进阶
from datetime import datetime
import locale
def advanced_formatting_demo():
"""高级格式化输出演示"""
print("=== 高级格式化输出演示 ===")
# 1. 数字格式化
print("\n1. 数字格式化")
number = 1234567.89
print(f"原始数字: {number}")
print(f"千分位分隔: {number:,}")
print(f"保留2位小数: {number:.2f}")
print(f"科学计数法: {number:.2e}")
print(f"百分比: {0.1234:.2%}")
print(f"填充零: {42:08d}")
print(f"右对齐: {42:>10d}")
print(f"左对齐: {42:<10d}")
print(f"居中对齐: {42:^10d}")
# 2. 字符串格式化
print("\n2. 字符串格式化")
text = "Python"
print(f"原始字符串: '{text}'")
print(f"右对齐(15): '{text:>15s}'")
print(f"左对齐(15): '{text:<15s}'")
print(f"居中对齐(15): '{text:^15s}'")
print(f"填充字符: '{text:*^15s}'")
print(f"截断: '{text:.3s}'")
# 3. 日期时间格式化
print("\n3. 日期时间格式化")
now = datetime.now()
print(f"当前时间: {now}")
print(f"日期: {now:%Y-%m-%d}")
print(f"时间: {now:%H:%M:%S}")
print(f"完整格式: {now:%Y年%m月%d日 %H时%M分%S秒}")
print(f"星期: {now:%A}")
print(f"月份: {now:%B}")
# 4. 自定义格式化类
print("\n4. 自定义格式化类")
class Person:
def __init__(self, name, age, salary):
self.name = name
self.age = age
self.salary = salary
def __format__(self, format_spec):
if format_spec == 'short':
return f"{self.name}({self.age})"
elif format_spec == 'long':
return f"{self.name}, {self.age}岁, 薪资{self.salary:,}元"
elif format_spec == 'salary':
return f"{self.salary:,.2f}"
else:
return str(self)
def __str__(self):
return f"Person(name='{self.name}', age={self.age}, salary={self.salary})"
person = Person("张三", 30, 8500.50)
print(f"默认格式: {person}")
print(f"短格式: {person:short}")
print(f"长格式: {person:long}")
print(f"薪资格式: {person:salary}")
# 5. 表格格式化
print("\n5. 表格格式化")
def print_table(data, headers, widths=None):
"""打印格式化表格"""
if widths is None:
widths = [max(len(str(row[i])) for row in [headers] + data) + 2
for i in range(len(headers))]
# 打印分隔线
separator = '+' + '+'.join('-' * width for width in widths) + '+'
print(separator)
# 打印表头
header_row = '|' + '|'.join(f"{headers[i]:^{widths[i]}}" for i in range(len(headers))) + '|'
print(header_row)
print(separator)
# 打印数据行
for row in data:
data_row = '|' + '|'.join(f"{str(row[i]):^{widths[i]}}" for i in range(len(row))) + '|'
print(data_row)
print(separator)
# 示例数据
headers = ['姓名', '年龄', '城市', '薪资']
table_data = [
['张三', 25, '北京', '8,500'],
['李四', 30, '上海', '12,000'],
['王五', 28, '深圳', '10,500'],
['赵六', 32, '广州', '9,800']
]
print_table(table_data, headers)
# 6. 进度条和状态显示
print("\n6. 进度条和状态显示")
def progress_bar(current, total, width=50, prefix='进度', suffix='完成'):
"""显示进度条"""
percent = current / total
filled_width = int(width * percent)
bar = '█' * filled_width + '░' * (width - filled_width)
return f"{prefix}: [{bar}] {percent:.1%} {suffix}"
# 模拟进度
import time
total_steps = 20
for i in range(total_steps + 1):
progress = progress_bar(i, total_steps)
print(f"\r{progress}", end='', flush=True)
time.sleep(0.05)
print() # 换行
# 7. 多行格式化
print("\n7. 多行格式化")
template = """
╔══════════════════════════════════════╗
║ 用户信息 ║
╠══════════════════════════════════════╣
║ 姓名: {name:<30} ║
║ 年龄: {age:<30} ║
║ 邮箱: {email:<30} ║
║ 注册时间: {reg_time:<26} ║
╚══════════════════════════════════════╝
"""
user_info = {
'name': '张三',
'age': 25,
'email': 'zhangsan@example.com',
'reg_time': '2023-01-15 10:30:00'
}
print(template.format(**user_info))
# 运行演示
advanced_formatting_demo()
# 五、内存IO操作
# 5.1 StringIO和BytesIO
from io import StringIO, BytesIO
import json
def memory_io_demo():
"""内存IO操作演示"""
print("=== 内存IO操作演示 ===")
# 1. StringIO - 字符串流
print("\n1. StringIO操作")
# 创建StringIO对象
string_buffer = StringIO()
# 写入数据
string_buffer.write("Hello, ")
string_buffer.write("StringIO!\n")
string_buffer.write("这是第二行\n")
# 获取当前位置
print(f"当前位置: {string_buffer.tell()}")
# 获取全部内容
content = string_buffer.getvalue()
print(f"StringIO内容:\n{content}")
# 重置位置并读取
string_buffer.seek(0)
line1 = string_buffer.readline()
line2 = string_buffer.readline()
print(f"第一行: {line1.strip()}")
print(f"第二行: {line2.strip()}")
# 关闭StringIO
string_buffer.close()
# 2. BytesIO - 字节流
print("\n2. BytesIO操作")
# 创建BytesIO对象
bytes_buffer = BytesIO()
# 写入字节数据
bytes_buffer.write(b"Hello, ")
bytes_buffer.write(b"BytesIO!\n")
bytes_buffer.write("这是中文".encode('utf-8'))
# 获取字节内容
byte_content = bytes_buffer.getvalue()
print(f"BytesIO内容: {byte_content}")
print(f"解码后: {byte_content.decode('utf-8')}")
# 重置位置并读取
bytes_buffer.seek(0)
chunk1 = bytes_buffer.read(7)
chunk2 = bytes_buffer.read(8)
print(f"第一块: {chunk1}")
print(f"第二块: {chunk2}")
bytes_buffer.close()
# 3. 实际应用示例
print("\n3. 实际应用示例")
def create_csv_in_memory(data):
"""在内存中创建CSV"""
import csv
output = StringIO()
writer = csv.writer(output)
# 写入表头
if data:
writer.writerow(data[0].keys())
# 写入数据
for row in data:
writer.writerow(row.values())
csv_content = output.getvalue()
output.close()
return csv_content
# 测试数据
sample_data = [
{'name': '张三', 'age': 25, 'city': '北京'},
{'name': '李四', 'age': 30, 'city': '上海'},
{'name': '王五', 'age': 28, 'city': '深圳'}
]
csv_result = create_csv_in_memory(sample_data)
print("内存中生成的CSV:")
print(csv_result)
def process_json_in_memory(data):
"""在内存中处理JSON"""
# 序列化到内存
json_buffer = StringIO()
json.dump(data, json_buffer, ensure_ascii=False, indent=2)
# 获取JSON字符串
json_str = json_buffer.getvalue()
json_buffer.close()
# 从字符串反序列化
json_input = StringIO(json_str)
loaded_data = json.load(json_input)
json_input.close()
return json_str, loaded_data
json_str, loaded_data = process_json_in_memory(sample_data)
print("\n内存中处理的JSON:")
print(json_str)
print(f"\n反序列化结果: {loaded_data}")
# 运行演示
memory_io_demo()
# 5.2 临时文件操作
import tempfile
import os
from pathlib import Path
def temp_file_demo():
"""临时文件操作演示"""
print("=== 临时文件操作演示 ===")
# 1. 临时文件
print("\n1. 临时文件操作")
# 创建临时文件
with tempfile.NamedTemporaryFile(mode='w+', encoding='utf-8', delete=False) as temp_file:
temp_file_path = temp_file.name
print(f"临时文件路径: {temp_file_path}")
# 写入数据
temp_file.write("这是临时文件的内容\n")
temp_file.write("第二行内容\n")
# 重置位置并读取
temp_file.seek(0)
content = temp_file.read()
print(f"临时文件内容:\n{content}")
# 手动删除临时文件
os.unlink(temp_file_path)
print("临时文件已删除")
# 2. 自动删除的临时文件
print("\n2. 自动删除的临时文件")
with tempfile.NamedTemporaryFile(mode='w+', encoding='utf-8') as auto_temp:
print(f"自动删除临时文件: {auto_temp.name}")
auto_temp.write("这个文件会自动删除")
auto_temp.seek(0)
print(f"内容: {auto_temp.read()}")
# 文件在这里自动删除
print("临时文件已自动删除")
# 3. 临时目录
print("\n3. 临时目录操作")
with tempfile.TemporaryDirectory() as temp_dir:
print(f"临时目录: {temp_dir}")
# 在临时目录中创建文件
temp_file_path = Path(temp_dir) / 'test.txt'
temp_file_path.write_text('临时目录中的文件', encoding='utf-8')
# 创建子目录
sub_dir = Path(temp_dir) / 'subdir'
sub_dir.mkdir()
(sub_dir / 'subfile.txt').write_text('子目录文件', encoding='utf-8')
# 列出临时目录内容
print("临时目录内容:")
for item in Path(temp_dir).rglob('*'):
print(f" {item}")
# 临时目录及其内容在这里自动删除
print("临时目录已自动删除")
# 4. 自定义临时文件
print("\n4. 自定义临时文件")
# 指定前缀和后缀
with tempfile.NamedTemporaryFile(
mode='w+',
prefix='myapp_',
suffix='.log',
encoding='utf-8'
) as custom_temp:
print(f"自定义临时文件: {custom_temp.name}")
custom_temp.write("自定义前缀和后缀的临时文件")
# 5. 获取临时目录信息
print("\n5. 临时目录信息")
temp_dir = tempfile.gettempdir()
print(f"系统临时目录: {temp_dir}")
# 创建唯一的临时文件名
temp_name = tempfile.mktemp(suffix='.txt', prefix='unique_')
print(f"唯一临时文件名: {temp_name}")
# 注意:mktemp只生成名称,不创建文件,需要手动创建和删除
# 6. 临时文件的实际应用
print("\n6. 临时文件实际应用")
def process_large_data_with_temp(data_generator):
"""使用临时文件处理大量数据"""
with tempfile.NamedTemporaryFile(mode='w+', encoding='utf-8') as temp_file:
# 写入大量数据到临时文件
for i, data in enumerate(data_generator):
temp_file.write(f"{i}: {data}\n")
# 重置位置并处理数据
temp_file.seek(0)
line_count = 0
total_length = 0
for line in temp_file:
line_count += 1
total_length += len(line)
return {
'lines': line_count,
'total_chars': total_length,
'temp_file': temp_file.name
}
# 模拟大量数据
def data_generator():
for i in range(1000):
yield f"数据项 {i} - 这是一些测试数据"
result = process_large_data_with_temp(data_generator())
print(f"处理结果: {result}")
def create_temp_config(config_data):
"""创建临时配置文件"""
import json
with tempfile.NamedTemporaryFile(
mode='w',
suffix='.json',
delete=False,
encoding='utf-8'
) as config_file:
json.dump(config_data, config_file, ensure_ascii=False, indent=2)
return config_file.name
# 创建临时配置
config = {
'database': {
'host': 'localhost',
'port': 5432,
'name': 'testdb'
},
'logging': {
'level': 'INFO',
'file': '/tmp/app.log'
}
}
config_file_path = create_temp_config(config)
print(f"\n临时配置文件: {config_file_path}")
# 读取配置文件
with open(config_file_path, 'r', encoding='utf-8') as f:
loaded_config = json.load(f)
print(f"加载的配置: {loaded_config}")
# 清理临时配置文件
os.unlink(config_file_path)
print("临时配置文件已删除")
# 运行演示
temp_file_demo()
# 六、实战练习:文件管理系统
# 6.1 构建一个完整的文件管理系统
import os
import shutil
import json
import hashlib
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
import tempfile
@dataclass
class FileInfo:
"""文件信息数据类"""
path: str
name: str
size: int
modified: float
created: float
is_directory: bool
extension: str
hash_md5: Optional[str] = None
class FileManager:
"""文件管理系统"""
def __init__(self, base_directory: str = None):
self.base_directory = Path(base_directory) if base_directory else Path.cwd()
self.file_index: Dict[str, FileInfo] = {}
self.operation_log: List[Dict] = []
def scan_directory(self, directory: str = None) -> List[FileInfo]:
"""扫描目录并建立文件索引"""
scan_dir = Path(directory) if directory else self.base_directory
if not scan_dir.exists() or not scan_dir.is_dir():
raise ValueError(f"目录不存在或不是有效目录: {scan_dir}")
file_list = []
for item in scan_dir.rglob('*'):
try:
stat = item.stat()
file_info = FileInfo(
path=str(item),
name=item.name,
size=stat.st_size,
modified=stat.st_mtime,
created=stat.st_ctime,
is_directory=item.is_dir(),
extension=item.suffix.lower() if item.suffix else ''
)
# 为文件计算MD5哈希
if not file_info.is_directory and file_info.size < 10 * 1024 * 1024: # 小于10MB
try:
file_info.hash_md5 = self._calculate_md5(item)
except Exception:
pass # 忽略无法读取的文件
file_list.append(file_info)
self.file_index[str(item)] = file_info
except (OSError, PermissionError):
continue # 跳过无法访问的文件
self._log_operation('scan_directory', {'directory': str(scan_dir), 'files_found': len(file_list)})
return file_list
def _calculate_md5(self, file_path: Path) -> str:
"""计算文件MD5哈希值"""
hash_md5 = hashlib.md5()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def find_files(self, **criteria) -> List[FileInfo]:
"""根据条件查找文件"""
results = []
for file_info in self.file_index.values():
match = True
# 按名称模式匹配
if 'name_pattern' in criteria:
import fnmatch
if not fnmatch.fnmatch(file_info.name.lower(), criteria['name_pattern'].lower()):
match = False
# 按扩展名匹配
if 'extension' in criteria:
if file_info.extension != criteria['extension'].lower():
match = False
# 按大小范围匹配
if 'min_size' in criteria and file_info.size < criteria['min_size']:
match = False
if 'max_size' in criteria and file_info.size > criteria['max_size']:
match = False
# 按修改时间匹配
if 'modified_after' in criteria and file_info.modified < criteria['modified_after']:
match = False
if 'modified_before' in criteria and file_info.modified > criteria['modified_before']:
match = False
# 按文件类型匹配
if 'is_directory' in criteria and file_info.is_directory != criteria['is_directory']:
match = False
if match:
results.append(file_info)
self._log_operation('find_files', {'criteria': criteria, 'results_count': len(results)})
return results
def find_duplicates(self) -> Dict[str, List[FileInfo]]:
"""查找重复文件"""
hash_groups = {}
for file_info in self.file_index.values():
if not file_info.is_directory and file_info.hash_md5:
if file_info.hash_md5 not in hash_groups:
hash_groups[file_info.hash_md5] = []
hash_groups[file_info.hash_md5].append(file_info)
# 只返回有重复的组
duplicates = {hash_val: files for hash_val, files in hash_groups.items() if len(files) > 1}
self._log_operation('find_duplicates', {'duplicate_groups': len(duplicates)})
return duplicates
def organize_files(self, source_dir: str, target_dir: str, organize_by: str = 'extension'):
"""按指定规则整理文件"""
source_path = Path(source_dir)
target_path = Path(target_dir)
if not source_path.exists():
raise ValueError(f"源目录不存在: {source_path}")
target_path.mkdir(parents=True, exist_ok=True)
moved_files = 0
for file_path in source_path.rglob('*'):
if file_path.is_file():
# 确定目标子目录
if organize_by == 'extension':
ext = file_path.suffix.lower() or 'no_extension'
sub_dir = target_path / ext.lstrip('.')
elif organize_by == 'date':
modified_date = datetime.fromtimestamp(file_path.stat().st_mtime)
sub_dir = target_path / modified_date.strftime('%Y') / modified_date.strftime('%m')
elif organize_by == 'size':
size = file_path.stat().st_size
if size < 1024 * 1024: # < 1MB
sub_dir = target_path / 'small'
elif size < 10 * 1024 * 1024: # < 10MB
sub_dir = target_path / 'medium'
else:
sub_dir = target_path / 'large'
else:
sub_dir = target_path / 'others'
# 创建目标目录
sub_dir.mkdir(parents=True, exist_ok=True)
# 移动文件
target_file = sub_dir / file_path.name
# 处理文件名冲突
counter = 1
while target_file.exists():
stem = file_path.stem
suffix = file_path.suffix
target_file = sub_dir / f"{stem}_{counter}{suffix}"
counter += 1
shutil.move(str(file_path), str(target_file))
moved_files += 1
self._log_operation('organize_files', {
'source': str(source_path),
'target': str(target_path),
'organize_by': organize_by,
'moved_files': moved_files
})
return moved_files
def backup_files(self, source_paths: List[str], backup_dir: str,
compression: bool = True) -> str:
"""备份文件"""
backup_path = Path(backup_dir)
backup_path.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
if compression:
import zipfile
backup_file = backup_path / f"backup_{timestamp}.zip"
with zipfile.ZipFile(backup_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
for source_path in source_paths:
source = Path(source_path)
if source.exists():
if source.is_file():
zipf.write(source, source.name)
else:
for file_path in source.rglob('*'):
if file_path.is_file():
arcname = file_path.relative_to(source.parent)
zipf.write(file_path, arcname)
else:
backup_file = backup_path / f"backup_{timestamp}"
backup_file.mkdir(exist_ok=True)
for source_path in source_paths:
source = Path(source_path)
if source.exists():
target = backup_file / source.name
if source.is_file():
shutil.copy2(source, target)
else:
shutil.copytree(source, target, dirs_exist_ok=True)
self._log_operation('backup_files', {
'sources': source_paths,
'backup_file': str(backup_file),
'compression': compression
})
return str(backup_file)
def clean_empty_directories(self, directory: str = None) -> int:
"""清理空目录"""
clean_dir = Path(directory) if directory else self.base_directory
removed_count = 0
# 从最深层开始清理
for dir_path in sorted(clean_dir.rglob('*'), key=lambda p: len(p.parts), reverse=True):
if dir_path.is_dir():
try:
if not any(dir_path.iterdir()): # 目录为空
dir_path.rmdir()
removed_count += 1
except OSError:
continue # 跳过无法删除的目录
self._log_operation('clean_empty_directories', {
'directory': str(clean_dir),
'removed_count': removed_count
})
return removed_count
def get_directory_stats(self, directory: str = None) -> Dict:
"""获取目录统计信息"""
stats_dir = Path(directory) if directory else self.base_directory
stats = {
'total_files': 0,
'total_directories': 0,
'total_size': 0,
'file_types': {},
'size_distribution': {'small': 0, 'medium': 0, 'large': 0},
'largest_files': [],
'newest_files': [],
'oldest_files': []
}
all_files = []
for item in stats_dir.rglob('*'):
try:
if item.is_file():
stats['total_files'] += 1
size = item.stat().st_size
stats['total_size'] += size
# 文件类型统计
ext = item.suffix.lower() or 'no_extension'
stats['file_types'][ext] = stats['file_types'].get(ext, 0) + 1
# 大小分布
if size < 1024 * 1024: # < 1MB
stats['size_distribution']['small'] += 1
elif size < 10 * 1024 * 1024: # < 10MB
stats['size_distribution']['medium'] += 1
else:
stats['size_distribution']['large'] += 1
# 收集文件信息用于排序
file_stat = item.stat()
all_files.append({
'path': str(item),
'size': size,
'modified': file_stat.st_mtime
})
elif item.is_dir():
stats['total_directories'] += 1
except (OSError, PermissionError):
continue
# 最大文件(前10个)
stats['largest_files'] = sorted(all_files, key=lambda x: x['size'], reverse=True)[:10]
# 最新文件(前10个)
stats['newest_files'] = sorted(all_files, key=lambda x: x['modified'], reverse=True)[:10]
# 最旧文件(前10个)
stats['oldest_files'] = sorted(all_files, key=lambda x: x['modified'])[:10]
return stats
def _log_operation(self, operation: str, details: Dict):
"""记录操作日志"""
log_entry = {
'timestamp': datetime.now().isoformat(),
'operation': operation,
'details': details
}
self.operation_log.append(log_entry)
def export_index(self, filename: str):
"""导出文件索引"""
export_data = {
'timestamp': datetime.now().isoformat(),
'base_directory': str(self.base_directory),
'file_count': len(self.file_index),
'files': [asdict(file_info) for file_info in self.file_index.values()]
}
with open(filename, 'w', encoding='utf-8') as f:
json.dump(export_data, f, ensure_ascii=False, indent=2)
self._log_operation('export_index', {'filename': filename})
def get_operation_log(self) -> List[Dict]:
"""获取操作日志"""
return self.operation_log.copy()
def file_manager_demo():
"""文件管理系统演示"""
print("=== 文件管理系统演示 ===")
# 创建临时测试环境
with tempfile.TemporaryDirectory() as temp_dir:
print(f"\n使用临时目录: {temp_dir}")
# 创建测试文件结构
test_dir = Path(temp_dir)
# 创建不同类型的文件
(test_dir / 'documents').mkdir()
(test_dir / 'images').mkdir()
(test_dir / 'code').mkdir()
(test_dir / 'archive').mkdir()
# 创建测试文件
test_files = [
('documents/report.txt', '这是一个报告文件'),
('documents/notes.md', '# 笔记\n\n这是一些笔记'),
('images/photo1.jpg', b'\xFF\xD8\xFF\xE0'), # JPEG文件头
('images/photo2.png', b'\x89PNG\r\n\x1a\n'), # PNG文件头
('code/main.py', 'print("Hello, World!")'),
('code/utils.py', 'def helper(): pass'),
('archive/data.zip', b'PK\x03\x04'), # ZIP文件头
('readme.txt', '项目说明文件'),
]
for file_path, content in test_files:
full_path = test_dir / file_path
if isinstance(content, str):
full_path.write_text(content, encoding='utf-8')
else:
full_path.write_bytes(content)
# 创建一些重复文件
(test_dir / 'documents' / 'copy_report.txt').write_text('这是一个报告文件', encoding='utf-8')
print("测试文件结构已创建")
# 初始化文件管理器
fm = FileManager(temp_dir)
# 1. 扫描目录
print("\n1. 扫描目录")
files = fm.scan_directory()
print(f"发现 {len(files)} 个文件和目录")
# 2. 查找文件
print("\n2. 查找文件")
# 查找Python文件
python_files = fm.find_files(extension='.py')
print(f"Python文件: {len(python_files)} 个")
for file_info in python_files:
print(f" {file_info.name} ({file_info.size} 字节)")
# 查找文本文件
text_files = fm.find_files(name_pattern='*.txt')
print(f"\n文本文件: {len(text_files)} 个")
for file_info in text_files:
print(f" {file_info.name}")
# 3. 查找重复文件
print("\n3. 查找重复文件")
duplicates = fm.find_duplicates()
if duplicates:
for hash_val, files in duplicates.items():
print(f"重复文件组 (MD5: {hash_val[:8]}...):")
for file_info in files:
print(f" {file_info.path}")
else:
print("未发现重复文件")
# 4. 获取目录统计
print("\n4. 目录统计")
stats = fm.get_directory_stats()
print(f"总文件数: {stats['total_files']}")
print(f"总目录数: {stats['total_directories']}")
print(f"总大小: {stats['total_size']} 字节")
print("文件类型分布:")
for ext, count in stats['file_types'].items():
print(f" {ext}: {count} 个")
# 5. 整理文件
print("\n5. 整理文件")
organize_source = test_dir / 'documents'
organize_target = test_dir / 'organized'
moved_count = fm.organize_files(
str(organize_source),
str(organize_target),
organize_by='extension'
)
print(f"已整理 {moved_count} 个文件")
# 6. 备份文件
print("\n6. 备份文件")
backup_sources = [str(test_dir / 'code'), str(test_dir / 'readme.txt')]
backup_dir = test_dir / 'backups'
backup_file = fm.backup_files(backup_sources, str(backup_dir), compression=True)
print(f"备份已创建: {backup_file}")
# 7. 清理空目录
print("\n7. 清理空目录")
removed_count = fm.clean_empty_directories()
print(f"已删除 {removed_count} 个空目录")
# 8. 导出索引
print("\n8. 导出文件索引")
index_file = test_dir / 'file_index.json'
fm.export_index(str(index_file))
print(f"文件索引已导出到: {index_file}")
# 9. 查看操作日志
print("\n9. 操作日志")
log = fm.get_operation_log()
print(f"共记录 {len(log)} 个操作:")
for entry in log[-3:]: # 显示最后3个操作
print(f" {entry['timestamp']}: {entry['operation']}")
print("\n文件管理系统演示完成")
# 运行演示
file_manager_demo()
# 七、总结和最佳实践
# 7.1 IO编程最佳实践
使用上下文管理器
- 始终使用
with
语句处理文件操作 - 确保资源正确释放
- 始终使用
选择合适的文件模式
- 明确区分文本模式和二进制模式
- 根据需求选择读写模式
处理编码问题
- 明确指定文件编码(通常使用UTF-8)
- 处理编码错误
异常处理
- 捕获和处理IO相关异常
- 提供有意义的错误信息
性能优化
- 对大文件使用分块读取
- 使用缓冲区提高效率
- 考虑使用内存映射文件
# 7.2 文件操作安全性
路径安全
- 验证文件路径的合法性
- 防止路径遍历攻击
权限检查
- 检查文件读写权限
- 处理权限不足的情况
数据完整性
- 使用校验和验证文件完整性
- 实现原子操作避免数据损坏
# 7.3 下一步学习方向
网络IO编程
- 学习socket编程
- 掌握HTTP客户端和服务器
异步IO
- 学习asyncio模块
- 掌握异步文件操作
数据库IO
- 学习数据库连接和操作
- 掌握ORM框架
高级文件格式
- 学习处理Excel、PDF等格式
- 掌握图像和音视频文件处理
# 7.4 练习建议
- 创建文件处理工具,实践各种文件操作
- 编写日志系统,掌握文件写入和轮转
- 实现配置管理,练习JSON/YAML文件处理
- 开发备份工具,综合运用文件和目录操作
- 构建文件监控系统,学习文件系统事件处理
通过本章的学习,你应该能够:
- 熟练进行各种文件IO操作
- 掌握目录和路径操作技巧
- 理解标准输入输出的使用
- 运用内存IO和临时文件
- 构建完整的文件管理系统
- 遵循IO编程的最佳实践
IO编程是Python应用开发的基础技能,需要在实践中不断提高。记住,良好的IO处理不仅能提高程序性能,还能增强程序的稳定性和用户体验。