物联网协议适配器

# 物联网协议适配器

# 概述

协议适配器是物联网平台的核心组件,负责处理不同协议的设备接入、数据解析、消息路由等功能。本文档详细介绍MQTT、HTTP、CoAP、TCP/UDP等协议的适配实现。

# 架构设计

协议适配层
├── 协议接入层
│   ├── MQTT适配器
│   ├── HTTP适配器
│   ├── CoAP适配器
│   └── TCP/UDP适配器
├── 消息处理层
│   ├── 协议解析
│   ├── 数据转换
│   ├── 消息验证
│   └── 消息路由
├── 连接管理层
│   ├── 连接池管理
│   ├── 会话管理
│   ├── 心跳检测
│   └── 断线重连
└── 安全控制层
    ├── 协议认证
    ├── 数据加密
    ├── 访问控制
    └── 流量控制

# 协议适配器接口

# 通用协议适配器接口

/**
 * 协议适配器接口
 */
public interface ProtocolAdapter {
    
    /**
     * 获取协议类型
     */
    ProtocolType getProtocolType();
    
    /**
     * 启动适配器
     */
    void start() throws Exception;
    
    /**
     * 停止适配器
     */
    void stop() throws Exception;
    
    /**
     * 处理设备连接
     */
    void handleDeviceConnect(DeviceConnectEvent event);
    
    /**
     * 处理设备断开
     */
    void handleDeviceDisconnect(DeviceDisconnectEvent event);
    
    /**
     * 处理设备消息
     */
    void handleDeviceMessage(DeviceMessageEvent event);
    
    /**
     * 发送消息到设备
     */
    CompletableFuture<Boolean> sendMessageToDevice(String deviceId, Object message);
    
    /**
     * 获取连接状态
     */
    boolean isDeviceConnected(String deviceId);
    
    /**
     * 获取连接数量
     */
    int getConnectionCount();
    
    /**
     * 获取适配器状态
     */
    AdapterStatus getStatus();
}

/**
 * 协议类型枚举
 */
public enum ProtocolType {
    MQTT("MQTT", 1883),
    HTTP("HTTP", 80),
    HTTPS("HTTPS", 443),
    COAP("CoAP", 5683),
    TCP("TCP", 8080),
    UDP("UDP", 8081);
    
    private final String name;
    private final int defaultPort;
    
    ProtocolType(String name, int defaultPort) {
        this.name = name;
        this.defaultPort = defaultPort;
    }
    
    public String getName() {
        return name;
    }
    
    public int getDefaultPort() {
        return defaultPort;
    }
}

/**
 * 适配器状态枚举
 */
public enum AdapterStatus {
    STOPPED("已停止"),
    STARTING("启动中"),
    RUNNING("运行中"),
    STOPPING("停止中"),
    ERROR("错误");
    
    private final String description;
    
    AdapterStatus(String description) {
        this.description = description;
    }
    
    public String getDescription() {
        return description;
    }
}

# MQTT协议适配器

# MQTT适配器实现

@Component
@Slf4j
public class MqttProtocolAdapter implements ProtocolAdapter {
    
    @Autowired
    private DeviceAuthService deviceAuthService;
    
    @Autowired
    private DeviceManagementService deviceManagementService;
    
    @Autowired
    private MessageProcessingService messageProcessingService;
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    @Value("${iot.mqtt.broker.host:0.0.0.0}")
    private String brokerHost;
    
    @Value("${iot.mqtt.broker.port:1883}")
    private int brokerPort;
    
    @Value("${iot.mqtt.broker.max-connections:10000}")
    private int maxConnections;
    
    private MqttBroker mqttBroker;
    private final Map<String, MqttConnection> deviceConnections = new ConcurrentHashMap<>();
    private volatile AdapterStatus status = AdapterStatus.STOPPED;
    
    private final Counter connectionCounter;
    private final Counter messageCounter;
    private final Gauge activeConnectionsGauge;
    
    public MqttProtocolAdapter(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.connectionCounter = Counter.builder("iot.mqtt.connections.total")
            .description("MQTT连接总数")
            .register(meterRegistry);
        this.messageCounter = Counter.builder("iot.mqtt.messages.total")
            .description("MQTT消息总数")
            .register(meterRegistry);
        this.activeConnectionsGauge = Gauge.builder("iot.mqtt.connections.active")
            .description("活跃MQTT连接数")
            .register(meterRegistry, this, adapter -> adapter.getConnectionCount());
    }
    
    @Override
    public ProtocolType getProtocolType() {
        return ProtocolType.MQTT;
    }
    
    @Override
    public void start() throws Exception {
        if (status == AdapterStatus.RUNNING) {
            log.warn("MQTT适配器已经在运行中");
            return;
        }
        
        try {
            status = AdapterStatus.STARTING;
            
            // 创建MQTT Broker配置
            MqttBrokerConfig config = MqttBrokerConfig.builder()
                .host(brokerHost)
                .port(brokerPort)
                .maxConnections(maxConnections)
                .authenticator(this::authenticateDevice)
                .connectionHandler(this::handleConnection)
                .messageHandler(this::handleMessage)
                .disconnectionHandler(this::handleDisconnection)
                .build();
            
            // 启动MQTT Broker
            mqttBroker = new MqttBroker(config);
            mqttBroker.start();
            
            status = AdapterStatus.RUNNING;
            log.info("MQTT协议适配器启动成功, host: {}, port: {}", brokerHost, brokerPort);
            
        } catch (Exception e) {
            status = AdapterStatus.ERROR;
            log.error("MQTT协议适配器启动失败", e);
            throw e;
        }
    }
    
