物联网设备控制服务

# 物联网设备控制服务

## 概述

设备控制服务是物联网平台的核心功能模块,负责向设备发送控制指令、管理设备状态、处理指令响应。本文档详细介绍设备控制服务的架构设计、实现方案和优化策略。

## 服务架构

```text
设备控制服务
├── 指令管理层
│   ├── 指令生成
│   ├── 指令验证
│   ├── 指令队列
│   └── 指令调度
├── 协议适配层
│   ├── MQTT控制
│   ├── HTTP控制
│   ├── CoAP控制
│   └── TCP/UDP控制
├── 状态管理层
│   ├── 设备状态跟踪
│   ├── 指令状态管理
│   ├── 响应处理
│   └── 超时处理
└── 安全控制层
    ├── 权限验证
    ├── 指令加密
    ├── 防重放攻击
    └── 审计日志
```

## 核心实体设计

### 设备控制指令实体

@Entity
@Table(name = "iot_device_command")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceCommand {
    
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    @Column(name = "command_id", nullable = false, unique = true, length = 100)
    private String commandId;
    
    @Column(name = "device_id", nullable = false, length = 100)
    private String deviceId;
    
    @Column(name = "product_id", nullable = false, length = 100)
    private String productId;
    
    @Column(name = "command_type", nullable = false, length = 50)
    @Enumerated(EnumType.STRING)
    private CommandType commandType;
    
    @Column(name = "command_name", nullable = false, length = 100)
    private String commandName;
    
    @Column(name = "command_params", columnDefinition = "JSON")
    private String commandParams;
    
    @Column(name = "command_content", columnDefinition = "TEXT")
    private String commandContent;
    
    @Column(name = "priority", nullable = false)
    private Integer priority = 0; // 优先级:0-低,1-中,2-高,3-紧急
    
    @Column(name = "timeout_seconds", nullable = false)
    private Integer timeoutSeconds = 30;
    
    @Column(name = "retry_count", nullable = false)
    private Integer retryCount = 0;
    
    @Column(name = "max_retry_count", nullable = false)
    private Integer maxRetryCount = 3;
    
    @Column(name = "status", nullable = false, length = 20)
    @Enumerated(EnumType.STRING)
    private CommandStatus status = CommandStatus.PENDING;
    
    @Column(name = "response_content", columnDefinition = "TEXT")
    private String responseContent;
    
    @Column(name = "error_message", length = 500)
    private String errorMessage;
    
    @Column(name = "sender_id", length = 100)
    private String senderId; // 发送者ID(用户或系统)
    
    @Column(name = "sender_type", length = 20)
    @Enumerated(EnumType.STRING)
    private SenderType senderType;
    
    @Column(name = "protocol", length = 20)
    private String protocol;
    
    @Column(name = "create_time", nullable = false)
    private LocalDateTime createTime;
    
    @Column(name = "send_time")
    private LocalDateTime sendTime;
    
    @Column(name = "response_time")
    private LocalDateTime responseTime;
    
    @Column(name = "complete_time")
    private LocalDateTime completeTime;
    
    @PrePersist
    protected void onCreate() {
        createTime = LocalDateTime.now();
        if (commandId == null) {
            commandId = generateCommandId();
        }
    }
    
    private String generateCommandId() {
        return "CMD_" + System.currentTimeMillis() + "_" + 
               ThreadLocalRandom.current().nextInt(1000, 9999);
    }
}

/**
 * Kinds of command that can be sent to a device.
 *
 * <p>Each constant carries a human-readable label, exposed through
 * {@link #getDescription()}.
 */
public enum CommandType {
    CONTROL("控制指令"),
    CONFIG("配置指令"),
    QUERY("查询指令"),
    UPGRADE("升级指令"),
    REBOOT("重启指令"),
    RESET("重置指令");
    
    /** Human-readable label of this command type. */
    private final String description;
    
    CommandType(String description) {
        this.description = description;
    }
    
    /**
     * Returns the human-readable label of this command type.
     *
     * @return the label passed to the constant's constructor
     */
    public String getDescription() {
        return description;
    }
}

/**
 * Lifecycle states of a device command, from creation to a terminal outcome.
 *
 * <p>Each constant carries a human-readable label, exposed through
 * {@link #getDescription()}.
 */
public enum CommandStatus {
    PENDING("待发送"),
    SENDING("发送中"),
    SENT("已发送"),
    RECEIVED("设备已接收"),
    EXECUTING("执行中"),
    SUCCESS("执行成功"),
    FAILED("执行失败"),
    TIMEOUT("执行超时"),
    CANCELLED("已取消");
    
    /** Human-readable label of this status. */
    private final String description;
    
    CommandStatus(String description) {
        this.description = description;
    }
    
    /**
     * Returns the human-readable label of this status.
     *
     * @return the label passed to the constant's constructor
     */
    public String getDescription() {
        return description;
    }
}

/**
 * Origin of a device command: who or what asked for it to be sent.
 *
 * <p>Each constant carries a human-readable label, exposed through
 * {@link #getDescription()}.
 */
public enum SenderType {
    USER("用户"),
    SYSTEM("系统"),
    RULE_ENGINE("规则引擎"),
    SCHEDULER("定时任务");
    
    /** Human-readable label of this sender type. */
    private final String description;
    
    SenderType(String description) {
        this.description = description;
    }
    
    /**
     * Returns the human-readable label of this sender type.
     *
     * @return the label passed to the constant's constructor
     */
    public String getDescription() {
        return description;
    }
}

### 设备控制模板实体

