第7天-函数式编程

2023/6/15

# 第7天-Python函数式编程

欢迎来到Python函数式编程的世界!今天我们将深入学习一种优雅的编程范式,它能让你的代码更加简洁、可读和可维护。

# 什么是函数式编程?

函数式编程(Functional Programming,FP)是一种编程范式,它将计算视为数学函数的求值,避免改变状态和可变数据。

生活类比

  • 命令式编程:像做菜的详细步骤,"先切菜,再炒菜,然后装盘"
  • 函数式编程:像数学公式,"f(原料) = 成品菜",关注输入和输出的关系

# 函数式编程的核心特点

  1. 函数是一等公民:函数可以作为参数传递、作为返回值、赋值给变量
  2. 不可变性:数据一旦创建就不能修改
  3. 纯函数:相同输入总是产生相同输出,没有副作用
  4. 高阶函数:接受函数作为参数或返回函数的函数

# 一、纯函数(Pure Functions)

# 1.1 什么是纯函数?

纯函数具有两个特点:

  1. 确定性:相同的输入总是产生相同的输出
  2. 无副作用:不修改外部状态,不产生可观察的副作用
# 纯函数示例
def add(x, y):
    """纯函数:只依赖输入参数,总是返回相同结果"""
    return x + y

def multiply(x, y):
    """纯函数:简单的数学运算"""
    return x * y

def get_full_name(first_name, last_name):
    """纯函数:字符串处理"""
    return f"{first_name} {last_name}"

# 测试纯函数
print(add(2, 3))  # 总是返回 5
print(multiply(4, 5))  # 总是返回 20
print(get_full_name("张", "三"))  # 总是返回 "张 三"

# 1.2 非纯函数示例

import random
import datetime

# 非纯函数示例(不推荐在函数式编程中使用)
counter = 0

def impure_increment():
    """非纯函数:修改全局变量"""
    global counter
    counter += 1
    return counter

def get_random_number():
    """非纯函数:每次调用结果不同"""
    return random.randint(1, 100)

def get_current_time():
    """非纯函数:依赖外部状态(当前时间)"""
    return datetime.datetime.now()

# 这些函数的输出不可预测
print(impure_increment())  # 1
print(impure_increment())  # 2
print(get_random_number())  # 随机数
print(get_current_time())  # 当前时间

# 1.3 将非纯函数转换为纯函数

# 改进:将非纯函数转换为纯函数
def pure_increment(current_value):
    """纯函数版本:不修改外部状态"""
    return current_value + 1

def generate_sequence(start, length):
    """纯函数:生成确定的序列"""
    return [start + i for i in range(length)]

def format_timestamp(timestamp):
    """纯函数:格式化给定的时间戳"""
    return datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')

# 使用纯函数
current = 0
for i in range(3):
    current = pure_increment(current)
    print(f"计数器:{current}")

sequence = generate_sequence(10, 5)
print(f"序列:{sequence}")  # [10, 11, 12, 13, 14]

timestamp = 1640995200  # 固定时间戳
formatted_time = format_timestamp(timestamp)
print(f"格式化时间:{formatted_time}")

# 二、高阶函数(Higher-Order Functions)

# 2.1 函数作为参数

def apply_operation(numbers, operation):
    """高阶函数:接受函数作为参数"""
    return [operation(num) for num in numbers]

def square(x):
    return x ** 2

def cube(x):
    return x ** 3

def double(x):
    return x * 2

# 使用不同的操作函数
numbers = [1, 2, 3, 4, 5]

squared = apply_operation(numbers, square)
print(f"平方:{squared}")  # [1, 4, 9, 16, 25]

cubed = apply_operation(numbers, cube)
print(f"立方:{cubed}")  # [1, 8, 27, 64, 125]

doubled = apply_operation(numbers, double)
print(f"翻倍:{doubled}")  # [2, 4, 6, 8, 10]

# 使用lambda函数
halved = apply_operation(numbers, lambda x: x / 2)
print(f"减半:{halved}")  # [0.5, 1.0, 1.5, 2.0, 2.5]

