物联网协议适配器
# 物联网协议适配器
# 概述
协议适配器是物联网平台的核心组件,负责处理不同协议的设备接入、数据解析、消息路由等功能。本文档详细介绍MQTT与HTTP协议的适配实现;CoAP、TCP/UDP等协议遵循同一套适配器接口,本文仅给出其配置示例。
# 架构设计
协议适配层
├── 协议接入层
│ ├── MQTT适配器
│ ├── HTTP适配器
│ ├── CoAP适配器
│ └── TCP/UDP适配器
├── 消息处理层
│ ├── 协议解析
│ ├── 数据转换
│ ├── 消息验证
│ └── 消息路由
├── 连接管理层
│ ├── 连接池管理
│ ├── 会话管理
│ ├── 心跳检测
│ └── 断线重连
└── 安全控制层
    ├── 协议认证
    ├── 数据加密
    ├── 访问控制
    └── 流量控制
# 协议适配器接口
# 通用协议适配器接口
/**
* Common contract implemented by every protocol adapter (MQTT, HTTP, ...).
* An adapter owns the lifecycle of one protocol endpoint, receives device
* connect/disconnect/message events and can push messages back to devices.
*/
public interface ProtocolAdapter {
/**
* Returns the protocol this adapter serves.
*/
ProtocolType getProtocolType();
/**
* Starts the adapter.
*
* @throws Exception if the adapter cannot be started
*/
void start() throws Exception;
/**
* Stops the adapter.
*
* @throws Exception if the adapter cannot be stopped cleanly
*/
void stop() throws Exception;
/**
* Handles a device connection event.
*
* @param event details of the connecting device
*/
void handleDeviceConnect(DeviceConnectEvent event);
/**
* Handles a device disconnection event.
*
* @param event details of the disconnecting device
*/
void handleDeviceDisconnect(DeviceDisconnectEvent event);
/**
* Handles a message received from a device.
*
* @param event the inbound message together with its metadata
*/
void handleDeviceMessage(DeviceMessageEvent event);
/**
* Sends a message to a device.
*
* @param deviceId target device identifier
* @param message  payload to deliver; the accepted types are adapter-specific
* @return a future carrying the adapter's Boolean send result
*/
CompletableFuture<Boolean> sendMessageToDevice(String deviceId, Object message);
/**
* Reports whether the given device is currently considered connected.
*
* @param deviceId device identifier to look up
*/
boolean isDeviceConnected(String deviceId);
/**
* Returns the number of connections currently tracked by this adapter.
*/
int getConnectionCount();
/**
* Returns the current lifecycle status of this adapter.
*/
AdapterStatus getStatus();
}
/**
 * Protocols a device may use to reach the platform, each paired with a
 * display name and the port used when none is configured.
 */
public enum ProtocolType {
    MQTT("MQTT", 1883),
    HTTP("HTTP", 80),
    HTTPS("HTTPS", 443),
    COAP("CoAP", 5683),
    TCP("TCP", 8080),
    UDP("UDP", 8081);

    /** Human-readable protocol name. */
    private final String displayName;

    /** Port assumed when no explicit port is configured. */
    private final int port;

    ProtocolType(String displayName, int port) {
        this.displayName = displayName;
        this.port = port;
    }

    /**
     * @return the human-readable protocol name
     */
    public String getName() {
        return this.displayName;
    }

    /**
     * @return the default port associated with this protocol
     */
    public int getDefaultPort() {
        return this.port;
    }
}
/**
 * Lifecycle states reported by a protocol adapter. Each state carries a
 * human-readable description.
 */
public enum AdapterStatus {
    STOPPED("已停止"),
    STARTING("启动中"),
    RUNNING("运行中"),
    STOPPING("停止中"),
    ERROR("错误");

    /** Human-readable text for this state. */
    private final String label;

    AdapterStatus(String label) {
        this.label = label;
    }

    /**
     * @return the human-readable description of this state
     */
    public String getDescription() {
        return this.label;
    }
}
# MQTT协议适配器
# MQTT适配器实现
/**
 * MQTT implementation of {@link ProtocolAdapter}.
 *
 * <p>Runs an embedded {@code MqttBroker}, authenticates connecting devices,
 * tracks one {@link MqttConnection} per device id, and dispatches inbound
 * messages to {@code MessageProcessingService} according to the topic layout
 * {@code /device/{deviceId}/{kind}/...}.
 */
@Component
@Slf4j
public class MqttProtocolAdapter implements ProtocolAdapter {

    /** Upper bound for an inbound payload, in UTF-8 bytes (1 MiB). */
    private static final int MAX_PAYLOAD_BYTES = 1024 * 1024;

    /**
     * Shared JSON serializer. ObjectMapper instances are documented as
     * thread-safe once configured, so one instance is reused instead of
     * building a new mapper for every outbound message.
     */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Autowired
    private DeviceAuthService deviceAuthService;
    @Autowired
    private DeviceManagementService deviceManagementService;
    @Autowired
    private MessageProcessingService messageProcessingService;

    /** Injected through the constructor only; no additional field injection. */
    private final MeterRegistry meterRegistry;

    @Value("${iot.mqtt.broker.host:0.0.0.0}")
    private String brokerHost;
    @Value("${iot.mqtt.broker.port:1883}")
    private int brokerPort;
    @Value("${iot.mqtt.broker.max-connections:10000}")
    private int maxConnections;

    private MqttBroker mqttBroker;

    /** Live connections keyed by device id. */
    private final Map<String, MqttConnection> deviceConnections = new ConcurrentHashMap<>();

    /**
     * Device id resolved at connect time, keyed by MQTT client id. Message and
     * disconnect callbacks only carry the client id, so without this mapping a
     * device that authenticated with a username different from its client id
     * would be looked up under the wrong key.
     */
    private final Map<String, String> clientDeviceIds = new ConcurrentHashMap<>();

    private volatile AdapterStatus status = AdapterStatus.STOPPED;

    private final Gauge activeConnectionsGauge;

    /**
     * Creates the adapter and registers the active-connection gauge.
     *
     * @param meterRegistry registry that receives this adapter's metrics
     */
    public MqttProtocolAdapter(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        // Gauge.builder takes the observed object and value function up front;
        // register() only takes the registry.
        this.activeConnectionsGauge = Gauge
                .builder("iot.mqtt.connections.active", this, MqttProtocolAdapter::getConnectionCount)
                .description("活跃MQTT连接数")
                .register(meterRegistry);
    }

    @Override
    public ProtocolType getProtocolType() {
        return ProtocolType.MQTT;
    }

