物联网设备接入系统设计与实现
# 物联网设备接入系统设计与实现
# 系统概述
物联网设备接入系统是物联网平台的核心组件,负责管理海量设备的接入、认证、数据传输和远程控制。本文档详细介绍设备接入系统的架构设计、核心功能模块以及技术实现方案。
# 系统架构设计
# 整体架构
物联网设备接入系统
├── 设备接入层
│ ├── MQTT接入
│ ├── CoAP接入
│ ├── HTTP接入
│ └── TCP/UDP接入
├── 协议适配层
│ ├── 协议解析
│ ├── 数据转换
│ └── 消息路由
├── 设备管理层
│ ├── 设备注册
│ ├── 设备认证
│ ├── 设备状态管理
│ └── 设备分组管理
├── 数据处理层
│ ├── 数据采集
│ ├── 数据清洗
│ ├── 数据存储
│ └── 数据分发
└── 应用服务层
  ├── 设备控制
  ├── 规则引擎
  ├── 告警服务
  └── 数据分析
# 核心实体设计
# 设备信息实体
/**
 * JPA entity mapped to the {@code iot_device} table: one row per registered device.
 */
@Table(name = "iot_device")
@Entity
public class IoTDevice {

    /** Surrogate primary key, generated by the database identity column. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    /** Business identifier of the device; declared unique. */
    @Column(name = "device_id", unique = true)
    private String deviceId;

    /** Display name of the device. */
    @Column(name = "device_name")
    private String deviceName;

    /** Device type, stored as a plain string on this entity. */
    @Column(name = "device_type")
    private String deviceType;

    /** Key of the product this device belongs to. */
    @Column(name = "product_key")
    private String productKey;

    /** Per-device secret; checked on activation and used as signature input during authentication. */
    @Column(name = "device_secret")
    private String deviceSecret;

    /** Lifecycle state, persisted by enum name. */
    @Column(name = "status")
    @Enumerated(EnumType.STRING)
    private DeviceStatus status;

    /** Access protocol name; the control service switches on its upper-cased value. */
    @Column(name = "protocol_type")
    private String protocolType;

    /** Address recorded when the device last came online. */
    @Column(name = "ip_address")
    private String ipAddress;

    /** Time the device was last marked online. */
    @Column(name = "last_online_time")
    private LocalDateTime lastOnlineTime;

    /** Time the device was last marked offline. */
    @Column(name = "last_offline_time")
    private LocalDateTime lastOfflineTime;

    /** Firmware version reported for the device. */
    @Column(name = "firmware_version")
    private String firmwareVersion;

    /** Free-form location of the device. */
    @Column(name = "location")
    private String location;

    /** Row creation time. */
    @Column(name = "create_time")
    private LocalDateTime createTime;

    /** Time of the last modification. */
    @Column(name = "update_time")
    private LocalDateTime updateTime;

    // getters and setters
}
/**
 * Lifecycle states of a device. Each constant carries a human-readable label.
 */
public enum DeviceStatus {
    INACTIVE("未激活"),
    ONLINE("在线"),
    OFFLINE("离线"),
    DISABLED("已禁用"),
    FAULT("故障");

    private final String description;

    DeviceStatus(String description) {
        this.description = description;
    }

    /**
     * Returns the label passed to this constant's constructor.
     *
     * @return the human-readable description of this status
     */
    public String getDescription() {
        return description;
    }
}
# 产品模型实体
/**
 * JPA entity mapped to the {@code iot_product} table: the product definition that
 * devices are registered against.
 *
 * <p>NOTE(review): {@code ProductStatus} is referenced here but is not declared in this
 * excerpt; the registration code compares against {@code ProductStatus.PUBLISHED}.
 */
@Table(name = "iot_product")
@Entity
public class IoTProduct {

    /** Surrogate primary key, generated by the database identity column. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    /** Product key; declared unique and used to look the product up on registration. */
    @Column(name = "product_key", unique = true)
    private String productKey;

    /** Display name of the product. */
    @Column(name = "product_name")
    private String productName;

    /** Free-form description of the product. */
    @Column(name = "product_desc")
    private String productDesc;

    /** Kind of device this product describes, persisted by enum name. */
    @Column(name = "device_type")
    @Enumerated(EnumType.STRING)
    private DeviceType deviceType;

    /** Payload format, persisted by enum name. */
    @Column(name = "data_format")
    @Enumerated(EnumType.STRING)
    private DataFormat dataFormat;

    /** Access protocol name. */
    @Column(name = "protocol_type")
    private String protocolType;

    /** Thing-model definition, stored in a TEXT column. */
    @Column(name = "thing_model", columnDefinition = "TEXT")
    private String thingModel;

    /** Publication state, persisted by enum name. */
    @Column(name = "status")
    @Enumerated(EnumType.STRING)
    private ProductStatus status;

    /** Row creation time. */
    @Column(name = "create_time")
    private LocalDateTime createTime;

    // getters and setters
}
/**
 * Kinds of device a product can describe. Each constant carries a human-readable label.
 */
