物联网设备管理服务
# 物联网设备管理服务
# 概述
设备管理服务是物联网平台的核心组件,负责设备的全生命周期管理,包括设备注册、激活、状态监控、分组管理等功能。本文档详细介绍设备管理服务的设计与实现。
# 功能架构
设备管理服务
├── 设备注册管理
│ ├── 设备信息录入
│ ├── 设备密钥生成
│ └── 设备激活流程
├── 设备状态管理
│ ├── 在线状态监控
│ ├── 设备健康检查
│ └── 故障状态处理
├── 设备分组管理
│ ├── 分组创建与删除
│ ├── 设备分组分配
│ └── 批量操作管理
└── 设备生命周期
├── 设备启用/禁用
├── 设备注销
└── 设备迁移
# 核心实体设计
# 设备实体
@Entity
@Table(name = "iot_device")
public class IoTDevice {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "device_id", unique = true, nullable = false)
private String deviceId;
@Column(name = "device_name", nullable = false)
private String deviceName;
@Column(name = "device_type")
private String deviceType;
@Column(name = "product_key", nullable = false)
private String productKey;
@Column(name = "device_secret", nullable = false)
private String deviceSecret;
@Enumerated(EnumType.STRING)
@Column(name = "status")
private DeviceStatus status;
@Column(name = "protocol_type")
private String protocolType;
@Column(name = "ip_address")
private String ipAddress;
@Column(name = "mac_address")
private String macAddress;
@Column(name = "firmware_version")
private String firmwareVersion;
@Column(name = "hardware_version")
private String hardwareVersion;
@Column(name = "location")
private String location;
@Column(name = "description")
private String description;
@Column(name = "tags")
private String tags;
@Column(name = "group_id")
private Long groupId;
@Column(name = "last_online_time")
private LocalDateTime lastOnlineTime;
@Column(name = "last_offline_time")
private LocalDateTime lastOfflineTime;
@Column(name = "create_time")
private LocalDateTime createTime;
@Column(name = "update_time")
private LocalDateTime updateTime;
// getters and setters
}
# 设备分组实体
@Entity
@Table(name = "iot_device_group")
public class IoTDeviceGroup {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "group_name", nullable = false)
private String groupName;
@Column(name = "group_desc")
private String groupDesc;
@Column(name = "parent_id")
private Long parentId;
@Column(name = "group_path")
private String groupPath;
@Column(name = "device_count")
private Integer deviceCount;
@Column(name = "create_time")
private LocalDateTime createTime;
@Column(name = "update_time")
private LocalDateTime updateTime;
// getters and setters
}
# 核心服务实现
# 设备注册服务
@Service
@Transactional
public class DeviceRegistrationService {
@Autowired
private IoTDeviceRepository deviceRepository;
@Autowired
private IoTProductRepository productRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 批量设备注册
*/
public BatchRegistrationResult batchRegisterDevices(BatchRegistrationRequest request) {
List<DeviceRegistrationInfo> devices = request.getDevices();
List<DeviceRegistrationResult> results = new ArrayList<>();
List<String> failedDevices = new ArrayList<>();
for (DeviceRegistrationInfo deviceInfo : devices) {
try {
DeviceRegistrationRequest singleRequest = new DeviceRegistrationRequest();
BeanUtils.copyProperties(deviceInfo, singleRequest);
DeviceRegistrationResult result = registerDevice(singleRequest);
results.add(result);
} catch (Exception e) {
failedDevices.add(deviceInfo.getDeviceId() + ": " + e.getMessage());
log.error("批量注册设备失败, deviceId: {}", deviceInfo.getDeviceId(), e);
}
}
return new BatchRegistrationResult(
results.size(),
failedDevices.size(),
results,
failedDevices
);
}
/**
* 设备注册
*/
public DeviceRegistrationResult registerDevice(DeviceRegistrationRequest request) {
// 1. 验证产品信息
IoTProduct product = validateProduct(request.getProductKey());
// 2. 验证设备ID唯一性
validateDeviceIdUniqueness(request.getDeviceId());
// 3. 创建设备记录
IoTDevice device = createDevice(request, product);
// 4. 保存设备信息
IoTDevice savedDevice = deviceRepository.save(device);
// 5. 缓存设备信息
cacheDeviceInfo(savedDevice);
// 6. 发送设备注册事件
publishDeviceRegisteredEvent(savedDevice);
return new DeviceRegistrationResult(
savedDevice.getDeviceId(),
savedDevice.getDeviceSecret(),
"设备注册成功"
);
}
/**
* 验证产品信息
*/
private IoTProduct validateProduct(String productKey) {
IoTProduct product = productRepository.findByProductKey(productKey)
.orElseThrow(() -> new BusinessException("产品不存在"));
if (product.getStatus() != ProductStatus.PUBLISHED) {
throw new BusinessException("产品未发布,无法注册设备");
}
return product;
}
/**
* 验证设备ID唯一性
*/
private void validateDeviceIdUniqueness(String deviceId) {
if (deviceRepository.existsByDeviceId(deviceId)) {
throw new BusinessException("设备ID已存在: " + deviceId);
}
}
/**
* 创建设备对象
*/
private IoTDevice createDevice(DeviceRegistrationRequest request, IoTProduct product) {
IoTDevice device = new IoTDevice();
device.setDeviceId(request.getDeviceId());
device.setDeviceName(request.getDeviceName());
device.setDeviceType(request.getDeviceType());
device.setProductKey(request.getProductKey());
device.setDeviceSecret(generateDeviceSecret());
device.setStatus(DeviceStatus.INACTIVE);
device.setProtocolType(request.getProtocolType());
device.setLocation(request.getLocation());
device.setDescription(request.getDescription());
device.setTags(request.getTags());
device.setCreateTime(LocalDateTime.now());
device.setUpdateTime(LocalDateTime.now());
return device;
}
/**
* 生成设备密钥
*/
private String generateDeviceSecret() {
String randomStr = UUID.randomUUID().toString().replace("-", "");
String timestamp = String.valueOf(System.currentTimeMillis());
return DigestUtils.sha256Hex(randomStr + timestamp);
}
}
# 设备状态管理服务
@Service
public class DeviceStatusService {
@Autowired
private IoTDeviceRepository deviceRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 更新设备状态
*/
public void updateDeviceStatus(String deviceId, DeviceStatus status) {
IoTDevice device = deviceRepository.findByDeviceId(deviceId)
.orElseThrow(() -> new BusinessException("设备不存在"));
DeviceStatus oldStatus = device.getStatus();
device.setStatus(status);
device.setUpdateTime(LocalDateTime.now());
// 更新特定状态的时间戳
switch (status) {
case ONLINE:
device.setLastOnlineTime(LocalDateTime.now());
break;
case OFFLINE:
device.setLastOfflineTime(LocalDateTime.now());
break;
}
deviceRepository.save(device);
// 更新缓存
updateDeviceStatusCache(deviceId, status);
// 发送状态变更事件
publishDeviceStatusChangedEvent(deviceId, oldStatus, status);
log.info("设备状态更新, deviceId: {}, oldStatus: {}, newStatus: {}",
deviceId, oldStatus, status);
}
/**
* 批量更新设备状态
*/
@Async
public void batchUpdateDeviceStatus(List<String> deviceIds, DeviceStatus status) {
for (String deviceId : deviceIds) {
try {
updateDeviceStatus(deviceId, status);
Thread.sleep(50); // 避免过于频繁的数据库操作
} catch (Exception e) {
log.error("批量更新设备状态失败, deviceId: {}, status: {}", deviceId, status, e);
}
}
}
/**
* 获取设备在线状态统计
*/
public DeviceStatusStatistics getDeviceStatusStatistics() {
List<Object[]> results = deviceRepository.countDevicesByStatus();
DeviceStatusStatistics statistics = new DeviceStatusStatistics();
for (Object[] result : results) {
DeviceStatus status = (DeviceStatus) result[0];
Long count = (Long) result[1];
switch (status) {
case ONLINE:
statistics.setOnlineCount(count.intValue());
break;
case OFFLINE:
statistics.setOfflineCount(count.intValue());
break;
case INACTIVE:
statistics.setInactiveCount(count.intValue());
break;
case DISABLED:
statistics.setDisabledCount(count.intValue());
break;
case FAULT:
statistics.setFaultCount(count.intValue());
break;
}
}
statistics.setTotalCount(
statistics.getOnlineCount() + statistics.getOfflineCount() +
statistics.getInactiveCount() + statistics.getDisabledCount() +
statistics.getFaultCount()
);
return statistics;
}
/**
* 设备健康检查
*/
@Scheduled(fixedRate = 300000) // 每5分钟执行一次
public void deviceHealthCheck() {
List<IoTDevice> onlineDevices = deviceRepository.findByStatus(DeviceStatus.ONLINE);
for (IoTDevice device : onlineDevices) {
if (isDeviceTimeout(device)) {
updateDeviceStatus(device.getDeviceId(), DeviceStatus.OFFLINE);
// 发送设备离线告警
sendDeviceOfflineAlert(device);
}
}
}
/**
* 判断设备是否超时
*/
private boolean isDeviceTimeout(IoTDevice device) {
if (device.getLastOnlineTime() == null) {
return true;
}
LocalDateTime timeoutThreshold = LocalDateTime.now().minusMinutes(10);
return device.getLastOnlineTime().isBefore(timeoutThreshold);
}
/**
* 发送设备离线告警
*/
private void sendDeviceOfflineAlert(IoTDevice device) {
DeviceOfflineAlert alert = new DeviceOfflineAlert();
alert.setDeviceId(device.getDeviceId());
alert.setDeviceName(device.getDeviceName());
alert.setOfflineTime(LocalDateTime.now());
alert.setAlertLevel(AlertLevel.WARNING);
rabbitTemplate.convertAndSend("iot.alert.exchange", "device.offline", alert);
}
}
# 设备分组管理服务
@Service
@Transactional
public class DeviceGroupService {
@Autowired
private IoTDeviceGroupRepository groupRepository;
@Autowired
private IoTDeviceRepository deviceRepository;
/**
* 创建设备分组
*/
public IoTDeviceGroup createDeviceGroup(CreateGroupRequest request) {
// 验证分组名称唯一性
if (groupRepository.existsByGroupNameAndParentId(request.getGroupName(), request.getParentId())) {
throw new BusinessException("分组名称已存在");
}
IoTDeviceGroup group = new IoTDeviceGroup();
group.setGroupName(request.getGroupName());
group.setGroupDesc(request.getGroupDesc());
group.setParentId(request.getParentId());
group.setDeviceCount(0);
group.setCreateTime(LocalDateTime.now());
group.setUpdateTime(LocalDateTime.now());
// 构建分组路径
group.setGroupPath(buildGroupPath(request.getParentId(), request.getGroupName()));
return groupRepository.save(group);
}
/**
* 将设备添加到分组
*/
public void addDeviceToGroup(String deviceId, Long groupId) {
IoTDevice device = deviceRepository.findByDeviceId(deviceId)
.orElseThrow(() -> new BusinessException("设备不存在"));
IoTDeviceGroup group = groupRepository.findById(groupId)
.orElseThrow(() -> new BusinessException("分组不存在"));
// 如果设备已在其他分组,先从原分组移除
if (device.getGroupId() != null) {
removeDeviceFromGroup(deviceId);
}
// 添加到新分组
device.setGroupId(groupId);
device.setUpdateTime(LocalDateTime.now());
deviceRepository.save(device);
// 更新分组设备数量
updateGroupDeviceCount(groupId);
log.info("设备添加到分组成功, deviceId: {}, groupId: {}", deviceId, groupId);
}
/**
* 从分组中移除设备
*/
public void removeDeviceFromGroup(String deviceId) {
IoTDevice device = deviceRepository.findByDeviceId(deviceId)
.orElseThrow(() -> new BusinessException("设备不存在"));
Long oldGroupId = device.getGroupId();
if (oldGroupId != null) {
device.setGroupId(null);
device.setUpdateTime(LocalDateTime.now());
deviceRepository.save(device);
// 更新原分组设备数量
updateGroupDeviceCount(oldGroupId);
log.info("设备从分组移除成功, deviceId: {}, oldGroupId: {}", deviceId, oldGroupId);
}
}
/**
* 批量设备分组操作
*/
@Async
public void batchDeviceGroupOperation(List<String> deviceIds, Long groupId, GroupOperation operation) {
for (String deviceId : deviceIds) {
try {
switch (operation) {
case ADD:
addDeviceToGroup(deviceId, groupId);
break;
case REMOVE:
removeDeviceFromGroup(deviceId);
break;
}
Thread.sleep(50);
} catch (Exception e) {
log.error("批量分组操作失败, deviceId: {}, operation: {}", deviceId, operation, e);
}
}
}
/**
* 获取分组树结构
*/
public List<DeviceGroupTreeNode> getGroupTree() {
List<IoTDeviceGroup> allGroups = groupRepository.findAllByOrderByGroupPathAsc();
return buildGroupTree(allGroups, null);
}
/**
* 构建分组树
*/
private List<DeviceGroupTreeNode> buildGroupTree(List<IoTDeviceGroup> allGroups, Long parentId) {
return allGroups.stream()
.filter(group -> Objects.equals(group.getParentId(), parentId))
.map(group -> {
DeviceGroupTreeNode node = new DeviceGroupTreeNode();
node.setId(group.getId());
node.setGroupName(group.getGroupName());
node.setGroupDesc(group.getGroupDesc());
node.setDeviceCount(group.getDeviceCount());
node.setChildren(buildGroupTree(allGroups, group.getId()));
return node;
})
.collect(Collectors.toList());
}
/**
* 构建分组路径
*/
private String buildGroupPath(Long parentId, String groupName) {
if (parentId == null) {
return "/" + groupName;
}
IoTDeviceGroup parentGroup = groupRepository.findById(parentId)
.orElseThrow(() -> new BusinessException("父分组不存在"));
return parentGroup.getGroupPath() + "/" + groupName;
}
/**
* 更新分组设备数量
*/
private void updateGroupDeviceCount(Long groupId) {
int deviceCount = deviceRepository.countByGroupId(groupId);
groupRepository.updateDeviceCount(groupId, deviceCount);
}
}
# 数据库设计
# 设备表
CREATE TABLE `iot_device` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`device_id` varchar(100) NOT NULL COMMENT '设备ID',
`device_name` varchar(200) NOT NULL COMMENT '设备名称',
`device_type` varchar(50) NOT NULL COMMENT '设备类型',
`product_key` varchar(100) NOT NULL COMMENT '产品密钥',
`device_secret` varchar(100) NOT NULL COMMENT '设备密钥',
`status` varchar(20) NOT NULL DEFAULT 'INACTIVE' COMMENT '设备状态',
`protocol_type` varchar(20) NOT NULL COMMENT '协议类型',
`ip_address` varchar(50) DEFAULT NULL COMMENT 'IP地址',
`mac_address` varchar(50) DEFAULT NULL COMMENT 'MAC地址',
`firmware_version` varchar(50) DEFAULT NULL COMMENT '固件版本',
`hardware_version` varchar(50) DEFAULT NULL COMMENT '硬件版本',
`location` varchar(500) DEFAULT NULL COMMENT '设备位置',
`description` varchar(1000) DEFAULT NULL COMMENT '设备描述',
`tags` varchar(500) DEFAULT NULL COMMENT '设备标签',
`group_id` bigint DEFAULT NULL COMMENT '分组ID',
`last_online_time` datetime DEFAULT NULL COMMENT '最后上线时间',
`last_offline_time` datetime DEFAULT NULL COMMENT '最后下线时间',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_device_id` (`device_id`),
KEY `idx_product_key` (`product_key`),
KEY `idx_status` (`status`),
KEY `idx_device_type` (`device_type`),
KEY `idx_group_id` (`group_id`),
KEY `idx_last_online_time` (`last_online_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='物联网设备表';
# 设备分组表
CREATE TABLE `iot_device_group` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`group_name` varchar(100) NOT NULL COMMENT '分组名称',
`group_desc` varchar(500) DEFAULT NULL COMMENT '分组描述',
`parent_id` bigint DEFAULT NULL COMMENT '父分组ID',
`group_path` varchar(1000) NOT NULL COMMENT '分组路径',
`device_count` int NOT NULL DEFAULT '0' COMMENT '设备数量',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_group_name_parent` (`group_name`, `parent_id`),
KEY `idx_parent_id` (`parent_id`),
KEY `idx_group_path` (`group_path`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备分组表';
# 性能优化
# 缓存策略
@Component
public class DeviceCacheManager {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String DEVICE_CACHE_PREFIX = "iot:device:";
private static final String DEVICE_STATUS_PREFIX = "iot:device:status:";
private static final String GROUP_CACHE_PREFIX = "iot:group:";
/**
* 缓存设备信息
*/
public void cacheDeviceInfo(IoTDevice device) {
String cacheKey = DEVICE_CACHE_PREFIX + device.getDeviceId();
redisTemplate.opsForValue().set(cacheKey, device, Duration.ofHours(24));
}
/**
* 缓存设备状态
*/
public void cacheDeviceStatus(String deviceId, DeviceStatus status) {
String cacheKey = DEVICE_STATUS_PREFIX + deviceId;
redisTemplate.opsForValue().set(cacheKey, status.name(), Duration.ofDays(7));
}
/**
* 批量缓存设备状态
*/
public void batchCacheDeviceStatus(Map<String, DeviceStatus> deviceStatusMap) {
Map<String, String> cacheMap = deviceStatusMap.entrySet().stream()
.collect(Collectors.toMap(
entry -> DEVICE_STATUS_PREFIX + entry.getKey(),
entry -> entry.getValue().name()
));
redisTemplate.opsForValue().multiSet(cacheMap);
// 设置过期时间
cacheMap.keySet().forEach(key ->
redisTemplate.expire(key, Duration.ofDays(7))
);
}
}
# 监控指标
# 设备管理监控
@Component
public class DeviceManagementMetrics {
private final MeterRegistry meterRegistry;
private final Counter deviceRegistrationCounter;
private final Gauge onlineDeviceGauge;
private final Timer deviceOperationTimer;
public DeviceManagementMetrics(MeterRegistry meterRegistry, DeviceStatusService deviceStatusService) {
this.meterRegistry = meterRegistry;
this.deviceRegistrationCounter = Counter.builder("iot.device.registration")
.description("设备注册计数")
.register(meterRegistry);
this.onlineDeviceGauge = Gauge.builder("iot.device.online")
.description("在线设备数量")
.register(meterRegistry, this, metrics -> {
DeviceStatusStatistics stats = deviceStatusService.getDeviceStatusStatistics();
return stats.getOnlineCount();
});
this.deviceOperationTimer = Timer.builder("iot.device.operation")
.description("设备操作耗时")
.register(meterRegistry);
}
public void incrementDeviceRegistration() {
deviceRegistrationCounter.increment();
}
public Timer.Sample startDeviceOperation() {
return Timer.start(meterRegistry);
}
}
# 总结
设备管理服务是物联网平台的基础服务,提供了完整的设备生命周期管理功能。通过合理的架构设计、缓存策略和监控机制,能够支撑大规模设备的管理需求,确保系统的高性能和高可用性。