# 2.2 函数作为返回值

def create_multiplier(factor):
    """返回一个乘法函数"""
    def multiplier(x):
        return x * factor
    return multiplier

def create_validator(min_value, max_value):
    """返回一个验证函数"""
    def validator(value):
        return min_value <= value <= max_value
    return validator

def create_formatter(prefix, suffix):
    """返回一个格式化函数"""
    def formatter(text):
        return f"{prefix}{text}{suffix}"
    return formatter

# 使用函数工厂
double_func = create_multiplier(2)
triple_func = create_multiplier(3)

print(double_func(5))  # 10
print(triple_func(5))  # 15

# 创建验证器
age_validator = create_validator(0, 120)
score_validator = create_validator(0, 100)

print(age_validator(25))   # True
print(age_validator(150))  # False
print(score_validator(85)) # True
print(score_validator(105)) # False

# 创建格式化器
html_formatter = create_formatter("<p>", "</p>")
markdown_formatter = create_formatter("**", "**")

print(html_formatter("Hello"))     # <p>Hello</p>
print(markdown_formatter("Bold"))  # **Bold**

# 2.3 函数组合

def compose(f, g):
    """函数组合:返回 f(g(x))"""
    return lambda x: f(g(x))

def pipe(*functions):
    """管道操作:从左到右依次应用函数"""
    def piped_function(value):
        for func in functions:
            value = func(value)
        return value
    return piped_function

# 基础函数
def add_one(x):
    return x + 1

def multiply_by_two(x):
    return x * 2

def square(x):
    return x ** 2

# 函数组合
add_then_multiply = compose(multiply_by_two, add_one)
result1 = add_then_multiply(3)  # multiply_by_two(add_one(3)) = multiply_by_two(4) = 8
print(f"组合结果:{result1}")

# 管道操作
process_number = pipe(add_one, multiply_by_two, square)
result2 = process_number(3)  # ((3+1)*2)^2 = (4*2)^2 = 8^2 = 64
print(f"管道结果:{result2}")

# 处理字符串的管道
def to_upper(s):
    return s.upper()

def add_exclamation(s):
    return s + "!"

def add_prefix(s):
    return ">>> " + s

process_text = pipe(to_upper, add_exclamation, add_prefix)
result3 = process_text("hello world")
print(f"文本处理:{result3}")  # >>> HELLO WORLD!

# 三、内置高阶函数深入

# 3.1 map()函数进阶

# 处理多个序列
def add_three_numbers(x, y, z):
    return x + y + z

list1 = [1, 2, 3]
list2 = [10, 20, 30]
list3 = [100, 200, 300]

result = list(map(add_three_numbers, list1, list2, list3))
print(f"三个列表相加:{result}")  # [111, 222, 333]

# 处理字典
students = [
    {'name': 'Alice', 'score': 85},
    {'name': 'Bob', 'score': 92},
    {'name': 'Charlie', 'score': 78}
]

# 提取姓名
names = list(map(lambda student: student['name'], students))
print(f"学生姓名:{names}")  # ['Alice', 'Bob', 'Charlie']

# 计算等级
def get_grade(score):
    if score >= 90:
        return 'A'
    elif score >= 80:
        return 'B'
    elif score >= 70:
        return 'C'
    else:
        return 'D'

grades = list(map(lambda s: get_grade(s['score']), students))
print(f"学生等级:{grades}")  # ['B', 'A', 'C']

# 3.2 filter()函数进阶

# 复杂过滤条件
numbers = range(1, 21)

# 过滤质数
def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True

primes = list(filter(is_prime, numbers))
print(f"质数:{primes}")  # [2, 3, 5, 7, 11, 13, 17, 19]

# 过滤字符串
words = ['apple', 'banana', 'cherry', 'date', 'elderberry', 'fig']