    @Override
    public void stop() throws Exception {
        if (status == AdapterStatus.STOPPED) {
            log.warn("MQTT适配器已经停止");
            return;
        }
        
        try {
            status = AdapterStatus.STOPPING;
            
            // 关闭所有连接
            deviceConnections.values().forEach(connection -> {
                try {
                    connection.close();
                } catch (Exception e) {
                    log.warn("关闭MQTT连接失败, deviceId: {}", connection.getDeviceId(), e);
                }
            });
            deviceConnections.clear();
            
            // 停止MQTT Broker
            if (mqttBroker != null) {
                mqttBroker.stop();
            }
            
            status = AdapterStatus.STOPPED;
            log.info("MQTT协议适配器停止成功");
            
        } catch (Exception e) {
            status = AdapterStatus.ERROR;
            log.error("MQTT协议适配器停止失败", e);
            throw e;
        }
    }
    
    @Override
    public void handleDeviceConnect(DeviceConnectEvent event) {
        String deviceId = event.getDeviceId();
        
        try {
            // 验证设备
            IoTDevice device = deviceManagementService.getDeviceById(deviceId);
            if (device == null) {
                log.warn("设备不存在, deviceId: {}", deviceId);
                return;
            }
            
            // 创建连接对象
            MqttConnection connection = new MqttConnection(
                deviceId, event.getClientId(), event.getRemoteAddress(), event.getConnectTime()
            );
            
            deviceConnections.put(deviceId, connection);
            
            // 更新设备状态
            deviceManagementService.updateDeviceStatus(deviceId, DeviceStatus.ONLINE);
            
            // 订阅设备主题
            subscribeDeviceTopics(deviceId);
            
            connectionCounter.increment(Tags.of("protocol", "mqtt", "event", "connect"));
            
            log.info("MQTT设备连接成功, deviceId: {}, clientId: {}", 
                deviceId, event.getClientId());
            
        } catch (Exception e) {
            log.error("处理MQTT设备连接失败, deviceId: {}", deviceId, e);
        }
    }
    
    @Override
    public void handleDeviceDisconnect(DeviceDisconnectEvent event) {
        String deviceId = event.getDeviceId();
        
        try {
            // 移除连接
            MqttConnection connection = deviceConnections.remove(deviceId);
            if (connection != null) {
                connection.close();
            }
            
            // 更新设备状态
            deviceManagementService.updateDeviceStatus(deviceId, DeviceStatus.OFFLINE);
            
            connectionCounter.increment(Tags.of("protocol", "mqtt", "event", "disconnect"));
            
            log.info("MQTT设备断开连接, deviceId: {}, reason: {}", 
                deviceId, event.getReason());
            
        } catch (Exception e) {
            log.error("处理MQTT设备断开失败, deviceId: {}", deviceId, e);
        }
    }
    
    @Override
    public void handleDeviceMessage(DeviceMessageEvent event) {
        String deviceId = event.getDeviceId();
        String topic = event.getTopic();
        String payload = event.getPayload();
        
        try {
            // 验证消息
            if (!validateMessage(deviceId, topic, payload)) {
                log.warn("MQTT消息验证失败, deviceId: {}, topic: {}", deviceId, topic);
                return;
            }
            
            // 解析消息类型
            MessageType messageType = parseMessageType(topic);
            
            // 处理不同类型的消息
            switch (messageType) {
                case DATA_REPORT:
                    handleDataReport(deviceId, topic, payload);
                    break;
                case EVENT_REPORT:
                    handleEventReport(deviceId, topic, payload);
                    break;
                case COMMAND_RESPONSE:
                    handleCommandResponse(deviceId, topic, payload);
                    break;
                case HEARTBEAT:
                    handleHeartbeat(deviceId, topic, payload);
                    break;
                default:
                    log.warn("未知的MQTT消息类型, deviceId: {}, topic: {}", deviceId, topic);
            }
            
            messageCounter.increment(Tags.of(
                "protocol", "mqtt",
                "message_type", messageType.name(),
                "device_id", deviceId
            ));
            
        } catch (Exception e) {
            log.error("处理MQTT设备消息失败, deviceId: {}, topic: {}", deviceId, topic, e);
        }
    }
    
    @Override
    public CompletableFuture<Boolean> sendMessageToDevice(String deviceId, Object message) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                MqttConnection connection = deviceConnections.get(deviceId);
                if (connection == null) {
                    log.warn("设备未连接, deviceId: {}", deviceId);
                    return false;
                }
                
                // 构建MQTT消息
                MqttMessage mqttMessage = buildMqttMessage(message);
                
                // 确定发送主题
                String topic = buildDownlinkTopic(deviceId, message);
                
                // 发送消息
                mqttBroker.publish(topic, mqttMessage);
                
