物联网设备接入系统设计与实现

# 物联网设备接入系统设计与实现

# 系统概述

物联网设备接入系统是物联网平台的核心组件,负责管理海量设备的接入、认证、数据传输和远程控制。本文档详细介绍设备接入系统的架构设计、核心功能模块以及技术实现方案。

# 系统架构设计

# 整体架构

物联网设备接入系统
├── 设备接入层
│   ├── MQTT接入
│   ├── CoAP接入
│   ├── HTTP接入
│   └── TCP/UDP接入
├── 协议适配层
│   ├── 协议解析
│   ├── 数据转换
│   └── 消息路由
├── 设备管理层
│   ├── 设备注册
│   ├── 设备认证
│   ├── 设备状态管理
│   └── 设备分组管理
├── 数据处理层
│   ├── 数据采集
│   ├── 数据清洗
│   ├── 数据存储
│   └── 数据分发
└── 应用服务层
    ├── 设备控制
    ├── 规则引擎
    ├── 告警服务
    └── 数据分析

# 核心实体设计

# 设备信息实体

@Entity
@Table(name = "iot_device")
public class IoTDevice {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    @Column(name = "device_id", unique = true)
    private String deviceId;
    
    @Column(name = "device_name")
    private String deviceName;
    
    @Column(name = "device_type")
    private String deviceType;
    
    @Column(name = "product_key")
    private String productKey;
    
    @Column(name = "device_secret")
    private String deviceSecret;
    
    @Enumerated(EnumType.STRING)
    @Column(name = "status")
    private DeviceStatus status;
    
    @Column(name = "protocol_type")
    private String protocolType;
    
    @Column(name = "ip_address")
    private String ipAddress;
    
    @Column(name = "last_online_time")
    private LocalDateTime lastOnlineTime;
    
    @Column(name = "last_offline_time")
    private LocalDateTime lastOfflineTime;
    
    @Column(name = "firmware_version")
    private String firmwareVersion;
    
    @Column(name = "location")
    private String location;
    
    @Column(name = "create_time")
    private LocalDateTime createTime;
    
    @Column(name = "update_time")
    private LocalDateTime updateTime;
    
    // getters and setters
}

// 设备状态枚举
public enum DeviceStatus {
    INACTIVE("未激活"),
    ONLINE("在线"),
    OFFLINE("离线"),
    DISABLED("已禁用"),
    FAULT("故障");
    
    private final String description;
    
    DeviceStatus(String description) {
        this.description = description;
    }
}

# 产品模型实体

@Entity
@Table(name = "iot_product")
public class IoTProduct {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    @Column(name = "product_key", unique = true)
    private String productKey;
    
    @Column(name = "product_name")
    private String productName;
    
    @Column(name = "product_desc")
    private String productDesc;
    
    @Enumerated(EnumType.STRING)
    @Column(name = "device_type")
    private DeviceType deviceType;
    
    @Enumerated(EnumType.STRING)
    @Column(name = "data_format")
    private DataFormat dataFormat;
    
    @Column(name = "protocol_type")
    private String protocolType;
    
    @Column(name = "thing_model", columnDefinition = "TEXT")
    private String thingModel;
    
    @Enumerated(EnumType.STRING)
    @Column(name = "status")
    private ProductStatus status;
    
    @Column(name = "create_time")
    private LocalDateTime createTime;
    
    // getters and setters
}

// 设备类型枚举
public enum DeviceType {
    SENSOR("传感器"),
    ACTUATOR("执行器"),
    GATEWAY("网关"),
    CAMERA("摄像头"),
    CONTROLLER("控制器");
    
    private final String description;
    
    DeviceType(String description) {
        this.description = description;
    }
}

// 数据格式枚举
public enum DataFormat {
    JSON("JSON格式"),
    BINARY("二进制格式"),
    CUSTOM("自定义格式");
    
    private final String description;
    
    DataFormat(String description) {
        this.description = description;
    }
}

# 设备数据实体

@Entity
@Table(name = "iot_device_data")
public class IoTDeviceData {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    @Column(name = "device_id")
    private String deviceId;
    
    @Column(name = "property_name")
    private String propertyName;
    
    @Column(name = "property_value")
    private String propertyValue;
    
