物联网设备管理服务

# 物联网设备管理服务

# 概述

设备管理服务是物联网平台的核心组件，负责设备的全生命周期管理，包括设备注册、激活、状态监控、分组管理等功能。本文档详细介绍设备管理服务的设计与实现。

# 功能架构

设备管理服务
├── 设备注册管理
│   ├── 设备信息录入
│   ├── 设备密钥生成
│   └── 设备激活流程
├── 设备状态管理
│   ├── 在线状态监控
│   ├── 设备健康检查
│   └── 故障状态处理
├── 设备分组管理
│   ├── 分组创建与删除
│   ├── 设备分组分配
│   └── 批量操作管理
└── 设备生命周期
    ├── 设备启用/禁用
    ├── 设备注销
    └── 设备迁移

# 核心实体设计

# 设备实体

/**
 * JPA entity mapped to the {@code iot_device} table; one instance per registered device.
 *
 * <p>Each persistent attribute is bound to its column explicitly through {@code @Column}.
 *
 * <p>NOTE(review): {@code device_type} and {@code protocol_type} carry no
 * {@code nullable = false} here, while the DDL shown later in this document declares both
 * columns {@code NOT NULL} — confirm which side is authoritative.
 */
@Entity
@Table(name = "iot_device")
public class IoTDevice {
    /** Surrogate primary key; value is assigned by the database (IDENTITY strategy). */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    /** Business identifier of the device; mapped as unique and non-null. */
    @Column(name = "device_id", unique = true, nullable = false)
    private String deviceId;
    
    /** Display name of the device; mapped as non-null. */
    @Column(name = "device_name", nullable = false)
    private String deviceName;
    
    /** Device type label, stored as a free-form string. */
    @Column(name = "device_type")
    private String deviceType;
    
    /** Key of the product this device belongs to; mapped as non-null. */
    @Column(name = "product_key", nullable = false)
    private String productKey;
    
    /** Secret issued to the device; mapped as non-null. */
    @Column(name = "device_secret", nullable = false)
    private String deviceSecret;
    
    /** Current device status, persisted as the enum constant's name (EnumType.STRING). */
    @Enumerated(EnumType.STRING)
    @Column(name = "status")
    private DeviceStatus status;
    
    /** Protocol type label, stored as a free-form string. */
    @Column(name = "protocol_type")
    private String protocolType;
    
    /** IP address of the device, stored as a string. */
    @Column(name = "ip_address")
    private String ipAddress;
    
    /** MAC address of the device, stored as a string. */
    @Column(name = "mac_address")
    private String macAddress;
    
    /** Firmware version string. */
    @Column(name = "firmware_version")
    private String firmwareVersion;
    
    /** Hardware version string. */
    @Column(name = "hardware_version")
    private String hardwareVersion;
    
    /** Device location, stored as a string. */
    @Column(name = "location")
    private String location;
    
    /** Free-text description of the device. */
    @Column(name = "description")
    private String description;
    
    /** Device tags held in a single string column (encoding not shown here). */
    @Column(name = "tags")
    private String tags;
    
    /**
     * Id of the group the device is assigned to, or null when it is in no group.
     * Held as a plain id column rather than an entity association.
     */
    @Column(name = "group_id")
    private Long groupId;
    
    /** Timestamp of the most recent transition to ONLINE. */
    @Column(name = "last_online_time")
    private LocalDateTime lastOnlineTime;
    
    /** Timestamp of the most recent transition to OFFLINE. */
    @Column(name = "last_offline_time")
    private LocalDateTime lastOfflineTime;
    
    /** Row creation timestamp. */
    @Column(name = "create_time")
    private LocalDateTime createTime;
    
    /** Timestamp of the last modification. */
    @Column(name = "update_time")
    private LocalDateTime updateTime;
    
    // Getters and setters omitted for brevity.
}

# 设备分组实体

/**
 * JPA entity mapped to the {@code iot_device_group} table; one instance per device group.
 *
 * <p>Groups form a hierarchy through {@code parentId}; the hierarchy is held as plain id
 * columns rather than entity associations.
 */
@Entity
@Table(name = "iot_device_group")
public class IoTDeviceGroup {
    /** Surrogate primary key; value is assigned by the database (IDENTITY strategy). */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    /** Name of the group; mapped as non-null. */
    @Column(name = "group_name", nullable = false)
    private String groupName;
    