# 长度大于5且包含'e'的单词
filtered_words = list(filter(
    lambda word: len(word) > 5 and 'e' in word, 
    words
))
print(f"过滤后的单词:{filtered_words}")  # ['cherry', 'elderberry']

# 过滤学生数据
students = [
    {'name': 'Alice', 'age': 20, 'score': 85},
    {'name': 'Bob', 'age': 22, 'score': 92},
    {'name': 'Charlie', 'age': 19, 'score': 78},
    {'name': 'Diana', 'age': 21, 'score': 96}
]

# 年龄大于20且分数大于80的学生
excellent_students = list(filter(
    lambda s: s['age'] > 20 and s['score'] > 80,
    students
))
print("优秀学生:")
for student in excellent_students:
    print(f"  {student['name']}: {student['age']}岁, {student['score']}分")

# 3.3 reduce()函数进阶

from functools import reduce

# 复杂的累积操作
numbers = [1, 2, 3, 4, 5]

# 计算阶乘
factorial = reduce(lambda x, y: x * y, numbers)
print(f"阶乘:{factorial}")  # 120

# 找到最大值和最小值
max_value = reduce(lambda x, y: x if x > y else y, numbers)
min_value = reduce(lambda x, y: x if x < y else y, numbers)
print(f"最大值:{max_value}, 最小值:{min_value}")  # 5, 1

# 字符串操作
words = ['Python', 'is', 'awesome', 'for', 'programming']

# 连接字符串
sentence = reduce(lambda x, y: x + ' ' + y, words)
print(f"句子:{sentence}")  # Python is awesome for programming

# 找最长的单词
longest_word = reduce(
    lambda x, y: x if len(x) > len(y) else y, 
    words
)
print(f"最长单词:{longest_word}")  # programming

# 复杂数据结构的处理
orders = [
    {'id': 1, 'amount': 100},
    {'id': 2, 'amount': 250},
    {'id': 3, 'amount': 75},
    {'id': 4, 'amount': 300}
]

# 计算总金额
total_amount = reduce(
    lambda total, order: total + order['amount'],
    orders,
    0  # 初始值
)
print(f"总金额:{total_amount}")  # 725

# 合并字典
data_sources = [
    {'users': 100, 'posts': 500},
    {'users': 50, 'comments': 200},
    {'users': 75, 'likes': 1000}
]

merged_data = reduce(
    lambda acc, data: {**acc, **data},
    data_sources,
    {}
)
print(f"合并数据:{merged_data}")
# {'users': 75, 'posts': 500, 'comments': 200, 'likes': 1000}

# 四、函数式编程工具

# 4.1 itertools模块

import itertools

# itertools.chain - 连接多个可迭代对象
list1 = [1, 2, 3]
list2 = [4, 5, 6]
list3 = [7, 8, 9]

chained = list(itertools.chain(list1, list2, list3))
print(f"连接列表:{chained}")  # [1, 2, 3, 4, 5, 6, 7, 8, 9]

# itertools.cycle - 无限循环
cycler = itertools.cycle(['A', 'B', 'C'])
first_10 = [next(cycler) for _ in range(10)]
print(f"循环前10个:{first_10}")  # ['A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A']

# itertools.repeat - 重复元素
repeated = list(itertools.repeat('Hello', 5))
print(f"重复元素:{repeated}")  # ['Hello', 'Hello', 'Hello', 'Hello', 'Hello']

# itertools.takewhile - 条件为真时取元素
numbers = [1, 3, 5, 8, 9, 11, 13]
less_than_10 = list(itertools.takewhile(lambda x: x < 10, numbers))
print(f"小于10的连续元素:{less_than_10}")  # [1, 3, 5, 8, 9]

# itertools.dropwhile - 条件为真时跳过元素
after_10 = list(itertools.dropwhile(lambda x: x < 10, numbers))
print(f"大于等于10的元素:{after_10}")  # [11, 13]

