MQTT设备接入层详细设计与实现

# MQTT设备接入层详细设计与实现

# 故事背景

想象一下,你是一家智能家居公司的技术负责人。公司有成千上万的智能设备:温度传感器、智能灯泡、空调控制器等。这些设备就像是分布在全国各地的"小助手",它们需要时刻与总部的"大脑"(服务器)保持联系,汇报自己的状态,接收控制指令。

MQTT就像是这些设备与服务器之间的"邮政系统",它轻量、可靠,特别适合物联网设备使用。让我们一起来构建这个"邮政系统"吧!

# MQTT协议原理

# 什么是MQTT?

MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议,专为低带宽、高延迟或不可靠的网络环境设计。

# 核心概念

  1. Broker(代理):消息中转站,就像邮局
  2. Publisher(发布者):发送消息的设备,就像寄信人
  3. Subscriber(订阅者):接收消息的设备,就像收信人
  4. Topic(主题):消息的分类标签,就像邮件地址

# MQTT工作流程

设备A (温度传感器) ----发布温度数据----> MQTT Broker ----推送----> 服务器
                                        |
                                        ----推送----> 手机APP

# 系统架构设计

# 整体架构

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   设备层        │    │   接入层        │    │   业务层        │
│                 │    │                 │    │                 │
│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
│ │ IoT设备     │ │    │ │ MQTT Broker │ │    │ │ 设备管理    │ │
│ │ - 传感器    │ │◄──►│ │ - 连接管理  │ │◄──►│ │ - 数据处理  │ │
│ │ - 执行器    │ │    │ │ - 消息路由  │ │    │ │ - 业务逻辑  │ │
│ │ - 网关      │ │    │ │ - 认证授权  │ │    │ │ - 数据存储  │ │
│ └─────────────┘ │    │ └─────────────┘ │    │ └─────────────┘ │
└─────────────────┘    └─────────────────┘    └─────────────────┘

# 核心实体设计

# 1. MQTT连接实体

/**
 * MQTT连接信息实体
 * 就像是设备的"身份证",记录设备的连接信息
 */
@Entity
@Table(name = "mqtt_connection")
public class MqttConnection {
    
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    /**
     * 客户端ID - 设备的唯一标识
     * 就像每个人的身份证号码一样唯一
     */
    @Column(name = "client_id", unique = true, nullable = false)
    private String clientId;
    
    /**
     * 设备ID - 关联的设备
     */
    @Column(name = "device_id", nullable = false)
    private String deviceId;
    
    /**
     * 连接状态:CONNECTED, DISCONNECTED, CONNECTING
     */
    @Enumerated(EnumType.STRING)
    @Column(name = "status")
    private ConnectionStatus status;
    
    /**
     * 最后心跳时间 - 设备最后一次"报平安"的时间
     */
    @Column(name = "last_heartbeat")
    private LocalDateTime lastHeartbeat;
    
    /**
     * 连接时间
     */
    @Column(name = "connect_time")
    private LocalDateTime connectTime;
    
    /**
     * 客户端IP地址
     */
    @Column(name = "client_ip")
    private String clientIp;
    
    /**
     * 保活时间(秒)
     */
    @Column(name = "keep_alive")
    private Integer keepAlive;
    
    // 构造函数、getter、setter省略...
}

/**
 * 连接状态枚举
 */
public enum ConnectionStatus {
    CONNECTED("已连接"),
    DISCONNECTED("已断开"),
    CONNECTING("连接中");
    
    private final String description;
    
    ConnectionStatus(String description) {
        this.description = description;
    }
    
    public String getDescription() {
        return description;
    }
}

# 2. MQTT消息实体

/**
 * MQTT消息实体
 * 记录设备发送和接收的每一条消息,就像聊天记录
 */
@Entity
@Table(name = "mqtt_message")
public class MqttMessage {
    
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    /**
     * 消息ID - 每条消息的唯一标识
     */
    @Column(name = "message_id", unique = true)
    private String messageId;
    
    /**
     * 主题 - 消息的"收件地址"
     */
    @Column(name = "topic", nullable = false)
    private String topic;
    
    /**
     * 消息内容 - 实际要传输的数据
     */
    @Lob
    @Column(name = "payload")
    private String payload;
    