    @Column(name = "data_type")
    private String dataType;
    
    @Column(name = "unit")
    private String unit;
    
    @Column(name = "quality")
    private Integer quality;
    
    @Column(name = "timestamp")
    private LocalDateTime timestamp;
    
    @Column(name = "create_time")
    private LocalDateTime createTime;
    
    // getters and setters
}

# 核心服务实现

# 设备管理服务

@Service
@Transactional
public class DeviceManagementService {
    
    @Autowired
    private IoTDeviceRepository deviceRepository;
    
    @Autowired
    private IoTProductRepository productRepository;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private DeviceAuthService deviceAuthService;
    
    /**
     * 设备注册
     */
    public DeviceRegistrationResult registerDevice(DeviceRegistrationRequest request) {
        // 验证产品密钥
        IoTProduct product = productRepository.findByProductKey(request.getProductKey())
            .orElseThrow(() -> new BusinessException("产品不存在"));
        
        if (product.getStatus() != ProductStatus.PUBLISHED) {
            throw new BusinessException("产品未发布,无法注册设备");
        }
        
        // 检查设备是否已存在
        if (deviceRepository.existsByDeviceId(request.getDeviceId())) {
            throw new BusinessException("设备ID已存在");
        }
        
        // 创建设备
        IoTDevice device = new IoTDevice();
        device.setDeviceId(request.getDeviceId());
        device.setDeviceName(request.getDeviceName());
        device.setDeviceType(request.getDeviceType());
        device.setProductKey(request.getProductKey());
        device.setDeviceSecret(generateDeviceSecret());
        device.setStatus(DeviceStatus.INACTIVE);
        device.setProtocolType(request.getProtocolType());
        device.setLocation(request.getLocation());
        device.setCreateTime(LocalDateTime.now());
        
        IoTDevice savedDevice = deviceRepository.save(device);
        
        // 缓存设备信息
        cacheDeviceInfo(savedDevice);
        
        return new DeviceRegistrationResult(
            savedDevice.getDeviceId(),
            savedDevice.getDeviceSecret(),
            "设备注册成功"
        );
    }
    
    /**
     * 设备激活
     */
    public void activateDevice(String deviceId, String deviceSecret) {
        IoTDevice device = getDeviceById(deviceId);
        
        // 验证设备密钥
        if (!device.getDeviceSecret().equals(deviceSecret)) {
            throw new BusinessException("设备密钥错误");
        }
        
        if (device.getStatus() != DeviceStatus.INACTIVE) {
            throw new BusinessException("设备状态异常,无法激活");
        }
        
        // 激活设备
        device.setStatus(DeviceStatus.OFFLINE);
        device.setUpdateTime(LocalDateTime.now());
        deviceRepository.save(device);
        
        // 更新缓存
        cacheDeviceInfo(device);
        
        log.info("设备激活成功, deviceId: {}", deviceId);
    }
    
    /**
     * 设备上线
     */
    public void deviceOnline(String deviceId, String ipAddress) {
        IoTDevice device = getDeviceById(deviceId);
        
        device.setStatus(DeviceStatus.ONLINE);
        device.setIpAddress(ipAddress);
        device.setLastOnlineTime(LocalDateTime.now());
        device.setUpdateTime(LocalDateTime.now());
        deviceRepository.save(device);
        
        // 更新缓存
        cacheDeviceInfo(device);
        
        // 记录设备在线状态
        recordDeviceOnlineStatus(deviceId, true);
        
        log.info("设备上线, deviceId: {}, ip: {}", deviceId, ipAddress);
    }
    
    /**
     * 设备下线
     */
    public void deviceOffline(String deviceId) {
        IoTDevice device = getDeviceById(deviceId);
        
        device.setStatus(DeviceStatus.OFFLINE);
        device.setLastOfflineTime(LocalDateTime.now());
        device.setUpdateTime(LocalDateTime.now());
        deviceRepository.save(device);
        
        // 更新缓存
        cacheDeviceInfo(device);
        
        // 记录设备离线状态
        recordDeviceOnlineStatus(deviceId, false);
        
        log.info("设备下线, deviceId: {}", deviceId);
    }
    