    /** Free-text description of the group. */
    @Column(name = "group_desc")
    private String groupDesc;
    
    /** Id of the parent group, or null for a root-level group. */
    @Column(name = "parent_id")
    private Long parentId;
    
    /** Slash-separated path of group names from the root down to this group. */
    @Column(name = "group_path")
    private String groupPath;
    
    /** Stored (denormalised) number of devices assigned to this group. */
    @Column(name = "device_count")
    private Integer deviceCount;
    
    /** Row creation timestamp. */
    @Column(name = "create_time")
    private LocalDateTime createTime;
    
    /** Timestamp of the last modification. */
    @Column(name = "update_time")
    private LocalDateTime updateTime;
    
    // Getters and setters omitted for brevity.
}

# 核心服务实现

# 设备注册服务

@Service
@Transactional
public class DeviceRegistrationService {
    
    @Autowired
    private IoTDeviceRepository deviceRepository;
    
    @Autowired
    private IoTProductRepository productRepository;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 批量设备注册
     */
    public BatchRegistrationResult batchRegisterDevices(BatchRegistrationRequest request) {
        List<DeviceRegistrationInfo> devices = request.getDevices();
        List<DeviceRegistrationResult> results = new ArrayList<>();
        List<String> failedDevices = new ArrayList<>();
        
        for (DeviceRegistrationInfo deviceInfo : devices) {
            try {
                DeviceRegistrationRequest singleRequest = new DeviceRegistrationRequest();
                BeanUtils.copyProperties(deviceInfo, singleRequest);
                
                DeviceRegistrationResult result = registerDevice(singleRequest);
                results.add(result);
                
            } catch (Exception e) {
                failedDevices.add(deviceInfo.getDeviceId() + ": " + e.getMessage());
                log.error("批量注册设备失败, deviceId: {}", deviceInfo.getDeviceId(), e);
            }
        }
        
        return new BatchRegistrationResult(
            results.size(),
            failedDevices.size(),
            results,
            failedDevices
        );
    }
    
    /**
     * 设备注册
     */
    public DeviceRegistrationResult registerDevice(DeviceRegistrationRequest request) {
        // 1. 验证产品信息
        IoTProduct product = validateProduct(request.getProductKey());
        
        // 2. 验证设备ID唯一性
        validateDeviceIdUniqueness(request.getDeviceId());
        
        // 3. 创建设备记录
        IoTDevice device = createDevice(request, product);
        
        // 4. 保存设备信息
        IoTDevice savedDevice = deviceRepository.save(device);
        
        // 5. 缓存设备信息
        cacheDeviceInfo(savedDevice);
        
        // 6. 发送设备注册事件
        publishDeviceRegisteredEvent(savedDevice);
        
        return new DeviceRegistrationResult(
            savedDevice.getDeviceId(),
            savedDevice.getDeviceSecret(),
            "设备注册成功"
        );
    }
    
    /**
     * 验证产品信息
     */
    private IoTProduct validateProduct(String productKey) {
        IoTProduct product = productRepository.findByProductKey(productKey)
            .orElseThrow(() -> new BusinessException("产品不存在"));
        
        if (product.getStatus() != ProductStatus.PUBLISHED) {
            throw new BusinessException("产品未发布,无法注册设备");
        }
        
        return product;
    }
    
    /**
     * 验证设备ID唯一性
     */
    private void validateDeviceIdUniqueness(String deviceId) {
        if (deviceRepository.existsByDeviceId(deviceId)) {
            throw new BusinessException("设备ID已存在: " + deviceId);
        }
    }
    
    /**
     * 创建设备对象
     */
    private IoTDevice createDevice(DeviceRegistrationRequest request, IoTProduct product) {
        IoTDevice device = new IoTDevice();
        device.setDeviceId(request.getDeviceId());
        device.setDeviceName(request.getDeviceName());
        device.setDeviceType(request.getDeviceType());
        device.setProductKey(request.getProductKey());
        device.setDeviceSecret(generateDeviceSecret());
        device.setStatus(DeviceStatus.INACTIVE);
        device.setProtocolType(request.getProtocolType());
        device.setLocation(request.getLocation());
        device.setDescription(request.getDescription());
        device.setTags(request.getTags());
        device.setCreateTime(LocalDateTime.now());
        device.setUpdateTime(LocalDateTime.now());
        
        return device;
    }
    