@Entity
@Table(name = "iot_device_command_template")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceCommandTemplate {
    
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    @Column(name = "template_id", nullable = false, unique = true, length = 100)
    private String templateId;
    
    @Column(name = "template_name", nullable = false, length = 100)
    private String templateName;
    
    @Column(name = "product_id", nullable = false, length = 100)
    private String productId;
    
    @Column(name = "command_type", nullable = false, length = 50)
    @Enumerated(EnumType.STRING)
    private CommandType commandType;
    
    @Column(name = "command_template", columnDefinition = "TEXT")
    private String commandTemplate;
    
    @Column(name = "param_schema", columnDefinition = "JSON")
    private String paramSchema; // 参数JSON Schema
    
    @Column(name = "default_timeout", nullable = false)
    private Integer defaultTimeout = 30;
    
    @Column(name = "default_priority", nullable = false)
    private Integer defaultPriority = 0;
    
    @Column(name = "enabled", nullable = false)
    private Boolean enabled = true;
    
    @Column(name = "description", length = 500)
    private String description;
    
    @Column(name = "create_time", nullable = false)
    private LocalDateTime createTime;
    
    @Column(name = "update_time", nullable = false)
    private LocalDateTime updateTime;
    
    @PrePersist
    protected void onCreate() {
        createTime = LocalDateTime.now();
        updateTime = LocalDateTime.now();
    }
    
    @PreUpdate
    protected void onUpdate() {
        updateTime = LocalDateTime.now();
    }
}

## 设备控制服务实现

### 核心控制服务

@Service
@Slf4j
public class DeviceControlService {
    
    @Autowired
    private DeviceCommandRepository commandRepository;
    
    @Autowired
    private DeviceCommandTemplateRepository templateRepository;
    
    @Autowired
    private DeviceManagementService deviceManagementService;
    
    @Autowired
    private DeviceAuthService deviceAuthService;
    
    @Autowired
    private CommandDispatchService commandDispatchService;
    
    @Autowired
    private CommandValidationService commandValidationService;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    private final Counter commandSentCounter;
    private final Counter commandSuccessCounter;
    private final Counter commandFailedCounter;
    private final Timer commandExecutionTimer;
    
    public DeviceControlService(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.commandSentCounter = Counter.builder("iot.command.sent.total")
            .description("发送的指令总数")
            .register(meterRegistry);
        this.commandSuccessCounter = Counter.builder("iot.command.success.total")
            .description("执行成功的指令总数")
            .register(meterRegistry);
        this.commandFailedCounter = Counter.builder("iot.command.failed.total")
            .description("执行失败的指令总数")
            .register(meterRegistry);
        this.commandExecutionTimer = Timer.builder("iot.command.execution.duration")
            .description("指令执行耗时")
            .register(meterRegistry);
    }
    
    /**
     * 发送控制指令
     */
    public CommandResult sendCommand(SendCommandRequest request) {
        Timer.Sample sample = Timer.start(meterRegistry);
        
        try {
            // 1. 验证请求参数
            validateSendCommandRequest(request);
            
            // 2. 验证设备状态
            IoTDevice device = validateDeviceForCommand(request.getDeviceId());
            
            // 3. 验证权限
            validateCommandPermission(request.getSenderId(), request.getDeviceId(), request.getCommandType());
            
            // 4. 构建指令对象
            DeviceCommand command = buildDeviceCommand(request, device.getProductId());
            
            // 5. 验证指令内容
            ValidationResult validationResult = commandValidationService.validate(command);
            if (!validationResult.isValid()) {
                return CommandResult.failure("指令验证失败: " + String.join(",", validationResult.getErrors()));
            }
            
            // 6. 保存指令
            command = commandRepository.save(command);
            
            // 7. 异步发送指令
            commandDispatchService.dispatchCommandAsync(command);
            
            // 8. 更新指标
            commandSentCounter.increment(Tags.of(
                "device_id", request.getDeviceId(),
                "command_type", request.getCommandType().name()
            ));
            
            return CommandResult.success(command.getCommandId(), "指令发送成功");
            
        } catch (Exception e) {
            log.error("发送控制指令失败, deviceId: {}", request.getDeviceId(), e);
            return CommandResult.failure("发送指令失败: " + e.getMessage());
        } finally {
            sample.stop(commandExecutionTimer);
        }
    }
    
    /**
     * 批量发送控制指令
     */
    public BatchCommandResult batchSendCommand(BatchSendCommandRequest request) {
        List<String> successCommandIds = new ArrayList<>();
        List<String> failedDeviceIds = new ArrayList<>();
        List<String> errors = new ArrayList<>();
        
        for (String deviceId : request.getDeviceIds()) {
            try {
                SendCommandRequest singleRequest = SendCommandRequest.builder()
                    .deviceId(deviceId)
                    .commandType(request.getCommandType())
                    .commandName(request.getCommandName())
                    .commandParams(request.getCommandParams())
                    .senderId(request.getSenderId())
                    .senderType(request.getSenderType())
                    .priority(request.getPriority())
                    .timeoutSeconds(request.getTimeoutSeconds())
                    .build();
                
                CommandResult result = sendCommand(singleRequest);
                if (result.isSuccess()) {
                    successCommandIds.add(result.getCommandId());
                } else {
                    failedDeviceIds.add(deviceId);
                    errors.add("设备 " + deviceId + ": " + result.getMessage());
                }
                
            } catch (Exception e) {
                failedDeviceIds.add(deviceId);
                errors.add("设备 " + deviceId + ": " + e.getMessage());
            }
        }
        
        return BatchCommandResult.builder()
            .totalCount(request.getDeviceIds().size())
            .successCount(successCommandIds.size())
            .failureCount(failedDeviceIds.size())
            .successCommandIds(successCommandIds)
            .failedDeviceIds(failedDeviceIds)
            .errors(errors)
            .build();
    }
    
    /**
     * 使用模板发送指令
     */
    public CommandResult sendCommandByTemplate(SendCommandByTemplateRequest request) {
        try {
            // 1. 获取指令模板
            DeviceCommandTemplate template = templateRepository.findByTemplateId(request.getTemplateId())
                .orElseThrow(() -> new IllegalArgumentException("指令模板不存在: " + request.getTemplateId()));
            
            if (!template.getEnabled()) {
                throw new IllegalArgumentException("指令模板已禁用: " + request.getTemplateId());
            }
            
            // 2. 验证参数
            validateTemplateParams(template, request.getParams());
            
            // 3. 渲染指令内容
            String commandContent = renderCommandTemplate(template.getCommandTemplate(), request.getParams());
            
            // 4. 构建发送请求
            SendCommandRequest sendRequest = SendCommandRequest.builder()
                .deviceId(request.getDeviceId())
                .commandType(template.getCommandType())
                .commandName(template.getTemplateName())
                .commandParams(request.getParams())
                .commandContent(commandContent)
                .senderId(request.getSenderId())
                .senderType(request.getSenderType())
                .priority(template.getDefaultPriority())
                .timeoutSeconds(template.getDefaultTimeout())
                .build();
            
            return sendCommand(sendRequest);
            
        } catch (Exception e) {
            log.error("使用模板发送指令失败, templateId: {}, deviceId: {}", 
                request.getTemplateId(), request.getDeviceId(), e);
            return CommandResult.failure("模板指令发送失败: " + e.getMessage());
        }
    }
    