                log.debug("MQTT消息发送成功, deviceId: {}, topic: {}", deviceId, topic);
                return true;
                
            } catch (Exception e) {
                log.error("MQTT消息发送失败, deviceId: {}", deviceId, e);
                return false;
            }
        });
    }
    
    @Override
    public boolean isDeviceConnected(String deviceId) {
        return deviceConnections.containsKey(deviceId);
    }
    
    @Override
    public int getConnectionCount() {
        return deviceConnections.size();
    }
    
    @Override
    public AdapterStatus getStatus() {
        return status;
    }
    
    /**
     * 设备认证
     */
    private boolean authenticateDevice(String clientId, String username, String password) {
        try {
            // 从clientId或username中提取deviceId
            String deviceId = extractDeviceId(clientId, username);
            
            if (StringUtils.isBlank(deviceId)) {
                log.warn("无法提取设备ID, clientId: {}, username: {}", clientId, username);
                return false;
            }
            
            // 验证设备认证信息
            AuthenticationResult result = deviceAuthService.authenticate(
                deviceId, username, password, ProtocolType.MQTT
            );
            
            if (result.isSuccess()) {
                log.debug("MQTT设备认证成功, deviceId: {}", deviceId);
                return true;
            } else {
                log.warn("MQTT设备认证失败, deviceId: {}, reason: {}", 
                    deviceId, result.getFailureReason());
                return false;
            }
            
        } catch (Exception e) {
            log.error("MQTT设备认证异常, clientId: {}", clientId, e);
            return false;
        }
    }
    
    /**
     * 处理连接
     */
    private void handleConnection(MqttConnectionEvent event) {
        String deviceId = extractDeviceId(event.getClientId(), event.getUsername());
        
        DeviceConnectEvent connectEvent = DeviceConnectEvent.builder()
            .deviceId(deviceId)
            .clientId(event.getClientId())
            .protocol(ProtocolType.MQTT)
            .remoteAddress(event.getRemoteAddress())
            .connectTime(LocalDateTime.now())
            .build();
        
        handleDeviceConnect(connectEvent);
    }
    
    /**
     * 处理消息
     */
    private void handleMessage(MqttMessageEvent event) {
        String deviceId = extractDeviceId(event.getClientId(), null);
        
        DeviceMessageEvent messageEvent = DeviceMessageEvent.builder()
            .deviceId(deviceId)
            .protocol(ProtocolType.MQTT)
            .topic(event.getTopic())
            .payload(new String(event.getPayload(), StandardCharsets.UTF_8))
            .qos(event.getQos())
            .timestamp(LocalDateTime.now())
            .build();
        
        handleDeviceMessage(messageEvent);
    }
    
    /**
     * 处理断开连接
     */
    private void handleDisconnection(MqttDisconnectionEvent event) {
        String deviceId = extractDeviceId(event.getClientId(), null);
        
        DeviceDisconnectEvent disconnectEvent = DeviceDisconnectEvent.builder()
            .deviceId(deviceId)
            .clientId(event.getClientId())
            .protocol(ProtocolType.MQTT)
            .reason(event.getReason())
            .disconnectTime(LocalDateTime.now())
            .build();
        
        handleDeviceDisconnect(disconnectEvent);
    }
    
    /**
     * 订阅设备主题
     */
    private void subscribeDeviceTopics(String deviceId) {
        try {
            // 数据上报主题
            String dataReportTopic = String.format("/device/%s/data/+", deviceId);
            mqttBroker.subscribe(dataReportTopic);
            
            // 事件上报主题
            String eventReportTopic = String.format("/device/%s/event/+", deviceId);
            mqttBroker.subscribe(eventReportTopic);
            
            // 指令响应主题
            String commandResponseTopic = String.format("/device/%s/command/response/+", deviceId);
            mqttBroker.subscribe(commandResponseTopic);
            
            // 心跳主题
            String heartbeatTopic = String.format("/device/%s/heartbeat", deviceId);
            mqttBroker.subscribe(heartbeatTopic);
            
            log.debug("设备主题订阅成功, deviceId: {}", deviceId);
            
        } catch (Exception e) {
            log.error("订阅设备主题失败, deviceId: {}", deviceId, e);
        }
    }
    
    /**
     * 验证消息
     */
    private boolean validateMessage(String deviceId, String topic, String payload) {
        // 检查设备ID
        if (StringUtils.isBlank(deviceId)) {
            return false;
        }
        
        // 检查主题格式
        if (!isValidTopic(topic)) {
            return false;
        }
        
        // 检查负载大小
        if (payload != null && payload.length() > 1024 * 1024) { // 1MB
            log.warn("MQTT消息负载过大, deviceId: {}, size: {}", deviceId, payload.length());
            return false;
        }
        
        return true;
    }
    
    /**
     * 解析消息类型
     */
    private MessageType parseMessageType(String topic) {
        if (topic.contains("/data/")) {
            return MessageType.DATA_REPORT;
        } else if (topic.contains("/event/")) {
            return MessageType.EVENT_REPORT;
        } else if (topic.contains("/command/response/")) {
            return MessageType.COMMAND_RESPONSE;
        } else if (topic.contains("/heartbeat")) {
            return MessageType.HEARTBEAT;
        } else {
            return MessageType.UNKNOWN;
        }
    }
    
    /**
     * 处理数据上报
     */
    private void handleDataReport(String deviceId, String topic, String payload) {
        try {
            // 解析数据类型
            String dataType = extractDataType(topic);
            
            // 构建设备数据对象
            DeviceDataMessage dataMessage = DeviceDataMessage.builder()
                .deviceId(deviceId)
                .dataType(dataType)
                .payload(payload)
                .protocol(ProtocolType.MQTT)
                .topic(topic)
                .timestamp(LocalDateTime.now())
                .build();
            
            // 异步处理数据
            messageProcessingService.processDeviceData(dataMessage);
            
        } catch (Exception e) {
            log.error("处理MQTT数据上报失败, deviceId: {}, topic: {}", deviceId, topic, e);
        }
    }
    
    /**
     * 处理事件上报
     */
    private void handleEventReport(String deviceId, String topic, String payload) {
        try {
            // 解析事件类型
            String eventType = extractEventType(topic);
            
            // 构建设备事件对象
            DeviceEventMessage eventMessage = DeviceEventMessage.builder()
                .deviceId(deviceId)
                .eventType(eventType)
                .payload(payload)
                .protocol(ProtocolType.MQTT)
                .topic(topic)
                .timestamp(LocalDateTime.now())
                .build();
            
            // 异步处理事件
            messageProcessingService.processDeviceEvent(eventMessage);
            
        } catch (Exception e) {
            log.error("处理MQTT事件上报失败, deviceId: {}, topic: {}", deviceId, topic, e);
        }
    }
    
    /**
     * 处理指令响应
     */
    private void handleCommandResponse(String deviceId, String topic, String payload) {
        try {
            // 解析指令ID
            String commandId = extractCommandId(topic);
            
            // 构建指令响应对象
            CommandResponseMessage responseMessage = CommandResponseMessage.builder()
                .deviceId(deviceId)
                .commandId(commandId)
                .payload(payload)
                .protocol(ProtocolType.MQTT)
                .topic(topic)
                .timestamp(LocalDateTime.now())
                .build();
            
            // 异步处理指令响应
            messageProcessingService.processCommandResponse(responseMessage);
            
        } catch (Exception e) {
            log.error("处理MQTT指令响应失败, deviceId: {}, topic: {}", deviceId, topic, e);
        }
    }
    
    /**
     * 处理心跳
     */
    private void handleHeartbeat(String deviceId, String topic, String payload) {
        try {
            // 更新设备最后活跃时间
            deviceManagementService.updateDeviceLastActiveTime(deviceId, LocalDateTime.now());
            
            log.debug("收到设备心跳, deviceId: {}", deviceId);
            
        } catch (Exception e) {
            log.error("处理MQTT心跳失败, deviceId: {}", deviceId, e);
        }
    }
    
    /**
     * 构建MQTT消息
     */
    private MqttMessage buildMqttMessage(Object message) {
        try {
            String payload;
            
            if (message instanceof String) {
                payload = (String) message;
            } else {
                ObjectMapper mapper = new ObjectMapper();
                payload = mapper.writeValueAsString(message);
            }
            
            return MqttMessage.builder()
                .payload(payload.getBytes(StandardCharsets.UTF_8))
                .qos(1) // 至少一次
                .retained(false)
                .build();
            
        } catch (Exception e) {
            throw new RuntimeException("构建MQTT消息失败", e);
        }
    }
    
    /**
     * 构建下行主题
     */
    private String buildDownlinkTopic(String deviceId, Object message) {
        if (message instanceof DeviceCommand) {
            DeviceCommand command = (DeviceCommand) message;
            return String.format("/device/%s/command/%s", deviceId, command.getCommandType().name().toLowerCase());
        } else {
            return String.format("/device/%s/message", deviceId);
        }
    }
    
    /**
     * 提取设备ID
     */
    private String extractDeviceId(String clientId, String username) {
        // 优先从username中提取
        if (StringUtils.isNotBlank(username)) {
            return username;
        }
        
        // 从clientId中提取
        if (StringUtils.isNotBlank(clientId)) {
            // 假设clientId格式为: device_{deviceId}
            if (clientId.startsWith("device_")) {
                return clientId.substring(7);
            }
            return clientId;
        }
        
        return null;
    }
    
    /**
     * 检查主题是否有效
     */
    private boolean isValidTopic(String topic) {
        if (StringUtils.isBlank(topic)) {
            return false;
        }
        
        // 检查主题格式: /device/{deviceId}/{messageType}/...
        String[] parts = topic.split("/");
        return parts.length >= 4 && "device".equals(parts[1]);
    }
    
    /**
     * 提取数据类型
     */
    private String extractDataType(String topic) {
        // 主题格式: /device/{deviceId}/data/{dataType}
        String[] parts = topic.split("/");
        return parts.length >= 5 ? parts[4] : "unknown";
    }
    
    /**
     * 提取事件类型
     */
    private String extractEventType(String topic) {
        // 主题格式: /device/{deviceId}/event/{eventType}
        String[] parts = topic.split("/");
        return parts.length >= 5 ? parts[4] : "unknown";
    }
    
    /**
     * 提取指令ID
     */
    private String extractCommandId(String topic) {
        // 主题格式: /device/{deviceId}/command/response/{commandId}
        String[] parts = topic.split("/");
        return parts.length >= 6 ? parts[5] : "unknown";
    }
}