# itertools.groupby - 分组
data = [('A', 1), ('A', 2), ('B', 3), ('B', 4), ('C', 5)]
grouped = {k: list(g) for k, g in itertools.groupby(data, key=lambda x: x[0])}
print(f"分组数据:{grouped}")
# {'A': [('A', 1), ('A', 2)], 'B': [('B', 3), ('B', 4)], 'C': [('C', 5)]}

# 4.2 operator模块

import operator
from functools import reduce

# 使用operator模块替代lambda
numbers = [1, 2, 3, 4, 5]

# 求和
total = reduce(operator.add, numbers)
print(f"总和:{total}")  # 15

# 求积
product = reduce(operator.mul, numbers)
print(f"乘积:{product}")  # 120

# 字符串连接
words = ['Hello', 'World', 'Python']
sentence = reduce(operator.add, words)
print(f"连接字符串:{sentence}")  # HelloWorldPython

# 获取属性和索引
class Person:
    def __init__(self, name, age):
        self.name = name
        self.age = age
    
    def __repr__(self):
        return f"Person('{self.name}', {self.age})"

people = [
    Person('Alice', 25),
    Person('Bob', 30),
    Person('Charlie', 20)
]

# 按年龄排序
sorted_by_age = sorted(people, key=operator.attrgetter('age'))
print(f"按年龄排序:{sorted_by_age}")

# 按姓名排序
sorted_by_name = sorted(people, key=operator.attrgetter('name'))
print(f"按姓名排序:{sorted_by_name}")

# 处理元组列表
student_scores = [('Alice', 85), ('Bob', 92), ('Charlie', 78)]
sorted_by_score = sorted(student_scores, key=operator.itemgetter(1))
print(f"按分数排序:{sorted_by_score}")

# 4.3 functools模块进阶

from functools import partial, wraps, lru_cache, singledispatch

# partial - 偏函数应用
def multiply(x, y, z):
    return x * y * z

# 创建偏函数
double = partial(multiply, 2)  # 固定第一个参数为2
triple = partial(multiply, 3)  # 固定第一个参数为3

print(double(5, 6))  # 2 * 5 * 6 = 60
print(triple(4, 5))  # 3 * 4 * 5 = 60

# 更复杂的偏函数
def log_message(level, message, timestamp=None):
    import datetime
    if timestamp is None:
        timestamp = datetime.datetime.now()
    return f"[{timestamp}] {level}: {message}"

# 创建特定级别的日志函数
info_log = partial(log_message, "INFO")
error_log = partial(log_message, "ERROR")
warning_log = partial(log_message, "WARNING")

print(info_log("应用启动"))
print(error_log("数据库连接失败"))
print(warning_log("内存使用率过高"))

# lru_cache - 缓存装饰器
@lru_cache(maxsize=128)
def fibonacci(n):
    """带缓存的斐波那契函数"""
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

# 测试缓存效果
import time

start = time.time()
result = fibonacci(35)
end = time.time()
print(f"fibonacci(35) = {result}, 耗时: {end - start:.4f}秒")

# 第二次调用会很快(使用缓存)
start = time.time()
result = fibonacci(35)
end = time.time()
print(f"fibonacci(35) = {result}, 耗时: {end - start:.6f}秒")

# singledispatch - 单分派泛型函数
@singledispatch
def process_data(data):
    """默认处理函数"""
    return f"处理未知类型: {type(data).__name__}"

@process_data.register
def _(data: int):
    return f"处理整数: {data * 2}"

@process_data.register
def _(data: str):
    return f"处理字符串: {data.upper()}"

@process_data.register
def _(data: list):
    return f"处理列表: {len(data)} 个元素"

# 测试单分派
print(process_data(42))        # 处理整数: 84
print(process_data("hello"))   # 处理字符串: HELLO
print(process_data([1,2,3]))   # 处理列表: 3 个元素
print(process_data(3.14))      # 处理未知类型: float

# 五、不可变数据结构

# 5.1 使用元组和命名元组

from collections import namedtuple

# 普通元组
point = (3, 4)
print(f"点坐标:{point}")
print(f"x坐标:{point[0]}, y坐标:{point[1]}")