    /**
     * 查询指令状态
     */
    public CommandStatusResult getCommandStatus(String commandId) {
        try {
            DeviceCommand command = commandRepository.findByCommandId(commandId)
                .orElseThrow(() -> new IllegalArgumentException("指令不存在: " + commandId));
            
            return CommandStatusResult.builder()
                .commandId(commandId)
                .deviceId(command.getDeviceId())
                .status(command.getStatus())
                .responseContent(command.getResponseContent())
                .errorMessage(command.getErrorMessage())
                .createTime(command.getCreateTime())
                .sendTime(command.getSendTime())
                .responseTime(command.getResponseTime())
                .completeTime(command.getCompleteTime())
                .build();
            
        } catch (Exception e) {
            log.error("查询指令状态失败, commandId: {}", commandId, e);
            throw new RuntimeException("查询指令状态失败", e);
        }
    }
    
    /**
     * 取消指令
     */
    public CommandResult cancelCommand(String commandId, String operatorId) {
        try {
            DeviceCommand command = commandRepository.findByCommandId(commandId)
                .orElseThrow(() -> new IllegalArgumentException("指令不存在: " + commandId));
            
            // 只有待发送和发送中的指令可以取消
            if (command.getStatus() != CommandStatus.PENDING && command.getStatus() != CommandStatus.SENDING) {
                return CommandResult.failure("指令状态不允许取消: " + command.getStatus());
            }
            
            // 更新指令状态
            command.setStatus(CommandStatus.CANCELLED);
            command.setCompleteTime(LocalDateTime.now());
            command.setErrorMessage("指令被用户取消, 操作者: " + operatorId);
            commandRepository.save(command);
            
            // 从调度队列中移除
            commandDispatchService.removeFromQueue(commandId);
            
            return CommandResult.success(commandId, "指令取消成功");
            
        } catch (Exception e) {
            log.error("取消指令失败, commandId: {}", commandId, e);
            return CommandResult.failure("取消指令失败: " + e.getMessage());
        }
    }
    
    /**
     * 处理指令响应
     */
    public void handleCommandResponse(CommandResponseEvent event) {
        try {
            DeviceCommand command = commandRepository.findByCommandId(event.getCommandId())
                .orElse(null);
            
            if (command == null) {
                log.warn("收到未知指令的响应, commandId: {}", event.getCommandId());
                return;
            }
            
            // 更新指令状态
            command.setResponseTime(LocalDateTime.now());
            command.setResponseContent(event.getResponseContent());
            
            if (event.isSuccess()) {
                command.setStatus(CommandStatus.SUCCESS);
                command.setCompleteTime(LocalDateTime.now());
                
                commandSuccessCounter.increment(Tags.of(
                    "device_id", command.getDeviceId(),
                    "command_type", command.getCommandType().name()
                ));
            } else {
                command.setStatus(CommandStatus.FAILED);
                command.setCompleteTime(LocalDateTime.now());
                command.setErrorMessage(event.getErrorMessage());
                
                commandFailedCounter.increment(Tags.of(
                    "device_id", command.getDeviceId(),
                    "command_type", command.getCommandType().name(),
                    "failure_reason", event.getErrorMessage()
                ));
            }
            
            commandRepository.save(command);
            
            // 发送指令完成事件
            publishCommandCompletedEvent(command);
            
        } catch (Exception e) {
            log.error("处理指令响应失败, commandId: {}", event.getCommandId(), e);
        }
    }
    
    /**
     * 验证发送指令请求
     */
    private void validateSendCommandRequest(SendCommandRequest request) {
        if (StringUtils.isBlank(request.getDeviceId())) {
            throw new IllegalArgumentException("设备ID不能为空");
        }
        
        if (request.getCommandType() == null) {
            throw new IllegalArgumentException("指令类型不能为空");
        }
        
        if (StringUtils.isBlank(request.getCommandName())) {
            throw new IllegalArgumentException("指令名称不能为空");
        }
        
        if (StringUtils.isBlank(request.getSenderId())) {
            throw new IllegalArgumentException("发送者ID不能为空");
        }
        
        if (request.getSenderType() == null) {
            throw new IllegalArgumentException("发送者类型不能为空");
        }
    }
    
    /**
     * 验证设备状态
     */
    private IoTDevice validateDeviceForCommand(String deviceId) {
        IoTDevice device = deviceManagementService.getDeviceById(deviceId);
        if (device == null) {
            throw new IllegalArgumentException("设备不存在: " + deviceId);
        }
        
        if (device.getStatus() == DeviceStatus.DISABLED) {
            throw new IllegalArgumentException("设备已被禁用: " + deviceId);
        }
        
        if (device.getStatus() == DeviceStatus.OFFLINE) {
            throw new IllegalArgumentException("设备离线,无法发送指令: " + deviceId);
        }
        
        return device;
    }
    
    /**
     * 验证指令权限
     */
    private void validateCommandPermission(String senderId, String deviceId, CommandType commandType) {
        String permission = "device:command:" + commandType.name().toLowerCase();
        
        if (!deviceAuthService.hasPermission(senderId, deviceId, permission)) {
            throw new SecurityException("无权限发送此类型指令: " + commandType);
        }
    }
    
    /**
     * 构建设备指令对象
     */
    private DeviceCommand buildDeviceCommand(SendCommandRequest request, String productId) {
        return DeviceCommand.builder()
            .deviceId(request.getDeviceId())
            .productId(productId)
            .commandType(request.getCommandType())
            .commandName(request.getCommandName())
            .commandParams(request.getCommandParams())
            .commandContent(request.getCommandContent())
            .priority(request.getPriority() != null ? request.getPriority() : 0)
            .timeoutSeconds(request.getTimeoutSeconds() != null ? request.getTimeoutSeconds() : 30)
            .senderId(request.getSenderId())
            .senderType(request.getSenderType())
            .status(CommandStatus.PENDING)
            .build();
    }
}