    /**
     * Builds the broker configuration and starts the embedded broker.
     * A call while already RUNNING is a logged no-op.
     *
     * @throws Exception if the broker fails to start; status becomes ERROR
     */
    @Override
    public void start() throws Exception {
        if (status == AdapterStatus.RUNNING) {
            log.warn("MQTT适配器已经在运行中");
            return;
        }
        try {
            status = AdapterStatus.STARTING;
            MqttBrokerConfig config = MqttBrokerConfig.builder()
                    .host(brokerHost)
                    .port(brokerPort)
                    .maxConnections(maxConnections)
                    .authenticator(this::authenticateDevice)
                    .connectionHandler(this::handleConnection)
                    .messageHandler(this::handleMessage)
                    .disconnectionHandler(this::handleDisconnection)
                    .build();
            mqttBroker = new MqttBroker(config);
            mqttBroker.start();
            status = AdapterStatus.RUNNING;
            log.info("MQTT协议适配器启动成功, host: {}, port: {}", brokerHost, brokerPort);
        } catch (Exception e) {
            status = AdapterStatus.ERROR;
            log.error("MQTT协议适配器启动失败", e);
            throw e;
        }
    }

    /**
     * Closes every tracked connection and stops the broker.
     * A call while already STOPPED is a logged no-op.
     *
     * @throws Exception if the broker fails to stop; status becomes ERROR
     */
    @Override
    public void stop() throws Exception {
        if (status == AdapterStatus.STOPPED) {
            log.warn("MQTT适配器已经停止");
            return;
        }
        try {
            status = AdapterStatus.STOPPING;
            for (MqttConnection connection : deviceConnections.values()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    log.warn("关闭MQTT连接失败, deviceId: {}", connection.getDeviceId(), e);
                }
            }
            deviceConnections.clear();
            clientDeviceIds.clear();
            if (mqttBroker != null) {
                mqttBroker.stop();
            }
            status = AdapterStatus.STOPPED;
            log.info("MQTT协议适配器停止成功");
        } catch (Exception e) {
            status = AdapterStatus.ERROR;
            log.error("MQTT协议适配器停止失败", e);
            throw e;
        }
    }

    /**
     * Registers a connection for a known device, marks it ONLINE and
     * subscribes to its uplink topics. Unknown devices and events without a
     * device id are ignored with a warning. Failures are logged, not rethrown.
     */
    @Override
    public void handleDeviceConnect(DeviceConnectEvent event) {
        String deviceId = event.getDeviceId();
        // ConcurrentHashMap rejects null keys, so bail out before touching the map.
        if (StringUtils.isBlank(deviceId)) {
            log.warn("设备ID为空, 忽略连接事件, clientId: {}", event.getClientId());
            return;
        }
        try {
            IoTDevice device = deviceManagementService.getDeviceById(deviceId);
            if (device == null) {
                log.warn("设备不存在, deviceId: {}", deviceId);
                return;
            }
            // Built through the Lombok builder: MqttConnection only has the
            // no-args and all-args constructors. 'active' is set explicitly so the
            // connection is live regardless of how the builder treats defaults.
            MqttConnection connection = MqttConnection.builder()
                    .deviceId(deviceId)
                    .clientId(event.getClientId())
                    .remoteAddress(event.getRemoteAddress())
                    .connectTime(event.getConnectTime())
                    .lastActiveTime(event.getConnectTime())
                    .active(true)
                    .build();
            MqttConnection previous = deviceConnections.put(deviceId, connection);
            if (previous != null) {
                // A reconnect replaces the old entry; mark the stale object closed.
                previous.close();
            }
            deviceManagementService.updateDeviceStatus(deviceId, DeviceStatus.ONLINE);
            subscribeDeviceTopics(deviceId);
            countConnectionEvent("connect");
            log.info("MQTT设备连接成功, deviceId: {}, clientId: {}",
                    deviceId, event.getClientId());
        } catch (Exception e) {
            log.error("处理MQTT设备连接失败, deviceId: {}", deviceId, e);
        }
    }

    /**
     * Removes and closes the device's connection and marks the device OFFLINE.
     * Failures are logged, not rethrown.
     */
    @Override
    public void handleDeviceDisconnect(DeviceDisconnectEvent event) {
        String deviceId = event.getDeviceId();
        if (StringUtils.isBlank(deviceId)) {
            log.warn("设备ID为空, 忽略断开事件, clientId: {}", event.getClientId());
            return;
        }
        try {
            MqttConnection connection = deviceConnections.remove(deviceId);
            if (connection != null) {
                connection.close();
            }
            deviceManagementService.updateDeviceStatus(deviceId, DeviceStatus.OFFLINE);
            countConnectionEvent("disconnect");
            log.info("MQTT设备断开连接, deviceId: {}, reason: {}",
                    deviceId, event.getReason());
        } catch (Exception e) {
            log.error("处理MQTT设备断开失败, deviceId: {}", deviceId, e);
        }
    }

    /**
     * Validates an inbound message, classifies it by topic and forwards it to
     * the matching handler. Failures are logged, not rethrown.
     */
    @Override
    public void handleDeviceMessage(DeviceMessageEvent event) {
        String deviceId = event.getDeviceId();
        String topic = event.getTopic();
        String payload = event.getPayload();
        try {
            if (!validateMessage(deviceId, topic, payload)) {
                log.warn("MQTT消息验证失败, deviceId: {}, topic: {}", deviceId, topic);
                return;
            }
            MessageType messageType = parseMessageType(topic);
            switch (messageType) {
                case DATA_REPORT:
                    handleDataReport(deviceId, topic, payload);
                    break;
                case EVENT_REPORT:
                    handleEventReport(deviceId, topic, payload);
                    break;
                case COMMAND_RESPONSE:
                    handleCommandResponse(deviceId, topic, payload);
                    break;
                case HEARTBEAT:
                    handleHeartbeat(deviceId, topic, payload);
                    break;
                default:
                    log.warn("未知的MQTT消息类型, deviceId: {}, topic: {}", deviceId, topic);
            }
            countMessage(messageType);
        } catch (Exception e) {
            log.error("处理MQTT设备消息失败, deviceId: {}, topic: {}", deviceId, topic, e);
        }
    }

    /**
     * Publishes a message to the device's downlink topic asynchronously.
     *
     * @return a future completing with {@code true} after a successful
     *         publish, or {@code false} when the device is not connected or
     *         publishing fails
     */
    @Override
    public CompletableFuture<Boolean> sendMessageToDevice(String deviceId, Object message) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                MqttConnection connection = deviceConnections.get(deviceId);
                if (connection == null) {
                    log.warn("设备未连接, deviceId: {}", deviceId);
                    return false;
                }
                MqttMessage mqttMessage = buildMqttMessage(message);
                String topic = buildDownlinkTopic(deviceId, message);
                mqttBroker.publish(topic, mqttMessage);
                log.debug("MQTT消息发送成功, deviceId: {}, topic: {}", deviceId, topic);
                return true;
            } catch (Exception e) {
                log.error("MQTT消息发送失败, deviceId: {}", deviceId, e);
                return false;
            }
        });
    }

    @Override
    public boolean isDeviceConnected(String deviceId) {
        // Null guard: ConcurrentHashMap.containsKey(null) throws.
        return deviceId != null && deviceConnections.containsKey(deviceId);
    }

    @Override
    public int getConnectionCount() {
        return deviceConnections.size();
    }

    @Override
    public AdapterStatus getStatus() {
        return status;
    }

    /**
     * Counts a connect/disconnect event. The tagged counter is looked up (or
     * created) on the registry; Counter itself has no increment-with-tags call.
     */
    private void countConnectionEvent(String event) {
        Counter.builder("iot.mqtt.connections.total")
                .description("MQTT连接总数")
                .tags("protocol", "mqtt", "event", event)
                .register(meterRegistry)
                .increment();
    }

    /**
     * Counts a processed message by type. The device id is deliberately not a
     * tag: one time series per device would grow without bound.
     */
    private void countMessage(MessageType messageType) {
        Counter.builder("iot.mqtt.messages.total")
                .description("MQTT消息总数")
                .tags("protocol", "mqtt", "message_type", messageType.name())
                .register(meterRegistry)
                .increment();
    }

    /**
     * Broker authentication callback. Resolves the device id from the
     * credentials and delegates to {@code DeviceAuthService}.
     *
     * @return {@code true} only when authentication succeeds
     */
    private boolean authenticateDevice(String clientId, String username, String password) {
        try {
            String deviceId = extractDeviceId(clientId, username);
            if (StringUtils.isBlank(deviceId)) {
                log.warn("无法提取设备ID, clientId: {}, username: {}", clientId, username);
                return false;
            }
            AuthenticationResult result = deviceAuthService.authenticate(
                    deviceId, username, password, ProtocolType.MQTT
            );
            if (result.isSuccess()) {
                log.debug("MQTT设备认证成功, deviceId: {}", deviceId);
                return true;
            }
            log.warn("MQTT设备认证失败, deviceId: {}, reason: {}",
                    deviceId, result.getFailureReason());
            return false;
        } catch (Exception e) {
            log.error("MQTT设备认证异常, clientId: {}", clientId, e);
            return false;
        }
    }

    /**
     * Broker connection callback. Remembers which device id the client id
     * maps to, then forwards a {@link DeviceConnectEvent}.
     */
    private void handleConnection(MqttConnectionEvent event) {
        String clientId = event.getClientId();
        String deviceId = extractDeviceId(clientId, event.getUsername());
        if (clientId != null && deviceId != null) {
            clientDeviceIds.put(clientId, deviceId);
        }
        DeviceConnectEvent connectEvent = DeviceConnectEvent.builder()
                .deviceId(deviceId)
                .clientId(clientId)
                .protocol(ProtocolType.MQTT)
                .remoteAddress(event.getRemoteAddress())
                .connectTime(LocalDateTime.now())
                .build();
        handleDeviceConnect(connectEvent);
    }

    /**
     * Broker message callback. Decodes the payload as UTF-8 and forwards a
     * {@link DeviceMessageEvent}.
     */
    private void handleMessage(MqttMessageEvent event) {
        String deviceId = resolveDeviceId(event.getClientId());
        byte[] rawPayload = event.getPayload();
        String payload = rawPayload == null ? null : new String(rawPayload, StandardCharsets.UTF_8);
        DeviceMessageEvent messageEvent = DeviceMessageEvent.builder()
                .deviceId(deviceId)
                .protocol(ProtocolType.MQTT)
                .topic(event.getTopic())
                .payload(payload)
                .qos(event.getQos())
                .timestamp(LocalDateTime.now())
                .build();
        handleDeviceMessage(messageEvent);
    }

    /**
     * Broker disconnection callback. Forwards a {@link DeviceDisconnectEvent}
     * and forgets the client-id mapping.
     */
    private void handleDisconnection(MqttDisconnectionEvent event) {
        String clientId = event.getClientId();
        String deviceId = resolveDeviceId(clientId);
        if (clientId != null) {
            clientDeviceIds.remove(clientId);
        }
        DeviceDisconnectEvent disconnectEvent = DeviceDisconnectEvent.builder()
                .deviceId(deviceId)
                .clientId(clientId)
                .protocol(ProtocolType.MQTT)
                .reason(event.getReason())
                .disconnectTime(LocalDateTime.now())
                .build();
        handleDeviceDisconnect(disconnectEvent);
    }

    /**
     * Returns the device id recorded for this client at connect time, falling
     * back to deriving it from the client id alone.
     */
    private String resolveDeviceId(String clientId) {
        String known = clientId == null ? null : clientDeviceIds.get(clientId);
        return known != null ? known : extractDeviceId(clientId, null);
    }

    /**
     * Subscribes the broker to the device's data, event, command-response and
     * heartbeat topics. Failures are logged, not rethrown.
     */
    private void subscribeDeviceTopics(String deviceId) {
        try {
            String dataReportTopic = String.format("/device/%s/data/+", deviceId);
            mqttBroker.subscribe(dataReportTopic);
            String eventReportTopic = String.format("/device/%s/event/+", deviceId);
            mqttBroker.subscribe(eventReportTopic);
            String commandResponseTopic = String.format("/device/%s/command/response/+", deviceId);
            mqttBroker.subscribe(commandResponseTopic);
            String heartbeatTopic = String.format("/device/%s/heartbeat", deviceId);
            mqttBroker.subscribe(heartbeatTopic);
            log.debug("设备主题订阅成功, deviceId: {}", deviceId);
        } catch (Exception e) {
            log.error("订阅设备主题失败, deviceId: {}", deviceId, e);
        }
    }

    /**
     * Checks that the message has a device id, a well-formed topic and a
     * payload no larger than {@link #MAX_PAYLOAD_BYTES}.
     */
    private boolean validateMessage(String deviceId, String topic, String payload) {
        if (StringUtils.isBlank(deviceId)) {
            return false;
        }
        if (!isValidTopic(topic)) {
            return false;
        }
        // NOTE(review): the topic's {deviceId} segment is not compared with the
        // sender's device id here; confirm whether cross-device publishing must be rejected.
        if (payload != null) {
            int chars = payload.length();
            // The limit is in bytes. A UTF-16 char encodes to at most 3 UTF-8
            // bytes, so the byte array is only materialized when the char count
            // alone cannot prove the payload fits.
            boolean tooLarge = chars > MAX_PAYLOAD_BYTES
                    || (chars > MAX_PAYLOAD_BYTES / 3
                        && payload.getBytes(StandardCharsets.UTF_8).length > MAX_PAYLOAD_BYTES);
            if (tooLarge) {
                log.warn("MQTT消息负载过大, deviceId: {}, size: {}", deviceId, chars);
                return false;
            }
        }
        return true;
    }

    /**
     * Classifies a topic of the form {@code /device/{deviceId}/{kind}/...} by
     * its {@code kind} segment. Matching on segments rather than substrings
     * keeps a device id or data type such as {@code data} or {@code event}
     * from being mistaken for the message kind.
     */
    private MessageType parseMessageType(String topic) {
        // Limit -1 keeps trailing empty levels, so "/device/x/data/" still has
        // a (blank) fifth level like the "data/+" subscription expects.
        String[] parts = topic.split("/", -1);
        if (parts.length < 4) {
            return MessageType.UNKNOWN;
        }
        switch (parts[3]) {
            case "data":
                return parts.length >= 5 ? MessageType.DATA_REPORT : MessageType.UNKNOWN;
            case "event":
                return parts.length >= 5 ? MessageType.EVENT_REPORT : MessageType.UNKNOWN;
            case "command":
                return parts.length >= 6 && "response".equals(parts[4])
                        ? MessageType.COMMAND_RESPONSE
                        : MessageType.UNKNOWN;
            case "heartbeat":
                return MessageType.HEARTBEAT;
            default:
                return MessageType.UNKNOWN;
        }
    }

    /** Wraps a data report and hands it to the message processing service. */
    private void handleDataReport(String deviceId, String topic, String payload) {
        try {
            String dataType = extractDataType(topic);
            DeviceDataMessage dataMessage = DeviceDataMessage.builder()
                    .deviceId(deviceId)
                    .dataType(dataType)
                    .payload(payload)
                    .protocol(ProtocolType.MQTT)
                    .topic(topic)
                    .timestamp(LocalDateTime.now())
                    .build();
            messageProcessingService.processDeviceData(dataMessage);
        } catch (Exception e) {
            log.error("处理MQTT数据上报失败, deviceId: {}, topic: {}", deviceId, topic, e);
        }
    }

    /** Wraps an event report and hands it to the message processing service. */
    private void handleEventReport(String deviceId, String topic, String payload) {
        try {
            String eventType = extractEventType(topic);
            DeviceEventMessage eventMessage = DeviceEventMessage.builder()
                    .deviceId(deviceId)
                    .eventType(eventType)
                    .payload(payload)
                    .protocol(ProtocolType.MQTT)
                    .topic(topic)
                    .timestamp(LocalDateTime.now())
                    .build();
            messageProcessingService.processDeviceEvent(eventMessage);
        } catch (Exception e) {
            log.error("处理MQTT事件上报失败, deviceId: {}, topic: {}", deviceId, topic, e);
        }
    }

    /** Wraps a command response and hands it to the message processing service. */
    private void handleCommandResponse(String deviceId, String topic, String payload) {
        try {
            String commandId = extractCommandId(topic);
            CommandResponseMessage responseMessage = CommandResponseMessage.builder()
                    .deviceId(deviceId)
                    .commandId(commandId)
                    .payload(payload)
                    .protocol(ProtocolType.MQTT)
                    .topic(topic)
                    .timestamp(LocalDateTime.now())
                    .build();
            messageProcessingService.processCommandResponse(responseMessage);
        } catch (Exception e) {
            log.error("处理MQTT指令响应失败, deviceId: {}, topic: {}", deviceId, topic, e);
        }
    }

    /** Records the heartbeat by refreshing the device's last-active time. */
    private void handleHeartbeat(String deviceId, String topic, String payload) {
        try {
            deviceManagementService.updateDeviceLastActiveTime(deviceId, LocalDateTime.now());
            log.debug("收到设备心跳, deviceId: {}", deviceId);
        } catch (Exception e) {
            log.error("处理MQTT心跳失败, deviceId: {}", deviceId, e);
        }
    }

    /**
     * Builds a QoS-1, non-retained MQTT message. Strings are sent as-is;
     * anything else is serialized to JSON.
     *
     * @throws RuntimeException wrapping any serialization failure
     */
    private MqttMessage buildMqttMessage(Object message) {
        try {
            String payload = message instanceof String
                    ? (String) message
                    : OBJECT_MAPPER.writeValueAsString(message);
            return MqttMessage.builder()
                    .payload(payload.getBytes(StandardCharsets.UTF_8))
                    .qos(1) // at least once
                    .retained(false)
                    .build();
        } catch (Exception e) {
            throw new RuntimeException("构建MQTT消息失败", e);
        }
    }

    /**
     * Chooses the downlink topic: a per-command-type topic for
     * {@code DeviceCommand}, otherwise the generic message topic.
     */
    private String buildDownlinkTopic(String deviceId, Object message) {
        if (message instanceof DeviceCommand) {
            DeviceCommand command = (DeviceCommand) message;
            // Locale.ROOT keeps the topic stable under locales with special
            // casing rules (e.g. Turkish dotless i).
            String commandType = command.getCommandType().name().toLowerCase(java.util.Locale.ROOT);
            return String.format("/device/%s/command/%s", deviceId, commandType);
        }
        return String.format("/device/%s/message", deviceId);
    }

    /**
     * Derives the device id: the username when present, otherwise the client
     * id with an optional {@code device_} prefix removed.
     *
     * @return the device id, or {@code null} when both inputs are blank
     */
    private String extractDeviceId(String clientId, String username) {
        if (StringUtils.isNotBlank(username)) {
            return username;
        }
        if (StringUtils.isNotBlank(clientId)) {
            // Assumed client id format: device_{deviceId}
            if (clientId.startsWith("device_")) {
                return clientId.substring(7);
            }
            return clientId;
        }
        return null;
    }

    /** Accepts topics shaped like {@code /device/{deviceId}/{messageType}/...}. */
    private boolean isValidTopic(String topic) {
        if (StringUtils.isBlank(topic)) {
            return false;
        }
        String[] parts = topic.split("/");
        return parts.length >= 4 && "device".equals(parts[1]);
    }

    /** Returns {dataType} from {@code /device/{deviceId}/data/{dataType}}, or "unknown". */
    private String extractDataType(String topic) {
        String[] parts = topic.split("/");
        return parts.length >= 5 ? parts[4] : "unknown";
    }

    /** Returns {eventType} from {@code /device/{deviceId}/event/{eventType}}, or "unknown". */
    private String extractEventType(String topic) {
        String[] parts = topic.split("/");
        return parts.length >= 5 ? parts[4] : "unknown";
    }

    /** Returns {commandId} from {@code /device/{deviceId}/command/response/{commandId}}, or "unknown". */
    private String extractCommandId(String topic) {
        String[] parts = topic.split("/");
        return parts.length >= 6 ? parts[5] : "unknown";
    }
}
/**
* MQTT连接对象
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqttConnection {
private String deviceId;
private String clientId;
private String remoteAddress;
private LocalDateTime connectTime;
private LocalDateTime lastActiveTime;
private volatile boolean active = true;
public void close() {
active = false;
}
public void updateLastActiveTime() {
lastActiveTime = LocalDateTime.now();
}
}
# HTTP协议适配器
# HTTP适配器实现
@RestController
@RequestMapping("/api/device")
@Slf4j
public class HttpProtocolAdapter implements ProtocolAdapter {
@Autowired
private DeviceAuthService deviceAuthService;
@Autowired
private DeviceManagementService deviceManagementService;
@Autowired
private MessageProcessingService messageProcessingService;
@Autowired
private MeterRegistry meterRegistry;
private volatile AdapterStatus status = AdapterStatus.STOPPED;
private final Map<String, HttpSession> deviceSessions = new ConcurrentHashMap<>();;
private final Counter requestCounter;
private final Timer requestTimer;
public HttpProtocolAdapter(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.requestCounter = Counter.builder("iot.http.requests.total")
.description("HTTP请求总数")
.register(meterRegistry);
this.requestTimer = Timer.builder("iot.http.request.duration")
.description("HTTP请求耗时")
.register(meterRegistry);
}
@Override
public ProtocolType getProtocolType() {
return ProtocolType.HTTP;
}
@Override
public void start() throws Exception {
status = AdapterStatus.RUNNING;
log.info("HTTP协议适配器启动成功");
}
@Override
public void stop() throws Exception {
status = AdapterStatus.STOPPED;
deviceSessions.clear();
log.info("HTTP协议适配器停止成功");
}
/**
* 设备数据上报
*/
@PostMapping("/{deviceId}/data")
public ResponseEntity<ApiResponse> reportData(
@PathVariable String deviceId,
@RequestParam(required = false) String dataType,
@RequestBody String payload,
HttpServletRequest request) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
// 认证设备
if (!authenticateDevice(deviceId, request)) {
requestCounter.increment(Tags.of("protocol", "http", "endpoint", "data", "result", "auth_failed"));
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(ApiResponse.failure("设备认证失败"));
}
// 更新会话
updateSession(deviceId, request);
// 构建数据消息
DeviceDataMessage dataMessage = DeviceDataMessage.builder()
.deviceId(deviceId)
.dataType(dataType != null ? dataType : "default")
.payload(payload)
.protocol(ProtocolType.HTTP)
.remoteAddress(getRemoteAddress(request))
.timestamp(LocalDateTime.now())
.build();
// 异步处理数据
messageProcessingService.processDeviceData(dataMessage);
requestCounter.increment(Tags.of("protocol", "http", "endpoint", "data", "result", "success"));
return ResponseEntity.ok(ApiResponse.success("数据上报成功"));
} catch (Exception e) {
log.error("HTTP数据上报失败, deviceId: {}", deviceId, e);
requestCounter.increment(Tags.of("protocol", "http", "endpoint", "data", "result", "error"));
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.failure("数据上报失败: " + e.getMessage()));
} finally {
sample.stop(requestTimer.tag("endpoint", "data"));
}
}
/**
* 设备事件上报
*/
@PostMapping("/{deviceId}/event")
public ResponseEntity<ApiResponse> reportEvent(
@PathVariable String deviceId,
@RequestParam String eventType,
@RequestBody String payload,
HttpServletRequest request) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
// 认证设备
if (!authenticateDevice(deviceId, request)) {
requestCounter.increment(Tags.of("protocol", "http", "endpoint", "event", "result", "auth_failed"));
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(ApiResponse.failure("设备认证失败"));
}
// 更新会话
updateSession(deviceId, request);
// 构建事件消息
DeviceEventMessage eventMessage = DeviceEventMessage.builder()
.deviceId(deviceId)
.eventType(eventType)
.payload(payload)
.protocol(ProtocolType.HTTP)
.remoteAddress(getRemoteAddress(request))
.timestamp(LocalDateTime.now())
.build();
// 异步处理事件
messageProcessingService.processDeviceEvent(eventMessage);
requestCounter.increment(Tags.of("protocol", "http", "endpoint", "event", "result", "success"));
return ResponseEntity.ok(ApiResponse.success("事件上报成功"));
} catch (Exception e) {
log.error("HTTP事件上报失败, deviceId: {}", deviceId, e);
requestCounter.increment(Tags.of("protocol", "http", "endpoint", "event", "result", "error"));
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.failure("事件上报失败: " + e.getMessage()));
} finally {
sample.stop(requestTimer.tag("endpoint", "event"));
}
}
/**
* 指令响应
*/
@PostMapping("/{deviceId}/command/{commandId}/response")
public ResponseEntity<ApiResponse> commandResponse(
@PathVariable String deviceId,
@PathVariable String commandId,
@RequestBody String payload,
HttpServletRequest request) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
// 认证设备
if (!authenticateDevice(deviceId, request)) {
requestCounter.increment(Tags.of("protocol", "http", "endpoint", "command_response", "result", "auth_failed"));
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(ApiResponse.failure("设备认证失败"));
}
// 更新会话
updateSession(deviceId, request);
// 构建指令响应消息
CommandResponseMessage responseMessage = CommandResponseMessage.builder()
.deviceId(deviceId)
.commandId(commandId)
.payload(payload)
.protocol(ProtocolType.HTTP)
.remoteAddress(getRemoteAddress(request))
.timestamp(LocalDateTime.now())
.build();
// 异步处理指令响应
messageProcessingService.processCommandResponse(responseMessage);
requestCounter.increment(Tags.of("protocol", "http", "endpoint", "command_response", "result", "success"));
return ResponseEntity.ok(ApiResponse.success("指令响应成功"));
} catch (Exception e) {
log.error("HTTP指令响应失败, deviceId: {}, commandId: {}", deviceId, commandId, e);
requestCounter.increment(Tags.of("protocol", "http", "endpoint", "command_response", "result", "error"));
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.failure("指令响应失败: " + e.getMessage()));
} finally {
sample.stop(requestTimer.tag("endpoint", "command_response"));
}
}
/**
* 设备心跳
*/
@PostMapping("/{deviceId}/heartbeat")
public ResponseEntity<ApiResponse> heartbeat(
@PathVariable String deviceId,
HttpServletRequest request) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
// 认证设备
if (!authenticateDevice(deviceId, request)) {
requestCounter.increment(Tags.of("protocol", "http", "endpoint", "heartbeat", "result", "auth_failed"));
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(ApiResponse.failure("设备认证失败"));
}
// 更新会话
updateSession(deviceId, request);
// 更新设备最后活跃时间
deviceManagementService.updateDeviceLastActiveTime(deviceId, LocalDateTime.now());
requestCounter.increment(Tags.of("protocol", "http", "endpoint", "heartbeat", "result", "success"));
return ResponseEntity.ok(ApiResponse.success("心跳成功"));
} catch (Exception e) {
log.error("HTTP心跳失败, deviceId: {}", deviceId, e);
requestCounter.increment(Tags.of("protocol", "http", "endpoint", "heartbeat", "result", "error"));
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.failure("心跳失败: " + e.getMessage()));
} finally {
sample.stop(requestTimer.tag("endpoint", "heartbeat"));
}
}
/**
* 获取待执行指令
*/
@GetMapping("/{deviceId}/commands")
public ResponseEntity<ApiResponse<List<DeviceCommand>>> getPendingCommands(
@PathVariable String deviceId,
HttpServletRequest request) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
// 认证设备
if (!authenticateDevice(deviceId, request)) {
requestCounter.increment(Tags.of("protocol", "http", "endpoint", "get_commands", "result", "auth_failed"));
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(ApiResponse.failure("设备认证失败"));
}
// 更新会话
updateSession(deviceId, request);
// 获取待执行指令
List<DeviceCommand> pendingCommands = messageProcessingService.getPendingCommands(deviceId);
requestCounter.increment(Tags.of("protocol", "http", "endpoint", "get_commands", "result", "success"));
return ResponseEntity.ok(ApiResponse.success(pendingCommands));
} catch (Exception e) {
log.error("获取待执行指令失败, deviceId: {}", deviceId, e);
requestCounter.increment(Tags.of("protocol", "http", "endpoint", "get_commands", "result", "error"));
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.failure("获取指令失败: " + e.getMessage()));
} finally {
sample.stop(requestTimer.tag("endpoint", "get_commands"));
}
}
@Override
public void handleDeviceConnect(DeviceConnectEvent event) {
// HTTP协议无需处理连接事件
}
@Override
public void handleDeviceDisconnect(DeviceDisconnectEvent event) {
// HTTP协议无需处理断开事件
}
@Override
public void handleDeviceMessage(DeviceMessageEvent event) {
// HTTP协议通过REST接口处理消息
}
@Override
public CompletableFuture<Boolean> sendMessageToDevice(String deviceId, Object message) {
// HTTP协议采用拉取模式,设备主动获取指令
return CompletableFuture.completedFuture(true);
}
@Override
public boolean isDeviceConnected(String deviceId) {
HttpSession session = deviceSessions.get(deviceId);
if (session == null) {
return false;
}
// 检查会话是否过期(5分钟无活动)
return session.getLastActiveTime().isAfter(LocalDateTime.now().minusMinutes(5));
}
@Override
public int getConnectionCount() {
// 清理过期会话
LocalDateTime expireTime = LocalDateTime.now().minusMinutes(5);
deviceSessions.entrySet().removeIf(entry ->
entry.getValue().getLastActiveTime().isBefore(expireTime));
return deviceSessions.size();
}
@Override
public AdapterStatus getStatus() {
return status;
}
/**
* 认证设备
*/
private boolean authenticateDevice(String deviceId, HttpServletRequest request) {
try {
// 从请求头中获取认证信息
String authHeader = request.getHeader("Authorization");
if (StringUtils.isBlank(authHeader)) {
return false;
}
// 解析认证信息
String[] authParts = parseAuthHeader(authHeader);
if (authParts == null || authParts.length != 2) {
return false;
}
String username = authParts[0];
String password = authParts[1];
// 验证设备认证信息
AuthenticationResult result = deviceAuthService.authenticate(
deviceId, username, password, ProtocolType.HTTP
);
return result.isSuccess();
} catch (Exception e) {
log.error("HTTP设备认证异常, deviceId: {}", deviceId, e);
return false;
}
}
/**
* 解析认证头
*/
private String[] parseAuthHeader(String authHeader) {
if (authHeader.startsWith("Basic ")) {
// Basic认证
String encoded = authHeader.substring(6);
String decoded = new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
return decoded.split(":", 2);
} else if (authHeader.startsWith("Bearer ")) {
// Token认证
String token = authHeader.substring(7);
// TODO: 解析Token获取用户名和密码
return new String[]{"token", token};
}
return null;
}
/**
* 更新会话
*/
private void updateSession(String deviceId, HttpServletRequest request) {
HttpSession session = deviceSessions.computeIfAbsent(deviceId, k ->
HttpSession.builder()
.deviceId(deviceId)
.remoteAddress(getRemoteAddress(request))
.createTime(LocalDateTime.now())
.build()
);
session.updateLastActiveTime();
}
/**
* 获取远程地址
*/
private String getRemoteAddress(HttpServletRequest request) {
String xForwardedFor = request.getHeader("X-Forwarded-For");
if (StringUtils.isNotBlank(xForwardedFor)) {
return xForwardedFor.split(",")[0].trim();
}
String xRealIp = request.getHeader("X-Real-IP");
if (StringUtils.isNotBlank(xRealIp)) {
return xRealIp;
}
return request.getRemoteAddr();
}
}
/**
* Per-device session record kept by the HTTP adapter: device id, remote
* address, creation time and the time of the most recent activity.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class HttpSession {
private String deviceId;
private String remoteAddress;
private LocalDateTime createTime;
// Stays null until set through the builder/constructor or updateLastActiveTime().
private LocalDateTime lastActiveTime;
/**
* Sets the last-activity timestamp to the current local date-time.
*/
public void updateLastActiveTime() {
lastActiveTime = LocalDateTime.now();
}
}
# 协议适配器管理器
# 适配器管理器实现
/**
 * Registry and lifecycle manager for all {@link ProtocolAdapter} beans.
 *
 * <p>Adapters are indexed by {@link ProtocolType} after construction and
 * started immediately; they are stopped when the bean is destroyed. The
 * manager also routes outbound messages and aggregates adapter status and
 * connection counts.
 */