/**
 * MQTT连接对象
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqttConnection {
    private String deviceId;
    private String clientId;
    private String remoteAddress;
    private LocalDateTime connectTime;
    private LocalDateTime lastActiveTime;
    private volatile boolean active = true;
    
    public void close() {
        active = false;
    }
    
    public void updateLastActiveTime() {
        lastActiveTime = LocalDateTime.now();
    }
}

# HTTP协议适配器

# HTTP适配器实现

@RestController
@RequestMapping("/api/device")
@Slf4j
public class HttpProtocolAdapter implements ProtocolAdapter {
    
    @Autowired
    private DeviceAuthService deviceAuthService;
    
    @Autowired
    private DeviceManagementService deviceManagementService;
    
    @Autowired
    private MessageProcessingService messageProcessingService;
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    private volatile AdapterStatus status = AdapterStatus.STOPPED;
    private final Map<String, HttpSession> deviceSessions = new ConcurrentHashMap<>();;
    
    private final Counter requestCounter;
    private final Timer requestTimer;
    
    public HttpProtocolAdapter(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.requestCounter = Counter.builder("iot.http.requests.total")
            .description("HTTP请求总数")
            .register(meterRegistry);
        this.requestTimer = Timer.builder("iot.http.request.duration")
            .description("HTTP请求耗时")
            .register(meterRegistry);
    }
    
    @Override
    public ProtocolType getProtocolType() {
        return ProtocolType.HTTP;
    }
    
    @Override
    public void start() throws Exception {
        status = AdapterStatus.RUNNING;
        log.info("HTTP协议适配器启动成功");
    }
    
    @Override
    public void stop() throws Exception {
        status = AdapterStatus.STOPPED;
        deviceSessions.clear();
        log.info("HTTP协议适配器停止成功");
    }
    
    /**
     * 设备数据上报
     */
    @PostMapping("/{deviceId}/data")
    public ResponseEntity<ApiResponse> reportData(
            @PathVariable String deviceId,
            @RequestParam(required = false) String dataType,
            @RequestBody String payload,
            HttpServletRequest request) {
        
        Timer.Sample sample = Timer.start(meterRegistry);
        
        try {
            // 认证设备
            if (!authenticateDevice(deviceId, request)) {
                requestCounter.increment(Tags.of("protocol", "http", "endpoint", "data", "result", "auth_failed"));
                return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
                    .body(ApiResponse.failure("设备认证失败"));
            }
            
            // 更新会话
            updateSession(deviceId, request);
            
            // 构建数据消息
            DeviceDataMessage dataMessage = DeviceDataMessage.builder()
                .deviceId(deviceId)
                .dataType(dataType != null ? dataType : "default")
                .payload(payload)
                .protocol(ProtocolType.HTTP)
                .remoteAddress(getRemoteAddress(request))
                .timestamp(LocalDateTime.now())
                .build();
            
            // 异步处理数据
            messageProcessingService.processDeviceData(dataMessage);
            
            requestCounter.increment(Tags.of("protocol", "http", "endpoint", "data", "result", "success"));
            
            return ResponseEntity.ok(ApiResponse.success("数据上报成功"));
            
        } catch (Exception e) {
            log.error("HTTP数据上报失败, deviceId: {}", deviceId, e);
            requestCounter.increment(Tags.of("protocol", "http", "endpoint", "data", "result", "error"));
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(ApiResponse.failure("数据上报失败: " + e.getMessage()));
        } finally {
            sample.stop(requestTimer.tag("endpoint", "data"));
        }
    }
    
    /**
     * 设备事件上报
     */
    @PostMapping("/{deviceId}/event")
    public ResponseEntity<ApiResponse> reportEvent(
            @PathVariable String deviceId,
            @RequestParam String eventType,
            @RequestBody String payload,
            HttpServletRequest request) {
        
        Timer.Sample sample = Timer.start(meterRegistry);
        
        try {
            // 认证设备
            if (!authenticateDevice(deviceId, request)) {
                requestCounter.increment(Tags.of("protocol", "http", "endpoint", "event", "result", "auth_failed"));
                return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
                    .body(ApiResponse.failure("设备认证失败"));
            }
            
            // 更新会话
            updateSession(deviceId, request);
            
            // 构建事件消息
            DeviceEventMessage eventMessage = DeviceEventMessage.builder()
                .deviceId(deviceId)
                .eventType(eventType)
                .payload(payload)
                .protocol(ProtocolType.HTTP)
                .remoteAddress(getRemoteAddress(request))
                .timestamp(LocalDateTime.now())
                .build();
            
            // 异步处理事件
            messageProcessingService.processDeviceEvent(eventMessage);
            
            requestCounter.increment(Tags.of("protocol", "http", "endpoint", "event", "result", "success"));
            
            return ResponseEntity.ok(ApiResponse.success("事件上报成功"));
            
        } catch (Exception e) {
            log.error("HTTP事件上报失败, deviceId: {}", deviceId, e);
            requestCounter.increment(Tags.of("protocol", "http", "endpoint", "event", "result", "error"));
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(ApiResponse.failure("事件上报失败: " + e.getMessage()));
        } finally {
            sample.stop(requestTimer.tag("endpoint", "event"));
        }
    }
    
    /**
     * 指令响应
     */
    @PostMapping("/{deviceId}/command/{commandId}/response")
    public ResponseEntity<ApiResponse> commandResponse(
            @PathVariable String deviceId,
            @PathVariable String commandId,
            @RequestBody String payload,
            HttpServletRequest request) {
        
        Timer.Sample sample = Timer.start(meterRegistry);
        
        try {
            // 认证设备
            if (!authenticateDevice(deviceId, request)) {
                requestCounter.increment(Tags.of("protocol", "http", "endpoint", "command_response", "result", "auth_failed"));
                return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
                    .body(ApiResponse.failure("设备认证失败"));
            }
            
            // 更新会话
            updateSession(deviceId, request);
            
            // 构建指令响应消息
            CommandResponseMessage responseMessage = CommandResponseMessage.builder()
                .deviceId(deviceId)
                .commandId(commandId)
                .payload(payload)
                .protocol(ProtocolType.HTTP)
                .remoteAddress(getRemoteAddress(request))
                .timestamp(LocalDateTime.now())
                .build();
            
            // 异步处理指令响应
            messageProcessingService.processCommandResponse(responseMessage);
            
            requestCounter.increment(Tags.of("protocol", "http", "endpoint", "command_response", "result", "success"));
            
            return ResponseEntity.ok(ApiResponse.success("指令响应成功"));
            
        } catch (Exception e) {
            log.error("HTTP指令响应失败, deviceId: {}, commandId: {}", deviceId, commandId, e);
            requestCounter.increment(Tags.of("protocol", "http", "endpoint", "command_response", "result", "error"));
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(ApiResponse.failure("指令响应失败: " + e.getMessage()));
        } finally {
            sample.stop(requestTimer.tag("endpoint", "command_response"));
        }
    }
    
    /**
     * 设备心跳
     */
    @PostMapping("/{deviceId}/heartbeat")
    public ResponseEntity<ApiResponse> heartbeat(
            @PathVariable String deviceId,
            HttpServletRequest request) {
        
        Timer.Sample sample = Timer.start(meterRegistry);
        
        try {
            // 认证设备
            if (!authenticateDevice(deviceId, request)) {
                requestCounter.increment(Tags.of("protocol", "http", "endpoint", "heartbeat", "result", "auth_failed"));
                return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
                    .body(ApiResponse.failure("设备认证失败"));
            }
            
            // 更新会话
            updateSession(deviceId, request);
            
            // 更新设备最后活跃时间
            deviceManagementService.updateDeviceLastActiveTime(deviceId, LocalDateTime.now());
            
            requestCounter.increment(Tags.of("protocol", "http", "endpoint", "heartbeat", "result", "success"));
            
            return ResponseEntity.ok(ApiResponse.success("心跳成功"));
            
        } catch (Exception e) {
            log.error("HTTP心跳失败, deviceId: {}", deviceId, e);
            requestCounter.increment(Tags.of("protocol", "http", "endpoint", "heartbeat", "result", "error"));
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(ApiResponse.failure("心跳失败: " + e.getMessage()));
        } finally {
            sample.stop(requestTimer.tag("endpoint", "heartbeat"));
        }
    }
    
    /**
     * 获取待执行指令
     */
    @GetMapping("/{deviceId}/commands")
    public ResponseEntity<ApiResponse<List<DeviceCommand>>> getPendingCommands(
            @PathVariable String deviceId,
            HttpServletRequest request) {
        
        Timer.Sample sample = Timer.start(meterRegistry);
        
        try {
            // 认证设备
            if (!authenticateDevice(deviceId, request)) {
                requestCounter.increment(Tags.of("protocol", "http", "endpoint", "get_commands", "result", "auth_failed"));
                return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
                    .body(ApiResponse.failure("设备认证失败"));
            }
            
            // 更新会话
            updateSession(deviceId, request);
            
            // 获取待执行指令
            List<DeviceCommand> pendingCommands = messageProcessingService.getPendingCommands(deviceId);
            
            requestCounter.increment(Tags.of("protocol", "http", "endpoint", "get_commands", "result", "success"));
            
            return ResponseEntity.ok(ApiResponse.success(pendingCommands));
            
        } catch (Exception e) {
            log.error("获取待执行指令失败, deviceId: {}", deviceId, e);
            requestCounter.increment(Tags.of("protocol", "http", "endpoint", "get_commands", "result", "error"));
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(ApiResponse.failure("获取指令失败: " + e.getMessage()));
        } finally {
            sample.stop(requestTimer.tag("endpoint", "get_commands"));
        }
    }
    
    @Override
    public void handleDeviceConnect(DeviceConnectEvent event) {
        // HTTP协议无需处理连接事件
    }
    
    @Override
    public void handleDeviceDisconnect(DeviceDisconnectEvent event) {
        // HTTP协议无需处理断开事件
    }
    
    @Override
    public void handleDeviceMessage(DeviceMessageEvent event) {
        // HTTP协议通过REST接口处理消息
    }
    
    @Override
    public CompletableFuture<Boolean> sendMessageToDevice(String deviceId, Object message) {
        // HTTP协议采用拉取模式,设备主动获取指令
        return CompletableFuture.completedFuture(true);
    }
    
    @Override
    public boolean isDeviceConnected(String deviceId) {
        HttpSession session = deviceSessions.get(deviceId);
        if (session == null) {
            return false;
        }
        
        // 检查会话是否过期(5分钟无活动)
        return session.getLastActiveTime().isAfter(LocalDateTime.now().minusMinutes(5));
    }
    
    @Override
    public int getConnectionCount() {
        // 清理过期会话
        LocalDateTime expireTime = LocalDateTime.now().minusMinutes(5);
        deviceSessions.entrySet().removeIf(entry -> 
            entry.getValue().getLastActiveTime().isBefore(expireTime));
        
        return deviceSessions.size();
    }
    
    @Override
    public AdapterStatus getStatus() {
        return status;
    }
    
    /**
     * 认证设备
     */
    private boolean authenticateDevice(String deviceId, HttpServletRequest request) {
        try {
            // 从请求头中获取认证信息
            String authHeader = request.getHeader("Authorization");
            if (StringUtils.isBlank(authHeader)) {
                return false;
            }
            
            // 解析认证信息
            String[] authParts = parseAuthHeader(authHeader);
            if (authParts == null || authParts.length != 2) {
                return false;
            }
            
            String username = authParts[0];
            String password = authParts[1];
            
            // 验证设备认证信息
            AuthenticationResult result = deviceAuthService.authenticate(
                deviceId, username, password, ProtocolType.HTTP
            );
            
            return result.isSuccess();
            
        } catch (Exception e) {
            log.error("HTTP设备认证异常, deviceId: {}", deviceId, e);
            return false;
        }
    }
    
    /**
     * 解析认证头
     */
    private String[] parseAuthHeader(String authHeader) {
        if (authHeader.startsWith("Basic ")) {
            // Basic认证
            String encoded = authHeader.substring(6);
            String decoded = new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
            return decoded.split(":", 2);
        } else if (authHeader.startsWith("Bearer ")) {
            // Token认证
            String token = authHeader.substring(7);
            // TODO: 解析Token获取用户名和密码
            return new String[]{"token", token};
        }
        
        return null;
    }
    
    /**
     * 更新会话
     */
    private void updateSession(String deviceId, HttpServletRequest request) {
        HttpSession session = deviceSessions.computeIfAbsent(deviceId, k -> 
            HttpSession.builder()
                .deviceId(deviceId)
                .remoteAddress(getRemoteAddress(request))
                .createTime(LocalDateTime.now())
                .build()
        );
        
        session.updateLastActiveTime();
    }
    
    /**
     * 获取远程地址
     */
    private String getRemoteAddress(HttpServletRequest request) {
        String xForwardedFor = request.getHeader("X-Forwarded-For");
        if (StringUtils.isNotBlank(xForwardedFor)) {
            return xForwardedFor.split(",")[0].trim();
        }
        
        String xRealIp = request.getHeader("X-Real-IP");
        if (StringUtils.isNotBlank(xRealIp)) {
            return xRealIp;
        }
        
        return request.getRemoteAddr();
    }
}