### 指令调度服务

@Service
@Slf4j
public class CommandDispatchService {
    
    @Autowired
    private MqttCommandSender mqttCommandSender;
    
    @Autowired
    private HttpCommandSender httpCommandSender;
    
    @Autowired
    private DeviceManagementService deviceManagementService;
    
    @Autowired
    private DeviceCommandRepository commandRepository;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private AsyncTaskExecutor commandExecutor;
    
    private final PriorityBlockingQueue<DeviceCommand> commandQueue;
    private final ScheduledExecutorService scheduledExecutor;
    
    public CommandDispatchService() {
        // 使用优先级队列,优先级高的指令先执行
        this.commandQueue = new PriorityBlockingQueue<>(1000, 
            Comparator.comparing(DeviceCommand::getPriority).reversed()
                .thenComparing(DeviceCommand::getCreateTime));
        
        this.scheduledExecutor = Executors.newScheduledThreadPool(5);
        
        // 启动指令调度线程
        startCommandDispatchThread();
        
        // 启动超时检查线程
        startTimeoutCheckThread();
    }
    
    /**
     * 异步调度指令
     */
    @Async("commandExecutor")
    public CompletableFuture<Void> dispatchCommandAsync(DeviceCommand command) {
        return CompletableFuture.runAsync(() -> {
            try {
                // 添加到调度队列
                commandQueue.offer(command);
                
                log.debug("指令已添加到调度队列, commandId: {}, priority: {}", 
                    command.getCommandId(), command.getPriority());
                
            } catch (Exception e) {
                log.error("指令调度失败, commandId: {}", command.getCommandId(), e);
                
                // 更新指令状态为失败
                command.setStatus(CommandStatus.FAILED);
                command.setErrorMessage("调度失败: " + e.getMessage());
                command.setCompleteTime(LocalDateTime.now());
                commandRepository.save(command);
            }
        }, commandExecutor);
    }
    
    /**
     * 启动指令调度线程
     */
    private void startCommandDispatchThread() {
        Thread dispatchThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    DeviceCommand command = commandQueue.take(); // 阻塞等待
                    processCommand(command);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    log.error("指令调度线程异常", e);
                }
            }
        });
        
        dispatchThread.setName("command-dispatch-thread");
        dispatchThread.setDaemon(true);
        dispatchThread.start();
    }
    
    /**
     * 处理单个指令
     */
    private void processCommand(DeviceCommand command) {
        try {
            // 1. 检查指令是否已被取消
            if (command.getStatus() == CommandStatus.CANCELLED) {
                log.debug("指令已被取消, commandId: {}", command.getCommandId());
                return;
            }
            
            // 2. 更新指令状态为发送中
            command.setStatus(CommandStatus.SENDING);
            command.setSendTime(LocalDateTime.now());
            commandRepository.save(command);
            
            // 3. 获取设备信息
            IoTDevice device = deviceManagementService.getDeviceById(command.getDeviceId());
            if (device == null) {
                throw new IllegalStateException("设备不存在: " + command.getDeviceId());
            }
            
            // 4. 根据设备协议发送指令
            boolean sent = sendCommandByProtocol(command, device);
            
            if (sent) {
                // 5. 更新指令状态为已发送
                command.setStatus(CommandStatus.SENT);
                commandRepository.save(command);
                
                // 6. 设置超时检查
                scheduleTimeoutCheck(command);
                
                log.debug("指令发送成功, commandId: {}, deviceId: {}", 
                    command.getCommandId(), command.getDeviceId());
            } else {
                throw new RuntimeException("指令发送失败");
            }
            
        } catch (Exception e) {
            log.error("处理指令失败, commandId: {}", command.getCommandId(), e);
            
            // 更新指令状态为失败
            command.setStatus(CommandStatus.FAILED);
            command.setErrorMessage(e.getMessage());
            command.setCompleteTime(LocalDateTime.now());
            commandRepository.save(command);
            
            // 检查是否需要重试
            checkAndRetryCommand(command);
        }
    }
    
    /**
     * 根据协议发送指令
     */
    private boolean sendCommandByProtocol(DeviceCommand command, IoTDevice device) {
        String protocol = device.getProtocol();
        
        switch (protocol.toUpperCase()) {
            case "MQTT":
                return mqttCommandSender.sendCommand(command, device);
            case "HTTP":
                return httpCommandSender.sendCommand(command, device);
            case "COAP":
                // TODO: 实现CoAP指令发送
                throw new UnsupportedOperationException("CoAP协议暂未支持");
            default:
                throw new IllegalArgumentException("不支持的协议: " + protocol);
        }
    }
    
    /**
     * 设置超时检查
     */
    private void scheduleTimeoutCheck(DeviceCommand command) {
        scheduledExecutor.schedule(() -> {
            try {
                // 重新查询指令状态
                DeviceCommand currentCommand = commandRepository.findByCommandId(command.getCommandId())
                    .orElse(null);
                
                if (currentCommand != null && 
                    currentCommand.getStatus() != CommandStatus.SUCCESS && 
                    currentCommand.getStatus() != CommandStatus.FAILED && 
                    currentCommand.getStatus() != CommandStatus.CANCELLED) {
                    
                    // 指令超时
                    currentCommand.setStatus(CommandStatus.TIMEOUT);
                    currentCommand.setErrorMessage("指令执行超时");
                    currentCommand.setCompleteTime(LocalDateTime.now());
                    commandRepository.save(currentCommand);
                    
                    log.warn("指令执行超时, commandId: {}, deviceId: {}", 
                        currentCommand.getCommandId(), currentCommand.getDeviceId());
                    
                    // 检查是否需要重试
                    checkAndRetryCommand(currentCommand);
                }
                
            } catch (Exception e) {
                log.error("超时检查异常, commandId: {}", command.getCommandId(), e);
            }
        }, command.getTimeoutSeconds(), TimeUnit.SECONDS);
    }
    
    /**
     * 检查并重试指令
     */
    private void checkAndRetryCommand(DeviceCommand command) {
        if (command.getRetryCount() < command.getMaxRetryCount()) {
            // 增加重试次数
            command.setRetryCount(command.getRetryCount() + 1);
            command.setStatus(CommandStatus.PENDING);
            command.setErrorMessage(null);
            commandRepository.save(command);
            
            // 延迟重试(指数退避)
            long delaySeconds = (long) Math.pow(2, command.getRetryCount()) * 5; // 5s, 10s, 20s
            
            scheduledExecutor.schedule(() -> {
                commandQueue.offer(command);
                log.info("指令重试, commandId: {}, retryCount: {}", 
                    command.getCommandId(), command.getRetryCount());
            }, delaySeconds, TimeUnit.SECONDS);
        }
    }
    
    /**
     * 从队列中移除指令
     */
    public void removeFromQueue(String commandId) {
        commandQueue.removeIf(cmd -> cmd.getCommandId().equals(commandId));
    }
    
    /**
     * 启动超时检查线程
     */
    private void startTimeoutCheckThread() {
        scheduledExecutor.scheduleWithFixedDelay(() -> {
            try {
                // 查询超时的指令
                LocalDateTime timeoutThreshold = LocalDateTime.now().minusMinutes(10);
                List<DeviceCommand> timeoutCommands = commandRepository.findTimeoutCommands(
                    Arrays.asList(CommandStatus.SENT, CommandStatus.RECEIVED, CommandStatus.EXECUTING),
                    timeoutThreshold
                );
                
                for (DeviceCommand command : timeoutCommands) {
                    command.setStatus(CommandStatus.TIMEOUT);
                    command.setErrorMessage("指令执行超时(系统检查)");
                    command.setCompleteTime(LocalDateTime.now());
                    commandRepository.save(command);
                    
                    log.warn("系统检查发现超时指令, commandId: {}, deviceId: {}", 
                        command.getCommandId(), command.getDeviceId());
                }
                
            } catch (Exception e) {
                log.error("超时检查线程异常", e);
            }
        }, 1, 1, TimeUnit.MINUTES);
    }
}