    /**
     * 生成设备密钥
     */
    private String generateDeviceSecret() {
        String randomStr = UUID.randomUUID().toString().replace("-", "");
        String timestamp = String.valueOf(System.currentTimeMillis());
        return DigestUtils.sha256Hex(randomStr + timestamp);
    }
}

# 设备状态管理服务

@Service
public class DeviceStatusService {
    
    @Autowired
    private IoTDeviceRepository deviceRepository;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /**
     * 更新设备状态
     */
    public void updateDeviceStatus(String deviceId, DeviceStatus status) {
        IoTDevice device = deviceRepository.findByDeviceId(deviceId)
            .orElseThrow(() -> new BusinessException("设备不存在"));
        
        DeviceStatus oldStatus = device.getStatus();
        device.setStatus(status);
        device.setUpdateTime(LocalDateTime.now());
        
        // 更新特定状态的时间戳
        switch (status) {
            case ONLINE:
                device.setLastOnlineTime(LocalDateTime.now());
                break;
            case OFFLINE:
                device.setLastOfflineTime(LocalDateTime.now());
                break;
        }
        
        deviceRepository.save(device);
        
        // 更新缓存
        updateDeviceStatusCache(deviceId, status);
        
        // 发送状态变更事件
        publishDeviceStatusChangedEvent(deviceId, oldStatus, status);
        
        log.info("设备状态更新, deviceId: {}, oldStatus: {}, newStatus: {}", 
                deviceId, oldStatus, status);
    }
    
    /**
     * 批量更新设备状态
     */
    @Async
    public void batchUpdateDeviceStatus(List<String> deviceIds, DeviceStatus status) {
        for (String deviceId : deviceIds) {
            try {
                updateDeviceStatus(deviceId, status);
                Thread.sleep(50); // 避免过于频繁的数据库操作
            } catch (Exception e) {
                log.error("批量更新设备状态失败, deviceId: {}, status: {}", deviceId, status, e);
            }
        }
    }
    
    /**
     * 获取设备在线状态统计
     */
    public DeviceStatusStatistics getDeviceStatusStatistics() {
        List<Object[]> results = deviceRepository.countDevicesByStatus();
        
        DeviceStatusStatistics statistics = new DeviceStatusStatistics();
        for (Object[] result : results) {
            DeviceStatus status = (DeviceStatus) result[0];
            Long count = (Long) result[1];
            
            switch (status) {
                case ONLINE:
                    statistics.setOnlineCount(count.intValue());
                    break;
                case OFFLINE:
                    statistics.setOfflineCount(count.intValue());
                    break;
                case INACTIVE:
                    statistics.setInactiveCount(count.intValue());
                    break;
                case DISABLED:
                    statistics.setDisabledCount(count.intValue());
                    break;
                case FAULT:
                    statistics.setFaultCount(count.intValue());
                    break;
            }
        }
        
        statistics.setTotalCount(
            statistics.getOnlineCount() + statistics.getOfflineCount() + 
            statistics.getInactiveCount() + statistics.getDisabledCount() + 
            statistics.getFaultCount()
        );
        
        return statistics;
    }
    
    /**
     * 设备健康检查
     */
    @Scheduled(fixedRate = 300000) // 每5分钟执行一次
    public void deviceHealthCheck() {
        List<IoTDevice> onlineDevices = deviceRepository.findByStatus(DeviceStatus.ONLINE);
        
        for (IoTDevice device : onlineDevices) {
            if (isDeviceTimeout(device)) {
                updateDeviceStatus(device.getDeviceId(), DeviceStatus.OFFLINE);
                
                // 发送设备离线告警
                sendDeviceOfflineAlert(device);
            }
        }
    }
    
    /**
     * 判断设备是否超时
     */
    private boolean isDeviceTimeout(IoTDevice device) {
        if (device.getLastOnlineTime() == null) {
            return true;
        }
        
        LocalDateTime timeoutThreshold = LocalDateTime.now().minusMinutes(10);
        return device.getLastOnlineTime().isBefore(timeoutThreshold);
    }
    
    /**
     * 发送设备离线告警
     */
    private void sendDeviceOfflineAlert(IoTDevice device) {
        DeviceOfflineAlert alert = new DeviceOfflineAlert();
        alert.setDeviceId(device.getDeviceId());
        alert.setDeviceName(device.getDeviceName());
        alert.setOfflineTime(LocalDateTime.now());
        alert.setAlertLevel(AlertLevel.WARNING);
        
        rabbitTemplate.convertAndSend("iot.alert.exchange", "device.offline", alert);
    }
}

