协议适配层详细设计与实现
# 协议适配层详细设计与实现
# 1. 概述
# 1.1 背景故事
想象一下,在一个繁忙的国际机场,来自世界各地的旅客说着不同的语言——有英语、中文、法语、西班牙语等等。如果没有翻译服务,这些旅客就无法与机场工作人员有效沟通,机场的运营也会陷入混乱。
在物联网世界中,协议适配层就像是这个机场的"万能翻译官"。各种设备就像是说着不同"语言"的旅客:
- 传感器设备说着"MQTT语言"
- 工业设备说着"Modbus语言"
- 智能家居设备说着"CoAP语言"
- 企业系统说着"HTTP语言"
协议适配层的使命就是让这些"说着不同语言"的设备能够无障碍地交流,确保整个物联网系统的和谐运行。
# 1.2 核心功能
协议适配层主要包含三个核心组件:
协议解析器(Protocol Parser) - "语言识别专家"
- 识别和解析各种设备协议
- 将原始数据转换为标准格式
数据转换器(Data Transformer) - "内容翻译官"
- 在不同数据格式间进行转换
- 统一数据结构和语义
消息路由器(Message Router) - "智能调度员"
- 根据规则将消息路由到正确的目标
- 实现负载均衡和故障转移
# 2. 协议解析器(Protocol Parser)
# 2.1 设计原理
协议解析器就像一位精通多国语言的翻译专家,能够理解各种设备"说话"的方式。每种协议都有自己的"语法规则",解析器需要掌握这些规则,才能正确理解设备发送的消息。
# 2.2 核心接口设计
/**
* 协议解析器接口 - 万能翻译官的基础能力
*
* 就像翻译官需要具备"听懂"和"表达"的能力一样,
* 协议解析器需要能够解析输入数据和构建输出数据
*/
public interface ProtocolParser {
/**
* 解析协议数据 - "听懂"设备说的话
*
* @param rawData 原始数据(设备的"原话")
* @param context 解析上下文("对话环境")
* @return 解析结果("理解的内容")
*/
ParseResult parse(byte[] rawData, ParseContext context);
/**
* 构建协议数据 - "说"设备能懂的话
*
* @param message 标准消息("要表达的意思")
* @param context 构建上下文("表达环境")
* @return 协议数据("设备能懂的话")
*/
byte[] build(StandardMessage message, BuildContext context);
/**
* 获取支持的协议类型 - "擅长的语言"
*/
ProtocolType getSupportedProtocol();
/**
* 验证数据格式 - "检查语法是否正确"
*/
boolean validate(byte[] data);
}
# 3. 数据转换器(Data Transformer)
# 3.1 设计原理
数据转换器就像一位精通各种"方言"的语言学家。虽然协议解析器已经把各种设备语言翻译成了"普通话",但不同地区的"普通话"还是有差异的——有的说"土豆",有的说"马铃薯",有的说"洋芋",说的是同一个东西,但表达方式不同。
数据转换器的任务就是统一这些"方言差异",确保所有数据都使用相同的"词汇"和"语法"。
# 3.2 核心接口设计
/**
* 数据转换器接口 - 统一"方言"的专家
*
* 就像联合国需要统一各国代表的表达方式一样,
* 数据转换器需要统一各种设备的数据格式
*/
public interface DataTransformer {
/**
* 转换数据格式 - 统一"说话方式"
*
* @param sourceData 源数据("方言版本")
* @param transformRule 转换规则("翻译字典")
* @return 转换结果("标准版本")
*/
TransformResult transform(Object sourceData, TransformRule transformRule);
/**
* 批量转换 - 批量处理"方言"
*/
List<TransformResult> batchTransform(List<Object> sourceDataList, TransformRule transformRule);
/**
* 反向转换 - 把"标准话"转回"方言"
*/
TransformResult reverseTransform(Object standardData, TransformRule transformRule);
/**
* 获取支持的转换类型
*/
Set<TransformType> getSupportedTypes();
/**
* 验证转换规则
*/
boolean validateRule(TransformRule rule);
}
# 3.3 转换规则定义
/**
* 数据转换规则 - "翻译字典"
*
* 就像字典告诉我们"土豆"="马铃薯"一样,
* 转换规则告诉系统如何统一不同的数据表达
*/
@Data
@Builder
public class TransformRule {
/**
* 规则ID - "字典编号"
*/
private String ruleId;
/**
* 规则名称 - "字典名称"
*/
private String ruleName;
/**
* 源数据类型 - "原始方言类型"
*/
private String sourceType;
/**
* 目标数据类型 - "标准语言类型"
*/
private String targetType;
/**
* 字段映射规则 - "词汇对照表"
*/
private List<FieldMapping> fieldMappings;
/**
* 数据验证规则 - "语法检查规则"
*/
private List<ValidationRule> validationRules;
/**
* 转换脚本 - "复杂翻译逻辑"
*/
private String transformScript;
/**
* 转换优先级 - "翻译优先级"
*/
private int priority;
/**
* 是否启用 - "字典是否可用"
*/
private boolean enabled;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
}
/**
* 字段映射规则 - "词汇对照条目"
*/
@Data
@Builder
public class FieldMapping {
/**
* 源字段名 - "方言词汇"
*/
private String sourceField;
/**
* 目标字段名 - "标准词汇"
*/
private String targetField;
/**
* 数据类型转换 - "词性转换"
*/
private DataTypeMapping dataTypeMapping;
/**
* 默认值 - "找不到对应词汇时的默认翻译"
*/
private Object defaultValue;
/**
* 转换表达式 - "复杂翻译公式"
*/
private String expression;
/**
* 是否必需 - "是否为必须翻译的词汇"
*/
private boolean required;
}
/**
* 数据类型映射
*/
@Data
@Builder
public class DataTypeMapping {
private Class<?> sourceType; // 源数据类型
private Class<?> targetType; // 目标数据类型
private String converter; // 转换器名称
private Map<String, Object> converterParams; // 转换参数
}
# 3.4 通用数据转换器实现
/**
* 通用数据转换器 - 万能"翻译官"
*
* 就像一位经验丰富的翻译官,能够处理各种复杂的翻译任务
*/
@Component
@Slf4j
public class UniversalDataTransformer implements DataTransformer {
@Autowired
private TransformRuleRepository ruleRepository;
@Autowired
private ScriptEngine scriptEngine;
@Autowired
private TypeConverterRegistry converterRegistry;
@Autowired
private ValidationService validationService;
/**
* 转换数据 - 执行"翻译"工作
*/
@Override
public TransformResult transform(Object sourceData, TransformRule transformRule) {
long startTime = System.currentTimeMillis();
try {
// 第一步:验证输入数据(检查"原文"是否合法)
if (!validateSourceData(sourceData, transformRule)) {
return TransformResult.failure("SOURCE_DATA_INVALID", "源数据验证失败");
}
// 第二步:执行字段映射(按"字典"逐词翻译)
Map<String, Object> mappedData = executeFieldMapping(sourceData, transformRule);
// 第三步:执行脚本转换(处理复杂的"语法转换")
if (transformRule.getTransformScript() != null) {
mappedData = executeScriptTransform(mappedData, transformRule.getTransformScript());
}
// 第四步:数据类型转换(统一"词性")
Object transformedData = executeTypeConversion(mappedData, transformRule);
// 第五步:验证转换结果(检查"译文"质量)
ValidationResult validationResult = validationService.validate(transformedData, transformRule.getValidationRules());
long transformTime = System.currentTimeMillis() - startTime;
log.debug("数据转换完成:ruleId={}, transformTime={}ms, valid={}",
transformRule.getRuleId(), transformTime, validationResult.isValid());
return TransformResult.builder()
.success(true)
.transformedData(transformedData)
.sourceData(sourceData)
.transformRule(transformRule)
.transformTime(transformTime)
.validationResult(validationResult)
.qualityScore(calculateQualityScore(transformedData, validationResult))
.build();
} catch (Exception e) {
log.error("数据转换失败:ruleId={}", transformRule.getRuleId(), e);
return TransformResult.failure("TRANSFORM_ERROR", e.getMessage());
}
}
/**
* 执行字段映射 - 按"字典"翻译
*/
private Map<String, Object> executeFieldMapping(Object sourceData, TransformRule transformRule) {
Map<String, Object> result = new HashMap<>();
Map<String, Object> sourceMap = convertToMap(sourceData);
for (FieldMapping mapping : transformRule.getFieldMappings()) {
try {
Object sourceValue = extractSourceValue(sourceMap, mapping);
Object transformedValue = transformFieldValue(sourceValue, mapping);
if (transformedValue != null || mapping.isRequired()) {
result.put(mapping.getTargetField(), transformedValue);
}
} catch (Exception e) {
log.warn("字段映射失败:sourceField={}, targetField={}",
mapping.getSourceField(), mapping.getTargetField(), e);
if (mapping.isRequired()) {
throw new TransformException("必需字段映射失败:" + mapping.getTargetField(), e);
}
}
}
return result;
}
/**
* 提取源字段值 - 从"原文"中找到要翻译的"词汇"
*/
private Object extractSourceValue(Map<String, Object> sourceMap, FieldMapping mapping) {
String sourceField = mapping.getSourceField();
// 支持嵌套字段访问:user.profile.name
if (sourceField.contains(".")) {
return extractNestedValue(sourceMap, sourceField);
}
Object value = sourceMap.get(sourceField);
// 如果找不到值,使用默认值
if (value == null && mapping.getDefaultValue() != null) {
value = mapping.getDefaultValue();
}
return value;
}
/**
* 提取嵌套字段值
*/
private Object extractNestedValue(Map<String, Object> sourceMap, String fieldPath) {
String[] parts = fieldPath.split("\\.");
Object current = sourceMap;
for (String part : parts) {
if (current instanceof Map) {
current = ((Map<?, ?>) current).get(part);
} else {
return null;
}
}
return current;
}
/**
* 转换字段值 - 执行具体的"词汇翻译"
*/
private Object transformFieldValue(Object sourceValue, FieldMapping mapping) {
if (sourceValue == null) {
return mapping.getDefaultValue();
}
// 如果有转换表达式,使用表达式计算
if (mapping.getExpression() != null) {
return evaluateExpression(mapping.getExpression(), sourceValue);
}
// 如果有数据类型映射,执行类型转换
if (mapping.getDataTypeMapping() != null) {
return executeTypeMapping(sourceValue, mapping.getDataTypeMapping());
}
// 否则直接返回原值
return sourceValue;
}
/**
* 执行表达式计算 - 处理复杂的"翻译逻辑"
*/
private Object evaluateExpression(String expression, Object sourceValue) {
try {
// 创建脚本上下文
ScriptContext context = new SimpleScriptContext();
context.setAttribute("value", sourceValue, ScriptContext.ENGINE_SCOPE);
context.setAttribute("Math", Math.class, ScriptContext.ENGINE_SCOPE);
// 执行表达式
return scriptEngine.eval(expression, context);
} catch (Exception e) {
log.warn("表达式计算失败:expression={}, value={}", expression, sourceValue, e);
return sourceValue;
}
}
/**
* 执行类型映射
*/
private Object executeTypeMapping(Object sourceValue, DataTypeMapping typeMapping) {
TypeConverter converter = converterRegistry.getConverter(typeMapping.getConverter());
if (converter != null) {
return converter.convert(sourceValue, typeMapping.getTargetType(), typeMapping.getConverterParams());
}
// 如果没有专门的转换器,尝试默认转换
return convertType(sourceValue, typeMapping.getTargetType());
}
/**
* 默认类型转换
*/
private Object convertType(Object value, Class<?> targetType) {
if (value == null || targetType.isInstance(value)) {
return value;
}
try {
if (targetType == String.class) {
return value.toString();
} else if (targetType == Integer.class || targetType == int.class) {
return Integer.valueOf(value.toString());
} else if (targetType == Long.class || targetType == long.class) {
return Long.valueOf(value.toString());
} else if (targetType == Double.class || targetType == double.class) {
return Double.valueOf(value.toString());
} else if (targetType == Boolean.class || targetType == boolean.class) {
return Boolean.valueOf(value.toString());
} else if (targetType == LocalDateTime.class) {
return parseDateTime(value.toString());
}
} catch (Exception e) {
log.warn("类型转换失败:value={}, targetType={}", value, targetType.getSimpleName(), e);
}
return value;
}
/**
* 解析日期时间
*/
private LocalDateTime parseDateTime(String dateStr) {
// 支持多种日期格式
String[] patterns = {
"yyyy-MM-dd HH:mm:ss",
"yyyy-MM-dd'T'HH:mm:ss",
"yyyy-MM-dd'T'HH:mm:ss.SSS",
"yyyy-MM-dd'T'HH:mm:ss'Z'",
"yyyy-MM-dd",
"MM/dd/yyyy HH:mm:ss"
};
for (String pattern : patterns) {
try {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
return LocalDateTime.parse(dateStr, formatter);
} catch (Exception e) {
// 继续尝试下一个格式
}
}
// 如果都失败了,尝试解析时间戳
try {
long timestamp = Long.parseLong(dateStr);
return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
} catch (Exception e) {
throw new TransformException("无法解析日期时间:" + dateStr);
}
}
/**
* 执行脚本转换 - 处理复杂的"语法转换"
*/
private Map<String, Object> executeScriptTransform(Map<String, Object> mappedData, String script) {
try {
ScriptContext context = new SimpleScriptContext();
context.setAttribute("data", mappedData, ScriptContext.ENGINE_SCOPE);
context.setAttribute("result", new HashMap<String, Object>(), ScriptContext.ENGINE_SCOPE);
scriptEngine.eval(script, context);
@SuppressWarnings("unchecked")
Map<String, Object> result = (Map<String, Object>) context.getAttribute("result");
return result != null ? result : mappedData;
} catch (Exception e) {
log.warn("脚本转换失败,使用映射结果", e);
return mappedData;
}
}
/**
* 将对象转换为Map
*/
private Map<String, Object> convertToMap(Object obj) {
if (obj instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) obj;
return map;
}
if (obj instanceof StandardMessage) {
return convertMessageToMap((StandardMessage) obj);
}
// 使用反射转换普通对象
return convertObjectToMap(obj);
}
/**
* 转换StandardMessage为Map
*/
private Map<String, Object> convertMessageToMap(StandardMessage message) {
Map<String, Object> map = new HashMap<>();
map.put("messageId", message.getMessageId());
map.put("deviceId", message.getDeviceId());
map.put("messageType", message.getMessageType());
map.put("timestamp", message.getTimestamp());
map.put("priority", message.getPriority());
map.put("sourceProtocol", message.getSourceProtocol());
if (message.getHeader() != null) {
map.put("header", convertObjectToMap(message.getHeader()));
}
if (message.getBody() != null) {
map.put("body", convertObjectToMap(message.getBody()));
if (message.getBody().getDataPoints() != null) {
map.putAll(message.getBody().getDataPoints());
}
}
return map;
}
/**
* 使用反射转换对象为Map
*/
private Map<String, Object> convertObjectToMap(Object obj) {
Map<String, Object> map = new HashMap<>();
try {
Field[] fields = obj.getClass().getDeclaredFields();
for (Field field : fields) {
field.setAccessible(true);
Object value = field.get(obj);
if (value != null) {
map.put(field.getName(), value);
}
}
} catch (Exception e) {
log.warn("对象转Map失败:{}", obj.getClass().getSimpleName(), e);
}
return map;
}
/**
* 计算转换质量评分
*/
private int calculateQualityScore(Object transformedData, ValidationResult validationResult) {
int score = 100;
// 基于验证结果调整分数
if (!validationResult.isValid()) {
score -= validationResult.getErrorCount() * 10;
}
// 基于数据完整性调整分数
if (transformedData instanceof Map) {
Map<?, ?> dataMap = (Map<?, ?>) transformedData;
if (dataMap.isEmpty()) {
score -= 50;
} else if (dataMap.size() < 3) {
score -= 20;
}
}
return Math.max(0, score);
}
@Override
public List<TransformResult> batchTransform(List<Object> sourceDataList, TransformRule transformRule) {
return sourceDataList.stream()
.map(data -> transform(data, transformRule))
.collect(Collectors.toList());
}
@Override
public TransformResult reverseTransform(Object standardData, TransformRule transformRule) {
// 创建反向转换规则
TransformRule reverseRule = createReverseRule(transformRule);
return transform(standardData, reverseRule);
}
/**
* 创建反向转换规则
*/
private TransformRule createReverseRule(TransformRule originalRule) {
List<FieldMapping> reverseMappings = originalRule.getFieldMappings().stream()
.map(mapping -> FieldMapping.builder()
.sourceField(mapping.getTargetField())
.targetField(mapping.getSourceField())
.dataTypeMapping(reverseDataTypeMapping(mapping.getDataTypeMapping()))
.defaultValue(mapping.getDefaultValue())
.required(mapping.isRequired())
.build())
.collect(Collectors.toList());
return TransformRule.builder()
.ruleId(originalRule.getRuleId() + "_reverse")
.ruleName(originalRule.getRuleName() + "_反向")
.sourceType(originalRule.getTargetType())
.targetType(originalRule.getSourceType())
.fieldMappings(reverseMappings)
.validationRules(originalRule.getValidationRules())
.priority(originalRule.getPriority())
.enabled(originalRule.isEnabled())
.build();
}
/**
* 反向数据类型映射
*/
private DataTypeMapping reverseDataTypeMapping(DataTypeMapping original) {
if (original == null) {
return null;
}
return DataTypeMapping.builder()
.sourceType(original.getTargetType())
.targetType(original.getSourceType())
.converter(original.getConverter())
.converterParams(original.getConverterParams())
.build();
}
@Override
public Set<TransformType> getSupportedTypes() {
return Set.of(
TransformType.FIELD_MAPPING,
TransformType.TYPE_CONVERSION,
TransformType.SCRIPT_TRANSFORM,
TransformType.VALIDATION
);
}
@Override
public boolean validateRule(TransformRule rule) {
if (rule == null) {
return false;
}
// 检查基本字段
if (rule.getRuleId() == null || rule.getRuleId().isEmpty()) {
return false;
}
if (rule.getFieldMappings() == null || rule.getFieldMappings().isEmpty()) {
return false;
}
// 检查字段映射的有效性
for (FieldMapping mapping : rule.getFieldMappings()) {
if (mapping.getSourceField() == null || mapping.getTargetField() == null) {
return false;
}
}
return true;
}
/**
* 验证源数据
*/
private boolean validateSourceData(Object sourceData, TransformRule transformRule) {
if (sourceData == null) {
return false;
}
// 可以添加更多的验证逻辑
return true;
}
}
# 3.5 转换结果封装
/**
* 数据转换结果 - "翻译成果报告"
*/
@Data
@Builder
public class TransformResult {
/**
* 转换是否成功
*/
private boolean success;
/**
* 转换后的数据
*/
private Object transformedData;
/**
* 原始数据
*/
private Object sourceData;
/**
* 使用的转换规则
*/
private TransformRule transformRule;
/**
* 转换耗时(毫秒)
*/
private long transformTime;
/**
* 验证结果
*/
private ValidationResult validationResult;
/**
* 质量评分(0-100)
*/
private int qualityScore;
/**
* 错误信息
*/
private String errorMessage;
/**
* 错误代码
*/
private String errorCode;
/**
* 转换统计信息
*/
private TransformStatistics statistics;
/**
* 创建成功结果
*/
public static TransformResult success(Object transformedData) {
return TransformResult.builder()
.success(true)
.transformedData(transformedData)
.qualityScore(100)
.build();
}
/**
* 创建失败结果
*/
public static TransformResult failure(String errorCode, String errorMessage) {
return TransformResult.builder()
.success(false)
.errorCode(errorCode)
.errorMessage(errorMessage)
.qualityScore(0)
.build();
}
}
/**
* 转换统计信息
*/
@Data
@Builder
public class TransformStatistics {
private int totalFields; // 总字段数
private int transformedFields; // 成功转换的字段数
private int skippedFields; // 跳过的字段数
private int errorFields; // 转换失败的字段数
private Map<String, Object> metrics; // 其他指标
}
# 4. 消息路由器(Message Router)
# 4.1 设计原理
消息路由器就像一位经验丰富的邮局调度员。在繁忙的邮局里,每天都有成千上万的邮件需要分拣和投递。调度员需要根据邮件的地址、优先级、重量等信息,决定这些邮件应该走哪条路线,用什么方式投递。
在物联网系统中,消息路由器面临着类似的挑战:
- 数以万计的设备消息需要处理
- 不同类型的消息需要不同的处理方式
- 系统负载需要均衡分配
- 故障时需要自动切换路由
# 4.2 核心接口设计
/**
* 消息路由器接口 - 智能"邮局调度员"
*
* 就像邮局调度员需要决定邮件的投递路线一样,
* 消息路由器需要决定消息的处理路径
*/
public interface MessageRouter {
/**
* 路由消息 - 决定消息的"投递路线"
*
* @param message 待路由的消息("邮件")
* @param routingContext 路由上下文("投递环境信息")
* @return 路由结果("投递方案")
*/
RoutingResult route(StandardMessage message, RoutingContext routingContext);
/**
* 批量路由 - 批量处理"邮件"
*/
List<RoutingResult> batchRoute(List<StandardMessage> messages, RoutingContext routingContext);
/**
* 注册路由规则 - 添加新的"投递规则"
*/
void registerRoutingRule(RoutingRule rule);
/**
* 移除路由规则 - 删除"投递规则"
*/
void removeRoutingRule(String ruleId);
/**
* 获取路由统计 - 查看"投递统计"
*/
RoutingStatistics getStatistics();
/**
* 健康检查 - 检查"投递系统"是否正常
*/
HealthStatus checkHealth();
}
# 4.3 路由规则定义
/**
* 路由规则 - "投递规则手册"
*
* 就像邮局有详细的投递规则一样,
* 路由规则定义了消息应该如何被处理和转发
*/
@Data
@Builder
public class RoutingRule {
/**
* 规则ID - "规则编号"
*/
private String ruleId;
/**
* 规则名称 - "规则名称"
*/
private String ruleName;
/**
* 匹配条件 - "适用条件"
*/
private List<MatchCondition> matchConditions;
/**
* 路由目标 - "投递目的地"
*/
private List<RouteTarget> routeTargets;
/**
* 路由策略 - "投递策略"
*/
private RoutingStrategy strategy;
/**
* 优先级 - "规则优先级"
*/
private int priority;
/**
* 是否启用 - "规则是否生效"
*/
private boolean enabled;
/**
* 故障转移配置 - "备用投递方案"
*/
private FailoverConfig failoverConfig;
/**
* 限流配置 - "流量控制"
*/
private RateLimitConfig rateLimitConfig;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
}
/**
* 匹配条件 - "规则适用条件"
*/
@Data
@Builder
public class MatchCondition {
/**
* 条件类型 - "条件种类"
*/
private ConditionType type;
/**
* 字段名 - "检查的字段"
*/
private String fieldName;
/**
* 操作符 - "比较方式"
*/
private Operator operator;
/**
* 期望值 - "期望的值"
*/
private Object expectedValue;
/**
* 逻辑关系 - "与其他条件的关系"
*/
private LogicalOperator logicalOperator;
}
/**
* 路由目标 - "投递目的地"
*/
@Data
@Builder
public class RouteTarget {
/**
* 目标ID - "目的地编号"
*/
private String targetId;
/**
* 目标名称 - "目的地名称"
*/
private String targetName;
/**
* 目标类型 - "目的地类型"
*/
private TargetType targetType;
/**
* 连接配置 - "连接信息"
*/
private ConnectionConfig connectionConfig;
/**
* 权重 - "分配权重"
*/
private int weight;
/**
* 是否可用 - "目的地是否可用"
*/
private boolean available;
/**
* 健康状态 - "目的地健康状况"
*/
private HealthStatus healthStatus;
}
/**
* 路由策略枚举
*/
public enum RoutingStrategy {
ROUND_ROBIN("轮询", "依次分配给各个目标"),
WEIGHTED_ROUND_ROBIN("加权轮询", "根据权重分配"),
LEAST_CONNECTIONS("最少连接", "分配给连接数最少的目标"),
RANDOM("随机", "随机选择目标"),
HASH("哈希", "根据消息特征哈希选择"),
BROADCAST("广播", "发送给所有目标"),
FAILOVER("故障转移", "主备切换模式"),
PRIORITY("优先级", "按优先级选择目标");
private final String name;
private final String description;
RoutingStrategy(String name, String description) {
this.name = name;
this.description = description;
}
}
# 4.4 智能消息路由器实现
/**
* 智能消息路由器 - 高级"邮局调度系统"
*
* 就像现代化的邮局调度系统,能够智能地处理各种复杂的投递需求
*/
@Component
@Slf4j
public class IntelligentMessageRouter implements MessageRouter {
@Autowired
private RoutingRuleRepository ruleRepository;
@Autowired
private LoadBalancer loadBalancer;
@Autowired
private HealthChecker healthChecker;
@Autowired
private RateLimiter rateLimiter;
@Autowired
private MetricsCollector metricsCollector;
// 路由规则缓存("规则手册")
private final Map<String, RoutingRule> ruleCache = new ConcurrentHashMap<>();
// 路由统计("投递统计")
private final RoutingStatistics statistics = new RoutingStatistics();
/**
* 路由消息 - 执行"智能投递"
*/
@Override
public RoutingResult route(StandardMessage message, RoutingContext routingContext) {
long startTime = System.currentTimeMillis();
try {
// 第一步:查找匹配的路由规则("查找投递规则")
List<RoutingRule> matchedRules = findMatchingRules(message, routingContext);
if (matchedRules.isEmpty()) {
log.warn("未找到匹配的路由规则:messageId={}, deviceId={}",
message.getMessageId(), message.getDeviceId());
return RoutingResult.failure("NO_MATCHING_RULE", "未找到匹配的路由规则");
}
// 第二步:选择最佳规则("选择最优投递方案")
RoutingRule selectedRule = selectBestRule(matchedRules, message);
// 第三步:检查限流("检查投递能力")
if (!checkRateLimit(message, selectedRule)) {
statistics.incrementRateLimitedCount();
return RoutingResult.failure("RATE_LIMITED", "触发限流保护");
}
// 第四步:选择路由目标("选择具体投递地址")
List<RouteTarget> availableTargets = getAvailableTargets(selectedRule);
if (availableTargets.isEmpty()) {
return RoutingResult.failure("NO_AVAILABLE_TARGET", "没有可用的路由目标");
}
RouteTarget selectedTarget = selectTarget(availableTargets, selectedRule.getStrategy(), message);
// 第五步:执行路由("执行投递")
RoutingResult result = executeRouting(message, selectedTarget, selectedRule, routingContext);
// 第六步:更新统计("记录投递结果")
updateStatistics(result, System.currentTimeMillis() - startTime);
return result;
} catch (Exception e) {
log.error("消息路由失败:messageId={}", message.getMessageId(), e);
statistics.incrementErrorCount();
return RoutingResult.failure("ROUTING_ERROR", e.getMessage());
}
}
/**
* 查找匹配的路由规则 - "查找适用的投递规则"
*/
private List<RoutingRule> findMatchingRules(StandardMessage message, RoutingContext context) {
return ruleCache.values().stream()
.filter(rule -> rule.isEnabled())
.filter(rule -> matchesRule(message, rule, context))
.sorted(Comparator.comparingInt(RoutingRule::getPriority).reversed())
.collect(Collectors.toList());
}
/**
* 检查消息是否匹配规则
*/
private boolean matchesRule(StandardMessage message, RoutingRule rule, RoutingContext context) {
if (rule.getMatchConditions() == null || rule.getMatchConditions().isEmpty()) {
return true; // 没有条件表示匹配所有
}
// 构建消息属性映射
Map<String, Object> messageAttributes = buildMessageAttributes(message, context);
// 评估所有匹配条件
return evaluateConditions(rule.getMatchConditions(), messageAttributes);
}
/**
* 构建消息属性映射
*/
private Map<String, Object> buildMessageAttributes(StandardMessage message, RoutingContext context) {
Map<String, Object> attributes = new HashMap<>();
// 基本属性
attributes.put("messageId", message.getMessageId());
attributes.put("deviceId", message.getDeviceId());
attributes.put("messageType", message.getMessageType().name());
attributes.put("priority", message.getPriority().name());
attributes.put("sourceProtocol", message.getSourceProtocol().name());
// 时间属性
if (message.getTimestamp() != null) {
attributes.put("timestamp", message.getTimestamp());
attributes.put("hour", message.getTimestamp().getHour());
attributes.put("dayOfWeek", message.getTimestamp().getDayOfWeek().getValue());
}
// 消息体属性
if (message.getBody() != null && message.getBody().getDataPoints() != null) {
message.getBody().getDataPoints().forEach((key, value) -> {
attributes.put("data." + key, value);
});
}
// 上下文属性
if (context != null && context.getAttributes() != null) {
context.getAttributes().forEach((key, value) -> {
attributes.put("context." + key, value);
});
}
return attributes;
}
/**
* 评估匹配条件
*/
private boolean evaluateConditions(List<MatchCondition> conditions, Map<String, Object> attributes) {
if (conditions.isEmpty()) {
return true;
}
boolean result = evaluateCondition(conditions.get(0), attributes);
for (int i = 1; i < conditions.size(); i++) {
MatchCondition condition = conditions.get(i);
boolean conditionResult = evaluateCondition(condition, attributes);
// 根据逻辑操作符组合结果
if (condition.getLogicalOperator() == LogicalOperator.AND) {
result = result && conditionResult;
} else if (condition.getLogicalOperator() == LogicalOperator.OR) {
result = result || conditionResult;
}
}
return result;
}
/**
* 评估单个条件
*/
private boolean evaluateCondition(MatchCondition condition, Map<String, Object> attributes) {
Object actualValue = attributes.get(condition.getFieldName());
Object expectedValue = condition.getExpectedValue();
if (actualValue == null) {
return condition.getOperator() == Operator.IS_NULL;
}
switch (condition.getOperator()) {
case EQUALS:
return Objects.equals(actualValue, expectedValue);
case NOT_EQUALS:
return !Objects.equals(actualValue, expectedValue);
case GREATER_THAN:
return compareValues(actualValue, expectedValue) > 0;
case GREATER_THAN_OR_EQUAL:
return compareValues(actualValue, expectedValue) >= 0;
case LESS_THAN:
return compareValues(actualValue, expectedValue) < 0;
case LESS_THAN_OR_EQUAL:
return compareValues(actualValue, expectedValue) <= 0;
case CONTAINS:
return actualValue.toString().contains(expectedValue.toString());
case STARTS_WITH:
return actualValue.toString().startsWith(expectedValue.toString());
case ENDS_WITH:
return actualValue.toString().endsWith(expectedValue.toString());
case REGEX:
return actualValue.toString().matches(expectedValue.toString());
case IN:
return isValueInList(actualValue, expectedValue);
case NOT_IN:
return !isValueInList(actualValue, expectedValue);
case IS_NULL:
return false; // actualValue不为null
case IS_NOT_NULL:
return true; // actualValue不为null
default:
return false;
}
}
/**
* 比较值大小
*/
@SuppressWarnings("unchecked")
private int compareValues(Object actual, Object expected) {
if (actual instanceof Comparable && expected instanceof Comparable) {
try {
return ((Comparable<Object>) actual).compareTo(expected);
} catch (ClassCastException e) {
// 类型不匹配,转换为字符串比较
return actual.toString().compareTo(expected.toString());
}
}
return actual.toString().compareTo(expected.toString());
}
/**
* 检查值是否在列表中
*/
private boolean isValueInList(Object value, Object listValue) {
if (listValue instanceof List) {
return ((List<?>) listValue).contains(value);
} else if (listValue instanceof String) {
// 支持逗号分隔的字符串
String[] items = listValue.toString().split(",");
return Arrays.asList(items).contains(value.toString());
}
return false;
}
/**
* 选择最佳规则
*/
private RoutingRule selectBestRule(List<RoutingRule> matchedRules, StandardMessage message) {
// 已经按优先级排序,选择第一个
return matchedRules.get(0);
}
/**
* 检查限流
*/
private boolean checkRateLimit(StandardMessage message, RoutingRule rule) {
if (rule.getRateLimitConfig() == null) {
return true;
}
String limitKey = buildRateLimitKey(message, rule);
return rateLimiter.tryAcquire(limitKey, rule.getRateLimitConfig());
}
/**
* 构建限流键
*/
private String buildRateLimitKey(StandardMessage message, RoutingRule rule) {
return String.format("route:%s:device:%s", rule.getRuleId(), message.getDeviceId());
}
/**
* 获取可用的路由目标
*/
private List<RouteTarget> getAvailableTargets(RoutingRule rule) {
return rule.getRouteTargets().stream()
.filter(RouteTarget::isAvailable)
.filter(target -> healthChecker.isHealthy(target))
.collect(Collectors.toList());
}
/**
* 选择路由目标
*/
private RouteTarget selectTarget(List<RouteTarget> targets, RoutingStrategy strategy, StandardMessage message) {
switch (strategy) {
case ROUND_ROBIN:
return loadBalancer.roundRobin(targets);
case WEIGHTED_ROUND_ROBIN:
return loadBalancer.weightedRoundRobin(targets);
case LEAST_CONNECTIONS:
return loadBalancer.leastConnections(targets);
case RANDOM:
return loadBalancer.random(targets);
case HASH:
return loadBalancer.hash(targets, message.getDeviceId());
case PRIORITY:
return loadBalancer.priority(targets);
default:
return targets.get(0);
}
}
/**
* 执行路由
*/
private RoutingResult executeRouting(StandardMessage message, RouteTarget target,
RoutingRule rule, RoutingContext context) {
try {
// 根据目标类型执行不同的路由逻辑
switch (target.getTargetType()) {
case MESSAGE_QUEUE:
return routeToMessageQueue(message, target, context);
case HTTP_ENDPOINT:
return routeToHttpEndpoint(message, target, context);
case DATABASE:
return routeToDatabase(message, target, context);
case FILE_SYSTEM:
return routeToFileSystem(message, target, context);
case KAFKA_TOPIC:
return routeToKafkaTopic(message, target, context);
default:
return RoutingResult.failure("UNSUPPORTED_TARGET_TYPE",
"不支持的目标类型:" + target.getTargetType());
}
} catch (Exception e) {
log.error("路由执行失败:targetId={}, messageId={}",
target.getTargetId(), message.getMessageId(), e);
// 尝试故障转移
return handleFailover(message, rule, context, e);
}
}
/**
* 路由到消息队列
*/
private RoutingResult routeToMessageQueue(StandardMessage message, RouteTarget target, RoutingContext context) {
// 实现消息队列路由逻辑
log.debug("路由消息到队列:targetId={}, messageId={}", target.getTargetId(), message.getMessageId());
// 这里应该实现具体的消息队列发送逻辑
// 例如:rabbitTemplate.send(queueName, message);
return RoutingResult.success(target, "消息已发送到队列");
}
/**
* 路由到HTTP端点
*/
private RoutingResult routeToHttpEndpoint(StandardMessage message, RouteTarget target, RoutingContext context) {
// 实现HTTP端点路由逻辑
log.debug("路由消息到HTTP端点:targetId={}, messageId={}", target.getTargetId(), message.getMessageId());
// 这里应该实现具体的HTTP请求发送逻辑
// 例如:restTemplate.postForObject(url, message, String.class);
return RoutingResult.success(target, "消息已发送到HTTP端点");
}
/**
* 路由到数据库
*/
private RoutingResult routeToDatabase(StandardMessage message, RouteTarget target, RoutingContext context) {
// 实现数据库路由逻辑
log.debug("路由消息到数据库:targetId={}, messageId={}", target.getTargetId(), message.getMessageId());
// 这里应该实现具体的数据库存储逻辑
// 例如:messageRepository.save(message);
return RoutingResult.success(target, "消息已存储到数据库");
}
/**
* 路由到文件系统
*/
private RoutingResult routeToFileSystem(StandardMessage message, RouteTarget target, RoutingContext context) {
// 实现文件系统路由逻辑
log.debug("路由消息到文件系统:targetId={}, messageId={}", target.getTargetId(), message.getMessageId());
// 这里应该实现具体的文件写入逻辑
return RoutingResult.success(target, "消息已写入文件系统");
}
/**
* 路由到Kafka主题
*/
private RoutingResult routeToKafkaTopic(StandardMessage message, RouteTarget target, RoutingContext context) {
// 实现Kafka主题路由逻辑
log.debug("路由消息到Kafka主题:targetId={}, messageId={}", target.getTargetId(), message.getMessageId());
// 这里应该实现具体的Kafka发送逻辑
// 例如:kafkaTemplate.send(topicName, message);
return RoutingResult.success(target, "消息已发送到Kafka主题");
}
/**
* 处理故障转移
*/
private RoutingResult handleFailover(StandardMessage message, RoutingRule rule,
RoutingContext context, Exception originalError) {
if (rule.getFailoverConfig() == null || !rule.getFailoverConfig().isEnabled()) {
return RoutingResult.failure("ROUTING_FAILED", originalError.getMessage());
}
// 获取备用目标
List<RouteTarget> backupTargets = rule.getFailoverConfig().getBackupTargets();
if (backupTargets.isEmpty()) {
return RoutingResult.failure("NO_BACKUP_TARGET", "没有可用的备用目标");
}
// 尝试备用目标
for (RouteTarget backupTarget : backupTargets) {
if (backupTarget.isAvailable() && healthChecker.isHealthy(backupTarget)) {
try {
return executeRouting(message, backupTarget, rule, context);
} catch (Exception e) {
log.warn("备用目标也失败:targetId={}", backupTarget.getTargetId(), e);
}
}
}
return RoutingResult.failure("ALL_TARGETS_FAILED", "所有目标都不可用");
}
/**
* 更新统计信息
*/
private void updateStatistics(RoutingResult result, long routingTime) {
statistics.incrementTotalCount();
if (result.isSuccess()) {
statistics.incrementSuccessCount();
} else {
statistics.incrementFailureCount();
}
statistics.updateAverageRoutingTime(routingTime);
// 发送指标到监控系统
metricsCollector.recordRoutingMetrics(result, routingTime);
}
@Override
public List<RoutingResult> batchRoute(List<StandardMessage> messages, RoutingContext routingContext) {
return messages.parallelStream()
.map(message -> route(message, routingContext))
.collect(Collectors.toList());
}
@Override
public void registerRoutingRule(RoutingRule rule) {
if (validateRule(rule)) {
ruleCache.put(rule.getRuleId(), rule);
ruleRepository.save(rule);
log.info("路由规则已注册:ruleId={}, ruleName={}", rule.getRuleId(), rule.getRuleName());
} else {
throw new IllegalArgumentException("无效的路由规则");
}
}
@Override
public void removeRoutingRule(String ruleId) {
ruleCache.remove(ruleId);
ruleRepository.deleteById(ruleId);
log.info("路由规则已移除:ruleId={}", ruleId);
}
@Override
public RoutingStatistics getStatistics() {
return statistics.copy();
}
@Override
public HealthStatus checkHealth() {
// 检查各个组件的健康状态
boolean healthy = true;
StringBuilder details = new StringBuilder();
// 检查规则缓存
if (ruleCache.isEmpty()) {
healthy = false;
details.append("没有可用的路由规则; ");
}
// 检查负载均衡器
if (!loadBalancer.isHealthy()) {
healthy = false;
details.append("负载均衡器不健康; ");
}
// 检查限流器
if (!rateLimiter.isHealthy()) {
healthy = false;
details.append("限流器不健康; ");
}
return HealthStatus.builder()
.healthy(healthy)
.details(details.toString())
.checkTime(LocalDateTime.now())
.build();
}
/**
* 验证路由规则
*/
private boolean validateRule(RoutingRule rule) {
if (rule == null || rule.getRuleId() == null || rule.getRuleId().isEmpty()) {
return false;
}
if (rule.getRouteTargets() == null || rule.getRouteTargets().isEmpty()) {
return false;
}
// 验证匹配条件
if (rule.getMatchConditions() != null) {
for (MatchCondition condition : rule.getMatchConditions()) {
if (condition.getFieldName() == null || condition.getOperator() == null) {
return false;
}
}
}
return true;
}
/**
* 初始化方法 - 加载路由规则
*/
@PostConstruct
public void initialize() {
// 从数据库加载路由规则
List<RoutingRule> rules = ruleRepository.findAllEnabled();
for (RoutingRule rule : rules) {
ruleCache.put(rule.getRuleId(), rule);
}
log.info("消息路由器初始化完成,加载了{}条路由规则", rules.size());
}
}
# 4.5 路由结果封装
/**
* 路由结果 - "投递结果报告"
*/
@Data
@Builder
public class RoutingResult {
/**
* 路由是否成功
*/
private boolean success;
/**
* 选中的路由目标
*/
private RouteTarget selectedTarget;
/**
* 使用的路由规则
*/
private RoutingRule appliedRule;
/**
* 路由耗时(毫秒)
*/
private long routingTime;
/**
* 结果消息
*/
private String message;
/**
* 错误代码
*/
private String errorCode;
/**
* 错误详情
*/
private String errorDetails;
/**
* 路由跟踪信息
*/
private List<RouteTrace> traces;
/**
* 创建成功结果
*/
public static RoutingResult success(RouteTarget target, String message) {
return RoutingResult.builder()
.success(true)
.selectedTarget(target)
.message(message)
.traces(new ArrayList<>())
.build();
}
/**
* 创建失败结果
*/
public static RoutingResult failure(String errorCode, String errorDetails) {
return RoutingResult.builder()
.success(false)
.errorCode(errorCode)
.errorDetails(errorDetails)
.traces(new ArrayList<>())
.build();
}
/**
* 添加跟踪信息
*/
public void addTrace(String step, String details) {
if (traces == null) {
traces = new ArrayList<>();
}
traces.add(RouteTrace.builder()
.step(step)
.details(details)
.timestamp(LocalDateTime.now())
.build());
}
}
/**
* 路由跟踪信息
*/
@Data
@Builder
public class RouteTrace {
private String step; // 步骤名称
private String details; // 详细信息
private LocalDateTime timestamp; // 时间戳
}
# 5. 实际应用示例
# 5.1 智能家居场景
让我们通过一个智能家居的故事来看看协议适配层是如何工作的:
故事背景:小明家里有各种智能设备:温度传感器(MQTT协议)、智能插座(HTTP协议)、门锁(CoAP协议)。这些设备需要统一管理。
/**
* 智能家居协议适配器配置
*/
@Configuration
public class SmartHomeAdapterConfig {
/**
* 配置MQTT温度传感器解析器
*/
@Bean
public ProtocolParser temperatureSensorParser() {
return MqttProtocolParser.builder()
.deviceType("TEMPERATURE_SENSOR")
.topicPattern("home/+/temperature")
.messageFormat(MessageFormat.JSON)
.build();
}
/**
* 配置HTTP智能插座解析器
*/
@Bean
public ProtocolParser smartPlugParser() {
return HttpProtocolParser.builder()
.deviceType("SMART_PLUG")
.endpoint("/api/plug/status")
.method(HttpMethod.POST)
.contentType("application/json")
.build();
}
/**
* 配置CoAP门锁解析器
*/
@Bean
public ProtocolParser doorLockParser() {
return CoapProtocolParser.builder()
.deviceType("DOOR_LOCK")
.resourcePath("/lock/status")
.messageFormat(MessageFormat.CBOR)
.build();
}
/**
* 配置数据转换规则
*/
@Bean
public DataTransformer smartHomeTransformer() {
List<TransformRule> rules = Arrays.asList(
// 温度数据转换规则
TransformRule.builder()
.sourceField("temp")
.targetField("temperature")
.dataType(DataType.DOUBLE)
.unit("°C")
.validationRule("value >= -50 && value <= 100")
.build(),
// 插座状态转换规则
TransformRule.builder()
.sourceField("power_state")
.targetField("status")
.dataType(DataType.STRING)
.mappingScript("value == 1 ? 'ON' : 'OFF'")
.build(),
// 门锁状态转换规则
TransformRule.builder()
.sourceField("lock_status")
.targetField("locked")
.dataType(DataType.BOOLEAN)
.mappingScript("value == 'LOCKED'")
.build()
);
return new UniversalDataTransformer(rules);
}
/**
* 配置消息路由规则
*/
@Bean
public List<RoutingRule> smartHomeRoutingRules() {
return Arrays.asList(
// 温度数据路由到时序数据库
RoutingRule.builder()
.ruleId("temperature-to-tsdb")
.ruleName("温度数据路由")
.matchConditions(Arrays.asList(
MatchCondition.builder()
.fieldName("messageType")
.operator(Operator.EQUALS)
.expectedValue("DATA")
.logicalOperator(LogicalOperator.AND)
.build(),
MatchCondition.builder()
.fieldName("data.temperature")
.operator(Operator.IS_NOT_NULL)
.logicalOperator(LogicalOperator.AND)
.build()
))
.routeTargets(Arrays.asList(
RouteTarget.builder()
.targetId("influxdb-cluster")
.targetName("时序数据库集群")
.targetType(TargetType.DATABASE)
.weight(100)
.available(true)
.build()
))
.strategy(RoutingStrategy.ROUND_ROBIN)
.priority(100)
.enabled(true)
.build(),
// 安全事件路由到告警系统
RoutingRule.builder()
.ruleId("security-alert")
.ruleName("安全告警路由")
.matchConditions(Arrays.asList(
MatchCondition.builder()
.fieldName("messageType")
.operator(Operator.EQUALS)
.expectedValue("EVENT")
.logicalOperator(LogicalOperator.AND)
.build(),
MatchCondition.builder()
.fieldName("data.locked")
.operator(Operator.EQUALS)
.expectedValue(false)
.logicalOperator(LogicalOperator.AND)
.build(),
MatchCondition.builder()
.fieldName("hour")
.operator(Operator.GREATER_THAN_OR_EQUAL)
.expectedValue(22)
.logicalOperator(LogicalOperator.OR)
.build(),
MatchCondition.builder()
.fieldName("hour")
.operator(Operator.LESS_THAN_OR_EQUAL)
.expectedValue(6)
.build()
))
.routeTargets(Arrays.asList(
RouteTarget.builder()
.targetId("alert-system")
.targetName("告警系统")
.targetType(TargetType.HTTP_ENDPOINT)
.weight(100)
.available(true)
.build(),
RouteTarget.builder()
.targetId("sms-gateway")
.targetName("短信网关")
.targetType(TargetType.HTTP_ENDPOINT)
.weight(50)
.available(true)
.build()
))
.strategy(RoutingStrategy.BROADCAST)
.priority(200)
.enabled(true)
.rateLimitConfig(RateLimitConfig.builder()
.maxRequests(10)
.timeWindow(Duration.ofMinutes(1))
.build())
.build()
);
}
}
# 5.2 工业物联网场景
故事背景:某工厂有多条生产线,每条生产线上有温度、压力、振动等传感器,使用不同的工业协议(Modbus、OPC-UA、Profinet等)。
/**
* 工业物联网协议适配器
*/
@Component
@Slf4j
public class IndustrialIoTAdapter {
@Autowired
private ProtocolParserFactory parserFactory;
@Autowired
private DataTransformer dataTransformer;
@Autowired
private MessageRouter messageRouter;
/**
* 处理生产线数据
*/
@EventListener
public void handleProductionLineData(ProductionLineDataEvent event) {
try {
// 第一步:解析协议数据
ProtocolParser parser = parserFactory.getParser(event.getProtocolType());
ParseResult parseResult = parser.parse(event.getRawData(), event.getParseContext());
if (!parseResult.isSuccess()) {
log.error("生产线数据解析失败:line={}, error={}",
event.getProductionLine(), parseResult.getErrorMessage());
return;
}
StandardMessage message = parseResult.getMessage();
// 第二步:数据转换和质量检查
TransformResult transformResult = dataTransformer.transform(message, buildTransformContext(event));
if (!transformResult.isSuccess()) {
log.error("生产线数据转换失败:line={}, error={}",
event.getProductionLine(), transformResult.getErrorMessage());
return;
}
// 第三步:数据质量评估
double qualityScore = transformResult.getQualityScore();
if (qualityScore < 0.8) {
log.warn("生产线数据质量较低:line={}, score={}",
event.getProductionLine(), qualityScore);
// 低质量数据路由到数据清洗服务
routeToDataCleaning(transformResult.getTransformedMessage(), event);
return;
}
// 第四步:消息路由
RoutingContext routingContext = RoutingContext.builder()
.attribute("productionLine", event.getProductionLine())
.attribute("shift", getCurrentShift())
.attribute("qualityScore", qualityScore)
.build();
RoutingResult routingResult = messageRouter.route(
transformResult.getTransformedMessage(), routingContext);
if (routingResult.isSuccess()) {
log.debug("生产线数据路由成功:line={}, target={}",
event.getProductionLine(), routingResult.getSelectedTarget().getTargetName());
} else {
log.error("生产线数据路由失败:line={}, error={}",
event.getProductionLine(), routingResult.getErrorDetails());
}
} catch (Exception e) {
log.error("处理生产线数据时发生异常:line={}", event.getProductionLine(), e);
}
}
/**
* 构建转换上下文
*/
private TransformContext buildTransformContext(ProductionLineDataEvent event) {
return TransformContext.builder()
.sourceSystem("PRODUCTION_LINE_" + event.getProductionLine())
.targetSystem("MES_SYSTEM")
.transformationTime(LocalDateTime.now())
.attribute("productionLine", event.getProductionLine())
.attribute("equipmentId", event.getEquipmentId())
.attribute("shift", getCurrentShift())
.build();
}
/**
* 路由到数据清洗服务
*/
private void routeToDataCleaning(StandardMessage message, ProductionLineDataEvent event) {
// 创建数据清洗任务
DataCleaningTask task = DataCleaningTask.builder()
.messageId(message.getMessageId())
.productionLine(event.getProductionLine())
.rawData(event.getRawData())
.parsedMessage(message)
.priority(TaskPriority.HIGH)
.build();
// 发送到数据清洗队列
// dataCleaningService.submitTask(task);
log.info("已提交数据清洗任务:line={}, messageId={}",
event.getProductionLine(), message.getMessageId());
}
/**
* 获取当前班次
*/
private String getCurrentShift() {
int hour = LocalDateTime.now().getHour();
if (hour >= 8 && hour < 16) {
return "DAY_SHIFT";
} else if (hour >= 16 && hour < 24) {
return "EVENING_SHIFT";
} else {
return "NIGHT_SHIFT";
}
}
}
# 6. 配置管理
# 6.1 动态配置
协议适配层支持动态配置,就像一个可以随时调整的"智能开关":
/**
* 协议适配器配置管理器
*/
@Component
@Slf4j
public class ProtocolAdapterConfigManager {
@Autowired
private ConfigurationRepository configRepository;
@Autowired
private ApplicationEventPublisher eventPublisher;
// 配置缓存
private final Map<String, AdapterConfig> configCache = new ConcurrentHashMap<>();
// 配置监听器
private final List<ConfigChangeListener> listeners = new CopyOnWriteArrayList<>();
/**
* 获取适配器配置
*/
public AdapterConfig getConfig(String adapterId) {
return configCache.computeIfAbsent(adapterId, this::loadConfigFromRepository);
}
/**
* 更新适配器配置
*/
public void updateConfig(String adapterId, AdapterConfig newConfig) {
AdapterConfig oldConfig = configCache.get(adapterId);
// 验证配置
if (!validateConfig(newConfig)) {
throw new IllegalArgumentException("无效的适配器配置");
}
// 保存到数据库
configRepository.save(adapterId, newConfig);
// 更新缓存
configCache.put(adapterId, newConfig);
// 通知监听器
notifyConfigChange(adapterId, oldConfig, newConfig);
log.info("适配器配置已更新:adapterId={}", adapterId);
}
/**
* 注册配置变更监听器
*/
public void addConfigChangeListener(ConfigChangeListener listener) {
listeners.add(listener);
}
/**
* 从数据库加载配置
*/
private AdapterConfig loadConfigFromRepository(String adapterId) {
return configRepository.findById(adapterId)
.orElse(createDefaultConfig(adapterId));
}
/**
* 创建默认配置
*/
private AdapterConfig createDefaultConfig(String adapterId) {
return AdapterConfig.builder()
.adapterId(adapterId)
.enabled(true)
.maxConcurrency(100)
.timeoutMs(5000)
.retryCount(3)
.batchSize(50)
.build();
}
/**
* 验证配置
*/
private boolean validateConfig(AdapterConfig config) {
if (config == null || config.getAdapterId() == null) {
return false;
}
if (config.getMaxConcurrency() <= 0 || config.getMaxConcurrency() > 1000) {
return false;
}
if (config.getTimeoutMs() <= 0 || config.getTimeoutMs() > 60000) {
return false;
}
return true;
}
/**
* 通知配置变更
*/
private void notifyConfigChange(String adapterId, AdapterConfig oldConfig, AdapterConfig newConfig) {
ConfigChangeEvent event = ConfigChangeEvent.builder()
.adapterId(adapterId)
.oldConfig(oldConfig)
.newConfig(newConfig)
.changeTime(LocalDateTime.now())
.build();
// 同步通知监听器
listeners.forEach(listener -> {
try {
listener.onConfigChange(event);
} catch (Exception e) {
log.error("配置变更监听器执行失败:adapterId={}", adapterId, e);
}
});
// 异步发布事件
eventPublisher.publishEvent(event);
}
/**
* 定期刷新配置
*/
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void refreshConfigs() {
configCache.keySet().forEach(adapterId -> {
try {
AdapterConfig latestConfig = configRepository.findById(adapterId).orElse(null);
if (latestConfig != null && !latestConfig.equals(configCache.get(adapterId))) {
updateConfig(adapterId, latestConfig);
}
} catch (Exception e) {
log.error("刷新配置失败:adapterId={}", adapterId, e);
}
});
}
}
/**
* 适配器配置
*/
@Data
@Builder
public class AdapterConfig {
private String adapterId; // 适配器ID
private boolean enabled; // 是否启用
private int maxConcurrency; // 最大并发数
private long timeoutMs; // 超时时间(毫秒)
private int retryCount; // 重试次数
private int batchSize; // 批处理大小
private Map<String, Object> properties; // 扩展属性
private LocalDateTime updateTime; // 更新时间
}
/**
* 配置变更监听器
*/
public interface ConfigChangeListener {
void onConfigChange(ConfigChangeEvent event);
}
/**
* 配置变更事件
*/
@Data
@Builder
public class ConfigChangeEvent {
private String adapterId;
private AdapterConfig oldConfig;
private AdapterConfig newConfig;
private LocalDateTime changeTime;
}
# 7. 监控和指标
# 7.1 性能监控
协议适配层需要全面的监控,就像医生需要监控病人的各项生命体征:
/**
* 协议适配器监控指标收集器
*/
@Component
@Slf4j
public class ProtocolAdapterMetricsCollector {
@Autowired
private MeterRegistry meterRegistry;
// 计数器
private final Counter parseSuccessCounter;
private final Counter parseFailureCounter;
private final Counter transformSuccessCounter;
private final Counter transformFailureCounter;
private final Counter routingSuccessCounter;
private final Counter routingFailureCounter;
// 计时器
private final Timer parseTimer;
private final Timer transformTimer;
private final Timer routingTimer;
// 仪表盘
private final Gauge activeConnectionsGauge;
private final Gauge queueSizeGauge;
public ProtocolAdapterMetricsCollector(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
// 初始化计数器
this.parseSuccessCounter = Counter.builder("protocol.parse.success")
.description("协议解析成功次数")
.register(meterRegistry);
this.parseFailureCounter = Counter.builder("protocol.parse.failure")
.description("协议解析失败次数")
.register(meterRegistry);
this.transformSuccessCounter = Counter.builder("data.transform.success")
.description("数据转换成功次数")
.register(meterRegistry);
this.transformFailureCounter = Counter.builder("data.transform.failure")
.description("数据转换失败次数")
.register(meterRegistry);
this.routingSuccessCounter = Counter.builder("message.routing.success")
.description("消息路由成功次数")
.register(meterRegistry);
this.routingFailureCounter = Counter.builder("message.routing.failure")
.description("消息路由失败次数")
.register(meterRegistry);
// 初始化计时器
this.parseTimer = Timer.builder("protocol.parse.duration")
.description("协议解析耗时")
.register(meterRegistry);
this.transformTimer = Timer.builder("data.transform.duration")
.description("数据转换耗时")
.register(meterRegistry);
this.routingTimer = Timer.builder("message.routing.duration")
.description("消息路由耗时")
.register(meterRegistry);
// 初始化仪表盘
this.activeConnectionsGauge = Gauge.builder("protocol.connections.active")
.description("活跃连接数")
.register(meterRegistry, this, ProtocolAdapterMetricsCollector::getActiveConnections);
this.queueSizeGauge = Gauge.builder("protocol.queue.size")
.description("队列大小")
.register(meterRegistry, this, ProtocolAdapterMetricsCollector::getQueueSize);
}
/**
* 记录解析指标
*/
public void recordParseMetrics(ParseResult result, long duration, String protocolType) {
Timer.Sample sample = Timer.start(meterRegistry);
sample.stop(parseTimer.tag("protocol", protocolType));
if (result.isSuccess()) {
parseSuccessCounter.increment(Tags.of("protocol", protocolType));
} else {
parseFailureCounter.increment(Tags.of(
"protocol", protocolType,
"error", result.getErrorCode()
));
}
}
/**
* 记录转换指标
*/
public void recordTransformMetrics(TransformResult result, long duration, String transformType) {
Timer.Sample sample = Timer.start(meterRegistry);
sample.stop(transformTimer.tag("type", transformType));
if (result.isSuccess()) {
transformSuccessCounter.increment(Tags.of("type", transformType));
// 记录数据质量分数
Gauge.builder("data.quality.score")
.description("数据质量分数")
.tag("type", transformType)
.register(meterRegistry, () -> result.getQualityScore());
} else {
transformFailureCounter.increment(Tags.of(
"type", transformType,
"error", result.getErrorCode()
));
}
}
/**
* 记录路由指标
*/
public void recordRoutingMetrics(RoutingResult result, long duration) {
Timer.Sample sample = Timer.start(meterRegistry);
sample.stop(routingTimer);
if (result.isSuccess()) {
routingSuccessCounter.increment(Tags.of(
"target", result.getSelectedTarget().getTargetName(),
"strategy", result.getAppliedRule().getStrategy().name()
));
} else {
routingFailureCounter.increment(Tags.of(
"error", result.getErrorCode()
));
}
}
/**
* 获取活跃连接数
*/
private double getActiveConnections() {
// 这里应该返回实际的活跃连接数
// 例如:return connectionManager.getActiveConnectionCount();
return 0;
}
/**
* 获取队列大小
*/
private double getQueueSize() {
// 这里应该返回实际的队列大小
// 例如:return messageQueue.size();
return 0;
}
/**
* 创建自定义指标
*/
public void createCustomMetric(String name, String description, double value, Tags tags) {
Gauge.builder(name)
.description(description)
.tags(tags)
.register(meterRegistry, () -> value);
}
}
# 7.2 健康检查
/**
* 协议适配器健康检查
*/
@Component
public class ProtocolAdapterHealthIndicator implements HealthIndicator {
@Autowired
private ProtocolParserFactory parserFactory;
@Autowired
private DataTransformer dataTransformer;
@Autowired
private MessageRouter messageRouter;
@Autowired
private ProtocolAdapterConfigManager configManager;
@Override
public Health health() {
Health.Builder builder = new Health.Builder();
try {
// 检查协议解析器
checkProtocolParsers(builder);
// 检查数据转换器
checkDataTransformer(builder);
// 检查消息路由器
checkMessageRouter(builder);
// 检查配置管理器
checkConfigManager(builder);
// 如果所有检查都通过,标记为健康
builder.up();
} catch (Exception e) {
builder.down(e);
}
return builder.build();
}
private void checkProtocolParsers(Health.Builder builder) {
List<String> availableParsers = parserFactory.getAvailableParserTypes();
builder.withDetail("availableParsers", availableParsers);
if (availableParsers.isEmpty()) {
throw new RuntimeException("没有可用的协议解析器");
}
// 检查每个解析器的健康状态
for (String parserType : availableParsers) {
ProtocolParser parser = parserFactory.getParser(parserType);
HealthStatus status = parser.checkHealth();
builder.withDetail("parser." + parserType, status.isHealthy() ? "UP" : "DOWN");
if (!status.isHealthy()) {
throw new RuntimeException("协议解析器不健康:" + parserType);
}
}
}
private void checkDataTransformer(Health.Builder builder) {
HealthStatus status = dataTransformer.checkHealth();
builder.withDetail("dataTransformer", status.isHealthy() ? "UP" : "DOWN");
if (!status.isHealthy()) {
throw new RuntimeException("数据转换器不健康:" + status.getDetails());
}
}
private void checkMessageRouter(Health.Builder builder) {
HealthStatus status = messageRouter.checkHealth();
builder.withDetail("messageRouter", status.isHealthy() ? "UP" : "DOWN");
if (!status.isHealthy()) {
throw new RuntimeException("消息路由器不健康:" + status.getDetails());
}
}
private void checkConfigManager(Health.Builder builder) {
try {
// 尝试获取一个配置来验证配置管理器是否正常
AdapterConfig testConfig = configManager.getConfig("test");
builder.withDetail("configManager", "UP");
} catch (Exception e) {
builder.withDetail("configManager", "DOWN");
throw new RuntimeException("配置管理器不健康:" + e.getMessage());
}
}
}
# 8. 最佳实践
# 8.1 性能优化建议
- 批处理优化:就像快递员一次送多个包裹比一次送一个包裹效率更高
/**
* 批处理优化示例
*/
@Component
public class BatchProcessingOptimizer {
@Autowired
private DataTransformer dataTransformer;
@Autowired
private MessageRouter messageRouter;
/**
* 批量处理消息
*/
public void processBatch(List<StandardMessage> messages) {
// 按设备类型分组
Map<String, List<StandardMessage>> groupedMessages = messages.stream()
.collect(Collectors.groupingBy(msg ->
msg.getHeader().getDeviceType()));
// 并行处理每个分组
groupedMessages.entrySet().parallelStream().forEach(entry -> {
String deviceType = entry.getKey();
List<StandardMessage> deviceMessages = entry.getValue();
// 批量转换
List<TransformResult> transformResults = dataTransformer
.batchTransform(deviceMessages, buildTransformContext(deviceType));
// 批量路由
List<StandardMessage> transformedMessages = transformResults.stream()
.filter(TransformResult::isSuccess)
.map(TransformResult::getTransformedMessage)
.collect(Collectors.toList());
messageRouter.batchRoute(transformedMessages, buildRoutingContext(deviceType));
});
}
private TransformContext buildTransformContext(String deviceType) {
return TransformContext.builder()
.sourceSystem(deviceType)
.targetSystem("UNIFIED_SYSTEM")
.transformationTime(LocalDateTime.now())
.build();
}
private RoutingContext buildRoutingContext(String deviceType) {
return RoutingContext.builder()
.attribute("deviceType", deviceType)
.attribute("batchProcessing", true)
.build();
}
}
- 缓存策略:就像图书馆把常用的书放在显眼的地方
/**
* 缓存优化示例
*/
@Component
public class CacheOptimizer {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 解析结果缓存
private final Cache<String, ParseResult> parseResultCache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(Duration.ofMinutes(30))
.build();
// 转换规则缓存
private final Cache<String, List<TransformRule>> transformRuleCache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(Duration.ofHours(1))
.build();
/**
* 缓存解析结果
*/
public void cacheParseResult(String key, ParseResult result) {
if (result.isSuccess()) {
parseResultCache.put(key, result);
}
}
/**
* 获取缓存的解析结果
*/
public ParseResult getCachedParseResult(String key) {
return parseResultCache.getIfPresent(key);
}
/**
* 缓存转换规则
*/
public void cacheTransformRules(String deviceType, List<TransformRule> rules) {
transformRuleCache.put(deviceType, rules);
// 同时缓存到Redis(分布式缓存)
redisTemplate.opsForValue().set(
"transform_rules:" + deviceType,
rules,
Duration.ofHours(2)
);
}
/**
* 获取缓存的转换规则
*/
@SuppressWarnings("unchecked")
public List<TransformRule> getCachedTransformRules(String deviceType) {
// 先从本地缓存获取
List<TransformRule> rules = transformRuleCache.getIfPresent(deviceType);
if (rules != null) {
return rules;
}
// 再从Redis获取
rules = (List<TransformRule>) redisTemplate.opsForValue()
.get("transform_rules:" + deviceType);
if (rules != null) {
transformRuleCache.put(deviceType, rules);
}
return rules;
}
}
# 8.2 错误处理和恢复
/**
* 错误处理和恢复机制
*/
@Component
@Slf4j
public class ErrorHandlingAndRecovery {
@Autowired
private DeadLetterQueue deadLetterQueue;
@Autowired
private AlertService alertService;
/**
* 处理解析错误
*/
@Retryable(value = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000))
public ParseResult handleParseError(byte[] rawData, String protocolType, Exception error) {
log.error("协议解析失败,尝试恢复:protocol={}", protocolType, error);
try {
// 尝试使用备用解析器
ProtocolParser backupParser = getBackupParser(protocolType);
if (backupParser != null) {
return backupParser.parse(rawData, createFallbackContext());
}
// 尝试通用解析器
ProtocolParser genericParser = getGenericParser();
return genericParser.parse(rawData, createGenericContext());
} catch (Exception e) {
log.error("所有解析尝试都失败", e);
// 发送到死信队列
deadLetterQueue.send(DeadLetterMessage.builder()
.originalData(rawData)
.protocolType(protocolType)
.errorMessage(e.getMessage())
.timestamp(LocalDateTime.now())
.build());
// 发送告警
alertService.sendAlert(Alert.builder()
.level(AlertLevel.ERROR)
.title("协议解析失败")
.message("协议类型:" + protocolType + ",错误:" + e.getMessage())
.build());
return ParseResult.failure("PARSE_FAILED", e.getMessage());
}
}
/**
* 处理转换错误
*/
@Recover
public TransformResult handleTransformError(Exception error, StandardMessage message) {
log.error("数据转换失败,执行恢复策略:messageId={}", message.getMessageId(), error);
// 创建降级的转换结果
StandardMessage degradedMessage = createDegradedMessage(message);
return TransformResult.builder()
.success(true)
.transformedMessage(degradedMessage)
.qualityScore(0.5) // 降级质量分数
.errorMessage("使用降级策略")
.build();
}
/**
* 创建降级消息
*/
private StandardMessage createDegradedMessage(StandardMessage original) {
return StandardMessage.builder()
.messageId(original.getMessageId())
.deviceId(original.getDeviceId())
.messageType(MessageType.DATA)
.priority(MessagePriority.LOW)
.sourceProtocol(original.getSourceProtocol())
.timestamp(original.getTimestamp())
.header(original.getHeader())
.body(MessageBody.builder()
.dataPoints(Map.of("status", "degraded"))
.build())
.build();
}
private ProtocolParser getBackupParser(String protocolType) {
// 返回备用解析器
return null;
}
private ProtocolParser getGenericParser() {
// 返回通用解析器
return null;
}
private ParseContext createFallbackContext() {
return ParseContext.builder()
.fallbackMode(true)
.build();
}
private ParseContext createGenericContext() {
return ParseContext.builder()
.genericMode(true)
.build();
}
}
# 9. 总结
协议适配层就像一个智能的"翻译中心",它:
- 协议解析器:像专业翻译官,能够理解各种"方言"(协议)
- 数据转换器:像智能编辑,能够将内容转换成统一的"标准格式"
- 消息路由器:像高效的邮局调度员,能够将消息准确送达目的地
通过这三个核心组件的协同工作,协议适配层实现了:
- 统一接入:支持多种协议的设备接入
- 数据标准化:将异构数据转换为统一格式
- 智能路由:根据规则将消息路由到合适的目标
- 高可用性:具备故障转移和恢复能力
- 高性能:支持批处理和缓存优化
- 可监控:提供全面的监控和健康检查
这样的设计让物联网系统能够轻松应对各种复杂的设备接入场景,就像一个经验丰富的"万能翻译",无论遇到什么样的"语言",都能准确理解并妥善处理。
# 2.3 解析结果封装
/**
* 协议解析结果 - 翻译的成果
*
* 就像翻译官不仅要翻译内容,还要告诉你翻译的质量、
* 是否有疑问等信息一样
*/
@Data
@Builder
public class ParseResult {
/**
* 解析是否成功 - "翻译成功了吗?"
*/
private boolean success;
/**
* 解析后的标准消息 - "翻译的内容"
*/
private StandardMessage message;
/**
* 错误信息 - "翻译遇到的问题"
*/
private String errorMessage;
/**
* 错误代码 - "问题的类型编号"
*/
private String errorCode;
/**
* 解析耗时(毫秒)- "翻译用了多长时间"
*/
private long parseTime;
/**
* 数据质量评分 - "翻译质量如何(0-100)"
*/
private int qualityScore;
/**
* 额外属性 - "其他补充信息"
*/
private Map<String, Object> attributes;
/**
* 创建成功结果
*/
public static ParseResult success(StandardMessage message) {
return ParseResult.builder()
.success(true)
.message(message)
.qualityScore(100)
.parseTime(System.currentTimeMillis())
.build();
}
/**
* 创建失败结果
*/
public static ParseResult failure(String errorCode, String errorMessage) {
return ParseResult.builder()
.success(false)
.errorCode(errorCode)
.errorMessage(errorMessage)
.qualityScore(0)
.parseTime(System.currentTimeMillis())
.build();
}
}
# 2.4 标准消息格式
/**
* 标准消息格式 - 统一的"普通话"
*
* 就像联合国会议中,各国代表最终都要用统一的语言
* (如英语)来交流一样,所有设备消息最终都要转换为
* 这种标准格式
*/
@Data
@Builder
public class StandardMessage {
/**
* 消息ID - "消息的身份证"
*/
private String messageId;
/**
* 设备ID - "说话者的身份"
*/
private String deviceId;
/**
* 消息类型 - "谈话的主题类型"
*/
private MessageType messageType;
/**
* 时间戳 - "说话的时间"
*/
private LocalDateTime timestamp;
/**
* 消息头 - "信封上的信息"
*/
private MessageHeader header;
/**
* 消息体 - "信件的内容"
*/
private MessageBody body;
/**
* 消息优先级 - "消息的重要程度"
*/
private Priority priority;
/**
* 来源协议 - "原始语言类型"
*/
private ProtocolType sourceProtocol;
/**
* 质量指标 - "消息的可信度"
*/
private QualityMetrics quality;
}
/**
* 消息类型枚举 - 谈话主题的分类
*/
public enum MessageType {
DATA("数据消息", "设备上报的传感器数据"),
COMMAND("命令消息", "向设备发送的控制指令"),
EVENT("事件消息", "设备状态变化通知"),
HEARTBEAT("心跳消息", "设备存活状态确认"),
ALARM("告警消息", "设备异常情况报告"),
CONFIG("配置消息", "设备配置参数"),
FIRMWARE("固件消息", "设备固件更新相关"),
LOG("日志消息", "设备运行日志");
private final String name;
private final String description;
MessageType(String name, String description) {
this.name = name;
this.description = description;
}
}
# 2.5 MQTT协议解析器实现
/**
* MQTT协议解析器 - 专门的"MQTT语言"翻译官
*
* MQTT就像是物联网世界的"普通话",轻量级、易学习,
* 大部分IoT设备都"会说"这种语言
*/
@Component
@Slf4j
public class MqttProtocolParser implements ProtocolParser {
@Autowired
private ObjectMapper objectMapper;
@Autowired
private MqttTopicResolver topicResolver;
/**
* 解析MQTT消息 - 理解MQTT设备的"话"
*/
@Override
public ParseResult parse(byte[] rawData, ParseContext context) {
long startTime = System.currentTimeMillis();
try {
// 第一步:提取MQTT消息信息(就像先看信封上的地址)
MqttMessage mqttMessage = extractMqttMessage(rawData, context);
// 第二步:解析Topic信息(理解"收件人地址")
TopicInfo topicInfo = topicResolver.resolve(mqttMessage.getTopic());
// 第三步:解析消息体(读懂"信件内容")
MessageBody messageBody = parseMessageBody(mqttMessage.getPayload(), topicInfo);
// 第四步:构建标准消息(翻译成"普通话")
StandardMessage standardMessage = StandardMessage.builder()
.messageId(generateMessageId())
.deviceId(topicInfo.getDeviceId())
.messageType(determineMessageType(topicInfo))
.timestamp(LocalDateTime.now())
.header(buildMessageHeader(mqttMessage, topicInfo))
.body(messageBody)
.priority(determinePriority(topicInfo))
.sourceProtocol(ProtocolType.MQTT)
.quality(calculateQuality(mqttMessage))
.build();
long parseTime = System.currentTimeMillis() - startTime;
log.debug("MQTT消息解析成功:deviceId={}, messageType={}, parseTime={}ms",
topicInfo.getDeviceId(), standardMessage.getMessageType(), parseTime);
return ParseResult.builder()
.success(true)
.message(standardMessage)
.parseTime(parseTime)
.qualityScore(calculateQualityScore(standardMessage))
.build();
} catch (Exception e) {
log.error("MQTT消息解析失败", e);
return ParseResult.failure("MQTT_PARSE_ERROR", e.getMessage());
}
}
/**
* 构建MQTT消息 - 把"普通话"翻译成MQTT设备能懂的话
*/
@Override
public byte[] build(StandardMessage message, BuildContext context) {
try {
// 第一步:构建MQTT Topic(确定"收件人地址")
String topic = buildMqttTopic(message, context);
// 第二步:构建消息体(写"信件内容")
byte[] payload = buildMqttPayload(message);
// 第三步:设置MQTT属性(贴"邮票",设置"邮寄方式")
MqttProperties properties = buildMqttProperties(message);
// 第四步:封装成MQTT消息包
MqttMessagePacket packet = MqttMessagePacket.builder()
.topic(topic)
.payload(payload)
.qos(determineQos(message))
.retained(shouldRetain(message))
.properties(properties)
.build();
return packet.toByteArray();
} catch (Exception e) {
log.error("MQTT消息构建失败:messageId={}", message.getMessageId(), e);
throw new ProtocolBuildException("MQTT消息构建失败", e);
}
}
/**
* 提取MQTT消息信息
*/
private MqttMessage extractMqttMessage(byte[] rawData, ParseContext context) {
// 从上下文中获取MQTT相关信息
String topic = context.getAttribute("mqtt.topic", String.class);
Integer qos = context.getAttribute("mqtt.qos", Integer.class);
Boolean retained = context.getAttribute("mqtt.retained", Boolean.class);
return MqttMessage.builder()
.topic(topic)
.payload(rawData)
.qos(qos != null ? qos : 0)
.retained(retained != null ? retained : false)
.timestamp(LocalDateTime.now())
.build();
}
/**
* 解析消息体 - 理解"信件内容"
*/
private MessageBody parseMessageBody(byte[] payload, TopicInfo topicInfo) {
try {
String payloadStr = new String(payload, StandardCharsets.UTF_8);
// 根据Topic类型选择不同的解析策略
switch (topicInfo.getMessageType()) {
case DATA:
return parseDataMessage(payloadStr, topicInfo);
case COMMAND:
return parseCommandMessage(payloadStr, topicInfo);
case EVENT:
return parseEventMessage(payloadStr, topicInfo);
case HEARTBEAT:
return parseHeartbeatMessage(payloadStr, topicInfo);
default:
return parseGenericMessage(payloadStr, topicInfo);
}
} catch (Exception e) {
log.warn("消息体解析失败,使用原始数据:{}", e.getMessage());
return MessageBody.builder()
.contentType("application/octet-stream")
.rawData(payload)
.build();
}
}
/**
* 解析数据消息 - 处理传感器数据
*/
private MessageBody parseDataMessage(String payload, TopicInfo topicInfo) {
try {
// 尝试解析为JSON格式
JsonNode jsonNode = objectMapper.readTree(payload);
Map<String, Object> dataPoints = new HashMap<>();
// 遍历JSON字段,提取数据点
jsonNode.fields().forEachRemaining(entry -> {
String key = entry.getKey();
JsonNode value = entry.getValue();
if (value.isNumber()) {
dataPoints.put(key, value.asDouble());
} else if (value.isBoolean()) {
dataPoints.put(key, value.asBoolean());
} else {
dataPoints.put(key, value.asText());
}
});
return MessageBody.builder()
.contentType("application/json")
.dataPoints(dataPoints)
.rawContent(payload)
.build();
} catch (Exception e) {
// 如果不是JSON格式,尝试其他格式
return parseKeyValueMessage(payload);
}
}
/**
* 解析键值对格式消息
*/
private MessageBody parseKeyValueMessage(String payload) {
Map<String, Object> dataPoints = new HashMap<>();
// 支持多种分隔符:temp=25.5;humidity=60.0 或 temp:25.5,humidity:60.0
String[] pairs = payload.split("[;,]\\s*");
for (String pair : pairs) {
String[] keyValue = pair.split("[=:]\\s*", 2);
if (keyValue.length == 2) {
String key = keyValue[0].trim();
String value = keyValue[1].trim();
// 尝试转换为数字
try {
if (value.contains(".")) {
dataPoints.put(key, Double.parseDouble(value));
} else {
dataPoints.put(key, Long.parseLong(value));
}
} catch (NumberFormatException e) {
// 保持为字符串
dataPoints.put(key, value);
}
}
}
return MessageBody.builder()
.contentType("text/plain")
.dataPoints(dataPoints)
.rawContent(payload)
.build();
}
/**
* 计算消息质量评分
*/
private int calculateQualityScore(StandardMessage message) {
int score = 100;
// 检查必要字段
if (message.getDeviceId() == null || message.getDeviceId().isEmpty()) {
score -= 20;
}
if (message.getMessageType() == null) {
score -= 15;
}
if (message.getBody() == null || message.getBody().getDataPoints().isEmpty()) {
score -= 25;
}
// 检查时间戳合理性
if (message.getTimestamp() != null) {
LocalDateTime now = LocalDateTime.now();
long timeDiff = Math.abs(ChronoUnit.SECONDS.between(message.getTimestamp(), now));
if (timeDiff > 300) { // 超过5分钟
score -= 10;
}
}
return Math.max(0, score);
}
@Override
public ProtocolType getSupportedProtocol() {
return ProtocolType.MQTT;
}
@Override
public boolean validate(byte[] data) {
try {
// 基本的数据有效性检查
if (data == null || data.length == 0) {
return false;
}
// 检查是否为有效的UTF-8字符串
String content = new String(data, StandardCharsets.UTF_8);
// 检查是否为有效的JSON或键值对格式
return isValidJson(content) || isValidKeyValue(content);
} catch (Exception e) {
return false;
}
}
private boolean isValidJson(String content) {
try {
objectMapper.readTree(content);
return true;
} catch (Exception e) {
return false;
}
}
private boolean isValidKeyValue(String content) {
// 简单检查是否包含键值对模式
return content.matches(".+[=:].+");
}
}
# 2.6 Topic解析器
/**
* MQTT Topic解析器 - "地址解析专家"
*
* 就像邮递员需要理解地址格式一样,Topic解析器需要
* 理解MQTT Topic的结构,从中提取有用信息
*/
@Component
@Slf4j
public class MqttTopicResolver {
// Topic模式配置(就像"地址格式规范")
private static final Map<String, TopicPattern> TOPIC_PATTERNS = new HashMap<>();
static {
// 数据上报:device/{deviceId}/data/{dataType}
TOPIC_PATTERNS.put("DATA", TopicPattern.builder()
.pattern("device/([^/]+)/data/([^/]+)")
.messageType(MessageType.DATA)
.deviceIdGroup(1)
.dataTypeGroup(2)
.build());
// 命令下发:device/{deviceId}/command/{commandType}
TOPIC_PATTERNS.put("COMMAND", TopicPattern.builder()
.pattern("device/([^/]+)/command/([^/]+)")
.messageType(MessageType.COMMAND)
.deviceIdGroup(1)
.commandTypeGroup(2)
.build());
// 事件通知:device/{deviceId}/event/{eventType}
TOPIC_PATTERNS.put("EVENT", TopicPattern.builder()
.pattern("device/([^/]+)/event/([^/]+)")
.messageType(MessageType.EVENT)
.deviceIdGroup(1)
.eventTypeGroup(2)
.build());
// 心跳消息:device/{deviceId}/heartbeat
TOPIC_PATTERNS.put("HEARTBEAT", TopicPattern.builder()
.pattern("device/([^/]+)/heartbeat")
.messageType(MessageType.HEARTBEAT)
.deviceIdGroup(1)
.build());
}
/**
* 解析Topic信息
*/
public TopicInfo resolve(String topic) {
if (topic == null || topic.isEmpty()) {
throw new TopicResolveException("Topic不能为空");
}
// 遍历所有模式,找到匹配的
for (Map.Entry<String, TopicPattern> entry : TOPIC_PATTERNS.entrySet()) {
TopicPattern pattern = entry.getValue();
Pattern regex = Pattern.compile(pattern.getPattern());
Matcher matcher = regex.matcher(topic);
if (matcher.matches()) {
return buildTopicInfo(topic, pattern, matcher);
}
}
// 如果没有匹配的模式,返回默认解析结果
log.warn("未知的Topic格式:{}", topic);
return TopicInfo.builder()
.originalTopic(topic)
.messageType(MessageType.DATA)
.deviceId(extractDeviceIdFromUnknownTopic(topic))
.build();
}
/**
* 构建Topic信息
*/
private TopicInfo buildTopicInfo(String topic, TopicPattern pattern, Matcher matcher) {
TopicInfo.TopicInfoBuilder builder = TopicInfo.builder()
.originalTopic(topic)
.messageType(pattern.getMessageType());
// 提取设备ID
if (pattern.getDeviceIdGroup() > 0 && matcher.groupCount() >= pattern.getDeviceIdGroup()) {
builder.deviceId(matcher.group(pattern.getDeviceIdGroup()));
}
// 提取数据类型
if (pattern.getDataTypeGroup() > 0 && matcher.groupCount() >= pattern.getDataTypeGroup()) {
builder.dataType(matcher.group(pattern.getDataTypeGroup()));
}
// 提取命令类型
if (pattern.getCommandTypeGroup() > 0 && matcher.groupCount() >= pattern.getCommandTypeGroup()) {
builder.commandType(matcher.group(pattern.getCommandTypeGroup()));
}
// 提取事件类型
if (pattern.getEventTypeGroup() > 0 && matcher.groupCount() >= pattern.getEventTypeGroup()) {
builder.eventType(matcher.group(pattern.getEventTypeGroup()));
}
return builder.build();
}
/**
* 从未知格式的Topic中尝试提取设备ID
*/
private String extractDeviceIdFromUnknownTopic(String topic) {
// 尝试一些常见的模式
String[] parts = topic.split("/");
// 查找可能是设备ID的部分(通常是数字、字母、下划线、连字符的组合)
for (String part : parts) {
if (part.matches("[a-zA-Z0-9_-]+") && part.length() > 3) {
return part;
}
}
// 如果找不到,返回第一个非空部分
for (String part : parts) {
if (!part.isEmpty()) {
return part;
}
}
return "unknown";
}
}
/**
* Topic信息封装
*/
@Data
@Builder
public class TopicInfo {
private String originalTopic; // 原始Topic
private MessageType messageType; // 消息类型
private String deviceId; // 设备ID
private String dataType; // 数据类型
private String commandType; // 命令类型
private String eventType; // 事件类型
private Map<String, String> parameters; // 额外参数
}
/**
* Topic模式定义
*/
@Data
@Builder
public class TopicPattern {
private String pattern; // 正则表达式模式
private MessageType messageType; // 对应的消息类型
private int deviceIdGroup; // 设备ID在正则分组中的位置
private int dataTypeGroup; // 数据类型在正则分组中的位置
private int commandTypeGroup; // 命令类型在正则分组中的位置
private int eventTypeGroup; // 事件类型在正则分组中的位置
}