/**
 * HTTP会话对象
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class HttpSession {
    private String deviceId;
    private String remoteAddress;
    private LocalDateTime createTime;
    private LocalDateTime lastActiveTime;
    
    public void updateLastActiveTime() {
        lastActiveTime = LocalDateTime.now();
    }
}

# 协议适配器管理器

# 适配器管理器实现

@Service
@Slf4j
public class ProtocolAdapterManager {
    
    private final Map<ProtocolType, ProtocolAdapter> adapters = new ConcurrentHashMap<>();
    
    @Autowired
    private List<ProtocolAdapter> adapterList;
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    private final Gauge adapterStatusGauge;
    
    public ProtocolAdapterManager(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.adapterStatusGauge = Gauge.builder("iot.adapter.status")
            .description("协议适配器状态")
            .register(meterRegistry, this, manager -> {
                return manager.getRunningAdapterCount();
            });
    }
    
    @PostConstruct
    public void initialize() {
        // 注册所有适配器
        for (ProtocolAdapter adapter : adapterList) {
            adapters.put(adapter.getProtocolType(), adapter);
            log.info("注册协议适配器: {}", adapter.getProtocolType());
        }
        
        // 启动所有适配器
        startAllAdapters();
    }
    
    @PreDestroy
    public void destroy() {
        // 停止所有适配器
        stopAllAdapters();
    }
    
    /**
     * 启动所有适配器
     */
    public void startAllAdapters() {
        for (ProtocolAdapter adapter : adapters.values()) {
            try {
                adapter.start();
                log.info("协议适配器启动成功: {}", adapter.getProtocolType());
            } catch (Exception e) {
                log.error("协议适配器启动失败: {}", adapter.getProtocolType(), e);
            }
        }
    }
    
    /**
     * 停止所有适配器
     */
    public void stopAllAdapters() {
        for (ProtocolAdapter adapter : adapters.values()) {
            try {
                adapter.stop();
                log.info("协议适配器停止成功: {}", adapter.getProtocolType());
            } catch (Exception e) {
                log.error("协议适配器停止失败: {}", adapter.getProtocolType(), e);
            }
        }
    }
    
    /**
     * 启动指定适配器
     */
    public void startAdapter(ProtocolType protocolType) throws Exception {
        ProtocolAdapter adapter = adapters.get(protocolType);
        if (adapter == null) {
            throw new IllegalArgumentException("不支持的协议类型: " + protocolType);
        }
        
        adapter.start();
        log.info("协议适配器启动成功: {}", protocolType);
    }
    
    /**
     * 停止指定适配器
     */
    public void stopAdapter(ProtocolType protocolType) throws Exception {
        ProtocolAdapter adapter = adapters.get(protocolType);
        if (adapter == null) {
            throw new IllegalArgumentException("不支持的协议类型: " + protocolType);
        }
        
        adapter.stop();
        log.info("协议适配器停止成功: {}", protocolType);
    }
    
    /**
     * 获取适配器
     */
    public ProtocolAdapter getAdapter(ProtocolType protocolType) {
        return adapters.get(protocolType);
    }
    
    /**
     * 发送消息到设备
     */
    public CompletableFuture<Boolean> sendMessageToDevice(String deviceId, Object message, ProtocolType protocolType) {
        ProtocolAdapter adapter = adapters.get(protocolType);
        if (adapter == null) {
            return CompletableFuture.completedFuture(false);
        }
        
        return adapter.sendMessageToDevice(deviceId, message);
    }
    
    /**
     * 检查设备是否连接
     */
    public boolean isDeviceConnected(String deviceId, ProtocolType protocolType) {
        ProtocolAdapter adapter = adapters.get(protocolType);
        if (adapter == null) {
            return false;
        }
        
        return adapter.isDeviceConnected(deviceId);
    }
    
    /**
     * 获取适配器状态
     */
    public Map<ProtocolType, AdapterStatus> getAdapterStatuses() {
        Map<ProtocolType, AdapterStatus> statuses = new HashMap<>();
        for (Map.Entry<ProtocolType, ProtocolAdapter> entry : adapters.entrySet()) {
            statuses.put(entry.getKey(), entry.getValue().getStatus());
        }
        return statuses;
    }
    
    /**
     * 获取连接统计
     */
    public Map<ProtocolType, Integer> getConnectionCounts() {
        Map<ProtocolType, Integer> counts = new HashMap<>();
        for (Map.Entry<ProtocolType, ProtocolAdapter> entry : adapters.entrySet()) {
            counts.put(entry.getKey(), entry.getValue().getConnectionCount());
        }
        return counts;
    }
    
    /**
     * 获取运行中的适配器数量
     */
    public int getRunningAdapterCount() {
        return (int) adapters.values().stream()
            .filter(adapter -> adapter.getStatus() == AdapterStatus.RUNNING)
            .count();
    }
    
    /**
     * 获取总连接数
     */
    public int getTotalConnectionCount() {
        return adapters.values().stream()
            .mapToInt(ProtocolAdapter::getConnectionCount)
            .sum();
    }
}

