协议适配层详细设计与实现

# 协议适配层详细设计与实现

# 1. 概述

# 1.1 背景故事

想象一下,在一个繁忙的国际机场,来自世界各地的旅客说着不同的语言——有英语、中文、法语、西班牙语等等。如果没有翻译服务,这些旅客就无法与机场工作人员有效沟通,机场的运营也会陷入混乱。

在物联网世界中,协议适配层就像是这个机场的"万能翻译官"。各种设备就像是说着不同"语言"的旅客:

  • 传感器设备说着"MQTT语言"
  • 工业设备说着"Modbus语言"
  • 智能家居设备说着"CoAP语言"
  • 企业系统说着"HTTP语言"

协议适配层的使命就是让这些"说着不同语言"的设备能够无障碍地交流,确保整个物联网系统的和谐运行。

# 1.2 核心功能

协议适配层主要包含三个核心组件:

  1. 协议解析器(Protocol Parser) - "语言识别专家"

    • 识别和解析各种设备协议
    • 将原始数据转换为标准格式
  2. 数据转换器(Data Transformer) - "内容翻译官"

    • 在不同数据格式间进行转换
    • 统一数据结构和语义
  3. 消息路由器(Message Router) - "智能调度员"

    • 根据规则将消息路由到正确的目标
    • 实现负载均衡和故障转移

# 2. 协议解析器(Protocol Parser)

# 2.1 设计原理

协议解析器就像一位精通多国语言的翻译专家,能够理解各种设备"说话"的方式。每种协议都有自己的"语法规则",解析器需要掌握这些规则,才能正确理解设备发送的消息。

# 2.2 核心接口设计

/**
 * 协议解析器接口 - 万能翻译官的基础能力
 * 
 * 就像翻译官需要具备"听懂"和"表达"的能力一样,
 * 协议解析器需要能够解析输入数据和构建输出数据
 */
public interface ProtocolParser {
    
    /**
     * 解析协议数据 - "听懂"设备说的话
     * 
     * @param rawData 原始数据(设备的"原话")
     * @param context 解析上下文("对话环境")
     * @return 解析结果("理解的内容")
     */
    ParseResult parse(byte[] rawData, ParseContext context);
    
    /**
     * 构建协议数据 - "说"设备能懂的话
     * 
     * @param message 标准消息("要表达的意思")
     * @param context 构建上下文("表达环境")
     * @return 协议数据("设备能懂的话")
     */
    byte[] build(StandardMessage message, BuildContext context);
    
    /**
     * 获取支持的协议类型 - "擅长的语言"
     */
    ProtocolType getSupportedProtocol();
    
    /**
     * 验证数据格式 - "检查语法是否正确"
     */
    boolean validate(byte[] data);
}

# 3. 数据转换器(Data Transformer)

# 3.1 设计原理

数据转换器就像一位精通各种"方言"的语言学家。虽然协议解析器已经把各种设备语言翻译成了"普通话",但不同地区的"普通话"还是有差异的——有的说"土豆",有的说"马铃薯",有的说"洋芋",说的是同一个东西,但表达方式不同。

数据转换器的任务就是统一这些"方言差异",确保所有数据都使用相同的"词汇"和"语法"。

# 3.2 核心接口设计

/**
 * 数据转换器接口 - 统一"方言"的专家
 * 
 * 就像联合国需要统一各国代表的表达方式一样,
 * 数据转换器需要统一各种设备的数据格式
 */
public interface DataTransformer {
    
    /**
     * 转换数据格式 - 统一"说话方式"
     * 
     * @param sourceData 源数据("方言版本")
     * @param transformRule 转换规则("翻译字典")
     * @return 转换结果("标准版本")
     */
    TransformResult transform(Object sourceData, TransformRule transformRule);
    
    /**
     * 批量转换 - 批量处理"方言"
     */
    List<TransformResult> batchTransform(List<Object> sourceDataList, TransformRule transformRule);
    
    /**
     * 反向转换 - 把"标准话"转回"方言"
     */
    TransformResult reverseTransform(Object standardData, TransformRule transformRule);
    
    /**
     * 获取支持的转换类型
     */
    Set<TransformType> getSupportedTypes();
    
    /**
     * 验证转换规则
     */
    boolean validateRule(TransformRule rule);
}

# 3.3 转换规则定义

/**
 * 数据转换规则 - "翻译字典"
 * 
 * 就像字典告诉我们"土豆"="马铃薯"一样,
 * 转换规则告诉系统如何统一不同的数据表达
 */
@Data
@Builder
public class TransformRule {
    
    /**
     * 规则ID - "字典编号"
     */
    private String ruleId;
    
    /**
     * 规则名称 - "字典名称"
     */
    private String ruleName;
    
    /**
     * 源数据类型 - "原始方言类型"
     */
    private String sourceType;
    
    /**
     * 目标数据类型 - "标准语言类型"
     */
    private String targetType;
    
    /**
     * 字段映射规则 - "词汇对照表"
     */
    private List<FieldMapping> fieldMappings;
    
    /**
     * 数据验证规则 - "语法检查规则"
     */
    private List<ValidationRule> validationRules;
    
    /**
     * 转换脚本 - "复杂翻译逻辑"
     */
    private String transformScript;
    
    /**
     * 转换优先级 - "翻译优先级"
     */
    private int priority;
    
    /**
     * 是否启用 - "字典是否可用"
     */
    private boolean enabled;
    
    /**
     * 创建时间
     */
    private LocalDateTime createTime;
    
    /**
     * 更新时间
     */
    private LocalDateTime updateTime;
}

/**
 * 字段映射规则 - "词汇对照条目"
 */
@Data
@Builder
public class FieldMapping {
    
    /**
     * 源字段名 - "方言词汇"
     */
    private String sourceField;
    
    /**
     * 目标字段名 - "标准词汇"
     */
    private String targetField;
    
    /**
     * 数据类型转换 - "词性转换"
     */
    private DataTypeMapping dataTypeMapping;
    
    /**
     * 默认值 - "找不到对应词汇时的默认翻译"
     */
    private Object defaultValue;
    
    /**
     * 转换表达式 - "复杂翻译公式"
     */
    private String expression;
    
    /**
     * 是否必需 - "是否为必须翻译的词汇"
     */
    private boolean required;
}

/**
 * 数据类型映射
 */
@Data
@Builder
public class DataTypeMapping {
    private Class<?> sourceType;  // 源数据类型
    private Class<?> targetType;  // 目标数据类型
    private String converter;     // 转换器名称
    private Map<String, Object> converterParams; // 转换参数
}

# 3.4 通用数据转换器实现

/**
 * 通用数据转换器 - 万能"翻译官"
 * 
 * 就像一位经验丰富的翻译官,能够处理各种复杂的翻译任务
 */
@Component
@Slf4j
public class UniversalDataTransformer implements DataTransformer {
    
    @Autowired
    private TransformRuleRepository ruleRepository;
    
    @Autowired
    private ScriptEngine scriptEngine;
    
    @Autowired
    private TypeConverterRegistry converterRegistry;
    
    @Autowired
    private ValidationService validationService;
    
    /**
     * 转换数据 - 执行"翻译"工作
     */
    @Override
    public TransformResult transform(Object sourceData, TransformRule transformRule) {
        long startTime = System.currentTimeMillis();
        
        try {
            // 第一步:验证输入数据(检查"原文"是否合法)
            if (!validateSourceData(sourceData, transformRule)) {
                return TransformResult.failure("SOURCE_DATA_INVALID", "源数据验证失败");
            }
            
            // 第二步:执行字段映射(按"字典"逐词翻译)
            Map<String, Object> mappedData = executeFieldMapping(sourceData, transformRule);
            
            // 第三步:执行脚本转换(处理复杂的"语法转换")
            if (transformRule.getTransformScript() != null) {
                mappedData = executeScriptTransform(mappedData, transformRule.getTransformScript());
            }
            
            // 第四步:数据类型转换(统一"词性")
            Object transformedData = executeTypeConversion(mappedData, transformRule);
            
            // 第五步:验证转换结果(检查"译文"质量)
            ValidationResult validationResult = validationService.validate(transformedData, transformRule.getValidationRules());
            
            long transformTime = System.currentTimeMillis() - startTime;
            
            log.debug("数据转换完成:ruleId={}, transformTime={}ms, valid={}", 
                    transformRule.getRuleId(), transformTime, validationResult.isValid());
            
            return TransformResult.builder()
                    .success(true)
                    .transformedData(transformedData)
                    .sourceData(sourceData)
                    .transformRule(transformRule)
                    .transformTime(transformTime)
                    .validationResult(validationResult)
                    .qualityScore(calculateQualityScore(transformedData, validationResult))
                    .build();
            
        } catch (Exception e) {
            log.error("数据转换失败:ruleId={}", transformRule.getRuleId(), e);
            return TransformResult.failure("TRANSFORM_ERROR", e.getMessage());
        }
    }
    
    /**
     * 执行字段映射 - 按"字典"翻译
     */
    private Map<String, Object> executeFieldMapping(Object sourceData, TransformRule transformRule) {
        Map<String, Object> result = new HashMap<>();
        Map<String, Object> sourceMap = convertToMap(sourceData);
        
        for (FieldMapping mapping : transformRule.getFieldMappings()) {
            try {
                Object sourceValue = extractSourceValue(sourceMap, mapping);
                Object transformedValue = transformFieldValue(sourceValue, mapping);
                
                if (transformedValue != null || mapping.isRequired()) {
                    result.put(mapping.getTargetField(), transformedValue);
                }
                
            } catch (Exception e) {
                log.warn("字段映射失败:sourceField={}, targetField={}", 
                        mapping.getSourceField(), mapping.getTargetField(), e);
                
                if (mapping.isRequired()) {
                    throw new TransformException("必需字段映射失败:" + mapping.getTargetField(), e);
                }
            }
        }
        
        return result;
    }
    
    /**
     * 提取源字段值 - 从"原文"中找到要翻译的"词汇"
     */
    private Object extractSourceValue(Map<String, Object> sourceMap, FieldMapping mapping) {
        String sourceField = mapping.getSourceField();
        
        // 支持嵌套字段访问:user.profile.name
        if (sourceField.contains(".")) {
            return extractNestedValue(sourceMap, sourceField);
        }
        
        Object value = sourceMap.get(sourceField);
        
        // 如果找不到值,使用默认值
        if (value == null && mapping.getDefaultValue() != null) {
            value = mapping.getDefaultValue();
        }
        
        return value;
    }
    
    /**
     * 提取嵌套字段值
     */
    private Object extractNestedValue(Map<String, Object> sourceMap, String fieldPath) {
        String[] parts = fieldPath.split("\\.");
        Object current = sourceMap;
        
        for (String part : parts) {
            if (current instanceof Map) {
                current = ((Map<?, ?>) current).get(part);
            } else {
                return null;
            }
        }
        
        return current;
    }
    
    /**
     * 转换字段值 - 执行具体的"词汇翻译"
     */
    private Object transformFieldValue(Object sourceValue, FieldMapping mapping) {
        if (sourceValue == null) {
            return mapping.getDefaultValue();
        }
        
        // 如果有转换表达式,使用表达式计算
        if (mapping.getExpression() != null) {
            return evaluateExpression(mapping.getExpression(), sourceValue);
        }
        
        // 如果有数据类型映射,执行类型转换
        if (mapping.getDataTypeMapping() != null) {
            return executeTypeMapping(sourceValue, mapping.getDataTypeMapping());
        }
        
        // 否则直接返回原值
        return sourceValue;
    }
    