    /**
     * 服务质量等级:0-最多一次,1-至少一次,2-恰好一次
     */
    @Column(name = "qos")
    private Integer qos;
    
    /**
     * 是否保留消息
     */
    @Column(name = "retained")
    private Boolean retained;
    
    /**
     * 发送者客户端ID
     */
    @Column(name = "sender_client_id")
    private String senderClientId;
    
    /**
     * 消息方向:INBOUND(设备到服务器),OUTBOUND(服务器到设备)
     */
    @Enumerated(EnumType.STRING)
    @Column(name = "direction")
    private MessageDirection direction;
    
    /**
     * 创建时间
     */
    @Column(name = "create_time")
    private LocalDateTime createTime;
    
    // 构造函数、getter、setter省略...
}

public enum MessageDirection {
    INBOUND("入站"),   // 设备发送给服务器
    OUTBOUND("出站");  // 服务器发送给设备
    
    private final String description;
    
    MessageDirection(String description) {
        this.description = description;
    }
}

# 核心服务实现

# 1. MQTT连接管理服务

/**
 * MQTT连接管理服务
 * 就像是一个"接待员",负责管理所有设备的连接
 */
@Service
@Slf4j
public class MqttConnectionService {
    
    @Autowired
    private MqttConnectionRepository connectionRepository;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 设备连接时调用
     * 就像客人到酒店办理入住手续
     */
    public void onDeviceConnect(String clientId, String deviceId, String clientIp) {
        log.info("设备连接:clientId={}, deviceId={}, ip={}", clientId, deviceId, clientIp);
        
        try {
            // 1. 查找现有连接
            MqttConnection existingConnection = connectionRepository.findByClientId(clientId);
            
            if (existingConnection != null) {
                // 更新现有连接
                existingConnection.setStatus(ConnectionStatus.CONNECTED);
                existingConnection.setConnectTime(LocalDateTime.now());
                existingConnection.setLastHeartbeat(LocalDateTime.now());
                existingConnection.setClientIp(clientIp);
            } else {
                // 创建新连接
                existingConnection = new MqttConnection();
                existingConnection.setClientId(clientId);
                existingConnection.setDeviceId(deviceId);
                existingConnection.setStatus(ConnectionStatus.CONNECTED);
                existingConnection.setConnectTime(LocalDateTime.now());
                existingConnection.setLastHeartbeat(LocalDateTime.now());
                existingConnection.setClientIp(clientIp);
                existingConnection.setKeepAlive(60); // 默认60秒心跳
            }
            
            // 2. 保存到数据库
            connectionRepository.save(existingConnection);
            
            // 3. 缓存到Redis,方便快速查询
            String cacheKey = "mqtt:connection:" + clientId;
            redisTemplate.opsForValue().set(cacheKey, existingConnection, Duration.ofHours(1));
            
            // 4. 更新设备在线状态
            updateDeviceOnlineStatus(deviceId, true);
            
            log.info("设备连接成功:{}", clientId);
            
        } catch (Exception e) {
            log.error("处理设备连接失败:clientId={}", clientId, e);
            throw new RuntimeException("设备连接处理失败", e);
        }
    }
    
    /**
     * 设备断开连接时调用
     * 就像客人退房离开酒店
     */
    public void onDeviceDisconnect(String clientId) {
        log.info("设备断开连接:{}", clientId);
        
        try {
            // 1. 更新数据库中的连接状态
            MqttConnection connection = connectionRepository.findByClientId(clientId);
            if (connection != null) {
                connection.setStatus(ConnectionStatus.DISCONNECTED);
                connectionRepository.save(connection);
                
                // 2. 更新设备离线状态
                updateDeviceOnlineStatus(connection.getDeviceId(), false);
            }
            
            // 3. 清除Redis缓存
            String cacheKey = "mqtt:connection:" + clientId;
            redisTemplate.delete(cacheKey);
            
            log.info("设备断开连接处理完成:{}", clientId);
            
        } catch (Exception e) {
            log.error("处理设备断开连接失败:clientId={}", clientId, e);
        }
    }
    