    /**
     * 生成设备密钥
     */
    private String generateDeviceSecret() {
        return DigestUtils.md5Hex(UUID.randomUUID().toString() + System.currentTimeMillis());
    }
    
    /**
     * 缓存设备信息
     */
    private void cacheDeviceInfo(IoTDevice device) {
        String cacheKey = "iot:device:" + device.getDeviceId();
        redisTemplate.opsForValue().set(cacheKey, device, Duration.ofHours(24));
    }
    
    /**
     * 记录设备在线状态
     */
    private void recordDeviceOnlineStatus(String deviceId, boolean online) {
        String statusKey = "iot:device:status:" + deviceId;
        redisTemplate.opsForValue().set(statusKey, online ? "online" : "offline", Duration.ofDays(7));
    }
}

# 设备认证服务

@Service
public class DeviceAuthService {
    
    @Autowired
    private DeviceManagementService deviceManagementService;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 设备认证
     */
    public AuthenticationResult authenticate(DeviceAuthRequest request) {
        String deviceId = request.getDeviceId();
        String signature = request.getSignature();
        String timestamp = request.getTimestamp();
        
        // 1. 基础参数校验
        if (StringUtils.isAnyBlank(deviceId, signature, timestamp)) {
            return AuthenticationResult.failure("认证参数不完整");
        }
        
        // 2. 时间戳校验(防重放攻击)
        if (!validateTimestamp(timestamp)) {
            return AuthenticationResult.failure("时间戳无效");
        }
        
        // 3. 获取设备信息
        IoTDevice device = deviceManagementService.getDeviceById(deviceId);
        if (device == null) {
            return AuthenticationResult.failure("设备不存在");
        }
        
        if (device.getStatus() == DeviceStatus.DISABLED) {
            return AuthenticationResult.failure("设备已被禁用");
        }
        
        // 4. 签名验证
        String expectedSignature = calculateSignature(deviceId, device.getDeviceSecret(), timestamp);
        if (!signature.equals(expectedSignature)) {
            return AuthenticationResult.failure("签名验证失败");
        }
        
        // 5. 生成访问令牌
        String accessToken = generateAccessToken(deviceId);
        
        // 6. 缓存令牌
        cacheAccessToken(deviceId, accessToken);
        
        return AuthenticationResult.success(accessToken, "认证成功");
    }
    
    /**
     * 验证访问令牌
     */
    public boolean validateAccessToken(String deviceId, String accessToken) {
        String cacheKey = "iot:token:" + deviceId;
        String cachedToken = (String) redisTemplate.opsForValue().get(cacheKey);
        
        return accessToken.equals(cachedToken);
    }
    
    /**
     * 计算签名
     */
    private String calculateSignature(String deviceId, String deviceSecret, String timestamp) {
        String data = deviceId + deviceSecret + timestamp;
        return DigestUtils.sha256Hex(data);
    }
    
    /**
     * 验证时间戳
     */
    private boolean validateTimestamp(String timestamp) {
        try {
            long ts = Long.parseLong(timestamp);
            long currentTime = System.currentTimeMillis();
            long diff = Math.abs(currentTime - ts);
            
            // 允许5分钟的时间偏差
            return diff <= 5 * 60 * 1000;
        } catch (NumberFormatException e) {
            return false;
        }
    }
    
    /**
     * 生成访问令牌
     */
    private String generateAccessToken(String deviceId) {
        String data = deviceId + System.currentTimeMillis() + UUID.randomUUID().toString();
        return DigestUtils.sha256Hex(data);
    }
    
    /**
     * 缓存访问令牌
     */
    private void cacheAccessToken(String deviceId, String accessToken) {
        String cacheKey = "iot:token:" + deviceId;
        redisTemplate.opsForValue().set(cacheKey, accessToken, Duration.ofHours(24));
    }
}

# 数据接收服务

@Service
public class DeviceDataService {
    
    @Autowired
    private IoTDeviceDataRepository deviceDataRepository;
    