@Service
@Slf4j
public class ProtocolAdapterManager {
    private final Map<ProtocolType, ProtocolAdapter> adapters = new ConcurrentHashMap<>();

    @Autowired
    private List<ProtocolAdapter> adapterList;

    /** Injected through the constructor only; no additional field injection. */
    private final MeterRegistry meterRegistry;

    private final Gauge adapterStatusGauge;

    /**
     * Registers a gauge reporting how many adapters are RUNNING.
     *
     * @param meterRegistry registry that receives the gauge
     */
    public ProtocolAdapterManager(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        // Gauge.builder takes the observed object and value function up front;
        // register() only takes the registry.
        this.adapterStatusGauge = Gauge
                .builder("iot.adapter.status", this, ProtocolAdapterManager::getRunningAdapterCount)
                .description("协议适配器状态")
                .register(meterRegistry);
    }

    /** Indexes every injected adapter by protocol type, then starts them all. */
    @PostConstruct
    public void initialize() {
        for (ProtocolAdapter adapter : adapterList) {
            ProtocolAdapter replaced = adapters.put(adapter.getProtocolType(), adapter);
            if (replaced != null) {
                // Two beans claiming one protocol means one becomes unreachable;
                // make that visible instead of overwriting silently.
                log.warn("协议适配器重复注册, 已覆盖: {}", adapter.getProtocolType());
            }
            log.info("注册协议适配器: {}", adapter.getProtocolType());
        }
        startAllAdapters();
    }