    /**
     * 执行表达式计算 - 处理复杂的"翻译逻辑"
     */
    private Object evaluateExpression(String expression, Object sourceValue) {
        try {
            // 创建脚本上下文
            ScriptContext context = new SimpleScriptContext();
            context.setAttribute("value", sourceValue, ScriptContext.ENGINE_SCOPE);
            context.setAttribute("Math", Math.class, ScriptContext.ENGINE_SCOPE);
            
            // 执行表达式
            return scriptEngine.eval(expression, context);
            
        } catch (Exception e) {
            log.warn("表达式计算失败:expression={}, value={}", expression, sourceValue, e);
            return sourceValue;
        }
    }
    
    /**
     * 执行类型映射
     */
    private Object executeTypeMapping(Object sourceValue, DataTypeMapping typeMapping) {
        TypeConverter converter = converterRegistry.getConverter(typeMapping.getConverter());
        if (converter != null) {
            return converter.convert(sourceValue, typeMapping.getTargetType(), typeMapping.getConverterParams());
        }
        
        // 如果没有专门的转换器,尝试默认转换
        return convertType(sourceValue, typeMapping.getTargetType());
    }
    
    /**
     * 默认类型转换
     */
    private Object convertType(Object value, Class<?> targetType) {
        if (value == null || targetType.isInstance(value)) {
            return value;
        }
        
        try {
            if (targetType == String.class) {
                return value.toString();
            } else if (targetType == Integer.class || targetType == int.class) {
                return Integer.valueOf(value.toString());
            } else if (targetType == Long.class || targetType == long.class) {
                return Long.valueOf(value.toString());
            } else if (targetType == Double.class || targetType == double.class) {
                return Double.valueOf(value.toString());
            } else if (targetType == Boolean.class || targetType == boolean.class) {
                return Boolean.valueOf(value.toString());
            } else if (targetType == LocalDateTime.class) {
                return parseDateTime(value.toString());
            }
        } catch (Exception e) {
            log.warn("类型转换失败:value={}, targetType={}", value, targetType.getSimpleName(), e);
        }
        
        return value;
    }
    
    /**
     * 解析日期时间
     */
    private LocalDateTime parseDateTime(String dateStr) {
        // 支持多种日期格式
        String[] patterns = {
            "yyyy-MM-dd HH:mm:ss",
            "yyyy-MM-dd'T'HH:mm:ss",
            "yyyy-MM-dd'T'HH:mm:ss.SSS",
            "yyyy-MM-dd'T'HH:mm:ss'Z'",
            "yyyy-MM-dd",
            "MM/dd/yyyy HH:mm:ss"
        };
        
        for (String pattern : patterns) {
            try {
                DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
                return LocalDateTime.parse(dateStr, formatter);
            } catch (Exception e) {
                // 继续尝试下一个格式
            }
        }
        
        // 如果都失败了,尝试解析时间戳
        try {
            long timestamp = Long.parseLong(dateStr);
            return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
        } catch (Exception e) {
            throw new TransformException("无法解析日期时间:" + dateStr);
        }
    }
    
    /**
     * 执行脚本转换 - 处理复杂的"语法转换"
     */
    private Map<String, Object> executeScriptTransform(Map<String, Object> mappedData, String script) {
        try {
            ScriptContext context = new SimpleScriptContext();
            context.setAttribute("data", mappedData, ScriptContext.ENGINE_SCOPE);
            context.setAttribute("result", new HashMap<String, Object>(), ScriptContext.ENGINE_SCOPE);
            
            scriptEngine.eval(script, context);
            
            @SuppressWarnings("unchecked")
            Map<String, Object> result = (Map<String, Object>) context.getAttribute("result");
            
            return result != null ? result : mappedData;
            
        } catch (Exception e) {
            log.warn("脚本转换失败,使用映射结果", e);
            return mappedData;
        }
    }
    
    /**
     * 将对象转换为Map
     */
    private Map<String, Object> convertToMap(Object obj) {
        if (obj instanceof Map) {
            @SuppressWarnings("unchecked")
            Map<String, Object> map = (Map<String, Object>) obj;
            return map;
        }
        
        if (obj instanceof StandardMessage) {
            return convertMessageToMap((StandardMessage) obj);
        }
        
        // 使用反射转换普通对象
        return convertObjectToMap(obj);
    }
    
    /**
     * 转换StandardMessage为Map
     */
    private Map<String, Object> convertMessageToMap(StandardMessage message) {
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", message.getMessageId());
        map.put("deviceId", message.getDeviceId());
        map.put("messageType", message.getMessageType());
        map.put("timestamp", message.getTimestamp());
        map.put("priority", message.getPriority());
        map.put("sourceProtocol", message.getSourceProtocol());
        
        if (message.getHeader() != null) {
            map.put("header", convertObjectToMap(message.getHeader()));
        }
        
        if (message.getBody() != null) {
            map.put("body", convertObjectToMap(message.getBody()));
            if (message.getBody().getDataPoints() != null) {
                map.putAll(message.getBody().getDataPoints());
            }
        }
        
        return map;
    }
    
    /**
     * 使用反射转换对象为Map
     */
    private Map<String, Object> convertObjectToMap(Object obj) {
        Map<String, Object> map = new HashMap<>();
        
        try {
            Field[] fields = obj.getClass().getDeclaredFields();
            for (Field field : fields) {
                field.setAccessible(true);
                Object value = field.get(obj);
                if (value != null) {
                    map.put(field.getName(), value);
                }
            }
        } catch (Exception e) {
            log.warn("对象转Map失败:{}", obj.getClass().getSimpleName(), e);
        }
        
        return map;
    }
    
    /**
     * 计算转换质量评分
     */
    private int calculateQualityScore(Object transformedData, ValidationResult validationResult) {
        int score = 100;
        
        // 基于验证结果调整分数
        if (!validationResult.isValid()) {
            score -= validationResult.getErrorCount() * 10;
        }
        
        // 基于数据完整性调整分数
        if (transformedData instanceof Map) {
            Map<?, ?> dataMap = (Map<?, ?>) transformedData;
            if (dataMap.isEmpty()) {
                score -= 50;
            } else if (dataMap.size() < 3) {
                score -= 20;
            }
        }
        
        return Math.max(0, score);
    }
    
    @Override
    public List<TransformResult> batchTransform(List<Object> sourceDataList, TransformRule transformRule) {
        return sourceDataList.stream()
                .map(data -> transform(data, transformRule))
                .collect(Collectors.toList());
    }
    
    @Override
    public TransformResult reverseTransform(Object standardData, TransformRule transformRule) {
        // 创建反向转换规则
        TransformRule reverseRule = createReverseRule(transformRule);
        return transform(standardData, reverseRule);
    }
    
    /**
     * 创建反向转换规则
     */
    private TransformRule createReverseRule(TransformRule originalRule) {
        List<FieldMapping> reverseMappings = originalRule.getFieldMappings().stream()
                .map(mapping -> FieldMapping.builder()
                        .sourceField(mapping.getTargetField())
                        .targetField(mapping.getSourceField())
                        .dataTypeMapping(reverseDataTypeMapping(mapping.getDataTypeMapping()))
                        .defaultValue(mapping.getDefaultValue())
                        .required(mapping.isRequired())
                        .build())
                .collect(Collectors.toList());
        
        return TransformRule.builder()
                .ruleId(originalRule.getRuleId() + "_reverse")
                .ruleName(originalRule.getRuleName() + "_反向")
                .sourceType(originalRule.getTargetType())
                .targetType(originalRule.getSourceType())
                .fieldMappings(reverseMappings)
                .validationRules(originalRule.getValidationRules())
                .priority(originalRule.getPriority())
                .enabled(originalRule.isEnabled())
                .build();
    }
    
    /**
     * 反向数据类型映射
     */
    private DataTypeMapping reverseDataTypeMapping(DataTypeMapping original) {
        if (original == null) {
            return null;
        }
        
        return DataTypeMapping.builder()
                .sourceType(original.getTargetType())
                .targetType(original.getSourceType())
                .converter(original.getConverter())
                .converterParams(original.getConverterParams())
                .build();
    }
    
    @Override
    public Set<TransformType> getSupportedTypes() {
        return Set.of(
            TransformType.FIELD_MAPPING,
            TransformType.TYPE_CONVERSION,
            TransformType.SCRIPT_TRANSFORM,
            TransformType.VALIDATION
        );
    }
    
    @Override
    public boolean validateRule(TransformRule rule) {
        if (rule == null) {
            return false;
        }
        
        // 检查基本字段
        if (rule.getRuleId() == null || rule.getRuleId().isEmpty()) {
            return false;
        }
        
        if (rule.getFieldMappings() == null || rule.getFieldMappings().isEmpty()) {
            return false;
        }
        
        // 检查字段映射的有效性
        for (FieldMapping mapping : rule.getFieldMappings()) {
            if (mapping.getSourceField() == null || mapping.getTargetField() == null) {
                return false;
            }
        }
        
        return true;
    }
    
    /**
     * 验证源数据
     */
    private boolean validateSourceData(Object sourceData, TransformRule transformRule) {
        if (sourceData == null) {
            return false;
        }
        
        // 可以添加更多的验证逻辑
        return true;
    }
}

# 3.5 转换结果封装

/**
 * 数据转换结果 - "翻译成果报告"
 */
@Data
@Builder
public class TransformResult {
    
    /**
     * 转换是否成功
     */
    private boolean success;
    
    /**
     * 转换后的数据
     */
    private Object transformedData;
    
    /**
     * 原始数据
     */
    private Object sourceData;
    
    /**
     * 使用的转换规则
     */
    private TransformRule transformRule;
    
    /**
     * 转换耗时(毫秒)
     */
    private long transformTime;
    
    /**
     * 验证结果
     */
    private ValidationResult validationResult;
    
    /**
     * 质量评分(0-100)
     */
    private int qualityScore;
    
    /**
     * 错误信息
     */
    private String errorMessage;
    
    /**
     * 错误代码
     */
    private String errorCode;
    
    /**
     * 转换统计信息
     */
    private TransformStatistics statistics;
    
    /**
     * 创建成功结果
     */
    public static TransformResult success(Object transformedData) {
        return TransformResult.builder()
                .success(true)
                .transformedData(transformedData)
                .qualityScore(100)
                .build();
    }
    
    /**
     * 创建失败结果
     */
    public static TransformResult failure(String errorCode, String errorMessage) {
        return TransformResult.builder()
                .success(false)
                .errorCode(errorCode)
                .errorMessage(errorMessage)
                .qualityScore(0)
                .build();
    }
}

/**
 * 转换统计信息
 */
@Data
@Builder
public class TransformStatistics {
    private int totalFields;        // 总字段数
    private int transformedFields;  // 成功转换的字段数
    private int skippedFields;      // 跳过的字段数
    private int errorFields;        // 转换失败的字段数
    private Map<String, Object> metrics; // 其他指标
}

# 4. 消息路由器(Message Router)

# 4.1 设计原理

消息路由器就像一位经验丰富的邮局调度员。在繁忙的邮局里,每天都有成千上万的邮件需要分拣和投递。调度员需要根据邮件的地址、优先级、重量等信息,决定这些邮件应该走哪条路线,用什么方式投递。

在物联网系统中,消息路由器面临着类似的挑战:

  • 数以万计的设备消息需要处理
  • 不同类型的消息需要不同的处理方式
  • 系统负载需要均衡分配
  • 故障时需要自动切换路由

# 4.2 核心接口设计

/**
 * 消息路由器接口 - 智能"邮局调度员"
 * 
 * 就像邮局调度员需要决定邮件的投递路线一样,
 * 消息路由器需要决定消息的处理路径
 */
public interface MessageRouter {
    
    /**
     * 路由消息 - 决定消息的"投递路线"
     * 
     * @param message 待路由的消息("邮件")
     * @param routingContext 路由上下文("投递环境信息")
     * @return 路由结果("投递方案")
     */
    RoutingResult route(StandardMessage message, RoutingContext routingContext);
    
    /**
     * 批量路由 - 批量处理"邮件"
     */
    List<RoutingResult> batchRoute(List<StandardMessage> messages, RoutingContext routingContext);
    