    @Autowired
    private DeviceAuthService deviceAuthService;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 接收设备数据
     */
    public DataReceiveResult receiveDeviceData(DeviceDataRequest request) {
        String deviceId = request.getDeviceId();
        String accessToken = request.getAccessToken();
        
        // 1. 验证访问令牌
        if (!deviceAuthService.validateAccessToken(deviceId, accessToken)) {
            return DataReceiveResult.failure("访问令牌无效");
        }
        
        // 2. 数据格式验证
        if (!validateDataFormat(request.getData())) {
            return DataReceiveResult.failure("数据格式错误");
        }
        
        // 3. 解析设备数据
        List<IoTDeviceData> dataList = parseDeviceData(deviceId, request.getData());
        
        // 4. 批量保存数据
        deviceDataRepository.saveAll(dataList);
        
        // 5. 缓存最新数据
        cacheLatestData(deviceId, dataList);
        
        // 6. 发送数据处理消息
        sendDataProcessMessage(deviceId, dataList);
        
        // 7. 更新设备最后通信时间
        updateDeviceLastCommunicationTime(deviceId);
        
        return DataReceiveResult.success("数据接收成功");
    }
    
    /**
     * 解析设备数据
     */
    private List<IoTDeviceData> parseDeviceData(String deviceId, String jsonData) {
        List<IoTDeviceData> dataList = new ArrayList<>();
        
        try {
            JsonNode rootNode = objectMapper.readTree(jsonData);
            
            if (rootNode.isObject()) {
                rootNode.fields().forEachRemaining(entry -> {
                    String propertyName = entry.getKey();
                    JsonNode valueNode = entry.getValue();
                    
                    IoTDeviceData data = new IoTDeviceData();
                    data.setDeviceId(deviceId);
                    data.setPropertyName(propertyName);
                    data.setPropertyValue(valueNode.asText());
                    data.setDataType(determineDataType(valueNode));
                    data.setTimestamp(LocalDateTime.now());
                    data.setCreateTime(LocalDateTime.now());
                    
                    dataList.add(data);
                });
            }
        } catch (Exception e) {
            log.error("解析设备数据失败, deviceId: {}, data: {}", deviceId, jsonData, e);
            throw new BusinessException("数据解析失败");
        }
        
        return dataList;
    }
    
    /**
     * 缓存最新数据
     */
    private void cacheLatestData(String deviceId, List<IoTDeviceData> dataList) {
        for (IoTDeviceData data : dataList) {
            String cacheKey = "iot:latest:" + deviceId + ":" + data.getPropertyName();
            redisTemplate.opsForValue().set(cacheKey, data.getPropertyValue(), Duration.ofDays(1));
        }
    }
    
    /**
     * 发送数据处理消息
     */
    private void sendDataProcessMessage(String deviceId, List<IoTDeviceData> dataList) {
        DeviceDataMessage message = new DeviceDataMessage();
        message.setDeviceId(deviceId);
        message.setDataList(dataList);
        message.setTimestamp(LocalDateTime.now());
        
        rabbitTemplate.convertAndSend("iot.data.exchange", "data.received", message);
    }
    
    /**
     * 获取设备最新数据
     */
    public Map<String, String> getLatestDeviceData(String deviceId) {
        Map<String, String> latestData = new HashMap<>();
        
        String pattern = "iot:latest:" + deviceId + ":*";
        Set<String> keys = redisTemplate.keys(pattern);
        
        if (keys != null && !keys.isEmpty()) {
            List<String> values = redisTemplate.opsForValue().multiGet(keys);
            
            int index = 0;
            for (String key : keys) {
                String propertyName = key.substring(key.lastIndexOf(":") + 1);
                latestData.put(propertyName, values.get(index++));
            }
        }
        
        return latestData;
    }
}

# 设备控制服务

@Service
public class DeviceControlService {
    
    @Autowired
    private DeviceManagementService deviceManagementService;
    