public enum DeviceType {
    SENSOR("传感器"),
    ACTUATOR("执行器"),
    GATEWAY("网关"),
    CAMERA("摄像头"),
    CONTROLLER("控制器");

    private final String description;

    DeviceType(String description) {
        this.description = description;
    }

    /**
     * Returns the label passed to this constant's constructor.
     *
     * @return the human-readable description of this device type
     */
    public String getDescription() {
        return description;
    }
}
/**
 * Payload formats a product can declare. Each constant carries a human-readable label.
 */
public enum DataFormat {
    JSON("JSON格式"),
    BINARY("二进制格式"),
    CUSTOM("自定义格式");

    private final String description;

    DataFormat(String description) {
        this.description = description;
    }

    /**
     * Returns the label passed to this constant's constructor.
     *
     * @return the human-readable description of this data format
     */
    public String getDescription() {
        return description;
    }
}
# 设备数据实体
/**
 * JPA entity mapped to the {@code iot_device_data} table: one row per reported
 * property value of a device.
 */
@Table(name = "iot_device_data")
@Entity
public class IoTDeviceData {

    /** Surrogate primary key, generated by the database identity column. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    /** Business identifier of the reporting device. */
    @Column(name = "device_id")
    private String deviceId;

    /** Name of the reported property. */
    @Column(name = "property_name")
    private String propertyName;

    /** Reported value, stored as text. */
    @Column(name = "property_value")
    private String propertyValue;

    /** Type tag describing how {@code propertyValue} should be interpreted. */
    @Column(name = "data_type")
    private String dataType;

    /** Unit of the value. */
    @Column(name = "unit")
    private String unit;

    /** Data quality indicator. */
    @Column(name = "quality")
    private Integer quality;

    /** Timestamp of the data point. */
    @Column(name = "timestamp")
    private LocalDateTime timestamp;

    /** Row creation time. */
    @Column(name = "create_time")
    private LocalDateTime createTime;

    // getters and setters
}
# 核心服务实现
# 设备管理服务
/**
 * Device lifecycle service: registration, activation and online/offline bookkeeping.
 * State is persisted through {@code IoTDeviceRepository} and mirrored into Redis.
 *
 * <p>NOTE(review): {@code getDeviceById} and {@code log} are used below but are not
 * declared in this excerpt; they are assumed to be provided elsewhere — confirm.
 */
@Service
@Transactional
public class DeviceManagementService {

    @Autowired
    private IoTDeviceRepository deviceRepository;

    @Autowired
    private IoTProductRepository productRepository;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // NOTE(review): DeviceAuthService injects this service as well, so the two beans form a
    // cycle, and this field is not used by any code shown here — consider removing it.
    @Autowired
    private DeviceAuthService deviceAuthService;

    /**
     * Registers a new device under a published product.
     *
     * @param request registration data (product key, device id, name, type, protocol, location)
     * @return the device id and the generated device secret
     * @throws BusinessException if the product does not exist, is not published,
     *                           or the device id is already taken
     */
    public DeviceRegistrationResult registerDevice(DeviceRegistrationRequest request) {
        // The product must exist and be published before devices may be attached to it.
        IoTProduct product = productRepository.findByProductKey(request.getProductKey())
                .orElseThrow(() -> new BusinessException("产品不存在"));
        if (product.getStatus() != ProductStatus.PUBLISHED) {
            throw new BusinessException("产品未发布,无法注册设备");
        }

        if (deviceRepository.existsByDeviceId(request.getDeviceId())) {
            throw new BusinessException("设备ID已存在");
        }

        LocalDateTime now = LocalDateTime.now();
        IoTDevice device = new IoTDevice();
        device.setDeviceId(request.getDeviceId());
        device.setDeviceName(request.getDeviceName());
        device.setDeviceType(request.getDeviceType());
        device.setProductKey(request.getProductKey());
        device.setDeviceSecret(generateDeviceSecret());
        device.setStatus(DeviceStatus.INACTIVE);
        device.setProtocolType(request.getProtocolType());
        device.setLocation(request.getLocation());
        device.setCreateTime(now);
        // update_time is declared NOT NULL in the iot_device DDL; leaving the mapped field
        // unset would send an explicit NULL on insert instead of the column default.
        device.setUpdateTime(now);

        IoTDevice savedDevice = deviceRepository.save(device);
        cacheDeviceInfo(savedDevice);

        return new DeviceRegistrationResult(
                savedDevice.getDeviceId(),
                savedDevice.getDeviceSecret(),
                "设备注册成功"
        );
    }

    /**
     * Activates a registered device, moving it from INACTIVE to OFFLINE.
     *
     * @param deviceId     business identifier of the device
     * @param deviceSecret secret presented by the caller
     * @throws BusinessException if the secret does not match or the device is not INACTIVE
     */
    public void activateDevice(String deviceId, String deviceSecret) {
        IoTDevice device = getDeviceById(deviceId);

        if (!secretsMatch(device.getDeviceSecret(), deviceSecret)) {
            throw new BusinessException("设备密钥错误");
        }
        if (device.getStatus() != DeviceStatus.INACTIVE) {
            throw new BusinessException("设备状态异常,无法激活");
        }

        device.setStatus(DeviceStatus.OFFLINE);
        device.setUpdateTime(LocalDateTime.now());
        deviceRepository.save(device);
        cacheDeviceInfo(device);

        log.info("设备激活成功, deviceId: {}", deviceId);
    }