    /** Stops all adapters on bean destruction. */
    @PreDestroy
    public void destroy() {
        stopAllAdapters();
    }

    /** Starts every adapter; a failure is logged and does not stop the others. */
    public void startAllAdapters() {
        for (ProtocolAdapter adapter : adapters.values()) {
            try {
                adapter.start();
                log.info("协议适配器启动成功: {}", adapter.getProtocolType());
            } catch (Exception e) {
                log.error("协议适配器启动失败: {}", adapter.getProtocolType(), e);
            }
        }
    }

    /** Stops every adapter; a failure is logged and does not stop the others. */
    public void stopAllAdapters() {
        for (ProtocolAdapter adapter : adapters.values()) {
            try {
                adapter.stop();
                log.info("协议适配器停止成功: {}", adapter.getProtocolType());
            } catch (Exception e) {
                log.error("协议适配器停止失败: {}", adapter.getProtocolType(), e);
            }
        }
    }

    /**
     * Starts the adapter for one protocol.
     *
     * @throws IllegalArgumentException if no adapter is registered for the protocol
     * @throws Exception                if the adapter fails to start
     */
    public void startAdapter(ProtocolType protocolType) throws Exception {
        requireAdapter(protocolType).start();
        log.info("协议适配器启动成功: {}", protocolType);
    }