    @Autowired
    private MqttTemplate mqttTemplate;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /**
     * 发送设备控制指令
     */
    public ControlResult sendControlCommand(DeviceControlRequest request) {
        String deviceId = request.getDeviceId();
        String command = request.getCommand();
        Map<String, Object> params = request.getParams();
        
        // 1. 验证设备状态
        IoTDevice device = deviceManagementService.getDeviceById(deviceId);
        if (device.getStatus() != DeviceStatus.ONLINE) {
            return ControlResult.failure("设备不在线");
        }
        
        // 2. 构建控制消息
        DeviceControlMessage controlMessage = new DeviceControlMessage();
        controlMessage.setDeviceId(deviceId);
        controlMessage.setCommand(command);
        controlMessage.setParams(params);
        controlMessage.setMessageId(UUID.randomUUID().toString());
        controlMessage.setTimestamp(System.currentTimeMillis());
        
        try {
            // 3. 根据协议类型发送指令
            switch (device.getProtocolType().toUpperCase()) {
                case "MQTT":
                    sendMqttCommand(deviceId, controlMessage);
                    break;
                case "HTTP":
                    sendHttpCommand(device.getIpAddress(), controlMessage);
                    break;
                case "COAP":
                    sendCoapCommand(device.getIpAddress(), controlMessage);
                    break;
                default:
                    return ControlResult.failure("不支持的协议类型");
            }
            
            // 4. 记录控制日志
            recordControlLog(controlMessage);
            
            return ControlResult.success(controlMessage.getMessageId(), "指令发送成功");
            
        } catch (Exception e) {
            log.error("发送设备控制指令失败, deviceId: {}, command: {}", deviceId, command, e);
            return ControlResult.failure("指令发送失败: " + e.getMessage());
        }
    }
    
    /**
     * 发送MQTT控制指令
     */
    private void sendMqttCommand(String deviceId, DeviceControlMessage message) {
        String topic = "/device/" + deviceId + "/control";
        String payload = JsonUtils.toJson(message);
        
        mqttTemplate.convertAndSend(topic, payload);
        log.info("MQTT控制指令已发送, deviceId: {}, topic: {}", deviceId, topic);
    }
    
    /**
     * 发送HTTP控制指令
     */
    private void sendHttpCommand(String deviceIp, DeviceControlMessage message) {
        String url = "http://" + deviceIp + ":8080/control";
        
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        
        HttpEntity<DeviceControlMessage> entity = new HttpEntity<>(message, headers);
        ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
        
        if (response.getStatusCode() != HttpStatus.OK) {
            throw new RuntimeException("HTTP控制指令发送失败");
        }
        
        log.info("HTTP控制指令已发送, deviceIp: {}, url: {}", deviceIp, url);
    }
    
    /**
     * 批量设备控制
     */
    @Async
    public void batchDeviceControl(List<String> deviceIds, String command, Map<String, Object> params) {
        for (String deviceId : deviceIds) {
            try {
                DeviceControlRequest request = new DeviceControlRequest();
                request.setDeviceId(deviceId);
                request.setCommand(command);
                request.setParams(params);
                
                sendControlCommand(request);
                
                // 避免过于频繁的请求
                Thread.sleep(100);
                
            } catch (Exception e) {
                log.error("批量控制设备失败, deviceId: {}, command: {}", deviceId, command, e);
            }
        }
    }
    
    /**
     * 记录控制日志
     */
    private void recordControlLog(DeviceControlMessage message) {
        DeviceControlLog log = new DeviceControlLog();
        log.setDeviceId(message.getDeviceId());
        log.setCommand(message.getCommand());
        log.setParams(JsonUtils.toJson(message.getParams()));
        log.setMessageId(message.getMessageId());
        log.setCreateTime(LocalDateTime.now());
        
        // 异步保存日志
        rabbitTemplate.convertAndSend("iot.log.exchange", "control.log", log);
    }
}

# 协议适配实现

# MQTT协议适配器

@Component
public class MqttProtocolAdapter {
    
    @Autowired
    private DeviceDataService deviceDataService;
    
    @Autowired
    private DeviceManagementService deviceManagementService;
    
    /**
     * 处理MQTT连接事件
     */
    @EventListener
    public void handleMqttConnect(MqttConnectEvent event) {
        String clientId = event.getClientId();
        String deviceId = extractDeviceId(clientId);
        
        if (deviceId != null) {
            deviceManagementService.deviceOnline(deviceId, event.getRemoteAddress());
        }
    }
    
    /**
     * 处理MQTT断开事件
     */
    @EventListener
    public void handleMqttDisconnect(MqttDisconnectEvent event) {
        String clientId = event.getClientId();
        String deviceId = extractDeviceId(clientId);
        
        if (deviceId != null) {
            deviceManagementService.deviceOffline(deviceId);
        }
    }
    