    /**
     * 更新心跳时间
     * 就像设备定期"报平安"
     */
    public void updateHeartbeat(String clientId) {
        try {
            // 1. 先从Redis缓存获取
            String cacheKey = "mqtt:connection:" + clientId;
            MqttConnection connection = (MqttConnection) redisTemplate.opsForValue().get(cacheKey);
            
            if (connection == null) {
                // 缓存中没有,从数据库获取
                connection = connectionRepository.findByClientId(clientId);
            }
            
            if (connection != null) {
                connection.setLastHeartbeat(LocalDateTime.now());
                
                // 2. 更新数据库(异步)
                CompletableFuture.runAsync(() -> {
                    connectionRepository.save(connection);
                });
                
                // 3. 更新缓存
                redisTemplate.opsForValue().set(cacheKey, connection, Duration.ofHours(1));
            }
            
        } catch (Exception e) {
            log.error("更新心跳失败:clientId={}", clientId, e);
        }
    }
    
    /**
     * 获取在线设备列表
     */
    public List<MqttConnection> getOnlineDevices() {
        return connectionRepository.findByStatus(ConnectionStatus.CONNECTED);
    }
    
    /**
     * 检查设备是否在线
     */
    public boolean isDeviceOnline(String clientId) {
        // 先从Redis查询
        String cacheKey = "mqtt:connection:" + clientId;
        MqttConnection connection = (MqttConnection) redisTemplate.opsForValue().get(cacheKey);
        
        if (connection != null) {
            return connection.getStatus() == ConnectionStatus.CONNECTED;
        }
        
        // Redis中没有,查询数据库
        connection = connectionRepository.findByClientId(clientId);
        return connection != null && connection.getStatus() == ConnectionStatus.CONNECTED;
    }
    
    /**
     * 更新设备在线状态
     */
    private void updateDeviceOnlineStatus(String deviceId, boolean online) {
        // 这里可以发送事件或调用设备管理服务
        log.info("设备状态更新:deviceId={}, online={}", deviceId, online);
    }
}

# 2. MQTT消息处理服务

/**
 * MQTT消息处理服务
 * 就像是邮局的分拣员,负责处理所有的消息
 */
@Service
@Slf4j
public class MqttMessageService {
    
    @Autowired
    private MqttMessageRepository messageRepository;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    /**
     * 处理设备上报的消息
     * 就像邮局收到一封信,需要分拣和投递
     */
    public void handleInboundMessage(String clientId, String topic, String payload, int qos) {
        log.info("收到设备消息:clientId={}, topic={}, qos={}", clientId, topic, qos);
        
        try {
            // 1. 创建消息记录
            MqttMessage message = new MqttMessage();
            message.setMessageId(generateMessageId());
            message.setTopic(topic);
            message.setPayload(payload);
            message.setQos(qos);
            message.setSenderClientId(clientId);
            message.setDirection(MessageDirection.INBOUND);
            message.setCreateTime(LocalDateTime.now());
            
            // 2. 保存消息到数据库(异步)
            CompletableFuture.runAsync(() -> {
                messageRepository.save(message);
            });
            
            // 3. 根据主题类型进行不同处理
            if (topic.startsWith("device/data/")) {
                // 设备数据上报
                handleDeviceDataMessage(clientId, topic, payload);
            } else if (topic.startsWith("device/status/")) {
                // 设备状态上报
                handleDeviceStatusMessage(clientId, topic, payload);
            } else if (topic.startsWith("device/alarm/")) {
                // 设备告警上报
                handleDeviceAlarmMessage(clientId, topic, payload);
            } else {
                // 其他消息
                handleOtherMessage(clientId, topic, payload);
            }
            
            log.info("消息处理完成:messageId={}", message.getMessageId());
            
        } catch (Exception e) {
            log.error("处理入站消息失败:clientId={}, topic={}", clientId, topic, e);
        }
    }
    
    /**
     * 发送消息给设备
     * 就像邮局发送信件给收件人
     */
    public void sendMessageToDevice(String clientId, String topic, String payload, int qos) {
        log.info("发送消息给设备:clientId={}, topic={}, qos={}", clientId, topic, qos);
        
        try {
            // 1. 检查设备是否在线
            if (!isDeviceOnline(clientId)) {
                log.warn("设备不在线,消息发送失败:clientId={}", clientId);
                return;
            }
            
            // 2. 创建消息记录
            MqttMessage message = new MqttMessage();
            message.setMessageId(generateMessageId());
            message.setTopic(topic);
            message.setPayload(payload);
            message.setQos(qos);
            message.setSenderClientId("server");
            message.setDirection(MessageDirection.OUTBOUND);
            message.setCreateTime(LocalDateTime.now());
            
            // 3. 通过MQTT客户端发送消息
            publishMessage(clientId, topic, payload, qos);
            
            // 4. 保存消息记录
            messageRepository.save(message);
            
            log.info("消息发送成功:messageId={}", message.getMessageId());
            
        } catch (Exception e) {
            log.error("发送消息失败:clientId={}, topic={}", clientId, topic, e);
            throw new RuntimeException("消息发送失败", e);
        }
    }
    