    /**
     * 注册路由规则 - 添加新的"投递规则"
     */
    void registerRoutingRule(RoutingRule rule);
    
    /**
     * 移除路由规则 - 删除"投递规则"
     */
    void removeRoutingRule(String ruleId);
    
    /**
     * 获取路由统计 - 查看"投递统计"
     */
    RoutingStatistics getStatistics();
    
    /**
     * 健康检查 - 检查"投递系统"是否正常
     */
    HealthStatus checkHealth();
}

# 4.3 路由规则定义

/**
 * 路由规则 - "投递规则手册"
 * 
 * 就像邮局有详细的投递规则一样,
 * 路由规则定义了消息应该如何被处理和转发
 */
@Data
@Builder
public class RoutingRule {
    
    /**
     * 规则ID - "规则编号"
     */
    private String ruleId;
    
    /**
     * 规则名称 - "规则名称"
     */
    private String ruleName;
    
    /**
     * 匹配条件 - "适用条件"
     */
    private List<MatchCondition> matchConditions;
    
    /**
     * 路由目标 - "投递目的地"
     */
    private List<RouteTarget> routeTargets;
    
    /**
     * 路由策略 - "投递策略"
     */
    private RoutingStrategy strategy;
    
    /**
     * 优先级 - "规则优先级"
     */
    private int priority;
    
    /**
     * 是否启用 - "规则是否生效"
     */
    private boolean enabled;
    
    /**
     * 故障转移配置 - "备用投递方案"
     */
    private FailoverConfig failoverConfig;
    
    /**
     * 限流配置 - "流量控制"
     */
    private RateLimitConfig rateLimitConfig;
    
    /**
     * 创建时间
     */
    private LocalDateTime createTime;
    
    /**
     * 更新时间
     */
    private LocalDateTime updateTime;
}

/**
 * 匹配条件 - "规则适用条件"
 */
@Data
@Builder
public class MatchCondition {
    
    /**
     * 条件类型 - "条件种类"
     */
    private ConditionType type;
    
    /**
     * 字段名 - "检查的字段"
     */
    private String fieldName;
    
    /**
     * 操作符 - "比较方式"
     */
    private Operator operator;
    
    /**
     * 期望值 - "期望的值"
     */
    private Object expectedValue;
    
    /**
     * 逻辑关系 - "与其他条件的关系"
     */
    private LogicalOperator logicalOperator;
}

/**
 * 路由目标 - "投递目的地"
 */
@Data
@Builder
public class RouteTarget {
    
    /**
     * 目标ID - "目的地编号"
     */
    private String targetId;
    
    /**
     * 目标名称 - "目的地名称"
     */
    private String targetName;
    
    /**
     * 目标类型 - "目的地类型"
     */
    private TargetType targetType;
    
    /**
     * 连接配置 - "连接信息"
     */
    private ConnectionConfig connectionConfig;
    
    /**
     * 权重 - "分配权重"
     */
    private int weight;
    
    /**
     * 是否可用 - "目的地是否可用"
     */
    private boolean available;
    
    /**
     * 健康状态 - "目的地健康状况"
     */
    private HealthStatus healthStatus;
}

/**
 * 路由策略枚举
 */
public enum RoutingStrategy {
    ROUND_ROBIN("轮询", "依次分配给各个目标"),
    WEIGHTED_ROUND_ROBIN("加权轮询", "根据权重分配"),
    LEAST_CONNECTIONS("最少连接", "分配给连接数最少的目标"),
    RANDOM("随机", "随机选择目标"),
    HASH("哈希", "根据消息特征哈希选择"),
    BROADCAST("广播", "发送给所有目标"),
    FAILOVER("故障转移", "主备切换模式"),
    PRIORITY("优先级", "按优先级选择目标");
    
    private final String name;
    private final String description;
    
    RoutingStrategy(String name, String description) {
        this.name = name;
        this.description = description;
    }
}

# 4.4 智能消息路由器实现

/**
 * 智能消息路由器 - 高级"邮局调度系统"
 * 
 * 就像现代化的邮局调度系统,能够智能地处理各种复杂的投递需求
 */
@Component
@Slf4j
public class IntelligentMessageRouter implements MessageRouter {
    
    @Autowired
    private RoutingRuleRepository ruleRepository;
    
    @Autowired
    private LoadBalancer loadBalancer;
    
    @Autowired
    private HealthChecker healthChecker;
    
    @Autowired
    private RateLimiter rateLimiter;
    
    @Autowired
    private MetricsCollector metricsCollector;
    
    // 路由规则缓存("规则手册")
    private final Map<String, RoutingRule> ruleCache = new ConcurrentHashMap<>();
    
    // 路由统计("投递统计")
    private final RoutingStatistics statistics = new RoutingStatistics();
    
    /**
     * 路由消息 - 执行"智能投递"
     */
    @Override
    public RoutingResult route(StandardMessage message, RoutingContext routingContext) {
        long startTime = System.currentTimeMillis();
        
        try {
            // 第一步:查找匹配的路由规则("查找投递规则")
            List<RoutingRule> matchedRules = findMatchingRules(message, routingContext);
            
            if (matchedRules.isEmpty()) {
                log.warn("未找到匹配的路由规则:messageId={}, deviceId={}", 
                        message.getMessageId(), message.getDeviceId());
                return RoutingResult.failure("NO_MATCHING_RULE", "未找到匹配的路由规则");
            }
            
            // 第二步:选择最佳规则("选择最优投递方案")
            RoutingRule selectedRule = selectBestRule(matchedRules, message);
            
            // 第三步:检查限流("检查投递能力")
            if (!checkRateLimit(message, selectedRule)) {
                statistics.incrementRateLimitedCount();
                return RoutingResult.failure("RATE_LIMITED", "触发限流保护");
            }
            
            // 第四步:选择路由目标("选择具体投递地址")
            List<RouteTarget> availableTargets = getAvailableTargets(selectedRule);
            if (availableTargets.isEmpty()) {
                return RoutingResult.failure("NO_AVAILABLE_TARGET", "没有可用的路由目标");
            }
            
            RouteTarget selectedTarget = selectTarget(availableTargets, selectedRule.getStrategy(), message);
            
            // 第五步:执行路由("执行投递")
            RoutingResult result = executeRouting(message, selectedTarget, selectedRule, routingContext);
            
            // 第六步:更新统计("记录投递结果")
            updateStatistics(result, System.currentTimeMillis() - startTime);
            
            return result;
            
        } catch (Exception e) {
            log.error("消息路由失败:messageId={}", message.getMessageId(), e);
            statistics.incrementErrorCount();
            return RoutingResult.failure("ROUTING_ERROR", e.getMessage());
        }
    }
    
    /**
     * 查找匹配的路由规则 - "查找适用的投递规则"
     */
    private List<RoutingRule> findMatchingRules(StandardMessage message, RoutingContext context) {
        return ruleCache.values().stream()
                .filter(rule -> rule.isEnabled())
                .filter(rule -> matchesRule(message, rule, context))
                .sorted(Comparator.comparingInt(RoutingRule::getPriority).reversed())
                .collect(Collectors.toList());
    }
    
    /**
     * 检查消息是否匹配规则
     */
    private boolean matchesRule(StandardMessage message, RoutingRule rule, RoutingContext context) {
        if (rule.getMatchConditions() == null || rule.getMatchConditions().isEmpty()) {
            return true; // 没有条件表示匹配所有
        }
        
        // 构建消息属性映射
        Map<String, Object> messageAttributes = buildMessageAttributes(message, context);
        
        // 评估所有匹配条件
        return evaluateConditions(rule.getMatchConditions(), messageAttributes);
    }
    
    /**
     * 构建消息属性映射
     */
    private Map<String, Object> buildMessageAttributes(StandardMessage message, RoutingContext context) {
        Map<String, Object> attributes = new HashMap<>();
        
        // 基本属性
        attributes.put("messageId", message.getMessageId());
        attributes.put("deviceId", message.getDeviceId());
        attributes.put("messageType", message.getMessageType().name());
        attributes.put("priority", message.getPriority().name());
        attributes.put("sourceProtocol", message.getSourceProtocol().name());
        
        // 时间属性
        if (message.getTimestamp() != null) {
            attributes.put("timestamp", message.getTimestamp());
            attributes.put("hour", message.getTimestamp().getHour());
            attributes.put("dayOfWeek", message.getTimestamp().getDayOfWeek().getValue());
        }
        
        // 消息体属性
        if (message.getBody() != null && message.getBody().getDataPoints() != null) {
            message.getBody().getDataPoints().forEach((key, value) -> {
                attributes.put("data." + key, value);
            });
        }
        
        // 上下文属性
        if (context != null && context.getAttributes() != null) {
            context.getAttributes().forEach((key, value) -> {
                attributes.put("context." + key, value);
            });
        }
        
        return attributes;
    }
    
    /**
     * 评估匹配条件
     */
    private boolean evaluateConditions(List<MatchCondition> conditions, Map<String, Object> attributes) {
        if (conditions.isEmpty()) {
            return true;
        }
        
        boolean result = evaluateCondition(conditions.get(0), attributes);
        
        for (int i = 1; i < conditions.size(); i++) {
            MatchCondition condition = conditions.get(i);
            boolean conditionResult = evaluateCondition(condition, attributes);
            
            // 根据逻辑操作符组合结果
            if (condition.getLogicalOperator() == LogicalOperator.AND) {
                result = result && conditionResult;
            } else if (condition.getLogicalOperator() == LogicalOperator.OR) {
                result = result || conditionResult;
            }
        }
        
        return result;
    }
    
    /**
     * 评估单个条件
     */
    private boolean evaluateCondition(MatchCondition condition, Map<String, Object> attributes) {
        Object actualValue = attributes.get(condition.getFieldName());
        Object expectedValue = condition.getExpectedValue();
        
        if (actualValue == null) {
            return condition.getOperator() == Operator.IS_NULL;
        }
        
        switch (condition.getOperator()) {
            case EQUALS:
                return Objects.equals(actualValue, expectedValue);
            case NOT_EQUALS:
                return !Objects.equals(actualValue, expectedValue);
            case GREATER_THAN:
                return compareValues(actualValue, expectedValue) > 0;
            case GREATER_THAN_OR_EQUAL:
                return compareValues(actualValue, expectedValue) >= 0;
            case LESS_THAN:
                return compareValues(actualValue, expectedValue) < 0;
            case LESS_THAN_OR_EQUAL:
                return compareValues(actualValue, expectedValue) <= 0;
            case CONTAINS:
                return actualValue.toString().contains(expectedValue.toString());
            case STARTS_WITH:
                return actualValue.toString().startsWith(expectedValue.toString());
            case ENDS_WITH:
                return actualValue.toString().endsWith(expectedValue.toString());
            case REGEX:
                return actualValue.toString().matches(expectedValue.toString());
            case IN:
                return isValueInList(actualValue, expectedValue);
            case NOT_IN:
                return !isValueInList(actualValue, expectedValue);
            case IS_NULL:
                return false; // actualValue不为null
            case IS_NOT_NULL:
                return true; // actualValue不为null
            default:
                return false;
        }
    }
    
    /**
     * 比较值大小
     */
    @SuppressWarnings("unchecked")
    private int compareValues(Object actual, Object expected) {
        if (actual instanceof Comparable && expected instanceof Comparable) {
            try {
                return ((Comparable<Object>) actual).compareTo(expected);
            } catch (ClassCastException e) {
                // 类型不匹配,转换为字符串比较
                return actual.toString().compareTo(expected.toString());
            }
        }
        return actual.toString().compareTo(expected.toString());
    }
    