### MQTT指令发送器

/**
 * Publishes device commands over MQTT.
 */
@Component
@Slf4j
public class MqttCommandSender {
    
    @Autowired
    private MqttTemplate mqttTemplate;
    
    @Autowired
    private DeviceAuthService deviceAuthService;
    
    /**
     * Publishes a command to the device's command topic.
     *
     * @param command the command to send
     * @param device  the target device
     * @return {@code true} if the message was handed to the MQTT template
     *         without an exception; {@code false} otherwise (the exception is
     *         logged, not rethrown)
     */
    public boolean sendCommand(DeviceCommand command, IoTDevice device) {
        try {
            // 1. Build the MQTT topic
            String topic = buildCommandTopic(device.getDeviceId(), command.getCommandType());
            
            // 2. Build the command message
            MqttCommandMessage message = buildCommandMessage(command);
            
            // 3. Publish
            mqttTemplate.convertAndSend(topic, message);
            
            log.debug("MQTT指令发送成功, commandId: {}, topic: {}", 
                command.getCommandId(), topic);
            
            return true;
            
        } catch (Exception e) {
            log.error("MQTT指令发送失败, commandId: {}", command.getCommandId(), e);
            return false;
        }
    }
    
    /**
     * Builds the topic {@code /device/{deviceId}/command/{commandType}}, with
     * the command type in lower case.
     *
     * <p>The lower-casing uses {@code Locale.ROOT} so the topic does not depend
     * on the JVM default locale; under a Turkish locale, for example,
     * {@code "CONFIG"} would otherwise become a string with a dotless i and the
     * device would never see the message on the topic it subscribed to.
     */
    private String buildCommandTopic(String deviceId, CommandType commandType) {
        String typeSegment = commandType.name().toLowerCase(java.util.Locale.ROOT);
        return String.format("/device/%s/command/%s", deviceId, typeSegment);
    }
    
    /**
     * Copies the command's id, name, type, parameters, content and timeout into
     * an MQTT message stamped with the current time in epoch milliseconds.
     */
    private MqttCommandMessage buildCommandMessage(DeviceCommand command) {
        return MqttCommandMessage.builder()
            .commandId(command.getCommandId())
            .commandName(command.getCommandName())
            .commandType(command.getCommandType())
            .commandParams(command.getCommandParams())
            .commandContent(command.getCommandContent())
            .timestamp(System.currentTimeMillis())
            .timeout(command.getTimeoutSeconds())
            .build();
    }
}

### HTTP指令发送器

@Component
@Slf4j
public class HttpCommandSender {
    
    @Autowired
    private RestTemplate restTemplate;
    
    @Autowired
    private DeviceAuthService deviceAuthService;
    
    /**
     * 发送HTTP指令
     */
    public boolean sendCommand(DeviceCommand command, IoTDevice device) {
        try {
            // 1. 构建请求URL
            String url = buildCommandUrl(device);
            
            // 2. 构建请求头
            HttpHeaders headers = buildRequestHeaders(device);
            
            // 3. 构建请求体
            HttpCommandRequest requestBody = buildCommandRequest(command);
            
            // 4. 发送HTTP请求
            HttpEntity<HttpCommandRequest> entity = new HttpEntity<>(requestBody, headers);
            ResponseEntity<HttpCommandResponse> response = restTemplate.postForEntity(url, entity, HttpCommandResponse.class);
            
            if (response.getStatusCode().is2xxSuccessful()) {
                log.debug("HTTP指令发送成功, commandId: {}, url: {}", 
                    command.getCommandId(), url);
                return true;
            } else {
                log.warn("HTTP指令发送失败, commandId: {}, status: {}", 
                    command.getCommandId(), response.getStatusCode());
                return false;
            }
            
        } catch (Exception e) {
            log.error("HTTP指令发送失败, commandId: {}", command.getCommandId(), e);
            return false;
        }
    }
    
    /**
     * 构建指令URL
     */
    private String buildCommandUrl(IoTDevice device) {
        // URL格式: http://{deviceIp}:{port}/api/command
        return String.format("http://%s:%d/api/command", 
            device.getIpAddress(), device.getPort());
    }
    