    /**
     * 处理设备数据消息
     */
    private void handleDeviceDataMessage(String clientId, String topic, String payload) {
        try {
            // 解析设备数据
            DeviceData deviceData = parseDeviceData(payload);
            
            // 数据验证
            if (!validateDeviceData(deviceData)) {
                log.warn("设备数据验证失败:clientId={}, data={}", clientId, payload);
                return;
            }
            
            // 发送到Kafka进行后续处理
            kafkaTemplate.send("device-data-topic", clientId, payload);
            
            // 缓存最新数据到Redis
            String cacheKey = "device:latest:" + clientId;
            redisTemplate.opsForValue().set(cacheKey, deviceData, Duration.ofHours(24));
            
            log.info("设备数据处理完成:clientId={}", clientId);
            
        } catch (Exception e) {
            log.error("处理设备数据失败:clientId={}", clientId, e);
        }
    }
    
    /**
     * 处理设备状态消息
     */
    private void handleDeviceStatusMessage(String clientId, String topic, String payload) {
        try {
            // 解析设备状态
            DeviceStatus status = parseDeviceStatus(payload);
            
            // 更新设备状态缓存
            String cacheKey = "device:status:" + clientId;
            redisTemplate.opsForValue().set(cacheKey, status, Duration.ofHours(1));
            
            // 如果是异常状态,发送告警
            if (status.isAbnormal()) {
                sendDeviceAlarm(clientId, "设备状态异常:" + status.getDescription());
            }
            
            log.info("设备状态更新:clientId={}, status={}", clientId, status);
            
        } catch (Exception e) {
            log.error("处理设备状态失败:clientId={}", clientId, e);
        }
    }
    
    /**
     * 处理设备告警消息
     */
    private void handleDeviceAlarmMessage(String clientId, String topic, String payload) {
        try {
            // 解析告警信息
            DeviceAlarm alarm = parseDeviceAlarm(payload);
            
            // 发送到告警处理系统
            kafkaTemplate.send("device-alarm-topic", clientId, payload);
            
            // 记录告警到Redis(用于快速查询最近告警)
            String cacheKey = "device:alarms:" + clientId;
            redisTemplate.opsForList().leftPush(cacheKey, alarm);
            redisTemplate.opsForList().trim(cacheKey, 0, 99); // 只保留最近100条
            redisTemplate.expire(cacheKey, Duration.ofDays(7));
            
            log.warn("收到设备告警:clientId={}, alarm={}", clientId, alarm);
            
        } catch (Exception e) {
            log.error("处理设备告警失败:clientId={}", clientId, e);
        }
    }
    
    // 辅助方法
    private String generateMessageId() {
        return UUID.randomUUID().toString().replace("-", "");
    }
    
    private boolean isDeviceOnline(String clientId) {
        // 调用连接管理服务检查设备状态
        return true; // 简化实现
    }
    
    private void publishMessage(String clientId, String topic, String payload, int qos) {
        // 实际的MQTT消息发布逻辑
        log.info("发布MQTT消息:clientId={}, topic={}", clientId, topic);
    }
    
    // 数据解析方法(简化实现)
    private DeviceData parseDeviceData(String payload) {
        // JSON解析逻辑
        return new DeviceData();
    }
    
    private DeviceStatus parseDeviceStatus(String payload) {
        return new DeviceStatus();
    }
    
    private DeviceAlarm parseDeviceAlarm(String payload) {
        return new DeviceAlarm();
    }
    
    private boolean validateDeviceData(DeviceData data) {
        return data != null;
    }
    
    private void sendDeviceAlarm(String clientId, String message) {
        log.warn("设备告警:clientId={}, message={}", clientId, message);
    }
}