    /**
     * 检查值是否在列表中
     */
    private boolean isValueInList(Object value, Object listValue) {
        if (listValue instanceof List) {
            return ((List<?>) listValue).contains(value);
        } else if (listValue instanceof String) {
            // 支持逗号分隔的字符串
            String[] items = listValue.toString().split(",");
            return Arrays.asList(items).contains(value.toString());
        }
        return false;
    }
    
    /**
     * 选择最佳规则
     */
    private RoutingRule selectBestRule(List<RoutingRule> matchedRules, StandardMessage message) {
        // 已经按优先级排序,选择第一个
        return matchedRules.get(0);
    }
    
    /**
     * 检查限流
     */
    private boolean checkRateLimit(StandardMessage message, RoutingRule rule) {
        if (rule.getRateLimitConfig() == null) {
            return true;
        }
        
        String limitKey = buildRateLimitKey(message, rule);
        return rateLimiter.tryAcquire(limitKey, rule.getRateLimitConfig());
    }
    
    /**
     * 构建限流键
     */
    private String buildRateLimitKey(StandardMessage message, RoutingRule rule) {
        return String.format("route:%s:device:%s", rule.getRuleId(), message.getDeviceId());
    }
    
    /**
     * 获取可用的路由目标
     */
    private List<RouteTarget> getAvailableTargets(RoutingRule rule) {
        return rule.getRouteTargets().stream()
                .filter(RouteTarget::isAvailable)
                .filter(target -> healthChecker.isHealthy(target))
                .collect(Collectors.toList());
    }
    
    /**
     * 选择路由目标
     */
    private RouteTarget selectTarget(List<RouteTarget> targets, RoutingStrategy strategy, StandardMessage message) {
        switch (strategy) {
            case ROUND_ROBIN:
                return loadBalancer.roundRobin(targets);
            case WEIGHTED_ROUND_ROBIN:
                return loadBalancer.weightedRoundRobin(targets);
            case LEAST_CONNECTIONS:
                return loadBalancer.leastConnections(targets);
            case RANDOM:
                return loadBalancer.random(targets);
            case HASH:
                return loadBalancer.hash(targets, message.getDeviceId());
            case PRIORITY:
                return loadBalancer.priority(targets);
            default:
                return targets.get(0);
        }
    }
    
    /**
     * 执行路由
     */
    private RoutingResult executeRouting(StandardMessage message, RouteTarget target, 
                                       RoutingRule rule, RoutingContext context) {
        try {
            // 根据目标类型执行不同的路由逻辑
            switch (target.getTargetType()) {
                case MESSAGE_QUEUE:
                    return routeToMessageQueue(message, target, context);
                case HTTP_ENDPOINT:
                    return routeToHttpEndpoint(message, target, context);
                case DATABASE:
                    return routeToDatabase(message, target, context);
                case FILE_SYSTEM:
                    return routeToFileSystem(message, target, context);
                case KAFKA_TOPIC:
                    return routeToKafkaTopic(message, target, context);
                default:
                    return RoutingResult.failure("UNSUPPORTED_TARGET_TYPE", 
                            "不支持的目标类型:" + target.getTargetType());
            }
        } catch (Exception e) {
            log.error("路由执行失败:targetId={}, messageId={}", 
                    target.getTargetId(), message.getMessageId(), e);
            
            // 尝试故障转移
            return handleFailover(message, rule, context, e);
        }
    }
    
    /**
     * 路由到消息队列
     */
    private RoutingResult routeToMessageQueue(StandardMessage message, RouteTarget target, RoutingContext context) {
        // 实现消息队列路由逻辑
        log.debug("路由消息到队列:targetId={}, messageId={}", target.getTargetId(), message.getMessageId());
        
        // 这里应该实现具体的消息队列发送逻辑
        // 例如:rabbitTemplate.send(queueName, message);
        
        return RoutingResult.success(target, "消息已发送到队列");
    }
    
    /**
     * 路由到HTTP端点
     */
    private RoutingResult routeToHttpEndpoint(StandardMessage message, RouteTarget target, RoutingContext context) {
        // 实现HTTP端点路由逻辑
        log.debug("路由消息到HTTP端点:targetId={}, messageId={}", target.getTargetId(), message.getMessageId());
        
        // 这里应该实现具体的HTTP请求发送逻辑
        // 例如:restTemplate.postForObject(url, message, String.class);
        
        return RoutingResult.success(target, "消息已发送到HTTP端点");
    }
    
    /**
     * 路由到数据库
     */
    private RoutingResult routeToDatabase(StandardMessage message, RouteTarget target, RoutingContext context) {
        // 实现数据库路由逻辑
        log.debug("路由消息到数据库:targetId={}, messageId={}", target.getTargetId(), message.getMessageId());
        
        // 这里应该实现具体的数据库存储逻辑
        // 例如:messageRepository.save(message);
        
        return RoutingResult.success(target, "消息已存储到数据库");
    }
    
    /**
     * 路由到文件系统
     */
    private RoutingResult routeToFileSystem(StandardMessage message, RouteTarget target, RoutingContext context) {
        // 实现文件系统路由逻辑
        log.debug("路由消息到文件系统:targetId={}, messageId={}", target.getTargetId(), message.getMessageId());
        
        // 这里应该实现具体的文件写入逻辑
        
        return RoutingResult.success(target, "消息已写入文件系统");
    }
    
    /**
     * 路由到Kafka主题
     */
    private RoutingResult routeToKafkaTopic(StandardMessage message, RouteTarget target, RoutingContext context) {
        // 实现Kafka主题路由逻辑
        log.debug("路由消息到Kafka主题:targetId={}, messageId={}", target.getTargetId(), message.getMessageId());
        
        // 这里应该实现具体的Kafka发送逻辑
        // 例如:kafkaTemplate.send(topicName, message);
        
        return RoutingResult.success(target, "消息已发送到Kafka主题");
    }
    
    /**
     * 处理故障转移
     */
    private RoutingResult handleFailover(StandardMessage message, RoutingRule rule, 
                                       RoutingContext context, Exception originalError) {
        if (rule.getFailoverConfig() == null || !rule.getFailoverConfig().isEnabled()) {
            return RoutingResult.failure("ROUTING_FAILED", originalError.getMessage());
        }
        
        // 获取备用目标
        List<RouteTarget> backupTargets = rule.getFailoverConfig().getBackupTargets();
        if (backupTargets.isEmpty()) {
            return RoutingResult.failure("NO_BACKUP_TARGET", "没有可用的备用目标");
        }
        
        // 尝试备用目标
        for (RouteTarget backupTarget : backupTargets) {
            if (backupTarget.isAvailable() && healthChecker.isHealthy(backupTarget)) {
                try {
                    return executeRouting(message, backupTarget, rule, context);
                } catch (Exception e) {
                    log.warn("备用目标也失败:targetId={}", backupTarget.getTargetId(), e);
                }
            }
        }
        
        return RoutingResult.failure("ALL_TARGETS_FAILED", "所有目标都不可用");
    }
    
    /**
     * 更新统计信息
     */
    private void updateStatistics(RoutingResult result, long routingTime) {
        statistics.incrementTotalCount();
        
        if (result.isSuccess()) {
            statistics.incrementSuccessCount();
        } else {
            statistics.incrementFailureCount();
        }
        
        statistics.updateAverageRoutingTime(routingTime);
        
        // 发送指标到监控系统
        metricsCollector.recordRoutingMetrics(result, routingTime);
    }
    
    @Override
    public List<RoutingResult> batchRoute(List<StandardMessage> messages, RoutingContext routingContext) {
        return messages.parallelStream()
                .map(message -> route(message, routingContext))
                .collect(Collectors.toList());
    }
    
    @Override
    public void registerRoutingRule(RoutingRule rule) {
        if (validateRule(rule)) {
            ruleCache.put(rule.getRuleId(), rule);
            ruleRepository.save(rule);
            log.info("路由规则已注册:ruleId={}, ruleName={}", rule.getRuleId(), rule.getRuleName());
        } else {
            throw new IllegalArgumentException("无效的路由规则");
        }
    }
    
    @Override
    public void removeRoutingRule(String ruleId) {
        ruleCache.remove(ruleId);
        ruleRepository.deleteById(ruleId);
        log.info("路由规则已移除:ruleId={}", ruleId);
    }
    
    @Override
    public RoutingStatistics getStatistics() {
        return statistics.copy();
    }
    
    @Override
    public HealthStatus checkHealth() {
        // 检查各个组件的健康状态
        boolean healthy = true;
        StringBuilder details = new StringBuilder();
        
        // 检查规则缓存
        if (ruleCache.isEmpty()) {
            healthy = false;
            details.append("没有可用的路由规则; ");
        }
        
        // 检查负载均衡器
        if (!loadBalancer.isHealthy()) {
            healthy = false;
            details.append("负载均衡器不健康; ");
        }
        
        // 检查限流器
        if (!rateLimiter.isHealthy()) {
            healthy = false;
            details.append("限流器不健康; ");
        }
        
        return HealthStatus.builder()
                .healthy(healthy)
                .details(details.toString())
                .checkTime(LocalDateTime.now())
                .build();
    }
    
    /**
     * 验证路由规则
     */
    private boolean validateRule(RoutingRule rule) {
        if (rule == null || rule.getRuleId() == null || rule.getRuleId().isEmpty()) {
            return false;
        }
        
        if (rule.getRouteTargets() == null || rule.getRouteTargets().isEmpty()) {
            return false;
        }
        
        // 验证匹配条件
        if (rule.getMatchConditions() != null) {
            for (MatchCondition condition : rule.getMatchConditions()) {
                if (condition.getFieldName() == null || condition.getOperator() == null) {
                    return false;
                }
            }
        }
        
        return true;
    }
    
    /**
     * 初始化方法 - 加载路由规则
     */
    @PostConstruct
    public void initialize() {
        // 从数据库加载路由规则
        List<RoutingRule> rules = ruleRepository.findAllEnabled();
        for (RoutingRule rule : rules) {
            ruleCache.put(rule.getRuleId(), rule);
        }
        
        log.info("消息路由器初始化完成,加载了{}条路由规则", rules.size());
    }
}

# 4.5 路由结果封装

/**
 * 路由结果 - "投递结果报告"
 */
@Data
@Builder
public class RoutingResult {
    
    /**
     * 路由是否成功
     */
    private boolean success;
    
    /**
     * 选中的路由目标
     */
    private RouteTarget selectedTarget;
    
    /**
     * 使用的路由规则
     */
    private RoutingRule appliedRule;
    
    /**
     * 路由耗时(毫秒)
     */
    private long routingTime;
    
    /**
     * 结果消息
     */
    private String message;
    
    /**
     * 错误代码
     */
    private String errorCode;
    
    /**
     * 错误详情
     */
    private String errorDetails;
    
    /**
     * 路由跟踪信息
     */
    private List<RouteTrace> traces;
    
    /**
     * 创建成功结果
     */
    public static RoutingResult success(RouteTarget target, String message) {
        return RoutingResult.builder()
                .success(true)
                .selectedTarget(target)
                .message(message)
                .traces(new ArrayList<>())
                .build();
    }
    
    /**
     * 创建失败结果
     */
    public static RoutingResult failure(String errorCode, String errorDetails) {
        return RoutingResult.builder()
                .success(false)
                .errorCode(errorCode)
                .errorDetails(errorDetails)
                .traces(new ArrayList<>())
                .build();
    }
    
    /**
     * 添加跟踪信息
     */
    public void addTrace(String step, String details) {
        if (traces == null) {
            traces = new ArrayList<>();
        }
        traces.add(RouteTrace.builder()
                .step(step)
                .details(details)
                .timestamp(LocalDateTime.now())
                .build());
    }
}

/**
 * 路由跟踪信息
 */
@Data
@Builder
public class RouteTrace {
    private String step;           // 步骤名称
    private String details;        // 详细信息
    private LocalDateTime timestamp; // 时间戳
}

# 5. 实际应用示例

# 5.1 智能家居场景

