第7天-函数式编程
哪吒 2023/6/15
# 第7天-Python函数式编程
欢迎来到Python函数式编程的世界!今天我们将深入学习一种优雅的编程范式,它能让你的代码更加简洁、可读和可维护。
# 什么是函数式编程?
函数式编程(Functional Programming,FP)是一种编程范式,它将计算视为数学函数的求值,避免改变状态和可变数据。
生活类比:
- 命令式编程:像做菜的详细步骤,"先切菜,再炒菜,然后装盘"
- 函数式编程:像数学公式,"f(原料) = 成品菜",关注输入和输出的关系
# 函数式编程的核心特点
- 函数是一等公民:函数可以作为参数传递、作为返回值、赋值给变量
- 不可变性:数据一旦创建就不能修改
- 纯函数:相同输入总是产生相同输出,没有副作用
- 高阶函数:接受函数作为参数或返回函数的函数
# 一、纯函数(Pure Functions)
# 1.1 什么是纯函数?
纯函数具有两个特点:
- 确定性:相同的输入总是产生相同的输出
- 无副作用:不修改外部状态,不产生可观察的副作用
# 纯函数示例
def add(x, y):
"""纯函数:只依赖输入参数,总是返回相同结果"""
return x + y
def multiply(x, y):
"""纯函数:简单的数学运算"""
return x * y
def get_full_name(first_name, last_name):
"""纯函数:字符串处理"""
return f"{first_name} {last_name}"
# 测试纯函数
print(add(2, 3)) # 总是返回 5
print(multiply(4, 5)) # 总是返回 20
print(get_full_name("张", "三")) # 总是返回 "张 三"
# 1.2 非纯函数示例
import random
import datetime
# 非纯函数示例(不推荐在函数式编程中使用)
counter = 0
def impure_increment():
"""非纯函数:修改全局变量"""
global counter
counter += 1
return counter
def get_random_number():
"""非纯函数:每次调用结果不同"""
return random.randint(1, 100)
def get_current_time():
"""非纯函数:依赖外部状态(当前时间)"""
return datetime.datetime.now()
# 这些函数的输出不可预测
print(impure_increment()) # 1
print(impure_increment()) # 2
print(get_random_number()) # 随机数
print(get_current_time()) # 当前时间
# 1.3 将非纯函数转换为纯函数
# 改进:将非纯函数转换为纯函数
def pure_increment(current_value):
"""纯函数版本:不修改外部状态"""
return current_value + 1
def generate_sequence(start, length):
"""纯函数:生成确定的序列"""
return [start + i for i in range(length)]
def format_timestamp(timestamp):
"""纯函数:格式化给定的时间戳"""
return datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
# 使用纯函数
current = 0
for i in range(3):
current = pure_increment(current)
print(f"计数器:{current}")
sequence = generate_sequence(10, 5)
print(f"序列:{sequence}") # [10, 11, 12, 13, 14]
timestamp = 1640995200 # 固定时间戳
formatted_time = format_timestamp(timestamp)
print(f"格式化时间:{formatted_time}")
# 二、高阶函数(Higher-Order Functions)
# 2.1 函数作为参数
def apply_operation(numbers, operation):
"""高阶函数:接受函数作为参数"""
return [operation(num) for num in numbers]
def square(x):
return x ** 2
def cube(x):
return x ** 3
def double(x):
return x * 2
# 使用不同的操作函数
numbers = [1, 2, 3, 4, 5]
squared = apply_operation(numbers, square)
print(f"平方:{squared}") # [1, 4, 9, 16, 25]
cubed = apply_operation(numbers, cube)
print(f"立方:{cubed}") # [1, 8, 27, 64, 125]
doubled = apply_operation(numbers, double)
print(f"翻倍:{doubled}") # [2, 4, 6, 8, 10]
# 使用lambda函数
halved = apply_operation(numbers, lambda x: x / 2)
print(f"减半:{halved}") # [0.5, 1.0, 1.5, 2.0, 2.5]
# 2.2 函数作为返回值
def create_multiplier(factor):
"""返回一个乘法函数"""
def multiplier(x):
return x * factor
return multiplier
def create_validator(min_value, max_value):
"""返回一个验证函数"""
def validator(value):
return min_value <= value <= max_value
return validator
def create_formatter(prefix, suffix):
"""返回一个格式化函数"""
def formatter(text):
return f"{prefix}{text}{suffix}"
return formatter
# 使用函数工厂
double_func = create_multiplier(2)
triple_func = create_multiplier(3)
print(double_func(5)) # 10
print(triple_func(5)) # 15
# 创建验证器
age_validator = create_validator(0, 120)
score_validator = create_validator(0, 100)
print(age_validator(25)) # True
print(age_validator(150)) # False
print(score_validator(85)) # True
print(score_validator(105)) # False
# 创建格式化器
html_formatter = create_formatter("<p>", "</p>")
markdown_formatter = create_formatter("**", "**")
print(html_formatter("Hello")) # <p>Hello</p>
print(markdown_formatter("Bold")) # **Bold**
# 2.3 函数组合
def compose(f, g):
"""函数组合:返回 f(g(x))"""
return lambda x: f(g(x))
def pipe(*functions):
"""管道操作:从左到右依次应用函数"""
def piped_function(value):
for func in functions:
value = func(value)
return value
return piped_function
# 基础函数
def add_one(x):
return x + 1
def multiply_by_two(x):
return x * 2
def square(x):
return x ** 2
# 函数组合
add_then_multiply = compose(multiply_by_two, add_one)
result1 = add_then_multiply(3) # multiply_by_two(add_one(3)) = multiply_by_two(4) = 8
print(f"组合结果:{result1}")
# 管道操作
process_number = pipe(add_one, multiply_by_two, square)
result2 = process_number(3) # ((3+1)*2)^2 = (4*2)^2 = 8^2 = 64
print(f"管道结果:{result2}")
# 处理字符串的管道
def to_upper(s):
return s.upper()
def add_exclamation(s):
return s + "!"
def add_prefix(s):
return ">>> " + s
process_text = pipe(to_upper, add_exclamation, add_prefix)
result3 = process_text("hello world")
print(f"文本处理:{result3}") # >>> HELLO WORLD!
# 三、内置高阶函数深入
# 3.1 map()函数进阶
# 处理多个序列
def add_three_numbers(x, y, z):
return x + y + z
list1 = [1, 2, 3]
list2 = [10, 20, 30]
list3 = [100, 200, 300]
result = list(map(add_three_numbers, list1, list2, list3))
print(f"三个列表相加:{result}") # [111, 222, 333]
# 处理字典
students = [
{'name': 'Alice', 'score': 85},
{'name': 'Bob', 'score': 92},
{'name': 'Charlie', 'score': 78}
]
# 提取姓名
names = list(map(lambda student: student['name'], students))
print(f"学生姓名:{names}") # ['Alice', 'Bob', 'Charlie']
# 计算等级
def get_grade(score):
if score >= 90:
return 'A'
elif score >= 80:
return 'B'
elif score >= 70:
return 'C'
else:
return 'D'
grades = list(map(lambda s: get_grade(s['score']), students))
print(f"学生等级:{grades}") # ['B', 'A', 'C']
# 3.2 filter()函数进阶
# 复杂过滤条件
numbers = range(1, 21)
# 过滤质数
def is_prime(n):
if n < 2:
return False
for i in range(2, int(n ** 0.5) + 1):
if n % i == 0:
return False
return True
primes = list(filter(is_prime, numbers))
print(f"质数:{primes}") # [2, 3, 5, 7, 11, 13, 17, 19]
# 过滤字符串
words = ['apple', 'banana', 'cherry', 'date', 'elderberry', 'fig']
# 长度大于5且包含'e'的单词
filtered_words = list(filter(
lambda word: len(word) > 5 and 'e' in word,
words
))
print(f"过滤后的单词:{filtered_words}") # ['cherry', 'elderberry']
# 过滤学生数据
students = [
{'name': 'Alice', 'age': 20, 'score': 85},
{'name': 'Bob', 'age': 22, 'score': 92},
{'name': 'Charlie', 'age': 19, 'score': 78},
{'name': 'Diana', 'age': 21, 'score': 96}
]
# 年龄大于20且分数大于80的学生
excellent_students = list(filter(
lambda s: s['age'] > 20 and s['score'] > 80,
students
))
print("优秀学生:")
for student in excellent_students:
print(f" {student['name']}: {student['age']}岁, {student['score']}分")
# 3.3 reduce()函数进阶
from functools import reduce
# 复杂的累积操作
numbers = [1, 2, 3, 4, 5]
# 计算阶乘
factorial = reduce(lambda x, y: x * y, numbers)
print(f"阶乘:{factorial}") # 120
# 找到最大值和最小值
max_value = reduce(lambda x, y: x if x > y else y, numbers)
min_value = reduce(lambda x, y: x if x < y else y, numbers)
print(f"最大值:{max_value}, 最小值:{min_value}") # 5, 1
# 字符串操作
words = ['Python', 'is', 'awesome', 'for', 'programming']
# 连接字符串
sentence = reduce(lambda x, y: x + ' ' + y, words)
print(f"句子:{sentence}") # Python is awesome for programming
# 找最长的单词
longest_word = reduce(
lambda x, y: x if len(x) > len(y) else y,
words
)
print(f"最长单词:{longest_word}") # programming
# 复杂数据结构的处理
orders = [
{'id': 1, 'amount': 100},
{'id': 2, 'amount': 250},
{'id': 3, 'amount': 75},
{'id': 4, 'amount': 300}
]
# 计算总金额
total_amount = reduce(
lambda total, order: total + order['amount'],
orders,
0 # 初始值
)
print(f"总金额:{total_amount}") # 725
# 合并字典
data_sources = [
{'users': 100, 'posts': 500},
{'users': 50, 'comments': 200},
{'users': 75, 'likes': 1000}
]
merged_data = reduce(
lambda acc, data: {**acc, **data},
data_sources,
{}
)
print(f"合并数据:{merged_data}")
# {'users': 75, 'posts': 500, 'comments': 200, 'likes': 1000}
# 四、函数式编程工具
# 4.1 itertools模块
import itertools
# itertools.chain - 连接多个可迭代对象
list1 = [1, 2, 3]
list2 = [4, 5, 6]
list3 = [7, 8, 9]
chained = list(itertools.chain(list1, list2, list3))
print(f"连接列表:{chained}") # [1, 2, 3, 4, 5, 6, 7, 8, 9]
# itertools.cycle - 无限循环
cycler = itertools.cycle(['A', 'B', 'C'])
first_10 = [next(cycler) for _ in range(10)]
print(f"循环前10个:{first_10}") # ['A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A']
# itertools.repeat - 重复元素
repeated = list(itertools.repeat('Hello', 5))
print(f"重复元素:{repeated}") # ['Hello', 'Hello', 'Hello', 'Hello', 'Hello']
# itertools.takewhile - 条件为真时取元素
numbers = [1, 3, 5, 8, 9, 11, 13]
less_than_10 = list(itertools.takewhile(lambda x: x < 10, numbers))
print(f"小于10的连续元素:{less_than_10}") # [1, 3, 5, 8, 9]
# itertools.dropwhile - 条件为真时跳过元素
after_10 = list(itertools.dropwhile(lambda x: x < 10, numbers))
print(f"大于等于10的元素:{after_10}") # [11, 13]
# itertools.groupby - 分组
data = [('A', 1), ('A', 2), ('B', 3), ('B', 4), ('C', 5)]
grouped = {k: list(g) for k, g in itertools.groupby(data, key=lambda x: x[0])}
print(f"分组数据:{grouped}")
# {'A': [('A', 1), ('A', 2)], 'B': [('B', 3), ('B', 4)], 'C': [('C', 5)]}
# 4.2 operator模块
import operator
from functools import reduce
# 使用operator模块替代lambda
numbers = [1, 2, 3, 4, 5]
# 求和
total = reduce(operator.add, numbers)
print(f"总和:{total}") # 15
# 求积
product = reduce(operator.mul, numbers)
print(f"乘积:{product}") # 120
# 字符串连接
words = ['Hello', 'World', 'Python']
sentence = reduce(operator.add, words)
print(f"连接字符串:{sentence}") # HelloWorldPython
# 获取属性和索引
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
def __repr__(self):
return f"Person('{self.name}', {self.age})"
people = [
Person('Alice', 25),
Person('Bob', 30),
Person('Charlie', 20)
]
# 按年龄排序
sorted_by_age = sorted(people, key=operator.attrgetter('age'))
print(f"按年龄排序:{sorted_by_age}")
# 按姓名排序
sorted_by_name = sorted(people, key=operator.attrgetter('name'))
print(f"按姓名排序:{sorted_by_name}")
# 处理元组列表
student_scores = [('Alice', 85), ('Bob', 92), ('Charlie', 78)]
sorted_by_score = sorted(student_scores, key=operator.itemgetter(1))
print(f"按分数排序:{sorted_by_score}")
# 4.3 functools模块进阶
from functools import partial, wraps, lru_cache, singledispatch
# partial - 偏函数应用
def multiply(x, y, z):
return x * y * z
# 创建偏函数
double = partial(multiply, 2) # 固定第一个参数为2
triple = partial(multiply, 3) # 固定第一个参数为3
print(double(5, 6)) # 2 * 5 * 6 = 60
print(triple(4, 5)) # 3 * 4 * 5 = 60
# 更复杂的偏函数
def log_message(level, message, timestamp=None):
import datetime
if timestamp is None:
timestamp = datetime.datetime.now()
return f"[{timestamp}] {level}: {message}"
# 创建特定级别的日志函数
info_log = partial(log_message, "INFO")
error_log = partial(log_message, "ERROR")
warning_log = partial(log_message, "WARNING")
print(info_log("应用启动"))
print(error_log("数据库连接失败"))
print(warning_log("内存使用率过高"))
# lru_cache - 缓存装饰器
@lru_cache(maxsize=128)
def fibonacci(n):
"""带缓存的斐波那契函数"""
if n < 2:
return n
return fibonacci(n-1) + fibonacci(n-2)
# 测试缓存效果
import time
start = time.time()
result = fibonacci(35)
end = time.time()
print(f"fibonacci(35) = {result}, 耗时: {end - start:.4f}秒")
# 第二次调用会很快(使用缓存)
start = time.time()
result = fibonacci(35)
end = time.time()
print(f"fibonacci(35) = {result}, 耗时: {end - start:.6f}秒")
# singledispatch - 单分派泛型函数
@singledispatch
def process_data(data):
"""默认处理函数"""
return f"处理未知类型: {type(data).__name__}"
@process_data.register
def _(data: int):
return f"处理整数: {data * 2}"
@process_data.register
def _(data: str):
return f"处理字符串: {data.upper()}"
@process_data.register
def _(data: list):
return f"处理列表: {len(data)} 个元素"
# 测试单分派
print(process_data(42)) # 处理整数: 84
print(process_data("hello")) # 处理字符串: HELLO
print(process_data([1,2,3])) # 处理列表: 3 个元素
print(process_data(3.14)) # 处理未知类型: float
# 五、不可变数据结构
# 5.1 使用元组和命名元组
from collections import namedtuple
# 普通元组
point = (3, 4)
print(f"点坐标:{point}")
print(f"x坐标:{point[0]}, y坐标:{point[1]}")
# 命名元组 - 更具可读性
Point = namedtuple('Point', ['x', 'y'])
point = Point(3, 4)
print(f"点坐标:{point}")
print(f"x坐标:{point.x}, y坐标:{point.y}")
# 命名元组的方法
print(f"转为字典:{point._asdict()}")
print(f"替换值:{point._replace(x=5)}")
# 复杂的命名元组
Student = namedtuple('Student', ['name', 'age', 'grade', 'scores'])
student = Student(
name='Alice',
age=20,
grade='A',
scores=(85, 92, 78, 96)
)
print(f"学生信息:{student}")
print(f"平均分:{sum(student.scores) / len(student.scores):.2f}")
# 函数式处理命名元组
def calculate_gpa(student):
"""计算GPA(纯函数)"""
average = sum(student.scores) / len(student.scores)
if average >= 90:
return 4.0
elif average >= 80:
return 3.0
elif average >= 70:
return 2.0
else:
return 1.0
def update_grade(student, new_grade):
"""更新等级(返回新对象)"""
return student._replace(grade=new_grade)
gpa = calculate_gpa(student)
print(f"GPA:{gpa}")
updated_student = update_grade(student, 'A+')
print(f"更新后的学生:{updated_student}")
print(f"原学生不变:{student}")
# 5.2 frozenset的使用
# frozenset - 不可变集合
mutable_set = {1, 2, 3, 4, 5}
immutable_set = frozenset([1, 2, 3, 4, 5])
print(f"可变集合:{mutable_set}")
print(f"不可变集合:{immutable_set}")
# frozenset可以作为字典的键
set_dict = {
frozenset([1, 2]): "小集合",
frozenset([1, 2, 3, 4, 5]): "大集合"
}
print(f"集合字典:{set_dict}")
# 函数式操作frozenset
def union_sets(*sets):
"""合并多个集合"""
result = frozenset()
for s in sets:
result = result.union(s)
return result
def filter_set(s, predicate):
"""过滤集合元素"""
return frozenset(filter(predicate, s))
set1 = frozenset([1, 2, 3])
set2 = frozenset([3, 4, 5])
set3 = frozenset([5, 6, 7])
unioned = union_sets(set1, set2, set3)
print(f"合并集合:{uniond}")
even_numbers = filter_set(uniond, lambda x: x % 2 == 0)
print(f"偶数:{even_numbers}")
# 六、函数式编程实战
# 6.1 数据处理管道
from functools import reduce
from typing import List, Dict, Any
# 学生数据
students_data = [
{'name': 'Alice', 'age': 20, 'scores': [85, 92, 78, 96], 'major': 'CS'},
{'name': 'Bob', 'age': 22, 'scores': [76, 88, 82, 79], 'major': 'Math'},
{'name': 'Charlie', 'age': 19, 'scores': [95, 87, 91, 93], 'major': 'CS'},
{'name': 'Diana', 'age': 21, 'scores': [68, 74, 82, 79], 'major': 'Physics'},
{'name': 'Eve', 'age': 20, 'scores': [89, 94, 87, 92], 'major': 'CS'}
]
# 纯函数定义
def calculate_average(scores: List[int]) -> float:
"""计算平均分"""
return sum(scores) / len(scores)
def add_average_score(student: Dict[str, Any]) -> Dict[str, Any]:
"""添加平均分字段"""
return {
**student,
'average': calculate_average(student['scores'])
}
def is_excellent_student(student: Dict[str, Any]) -> bool:
"""判断是否为优秀学生(平均分>85)"""
return student['average'] > 85
def is_cs_major(student: Dict[str, Any]) -> bool:
"""判断是否为CS专业"""
return student['major'] == 'CS'
def get_student_summary(student: Dict[str, Any]) -> Dict[str, Any]:
"""获取学生摘要信息"""
return {
'name': student['name'],
'average': round(student['average'], 2),
'major': student['major']
}
# 函数式数据处理管道
def process_students(students: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""处理学生数据的完整管道"""
return list(
map(
get_student_summary,
filter(
lambda s: is_excellent_student(s) and is_cs_major(s),
map(add_average_score, students)
)
)
)
# 执行处理
excellent_cs_students = process_students(students_data)
print("优秀的CS专业学生:")
for student in excellent_cs_students:
print(f" {student['name']}: {student['average']} ({student['major']})")
# 使用reduce计算统计信息
def calculate_stats(students: List[Dict[str, Any]]) -> Dict[str, float]:
"""计算统计信息"""
students_with_avg = list(map(add_average_score, students))
total_avg = reduce(
lambda acc, student: acc + student['average'],
students_with_avg,
0
) / len(students_with_avg)
max_avg = reduce(
lambda acc, student: max(acc, student['average']),
students_with_avg,
0
)
min_avg = reduce(
lambda acc, student: min(acc, student['average']),
students_with_avg,
float('inf')
)
return {
'total_average': round(total_avg, 2),
'max_average': round(max_avg, 2),
'min_average': round(min_avg, 2)
}
stats = calculate_stats(students_data)
print(f"\n统计信息:{stats}")
# 6.2 函数式配置管理
from functools import partial
from typing import Callable, Any
# 配置验证函数
def validate_range(min_val: float, max_val: float, value: float) -> bool:
"""验证值是否在范围内"""
return min_val <= value <= max_val
def validate_type(expected_type: type, value: Any) -> bool:
"""验证类型"""
return isinstance(value, expected_type)
def validate_length(min_len: int, max_len: int, value: str) -> bool:
"""验证字符串长度"""
return min_len <= len(value) <= max_len
# 创建特定的验证器
validate_port = partial(validate_range, 1, 65535)
validate_percentage = partial(validate_range, 0, 100)
validate_string = partial(validate_type, str)
validate_int = partial(validate_type, int)
validate_username = partial(validate_length, 3, 20)
# 配置规则
config_rules = {
'server_port': [validate_int, validate_port],
'cpu_threshold': [validate_int, validate_percentage],
'username': [validate_string, validate_username],
'timeout': [validate_int, partial(validate_range, 1, 300)]
}
def validate_config(rules: Dict[str, List[Callable]], config: Dict[str, Any]) -> Dict[str, bool]:
"""验证配置"""
results = {}
for key, validators in rules.items():
if key in config:
# 所有验证器都必须通过
results[key] = all(
validator(config[key]) for validator in validators
)
else:
results[key] = False
return results
# 测试配置
test_config = {
'server_port': 8080,
'cpu_threshold': 85,
'username': 'alice',
'timeout': 30
}
validation_results = validate_config(config_rules, test_config)
print("配置验证结果:")
for key, is_valid in validation_results.items():
status = "✓" if is_valid else "✗"
print(f" {key}: {status}")
# 无效配置测试
invalid_config = {
'server_port': 70000, # 超出范围
'cpu_threshold': 150, # 超出百分比范围
'username': 'ab', # 太短
'timeout': 500 # 超时时间太长
}
invalid_results = validate_config(config_rules, invalid_config)
print("\n无效配置验证结果:")
for key, is_valid in invalid_results.items():
status = "✓" if is_valid else "✗"
print(f" {key}: {status}")
# 6.3 函数式错误处理
from typing import Union, Callable, Any
from dataclasses import dataclass
# 定义Result类型(模拟Rust的Result)
@dataclass
class Success:
value: Any
def is_success(self) -> bool:
return True
def is_error(self) -> bool:
return False
def map(self, func: Callable) -> 'Union[Success, Error]':
try:
return Success(func(self.value))
except Exception as e:
return Error(str(e))
def flat_map(self, func: Callable) -> 'Union[Success, Error]':
try:
return func(self.value)
except Exception as e:
return Error(str(e))
@dataclass
class Error:
message: str
def is_success(self) -> bool:
return False
def is_error(self) -> bool:
return True
def map(self, func: Callable) -> 'Error':
return self
def flat_map(self, func: Callable) -> 'Error':
return self
Result = Union[Success, Error]
# 安全的数学操作
def safe_divide(x: float, y: float) -> Result:
"""安全除法"""
if y == 0:
return Error("除零错误")
return Success(x / y)
def safe_sqrt(x: float) -> Result:
"""安全开方"""
if x < 0:
return Error("负数不能开方")
return Success(x ** 0.5)
def safe_log(x: float) -> Result:
"""安全对数"""
if x <= 0:
return Error("对数的参数必须大于0")
import math
return Success(math.log(x))
# 函数式错误处理链
def calculate_complex_formula(a: float, b: float, c: float) -> Result:
"""计算复杂公式:log(sqrt(a/b) + c)"""
return (safe_divide(a, b)
.flat_map(lambda x: safe_sqrt(x))
.map(lambda x: x + c)
.flat_map(lambda x: safe_log(x)))
# 测试错误处理
test_cases = [
(16, 4, 1), # 正常情况
(16, 0, 1), # 除零错误
(-16, 4, 1), # 负数开方错误
(16, 4, -3), # 对数参数错误
]
print("复杂公式计算结果:")
for a, b, c in test_cases:
result = calculate_complex_formula(a, b, c)
if result.is_success():
print(f" f({a}, {b}, {c}) = {result.value:.4f}")
else:
print(f" f({a}, {b}, {c}) = 错误: {result.message}")
# 批量处理
def process_batch(data: List[tuple], processor: Callable) -> Dict[str, List]:
"""批量处理数据"""
successes = []
errors = []
for item in data:
result = processor(*item)
if result.is_success():
successes.append((item, result.value))
else:
errors.append((item, result.message))
return {'successes': successes, 'errors': errors}
batch_results = process_batch(test_cases, calculate_complex_formula)
print(f"\n批量处理结果:")
print(f"成功: {len(batch_results['successes'])} 个")
print(f"失败: {len(batch_results['errors'])} 个")
# 七、总结
# 7.1 函数式编程的优势
- 可预测性:纯函数使代码行为可预测
- 可测试性:纯函数易于单元测试
- 可组合性:函数可以轻松组合成复杂操作
- 并发安全:不可变数据避免了并发问题
- 代码简洁:高阶函数减少重复代码
# 7.2 何时使用函数式编程
适合的场景:
- 数据转换和处理
- 配置验证
- 数学计算
- 并发编程
- 需要高可靠性的系统
不太适合的场景:
- 需要频繁修改状态的应用
- 性能要求极高的场景
- 简单的脚本任务
# 7.3 最佳实践
- 优先使用纯函数:避免副作用
- 保持函数简单:一个函数只做一件事
- 使用类型注解:提高代码可读性
- 合理使用高阶函数:不要过度抽象
- 结合其他范式:Python支持多种编程范式
# 7.4 下一步学习
- 深入学习itertools和functools
- 了解更多函数式编程库(如toolz)
- 学习其他函数式语言(如Haskell、Clojure)
- 实践函数式设计模式
恭喜你!掌握了函数式编程,你的Python技能又上了一个新台阶!🎉
练习建议:
- 重构现有代码,使用纯函数
- 练习使用map、filter、reduce处理数据
- 尝试函数组合和管道操作
- 实现自己的高阶函数
- 用函数式方法解决实际问题