# MQTT Broker集成

# 1. Spring Boot集成配置

/**
 * MQTT配置类
 * 配置MQTT客户端和消息处理器
 */
@Configuration
@EnableConfigurationProperties(MqttProperties.class)
@Slf4j
public class MqttConfig {
    
    @Autowired
    private MqttProperties mqttProperties;
    
    @Autowired
    private MqttConnectionService connectionService;
    
    @Autowired
    private MqttMessageService messageService;
    
    /**
     * MQTT连接工厂
     */
    @Bean
    public MqttConnectionFactory mqttConnectionFactory() {
        MqttConnectionFactory factory = new MqttConnectionFactory();
        factory.setServerURIs(mqttProperties.getBrokerUrl());
        factory.setUserName(mqttProperties.getUsername());
        factory.setPassword(mqttProperties.getPassword());
        factory.setClientId(mqttProperties.getClientId());
        factory.setCleanSession(true);
        factory.setKeepAliveInterval(60);
        factory.setConnectionTimeout(30);
        
        return factory;
    }
    
    /**
     * MQTT入站通道适配器
     * 用于接收设备发送的消息
     */
    @Bean
    public MessageProducer inboundAdapter() {
        MqttPahoMessageDrivenChannelAdapter adapter = 
            new MqttPahoMessageDrivenChannelAdapter(
                mqttProperties.getClientId() + "_inbound",
                mqttConnectionFactory(),
                "device/+/+", "system/+/+" // 订阅主题模式
            );
        
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        
        return adapter;
    }
    
    /**
     * MQTT出站通道适配器
     * 用于向设备发送消息
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler outboundAdapter() {
        MqttPahoMessageHandler messageHandler = 
            new MqttPahoMessageHandler(
                mqttProperties.getClientId() + "_outbound",
                mqttConnectionFactory()
            );
        
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(1);
        messageHandler.setDefaultRetained(false);
        
        return messageHandler;
    }
    
    /**
     * MQTT输入通道
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    
    /**
     * MQTT输出通道
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}

# 2. MQTT消息处理器

/**
 * MQTT消息处理器
 * 处理从设备接收到的所有消息
 */
@Component
@Slf4j
public class MqttMessageHandler {
    
    @Autowired
    private MqttConnectionService connectionService;
    
    @Autowired
    private MqttMessageService messageService;
    
    /**
     * 处理MQTT入站消息
     * 这个方法会在收到设备消息时自动调用
     */
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public void handleMessage(Message<String> message) {
        try {
            // 获取消息头信息
            MessageHeaders headers = message.getHeaders();
            String topic = (String) headers.get("mqtt_receivedTopic");
            Integer qos = (Integer) headers.get("mqtt_receivedQos");
            String payload = message.getPayload();
            
            log.info("收到MQTT消息:topic={}, qos={}, payload={}", topic, qos, payload);
            
            // 从主题中提取客户端ID
            String clientId = extractClientIdFromTopic(topic);
            if (clientId == null) {
                log.warn("无法从主题中提取客户端ID:topic={}", topic);
                return;
            }
            
            // 更新设备心跳
            connectionService.updateHeartbeat(clientId);
            
            // 处理消息内容
            messageService.handleInboundMessage(clientId, topic, payload, qos != null ? qos : 0);
            
        } catch (Exception e) {
            log.error("处理MQTT消息失败", e);
        }
    }
    