让我们通过一个智能家居的故事来看看协议适配层是如何工作的:

故事背景:小明家里有各种智能设备:温度传感器(MQTT协议)、智能插座(HTTP协议)、门锁(CoAP协议)。这些设备需要统一管理。

/**
 * 智能家居协议适配器配置
 */
@Configuration
public class SmartHomeAdapterConfig {
    
    /**
     * 配置MQTT温度传感器解析器
     */
    @Bean
    public ProtocolParser temperatureSensorParser() {
        return MqttProtocolParser.builder()
                .deviceType("TEMPERATURE_SENSOR")
                .topicPattern("home/+/temperature")
                .messageFormat(MessageFormat.JSON)
                .build();
    }
    
    /**
     * 配置HTTP智能插座解析器
     */
    @Bean
    public ProtocolParser smartPlugParser() {
        return HttpProtocolParser.builder()
                .deviceType("SMART_PLUG")
                .endpoint("/api/plug/status")
                .method(HttpMethod.POST)
                .contentType("application/json")
                .build();
    }
    
    /**
     * 配置CoAP门锁解析器
     */
    @Bean
    public ProtocolParser doorLockParser() {
        return CoapProtocolParser.builder()
                .deviceType("DOOR_LOCK")
                .resourcePath("/lock/status")
                .messageFormat(MessageFormat.CBOR)
                .build();
    }
    
    /**
     * 配置数据转换规则
     */
    @Bean
    public DataTransformer smartHomeTransformer() {
        List<TransformRule> rules = Arrays.asList(
                // 温度数据转换规则
                TransformRule.builder()
                        .sourceField("temp")
                        .targetField("temperature")
                        .dataType(DataType.DOUBLE)
                        .unit("°C")
                        .validationRule("value >= -50 && value <= 100")
                        .build(),
                        
                // 插座状态转换规则
                TransformRule.builder()
                        .sourceField("power_state")
                        .targetField("status")
                        .dataType(DataType.STRING)
                        .mappingScript("value == 1 ? 'ON' : 'OFF'")
                        .build(),
                        
                // 门锁状态转换规则
                TransformRule.builder()
                        .sourceField("lock_status")
                        .targetField("locked")
                        .dataType(DataType.BOOLEAN)
                        .mappingScript("value == 'LOCKED'")
                        .build()
        );
        
        return new UniversalDataTransformer(rules);
    }
    
    /**
     * 配置消息路由规则
     */
    @Bean
    public List<RoutingRule> smartHomeRoutingRules() {
        return Arrays.asList(
                // 温度数据路由到时序数据库
                RoutingRule.builder()
                        .ruleId("temperature-to-tsdb")
                        .ruleName("温度数据路由")
                        .matchConditions(Arrays.asList(
                                MatchCondition.builder()
                                        .fieldName("messageType")
                                        .operator(Operator.EQUALS)
                                        .expectedValue("DATA")
                                        .logicalOperator(LogicalOperator.AND)
                                        .build(),
                                MatchCondition.builder()
                                        .fieldName("data.temperature")
                                        .operator(Operator.IS_NOT_NULL)
                                        .logicalOperator(LogicalOperator.AND)
                                        .build()
                        ))
                        .routeTargets(Arrays.asList(
                                RouteTarget.builder()
                                        .targetId("influxdb-cluster")
                                        .targetName("时序数据库集群")
                                        .targetType(TargetType.DATABASE)
                                        .weight(100)
                                        .available(true)
                                        .build()
                        ))
                        .strategy(RoutingStrategy.ROUND_ROBIN)
                        .priority(100)
                        .enabled(true)
                        .build(),
                        
                // 安全事件路由到告警系统
                RoutingRule.builder()
                        .ruleId("security-alert")
                        .ruleName("安全告警路由")
                        .matchConditions(Arrays.asList(
                                MatchCondition.builder()
                                        .fieldName("messageType")
                                        .operator(Operator.EQUALS)
                                        .expectedValue("EVENT")
                                        .logicalOperator(LogicalOperator.AND)
                                        .build(),
                                MatchCondition.builder()
                                        .fieldName("data.locked")
                                        .operator(Operator.EQUALS)
                                        .expectedValue(false)
                                        .logicalOperator(LogicalOperator.AND)
                                        .build(),
                                MatchCondition.builder()
                                        .fieldName("hour")
                                        .operator(Operator.GREATER_THAN_OR_EQUAL)
                                        .expectedValue(22)
                                        .logicalOperator(LogicalOperator.OR)
                                        .build(),
                                MatchCondition.builder()
                                        .fieldName("hour")
                                        .operator(Operator.LESS_THAN_OR_EQUAL)
                                        .expectedValue(6)
                                        .build()
                        ))
                        .routeTargets(Arrays.asList(
                                RouteTarget.builder()
                                        .targetId("alert-system")
                                        .targetName("告警系统")
                                        .targetType(TargetType.HTTP_ENDPOINT)
                                        .weight(100)
                                        .available(true)
                                        .build(),
                                RouteTarget.builder()
                                        .targetId("sms-gateway")
                                        .targetName("短信网关")
                                        .targetType(TargetType.HTTP_ENDPOINT)
                                        .weight(50)
                                        .available(true)
                                        .build()
                        ))
                        .strategy(RoutingStrategy.BROADCAST)
                        .priority(200)
                        .enabled(true)
                        .rateLimitConfig(RateLimitConfig.builder()
                                .maxRequests(10)
                                .timeWindow(Duration.ofMinutes(1))
                                .build())
                        .build()
        );
    }
}

# 5.2 工业物联网场景

故事背景:某工厂有多条生产线,每条生产线上有温度、压力、振动等传感器,使用不同的工业协议(Modbus、OPC-UA、Profinet等)。

/**
 * 工业物联网协议适配器
 */
@Component
@Slf4j
public class IndustrialIoTAdapter {
    
    @Autowired
    private ProtocolParserFactory parserFactory;
    
    @Autowired
    private DataTransformer dataTransformer;
    
    @Autowired
    private MessageRouter messageRouter;
    
    /**
     * 处理生产线数据
     */
    @EventListener
    public void handleProductionLineData(ProductionLineDataEvent event) {
        try {
            // 第一步:解析协议数据
            ProtocolParser parser = parserFactory.getParser(event.getProtocolType());
            ParseResult parseResult = parser.parse(event.getRawData(), event.getParseContext());
            
            if (!parseResult.isSuccess()) {
                log.error("生产线数据解析失败:line={}, error={}", 
                        event.getProductionLine(), parseResult.getErrorMessage());
                return;
            }
            
            StandardMessage message = parseResult.getMessage();
            
            // 第二步:数据转换和质量检查
            TransformResult transformResult = dataTransformer.transform(message, buildTransformContext(event));
            
            if (!transformResult.isSuccess()) {
                log.error("生产线数据转换失败:line={}, error={}", 
                        event.getProductionLine(), transformResult.getErrorMessage());
                return;
            }
            
            // 第三步:数据质量评估
            double qualityScore = transformResult.getQualityScore();
            if (qualityScore < 0.8) {
                log.warn("生产线数据质量较低:line={}, score={}", 
                        event.getProductionLine(), qualityScore);
                
                // 低质量数据路由到数据清洗服务
                routeToDataCleaning(transformResult.getTransformedMessage(), event);
                return;
            }
            
            // 第四步:消息路由
            RoutingContext routingContext = RoutingContext.builder()
                    .attribute("productionLine", event.getProductionLine())
                    .attribute("shift", getCurrentShift())
                    .attribute("qualityScore", qualityScore)
                    .build();
                    
            RoutingResult routingResult = messageRouter.route(
                    transformResult.getTransformedMessage(), routingContext);
            
            if (routingResult.isSuccess()) {
                log.debug("生产线数据路由成功:line={}, target={}", 
                        event.getProductionLine(), routingResult.getSelectedTarget().getTargetName());
            } else {
                log.error("生产线数据路由失败:line={}, error={}", 
                        event.getProductionLine(), routingResult.getErrorDetails());
            }
            
        } catch (Exception e) {
            log.error("处理生产线数据时发生异常:line={}", event.getProductionLine(), e);
        }
    }
    
    /**
     * 构建转换上下文
     */
    private TransformContext buildTransformContext(ProductionLineDataEvent event) {
        return TransformContext.builder()
                .sourceSystem("PRODUCTION_LINE_" + event.getProductionLine())
                .targetSystem("MES_SYSTEM")
                .transformationTime(LocalDateTime.now())
                .attribute("productionLine", event.getProductionLine())
                .attribute("equipmentId", event.getEquipmentId())
                .attribute("shift", getCurrentShift())
                .build();
    }
    
    /**
     * 路由到数据清洗服务
     */
    private void routeToDataCleaning(StandardMessage message, ProductionLineDataEvent event) {
        // 创建数据清洗任务
        DataCleaningTask task = DataCleaningTask.builder()
                .messageId(message.getMessageId())
                .productionLine(event.getProductionLine())
                .rawData(event.getRawData())
                .parsedMessage(message)
                .priority(TaskPriority.HIGH)
                .build();
                
        // 发送到数据清洗队列
        // dataCleaningService.submitTask(task);
        
        log.info("已提交数据清洗任务:line={}, messageId={}", 
                event.getProductionLine(), message.getMessageId());
    }
    
    /**
     * 获取当前班次
     */
    private String getCurrentShift() {
        int hour = LocalDateTime.now().getHour();
        if (hour >= 8 && hour < 16) {
            return "DAY_SHIFT";
        } else if (hour >= 16 && hour < 24) {
            return "EVENING_SHIFT";
        } else {
            return "NIGHT_SHIFT";
        }
    }
}

# 6. 配置管理

# 6.1 动态配置

协议适配层支持动态配置,就像一个可以随时调整的"智能开关":

/**
 * 协议适配器配置管理器
 */
@Component
@Slf4j
public class ProtocolAdapterConfigManager {
    
    @Autowired
    private ConfigurationRepository configRepository;
    
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    
    // 配置缓存
    private final Map<String, AdapterConfig> configCache = new ConcurrentHashMap<>();
    
    // 配置监听器
    private final List<ConfigChangeListener> listeners = new CopyOnWriteArrayList<>();
    
    /**
     * 获取适配器配置
     */
    public AdapterConfig getConfig(String adapterId) {
        return configCache.computeIfAbsent(adapterId, this::loadConfigFromRepository);
    }
    
    /**
     * 更新适配器配置
     */
    public void updateConfig(String adapterId, AdapterConfig newConfig) {
        AdapterConfig oldConfig = configCache.get(adapterId);
        
        // 验证配置
        if (!validateConfig(newConfig)) {
            throw new IllegalArgumentException("无效的适配器配置");
        }
        
        // 保存到数据库
        configRepository.save(adapterId, newConfig);
        
        // 更新缓存
        configCache.put(adapterId, newConfig);
        
        // 通知监听器
        notifyConfigChange(adapterId, oldConfig, newConfig);
        
        log.info("适配器配置已更新:adapterId={}", adapterId);
    }
    
    /**
     * 注册配置变更监听器
     */
    public void addConfigChangeListener(ConfigChangeListener listener) {
        listeners.add(listener);
    }
    
    /**
     * 从数据库加载配置
     */
    private AdapterConfig loadConfigFromRepository(String adapterId) {
        return configRepository.findById(adapterId)
                .orElse(createDefaultConfig(adapterId));
    }
    
    /**
     * 创建默认配置
     */
    private AdapterConfig createDefaultConfig(String adapterId) {
        return AdapterConfig.builder()
                .adapterId(adapterId)
                .enabled(true)
                .maxConcurrency(100)
                .timeoutMs(5000)
                .retryCount(3)
                .batchSize(50)
                .build();
    }
    