# 命名元组 - 更具可读性
Point = namedtuple('Point', ['x', 'y'])
point = Point(3, 4)
print(f"点坐标:{point}")
print(f"x坐标:{point.x}, y坐标:{point.y}")

# 命名元组的方法
print(f"转为字典:{point._asdict()}")
print(f"替换值:{point._replace(x=5)}")

# 复杂的命名元组
Student = namedtuple('Student', ['name', 'age', 'grade', 'scores'])
student = Student(
    name='Alice',
    age=20,
    grade='A',
    scores=(85, 92, 78, 96)
)

print(f"学生信息:{student}")
print(f"平均分:{sum(student.scores) / len(student.scores):.2f}")

# 函数式处理命名元组
def calculate_gpa(student):
    """计算GPA(纯函数)"""
    average = sum(student.scores) / len(student.scores)
    if average >= 90:
        return 4.0
    elif average >= 80:
        return 3.0
    elif average >= 70:
        return 2.0
    else:
        return 1.0

def update_grade(student, new_grade):
    """更新等级(返回新对象)"""
    return student._replace(grade=new_grade)

gpa = calculate_gpa(student)
print(f"GPA:{gpa}")

updated_student = update_grade(student, 'A+')
print(f"更新后的学生:{updated_student}")
print(f"原学生不变:{student}")

# 5.2 frozenset的使用

# frozenset - 不可变集合
mutable_set = {1, 2, 3, 4, 5}
immutable_set = frozenset([1, 2, 3, 4, 5])

print(f"可变集合:{mutable_set}")
print(f"不可变集合:{immutable_set}")

# frozenset可以作为字典的键
set_dict = {
    frozenset([1, 2]): "小集合",
    frozenset([1, 2, 3, 4, 5]): "大集合"
}

print(f"集合字典:{set_dict}")

# 函数式操作frozenset
def union_sets(*sets):
    """合并多个集合"""
    result = frozenset()
    for s in sets:
        result = result.union(s)
    return result

def filter_set(s, predicate):
    """过滤集合元素"""
    return frozenset(filter(predicate, s))

set1 = frozenset([1, 2, 3])
set2 = frozenset([3, 4, 5])
set3 = frozenset([5, 6, 7])

unioned = union_sets(set1, set2, set3)
print(f"合并集合:{uniond}")

even_numbers = filter_set(uniond, lambda x: x % 2 == 0)
print(f"偶数:{even_numbers}")

# 六、函数式编程实战

# 6.1 数据处理管道

from functools import reduce
from typing import List, Dict, Any

# 学生数据
students_data = [
    {'name': 'Alice', 'age': 20, 'scores': [85, 92, 78, 96], 'major': 'CS'},
    {'name': 'Bob', 'age': 22, 'scores': [76, 88, 82, 79], 'major': 'Math'},
    {'name': 'Charlie', 'age': 19, 'scores': [95, 87, 91, 93], 'major': 'CS'},
    {'name': 'Diana', 'age': 21, 'scores': [68, 74, 82, 79], 'major': 'Physics'},
    {'name': 'Eve', 'age': 20, 'scores': [89, 94, 87, 92], 'major': 'CS'}
]

# 纯函数定义
def calculate_average(scores: List[int]) -> float:
    """计算平均分"""
    return sum(scores) / len(scores)

def add_average_score(student: Dict[str, Any]) -> Dict[str, Any]:
    """添加平均分字段"""
    return {
        **student,
        'average': calculate_average(student['scores'])
    }

def is_excellent_student(student: Dict[str, Any]) -> bool:
    """判断是否为优秀学生(平均分>85)"""
    return student['average'] > 85

def is_cs_major(student: Dict[str, Any]) -> bool:
    """判断是否为CS专业"""
    return student['major'] == 'CS'

def get_student_summary(student: Dict[str, Any]) -> Dict[str, Any]:
    """获取学生摘要信息"""
    return {
        'name': student['name'],
        'average': round(student['average'], 2),
        'major': student['major']
    }