# 消息类型定义

# 消息类型枚举

/**
 * 消息类型枚举
 */
public enum MessageType {
    DATA_REPORT("数据上报"),
    EVENT_REPORT("事件上报"),
    COMMAND_RESPONSE("指令响应"),
    HEARTBEAT("心跳"),
    UNKNOWN("未知");
    
    private final String description;
    
    MessageType(String description) {
        this.description = description;
    }
    
    public String getDescription() {
        return description;
    }
}

# 设备消息事件

/**
 * 设备连接事件
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceConnectEvent {
    private String deviceId;
    private String clientId;
    private ProtocolType protocol;
    private String remoteAddress;
    private LocalDateTime connectTime;
    private Map<String, Object> attributes;
}

/**
 * 设备断开事件
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceDisconnectEvent {
    private String deviceId;
    private String clientId;
    private ProtocolType protocol;
    private String reason;
    private LocalDateTime disconnectTime;
    private Map<String, Object> attributes;
}

/**
 * 设备消息事件
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceMessageEvent {
    private String deviceId;
    private ProtocolType protocol;
    private String topic;
    private String payload;
    private Integer qos;
    private String remoteAddress;
    private LocalDateTime timestamp;
    private Map<String, Object> attributes;
}

/**
 * 设备数据消息
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceDataMessage {
    private String deviceId;
    private String dataType;
    private String payload;
    private ProtocolType protocol;
    private String topic;
    private String remoteAddress;
    private LocalDateTime timestamp;
    private Map<String, Object> attributes;
}

/**
 * 设备事件消息
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceEventMessage {
    private String deviceId;
    private String eventType;
    private String payload;
    private ProtocolType protocol;
    private String topic;
    private String remoteAddress;
    private LocalDateTime timestamp;
    private Map<String, Object> attributes;
}

/**
 * 指令响应消息
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CommandResponseMessage {
    private String deviceId;
    private String commandId;
    private String payload;
    private ProtocolType protocol;
    private String topic;
    private String remoteAddress;
    private LocalDateTime timestamp;
    private Map<String, Object> attributes;
}

# 配置文件

# 协议适配器配置

# application.yml
iot:
  # MQTT配置
  mqtt:
    broker:
      host: 0.0.0.0
      port: 1883
      max-connections: 10000
      keep-alive: 60
      clean-session: true
    topics:
      data-report: "/device/{deviceId}/data/{dataType}"
      event-report: "/device/{deviceId}/event/{eventType}"
      command-response: "/device/{deviceId}/command/response/{commandId}"
      heartbeat: "/device/{deviceId}/heartbeat"
      command-downlink: "/device/{deviceId}/command/{commandType}"
  
  # HTTP配置
  http:
    server:
      port: 8080
      max-connections: 5000
      connection-timeout: 30000
      read-timeout: 30000
    endpoints:
      data-report: "/api/device/{deviceId}/data"
      event-report: "/api/device/{deviceId}/event"
      command-response: "/api/device/{deviceId}/command/{commandId}/response"
      heartbeat: "/api/device/{deviceId}/heartbeat"
      get-commands: "/api/device/{deviceId}/commands"
  
  # CoAP配置
  coap:
    server:
      host: 0.0.0.0
      port: 5683
      max-connections: 5000
    resources:
      data-report: "/device/{deviceId}/data"
      event-report: "/device/{deviceId}/event"
      command-response: "/device/{deviceId}/command/{commandId}/response"
      heartbeat: "/device/{deviceId}/heartbeat"
  
  # TCP配置
  tcp:
    server:
      host: 0.0.0.0
      port: 8080
      max-connections: 5000
      so-timeout: 30000
      keep-alive: true
  
  # UDP配置
  udp:
    server:
      host: 0.0.0.0
      port: 8081
      max-connections: 5000
      buffer-size: 1024
  
  # 通用配置
  common:
    message:
      max-size: 1048576 # 1MB
      encoding: UTF-8
    security:
      auth-required: true
      token-expire: 3600 # 1小时
    performance:
      thread-pool-size: 50
      queue-capacity: 2000

# 总结

协议适配器是物联网平台的重要组件,通过统一的接口设计和模块化的实现,支持多种协议的设备接入。每种协议适配器都针对其特点进行了优化,如MQTT的长连接管理、HTTP的无状态特性、CoAP的轻量级实现等。通过适配器管理器统一管理所有协议适配器,提供了灵活的协议扩展能力和高效的消息处理性能。