    /**
     * Marks a device as online and records its address.
     *
     * @param deviceId  business identifier of the device
     * @param ipAddress address the device connected from
     * @throws BusinessException if the device is INACTIVE or DISABLED; going online must
     *                           not bypass activation or undo an administrative disable
     */
    public void deviceOnline(String deviceId, String ipAddress) {
        IoTDevice device = getDeviceById(deviceId);
        if (isLockedStatus(device.getStatus())) {
            throw new BusinessException("设备未激活或已禁用,无法上线");
        }

        LocalDateTime now = LocalDateTime.now();
        device.setStatus(DeviceStatus.ONLINE);
        device.setIpAddress(ipAddress);
        device.setLastOnlineTime(now);
        device.setUpdateTime(now);
        deviceRepository.save(device);

        cacheDeviceInfo(device);
        recordDeviceOnlineStatus(deviceId, true);

        log.info("设备上线, deviceId: {}, ip: {}", deviceId, ipAddress);
    }

    /**
     * Marks a device as offline. An INACTIVE or DISABLED device keeps its status;
     * only the offline timestamp and the cached connectivity flag are updated.
     *
     * @param deviceId business identifier of the device
     */
    public void deviceOffline(String deviceId) {
        IoTDevice device = getDeviceById(deviceId);

        LocalDateTime now = LocalDateTime.now();
        if (!isLockedStatus(device.getStatus())) {
            device.setStatus(DeviceStatus.OFFLINE);
        }
        device.setLastOfflineTime(now);
        device.setUpdateTime(now);
        deviceRepository.save(device);

        cacheDeviceInfo(device);
        recordDeviceOnlineStatus(deviceId, false);

        log.info("设备下线, deviceId: {}", deviceId);
    }

    /** Returns true for states that connectivity events must not overwrite. */
    private static boolean isLockedStatus(DeviceStatus status) {
        return status == DeviceStatus.DISABLED || status == DeviceStatus.INACTIVE;
    }

    /** Compares two secrets in constant time; a null on either side never matches. */
    private static boolean secretsMatch(String expected, String provided) {
        if (expected == null || provided == null) {
            return false;
        }
        return java.security.MessageDigest.isEqual(
                expected.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                provided.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }

    /** Generates a device secret: hex MD5 of a random UUID plus the current time in millis. */
    private String generateDeviceSecret() {
        return DigestUtils.md5Hex(UUID.randomUUID().toString() + System.currentTimeMillis());
    }

    /** Caches the device under {@code iot:device:<deviceId>} for 24 hours. */
    private void cacheDeviceInfo(IoTDevice device) {
        String cacheKey = "iot:device:" + device.getDeviceId();
        redisTemplate.opsForValue().set(cacheKey, device, Duration.ofHours(24));
    }

    /** Stores "online"/"offline" under {@code iot:device:status:<deviceId>} for 7 days. */
    private void recordDeviceOnlineStatus(String deviceId, boolean online) {
        String statusKey = "iot:device:status:" + deviceId;
        redisTemplate.opsForValue().set(statusKey, online ? "online" : "offline", Duration.ofDays(7));
    }
}
# 设备认证服务
/**
 * Authenticates devices by a timestamped SHA-256 signature and issues access tokens
 * that are cached in Redis under {@code iot:token:<deviceId>}.
 */
@Service
public class DeviceAuthService {

    /** Accepted difference between device and server clocks: 5 minutes, in milliseconds. */
    private static final long MAX_TIMESTAMP_SKEW_MILLIS = 5 * 60 * 1000L;

    /** Redis key prefix for cached access tokens. */
    private static final String TOKEN_KEY_PREFIX = "iot:token:";

    @Autowired
    private DeviceManagementService deviceManagementService;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Authenticates a device and, on success, issues and caches a new access token.
     *
     * @param request carries the device id, the signature and the timestamp that was signed
     * @return a success result holding the token, or a failure result with the reason
     */
    public AuthenticationResult authenticate(DeviceAuthRequest request) {
        String deviceId = request.getDeviceId();
        String signature = request.getSignature();
        String timestamp = request.getTimestamp();

        // 1. All three parameters are required.
        if (StringUtils.isAnyBlank(deviceId, signature, timestamp)) {
            return AuthenticationResult.failure("认证参数不完整");
        }

        // 2. Reject timestamps outside the allowed window (limits replay of old requests).
        if (!validateTimestamp(timestamp)) {
            return AuthenticationResult.failure("时间戳无效");
        }

        // 3. Load the device and reject unknown or disabled ones.
        IoTDevice device = deviceManagementService.getDeviceById(deviceId);
        if (device == null) {
            return AuthenticationResult.failure("设备不存在");
        }
        if (device.getStatus() == DeviceStatus.DISABLED) {
            return AuthenticationResult.failure("设备已被禁用");
        }

        // 4. Compare signatures in constant time so response timing does not leak a prefix match.
        String expectedSignature = calculateSignature(deviceId, device.getDeviceSecret(), timestamp);
        if (!constantTimeEquals(signature, expectedSignature)) {
            return AuthenticationResult.failure("签名验证失败");
        }

        // 5-6. Issue a token and cache it for later validation.
        String accessToken = generateAccessToken(deviceId);
        cacheAccessToken(deviceId, accessToken);

        return AuthenticationResult.success(accessToken, "认证成功");
    }

