MQTT设备接入层详细设计与实现
# MQTT设备接入层详细设计与实现
# 故事背景
想象一下,你是一家智能家居公司的技术负责人。公司有成千上万的智能设备:温度传感器、智能灯泡、空调控制器等。这些设备就像是分布在全国各地的"小助手",它们需要时刻与总部的"大脑"(服务器)保持联系,汇报自己的状态,接收控制指令。
MQTT就像是这些设备与服务器之间的"邮政系统",它轻量、可靠,特别适合物联网设备使用。让我们一起来构建这个"邮政系统"吧!
# MQTT协议原理
# 什么是MQTT?
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议,专为低带宽、高延迟或不可靠的网络环境设计。
# 核心概念
- Broker(代理):消息中转站,就像邮局
- Publisher(发布者):发送消息的设备,就像寄信人
- Subscriber(订阅者):接收消息的设备,就像收信人
- Topic(主题):消息的分类标签,就像邮件地址
# MQTT工作流程
设备A (温度传感器) ----发布温度数据----> MQTT Broker ----推送----> 服务器
|
----推送----> 手机APP
# 系统架构设计
# 整体架构
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 设备层 │ │ 接入层 │ │ 业务层 │
│ │ │ │ │ │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │ IoT设备 │ │ │ │ MQTT Broker │ │ │ │ 设备管理 │ │
│ │ - 传感器 │ │◄──►│ │ - 连接管理 │ │◄──►│ │ - 数据处理 │ │
│ │ - 执行器 │ │ │ │ - 消息路由 │ │ │ │ - 业务逻辑 │ │
│ │ - 网关 │ │ │ │ - 认证授权 │ │ │ │ - 数据存储 │ │
│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
# 核心实体设计
# 1. MQTT连接实体
/**
* MQTT连接信息实体
* 就像是设备的"身份证",记录设备的连接信息
*/
@Entity
@Table(name = "mqtt_connection")
public class MqttConnection {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 客户端ID - 设备的唯一标识
* 就像每个人的身份证号码一样唯一
*/
@Column(name = "client_id", unique = true, nullable = false)
private String clientId;
/**
* 设备ID - 关联的设备
*/
@Column(name = "device_id", nullable = false)
private String deviceId;
/**
* 连接状态:CONNECTED, DISCONNECTED, CONNECTING
*/
@Enumerated(EnumType.STRING)
@Column(name = "status")
private ConnectionStatus status;
/**
* 最后心跳时间 - 设备最后一次"报平安"的时间
*/
@Column(name = "last_heartbeat")
private LocalDateTime lastHeartbeat;
/**
* 连接时间
*/
@Column(name = "connect_time")
private LocalDateTime connectTime;
/**
* 客户端IP地址
*/
@Column(name = "client_ip")
private String clientIp;
/**
* 保活时间(秒)
*/
@Column(name = "keep_alive")
private Integer keepAlive;
// 构造函数、getter、setter省略...
}
/**
* 连接状态枚举
*/
public enum ConnectionStatus {
CONNECTED("已连接"),
DISCONNECTED("已断开"),
CONNECTING("连接中");
private final String description;
ConnectionStatus(String description) {
this.description = description;
}
public String getDescription() {
return description;
}
}
# 2. MQTT消息实体
/**
* MQTT消息实体
* 记录设备发送和接收的每一条消息,就像聊天记录
*/
@Entity
@Table(name = "mqtt_message")
public class MqttMessage {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 消息ID - 每条消息的唯一标识
*/
@Column(name = "message_id", unique = true)
private String messageId;
/**
* 主题 - 消息的"收件地址"
*/
@Column(name = "topic", nullable = false)
private String topic;
/**
* 消息内容 - 实际要传输的数据
*/
@Lob
@Column(name = "payload")
private String payload;
/**
* 服务质量等级:0-最多一次,1-至少一次,2-恰好一次
*/
@Column(name = "qos")
private Integer qos;
/**
* 是否保留消息
*/
@Column(name = "retained")
private Boolean retained;
/**
* 发送者客户端ID
*/
@Column(name = "sender_client_id")
private String senderClientId;
/**
* 消息方向:INBOUND(设备到服务器),OUTBOUND(服务器到设备)
*/
@Enumerated(EnumType.STRING)
@Column(name = "direction")
private MessageDirection direction;
/**
* 创建时间
*/
@Column(name = "create_time")
private LocalDateTime createTime;
// 构造函数、getter、setter省略...
}
public enum MessageDirection {
INBOUND("入站"), // 设备发送给服务器
OUTBOUND("出站"); // 服务器发送给设备
private final String description;
MessageDirection(String description) {
this.description = description;
}
}
# 核心服务实现
# 1. MQTT连接管理服务
/**
* MQTT连接管理服务
* 就像是一个"接待员",负责管理所有设备的连接
*/
@Service
@Slf4j
public class MqttConnectionService {
@Autowired
private MqttConnectionRepository connectionRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 设备连接时调用
* 就像客人到酒店办理入住手续
*/
public void onDeviceConnect(String clientId, String deviceId, String clientIp) {
log.info("设备连接:clientId={}, deviceId={}, ip={}", clientId, deviceId, clientIp);
try {
// 1. 查找现有连接
MqttConnection existingConnection = connectionRepository.findByClientId(clientId);
if (existingConnection != null) {
// 更新现有连接
existingConnection.setStatus(ConnectionStatus.CONNECTED);
existingConnection.setConnectTime(LocalDateTime.now());
existingConnection.setLastHeartbeat(LocalDateTime.now());
existingConnection.setClientIp(clientIp);
} else {
// 创建新连接
existingConnection = new MqttConnection();
existingConnection.setClientId(clientId);
existingConnection.setDeviceId(deviceId);
existingConnection.setStatus(ConnectionStatus.CONNECTED);
existingConnection.setConnectTime(LocalDateTime.now());
existingConnection.setLastHeartbeat(LocalDateTime.now());
existingConnection.setClientIp(clientIp);
existingConnection.setKeepAlive(60); // 默认60秒心跳
}
// 2. 保存到数据库
connectionRepository.save(existingConnection);
// 3. 缓存到Redis,方便快速查询
String cacheKey = "mqtt:connection:" + clientId;
redisTemplate.opsForValue().set(cacheKey, existingConnection, Duration.ofHours(1));
// 4. 更新设备在线状态
updateDeviceOnlineStatus(deviceId, true);
log.info("设备连接成功:{}", clientId);
} catch (Exception e) {
log.error("处理设备连接失败:clientId={}", clientId, e);
throw new RuntimeException("设备连接处理失败", e);
}
}
/**
* 设备断开连接时调用
* 就像客人退房离开酒店
*/
public void onDeviceDisconnect(String clientId) {
log.info("设备断开连接:{}", clientId);
try {
// 1. 更新数据库中的连接状态
MqttConnection connection = connectionRepository.findByClientId(clientId);
if (connection != null) {
connection.setStatus(ConnectionStatus.DISCONNECTED);
connectionRepository.save(connection);
// 2. 更新设备离线状态
updateDeviceOnlineStatus(connection.getDeviceId(), false);
}
// 3. 清除Redis缓存
String cacheKey = "mqtt:connection:" + clientId;
redisTemplate.delete(cacheKey);
log.info("设备断开连接处理完成:{}", clientId);
} catch (Exception e) {
log.error("处理设备断开连接失败:clientId={}", clientId, e);
}
}
/**
* 更新心跳时间
* 就像设备定期"报平安"
*/
public void updateHeartbeat(String clientId) {
try {
// 1. 先从Redis缓存获取
String cacheKey = "mqtt:connection:" + clientId;
MqttConnection connection = (MqttConnection) redisTemplate.opsForValue().get(cacheKey);
if (connection == null) {
// 缓存中没有,从数据库获取
connection = connectionRepository.findByClientId(clientId);
}
if (connection != null) {
connection.setLastHeartbeat(LocalDateTime.now());
// 2. 更新数据库(异步)
CompletableFuture.runAsync(() -> {
connectionRepository.save(connection);
});
// 3. 更新缓存
redisTemplate.opsForValue().set(cacheKey, connection, Duration.ofHours(1));
}
} catch (Exception e) {
log.error("更新心跳失败:clientId={}", clientId, e);
}
}
/**
* 获取在线设备列表
*/
public List<MqttConnection> getOnlineDevices() {
return connectionRepository.findByStatus(ConnectionStatus.CONNECTED);
}
/**
* 检查设备是否在线
*/
public boolean isDeviceOnline(String clientId) {
// 先从Redis查询
String cacheKey = "mqtt:connection:" + clientId;
MqttConnection connection = (MqttConnection) redisTemplate.opsForValue().get(cacheKey);
if (connection != null) {
return connection.getStatus() == ConnectionStatus.CONNECTED;
}
// Redis中没有,查询数据库
connection = connectionRepository.findByClientId(clientId);
return connection != null && connection.getStatus() == ConnectionStatus.CONNECTED;
}
/**
* 更新设备在线状态
*/
private void updateDeviceOnlineStatus(String deviceId, boolean online) {
// 这里可以发送事件或调用设备管理服务
log.info("设备状态更新:deviceId={}, online={}", deviceId, online);
}
}
# 2. MQTT消息处理服务
/**
* MQTT消息处理服务
* 就像是邮局的分拣员,负责处理所有的消息
*/
@Service
@Slf4j
public class MqttMessageService {
@Autowired
private MqttMessageRepository messageRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 处理设备上报的消息
* 就像邮局收到一封信,需要分拣和投递
*/
public void handleInboundMessage(String clientId, String topic, String payload, int qos) {
log.info("收到设备消息:clientId={}, topic={}, qos={}", clientId, topic, qos);
try {
// 1. 创建消息记录
MqttMessage message = new MqttMessage();
message.setMessageId(generateMessageId());
message.setTopic(topic);
message.setPayload(payload);
message.setQos(qos);
message.setSenderClientId(clientId);
message.setDirection(MessageDirection.INBOUND);
message.setCreateTime(LocalDateTime.now());
// 2. 保存消息到数据库(异步)
CompletableFuture.runAsync(() -> {
messageRepository.save(message);
});
// 3. 根据主题类型进行不同处理
if (topic.startsWith("device/data/")) {
// 设备数据上报
handleDeviceDataMessage(clientId, topic, payload);
} else if (topic.startsWith("device/status/")) {
// 设备状态上报
handleDeviceStatusMessage(clientId, topic, payload);
} else if (topic.startsWith("device/alarm/")) {
// 设备告警上报
handleDeviceAlarmMessage(clientId, topic, payload);
} else {
// 其他消息
handleOtherMessage(clientId, topic, payload);
}
log.info("消息处理完成:messageId={}", message.getMessageId());
} catch (Exception e) {
log.error("处理入站消息失败:clientId={}, topic={}", clientId, topic, e);
}
}
/**
* 发送消息给设备
* 就像邮局发送信件给收件人
*/
public void sendMessageToDevice(String clientId, String topic, String payload, int qos) {
log.info("发送消息给设备:clientId={}, topic={}, qos={}", clientId, topic, qos);
try {
// 1. 检查设备是否在线
if (!isDeviceOnline(clientId)) {
log.warn("设备不在线,消息发送失败:clientId={}", clientId);
return;
}
// 2. 创建消息记录
MqttMessage message = new MqttMessage();
message.setMessageId(generateMessageId());
message.setTopic(topic);
message.setPayload(payload);
message.setQos(qos);
message.setSenderClientId("server");
message.setDirection(MessageDirection.OUTBOUND);
message.setCreateTime(LocalDateTime.now());
// 3. 通过MQTT客户端发送消息
publishMessage(clientId, topic, payload, qos);
// 4. 保存消息记录
messageRepository.save(message);
log.info("消息发送成功:messageId={}", message.getMessageId());
} catch (Exception e) {
log.error("发送消息失败:clientId={}, topic={}", clientId, topic, e);
throw new RuntimeException("消息发送失败", e);
}
}
/**
* 处理设备数据消息
*/
private void handleDeviceDataMessage(String clientId, String topic, String payload) {
try {
// 解析设备数据
DeviceData deviceData = parseDeviceData(payload);
// 数据验证
if (!validateDeviceData(deviceData)) {
log.warn("设备数据验证失败:clientId={}, data={}", clientId, payload);
return;
}
// 发送到Kafka进行后续处理
kafkaTemplate.send("device-data-topic", clientId, payload);
// 缓存最新数据到Redis
String cacheKey = "device:latest:" + clientId;
redisTemplate.opsForValue().set(cacheKey, deviceData, Duration.ofHours(24));
log.info("设备数据处理完成:clientId={}", clientId);
} catch (Exception e) {
log.error("处理设备数据失败:clientId={}", clientId, e);
}
}
/**
* 处理设备状态消息
*/
private void handleDeviceStatusMessage(String clientId, String topic, String payload) {
try {
// 解析设备状态
DeviceStatus status = parseDeviceStatus(payload);
// 更新设备状态缓存
String cacheKey = "device:status:" + clientId;
redisTemplate.opsForValue().set(cacheKey, status, Duration.ofHours(1));
// 如果是异常状态,发送告警
if (status.isAbnormal()) {
sendDeviceAlarm(clientId, "设备状态异常:" + status.getDescription());
}
log.info("设备状态更新:clientId={}, status={}", clientId, status);
} catch (Exception e) {
log.error("处理设备状态失败:clientId={}", clientId, e);
}
}
/**
* 处理设备告警消息
*/
private void handleDeviceAlarmMessage(String clientId, String topic, String payload) {
try {
// 解析告警信息
DeviceAlarm alarm = parseDeviceAlarm(payload);
// 发送到告警处理系统
kafkaTemplate.send("device-alarm-topic", clientId, payload);
// 记录告警到Redis(用于快速查询最近告警)
String cacheKey = "device:alarms:" + clientId;
redisTemplate.opsForList().leftPush(cacheKey, alarm);
redisTemplate.opsForList().trim(cacheKey, 0, 99); // 只保留最近100条
redisTemplate.expire(cacheKey, Duration.ofDays(7));
log.warn("收到设备告警:clientId={}, alarm={}", clientId, alarm);
} catch (Exception e) {
log.error("处理设备告警失败:clientId={}", clientId, e);
}
}
// 辅助方法
private String generateMessageId() {
return UUID.randomUUID().toString().replace("-", "");
}
private boolean isDeviceOnline(String clientId) {
// 调用连接管理服务检查设备状态
return true; // 简化实现
}
private void publishMessage(String clientId, String topic, String payload, int qos) {
// 实际的MQTT消息发布逻辑
log.info("发布MQTT消息:clientId={}, topic={}", clientId, topic);
}
// 数据解析方法(简化实现)
private DeviceData parseDeviceData(String payload) {
// JSON解析逻辑
return new DeviceData();
}
private DeviceStatus parseDeviceStatus(String payload) {
return new DeviceStatus();
}
private DeviceAlarm parseDeviceAlarm(String payload) {
return new DeviceAlarm();
}
private boolean validateDeviceData(DeviceData data) {
return data != null;
}
private void sendDeviceAlarm(String clientId, String message) {
log.warn("设备告警:clientId={}, message={}", clientId, message);
}
}
# MQTT Broker集成
# 1. Spring Boot集成配置
/**
* MQTT配置类
* 配置MQTT客户端和消息处理器
*/
@Configuration
@EnableConfigurationProperties(MqttProperties.class)
@Slf4j
public class MqttConfig {
@Autowired
private MqttProperties mqttProperties;
@Autowired
private MqttConnectionService connectionService;
@Autowired
private MqttMessageService messageService;
/**
* MQTT连接工厂
*/
@Bean
public MqttConnectionFactory mqttConnectionFactory() {
MqttConnectionFactory factory = new MqttConnectionFactory();
factory.setServerURIs(mqttProperties.getBrokerUrl());
factory.setUserName(mqttProperties.getUsername());
factory.setPassword(mqttProperties.getPassword());
factory.setClientId(mqttProperties.getClientId());
factory.setCleanSession(true);
factory.setKeepAliveInterval(60);
factory.setConnectionTimeout(30);
return factory;
}
/**
* MQTT入站通道适配器
* 用于接收设备发送的消息
*/
@Bean
public MessageProducer inboundAdapter() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
mqttProperties.getClientId() + "_inbound",
mqttConnectionFactory(),
"device/+/+", "system/+/+" // 订阅主题模式
);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
/**
* MQTT出站通道适配器
* 用于向设备发送消息
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler outboundAdapter() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler(
mqttProperties.getClientId() + "_outbound",
mqttConnectionFactory()
);
messageHandler.setAsync(true);
messageHandler.setDefaultQos(1);
messageHandler.setDefaultRetained(false);
return messageHandler;
}
/**
* MQTT输入通道
*/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* MQTT输出通道
*/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
# 2. MQTT消息处理器
/**
* MQTT消息处理器
* 处理从设备接收到的所有消息
*/
@Component
@Slf4j
public class MqttMessageHandler {
@Autowired
private MqttConnectionService connectionService;
@Autowired
private MqttMessageService messageService;
/**
* 处理MQTT入站消息
* 这个方法会在收到设备消息时自动调用
*/
@ServiceActivator(inputChannel = "mqttInputChannel")
public void handleMessage(Message<String> message) {
try {
// 获取消息头信息
MessageHeaders headers = message.getHeaders();
String topic = (String) headers.get("mqtt_receivedTopic");
Integer qos = (Integer) headers.get("mqtt_receivedQos");
String payload = message.getPayload();
log.info("收到MQTT消息:topic={}, qos={}, payload={}", topic, qos, payload);
// 从主题中提取客户端ID
String clientId = extractClientIdFromTopic(topic);
if (clientId == null) {
log.warn("无法从主题中提取客户端ID:topic={}", topic);
return;
}
// 更新设备心跳
connectionService.updateHeartbeat(clientId);
// 处理消息内容
messageService.handleInboundMessage(clientId, topic, payload, qos != null ? qos : 0);
} catch (Exception e) {
log.error("处理MQTT消息失败", e);
}
}
/**
* 从主题中提取客户端ID
* 主题格式:device/{clientId}/{dataType}
*/
private String extractClientIdFromTopic(String topic) {
if (topic == null || !topic.startsWith("device/")) {
return null;
}
String[] parts = topic.split("/");
if (parts.length >= 2) {
return parts[1];
}
return null;
}
}
# 数据库设计
# 1. MQTT连接表
-- MQTT连接信息表
CREATE TABLE mqtt_connection (
id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
client_id VARCHAR(100) NOT NULL UNIQUE COMMENT '客户端ID',
device_id VARCHAR(100) NOT NULL COMMENT '设备ID',
status VARCHAR(20) NOT NULL COMMENT '连接状态',
last_heartbeat DATETIME COMMENT '最后心跳时间',
connect_time DATETIME COMMENT '连接时间',
client_ip VARCHAR(50) COMMENT '客户端IP',
keep_alive INT COMMENT '保活时间(秒)',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
INDEX idx_client_id (client_id),
INDEX idx_device_id (device_id),
INDEX idx_status (status),
INDEX idx_last_heartbeat (last_heartbeat)
) COMMENT='MQTT连接信息表';
# 2. MQTT消息表
-- MQTT消息记录表
CREATE TABLE mqtt_message (
id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
message_id VARCHAR(50) NOT NULL UNIQUE COMMENT '消息ID',
topic VARCHAR(200) NOT NULL COMMENT '主题',
payload TEXT COMMENT '消息内容',
qos TINYINT DEFAULT 0 COMMENT '服务质量等级',
retained BOOLEAN DEFAULT FALSE COMMENT '是否保留消息',
sender_client_id VARCHAR(100) COMMENT '发送者客户端ID',
direction VARCHAR(10) NOT NULL COMMENT '消息方向',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
INDEX idx_message_id (message_id),
INDEX idx_topic (topic),
INDEX idx_sender_client_id (sender_client_id),
INDEX idx_direction (direction),
INDEX idx_create_time (create_time)
) COMMENT='MQTT消息记录表';
-- 按月分表(可选)
-- CREATE TABLE mqtt_message_202401 LIKE mqtt_message;
# 性能优化策略
# 1. 连接池配置
/**
* MQTT连接池配置
* 管理多个MQTT连接,提高并发处理能力
*/
@Configuration
public class MqttConnectionPoolConfig {
/**
* MQTT连接池
*/
@Bean
public MqttConnectionPool mqttConnectionPool() {
MqttConnectionPoolConfig config = new MqttConnectionPoolConfig();
config.setMaxTotal(100); // 最大连接数
config.setMaxIdle(20); // 最大空闲连接数
config.setMinIdle(5); // 最小空闲连接数
config.setMaxWaitMillis(5000); // 最大等待时间
config.setTestOnBorrow(true); // 借用时测试连接
config.setTestOnReturn(true); // 归还时测试连接
return new MqttConnectionPool(config);
}
}
# 2. 消息批量处理
/**
* 批量消息处理器
* 将多个消息打包处理,提高吞吐量
*/
@Component
@Slf4j
public class BatchMessageProcessor {
private final List<MqttMessage> messageBuffer = new ArrayList<>();
private final Object lock = new Object();
@Value("${mqtt.batch.size:100}")
private int batchSize;
@Value("${mqtt.batch.timeout:5000}")
private long batchTimeout;
@Autowired
private MqttMessageRepository messageRepository;
/**
* 添加消息到批处理队列
*/
public void addMessage(MqttMessage message) {
synchronized (lock) {
messageBuffer.add(message);
// 达到批量大小,立即处理
if (messageBuffer.size() >= batchSize) {
processBatch();
}
}
}
/**
* 定时处理批量消息
*/
@Scheduled(fixedDelay = 5000) // 每5秒执行一次
public void scheduledBatchProcess() {
synchronized (lock) {
if (!messageBuffer.isEmpty()) {
processBatch();
}
}
}
/**
* 处理批量消息
*/
private void processBatch() {
if (messageBuffer.isEmpty()) {
return;
}
List<MqttMessage> batch = new ArrayList<>(messageBuffer);
messageBuffer.clear();
try {
// 批量保存到数据库
messageRepository.saveAll(batch);
log.info("批量处理消息完成:count={}", batch.size());
} catch (Exception e) {
log.error("批量处理消息失败:count={}", batch.size(), e);
// 失败时重新加入队列
synchronized (lock) {
messageBuffer.addAll(0, batch);
}
}
}
}
# 监控和告警
# 1. 连接监控
/**
* MQTT连接监控服务
* 监控设备连接状态,及时发现异常
*/
@Service
@Slf4j
public class MqttConnectionMonitor {
@Autowired
private MqttConnectionService connectionService;
@Autowired
private AlertService alertService;
/**
* 定时检查设备连接状态
*/
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void checkConnectionStatus() {
try {
// 获取所有在线设备
List<MqttConnection> onlineDevices = connectionService.getOnlineDevices();
LocalDateTime now = LocalDateTime.now();
int offlineCount = 0;
for (MqttConnection connection : onlineDevices) {
// 检查心跳超时
if (connection.getLastHeartbeat() != null) {
long minutes = ChronoUnit.MINUTES.between(connection.getLastHeartbeat(), now);
// 超过3分钟没有心跳,认为设备离线
if (minutes > 3) {
log.warn("设备心跳超时:clientId={}, lastHeartbeat={}",
connection.getClientId(), connection.getLastHeartbeat());
// 标记设备为离线
connectionService.onDeviceDisconnect(connection.getClientId());
offlineCount++;
}
}
}
// 记录监控指标
recordConnectionMetrics(onlineDevices.size(), offlineCount);
} catch (Exception e) {
log.error("连接状态检查失败", e);
}
}
/**
* 记录连接指标
*/
private void recordConnectionMetrics(int onlineCount, int offlineCount) {
// 这里可以集成Prometheus、InfluxDB等监控系统
log.info("连接状态统计:在线={}, 离线={}", onlineCount, offlineCount);
// 如果离线设备过多,发送告警
if (offlineCount > 10) {
alertService.sendAlert("MQTT设备大量离线",
String.format("检测到%d个设备离线,请及时处理", offlineCount));
}
}
}
# 2. 性能指标监控
/**
* MQTT性能指标收集器
*/
@Component
@Slf4j
public class MqttMetricsCollector {
private final AtomicLong messageCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
private final AtomicLong connectionCount = new AtomicLong(0);
/**
* 记录消息处理指标
*/
public void recordMessage() {
messageCount.incrementAndGet();
}
/**
* 记录错误指标
*/
public void recordError() {
errorCount.incrementAndGet();
}
/**
* 记录连接指标
*/
public void recordConnection() {
connectionCount.incrementAndGet();
}
/**
* 定时输出指标
*/
@Scheduled(fixedRate = 30000) // 每30秒输出一次
public void reportMetrics() {
log.info("MQTT性能指标 - 消息数:{}, 错误数:{}, 连接数:{}",
messageCount.get(), errorCount.get(), connectionCount.get());
}
}
# 配置文件
# application.yml
# MQTT配置
mqtt:
broker-url: tcp://localhost:1883
username: admin
password: admin123
client-id: iot-server
# 批处理配置
batch:
size: 100
timeout: 5000
# 连接池配置
pool:
max-total: 100
max-idle: 20
min-idle: 5
max-wait: 5000
# 数据库配置
spring:
datasource:
url: jdbc:mysql://localhost:3306/iot_platform?useUnicode=true&characterEncoding=utf8&useSSL=false
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
# Redis配置
redis:
host: localhost
port: 6379
password:
database: 0
timeout: 3000
lettuce:
pool:
max-active: 100
max-idle: 20
min-idle: 5
max-wait: 5000
# 日志配置
logging:
level:
com.iot.mqtt: DEBUG
org.springframework.integration.mqtt: DEBUG
# 总结
通过这个MQTT设备接入层的实现,我们构建了一个完整的物联网设备通信系统:
- 连接管理:自动处理设备的连接、断开和心跳
- 消息处理:支持双向消息传输和不同类型消息的分类处理
- 性能优化:通过连接池、批处理、缓存等技术提高系统性能
- 监控告警:实时监控设备状态和系统性能
- 可扩展性:支持水平扩展和高并发处理
这就像是为物联网设备建立了一个高效、可靠的"邮政系统",确保设备与服务器之间的通信畅通无阻!