    /**
     * 验证配置
     */
    private boolean validateConfig(AdapterConfig config) {
        if (config == null || config.getAdapterId() == null) {
            return false;
        }
        
        if (config.getMaxConcurrency() <= 0 || config.getMaxConcurrency() > 1000) {
            return false;
        }
        
        if (config.getTimeoutMs() <= 0 || config.getTimeoutMs() > 60000) {
            return false;
        }
        
        return true;
    }
    
    /**
     * 通知配置变更
     */
    private void notifyConfigChange(String adapterId, AdapterConfig oldConfig, AdapterConfig newConfig) {
        ConfigChangeEvent event = ConfigChangeEvent.builder()
                .adapterId(adapterId)
                .oldConfig(oldConfig)
                .newConfig(newConfig)
                .changeTime(LocalDateTime.now())
                .build();
                
        // 同步通知监听器
        listeners.forEach(listener -> {
            try {
                listener.onConfigChange(event);
            } catch (Exception e) {
                log.error("配置变更监听器执行失败:adapterId={}", adapterId, e);
            }
        });
        
        // 异步发布事件
        eventPublisher.publishEvent(event);
    }
    
    /**
     * 定期刷新配置
     */
    @Scheduled(fixedRate = 60000) // 每分钟检查一次
    public void refreshConfigs() {
        configCache.keySet().forEach(adapterId -> {
            try {
                AdapterConfig latestConfig = configRepository.findById(adapterId).orElse(null);
                if (latestConfig != null && !latestConfig.equals(configCache.get(adapterId))) {
                    updateConfig(adapterId, latestConfig);
                }
            } catch (Exception e) {
                log.error("刷新配置失败:adapterId={}", adapterId, e);
            }
        });
    }
}

/**
 * 适配器配置
 */
@Data
@Builder
public class AdapterConfig {
    private String adapterId;           // 适配器ID
    private boolean enabled;            // 是否启用
    private int maxConcurrency;         // 最大并发数
    private long timeoutMs;             // 超时时间(毫秒)
    private int retryCount;             // 重试次数
    private int batchSize;              // 批处理大小
    private Map<String, Object> properties; // 扩展属性
    private LocalDateTime updateTime;   // 更新时间
}

/**
 * 配置变更监听器
 */
public interface ConfigChangeListener {
    void onConfigChange(ConfigChangeEvent event);
}

/**
 * 配置变更事件
 */
@Data
@Builder
public class ConfigChangeEvent {
    private String adapterId;
    private AdapterConfig oldConfig;
    private AdapterConfig newConfig;
    private LocalDateTime changeTime;
}

# 7. 监控和指标

# 7.1 性能监控

协议适配层需要全面的监控,就像医生需要监控病人的各项生命体征:

/**
 * 协议适配器监控指标收集器
 */
@Component
@Slf4j
public class ProtocolAdapterMetricsCollector {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    // 计数器
    private final Counter parseSuccessCounter;
    private final Counter parseFailureCounter;
    private final Counter transformSuccessCounter;
    private final Counter transformFailureCounter;
    private final Counter routingSuccessCounter;
    private final Counter routingFailureCounter;
    
    // 计时器
    private final Timer parseTimer;
    private final Timer transformTimer;
    private final Timer routingTimer;
    
    // 仪表盘
    private final Gauge activeConnectionsGauge;
    private final Gauge queueSizeGauge;
    
    public ProtocolAdapterMetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        
        // 初始化计数器
        this.parseSuccessCounter = Counter.builder("protocol.parse.success")
                .description("协议解析成功次数")
                .register(meterRegistry);
                
        this.parseFailureCounter = Counter.builder("protocol.parse.failure")
                .description("协议解析失败次数")
                .register(meterRegistry);
                
        this.transformSuccessCounter = Counter.builder("data.transform.success")
                .description("数据转换成功次数")
                .register(meterRegistry);
                
        this.transformFailureCounter = Counter.builder("data.transform.failure")
                .description("数据转换失败次数")
                .register(meterRegistry);
                
        this.routingSuccessCounter = Counter.builder("message.routing.success")
                .description("消息路由成功次数")
                .register(meterRegistry);
                
        this.routingFailureCounter = Counter.builder("message.routing.failure")
                .description("消息路由失败次数")
                .register(meterRegistry);
        
        // 初始化计时器
        this.parseTimer = Timer.builder("protocol.parse.duration")
                .description("协议解析耗时")
                .register(meterRegistry);
                
        this.transformTimer = Timer.builder("data.transform.duration")
                .description("数据转换耗时")
                .register(meterRegistry);
                
        this.routingTimer = Timer.builder("message.routing.duration")
                .description("消息路由耗时")
                .register(meterRegistry);
        
        // 初始化仪表盘
        this.activeConnectionsGauge = Gauge.builder("protocol.connections.active")
                .description("活跃连接数")
                .register(meterRegistry, this, ProtocolAdapterMetricsCollector::getActiveConnections);
                
        this.queueSizeGauge = Gauge.builder("protocol.queue.size")
                .description("队列大小")
                .register(meterRegistry, this, ProtocolAdapterMetricsCollector::getQueueSize);
    }
    
    /**
     * 记录解析指标
     */
    public void recordParseMetrics(ParseResult result, long duration, String protocolType) {
        Timer.Sample sample = Timer.start(meterRegistry);
        sample.stop(parseTimer.tag("protocol", protocolType));
        
        if (result.isSuccess()) {
            parseSuccessCounter.increment(Tags.of("protocol", protocolType));
        } else {
            parseFailureCounter.increment(Tags.of(
                    "protocol", protocolType,
                    "error", result.getErrorCode()
            ));
        }
    }
    
    /**
     * 记录转换指标
     */
    public void recordTransformMetrics(TransformResult result, long duration, String transformType) {
        Timer.Sample sample = Timer.start(meterRegistry);
        sample.stop(transformTimer.tag("type", transformType));
        
        if (result.isSuccess()) {
            transformSuccessCounter.increment(Tags.of("type", transformType));
            
            // 记录数据质量分数
            Gauge.builder("data.quality.score")
                    .description("数据质量分数")
                    .tag("type", transformType)
                    .register(meterRegistry, () -> result.getQualityScore());
        } else {
            transformFailureCounter.increment(Tags.of(
                    "type", transformType,
                    "error", result.getErrorCode()
            ));
        }
    }
    
    /**
     * 记录路由指标
     */
    public void recordRoutingMetrics(RoutingResult result, long duration) {
        Timer.Sample sample = Timer.start(meterRegistry);
        sample.stop(routingTimer);
        
        if (result.isSuccess()) {
            routingSuccessCounter.increment(Tags.of(
                    "target", result.getSelectedTarget().getTargetName(),
                    "strategy", result.getAppliedRule().getStrategy().name()
            ));
        } else {
            routingFailureCounter.increment(Tags.of(
                    "error", result.getErrorCode()
            ));
        }
    }
    
    /**
     * 获取活跃连接数
     */
    private double getActiveConnections() {
        // 这里应该返回实际的活跃连接数
        // 例如:return connectionManager.getActiveConnectionCount();
        return 0;
    }
    
    /**
     * 获取队列大小
     */
    private double getQueueSize() {
        // 这里应该返回实际的队列大小
        // 例如:return messageQueue.size();
        return 0;
    }
    
    /**
     * 创建自定义指标
     */
    public void createCustomMetric(String name, String description, double value, Tags tags) {
        Gauge.builder(name)
                .description(description)
                .tags(tags)
                .register(meterRegistry, () -> value);
    }
}

# 7.2 健康检查

/**
 * 协议适配器健康检查
 */
@Component
public class ProtocolAdapterHealthIndicator implements HealthIndicator {
    
    @Autowired
    private ProtocolParserFactory parserFactory;
    
    @Autowired
    private DataTransformer dataTransformer;
    
    @Autowired
    private MessageRouter messageRouter;
    
    @Autowired
    private ProtocolAdapterConfigManager configManager;
    
    @Override
    public Health health() {
        Health.Builder builder = new Health.Builder();
        
        try {
            // 检查协议解析器
            checkProtocolParsers(builder);
            
            // 检查数据转换器
            checkDataTransformer(builder);
            
            // 检查消息路由器
            checkMessageRouter(builder);
            
            // 检查配置管理器
            checkConfigManager(builder);
            
            // 如果所有检查都通过,标记为健康
            builder.up();
            
        } catch (Exception e) {
            builder.down(e);
        }
        
        return builder.build();
    }
    
    private void checkProtocolParsers(Health.Builder builder) {
        List<String> availableParsers = parserFactory.getAvailableParserTypes();
        builder.withDetail("availableParsers", availableParsers);
        
        if (availableParsers.isEmpty()) {
            throw new RuntimeException("没有可用的协议解析器");
        }
        
        // 检查每个解析器的健康状态
        for (String parserType : availableParsers) {
            ProtocolParser parser = parserFactory.getParser(parserType);
            HealthStatus status = parser.checkHealth();
            builder.withDetail("parser." + parserType, status.isHealthy() ? "UP" : "DOWN");
            
            if (!status.isHealthy()) {
                throw new RuntimeException("协议解析器不健康:" + parserType);
            }
        }
    }
    
    private void checkDataTransformer(Health.Builder builder) {
        HealthStatus status = dataTransformer.checkHealth();
        builder.withDetail("dataTransformer", status.isHealthy() ? "UP" : "DOWN");
        
        if (!status.isHealthy()) {
            throw new RuntimeException("数据转换器不健康:" + status.getDetails());
        }
    }
    
    private void checkMessageRouter(Health.Builder builder) {
        HealthStatus status = messageRouter.checkHealth();
        builder.withDetail("messageRouter", status.isHealthy() ? "UP" : "DOWN");
        
        if (!status.isHealthy()) {
            throw new RuntimeException("消息路由器不健康:" + status.getDetails());
        }
    }
    
    private void checkConfigManager(Health.Builder builder) {
        try {
            // 尝试获取一个配置来验证配置管理器是否正常
            AdapterConfig testConfig = configManager.getConfig("test");
            builder.withDetail("configManager", "UP");
        } catch (Exception e) {
            builder.withDetail("configManager", "DOWN");
            throw new RuntimeException("配置管理器不健康:" + e.getMessage());
        }
    }
}

# 8. 最佳实践

# 8.1 性能优化建议

  1. 批处理优化:就像快递员一次送多个包裹比一次送一个包裹效率更高
/**
 * 批处理优化示例
 */
@Component
public class BatchProcessingOptimizer {
    
    @Autowired
    private DataTransformer dataTransformer;
    
    @Autowired
    private MessageRouter messageRouter;
    
    /**
     * 批量处理消息
     */
    public void processBatch(List<StandardMessage> messages) {
        // 按设备类型分组
        Map<String, List<StandardMessage>> groupedMessages = messages.stream()
                .collect(Collectors.groupingBy(msg -> 
                        msg.getHeader().getDeviceType()));
        
        // 并行处理每个分组
        groupedMessages.entrySet().parallelStream().forEach(entry -> {
            String deviceType = entry.getKey();
            List<StandardMessage> deviceMessages = entry.getValue();
            
            // 批量转换
            List<TransformResult> transformResults = dataTransformer
                    .batchTransform(deviceMessages, buildTransformContext(deviceType));
            
            // 批量路由
            List<StandardMessage> transformedMessages = transformResults.stream()
                    .filter(TransformResult::isSuccess)
                    .map(TransformResult::getTransformedMessage)
                    .collect(Collectors.toList());
                    
            messageRouter.batchRoute(transformedMessages, buildRoutingContext(deviceType));
        });
    }
    
    private TransformContext buildTransformContext(String deviceType) {
        return TransformContext.builder()
                .sourceSystem(deviceType)
                .targetSystem("UNIFIED_SYSTEM")
                .transformationTime(LocalDateTime.now())
                .build();
    }
    