    /**
     * Checks a presented access token against the one cached for the device.
     *
     * @param deviceId    business identifier of the device
     * @param accessToken token presented by the device; may be null
     * @return true only if a token is cached for the device and equals {@code accessToken}
     */
    public boolean validateAccessToken(String deviceId, String accessToken) {
        if (deviceId == null || accessToken == null) {
            return false;
        }
        Object cachedToken = redisTemplate.opsForValue().get(TOKEN_KEY_PREFIX + deviceId);
        return cachedToken instanceof String && constantTimeEquals(accessToken, (String) cachedToken);
    }

    /** Compares two strings in constant time; a null on either side never matches. */
    private static boolean constantTimeEquals(String left, String right) {
        if (left == null || right == null) {
            return false;
        }
        return java.security.MessageDigest.isEqual(
                left.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                right.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }

    /** Computes the expected signature: hex SHA-256 of deviceId + deviceSecret + timestamp. */
    private String calculateSignature(String deviceId, String deviceSecret, String timestamp) {
        String data = deviceId + deviceSecret + timestamp;
        return DigestUtils.sha256Hex(data);
    }

    /**
     * Accepts a millisecond epoch timestamp within the allowed skew of the server clock.
     * The window is checked with two comparisons instead of {@code Math.abs(now - ts)},
     * which can overflow for extreme inputs.
     */
    private boolean validateTimestamp(String timestamp) {
        try {
            long ts = Long.parseLong(timestamp);
            long currentTime = System.currentTimeMillis();
            return ts >= currentTime - MAX_TIMESTAMP_SKEW_MILLIS
                    && ts <= currentTime + MAX_TIMESTAMP_SKEW_MILLIS;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    /** Generates a token: hex SHA-256 of deviceId, the current time and a random UUID. */
    private String generateAccessToken(String deviceId) {
        String data = deviceId + System.currentTimeMillis() + UUID.randomUUID().toString();
        return DigestUtils.sha256Hex(data);
    }

    /** Caches the token for the device for 24 hours. */
    private void cacheAccessToken(String deviceId, String accessToken) {
        redisTemplate.opsForValue().set(TOKEN_KEY_PREFIX + deviceId, accessToken, Duration.ofHours(24));
    }
}
# 数据接收服务
/**
 * Receives device telemetry: validates the access token and payload, stores the data,
 * caches the latest value per property in Redis and publishes a processing message.
 *
 * <p>NOTE(review): {@code objectMapper}, {@code log}, {@code validateDataFormat},
 * {@code determineDataType} and {@code updateDeviceLastCommunicationTime} are used below
 * but are not declared in this excerpt — confirm they exist elsewhere.
 */
@Service
public class DeviceDataService {

    /** Redis key prefix for the latest value of each device property. */
    private static final String LATEST_KEY_PREFIX = "iot:latest:";

    @Autowired
    private IoTDeviceDataRepository deviceDataRepository;

    @Autowired
    private DeviceAuthService deviceAuthService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Handles one data report from a device.
     *
     * @param request carries the device id, access token and raw payload
     * @return a success result, or a failure result if the token or payload format is invalid
     * @throws BusinessException if the payload cannot be parsed
     */
    public DataReceiveResult receiveDeviceData(DeviceDataRequest request) {
        String deviceId = request.getDeviceId();
        String accessToken = request.getAccessToken();

        // 1. Validate the access token.
        if (!deviceAuthService.validateAccessToken(deviceId, accessToken)) {
            return DataReceiveResult.failure("访问令牌无效");
        }

        // 2. Validate the payload format.
        if (!validateDataFormat(request.getData())) {
            return DataReceiveResult.failure("数据格式错误");
        }

        // 3-7. Parse, persist, cache, publish, then touch the device's last-communication time.
        List<IoTDeviceData> dataList = parseDeviceData(deviceId, request.getData());
        deviceDataRepository.saveAll(dataList);
        cacheLatestData(deviceId, dataList);
        sendDataProcessMessage(deviceId, dataList);
        updateDeviceLastCommunicationTime(deviceId);

        return DataReceiveResult.success("数据接收成功");
    }

    /**
     * Turns a JSON object payload into one record per top-level field.
     * A payload whose root is not an object yields an empty list.
     *
     * @throws BusinessException if the payload cannot be parsed
     */
    private List<IoTDeviceData> parseDeviceData(String deviceId, String jsonData) {
        List<IoTDeviceData> dataList = new ArrayList<>();
        try {
            JsonNode rootNode = objectMapper.readTree(jsonData);
            if (rootNode.isObject()) {
                // One timestamp for the whole report, so all its rows carry the same time.
                LocalDateTime receivedAt = LocalDateTime.now();
                rootNode.fields().forEachRemaining(entry -> {
                    JsonNode valueNode = entry.getValue();

                    IoTDeviceData data = new IoTDeviceData();
                    data.setDeviceId(deviceId);
                    data.setPropertyName(entry.getKey());
                    // asText() yields "" for objects and arrays; keep their JSON text instead
                    // so nested values are not silently lost.
                    data.setPropertyValue(valueNode.isValueNode() ? valueNode.asText() : valueNode.toString());
                    data.setDataType(determineDataType(valueNode));
                    data.setTimestamp(receivedAt);
                    data.setCreateTime(receivedAt);
                    dataList.add(data);
                });
            }
        } catch (Exception e) {
            log.error("解析设备数据失败, deviceId: {}, data: {}", deviceId, jsonData, e);
            throw new BusinessException("数据解析失败");
        }
        return dataList;
    }

    /** Caches each property's value under {@code iot:latest:<deviceId>:<property>} for one day. */
    private void cacheLatestData(String deviceId, List<IoTDeviceData> dataList) {
        for (IoTDeviceData data : dataList) {
            String cacheKey = LATEST_KEY_PREFIX + deviceId + ":" + data.getPropertyName();
            redisTemplate.opsForValue().set(cacheKey, data.getPropertyValue(), Duration.ofDays(1));
        }
    }

    /** Publishes the received data to {@code iot.data.exchange} with routing key {@code data.received}. */
    private void sendDataProcessMessage(String deviceId, List<IoTDeviceData> dataList) {
        DeviceDataMessage message = new DeviceDataMessage();
        message.setDeviceId(deviceId);
        message.setDataList(dataList);
        message.setTimestamp(LocalDateTime.now());
        rabbitTemplate.convertAndSend("iot.data.exchange", "data.received", message);
    }

    /**
     * Returns the cached latest value of every property of a device.
     *
     * @param deviceId business identifier of the device
     * @return property name to value; empty if nothing is cached
     */
    public Map<String, String> getLatestDeviceData(String deviceId) {
        Map<String, String> latestData = new HashMap<>();
        String keyPrefix = LATEST_KEY_PREFIX + deviceId + ":";

        // NOTE(review): KEYS walks the whole keyspace; consider SCAN or a per-device hash.
        Set<String> keys = redisTemplate.keys(keyPrefix + "*");
        if (keys == null || keys.isEmpty()) {
            return latestData;
        }

        // Fix the iteration order once so keys and values line up by index.
        List<String> orderedKeys = new ArrayList<>(keys);
        // The template is typed <String, Object>, so multiGet returns List<Object>.
        List<Object> values = redisTemplate.opsForValue().multiGet(orderedKeys);
        if (values == null) {
            return latestData;
        }

        for (int i = 0; i < orderedKeys.size() && i < values.size(); i++) {
            Object value = values.get(i);
            if (value == null) {
                continue; // the key disappeared between the two calls
            }
            // Strip the known prefix rather than cutting at the last ':' so property
            // names that contain a colon are returned intact.
            String propertyName = orderedKeys.get(i).substring(keyPrefix.length());
            latestData.put(propertyName, String.valueOf(value));
        }
        return latestData;
    }
}
# 设备控制服务
@Service
public class DeviceControlService {
@Autowired
private DeviceManagementService deviceManagementService;
@Autowired
private MqttTemplate mqttTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送设备控制指令
*/
public ControlResult sendControlCommand(DeviceControlRequest request) {
String deviceId = request.getDeviceId();
String command = request.getCommand();
Map<String, Object> params = request.getParams();
// 1. 验证设备状态
IoTDevice device = deviceManagementService.getDeviceById(deviceId);
if (device.getStatus() != DeviceStatus.ONLINE) {
return ControlResult.failure("设备不在线");
}
// 2. 构建控制消息
DeviceControlMessage controlMessage = new DeviceControlMessage();
controlMessage.setDeviceId(deviceId);
controlMessage.setCommand(command);
controlMessage.setParams(params);
controlMessage.setMessageId(UUID.randomUUID().toString());
controlMessage.setTimestamp(System.currentTimeMillis());
try {
// 3. 根据协议类型发送指令
switch (device.getProtocolType().toUpperCase()) {
case "MQTT":
sendMqttCommand(deviceId, controlMessage);
break;
case "HTTP":
sendHttpCommand(device.getIpAddress(), controlMessage);
break;
case "COAP":
sendCoapCommand(device.getIpAddress(), controlMessage);
break;
default:
return ControlResult.failure("不支持的协议类型");
}
// 4. 记录控制日志
recordControlLog(controlMessage);
return ControlResult.success(controlMessage.getMessageId(), "指令发送成功");
} catch (Exception e) {
log.error("发送设备控制指令失败, deviceId: {}, command: {}", deviceId, command, e);
return ControlResult.failure("指令发送失败: " + e.getMessage());
}
}
/**
* 发送MQTT控制指令
*/
private void sendMqttCommand(String deviceId, DeviceControlMessage message) {
String topic = "/device/" + deviceId + "/control";
String payload = JsonUtils.toJson(message);
mqttTemplate.convertAndSend(topic, payload);
log.info("MQTT控制指令已发送, deviceId: {}, topic: {}", deviceId, topic);
}
/**
* 发送HTTP控制指令
*/
private void sendHttpCommand(String deviceIp, DeviceControlMessage message) {
String url = "http://" + deviceIp + ":8080/control";
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<DeviceControlMessage> entity = new HttpEntity<>(message, headers);
ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
if (response.getStatusCode() != HttpStatus.OK) {
throw new RuntimeException("HTTP控制指令发送失败");
}
log.info("HTTP控制指令已发送, deviceIp: {}, url: {}", deviceIp, url);
}
/**
* 批量设备控制
*/
@Async
public void batchDeviceControl(List<String> deviceIds, String command, Map<String, Object> params) {
for (String deviceId : deviceIds) {
try {
DeviceControlRequest request = new DeviceControlRequest();
request.setDeviceId(deviceId);
request.setCommand(command);
request.setParams(params);
sendControlCommand(request);
// 避免过于频繁的请求
Thread.sleep(100);
} catch (Exception e) {
log.error("批量控制设备失败, deviceId: {}, command: {}", deviceId, command, e);
}
}
}
/**
* 记录控制日志
*/
private void recordControlLog(DeviceControlMessage message) {
DeviceControlLog log = new DeviceControlLog();
log.setDeviceId(message.getDeviceId());
log.setCommand(message.getCommand());
log.setParams(JsonUtils.toJson(message.getParams()));
log.setMessageId(message.getMessageId());
log.setCreateTime(LocalDateTime.now());
// 异步保存日志
rabbitTemplate.convertAndSend("iot.log.exchange", "control.log", log);
}
}
# 协议适配实现
# MQTT协议适配器
/**
 * Bridges MQTT events and messages to the device services: connect/disconnect events
 * update device connectivity, data topics feed the data service, and status topics
 * can flag a device as faulty.
 *
 * <p>NOTE(review): {@code objectMapper}, {@code log} and {@code extractAccessToken} are
 * used below but are not declared in this excerpt — confirm they exist elsewhere.
 */
@Component
public class MqttProtocolAdapter {

    /** Client ids are expected in the form {@code device_<deviceId>}. */
    private static final String CLIENT_ID_PREFIX = "device_";

    @Autowired
    private DeviceDataService deviceDataService;

    @Autowired
    private DeviceManagementService deviceManagementService;

    /** Marks the device online when a client with a recognizable client id connects. */
    @EventListener
    public void handleMqttConnect(MqttConnectEvent event) {
        String deviceId = extractDeviceId(event.getClientId());
        if (deviceId != null) {
            deviceManagementService.deviceOnline(deviceId, event.getRemoteAddress());
        }
    }

    /** Marks the device offline when a client with a recognizable client id disconnects. */
    @EventListener
    public void handleMqttDisconnect(MqttDisconnectEvent event) {
        String deviceId = extractDeviceId(event.getClientId());
        if (deviceId != null) {
            deviceManagementService.deviceOffline(deviceId);
        }
    }

    /** Forwards a payload received on {@code /device/<deviceId>/data} to the data service. */
    @MqttMessageListener(topics = "/device/+/data")
    public void handleDeviceData(String topic, String payload, MqttMessage message) {
        String deviceId = extractDeviceIdFromTopic(topic);
        if (deviceId != null) {
            DeviceDataRequest request = new DeviceDataRequest();
            request.setDeviceId(deviceId);
            request.setData(payload);
            request.setAccessToken(extractAccessToken(message));

            deviceDataService.receiveDeviceData(request);
        }
    }

    /**
     * Handles a JSON payload on {@code /device/<deviceId>/status}; a {@code status} of
     * {@code "fault"} sets the device to FAULT, any other value is ignored.
     */
    @MqttMessageListener(topics = "/device/+/status")
    public void handleDeviceStatus(String topic, String payload) {
        String deviceId = extractDeviceIdFromTopic(topic);
        if (deviceId == null) {
            return;
        }
        try {
            JsonNode statusNode = objectMapper.readTree(payload);
            JsonNode statusField = statusNode.get("status");
            if (statusField == null) {
                // Report a malformed message explicitly instead of via a caught NPE.
                log.warn("设备状态消息缺少status字段, deviceId: {}, payload: {}", deviceId, payload);
                return;
            }
            if ("fault".equals(statusField.asText())) {
                deviceManagementService.updateDeviceStatus(deviceId, DeviceStatus.FAULT);
            }
        } catch (Exception e) {
            log.error("处理设备状态失败, deviceId: {}, payload: {}", deviceId, payload, e);
        }
    }

    /**
     * Extracts the device id from a topic of the form {@code /device/<deviceId>/...}.
     *
     * @return the third {@code /}-separated segment, or null if the topic is null,
     *         too short, or the segment is empty
     */
    private String extractDeviceIdFromTopic(String topic) {
        if (topic == null) {
            return null;
        }
        // The leading '/' makes the first segment empty, so the id is at index 2.
        String[] parts = topic.split("/");
        if (parts.length < 3 || parts[2].isEmpty()) {
            return null;
        }
        return parts[2];
    }

    /**
     * Extracts the device id from a client id of the form {@code device_<deviceId>}.
     *
     * @return the id, or null if the client id is null, lacks the prefix, or has nothing after it
     */
    private String extractDeviceId(String clientId) {
        if (clientId == null || !clientId.startsWith(CLIENT_ID_PREFIX)) {
            return null;
        }
        String deviceId = clientId.substring(CLIENT_ID_PREFIX.length());
        return deviceId.isEmpty() ? null : deviceId;
    }
}
# 数据库设计
# 设备信息表
-- Device registry: one row per device, unique on device_id.
CREATE TABLE `iot_device` (
    `id`                BIGINT       NOT NULL AUTO_INCREMENT COMMENT '主键ID',
    `device_id`         VARCHAR(100) NOT NULL COMMENT '设备ID',
    `device_name`       VARCHAR(200) NOT NULL COMMENT '设备名称',
    `device_type`       VARCHAR(50)  NOT NULL COMMENT '设备类型',
    `product_key`       VARCHAR(100) NOT NULL COMMENT '产品密钥',
    `device_secret`     VARCHAR(100) NOT NULL COMMENT '设备密钥',
    `status`            VARCHAR(20)  NOT NULL DEFAULT 'INACTIVE' COMMENT '设备状态',
    `protocol_type`     VARCHAR(20)  NOT NULL COMMENT '协议类型',
    `ip_address`        VARCHAR(50)  DEFAULT NULL COMMENT 'IP地址',
    `last_online_time`  DATETIME     DEFAULT NULL COMMENT '最后上线时间',
    `last_offline_time` DATETIME     DEFAULT NULL COMMENT '最后下线时间',
    `firmware_version`  VARCHAR(50)  DEFAULT NULL COMMENT '固件版本',
    `location`          VARCHAR(500) DEFAULT NULL COMMENT '设备位置',
    `create_time`       DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `update_time`       DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`id`),
    UNIQUE KEY `uk_device_id` (`device_id`),
    KEY `idx_product_key` (`product_key`),
    KEY `idx_status` (`status`),
    KEY `idx_device_type` (`device_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='物联网设备表';
# 产品模型表
-- Product definitions: one row per product, unique on product_key.
CREATE TABLE `iot_product` (
    `id`            BIGINT       NOT NULL AUTO_INCREMENT COMMENT '主键ID',
    `product_key`   VARCHAR(100) NOT NULL COMMENT '产品密钥',
    `product_name`  VARCHAR(200) NOT NULL COMMENT '产品名称',
    `product_desc`  VARCHAR(500) DEFAULT NULL COMMENT '产品描述',
    `device_type`   VARCHAR(50)  NOT NULL COMMENT '设备类型',
    `data_format`   VARCHAR(20)  NOT NULL COMMENT '数据格式',
    `protocol_type` VARCHAR(20)  NOT NULL COMMENT '协议类型',
    `thing_model`   TEXT         COMMENT '物模型定义',
    `status`        VARCHAR(20)  NOT NULL DEFAULT 'DRAFT' COMMENT '产品状态',
    `create_time`   DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `update_time`   DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`id`),
    UNIQUE KEY `uk_product_key` (`product_key`),
    KEY `idx_device_type` (`device_type`),
    KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='物联网产品表';
# 设备数据表
-- Device telemetry: one row per reported property value, range-partitioned by creation month.
CREATE TABLE `iot_device_data` (
    `id`             BIGINT        NOT NULL AUTO_INCREMENT COMMENT '主键ID',
    `device_id`      VARCHAR(100)  NOT NULL COMMENT '设备ID',
    `property_name`  VARCHAR(100)  NOT NULL COMMENT '属性名称',
    `property_value` VARCHAR(1000) NOT NULL COMMENT '属性值',
    `data_type`      VARCHAR(20)   NOT NULL COMMENT '数据类型',
    `unit`           VARCHAR(20)   DEFAULT NULL COMMENT '单位',
    `quality`        INT           DEFAULT '100' COMMENT '数据质量',
    `timestamp`      DATETIME      NOT NULL COMMENT '数据时间戳',
    `create_time`    DATETIME      NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    -- MySQL requires every unique key, including the primary key, to contain all columns
    -- used in the partitioning expression, so create_time is part of the key. id stays
    -- first so AUTO_INCREMENT remains the leading column of an index.
    PRIMARY KEY (`id`, `create_time`),
    KEY `idx_device_id_timestamp` (`device_id`, `timestamp`),
    KEY `idx_property_name` (`property_name`),
    KEY `idx_timestamp` (`timestamp`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备数据表'
PARTITION BY RANGE (TO_DAYS(create_time)) (
    PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),
    PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),
    PARTITION p202403 VALUES LESS THAN (TO_DAYS('2024-04-01')),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);
# 性能优化策略
# 连接池优化
/**
 * Spring configuration that declares the MQTT connection factory and the thread pool
 * used for IoT tasks.
 */
@Configuration
public class IoTConnectionConfig {

    /**
     * Builds the MQTT connection factory pointing at the local broker.
     *
     * @return the configured connection factory
     */
    @Bean
    public MqttConnectionFactory mqttConnectionFactory() {
        MqttConnectionFactory connectionFactory = new MqttConnectionFactory();

        connectionFactory.setServerURIs("tcp://localhost:1883");
        connectionFactory.setMaxInFlight(1000);
        connectionFactory.setConnectionTimeout(30);
        connectionFactory.setKeepAliveInterval(60);
        connectionFactory.setCleanSession(false);

        return connectionFactory;
    }

    /**
     * Builds the task executor: 50 core threads, up to 200 threads, a queue of 2000,
     * and caller-runs as the rejection policy.
     *
     * @return the initialized executor
     */
    @Bean
    public ThreadPoolTaskExecutor iotTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

        taskExecutor.setCorePoolSize(50);
        taskExecutor.setMaxPoolSize(200);
        taskExecutor.setQueueCapacity(2000);
        taskExecutor.setThreadNamePrefix("IoT-");
        // When pool and queue are full, the submitting thread runs the task itself.
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();

        return taskExecutor;
    }
}
# 数据存储优化
/**
 * Buffers device data in memory and writes it to the repository in batches, either
 * when the buffer reaches {@code BATCH_SIZE} or on the 5-second schedule.
 * Both entry points are {@code synchronized}, so the buffer is accessed by one thread at a time.
 *
 * <p>NOTE(review): {@code log} is used below but is not declared in this excerpt.
 */
@Component
public class DeviceDataBatchProcessor {

    /** Number of buffered records that triggers an immediate flush. */
    private static final int BATCH_SIZE = 1000;

    @Autowired
    private IoTDeviceDataRepository deviceDataRepository;

    private final List<IoTDeviceData> batchData = new ArrayList<>();

    /**
     * Adds one record to the buffer and flushes once the buffer is full.
     *
     * @param data record to buffer
     */
    @Async
    public synchronized void addData(IoTDeviceData data) {
        batchData.add(data);

        if (batchData.size() >= BATCH_SIZE) {
            flushBatch();
        }
    }

    /**
     * Writes all buffered records and empties the buffer; runs every 5 seconds.
     * The buffer is cleared only after the save returns, so a failed save keeps
     * the records for the next attempt.
     */
    @Scheduled(fixedRate = 5000)
    public synchronized void flushBatch() {
        if (batchData.isEmpty()) {
            return;
        }
        List<IoTDeviceData> snapshot = new ArrayList<>(batchData);
        deviceDataRepository.saveAll(snapshot);
        batchData.clear();
        // Log the snapshot size: the buffer itself is already empty here.
        log.info("批量保存设备数据完成, size: {}", snapshot.size());
    }
}
# 监控和告警
# 设备监控服务
@Component
public class DeviceMonitorService {
@Autowired
private DeviceManagementService deviceManagementService;
@Autowired
private AlertService alertService;
/**
* 检查设备心跳
*/
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void checkDeviceHeartbeat() {
List<IoTDevice> onlineDevices = deviceManagementService.getOnlineDevices();
for (IoTDevice device : onlineDevices) {
LocalDateTime lastOnlineTime = device.getLastOnlineTime();
if (lastOnlineTime != null &&
lastOnlineTime.isBefore(LocalDateTime.now().minusMinutes(5))) {
// 设备可能离线
deviceManagementService.deviceOffline(device.getDeviceId());
// 发送告警
alertService.sendDeviceOfflineAlert(device);
}
}
}
/**
* 监控设备数据异常
*/
@EventListener
public void handleDeviceDataAnomaly(DeviceDataAnomalyEvent event) {
String deviceId = event.getDeviceId();
String propertyName = event.getPropertyName();
String anomalyType = event.getAnomalyType();
// 发送数据异常告警
alertService.sendDataAnomalyAlert(deviceId, propertyName, anomalyType);
log.warn("设备数据异常, deviceId: {}, property: {}, type: {}",
deviceId, propertyName, anomalyType);
}
}
# 总结
物联网设备接入系统的核心设计要点:
- 多协议支持:MQTT、HTTP、CoAP等主流物联网协议
- 设备管理:注册、认证、状态管理、分组管理
- 数据处理:实时数据接收、存储、缓存、分发
- 安全机制:设备认证、访问控制、数据加密
- 性能优化:连接池、批量处理、分区存储
- 监控告警:设备状态监控、数据异常检测
- 扩展性:支持海量设备接入和高并发数据处理
通过以上设计,可以构建一个高性能、高可用的物联网设备接入系统,满足工业物联网和消费物联网的各种应用场景。