    /**
     * 从主题中提取客户端ID
     * 主题格式:device/{clientId}/{dataType}
     */
    private String extractClientIdFromTopic(String topic) {
        if (topic == null || !topic.startsWith("device/")) {
            return null;
        }
        
        String[] parts = topic.split("/");
        if (parts.length >= 2) {
            return parts[1];
        }
        
        return null;
    }
}

# 数据库设计

# 1. MQTT连接表

-- MQTT连接信息表
CREATE TABLE mqtt_connection (
    id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
    client_id VARCHAR(100) NOT NULL UNIQUE COMMENT '客户端ID',
    device_id VARCHAR(100) NOT NULL COMMENT '设备ID',
    status VARCHAR(20) NOT NULL COMMENT '连接状态',
    last_heartbeat DATETIME COMMENT '最后心跳时间',
    connect_time DATETIME COMMENT '连接时间',
    client_ip VARCHAR(50) COMMENT '客户端IP',
    keep_alive INT COMMENT '保活时间(秒)',
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    
    INDEX idx_client_id (client_id),
    INDEX idx_device_id (device_id),
    INDEX idx_status (status),
    INDEX idx_last_heartbeat (last_heartbeat)
) COMMENT='MQTT连接信息表';

# 2. MQTT消息表

-- MQTT消息记录表
CREATE TABLE mqtt_message (
    id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
    message_id VARCHAR(50) NOT NULL UNIQUE COMMENT '消息ID',
    topic VARCHAR(200) NOT NULL COMMENT '主题',
    payload TEXT COMMENT '消息内容',
    qos TINYINT DEFAULT 0 COMMENT '服务质量等级',
    retained BOOLEAN DEFAULT FALSE COMMENT '是否保留消息',
    sender_client_id VARCHAR(100) COMMENT '发送者客户端ID',
    direction VARCHAR(10) NOT NULL COMMENT '消息方向',
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    
    INDEX idx_message_id (message_id),
    INDEX idx_topic (topic),
    INDEX idx_sender_client_id (sender_client_id),
    INDEX idx_direction (direction),
    INDEX idx_create_time (create_time)
) COMMENT='MQTT消息记录表';

-- 按月分表(可选)
-- CREATE TABLE mqtt_message_202401 LIKE mqtt_message;

# 性能优化策略

# 1. 连接池配置

/**
 * MQTT连接池配置
 * 管理多个MQTT连接,提高并发处理能力
 */
@Configuration
public class MqttConnectionPoolConfig {
    
    /**
     * MQTT连接池
     */
    @Bean
    public MqttConnectionPool mqttConnectionPool() {
        MqttConnectionPoolConfig config = new MqttConnectionPoolConfig();
        config.setMaxTotal(100);        // 最大连接数
        config.setMaxIdle(20);          // 最大空闲连接数
        config.setMinIdle(5);           // 最小空闲连接数
        config.setMaxWaitMillis(5000);  // 最大等待时间
        config.setTestOnBorrow(true);   // 借用时测试连接
        config.setTestOnReturn(true);   // 归还时测试连接
        
        return new MqttConnectionPool(config);
    }
}

# 2. 消息批量处理

/**
 * 批量消息处理器
 * 将多个消息打包处理,提高吞吐量
 */
@Component
@Slf4j
public class BatchMessageProcessor {
    
    private final List<MqttMessage> messageBuffer = new ArrayList<>();
    private final Object lock = new Object();
    
    @Value("${mqtt.batch.size:100}")
    private int batchSize;
    
    @Value("${mqtt.batch.timeout:5000}")
    private long batchTimeout;
    
    @Autowired
    private MqttMessageRepository messageRepository;
    
    /**
     * 添加消息到批处理队列
     */
    public void addMessage(MqttMessage message) {
        synchronized (lock) {
            messageBuffer.add(message);
            
            // 达到批量大小,立即处理
            if (messageBuffer.size() >= batchSize) {
                processBatch();
            }
        }
    }
    
    /**
     * 定时处理批量消息
     */
    @Scheduled(fixedDelay = 5000) // 每5秒执行一次
    public void scheduledBatchProcess() {
        synchronized (lock) {
            if (!messageBuffer.isEmpty()) {
                processBatch();
            }
        }
    }
    
    /**
     * 处理批量消息
     */
    private void processBatch() {
        if (messageBuffer.isEmpty()) {
            return;
        }
        
        List<MqttMessage> batch = new ArrayList<>(messageBuffer);
        messageBuffer.clear();
        
        try {
            // 批量保存到数据库
            messageRepository.saveAll(batch);
            log.info("批量处理消息完成:count={}", batch.size());
            
        } catch (Exception e) {
            log.error("批量处理消息失败:count={}", batch.size(), e);
            
            // 失败时重新加入队列
            synchronized (lock) {
                messageBuffer.addAll(0, batch);
            }
        }
    }
}

# 监控和告警

# 1. 连接监控

/**
 * MQTT连接监控服务
 * 监控设备连接状态,及时发现异常
 */
@Service
@Slf4j
public class MqttConnectionMonitor {
    
    @Autowired
    private MqttConnectionService connectionService;
    