    /**
     * 构建请求头
     */
    private HttpHeaders buildRequestHeaders(IoTDevice device) {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        headers.set("Device-Id", device.getDeviceId());
        
        // 添加认证头
        String authToken = deviceAuthService.generateDeviceToken(device.getDeviceId());
        headers.set("Authorization", "Bearer " + authToken);
        
        return headers;
    }
    
    /**
     * 构建指令请求
     */
    private HttpCommandRequest buildCommandRequest(DeviceCommand command) {
        return HttpCommandRequest.builder()
            .commandId(command.getCommandId())
            .commandName(command.getCommandName())
            .commandType(command.getCommandType())
            .commandParams(command.getCommandParams())
            .commandContent(command.getCommandContent())
            .timestamp(System.currentTimeMillis())
            .timeout(command.getTimeoutSeconds())
            .build();
    }
}

### 指令验证服务

@Service
@Slf4j
public class CommandValidationService {

    /** Shared JSON parser; reused instead of allocating a new mapper for every validation. */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    /** Substrings that cause a command to be rejected by the security check. */
    private static final List<String> DANGEROUS_KEYWORDS = Arrays.asList(
        "rm -rf", "format", "delete", "drop", "truncate", "shutdown", "reboot"
    );

    /** Maximum number of characters allowed in the command content. */
    private static final int MAX_CONTENT_LENGTH = 10240;

    /** Maximum combined number of characters of content plus params. */
    private static final int MAX_TOTAL_LENGTH = 50000;

    @Autowired
    private DeviceCommandTemplateRepository templateRepository;

    /**
     * Validates a device command and collects every problem found.
     *
     * <p>Runs, in order: basic field checks, content checks, parameter checks and the
     * security check. An unexpected exception in any step is logged and reported as an
     * additional error instead of being propagated.
     *
     * @param command the command to validate; {@code null} yields an invalid result
     * @return a result whose {@code valid} flag is true only when no errors were collected
     */
    public ValidationResult validate(DeviceCommand command) {
        List<String> errors = new ArrayList<>();

        // A null command previously caused an NPE inside the catch handler below.
        if (command == null) {
            errors.add("指令不能为空");
            return toResult(errors);
        }

        try {
            // 1. basic fields
            validateBasicFields(command, errors);

            // 2. command content
            validateCommandContent(command, errors);

            // 3. command parameters
            validateCommandParams(command, errors);

            // 4. security screening
            validateSecurity(command, errors);

        } catch (Exception e) {
            log.error("指令验证异常, commandId: {}", command.getCommandId(), e);
            errors.add("验证过程异常: " + e.getMessage());
        }

        return toResult(errors);
    }

    /**
     * Wraps the collected errors into a {@link ValidationResult}.
     */
    private ValidationResult toResult(List<String> errors) {
        return ValidationResult.builder()
            .valid(errors.isEmpty())
            .errors(errors)
            .build();
    }

    /**
     * Checks the mandatory scalar fields: device id, command type, command name,
     * priority (0-3) and timeout (1-3600 seconds).
     *
     * <p>Priority and timeout are boxed values and may be {@code null}; a missing value
     * is reported as out of range rather than triggering an unboxing NPE that would
     * abort the remaining validation steps.
     */
    private void validateBasicFields(DeviceCommand command, List<String> errors) {
        if (StringUtils.isBlank(command.getDeviceId())) {
            errors.add("设备ID不能为空");
        }

        if (command.getCommandType() == null) {
            errors.add("指令类型不能为空");
        }

        if (StringUtils.isBlank(command.getCommandName())) {
            errors.add("指令名称不能为空");
        }

        Integer priority = command.getPriority();
        if (priority == null || priority < 0 || priority > 3) {
            errors.add("指令优先级必须在0-3之间");
        }

        Integer timeoutSeconds = command.getTimeoutSeconds();
        if (timeoutSeconds == null || timeoutSeconds <= 0 || timeoutSeconds > 3600) {
            errors.add("超时时间必须在1-3600秒之间");
        }
    }

    /**
     * Checks the command content: it may be blank only when params are present, must not
     * exceed {@link #MAX_CONTENT_LENGTH} characters, and must parse as JSON when it looks
     * like a JSON object or array.
     */
    private void validateCommandContent(DeviceCommand command, List<String> errors) {
        String content = command.getCommandContent();

        if (StringUtils.isBlank(content)) {
            // Without content the command must at least carry parameters.
            if (StringUtils.isBlank(command.getCommandParams())) {
                errors.add("指令内容和参数不能同时为空");
            }
            return;
        }

        // The limit is applied to the character count of the content.
        if (content.length() > MAX_CONTENT_LENGTH) {
            errors.add("指令内容过长,不能超过10KB");
        }

        // Only content that starts like a JSON object/array is parsed as JSON.
        String trimmed = content.trim();
        if (trimmed.startsWith("{") || trimmed.startsWith("[")) {
            try {
                JSON_MAPPER.readTree(content);
            } catch (JsonProcessingException e) {
                errors.add("指令内容JSON格式无效: " + e.getMessage());
            }
        }
    }

    /**
     * Checks the command parameters: when present they must be valid JSON and satisfy the
     * rules of the command type.
     */
    private void validateCommandParams(DeviceCommand command, List<String> errors) {
        String params = command.getCommandParams();

        if (StringUtils.isBlank(params)) {
            return;
        }

        try {
            JsonNode paramsNode = JSON_MAPPER.readTree(params);

            // Type-specific rules are applied to the parsed tree.
            validateTypeSpecificParams(command.getCommandType(), paramsNode, errors);

        } catch (JsonProcessingException e) {
            errors.add("指令参数JSON格式无效: " + e.getMessage());
        }
    }

    /**
     * Dispatches to the parameter rules of the given command type.
     *
     * <p>A {@code null} type is skipped here: switching on it would throw, and the missing
     * type is already reported by the basic field check.
     */
    private void validateTypeSpecificParams(CommandType commandType, JsonNode params, List<String> errors) {
        if (commandType == null) {
            return;
        }

        switch (commandType) {
            case CONTROL:
                validateControlParams(params, errors);
                break;
            case CONFIG:
                validateConfigParams(params, errors);
                break;
            case UPGRADE:
                validateUpgradeParams(params, errors);
                break;
            default:
                // No type-specific rules for the remaining command types.
                break;
        }
    }