    /**
     * Stops the adapter for one protocol.
     *
     * @throws IllegalArgumentException if no adapter is registered for the protocol
     * @throws Exception                if the adapter fails to stop
     */
    public void stopAdapter(ProtocolType protocolType) throws Exception {
        requireAdapter(protocolType).stop();
        log.info("协议适配器停止成功: {}", protocolType);
    }

    /**
     * @return the adapter registered for the protocol, or {@code null} if none
     */
    public ProtocolAdapter getAdapter(ProtocolType protocolType) {
        return adapters.get(protocolType);
    }

    /**
     * Sends a message through the adapter for the given protocol.
     *
     * @return the adapter's result, or a future completed with {@code false}
     *         when no adapter is registered for the protocol
     */
    public CompletableFuture<Boolean> sendMessageToDevice(String deviceId, Object message, ProtocolType protocolType) {
        ProtocolAdapter adapter = adapters.get(protocolType);
        if (adapter == null) {
            return CompletableFuture.completedFuture(false);
        }
        return adapter.sendMessageToDevice(deviceId, message);
    }

    /**
     * @return whether the adapter for the protocol reports the device as
     *         connected; {@code false} when no adapter is registered
     */
    public boolean isDeviceConnected(String deviceId, ProtocolType protocolType) {
        ProtocolAdapter adapter = adapters.get(protocolType);
        return adapter != null && adapter.isDeviceConnected(deviceId);
    }