    private RoutingContext buildRoutingContext(String deviceType) {
        return RoutingContext.builder()
                .attribute("deviceType", deviceType)
                .attribute("batchProcessing", true)
                .build();
    }
}
  1. 缓存策略:就像图书馆把常用的书放在显眼的地方
/**
 * 缓存优化示例
 */
@Component
public class CacheOptimizer {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // 解析结果缓存
    private final Cache<String, ParseResult> parseResultCache = Caffeine.newBuilder()
            .maximumSize(10000)
            .expireAfterWrite(Duration.ofMinutes(30))
            .build();
    
    // 转换规则缓存
    private final Cache<String, List<TransformRule>> transformRuleCache = Caffeine.newBuilder()
            .maximumSize(1000)
            .expireAfterWrite(Duration.ofHours(1))
            .build();
    
    /**
     * 缓存解析结果
     */
    public void cacheParseResult(String key, ParseResult result) {
        if (result.isSuccess()) {
            parseResultCache.put(key, result);
        }
    }
    
    /**
     * 获取缓存的解析结果
     */
    public ParseResult getCachedParseResult(String key) {
        return parseResultCache.getIfPresent(key);
    }
    
    /**
     * 缓存转换规则
     */
    public void cacheTransformRules(String deviceType, List<TransformRule> rules) {
        transformRuleCache.put(deviceType, rules);
        
        // 同时缓存到Redis(分布式缓存)
        redisTemplate.opsForValue().set(
                "transform_rules:" + deviceType, 
                rules, 
                Duration.ofHours(2)
        );
    }
    
    /**
     * 获取缓存的转换规则
     */
    @SuppressWarnings("unchecked")
    public List<TransformRule> getCachedTransformRules(String deviceType) {
        // 先从本地缓存获取
        List<TransformRule> rules = transformRuleCache.getIfPresent(deviceType);
        if (rules != null) {
            return rules;
        }
        
        // 再从Redis获取
        rules = (List<TransformRule>) redisTemplate.opsForValue()
                .get("transform_rules:" + deviceType);
        if (rules != null) {
            transformRuleCache.put(deviceType, rules);
        }
        
        return rules;
    }
}

# 8.2 错误处理和恢复

/**
 * 错误处理和恢复机制
 */
@Component
@Slf4j
public class ErrorHandlingAndRecovery {
    
    @Autowired
    private DeadLetterQueue deadLetterQueue;
    
    @Autowired
    private AlertService alertService;
    
    /**
     * 处理解析错误
     */
    @Retryable(value = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000))
    public ParseResult handleParseError(byte[] rawData, String protocolType, Exception error) {
        log.error("协议解析失败,尝试恢复:protocol={}", protocolType, error);
        
        try {
            // 尝试使用备用解析器
            ProtocolParser backupParser = getBackupParser(protocolType);
            if (backupParser != null) {
                return backupParser.parse(rawData, createFallbackContext());
            }
            
            // 尝试通用解析器
            ProtocolParser genericParser = getGenericParser();
            return genericParser.parse(rawData, createGenericContext());
            
        } catch (Exception e) {
            log.error("所有解析尝试都失败", e);
            
            // 发送到死信队列
            deadLetterQueue.send(DeadLetterMessage.builder()
                    .originalData(rawData)
                    .protocolType(protocolType)
                    .errorMessage(e.getMessage())
                    .timestamp(LocalDateTime.now())
                    .build());
            
            // 发送告警
            alertService.sendAlert(Alert.builder()
                    .level(AlertLevel.ERROR)
                    .title("协议解析失败")
                    .message("协议类型:" + protocolType + ",错误:" + e.getMessage())
                    .build());
            
            return ParseResult.failure("PARSE_FAILED", e.getMessage());
        }
    }
    
    /**
     * 处理转换错误
     */
    @Recover
    public TransformResult handleTransformError(Exception error, StandardMessage message) {
        log.error("数据转换失败,执行恢复策略:messageId={}", message.getMessageId(), error);
        
        // 创建降级的转换结果
        StandardMessage degradedMessage = createDegradedMessage(message);
        
        return TransformResult.builder()
                .success(true)
                .transformedMessage(degradedMessage)
                .qualityScore(0.5) // 降级质量分数
                .errorMessage("使用降级策略")
                .build();
    }
    
    /**
     * 创建降级消息
     */
    private StandardMessage createDegradedMessage(StandardMessage original) {
        return StandardMessage.builder()
                .messageId(original.getMessageId())
                .deviceId(original.getDeviceId())
                .messageType(MessageType.DATA)
                .priority(MessagePriority.LOW)
                .sourceProtocol(original.getSourceProtocol())
                .timestamp(original.getTimestamp())
                .header(original.getHeader())
                .body(MessageBody.builder()
                        .dataPoints(Map.of("status", "degraded"))
                        .build())
                .build();
    }
    
    private ProtocolParser getBackupParser(String protocolType) {
        // 返回备用解析器
        return null;
    }
    
    private ProtocolParser getGenericParser() {
        // 返回通用解析器
        return null;
    }
    
    private ParseContext createFallbackContext() {
        return ParseContext.builder()
                .fallbackMode(true)
                .build();
    }
    
    private ParseContext createGenericContext() {
        return ParseContext.builder()
                .genericMode(true)
                .build();
    }
}

# 9. 总结

协议适配层就像一个智能的"翻译中心",它:

  1. 协议解析器:像专业翻译官,能够理解各种"方言"(协议)
  2. 数据转换器:像智能编辑,能够将内容转换成统一的"标准格式"
  3. 消息路由器:像高效的邮局调度员,能够将消息准确送达目的地

通过这三个核心组件的协同工作,协议适配层实现了:

  • 统一接入:支持多种协议的设备接入
  • 数据标准化:将异构数据转换为统一格式
  • 智能路由:根据规则将消息路由到合适的目标
  • 高可用性:具备故障转移和恢复能力
  • 高性能:支持批处理和缓存优化
  • 可监控:提供全面的监控和健康检查

这样的设计让物联网系统能够轻松应对各种复杂的设备接入场景,就像一个经验丰富的"万能翻译",无论遇到什么样的"语言",都能准确理解并妥善处理。

# 2.3 解析结果封装

/**
 * 协议解析结果 - 翻译的成果
 * 
 * 就像翻译官不仅要翻译内容,还要告诉你翻译的质量、
 * 是否有疑问等信息一样
 */
@Data
@Builder
public class ParseResult {
    
    /**
     * 解析是否成功 - "翻译成功了吗?"
     */
    private boolean success;
    
    /**
     * 解析后的标准消息 - "翻译的内容"
     */
    private StandardMessage message;
    
    /**
     * 错误信息 - "翻译遇到的问题"
     */
    private String errorMessage;
    
    /**
     * 错误代码 - "问题的类型编号"
     */
    private String errorCode;
    
    /**
     * 解析耗时(毫秒)- "翻译用了多长时间"
     */
    private long parseTime;
    
    /**
     * 数据质量评分 - "翻译质量如何(0-100)"
     */
    private int qualityScore;
    
    /**
     * 额外属性 - "其他补充信息"
     */
    private Map<String, Object> attributes;
    
    /**
     * 创建成功结果
     */
    public static ParseResult success(StandardMessage message) {
        return ParseResult.builder()
                .success(true)
                .message(message)
                .qualityScore(100)
                .parseTime(System.currentTimeMillis())
                .build();
    }
    
    /**
     * 创建失败结果
     */
    public static ParseResult failure(String errorCode, String errorMessage) {
        return ParseResult.builder()
                .success(false)
                .errorCode(errorCode)
                .errorMessage(errorMessage)
                .qualityScore(0)
                .parseTime(System.currentTimeMillis())
                .build();
    }
}

# 2.4 标准消息格式

/**
 * 标准消息格式 - 统一的"普通话"
 * 
 * 就像联合国会议中,各国代表最终都要用统一的语言
 * (如英语)来交流一样,所有设备消息最终都要转换为
 * 这种标准格式
 */
@Data
@Builder
public class StandardMessage {
    
    /**
     * 消息ID - "消息的身份证"
     */
    private String messageId;
    
    /**
     * 设备ID - "说话者的身份"
     */
    private String deviceId;
    
    /**
     * 消息类型 - "谈话的主题类型"
     */
    private MessageType messageType;
    
    /**
     * 时间戳 - "说话的时间"
     */
    private LocalDateTime timestamp;
    
    /**
     * 消息头 - "信封上的信息"
     */
    private MessageHeader header;
    
    /**
     * 消息体 - "信件的内容"
     */
    private MessageBody body;
    
    /**
     * 消息优先级 - "消息的重要程度"
     */
    private Priority priority;
    
    /**
     * 来源协议 - "原始语言类型"
     */
    private ProtocolType sourceProtocol;
    
    /**
     * 质量指标 - "消息的可信度"
     */
    private QualityMetrics quality;
}

/**
 * 消息类型枚举 - 谈话主题的分类
 */
public enum MessageType {
    DATA("数据消息", "设备上报的传感器数据"),
    COMMAND("命令消息", "向设备发送的控制指令"),
    EVENT("事件消息", "设备状态变化通知"),
    HEARTBEAT("心跳消息", "设备存活状态确认"),
    ALARM("告警消息", "设备异常情况报告"),
    CONFIG("配置消息", "设备配置参数"),
    FIRMWARE("固件消息", "设备固件更新相关"),
    LOG("日志消息", "设备运行日志");
    
    private final String name;
    private final String description;
    
    MessageType(String name, String description) {
        this.name = name;
        this.description = description;
    }
}

# 2.5 MQTT协议解析器实现

/**
 * MQTT协议解析器 - 专门的"MQTT语言"翻译官
 * 
 * MQTT就像是物联网世界的"普通话",轻量级、易学习,
 * 大部分IoT设备都"会说"这种语言
 */
@Component
@Slf4j
public class MqttProtocolParser implements ProtocolParser {
    
    @Autowired
    private ObjectMapper objectMapper;
    
    @Autowired
    private MqttTopicResolver topicResolver;
    
    /**
     * 解析MQTT消息 - 理解MQTT设备的"话"
     */
    @Override
    public ParseResult parse(byte[] rawData, ParseContext context) {
        long startTime = System.currentTimeMillis();
        
        try {
            // 第一步:提取MQTT消息信息(就像先看信封上的地址)
            MqttMessage mqttMessage = extractMqttMessage(rawData, context);
            
            // 第二步:解析Topic信息(理解"收件人地址")
            TopicInfo topicInfo = topicResolver.resolve(mqttMessage.getTopic());
            
            // 第三步:解析消息体(读懂"信件内容")
            MessageBody messageBody = parseMessageBody(mqttMessage.getPayload(), topicInfo);
            
            // 第四步:构建标准消息(翻译成"普通话")
            StandardMessage standardMessage = StandardMessage.builder()
                    .messageId(generateMessageId())
                    .deviceId(topicInfo.getDeviceId())
                    .messageType(determineMessageType(topicInfo))
                    .timestamp(LocalDateTime.now())
                    .header(buildMessageHeader(mqttMessage, topicInfo))
                    .body(messageBody)
                    .priority(determinePriority(topicInfo))
                    .sourceProtocol(ProtocolType.MQTT)
                    .quality(calculateQuality(mqttMessage))
                    .build();
            
            long parseTime = System.currentTimeMillis() - startTime;
            
            log.debug("MQTT消息解析成功:deviceId={}, messageType={}, parseTime={}ms", 
                    topicInfo.getDeviceId(), standardMessage.getMessageType(), parseTime);
            
            return ParseResult.builder()
                    .success(true)
                    .message(standardMessage)
                    .parseTime(parseTime)
                    .qualityScore(calculateQualityScore(standardMessage))
                    .build();
            
        } catch (Exception e) {
            log.error("MQTT消息解析失败", e);
            return ParseResult.failure("MQTT_PARSE_ERROR", e.getMessage());
        }
    }
    