    /**
     * Control commands must carry an {@code action} field.
     */
    private void validateControlParams(JsonNode params, List<String> errors) {
        if (!params.has("action")) {
            errors.add("控制指令必须包含action字段");
        }
    }

    /**
     * Config commands must carry a {@code config} field.
     */
    private void validateConfigParams(JsonNode params, List<String> errors) {
        if (!params.has("config")) {
            errors.add("配置指令必须包含config字段");
        }
    }

    /**
     * Upgrade commands must carry both a {@code version} and a {@code downloadUrl} field.
     */
    private void validateUpgradeParams(JsonNode params, List<String> errors) {
        if (!params.has("version")) {
            errors.add("升级指令必须包含version字段");
        }

        if (!params.has("downloadUrl")) {
            errors.add("升级指令必须包含downloadUrl字段");
        }
    }

    /**
     * Screens content and params for dangerous keywords and an abnormal total length.
     *
     * <p>NOTE(review): matching is a plain case-insensitive substring search, so ordinary
     * words containing a keyword (for example "information" contains "format") are
     * rejected as well — confirm this is intended.
     */
    private void validateSecurity(DeviceCommand command, List<String> errors) {
        String content = command.getCommandContent();
        String params = command.getCommandParams();

        // Lower-case once, locale-independently, instead of once per keyword.
        String contentLower = content != null ? content.toLowerCase(java.util.Locale.ROOT) : null;
        String paramsLower = params != null ? params.toLowerCase(java.util.Locale.ROOT) : null;

        for (String keyword : DANGEROUS_KEYWORDS) {
            boolean inContent = contentLower != null && contentLower.contains(keyword);
            boolean inParams = paramsLower != null && paramsLower.contains(keyword);
            if (inContent || inParams) {
                errors.add("指令包含危险关键字: " + keyword);
            }
        }

        // Combined size guard across content and params.
        int totalLength = (content != null ? content.length() : 0)
            + (params != null ? params.length() : 0);
        if (totalLength > MAX_TOTAL_LENGTH) {
            errors.add("指令总长度过长,可能存在安全风险");
        }
    }
}

# 数据库设计

# 设备指令表

-- Device command table, range-partitioned by the day of `create_time`.
-- MySQL requires every PRIMARY KEY and UNIQUE KEY of a partitioned table to include all
-- columns used in the partitioning expression; otherwise CREATE TABLE fails with error 1503.
-- `create_time` is therefore part of both keys below.
-- NOTE(review): as a consequence the database no longer enforces global uniqueness of
-- `command_id` on its own; the command id generator must guarantee it.
CREATE TABLE `iot_device_command` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `command_id` varchar(100) NOT NULL COMMENT '指令ID',
  `device_id` varchar(100) NOT NULL COMMENT '设备ID',
  `product_id` varchar(100) NOT NULL COMMENT '产品ID',
  `command_type` varchar(50) NOT NULL COMMENT '指令类型',
  `command_name` varchar(100) NOT NULL COMMENT '指令名称',
  `command_params` json DEFAULT NULL COMMENT '指令参数',
  `command_content` text COMMENT '指令内容',
  `priority` int NOT NULL DEFAULT '0' COMMENT '优先级',
  `timeout_seconds` int NOT NULL DEFAULT '30' COMMENT '超时时间(秒)',
  `retry_count` int NOT NULL DEFAULT '0' COMMENT '重试次数',
  `max_retry_count` int NOT NULL DEFAULT '3' COMMENT '最大重试次数',
  `status` varchar(20) NOT NULL DEFAULT 'PENDING' COMMENT '状态',
  `response_content` text COMMENT '响应内容',
  `error_message` varchar(500) DEFAULT NULL COMMENT '错误信息',
  `sender_id` varchar(100) DEFAULT NULL COMMENT '发送者ID',
  `sender_type` varchar(20) DEFAULT NULL COMMENT '发送者类型',
  `protocol` varchar(20) DEFAULT NULL COMMENT '协议',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `send_time` datetime DEFAULT NULL COMMENT '发送时间',
  `response_time` datetime DEFAULT NULL COMMENT '响应时间',
  `complete_time` datetime DEFAULT NULL COMMENT '完成时间',
  -- `id` stays the leading column so AUTO_INCREMENT remains valid.
  PRIMARY KEY (`id`, `create_time`),
  UNIQUE KEY `uk_command_id` (`command_id`, `create_time`),
  KEY `idx_device_id_status` (`device_id`, `status`),
  KEY `idx_create_time` (`create_time`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备指令表'
PARTITION BY RANGE (TO_DAYS(create_time)) (
    -- One partition per month; rows newer than the last bound land in p_future.
    PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),
    PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),
    PARTITION p202403 VALUES LESS THAN (TO_DAYS('2024-04-01')),
    PARTITION p_future VALUES LESS THAN MAXVALUE
);

# 指令模板表