    @Autowired
    private AlertService alertService;
    
    /**
     * 定时检查设备连接状态
     */
    @Scheduled(fixedRate = 60000) // 每分钟检查一次
    public void checkConnectionStatus() {
        try {
            // 获取所有在线设备
            List<MqttConnection> onlineDevices = connectionService.getOnlineDevices();
            
            LocalDateTime now = LocalDateTime.now();
            int offlineCount = 0;
            
            for (MqttConnection connection : onlineDevices) {
                // 检查心跳超时
                if (connection.getLastHeartbeat() != null) {
                    long minutes = ChronoUnit.MINUTES.between(connection.getLastHeartbeat(), now);
                    
                    // 超过3分钟没有心跳,认为设备离线
                    if (minutes > 3) {
                        log.warn("设备心跳超时:clientId={}, lastHeartbeat={}", 
                                connection.getClientId(), connection.getLastHeartbeat());
                        
                        // 标记设备为离线
                        connectionService.onDeviceDisconnect(connection.getClientId());
                        offlineCount++;
                    }
                }
            }
            
            // 记录监控指标
            recordConnectionMetrics(onlineDevices.size(), offlineCount);
            
        } catch (Exception e) {
            log.error("连接状态检查失败", e);
        }
    }
    
    /**
     * 记录连接指标
     */
    private void recordConnectionMetrics(int onlineCount, int offlineCount) {
        // 这里可以集成Prometheus、InfluxDB等监控系统
        log.info("连接状态统计:在线={}, 离线={}", onlineCount, offlineCount);
        
        // 如果离线设备过多,发送告警
        if (offlineCount > 10) {
            alertService.sendAlert("MQTT设备大量离线", 
                    String.format("检测到%d个设备离线,请及时处理", offlineCount));
        }
    }
}

# 2. 性能指标监控

/**
 * MQTT性能指标收集器
 */
@Component
@Slf4j
public class MqttMetricsCollector {
    
    private final AtomicLong messageCount = new AtomicLong(0);
    private final AtomicLong errorCount = new AtomicLong(0);
    private final AtomicLong connectionCount = new AtomicLong(0);
    
    /**
     * 记录消息处理指标
     */
    public void recordMessage() {
        messageCount.incrementAndGet();
    }
    
    /**
     * 记录错误指标
     */
    public void recordError() {
        errorCount.incrementAndGet();
    }
    
    /**
     * 记录连接指标
     */
    public void recordConnection() {
        connectionCount.incrementAndGet();
    }
    
    /**
     * 定时输出指标
     */
    @Scheduled(fixedRate = 30000) // 每30秒输出一次
    public void reportMetrics() {
        log.info("MQTT性能指标 - 消息数:{}, 错误数:{}, 连接数:{}", 
                messageCount.get(), errorCount.get(), connectionCount.get());
    }
}

# 配置文件

# application.yml

# MQTT配置
mqtt:
  broker-url: tcp://localhost:1883
  username: admin
  password: admin123
  client-id: iot-server
  
  # 批处理配置
  batch:
    size: 100
    timeout: 5000
  
  # 连接池配置
  pool:
    max-total: 100
    max-idle: 20
    min-idle: 5
    max-wait: 5000

# 数据库配置
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/iot_platform?useUnicode=true&characterEncoding=utf8&useSSL=false
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
  
  # Redis配置
  redis:
    host: localhost
    port: 6379
    password: 
    database: 0
    timeout: 3000
    lettuce:
      pool:
        max-active: 100
        max-idle: 20
        min-idle: 5
        max-wait: 5000

# 日志配置
logging:
  level:
    com.iot.mqtt: DEBUG
    org.springframework.integration.mqtt: DEBUG

# 总结

通过这个MQTT设备接入层的实现,我们构建了一个完整的物联网设备通信系统:

  1. 连接管理:自动处理设备的连接、断开和心跳
  2. 消息处理:支持双向消息传输和不同类型消息的分类处理
  3. 性能优化:通过连接池、批处理、缓存等技术提高系统性能
  4. 监控告警:实时监控设备状态和系统性能
  5. 可扩展性:支持水平扩展和高并发处理

这就像是为物联网设备建立了一个高效、可靠的"邮政系统",确保设备与服务器之间的通信畅通无阻!