    /**
     * 处理设备数据上报
     */
    @MqttMessageListener(topics = "/device/+/data")
    public void handleDeviceData(String topic, String payload, MqttMessage message) {
        String deviceId = extractDeviceIdFromTopic(topic);
        
        if (deviceId != null) {
            DeviceDataRequest request = new DeviceDataRequest();
            request.setDeviceId(deviceId);
            request.setData(payload);
            request.setAccessToken(extractAccessToken(message));
            
            deviceDataService.receiveDeviceData(request);
        }
    }
    
    /**
     * 处理设备状态上报
     */
    @MqttMessageListener(topics = "/device/+/status")
    public void handleDeviceStatus(String topic, String payload) {
        String deviceId = extractDeviceIdFromTopic(topic);
        
        if (deviceId != null) {
            try {
                JsonNode statusNode = objectMapper.readTree(payload);
                String status = statusNode.get("status").asText();
                
                if ("fault".equals(status)) {
                    deviceManagementService.updateDeviceStatus(deviceId, DeviceStatus.FAULT);
                }
            } catch (Exception e) {
                log.error("处理设备状态失败, deviceId: {}, payload: {}", deviceId, payload, e);
            }
        }
    }
    
    /**
     * 从Topic中提取设备ID
     */
    private String extractDeviceIdFromTopic(String topic) {
        String[] parts = topic.split("/");
        return parts.length >= 3 ? parts[2] : null;
    }
    
    /**
     * 从ClientId中提取设备ID
     */
    private String extractDeviceId(String clientId) {
        // 假设ClientId格式为: device_{deviceId}
        return clientId.startsWith("device_") ? clientId.substring(7) : null;
    }
}

# 数据库设计

# 设备信息表

CREATE TABLE `iot_device` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `device_id` varchar(100) NOT NULL COMMENT '设备ID',
  `device_name` varchar(200) NOT NULL COMMENT '设备名称',
  `device_type` varchar(50) NOT NULL COMMENT '设备类型',
  `product_key` varchar(100) NOT NULL COMMENT '产品密钥',
  `device_secret` varchar(100) NOT NULL COMMENT '设备密钥',
  `status` varchar(20) NOT NULL DEFAULT 'INACTIVE' COMMENT '设备状态',
  `protocol_type` varchar(20) NOT NULL COMMENT '协议类型',
  `ip_address` varchar(50) DEFAULT NULL COMMENT 'IP地址',
  `last_online_time` datetime DEFAULT NULL COMMENT '最后上线时间',
  `last_offline_time` datetime DEFAULT NULL COMMENT '最后下线时间',
  `firmware_version` varchar(50) DEFAULT NULL COMMENT '固件版本',
  `location` varchar(500) DEFAULT NULL COMMENT '设备位置',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_device_id` (`device_id`),
  KEY `idx_product_key` (`product_key`),
  KEY `idx_status` (`status`),
  KEY `idx_device_type` (`device_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='物联网设备表';

# 产品模型表

CREATE TABLE `iot_product` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `product_key` varchar(100) NOT NULL COMMENT '产品密钥',
  `product_name` varchar(200) NOT NULL COMMENT '产品名称',
  `product_desc` varchar(500) DEFAULT NULL COMMENT '产品描述',
  `device_type` varchar(50) NOT NULL COMMENT '设备类型',
  `data_format` varchar(20) NOT NULL COMMENT '数据格式',
  `protocol_type` varchar(20) NOT NULL COMMENT '协议类型',
  `thing_model` text COMMENT '物模型定义',
  `status` varchar(20) NOT NULL DEFAULT 'DRAFT' COMMENT '产品状态',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_product_key` (`product_key`),
  KEY `idx_device_type` (`device_type`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='物联网产品表';

# 设备数据表

CREATE TABLE `iot_device_data` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `device_id` varchar(100) NOT NULL COMMENT '设备ID',
  `property_name` varchar(100) NOT NULL COMMENT '属性名称',
  `property_value` varchar(1000) NOT NULL COMMENT '属性值',
  `data_type` varchar(20) NOT NULL COMMENT '数据类型',
  `unit` varchar(20) DEFAULT NULL COMMENT '单位',
  `quality` int DEFAULT '100' COMMENT '数据质量',
  `timestamp` datetime NOT NULL COMMENT '数据时间戳',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`),
  KEY `idx_device_id_timestamp` (`device_id`, `timestamp`),
  KEY `idx_property_name` (`property_name`),
  KEY `idx_timestamp` (`timestamp`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备数据表'
PARTITION BY RANGE (TO_DAYS(create_time)) (
    PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),
    PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),
    PARTITION p202403 VALUES LESS THAN (TO_DAYS('2024-04-01')),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);