-- Command template table: one row per reusable command template of a product.
CREATE TABLE `iot_device_command_template` (
  -- identity
  `id`               BIGINT       NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `template_id`      VARCHAR(100) NOT NULL COMMENT '模板ID',
  `template_name`    VARCHAR(100) NOT NULL COMMENT '模板名称',
  `product_id`       VARCHAR(100) NOT NULL COMMENT '产品ID',
  -- template definition
  `command_type`     VARCHAR(50)  NOT NULL COMMENT '指令类型',
  `command_template` TEXT         NOT NULL COMMENT '指令模板',
  `param_schema`     JSON         DEFAULT NULL COMMENT '参数Schema',
  -- defaults for commands created from the template
  `default_timeout`  INT          NOT NULL DEFAULT '30' COMMENT '默认超时时间',
  `default_priority` INT          NOT NULL DEFAULT '0' COMMENT '默认优先级',
  `enabled`          TINYINT(1)   NOT NULL DEFAULT '1' COMMENT '是否启用',
  `description`      VARCHAR(500) DEFAULT NULL COMMENT '描述',
  -- bookkeeping timestamps maintained by the database
  `create_time`      DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time`      DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_template_id` (`template_id`),
  KEY `idx_product_id` (`product_id`),
  KEY `idx_command_type` (`command_type`),
  KEY `idx_enabled` (`enabled`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备指令模板表';

# 性能优化

# 指令队列优化

@Configuration
public class CommandQueueConfig {

    /**
     * Thread pool on which device commands are executed.
     *
     * <p>20 core threads, up to 50 threads, and a queue of 2000 tasks. When the pool
     * rejects a task it is run on the submitting thread ({@code CallerRunsPolicy}).
     *
     * @return the initialized executor, exposed as bean {@code commandExecutor}
     */
    @Bean("commandExecutor")
    public TaskExecutor commandExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // naming and overflow behaviour
        pool.setThreadNamePrefix("command-exec-");
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // sizing
        pool.setCorePoolSize(20);
        pool.setMaxPoolSize(50);
        pool.setQueueCapacity(2000);

        pool.initialize();
        return pool;
    }

    /**
     * Redis template with string keys and JSON-serialized values, for both plain
     * entries and hash entries.
     *
     * @param connectionFactory the Redis connection factory to bind the template to
     * @return the configured template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        final RedisTemplate<String, Object> redis = new RedisTemplate<>();
        redis.setConnectionFactory(connectionFactory);

        // keys (plain and hash) are stored as strings
        redis.setKeySerializer(new StringRedisSerializer());
        redis.setHashKeySerializer(new StringRedisSerializer());

        // values (plain and hash) are stored as JSON
        redis.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        redis.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());

        return redis;
    }
}

# 监控指标

# 指令执行监控

@Component
public class CommandMetrics {

    private static final String SENT_METRIC = "iot.command.sent.total";
    private static final String SUCCESS_METRIC = "iot.command.success.total";
    private static final String FAILED_METRIC = "iot.command.failed.total";
    private static final String EXECUTION_METRIC = "iot.command.execution.duration";
    private static final String PENDING_METRIC = "iot.command.pending.count";

    /** Tag value used when a caller passes {@code null}; meter tag values must not be null. */
    private static final String UNKNOWN_TAG_VALUE = "unknown";

    /** Registry against which the tagged counters and timers are resolved on each call. */
    private final MeterRegistry meterRegistry;

    /**
     * Registers the pending-command gauge and keeps the registry for the tagged meters.
     *
     * <p>Counters and the timer are not pre-registered without tags: a meter's tags are
     * fixed at registration, so the tagged variants are looked up (or created) per call.
     *
     * @param meterRegistry     registry that receives all command metrics
     * @param commandRepository repository queried by the gauge for the pending count
     */
    public CommandMetrics(MeterRegistry meterRegistry, DeviceCommandRepository commandRepository) {
        this.meterRegistry = meterRegistry;

        // The gauge samples the repository each time the registry reads its value.
        Gauge.builder(PENDING_METRIC, commandRepository,
                repository -> repository.countByStatus(CommandStatus.PENDING))
            .description("待执行指令数量")
            .register(meterRegistry);
    }

    /**
     * Counts one sent command.
     *
     * <p>NOTE(review): {@code device_id} as a tag creates one time series per device;
     * confirm the expected device count is acceptable for the metrics backend.
     *
     * @param deviceId    target device id, used as tag {@code device_id}
     * @param commandType command type, used as tag {@code command_type}
     */
    public void incrementCommandSent(String deviceId, String commandType) {
        Counter.builder(SENT_METRIC)
            .description("发送的指令总数")
            .tag("device_id", tagValue(deviceId))
            .tag("command_type", tagValue(commandType))
            .register(meterRegistry)
            .increment();
    }

    /**
     * Counts one successfully executed command.
     *
     * @param deviceId    target device id, used as tag {@code device_id}
     * @param commandType command type, used as tag {@code command_type}
     */
    public void incrementCommandSuccess(String deviceId, String commandType) {
        Counter.builder(SUCCESS_METRIC)
            .description("执行成功的指令总数")
            .tag("device_id", tagValue(deviceId))
            .tag("command_type", tagValue(commandType))
            .register(meterRegistry)
            .increment();
    }

    /**
     * Counts one failed command.
     *
     * @param deviceId    target device id, used as tag {@code device_id}
     * @param commandType command type, used as tag {@code command_type}
     * @param reason      failure reason, used as tag {@code failure_reason}
     */
    public void incrementCommandFailed(String deviceId, String commandType, String reason) {
        Counter.builder(FAILED_METRIC)
            .description("执行失败的指令总数")
            .tag("device_id", tagValue(deviceId))
            .tag("command_type", tagValue(commandType))
            .tag("failure_reason", tagValue(reason))
            .register(meterRegistry)
            .increment();
    }

    /**
     * Starts timing a command execution.
     *
     * @return the sample to pass to {@link #stopExecutionTimer(Timer.Sample, String, String)}
     */
    public Timer.Sample startExecutionTimer() {
        return Timer.start(meterRegistry);
    }

    /**
     * Stops the sample and records its duration on the execution timer for the given tags.
     *
     * @param sample      the sample returned by {@link #startExecutionTimer()}
     * @param deviceId    target device id, used as tag {@code device_id}
     * @param commandType command type, used as tag {@code command_type}
     */
    public void stopExecutionTimer(Timer.Sample sample, String deviceId, String commandType) {
        sample.stop(Timer.builder(EXECUTION_METRIC)
            .description("指令执行耗时")
            .tag("device_id", tagValue(deviceId))
            .tag("command_type", tagValue(commandType))
            .register(meterRegistry));
    }

    /**
     * Replaces a {@code null} tag value with a fixed placeholder.
     */
    private static String tagValue(String value) {
        return value != null ? value : UNKNOWN_TAG_VALUE;
    }
}

# 总结

设备控制服务是物联网平台的重要组成部分,通过指令管理、协议适配、状态跟踪、安全控制等机制,实现了对设备的可靠控制。合理的架构设计、优先级调度、超时处理和重试机制保证了指令执行的高效性和可靠性。