# 设备分组管理服务

@Service
@Transactional
public class DeviceGroupService {
    
    @Autowired
    private IoTDeviceGroupRepository groupRepository;
    
    @Autowired
    private IoTDeviceRepository deviceRepository;
    
    /**
     * 创建设备分组
     */
    public IoTDeviceGroup createDeviceGroup(CreateGroupRequest request) {
        // 验证分组名称唯一性
        if (groupRepository.existsByGroupNameAndParentId(request.getGroupName(), request.getParentId())) {
            throw new BusinessException("分组名称已存在");
        }
        
        IoTDeviceGroup group = new IoTDeviceGroup();
        group.setGroupName(request.getGroupName());
        group.setGroupDesc(request.getGroupDesc());
        group.setParentId(request.getParentId());
        group.setDeviceCount(0);
        group.setCreateTime(LocalDateTime.now());
        group.setUpdateTime(LocalDateTime.now());
        
        // 构建分组路径
        group.setGroupPath(buildGroupPath(request.getParentId(), request.getGroupName()));
        
        return groupRepository.save(group);
    }
    
    /**
     * 将设备添加到分组
     */
    public void addDeviceToGroup(String deviceId, Long groupId) {
        IoTDevice device = deviceRepository.findByDeviceId(deviceId)
            .orElseThrow(() -> new BusinessException("设备不存在"));
        
        IoTDeviceGroup group = groupRepository.findById(groupId)
            .orElseThrow(() -> new BusinessException("分组不存在"));
        
        // 如果设备已在其他分组,先从原分组移除
        if (device.getGroupId() != null) {
            removeDeviceFromGroup(deviceId);
        }
        
        // 添加到新分组
        device.setGroupId(groupId);
        device.setUpdateTime(LocalDateTime.now());
        deviceRepository.save(device);
        
        // 更新分组设备数量
        updateGroupDeviceCount(groupId);
        
        log.info("设备添加到分组成功, deviceId: {}, groupId: {}", deviceId, groupId);
    }
    
    /**
     * 从分组中移除设备
     */
    public void removeDeviceFromGroup(String deviceId) {
        IoTDevice device = deviceRepository.findByDeviceId(deviceId)
            .orElseThrow(() -> new BusinessException("设备不存在"));
        
        Long oldGroupId = device.getGroupId();
        if (oldGroupId != null) {
            device.setGroupId(null);
            device.setUpdateTime(LocalDateTime.now());
            deviceRepository.save(device);
            
            // 更新原分组设备数量
            updateGroupDeviceCount(oldGroupId);
            
            log.info("设备从分组移除成功, deviceId: {}, oldGroupId: {}", deviceId, oldGroupId);
        }
    }
    
    /**
     * 批量设备分组操作
     */
    @Async
    public void batchDeviceGroupOperation(List<String> deviceIds, Long groupId, GroupOperation operation) {
        for (String deviceId : deviceIds) {
            try {
                switch (operation) {
                    case ADD:
                        addDeviceToGroup(deviceId, groupId);
                        break;
                    case REMOVE:
                        removeDeviceFromGroup(deviceId);
                        break;
                }
                Thread.sleep(50);
            } catch (Exception e) {
                log.error("批量分组操作失败, deviceId: {}, operation: {}", deviceId, operation, e);
            }
        }
    }
    
    /**
     * 获取分组树结构
     */
    public List<DeviceGroupTreeNode> getGroupTree() {
        List<IoTDeviceGroup> allGroups = groupRepository.findAllByOrderByGroupPathAsc();
        return buildGroupTree(allGroups, null);
    }
    
    /**
     * 构建分组树
     */
    private List<DeviceGroupTreeNode> buildGroupTree(List<IoTDeviceGroup> allGroups, Long parentId) {
        return allGroups.stream()
            .filter(group -> Objects.equals(group.getParentId(), parentId))
            .map(group -> {
                DeviceGroupTreeNode node = new DeviceGroupTreeNode();
                node.setId(group.getId());
                node.setGroupName(group.getGroupName());
                node.setGroupDesc(group.getGroupDesc());
                node.setDeviceCount(group.getDeviceCount());
                node.setChildren(buildGroupTree(allGroups, group.getId()));
                return node;
            })
            .collect(Collectors.toList());
    }
    