# 性能优化策略

# 连接池优化

@Configuration
public class IoTConnectionConfig {
    
    @Bean
    public MqttConnectionFactory mqttConnectionFactory() {
        MqttConnectionFactory factory = new MqttConnectionFactory();
        factory.setServerURIs("tcp://localhost:1883");
        factory.setMaxInFlight(1000);
        factory.setConnectionTimeout(30);
        factory.setKeepAliveInterval(60);
        factory.setCleanSession(false);
        return factory;
    }
    
    @Bean
    public ThreadPoolTaskExecutor iotTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(50);
        executor.setMaxPoolSize(200);
        executor.setQueueCapacity(2000);
        executor.setThreadNamePrefix("IoT-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

# 数据存储优化

@Component
public class DeviceDataBatchProcessor {
    
    @Autowired
    private IoTDeviceDataRepository deviceDataRepository;
    
    private final List<IoTDeviceData> batchData = new ArrayList<>();
    private final int BATCH_SIZE = 1000;
    
    /**
     * 批量处理设备数据
     */
    @Async
    public synchronized void addData(IoTDeviceData data) {
        batchData.add(data);
        
        if (batchData.size() >= BATCH_SIZE) {
            flushBatch();
        }
    }
    
    /**
     * 定时刷新批次
     */
    @Scheduled(fixedRate = 5000) // 每5秒执行一次
    public synchronized void flushBatch() {
        if (!batchData.isEmpty()) {
            deviceDataRepository.saveAll(new ArrayList<>(batchData));
            batchData.clear();
            log.info("批量保存设备数据完成, size: {}", batchData.size());
        }
    }
}

# 监控和告警

# 设备监控服务

@Component
public class DeviceMonitorService {
    
    @Autowired
    private DeviceManagementService deviceManagementService;
    
    @Autowired
    private AlertService alertService;
    
    /**
     * 检查设备心跳
     */
    @Scheduled(fixedRate = 60000) // 每分钟检查一次
    public void checkDeviceHeartbeat() {
        List<IoTDevice> onlineDevices = deviceManagementService.getOnlineDevices();
        
        for (IoTDevice device : onlineDevices) {
            LocalDateTime lastOnlineTime = device.getLastOnlineTime();
            if (lastOnlineTime != null && 
                lastOnlineTime.isBefore(LocalDateTime.now().minusMinutes(5))) {
                
                // 设备可能离线
                deviceManagementService.deviceOffline(device.getDeviceId());
                
                // 发送告警
                alertService.sendDeviceOfflineAlert(device);
            }
        }
    }
    
    /**
     * 监控设备数据异常
     */
    @EventListener
    public void handleDeviceDataAnomaly(DeviceDataAnomalyEvent event) {
        String deviceId = event.getDeviceId();
        String propertyName = event.getPropertyName();
        String anomalyType = event.getAnomalyType();
        
        // 发送数据异常告警
        alertService.sendDataAnomalyAlert(deviceId, propertyName, anomalyType);
        
        log.warn("设备数据异常, deviceId: {}, property: {}, type: {}", 
                deviceId, propertyName, anomalyType);
    }
}

# 总结

物联网设备接入系统的核心设计要点:

  1. 多协议支持:MQTT、HTTP、CoAP等主流物联网协议
  2. 设备管理:注册、认证、状态管理、分组管理
  3. 数据处理:实时数据接收、存储、缓存、分发
  4. 安全机制:设备认证、访问控制、数据加密
  5. 性能优化:连接池、批量处理、分区存储
  6. 监控告警:设备状态监控、数据异常检测
  7. 扩展性:支持海量设备接入和高并发数据处理

通过以上设计,可以构建一个高性能、高可用的物联网设备接入系统,满足工业物联网和消费物联网的各种应用场景。