# 函数式数据处理管道
def process_students(students: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """处理学生数据的完整管道"""
    return list(
        map(
            get_student_summary,
            filter(
                lambda s: is_excellent_student(s) and is_cs_major(s),
                map(add_average_score, students)
            )
        )
    )

# 执行处理
excellent_cs_students = process_students(students_data)
print("优秀的CS专业学生:")
for student in excellent_cs_students:
    print(f"  {student['name']}: {student['average']} ({student['major']})")

# 使用reduce计算统计信息
def calculate_stats(students: List[Dict[str, Any]]) -> Dict[str, float]:
    """计算统计信息"""
    students_with_avg = list(map(add_average_score, students))
    
    total_avg = reduce(
        lambda acc, student: acc + student['average'],
        students_with_avg,
        0
    ) / len(students_with_avg)
    
    max_avg = reduce(
        lambda acc, student: max(acc, student['average']),
        students_with_avg,
        0
    )
    
    min_avg = reduce(
        lambda acc, student: min(acc, student['average']),
        students_with_avg,
        float('inf')
    )
    
    return {
        'total_average': round(total_avg, 2),
        'max_average': round(max_avg, 2),
        'min_average': round(min_avg, 2)
    }

stats = calculate_stats(students_data)
print(f"\n统计信息:{stats}")

# 6.2 函数式配置管理

from functools import partial
from typing import Callable, Any

# 配置验证函数
def validate_range(min_val: float, max_val: float, value: float) -> bool:
    """验证值是否在范围内"""
    return min_val <= value <= max_val

def validate_type(expected_type: type, value: Any) -> bool:
    """验证类型"""
    return isinstance(value, expected_type)

def validate_length(min_len: int, max_len: int, value: str) -> bool:
    """验证字符串长度"""
    return min_len <= len(value) <= max_len

# 创建特定的验证器
validate_port = partial(validate_range, 1, 65535)
validate_percentage = partial(validate_range, 0, 100)
validate_string = partial(validate_type, str)
validate_int = partial(validate_type, int)
validate_username = partial(validate_length, 3, 20)

# 配置规则
config_rules = {
    'server_port': [validate_int, validate_port],
    'cpu_threshold': [validate_int, validate_percentage],
    'username': [validate_string, validate_username],
    'timeout': [validate_int, partial(validate_range, 1, 300)]
}

def validate_config(rules: Dict[str, List[Callable]], config: Dict[str, Any]) -> Dict[str, bool]:
    """验证配置"""
    results = {}
    
    for key, validators in rules.items():
        if key in config:
            # 所有验证器都必须通过
            results[key] = all(
                validator(config[key]) for validator in validators
            )
        else:
            results[key] = False
    
    return results

# 测试配置
test_config = {
    'server_port': 8080,
    'cpu_threshold': 85,
    'username': 'alice',
    'timeout': 30
}

validation_results = validate_config(config_rules, test_config)
print("配置验证结果:")
for key, is_valid in validation_results.items():
    status = "✓" if is_valid else "✗"
    print(f"  {key}: {status}")

# 无效配置测试
invalid_config = {
    'server_port': 70000,  # 超出范围
    'cpu_threshold': 150,  # 超出百分比范围
    'username': 'ab',      # 太短
    'timeout': 500         # 超时时间太长
}

invalid_results = validate_config(config_rules, invalid_config)
print("\n无效配置验证结果:")
for key, is_valid in invalid_results.items():
    status = "✓" if is_valid else "✗"
    print(f"  {key}: {status}")

# 6.3 函数式错误处理

from typing import Union, Callable, Any
from dataclasses import dataclass

# 定义Result类型(模拟Rust的Result)
@dataclass
class Success:
    value: Any
    
    def is_success(self) -> bool:
        return True
    
    def is_error(self) -> bool:
        return False
    
    def map(self, func: Callable) -> 'Union[Success, Error]':
        try:
            return Success(func(self.value))
        except Exception as e:
            return Error(str(e))
    
    def flat_map(self, func: Callable) -> 'Union[Success, Error]':
        try:
            return func(self.value)
        except Exception as e:
            return Error(str(e))

@dataclass
class Error:
    message: str
    
    def is_success(self) -> bool:
        return False
    
    def is_error(self) -> bool:
        return True
    
    def map(self, func: Callable) -> 'Error':
        return self
    
    def flat_map(self, func: Callable) -> 'Error':
        return self

Result = Union[Success, Error]

# 安全的数学操作
def safe_divide(x: float, y: float) -> Result:
    """安全除法"""
    if y == 0:
        return Error("除零错误")
    return Success(x / y)

def safe_sqrt(x: float) -> Result:
    """安全开方"""
    if x < 0:
        return Error("负数不能开方")
    return Success(x ** 0.5)

def safe_log(x: float) -> Result:
    """安全对数"""
    if x <= 0:
        return Error("对数的参数必须大于0")
    import math
    return Success(math.log(x))

# 函数式错误处理链
def calculate_complex_formula(a: float, b: float, c: float) -> Result:
    """计算复杂公式:log(sqrt(a/b) + c)"""
    return (safe_divide(a, b)
            .flat_map(lambda x: safe_sqrt(x))
            .map(lambda x: x + c)
            .flat_map(lambda x: safe_log(x)))

# 测试错误处理
test_cases = [
    (16, 4, 1),   # 正常情况
    (16, 0, 1),   # 除零错误
    (-16, 4, 1),  # 负数开方错误
    (16, 4, -3),  # 对数参数错误
]

print("复杂公式计算结果:")
for a, b, c in test_cases:
    result = calculate_complex_formula(a, b, c)
    if result.is_success():
        print(f"  f({a}, {b}, {c}) = {result.value:.4f}")
    else:
        print(f"  f({a}, {b}, {c}) = 错误: {result.message}")

# 批量处理
def process_batch(data: List[tuple], processor: Callable) -> Dict[str, List]:
    """批量处理数据"""
    successes = []
    errors = []
    
    for item in data:
        result = processor(*item)
        if result.is_success():
            successes.append((item, result.value))
        else:
            errors.append((item, result.message))
    
    return {'successes': successes, 'errors': errors}

batch_results = process_batch(test_cases, calculate_complex_formula)
print(f"\n批量处理结果:")
print(f"成功: {len(batch_results['successes'])} 个")
print(f"失败: {len(batch_results['errors'])} 个")

# 七、总结

# 7.1 函数式编程的优势

  1. 可预测性:纯函数使代码行为可预测
  2. 可测试性:纯函数易于单元测试
  3. 可组合性:函数可以轻松组合成复杂操作
  4. 并发安全:不可变数据避免了并发问题
  5. 代码简洁:高阶函数减少重复代码

# 7.2 何时使用函数式编程

适合的场景:

  • 数据转换和处理
  • 配置验证
  • 数学计算
  • 并发编程
  • 需要高可靠性的系统

不太适合的场景:

  • 需要频繁修改状态的应用
  • 性能要求极高的场景
  • 简单的脚本任务

# 7.3 最佳实践

  1. 优先使用纯函数:避免副作用
  2. 保持函数简单:一个函数只做一件事
  3. 使用类型注解:提高代码可读性
  4. 合理使用高阶函数:不要过度抽象
  5. 结合其他范式:Python支持多种编程范式

# 7.4 下一步学习

  • 深入学习itertools和functools
  • 了解更多函数式编程库(如toolz)
  • 学习其他函数式语言(如Haskell、Clojure)
  • 实践函数式设计模式

恭喜你!掌握了函数式编程,你的Python技能又上了一个新台阶!🎉


练习建议:

  1. 重构现有代码,使用纯函数
  2. 练习使用map、filter、reduce处理数据
  3. 尝试函数组合和管道操作
  4. 实现自己的高阶函数
  5. 用函数式方法解决实际问题