    /**
     * 构建分组路径
     */
    private String buildGroupPath(Long parentId, String groupName) {
        if (parentId == null) {
            return "/" + groupName;
        }
        
        IoTDeviceGroup parentGroup = groupRepository.findById(parentId)
            .orElseThrow(() -> new BusinessException("父分组不存在"));
        
        return parentGroup.getGroupPath() + "/" + groupName;
    }
    
    /**
     * 更新分组设备数量
     */
    private void updateGroupDeviceCount(Long groupId) {
        int deviceCount = deviceRepository.countByGroupId(groupId);
        groupRepository.updateDeviceCount(groupId, deviceCount);
    }
}

# 数据库设计

# 设备表

-- Device table: one row per registered IoT device.
-- NOTE(review): device_type and protocol_type are NOT NULL here with no default, while the
-- IoTDevice entity mapping in this document does not mark them non-null — confirm that
-- registration always supplies both values.
CREATE TABLE `iot_device` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `device_id` varchar(100) NOT NULL COMMENT '设备ID',
  `device_name` varchar(200) NOT NULL COMMENT '设备名称',
  `device_type` varchar(50) NOT NULL COMMENT '设备类型',
  `product_key` varchar(100) NOT NULL COMMENT '产品密钥',
  `device_secret` varchar(100) NOT NULL COMMENT '设备密钥',
  -- New rows default to INACTIVE when no status is supplied.
  `status` varchar(20) NOT NULL DEFAULT 'INACTIVE' COMMENT '设备状态',
  `protocol_type` varchar(20) NOT NULL COMMENT '协议类型',
  `ip_address` varchar(50) DEFAULT NULL COMMENT 'IP地址',
  `mac_address` varchar(50) DEFAULT NULL COMMENT 'MAC地址',
  `firmware_version` varchar(50) DEFAULT NULL COMMENT '固件版本',
  `hardware_version` varchar(50) DEFAULT NULL COMMENT '硬件版本',
  `location` varchar(500) DEFAULT NULL COMMENT '设备位置',
  `description` varchar(1000) DEFAULT NULL COMMENT '设备描述',
  `tags` varchar(500) DEFAULT NULL COMMENT '设备标签',
  -- NULL means the device is not assigned to any group; no foreign key is declared.
  `group_id` bigint DEFAULT NULL COMMENT '分组ID',
  `last_online_time` datetime DEFAULT NULL COMMENT '最后上线时间',
  `last_offline_time` datetime DEFAULT NULL COMMENT '最后下线时间',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  -- Refreshed by the database on every row update.
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  -- Enforces one row per business device id.
  UNIQUE KEY `uk_device_id` (`device_id`),
  -- Secondary indexes on the columns used for filtering and lookups.
  KEY `idx_product_key` (`product_key`),
  KEY `idx_status` (`status`),
  KEY `idx_device_type` (`device_type`),
  KEY `idx_group_id` (`group_id`),
  KEY `idx_last_online_time` (`last_online_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='物联网设备表';

# 设备分组表

-- Device group table: one row per group; groups nest through parent_id.
CREATE TABLE `iot_device_group` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `group_name` varchar(100) NOT NULL COMMENT '分组名称',
  `group_desc` varchar(500) DEFAULT NULL COMMENT '分组描述',
  -- NULL marks a root-level group; no foreign key is declared.
  `parent_id` bigint DEFAULT NULL COMMENT '父分组ID',
  `group_path` varchar(1000) NOT NULL COMMENT '分组路径',
  `device_count` int NOT NULL DEFAULT '0' COMMENT '设备数量',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  -- Refreshed by the database on every row update.
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  -- Sibling names must be unique. MySQL treats NULLs as distinct in a unique index, so
  -- this key does not stop two root-level groups (parent_id NULL) from sharing a name.
  UNIQUE KEY `uk_group_name_parent` (`group_name`, `parent_id`),
  KEY `idx_parent_id` (`parent_id`),
  -- Prefix index: a full varchar(1000) utf8mb4 key can need 4000 bytes, above InnoDB's
  -- 3072-byte index key limit (DYNAMIC/COMPRESSED row format). 255 characters stay within it.
  KEY `idx_group_path` (`group_path`(255))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备分组表';

# 性能优化

# 缓存策略

/**
 * Writes device information and device status into Redis.
 *
 * <p>Device info lives under {@code iot:device:<deviceId>} for 24 hours; device status
 * lives under {@code iot:device:status:<deviceId>} for 7 days.
 */
@Component
public class DeviceCacheManager {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private static final String DEVICE_CACHE_PREFIX = "iot:device:";
    private static final String DEVICE_STATUS_PREFIX = "iot:device:status:";
    private static final String GROUP_CACHE_PREFIX = "iot:group:";

    /** Time-to-live of a cached device record. */
    private static final Duration DEVICE_INFO_TTL = Duration.ofHours(24);

    /** Time-to-live of a cached device status. */
    private static final Duration DEVICE_STATUS_TTL = Duration.ofDays(7);

    /**
     * Caches the device under its device id for 24 hours.
     *
     * @param device device to cache
     */
    public void cacheDeviceInfo(IoTDevice device) {
        String cacheKey = DEVICE_CACHE_PREFIX + device.getDeviceId();
        redisTemplate.opsForValue().set(cacheKey, device, DEVICE_INFO_TTL);
    }

    /**
     * Caches the status name of one device for 7 days.
     *
     * @param deviceId business id of the device
     * @param status   status whose {@code name()} is stored
     */
    public void cacheDeviceStatus(String deviceId, DeviceStatus status) {
        String cacheKey = DEVICE_STATUS_PREFIX + deviceId;
        redisTemplate.opsForValue().set(cacheKey, status.name(), DEVICE_STATUS_TTL);
    }

    /**
     * Caches the status of many devices, each for 7 days.
     *
     * <p>Every entry is written with its value and expiry in one call, the same way as
     * {@link #cacheDeviceStatus}, so a key can never be left behind without a TTL if a
     * failure occurs part-way through the batch. A null or empty map is a no-op; entries
     * with a null status are skipped.
     *
     * @param deviceStatusMap status per device id; may be null
     */
    public void batchCacheDeviceStatus(Map<String, DeviceStatus> deviceStatusMap) {
        if (deviceStatusMap == null || deviceStatusMap.isEmpty()) {
            return;
        }

        deviceStatusMap.forEach((deviceId, status) -> {
            if (status != null) {
                cacheDeviceStatus(deviceId, status);
            }
        });
    }
}

# 监控指标

# 设备管理监控

/**
 * Micrometer metrics for device management: a registration counter
 * ({@code iot.device.registration}), an online-device gauge ({@code iot.device.online})
 * and an operation timer ({@code iot.device.operation}).
 */
@Component
public class DeviceManagementMetrics {

    private final MeterRegistry meterRegistry;
    private final Counter deviceRegistrationCounter;
    private final Gauge onlineDeviceGauge;
    private final Timer deviceOperationTimer;

    /**
     * Registers all meters with the given registry.
     *
     * @param meterRegistry       registry the meters are registered with
     * @param deviceStatusService source of the online-device count read by the gauge
     */
    public DeviceManagementMetrics(MeterRegistry meterRegistry, DeviceStatusService deviceStatusService) {
        this.meterRegistry = meterRegistry;
        this.deviceRegistrationCounter = Counter.builder("iot.device.registration")
            .description("设备注册计数")
            .register(meterRegistry);

        // The gauge's state object and value function are given to the builder, and
        // register(...) takes only the registry. Observing the service (not `this`) also
        // avoids handing out a reference to this object before construction finishes.
        // Each read of the gauge calls getDeviceStatusStatistics(), i.e. one statistics
        // query per sample.
        this.onlineDeviceGauge = Gauge.builder(
                "iot.device.online",
                deviceStatusService,
                service -> service.getDeviceStatusStatistics().getOnlineCount())
            .description("在线设备数量")
            .register(meterRegistry);

        this.deviceOperationTimer = Timer.builder("iot.device.operation")
            .description("设备操作耗时")
            .register(meterRegistry);
    }

    /** Increments the device registration counter by one. */
    public void incrementDeviceRegistration() {
        deviceRegistrationCounter.increment();
    }

    /**
     * Starts timing a device operation.
     *
     * @return a running sample bound to this component's registry
     */
    public Timer.Sample startDeviceOperation() {
        return Timer.start(meterRegistry);
    }
}

# 总结

设备管理服务是物联网平台的基础服务，提供了完整的设备生命周期管理功能。通过合理的架构设计、缓存策略和监控机制，能够支撑大规模设备的管理需求，确保系统的高性能和高可用性。