    /** Returns a snapshot of each registered adapter's status. */
    public Map<ProtocolType, AdapterStatus> getAdapterStatuses() {
        Map<ProtocolType, AdapterStatus> statuses = new HashMap<>();
        adapters.forEach((type, adapter) -> statuses.put(type, adapter.getStatus()));
        return statuses;
    }

    /** Returns a snapshot of each registered adapter's connection count. */
    public Map<ProtocolType, Integer> getConnectionCounts() {
        Map<ProtocolType, Integer> counts = new HashMap<>();
        adapters.forEach((type, adapter) -> counts.put(type, adapter.getConnectionCount()));
        return counts;
    }

    /** Returns how many registered adapters are currently RUNNING. */
    public int getRunningAdapterCount() {
        return (int) adapters.values().stream()
                .filter(adapter -> adapter.getStatus() == AdapterStatus.RUNNING)
                .count();
    }

    /** Returns the sum of all adapters' connection counts. */
    public int getTotalConnectionCount() {
        return adapters.values().stream()
                .mapToInt(ProtocolAdapter::getConnectionCount)
                .sum();
    }

    /** Looks up the adapter for a protocol or fails with the shared error message. */
    private ProtocolAdapter requireAdapter(ProtocolType protocolType) {
        ProtocolAdapter adapter = adapters.get(protocolType);
        if (adapter == null) {
            throw new IllegalArgumentException("不支持的协议类型: " + protocolType);
        }
        return adapter;
    }
}
# 消息类型定义
# 消息类型枚举
/**
 * Kinds of uplink message a device can send, each with a human-readable
 * description.
 */