    /**
     * 构建MQTT消息 - 把"普通话"翻译成MQTT设备能懂的话
     */
    @Override
    public byte[] build(StandardMessage message, BuildContext context) {
        try {
            // 第一步:构建MQTT Topic(确定"收件人地址")
            String topic = buildMqttTopic(message, context);
            
            // 第二步:构建消息体(写"信件内容")
            byte[] payload = buildMqttPayload(message);
            
            // 第三步:设置MQTT属性(贴"邮票",设置"邮寄方式")
            MqttProperties properties = buildMqttProperties(message);
            
            // 第四步:封装成MQTT消息包
            MqttMessagePacket packet = MqttMessagePacket.builder()
                    .topic(topic)
                    .payload(payload)
                    .qos(determineQos(message))
                    .retained(shouldRetain(message))
                    .properties(properties)
                    .build();
            
            return packet.toByteArray();
            
        } catch (Exception e) {
            log.error("MQTT消息构建失败:messageId={}", message.getMessageId(), e);
            throw new ProtocolBuildException("MQTT消息构建失败", e);
        }
    }
    
    /**
     * 提取MQTT消息信息
     */
    private MqttMessage extractMqttMessage(byte[] rawData, ParseContext context) {
        // 从上下文中获取MQTT相关信息
        String topic = context.getAttribute("mqtt.topic", String.class);
        Integer qos = context.getAttribute("mqtt.qos", Integer.class);
        Boolean retained = context.getAttribute("mqtt.retained", Boolean.class);
        
        return MqttMessage.builder()
                .topic(topic)
                .payload(rawData)
                .qos(qos != null ? qos : 0)
                .retained(retained != null ? retained : false)
                .timestamp(LocalDateTime.now())
                .build();
    }
    
    /**
     * 解析消息体 - 理解"信件内容"
     */
    private MessageBody parseMessageBody(byte[] payload, TopicInfo topicInfo) {
        try {
            String payloadStr = new String(payload, StandardCharsets.UTF_8);
            
            // 根据Topic类型选择不同的解析策略
            switch (topicInfo.getMessageType()) {
                case DATA:
                    return parseDataMessage(payloadStr, topicInfo);
                case COMMAND:
                    return parseCommandMessage(payloadStr, topicInfo);
                case EVENT:
                    return parseEventMessage(payloadStr, topicInfo);
                case HEARTBEAT:
                    return parseHeartbeatMessage(payloadStr, topicInfo);
                default:
                    return parseGenericMessage(payloadStr, topicInfo);
            }
        } catch (Exception e) {
            log.warn("消息体解析失败,使用原始数据:{}", e.getMessage());
            return MessageBody.builder()
                    .contentType("application/octet-stream")
                    .rawData(payload)
                    .build();
        }
    }
    
    /**
     * 解析数据消息 - 处理传感器数据
     */
    private MessageBody parseDataMessage(String payload, TopicInfo topicInfo) {
        try {
            // 尝试解析为JSON格式
            JsonNode jsonNode = objectMapper.readTree(payload);
            
            Map<String, Object> dataPoints = new HashMap<>();
            
            // 遍历JSON字段,提取数据点
            jsonNode.fields().forEachRemaining(entry -> {
                String key = entry.getKey();
                JsonNode value = entry.getValue();
                
                if (value.isNumber()) {
                    dataPoints.put(key, value.asDouble());
                } else if (value.isBoolean()) {
                    dataPoints.put(key, value.asBoolean());
                } else {
                    dataPoints.put(key, value.asText());
                }
            });
            
            return MessageBody.builder()
                    .contentType("application/json")
                    .dataPoints(dataPoints)
                    .rawContent(payload)
                    .build();
            
        } catch (Exception e) {
            // 如果不是JSON格式,尝试其他格式
            return parseKeyValueMessage(payload);
        }
    }
    
    /**
     * 解析键值对格式消息
     */
    private MessageBody parseKeyValueMessage(String payload) {
        Map<String, Object> dataPoints = new HashMap<>();
        
        // 支持多种分隔符:temp=25.5;humidity=60.0 或 temp:25.5,humidity:60.0
        String[] pairs = payload.split("[;,]\\s*");
        
        for (String pair : pairs) {
            String[] keyValue = pair.split("[=:]\\s*", 2);
            if (keyValue.length == 2) {
                String key = keyValue[0].trim();
                String value = keyValue[1].trim();
                
                // 尝试转换为数字
                try {
                    if (value.contains(".")) {
                        dataPoints.put(key, Double.parseDouble(value));
                    } else {
                        dataPoints.put(key, Long.parseLong(value));
                    }
                } catch (NumberFormatException e) {
                    // 保持为字符串
                    dataPoints.put(key, value);
                }
            }
        }
        
        return MessageBody.builder()
                .contentType("text/plain")
                .dataPoints(dataPoints)
                .rawContent(payload)
                .build();
    }
    
    /**
     * 计算消息质量评分
     */
    private int calculateQualityScore(StandardMessage message) {
        int score = 100;
        
        // 检查必要字段
        if (message.getDeviceId() == null || message.getDeviceId().isEmpty()) {
            score -= 20;
        }
        
        if (message.getMessageType() == null) {
            score -= 15;
        }
        
        if (message.getBody() == null || message.getBody().getDataPoints().isEmpty()) {
            score -= 25;
        }
        
        // 检查时间戳合理性
        if (message.getTimestamp() != null) {
            LocalDateTime now = LocalDateTime.now();
            long timeDiff = Math.abs(ChronoUnit.SECONDS.between(message.getTimestamp(), now));
            if (timeDiff > 300) { // 超过5分钟
                score -= 10;
            }
        }
        
        return Math.max(0, score);
    }
    
    @Override
    public ProtocolType getSupportedProtocol() {
        return ProtocolType.MQTT;
    }
    
    @Override
    public boolean validate(byte[] data) {
        try {
            // 基本的数据有效性检查
            if (data == null || data.length == 0) {
                return false;
            }
            
            // 检查是否为有效的UTF-8字符串
            String content = new String(data, StandardCharsets.UTF_8);
            
            // 检查是否为有效的JSON或键值对格式
            return isValidJson(content) || isValidKeyValue(content);
            
        } catch (Exception e) {
            return false;
        }
    }
    
    private boolean isValidJson(String content) {
        try {
            objectMapper.readTree(content);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
    
    private boolean isValidKeyValue(String content) {
        // 简单检查是否包含键值对模式
        return content.matches(".+[=:].+");
    }
}

# 2.6 Topic解析器

/**
 * MQTT Topic解析器 - "地址解析专家"
 * 
 * 就像邮递员需要理解地址格式一样,Topic解析器需要
 * 理解MQTT Topic的结构,从中提取有用信息
 */
@Component
@Slf4j
public class MqttTopicResolver {
    
    // Topic模式配置(就像"地址格式规范")
    private static final Map<String, TopicPattern> TOPIC_PATTERNS = new HashMap<>();
    
    static {
        // 数据上报:device/{deviceId}/data/{dataType}
        TOPIC_PATTERNS.put("DATA", TopicPattern.builder()
                .pattern("device/([^/]+)/data/([^/]+)")
                .messageType(MessageType.DATA)
                .deviceIdGroup(1)
                .dataTypeGroup(2)
                .build());
        
        // 命令下发:device/{deviceId}/command/{commandType}
        TOPIC_PATTERNS.put("COMMAND", TopicPattern.builder()
                .pattern("device/([^/]+)/command/([^/]+)")
                .messageType(MessageType.COMMAND)
                .deviceIdGroup(1)
                .commandTypeGroup(2)
                .build());
        
        // 事件通知:device/{deviceId}/event/{eventType}
        TOPIC_PATTERNS.put("EVENT", TopicPattern.builder()
                .pattern("device/([^/]+)/event/([^/]+)")
                .messageType(MessageType.EVENT)
                .deviceIdGroup(1)
                .eventTypeGroup(2)
                .build());
        
        // 心跳消息:device/{deviceId}/heartbeat
        TOPIC_PATTERNS.put("HEARTBEAT", TopicPattern.builder()
                .pattern("device/([^/]+)/heartbeat")
                .messageType(MessageType.HEARTBEAT)
                .deviceIdGroup(1)
                .build());
    }
    
    /**
     * 解析Topic信息
     */
    public TopicInfo resolve(String topic) {
        if (topic == null || topic.isEmpty()) {
            throw new TopicResolveException("Topic不能为空");
        }
        
        // 遍历所有模式,找到匹配的
        for (Map.Entry<String, TopicPattern> entry : TOPIC_PATTERNS.entrySet()) {
            TopicPattern pattern = entry.getValue();
            Pattern regex = Pattern.compile(pattern.getPattern());
            Matcher matcher = regex.matcher(topic);
            
            if (matcher.matches()) {
                return buildTopicInfo(topic, pattern, matcher);
            }
        }
        
        // 如果没有匹配的模式,返回默认解析结果
        log.warn("未知的Topic格式:{}", topic);
        return TopicInfo.builder()
                .originalTopic(topic)
                .messageType(MessageType.DATA)
                .deviceId(extractDeviceIdFromUnknownTopic(topic))
                .build();
    }
    
    /**
     * 构建Topic信息
     */
    private TopicInfo buildTopicInfo(String topic, TopicPattern pattern, Matcher matcher) {
        TopicInfo.TopicInfoBuilder builder = TopicInfo.builder()
                .originalTopic(topic)
                .messageType(pattern.getMessageType());
        
        // 提取设备ID
        if (pattern.getDeviceIdGroup() > 0 && matcher.groupCount() >= pattern.getDeviceIdGroup()) {
            builder.deviceId(matcher.group(pattern.getDeviceIdGroup()));
        }
        
        // 提取数据类型
        if (pattern.getDataTypeGroup() > 0 && matcher.groupCount() >= pattern.getDataTypeGroup()) {
            builder.dataType(matcher.group(pattern.getDataTypeGroup()));
        }
        
        // 提取命令类型
        if (pattern.getCommandTypeGroup() > 0 && matcher.groupCount() >= pattern.getCommandTypeGroup()) {
            builder.commandType(matcher.group(pattern.getCommandTypeGroup()));
        }
        
        // 提取事件类型
        if (pattern.getEventTypeGroup() > 0 && matcher.groupCount() >= pattern.getEventTypeGroup()) {
            builder.eventType(matcher.group(pattern.getEventTypeGroup()));
        }
        
        return builder.build();
    }
    
    /**
     * 从未知格式的Topic中尝试提取设备ID
     */
    private String extractDeviceIdFromUnknownTopic(String topic) {
        // 尝试一些常见的模式
        String[] parts = topic.split("/");
        
        // 查找可能是设备ID的部分(通常是数字、字母、下划线、连字符的组合)
        for (String part : parts) {
            if (part.matches("[a-zA-Z0-9_-]+") && part.length() > 3) {
                return part;
            }
        }
        
        // 如果找不到,返回第一个非空部分
        for (String part : parts) {
            if (!part.isEmpty()) {
                return part;
            }
        }
        
        return "unknown";
    }
}

/**
 * Topic信息封装
 */
@Data
@Builder
public class TopicInfo {
    private String originalTopic;    // 原始Topic
    private MessageType messageType; // 消息类型
    private String deviceId;         // 设备ID
    private String dataType;         // 数据类型
    private String commandType;      // 命令类型
    private String eventType;        // 事件类型
    private Map<String, String> parameters; // 额外参数
}

/**
 * Topic模式定义
 */
@Data
@Builder
public class TopicPattern {
    private String pattern;          // 正则表达式模式
    private MessageType messageType; // 对应的消息类型
    private int deviceIdGroup;       // 设备ID在正则分组中的位置
    private int dataTypeGroup;       // 数据类型在正则分组中的位置
    private int commandTypeGroup;    // 命令类型在正则分组中的位置
    private int eventTypeGroup;      // 事件类型在正则分组中的位置
}