public enum MessageType {
    DATA_REPORT("数据上报"),
    EVENT_REPORT("事件上报"),
    COMMAND_RESPONSE("指令响应"),
    HEARTBEAT("心跳"),
    UNKNOWN("未知");

    /** Human-readable text for this message kind. */
    private final String label;

    MessageType(String label) {
        this.label = label;
    }

    /**
     * @return the human-readable description of this message kind
     */
    public String getDescription() {
        return this.label;
    }
}
# 设备消息事件
/**
* Event describing a device connecting through a protocol adapter.
* Accessors, builder and constructors are generated by Lombok.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceConnectEvent {
// Identifier of the connecting device.
private String deviceId;
// Protocol-level client identifier.
private String clientId;
// Protocol the device connected with.
private ProtocolType protocol;
// Remote address of the device.
private String remoteAddress;
// Time the connection was established.
private LocalDateTime connectTime;
// Free-form extra attributes; not populated by the adapters shown in this document.
private Map<String, Object> attributes;
}
/**
* Event describing a device disconnecting from a protocol adapter.
* Accessors, builder and constructors are generated by Lombok.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceDisconnectEvent {
// Identifier of the disconnecting device.
private String deviceId;
// Protocol-level client identifier.
private String clientId;
// Protocol the device was connected with.
private ProtocolType protocol;
// Reason reported for the disconnect.
private String reason;
// Time the disconnect was observed.
private LocalDateTime disconnectTime;
// Free-form extra attributes; not populated by the adapters shown in this document.
private Map<String, Object> attributes;
}
/**
* Event carrying one raw message received from a device.
* Accessors, builder and constructors are generated by Lombok.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceMessageEvent {
// Identifier of the sending device.
private String deviceId;
// Protocol the message arrived on.
private ProtocolType protocol;
// Topic the message was published to.
private String topic;
// Message body as text.
private String payload;
// Quality-of-service level of the message.
private Integer qos;
// Remote address of the sender.
private String remoteAddress;
// Time the message was received.
private LocalDateTime timestamp;
// Free-form extra attributes; not populated by the adapters shown in this document.
private Map<String, Object> attributes;
}
/**
* Data report from a device, as passed to the message processing service.
* Accessors, builder and constructors are generated by Lombok.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceDataMessage {
// Identifier of the reporting device.
private String deviceId;
// Kind of data reported.
private String dataType;
// Report body as text.
private String payload;
// Protocol the report arrived on.
private ProtocolType protocol;
// Source topic; set by the MQTT adapter, left unset by the HTTP adapter.
private String topic;
// Remote address of the sender; set by the HTTP adapter, left unset by the MQTT adapter.
private String remoteAddress;
// Time the report was received.
private LocalDateTime timestamp;
// Free-form extra attributes; not populated by the adapters shown in this document.
private Map<String, Object> attributes;
}
/**
* Event report from a device, as passed to the message processing service.
* Accessors, builder and constructors are generated by Lombok.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceEventMessage {
// Identifier of the reporting device.
private String deviceId;
// Kind of event reported.
private String eventType;
// Report body as text.
private String payload;
// Protocol the report arrived on.
private ProtocolType protocol;
// Source topic; set by the MQTT adapter, left unset by the HTTP adapter.
private String topic;
// Remote address of the sender; set by the HTTP adapter, left unset by the MQTT adapter.
private String remoteAddress;
// Time the report was received.
private LocalDateTime timestamp;
// Free-form extra attributes; not populated by the adapters shown in this document.
private Map<String, Object> attributes;
}
/**
* A device's response to a previously issued command, as passed to the
* message processing service.
* Accessors, builder and constructors are generated by Lombok.
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CommandResponseMessage {
// Identifier of the responding device.
private String deviceId;
// Identifier of the command being answered.
private String commandId;
// Response body as text.
private String payload;
// Protocol the response arrived on.
private ProtocolType protocol;
// Source topic; set by the MQTT adapter, left unset by the HTTP adapter.
private String topic;
// Remote address of the sender; set by the HTTP adapter, left unset by the MQTT adapter.
private String remoteAddress;
// Time the response was received.
private LocalDateTime timestamp;
// Free-form extra attributes; not populated by the adapters shown in this document.
private Map<String, Object> attributes;
}
# 配置文件
# 协议适配器配置
# application.yml
# NOTE(review): nesting reconstructed from the flattened sample; the
# iot.mqtt.broker.* paths match the adapter's @Value keys, the remaining
# grouping is inferred from the section comments and should be confirmed.
iot:
  # MQTT settings
  mqtt:
    broker:
      host: 0.0.0.0
      port: 1883
      max-connections: 10000
      keep-alive: 60
      clean-session: true
    topics:
      data-report: "/device/{deviceId}/data/{dataType}"
      event-report: "/device/{deviceId}/event/{eventType}"
      command-response: "/device/{deviceId}/command/response/{commandId}"
      heartbeat: "/device/{deviceId}/heartbeat"
      command-downlink: "/device/{deviceId}/command/{commandType}"
  # HTTP settings
  http:
    server:
      port: 8080
      max-connections: 5000
      connection-timeout: 30000
      read-timeout: 30000
    endpoints:
      data-report: "/api/device/{deviceId}/data"
      event-report: "/api/device/{deviceId}/event"
      command-response: "/api/device/{deviceId}/command/{commandId}/response"
      heartbeat: "/api/device/{deviceId}/heartbeat"
      get-commands: "/api/device/{deviceId}/commands"
  # CoAP settings
  coap:
    server:
      host: 0.0.0.0
      port: 5683
      max-connections: 5000
    resources:
      data-report: "/device/{deviceId}/data"
      event-report: "/device/{deviceId}/event"
      command-response: "/device/{deviceId}/command/{commandId}/response"
      heartbeat: "/device/{deviceId}/heartbeat"
  # TCP settings
  tcp:
    server:
      host: 0.0.0.0
      # NOTE(review): same port as iot.http.server.port; confirm both
      # listeners are never bound on the same host.
      port: 8080
      max-connections: 5000
      so-timeout: 30000
      keep-alive: true
  # UDP settings
  udp:
    server:
      host: 0.0.0.0
      port: 8081
      max-connections: 5000
      buffer-size: 1024
  # Settings shared by all protocols
  common:
    message:
      max-size: 1048576 # 1MB
      encoding: UTF-8
    security:
      auth-required: true
      token-expire: 3600 # 1 hour
    performance:
      thread-pool-size: 50
      queue-capacity: 2000
# 总结
协议适配器是物联网平台的重要组件,通过统一的接口设计和模块化的实现,支持多种协议的设备接入。每种协议适配器都可以针对其特点进行优化,例如本文实现的MQTT长连接管理与HTTP基于会话超时的无状态接入;CoAP、TCP/UDP等协议可按同一接口扩展,本文仅给出其配置示例。通过适配器管理器统一管理所有协议适配器,提供了灵活的协议扩展能力和高效的消息处理性能。