物联网设备控制服务
# 物联网设备控制服务
# 概述
设备控制服务是物联网平台的核心功能模块,负责向设备发送控制指令、管理设备状态、处理指令响应。本文档详细介绍设备控制服务的架构设计、实现方案和优化策略。
# 服务架构
设备控制服务
├── 指令管理层
│ ├── 指令生成
│ ├── 指令验证
│ ├── 指令队列
│ └── 指令调度
├── 协议适配层
│ ├── MQTT控制
│ ├── HTTP控制
│ ├── CoAP控制
│ └── TCP/UDP控制
├── 状态管理层
│ ├── 设备状态跟踪
│ ├── 指令状态管理
│ ├── 响应处理
│ └── 超时处理
└── 安全控制层
  ├── 权限验证
  ├── 指令加密
  ├── 防重放攻击
  └── 审计日志
# 核心实体设计
# 设备控制指令实体
@Entity
@Table(name = "iot_device_command")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceCommand {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "command_id", nullable = false, unique = true, length = 100)
private String commandId;
@Column(name = "device_id", nullable = false, length = 100)
private String deviceId;
@Column(name = "product_id", nullable = false, length = 100)
private String productId;
@Column(name = "command_type", nullable = false, length = 50)
@Enumerated(EnumType.STRING)
private CommandType commandType;
@Column(name = "command_name", nullable = false, length = 100)
private String commandName;
@Column(name = "command_params", columnDefinition = "JSON")
private String commandParams;
@Column(name = "command_content", columnDefinition = "TEXT")
private String commandContent;
@Column(name = "priority", nullable = false)
private Integer priority = 0; // 优先级:0-低,1-中,2-高,3-紧急
@Column(name = "timeout_seconds", nullable = false)
private Integer timeoutSeconds = 30;
@Column(name = "retry_count", nullable = false)
private Integer retryCount = 0;
@Column(name = "max_retry_count", nullable = false)
private Integer maxRetryCount = 3;
@Column(name = "status", nullable = false, length = 20)
@Enumerated(EnumType.STRING)
private CommandStatus status = CommandStatus.PENDING;
@Column(name = "response_content", columnDefinition = "TEXT")
private String responseContent;
@Column(name = "error_message", length = 500)
private String errorMessage;
@Column(name = "sender_id", length = 100)
private String senderId; // 发送者ID(用户或系统)
@Column(name = "sender_type", length = 20)
@Enumerated(EnumType.STRING)
private SenderType senderType;
@Column(name = "protocol", length = 20)
private String protocol;
@Column(name = "create_time", nullable = false)
private LocalDateTime createTime;
@Column(name = "send_time")
private LocalDateTime sendTime;
@Column(name = "response_time")
private LocalDateTime responseTime;
@Column(name = "complete_time")
private LocalDateTime completeTime;
@PrePersist
protected void onCreate() {
createTime = LocalDateTime.now();
if (commandId == null) {
commandId = generateCommandId();
}
}
private String generateCommandId() {
return "CMD_" + System.currentTimeMillis() + "_" +
ThreadLocalRandom.current().nextInt(1000, 9999);
}
}
/**
 * Kinds of command that can be sent to a device.
 */
public enum CommandType {
    CONTROL("控制指令"),
    CONFIG("配置指令"),
    QUERY("查询指令"),
    UPGRADE("升级指令"),
    REBOOT("重启指令"),
    RESET("重置指令");

    /** Human-readable label of the command type. */
    private final String description;

    CommandType(String description) {
        this.description = description;
    }

    /**
     * Returns the human-readable label of this command type.
     *
     * @return the label passed to the constant's constructor
     */
    public String getDescription() {
        return description;
    }
}
/**
 * Lifecycle states of a device command.
 */
public enum CommandStatus {
    PENDING("待发送"),
    SENDING("发送中"),
    SENT("已发送"),
    RECEIVED("设备已接收"),
    EXECUTING("执行中"),
    SUCCESS("执行成功"),
    FAILED("执行失败"),
    TIMEOUT("执行超时"),
    CANCELLED("已取消");

    /** Human-readable label of the status. */
    private final String description;

    CommandStatus(String description) {
        this.description = description;
    }

    /**
     * Returns the human-readable label of this status.
     *
     * @return the label passed to the constant's constructor
     */
    public String getDescription() {
        return description;
    }
}
/**
 * Kinds of actor that can issue a device command.
 */
public enum SenderType {
    USER("用户"),
    SYSTEM("系统"),
    RULE_ENGINE("规则引擎"),
    SCHEDULER("定时任务");

    /** Human-readable label of the sender type. */
    private final String description;

    SenderType(String description) {
        this.description = description;
    }

    /**
     * Returns the human-readable label of this sender type.
     *
     * @return the label passed to the constant's constructor
     */
    public String getDescription() {
        return description;
    }
}
# 设备控制模板实体
@Entity
@Table(name = "iot_device_command_template")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceCommandTemplate {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "template_id", nullable = false, unique = true, length = 100)
private String templateId;
@Column(name = "template_name", nullable = false, length = 100)
private String templateName;
@Column(name = "product_id", nullable = false, length = 100)
private String productId;
@Column(name = "command_type", nullable = false, length = 50)
@Enumerated(EnumType.STRING)
private CommandType commandType;
@Column(name = "command_template", columnDefinition = "TEXT")
private String commandTemplate;
@Column(name = "param_schema", columnDefinition = "JSON")
private String paramSchema; // 参数JSON Schema
@Column(name = "default_timeout", nullable = false)
private Integer defaultTimeout = 30;
@Column(name = "default_priority", nullable = false)
private Integer defaultPriority = 0;
@Column(name = "enabled", nullable = false)
private Boolean enabled = true;
@Column(name = "description", length = 500)
private String description;
@Column(name = "create_time", nullable = false)
private LocalDateTime createTime;
@Column(name = "update_time", nullable = false)
private LocalDateTime updateTime;
@PrePersist
protected void onCreate() {
createTime = LocalDateTime.now();
updateTime = LocalDateTime.now();
}
@PreUpdate
protected void onUpdate() {
updateTime = LocalDateTime.now();
}
}
# 设备控制服务实现
# 核心控制服务
@Service
@Slf4j
public class DeviceControlService {
@Autowired
private DeviceCommandRepository commandRepository;
@Autowired
private DeviceCommandTemplateRepository templateRepository;
@Autowired
private DeviceManagementService deviceManagementService;
@Autowired
private DeviceAuthService deviceAuthService;
@Autowired
private CommandDispatchService commandDispatchService;
@Autowired
private CommandValidationService commandValidationService;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private MeterRegistry meterRegistry;
private final Counter commandSentCounter;
private final Counter commandSuccessCounter;
private final Counter commandFailedCounter;
private final Timer commandExecutionTimer;
public DeviceControlService(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.commandSentCounter = Counter.builder("iot.command.sent.total")
.description("发送的指令总数")
.register(meterRegistry);
this.commandSuccessCounter = Counter.builder("iot.command.success.total")
.description("执行成功的指令总数")
.register(meterRegistry);
this.commandFailedCounter = Counter.builder("iot.command.failed.total")
.description("执行失败的指令总数")
.register(meterRegistry);
this.commandExecutionTimer = Timer.builder("iot.command.execution.duration")
.description("指令执行耗时")
.register(meterRegistry);
}
/**
* 发送控制指令
*/
public CommandResult sendCommand(SendCommandRequest request) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
// 1. 验证请求参数
validateSendCommandRequest(request);
// 2. 验证设备状态
IoTDevice device = validateDeviceForCommand(request.getDeviceId());
// 3. 验证权限
validateCommandPermission(request.getSenderId(), request.getDeviceId(), request.getCommandType());
// 4. 构建指令对象
DeviceCommand command = buildDeviceCommand(request, device.getProductId());
// 5. 验证指令内容
ValidationResult validationResult = commandValidationService.validate(command);
if (!validationResult.isValid()) {
return CommandResult.failure("指令验证失败: " + String.join(",", validationResult.getErrors()));
}
// 6. 保存指令
command = commandRepository.save(command);
// 7. 异步发送指令
commandDispatchService.dispatchCommandAsync(command);
// 8. 更新指标
commandSentCounter.increment(Tags.of(
"device_id", request.getDeviceId(),
"command_type", request.getCommandType().name()
));
return CommandResult.success(command.getCommandId(), "指令发送成功");
} catch (Exception e) {
log.error("发送控制指令失败, deviceId: {}", request.getDeviceId(), e);
return CommandResult.failure("发送指令失败: " + e.getMessage());
} finally {
sample.stop(commandExecutionTimer);
}
}
/**
* 批量发送控制指令
*/
public BatchCommandResult batchSendCommand(BatchSendCommandRequest request) {
List<String> successCommandIds = new ArrayList<>();
List<String> failedDeviceIds = new ArrayList<>();
List<String> errors = new ArrayList<>();
for (String deviceId : request.getDeviceIds()) {
try {
SendCommandRequest singleRequest = SendCommandRequest.builder()
.deviceId(deviceId)
.commandType(request.getCommandType())
.commandName(request.getCommandName())
.commandParams(request.getCommandParams())
.senderId(request.getSenderId())
.senderType(request.getSenderType())
.priority(request.getPriority())
.timeoutSeconds(request.getTimeoutSeconds())
.build();
CommandResult result = sendCommand(singleRequest);
if (result.isSuccess()) {
successCommandIds.add(result.getCommandId());
} else {
failedDeviceIds.add(deviceId);
errors.add("设备 " + deviceId + ": " + result.getMessage());
}
} catch (Exception e) {
failedDeviceIds.add(deviceId);
errors.add("设备 " + deviceId + ": " + e.getMessage());
}
}
return BatchCommandResult.builder()
.totalCount(request.getDeviceIds().size())
.successCount(successCommandIds.size())
.failureCount(failedDeviceIds.size())
.successCommandIds(successCommandIds)
.failedDeviceIds(failedDeviceIds)
.errors(errors)
.build();
}
/**
* 使用模板发送指令
*/
public CommandResult sendCommandByTemplate(SendCommandByTemplateRequest request) {
try {
// 1. 获取指令模板
DeviceCommandTemplate template = templateRepository.findByTemplateId(request.getTemplateId())
.orElseThrow(() -> new IllegalArgumentException("指令模板不存在: " + request.getTemplateId()));
if (!template.getEnabled()) {
throw new IllegalArgumentException("指令模板已禁用: " + request.getTemplateId());
}
// 2. 验证参数
validateTemplateParams(template, request.getParams());
// 3. 渲染指令内容
String commandContent = renderCommandTemplate(template.getCommandTemplate(), request.getParams());
// 4. 构建发送请求
SendCommandRequest sendRequest = SendCommandRequest.builder()
.deviceId(request.getDeviceId())
.commandType(template.getCommandType())
.commandName(template.getTemplateName())
.commandParams(request.getParams())
.commandContent(commandContent)
.senderId(request.getSenderId())
.senderType(request.getSenderType())
.priority(template.getDefaultPriority())
.timeoutSeconds(template.getDefaultTimeout())
.build();
return sendCommand(sendRequest);
} catch (Exception e) {
log.error("使用模板发送指令失败, templateId: {}, deviceId: {}",
request.getTemplateId(), request.getDeviceId(), e);
return CommandResult.failure("模板指令发送失败: " + e.getMessage());
}
}
/**
* 查询指令状态
*/
public CommandStatusResult getCommandStatus(String commandId) {
try {
DeviceCommand command = commandRepository.findByCommandId(commandId)
.orElseThrow(() -> new IllegalArgumentException("指令不存在: " + commandId));
return CommandStatusResult.builder()
.commandId(commandId)
.deviceId(command.getDeviceId())
.status(command.getStatus())
.responseContent(command.getResponseContent())
.errorMessage(command.getErrorMessage())
.createTime(command.getCreateTime())
.sendTime(command.getSendTime())
.responseTime(command.getResponseTime())
.completeTime(command.getCompleteTime())
.build();
} catch (Exception e) {
log.error("查询指令状态失败, commandId: {}", commandId, e);
throw new RuntimeException("查询指令状态失败", e);
}
}
/**
* 取消指令
*/
public CommandResult cancelCommand(String commandId, String operatorId) {
try {
DeviceCommand command = commandRepository.findByCommandId(commandId)
.orElseThrow(() -> new IllegalArgumentException("指令不存在: " + commandId));
// 只有待发送和发送中的指令可以取消
if (command.getStatus() != CommandStatus.PENDING && command.getStatus() != CommandStatus.SENDING) {
return CommandResult.failure("指令状态不允许取消: " + command.getStatus());
}
// 更新指令状态
command.setStatus(CommandStatus.CANCELLED);
command.setCompleteTime(LocalDateTime.now());
command.setErrorMessage("指令被用户取消, 操作者: " + operatorId);
commandRepository.save(command);
// 从调度队列中移除
commandDispatchService.removeFromQueue(commandId);
return CommandResult.success(commandId, "指令取消成功");
} catch (Exception e) {
log.error("取消指令失败, commandId: {}", commandId, e);
return CommandResult.failure("取消指令失败: " + e.getMessage());
}
}
/**
* 处理指令响应
*/
public void handleCommandResponse(CommandResponseEvent event) {
try {
DeviceCommand command = commandRepository.findByCommandId(event.getCommandId())
.orElse(null);
if (command == null) {
log.warn("收到未知指令的响应, commandId: {}", event.getCommandId());
return;
}
// 更新指令状态
command.setResponseTime(LocalDateTime.now());
command.setResponseContent(event.getResponseContent());
if (event.isSuccess()) {
command.setStatus(CommandStatus.SUCCESS);
command.setCompleteTime(LocalDateTime.now());
commandSuccessCounter.increment(Tags.of(
"device_id", command.getDeviceId(),
"command_type", command.getCommandType().name()
));
} else {
command.setStatus(CommandStatus.FAILED);
command.setCompleteTime(LocalDateTime.now());
command.setErrorMessage(event.getErrorMessage());
commandFailedCounter.increment(Tags.of(
"device_id", command.getDeviceId(),
"command_type", command.getCommandType().name(),
"failure_reason", event.getErrorMessage()
));
}
commandRepository.save(command);
// 发送指令完成事件
publishCommandCompletedEvent(command);
} catch (Exception e) {
log.error("处理指令响应失败, commandId: {}", event.getCommandId(), e);
}
}
/**
* 验证发送指令请求
*/
private void validateSendCommandRequest(SendCommandRequest request) {
if (StringUtils.isBlank(request.getDeviceId())) {
throw new IllegalArgumentException("设备ID不能为空");
}
if (request.getCommandType() == null) {
throw new IllegalArgumentException("指令类型不能为空");
}
if (StringUtils.isBlank(request.getCommandName())) {
throw new IllegalArgumentException("指令名称不能为空");
}
if (StringUtils.isBlank(request.getSenderId())) {
throw new IllegalArgumentException("发送者ID不能为空");
}
if (request.getSenderType() == null) {
throw new IllegalArgumentException("发送者类型不能为空");
}
}
/**
* 验证设备状态
*/
private IoTDevice validateDeviceForCommand(String deviceId) {
IoTDevice device = deviceManagementService.getDeviceById(deviceId);
if (device == null) {
throw new IllegalArgumentException("设备不存在: " + deviceId);
}
if (device.getStatus() == DeviceStatus.DISABLED) {
throw new IllegalArgumentException("设备已被禁用: " + deviceId);
}
if (device.getStatus() == DeviceStatus.OFFLINE) {
throw new IllegalArgumentException("设备离线,无法发送指令: " + deviceId);
}
return device;
}
/**
* 验证指令权限
*/
private void validateCommandPermission(String senderId, String deviceId, CommandType commandType) {
String permission = "device:command:" + commandType.name().toLowerCase();
if (!deviceAuthService.hasPermission(senderId, deviceId, permission)) {
throw new SecurityException("无权限发送此类型指令: " + commandType);
}
}
/**
* 构建设备指令对象
*/
private DeviceCommand buildDeviceCommand(SendCommandRequest request, String productId) {
return DeviceCommand.builder()
.deviceId(request.getDeviceId())
.productId(productId)
.commandType(request.getCommandType())
.commandName(request.getCommandName())
.commandParams(request.getCommandParams())
.commandContent(request.getCommandContent())
.priority(request.getPriority() != null ? request.getPriority() : 0)
.timeoutSeconds(request.getTimeoutSeconds() != null ? request.getTimeoutSeconds() : 30)
.senderId(request.getSenderId())
.senderType(request.getSenderType())
.status(CommandStatus.PENDING)
.build();
}
}
# 指令调度服务
@Service
@Slf4j
public class CommandDispatchService {
@Autowired
private MqttCommandSender mqttCommandSender;
@Autowired
private HttpCommandSender httpCommandSender;
@Autowired
private DeviceManagementService deviceManagementService;
@Autowired
private DeviceCommandRepository commandRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private AsyncTaskExecutor commandExecutor;
private final PriorityBlockingQueue<DeviceCommand> commandQueue;
private final ScheduledExecutorService scheduledExecutor;
public CommandDispatchService() {
// 使用优先级队列,优先级高的指令先执行
this.commandQueue = new PriorityBlockingQueue<>(1000,
Comparator.comparing(DeviceCommand::getPriority).reversed()
.thenComparing(DeviceCommand::getCreateTime));
this.scheduledExecutor = Executors.newScheduledThreadPool(5);
// 启动指令调度线程
startCommandDispatchThread();
// 启动超时检查线程
startTimeoutCheckThread();
}
/**
* 异步调度指令
*/
@Async("commandExecutor")
public CompletableFuture<Void> dispatchCommandAsync(DeviceCommand command) {
return CompletableFuture.runAsync(() -> {
try {
// 添加到调度队列
commandQueue.offer(command);
log.debug("指令已添加到调度队列, commandId: {}, priority: {}",
command.getCommandId(), command.getPriority());
} catch (Exception e) {
log.error("指令调度失败, commandId: {}", command.getCommandId(), e);
// 更新指令状态为失败
command.setStatus(CommandStatus.FAILED);
command.setErrorMessage("调度失败: " + e.getMessage());
command.setCompleteTime(LocalDateTime.now());
commandRepository.save(command);
}
}, commandExecutor);
}
/**
* 启动指令调度线程
*/
private void startCommandDispatchThread() {
Thread dispatchThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
DeviceCommand command = commandQueue.take(); // 阻塞等待
processCommand(command);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("指令调度线程异常", e);
}
}
});
dispatchThread.setName("command-dispatch-thread");
dispatchThread.setDaemon(true);
dispatchThread.start();
}
/**
* 处理单个指令
*/
private void processCommand(DeviceCommand command) {
try {
// 1. 检查指令是否已被取消
if (command.getStatus() == CommandStatus.CANCELLED) {
log.debug("指令已被取消, commandId: {}", command.getCommandId());
return;
}
// 2. 更新指令状态为发送中
command.setStatus(CommandStatus.SENDING);
command.setSendTime(LocalDateTime.now());
commandRepository.save(command);
// 3. 获取设备信息
IoTDevice device = deviceManagementService.getDeviceById(command.getDeviceId());
if (device == null) {
throw new IllegalStateException("设备不存在: " + command.getDeviceId());
}
// 4. 根据设备协议发送指令
boolean sent = sendCommandByProtocol(command, device);
if (sent) {
// 5. 更新指令状态为已发送
command.setStatus(CommandStatus.SENT);
commandRepository.save(command);
// 6. 设置超时检查
scheduleTimeoutCheck(command);
log.debug("指令发送成功, commandId: {}, deviceId: {}",
command.getCommandId(), command.getDeviceId());
} else {
throw new RuntimeException("指令发送失败");
}
} catch (Exception e) {
log.error("处理指令失败, commandId: {}", command.getCommandId(), e);
// 更新指令状态为失败
command.setStatus(CommandStatus.FAILED);
command.setErrorMessage(e.getMessage());
command.setCompleteTime(LocalDateTime.now());
commandRepository.save(command);
// 检查是否需要重试
checkAndRetryCommand(command);
}
}
/**
* 根据协议发送指令
*/
private boolean sendCommandByProtocol(DeviceCommand command, IoTDevice device) {
String protocol = device.getProtocol();
switch (protocol.toUpperCase()) {
case "MQTT":
return mqttCommandSender.sendCommand(command, device);
case "HTTP":
return httpCommandSender.sendCommand(command, device);
case "COAP":
// TODO: 实现CoAP指令发送
throw new UnsupportedOperationException("CoAP协议暂未支持");
default:
throw new IllegalArgumentException("不支持的协议: " + protocol);
}
}
/**
* 设置超时检查
*/
private void scheduleTimeoutCheck(DeviceCommand command) {
scheduledExecutor.schedule(() -> {
try {
// 重新查询指令状态
DeviceCommand currentCommand = commandRepository.findByCommandId(command.getCommandId())
.orElse(null);
if (currentCommand != null &&
currentCommand.getStatus() != CommandStatus.SUCCESS &&
currentCommand.getStatus() != CommandStatus.FAILED &&
currentCommand.getStatus() != CommandStatus.CANCELLED) {
// 指令超时
currentCommand.setStatus(CommandStatus.TIMEOUT);
currentCommand.setErrorMessage("指令执行超时");
currentCommand.setCompleteTime(LocalDateTime.now());
commandRepository.save(currentCommand);
log.warn("指令执行超时, commandId: {}, deviceId: {}",
currentCommand.getCommandId(), currentCommand.getDeviceId());
// 检查是否需要重试
checkAndRetryCommand(currentCommand);
}
} catch (Exception e) {
log.error("超时检查异常, commandId: {}", command.getCommandId(), e);
}
}, command.getTimeoutSeconds(), TimeUnit.SECONDS);
}
/**
* 检查并重试指令
*/
private void checkAndRetryCommand(DeviceCommand command) {
if (command.getRetryCount() < command.getMaxRetryCount()) {
// 增加重试次数
command.setRetryCount(command.getRetryCount() + 1);
command.setStatus(CommandStatus.PENDING);
command.setErrorMessage(null);
commandRepository.save(command);
// 延迟重试(指数退避)
long delaySeconds = (long) Math.pow(2, command.getRetryCount()) * 5; // 5s, 10s, 20s
scheduledExecutor.schedule(() -> {
commandQueue.offer(command);
log.info("指令重试, commandId: {}, retryCount: {}",
command.getCommandId(), command.getRetryCount());
}, delaySeconds, TimeUnit.SECONDS);
}
}
/**
* 从队列中移除指令
*/
public void removeFromQueue(String commandId) {
commandQueue.removeIf(cmd -> cmd.getCommandId().equals(commandId));
}
/**
* 启动超时检查线程
*/
private void startTimeoutCheckThread() {
scheduledExecutor.scheduleWithFixedDelay(() -> {
try {
// 查询超时的指令
LocalDateTime timeoutThreshold = LocalDateTime.now().minusMinutes(10);
List<DeviceCommand> timeoutCommands = commandRepository.findTimeoutCommands(
Arrays.asList(CommandStatus.SENT, CommandStatus.RECEIVED, CommandStatus.EXECUTING),
timeoutThreshold
);
for (DeviceCommand command : timeoutCommands) {
command.setStatus(CommandStatus.TIMEOUT);
command.setErrorMessage("指令执行超时(系统检查)");
command.setCompleteTime(LocalDateTime.now());
commandRepository.save(command);
log.warn("系统检查发现超时指令, commandId: {}, deviceId: {}",
command.getCommandId(), command.getDeviceId());
}
} catch (Exception e) {
log.error("超时检查线程异常", e);
}
}, 1, 1, TimeUnit.MINUTES);
}
}
# MQTT指令发送器
/**
 * Publishes device commands over MQTT.
 */
@Component
@Slf4j
public class MqttCommandSender {

    @Autowired
    private MqttTemplate mqttTemplate;
    @Autowired
    private DeviceAuthService deviceAuthService;

    /**
     * Publishes the command to the device's command topic.
     *
     * @param command the command to send
     * @param device  the target device
     * @return {@code true} if the message was handed to the MQTT template without
     *         error, {@code false} otherwise (the error is logged)
     */
    public boolean sendCommand(DeviceCommand command, IoTDevice device) {
        try {
            // 1. Build the topic.
            String topic = buildCommandTopic(device.getDeviceId(), command.getCommandType());
            // 2. Build the payload.
            MqttCommandMessage message = buildCommandMessage(command);
            // 3. Publish.
            mqttTemplate.convertAndSend(topic, message);
            log.debug("MQTT指令发送成功, commandId: {}, topic: {}",
                    command.getCommandId(), topic);
            return true;
        } catch (Exception e) {
            log.error("MQTT指令发送失败, commandId: {}", command.getCommandId(), e);
            return false;
        }
    }

    /**
     * Builds the topic {@code /device/{deviceId}/command/{commandType}}.
     *
     * @throws IllegalArgumentException if the device ID is empty or contains a
     *         topic separator or wildcard ({@code /}, {@code +}, {@code #}),
     *         which would change the topic the message is published to
     */
    private String buildCommandTopic(String deviceId, CommandType commandType) {
        if (deviceId == null || deviceId.isEmpty()
                || deviceId.indexOf('/') >= 0
                || deviceId.indexOf('+') >= 0
                || deviceId.indexOf('#') >= 0) {
            throw new IllegalArgumentException("设备ID不能用于MQTT主题: " + deviceId);
        }
        // Locale.ROOT keeps the topic stable regardless of the JVM default locale.
        return String.format("/device/%s/command/%s",
                deviceId, commandType.name().toLowerCase(java.util.Locale.ROOT));
    }

    /**
     * Maps the command onto the MQTT payload, stamping the current time.
     */
    private MqttCommandMessage buildCommandMessage(DeviceCommand command) {
        return MqttCommandMessage.builder()
                .commandId(command.getCommandId())
                .commandName(command.getCommandName())
                .commandType(command.getCommandType())
                .commandParams(command.getCommandParams())
                .commandContent(command.getCommandContent())
                .timestamp(System.currentTimeMillis())
                .timeout(command.getTimeoutSeconds())
                .build();
    }
}
# HTTP指令发送器
/**
 * Delivers device commands by POSTing them to the device's HTTP endpoint.
 */
@Component
@Slf4j
public class HttpCommandSender {

    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private DeviceAuthService deviceAuthService;

    /**
     * POSTs the command to the device.
     *
     * @param command the command to send
     * @param device  the target device
     * @return {@code true} when the device answers with a 2xx status;
     *         {@code false} on any other status or on any exception (logged)
     */
    public boolean sendCommand(DeviceCommand command, IoTDevice device) {
        try {
            String targetUrl = buildCommandUrl(device);
            HttpHeaders requestHeaders = buildRequestHeaders(device);
            HttpCommandRequest payload = buildCommandRequest(command);

            HttpEntity<HttpCommandRequest> httpEntity = new HttpEntity<>(payload, requestHeaders);
            ResponseEntity<HttpCommandResponse> reply =
                    restTemplate.postForEntity(targetUrl, httpEntity, HttpCommandResponse.class);

            if (!reply.getStatusCode().is2xxSuccessful()) {
                log.warn("HTTP指令发送失败, commandId: {}, status: {}",
                        command.getCommandId(), reply.getStatusCode());
                return false;
            }
            log.debug("HTTP指令发送成功, commandId: {}, url: {}",
                    command.getCommandId(), targetUrl);
            return true;
        } catch (Exception e) {
            log.error("HTTP指令发送失败, commandId: {}", command.getCommandId(), e);
            return false;
        }
    }

    /**
     * Builds the endpoint URL {@code http://{deviceIp}:{port}/api/command}.
     */
    private String buildCommandUrl(IoTDevice device) {
        return String.format("http://%s:%d/api/command",
                device.getIpAddress(), device.getPort());
    }

    /**
     * Builds the JSON request headers, including the device ID and a bearer
     * token obtained from the auth service.
     */
    private HttpHeaders buildRequestHeaders(IoTDevice device) {
        HttpHeaders result = new HttpHeaders();
        result.setContentType(MediaType.APPLICATION_JSON);
        result.set("Device-Id", device.getDeviceId());
        // Authentication header.
        result.set("Authorization", "Bearer " + deviceAuthService.generateDeviceToken(device.getDeviceId()));
        return result;
    }

    /**
     * Maps the command onto the HTTP request body, stamping the current time.
     */
    private HttpCommandRequest buildCommandRequest(DeviceCommand command) {
        return HttpCommandRequest.builder()
                .commandId(command.getCommandId())
                .commandName(command.getCommandName())
                .commandType(command.getCommandType())
                .commandParams(command.getCommandParams())
                .commandContent(command.getCommandContent())
                .timestamp(System.currentTimeMillis())
                .timeout(command.getTimeoutSeconds())
                .build();
    }
}
# 指令验证服务
/**
 * Validates a {@link DeviceCommand} before it is persisted and dispatched:
 * mandatory fields, content, parameters and a keyword/length safety check.
 */
@Service
@Slf4j
public class CommandValidationService {

    /** Shared mapper; creating one per call is needlessly expensive. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private static final List<String> DANGEROUS_KEYWORDS = Arrays.asList(
            "rm -rf", "format", "delete", "drop", "truncate", "shutdown", "reboot"
    );

    @Autowired
    private DeviceCommandTemplateRepository templateRepository;

    /**
     * Runs all validations and collects every problem found.
     *
     * @param command the command to validate
     * @return a result that is valid only when no errors were collected
     */
    public ValidationResult validate(DeviceCommand command) {
        List<String> errors = new ArrayList<>();
        if (command == null) {
            errors.add("指令不能为空");
            return ValidationResult.builder()
                    .valid(false)
                    .errors(errors)
                    .build();
        }
        try {
            // 1. Mandatory fields.
            validateBasicFields(command, errors);
            // 2. Command content.
            validateCommandContent(command, errors);
            // 3. Parameters.
            validateCommandParams(command, errors);
            // 4. Safety checks.
            validateSecurity(command, errors);
        } catch (Exception e) {
            log.error("指令验证异常, commandId: {}", command.getCommandId(), e);
            errors.add("验证过程异常: " + e.getMessage());
        }
        return ValidationResult.builder()
                .valid(errors.isEmpty())
                .errors(errors)
                .build();
    }

    /**
     * Checks device ID, command type, command name, priority (0-3) and
     * time-out (1-3600 seconds). Missing priority/time-out count as out of range.
     */
    private void validateBasicFields(DeviceCommand command, List<String> errors) {
        if (StringUtils.isBlank(command.getDeviceId())) {
            errors.add("设备ID不能为空");
        }
        if (command.getCommandType() == null) {
            errors.add("指令类型不能为空");
        }
        if (StringUtils.isBlank(command.getCommandName())) {
            errors.add("指令名称不能为空");
        }
        Integer priority = command.getPriority();
        if (priority == null || priority < 0 || priority > 3) {
            errors.add("指令优先级必须在0-3之间");
        }
        Integer timeoutSeconds = command.getTimeoutSeconds();
        if (timeoutSeconds == null || timeoutSeconds <= 0 || timeoutSeconds > 3600) {
            errors.add("超时时间必须在1-3600秒之间");
        }
    }

    /**
     * Checks the command content: content and parameters must not both be
     * empty, content is limited to 10240 characters, and content that looks
     * like JSON must parse.
     */
    private void validateCommandContent(DeviceCommand command, List<String> errors) {
        String content = command.getCommandContent();
        if (StringUtils.isBlank(content)) {
            // No content: parameters must be present instead.
            if (StringUtils.isBlank(command.getCommandParams())) {
                errors.add("指令内容和参数不能同时为空");
            }
            return;
        }
        if (content.length() > 10240) {
            errors.add("指令内容过长,不能超过10KB");
        }
        String trimmed = content.trim();
        if (trimmed.startsWith("{") || trimmed.startsWith("[")) {
            try {
                OBJECT_MAPPER.readTree(content);
            } catch (JsonProcessingException e) {
                errors.add("指令内容JSON格式无效: " + e.getMessage());
            }
        }
    }

    /**
     * Checks that the parameters, when present, are valid JSON and satisfy the
     * rules of the command type.
     */
    private void validateCommandParams(DeviceCommand command, List<String> errors) {
        String params = command.getCommandParams();
        if (StringUtils.isBlank(params)) {
            return;
        }
        try {
            JsonNode paramsNode = OBJECT_MAPPER.readTree(params);
            validateTypeSpecificParams(command.getCommandType(), paramsNode, errors);
        } catch (JsonProcessingException e) {
            errors.add("指令参数JSON格式无效: " + e.getMessage());
        }
    }

    /**
     * Dispatches to the parameter rules of the given command type.
     * Types without specific rules, and a missing type, add no errors here.
     */
    private void validateTypeSpecificParams(CommandType commandType, JsonNode params, List<String> errors) {
        if (commandType == null) {
            // Already reported by the mandatory-field check; switching on null would throw.
            return;
        }
        switch (commandType) {
            case CONTROL:
                validateControlParams(params, errors);
                break;
            case CONFIG:
                validateConfigParams(params, errors);
                break;
            case UPGRADE:
                validateUpgradeParams(params, errors);
                break;
            default:
                // No type-specific rules for the remaining types.
                break;
        }
    }

    /** Control commands must contain an {@code action} field. */
    private void validateControlParams(JsonNode params, List<String> errors) {
        if (!params.has("action")) {
            errors.add("控制指令必须包含action字段");
        }
    }

    /** Config commands must contain a {@code config} field. */
    private void validateConfigParams(JsonNode params, List<String> errors) {
        if (!params.has("config")) {
            errors.add("配置指令必须包含config字段");
        }
    }

    /** Upgrade commands must contain {@code version} and {@code downloadUrl} fields. */
    private void validateUpgradeParams(JsonNode params, List<String> errors) {
        if (!params.has("version")) {
            errors.add("升级指令必须包含version字段");
        }
        if (!params.has("downloadUrl")) {
            errors.add("升级指令必须包含downloadUrl字段");
        }
    }

    /**
     * Rejects commands whose content or parameters contain a blacklisted
     * keyword (case-insensitive substring match) or whose combined length
     * exceeds 50000 characters.
     *
     * <p>The "reboot" keyword is not applied to {@link CommandType#REBOOT}
     * commands, which legitimately mention it.
     * NOTE(review): substring matching also flags harmless text such as
     * "information" (contains "format"); consider matching whole tokens.
     */
    private void validateSecurity(DeviceCommand command, List<String> errors) {
        String content = command.getCommandContent();
        String params = command.getCommandParams();

        // Lower-case once, independent of the JVM default locale.
        String contentLower = content != null ? content.toLowerCase(java.util.Locale.ROOT) : null;
        String paramsLower = params != null ? params.toLowerCase(java.util.Locale.ROOT) : null;
        boolean isRebootCommand = command.getCommandType() == CommandType.REBOOT;

        for (String keyword : DANGEROUS_KEYWORDS) {
            if (isRebootCommand && "reboot".equals(keyword)) {
                continue;
            }
            if ((contentLower != null && contentLower.contains(keyword)) ||
                    (paramsLower != null && paramsLower.contains(keyword))) {
                errors.add("指令包含危险关键字: " + keyword);
            }
        }

        int totalLength = (content != null ? content.length() : 0) +
                (params != null ? params.length() : 0);
        if (totalLength > 50000) {
            errors.add("指令总长度过长,可能存在安全风险");
        }
    }
}
# 数据库设计
# 设备指令表
-- Device command table, range-partitioned by month on create_time.
-- MySQL requires every PRIMARY/UNIQUE key of a partitioned table to contain all
-- columns used in the partitioning expression, so create_time is part of both
-- keys below (with only `id` / `command_id` the statement fails with error 1503).
-- Consequence: the database only guarantees command_id to be unique per
-- create_time; global uniqueness relies on the generated command ID itself.
CREATE TABLE `iot_device_command` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `command_id` varchar(100) NOT NULL COMMENT '指令ID',
  `device_id` varchar(100) NOT NULL COMMENT '设备ID',
  `product_id` varchar(100) NOT NULL COMMENT '产品ID',
  `command_type` varchar(50) NOT NULL COMMENT '指令类型',
  `command_name` varchar(100) NOT NULL COMMENT '指令名称',
  `command_params` json DEFAULT NULL COMMENT '指令参数',
  `command_content` text COMMENT '指令内容',
  `priority` int NOT NULL DEFAULT '0' COMMENT '优先级',
  `timeout_seconds` int NOT NULL DEFAULT '30' COMMENT '超时时间(秒)',
  `retry_count` int NOT NULL DEFAULT '0' COMMENT '重试次数',
  `max_retry_count` int NOT NULL DEFAULT '3' COMMENT '最大重试次数',
  `status` varchar(20) NOT NULL DEFAULT 'PENDING' COMMENT '状态',
  `response_content` text COMMENT '响应内容',
  `error_message` varchar(500) DEFAULT NULL COMMENT '错误信息',
  `sender_id` varchar(100) DEFAULT NULL COMMENT '发送者ID',
  `sender_type` varchar(20) DEFAULT NULL COMMENT '发送者类型',
  `protocol` varchar(20) DEFAULT NULL COMMENT '协议',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `send_time` datetime DEFAULT NULL COMMENT '发送时间',
  `response_time` datetime DEFAULT NULL COMMENT '响应时间',
  `complete_time` datetime DEFAULT NULL COMMENT '完成时间',
  PRIMARY KEY (`id`, `create_time`),
  UNIQUE KEY `uk_command_id` (`command_id`, `create_time`),
  KEY `idx_device_id_status` (`device_id`, `status`),
  KEY `idx_create_time` (`create_time`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备指令表'
PARTITION BY RANGE (TO_DAYS(create_time)) (
  PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),
  PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),
  PARTITION p202403 VALUES LESS THAN (TO_DAYS('2024-04-01')),
  -- Catch-all for rows newer than the last monthly partition.
  PARTITION p_future VALUES LESS THAN MAXVALUE
);
# 指令模板表
-- Command template table: one row per reusable command template of a product.
CREATE TABLE `iot_device_command_template` (
  -- Identity
  `id`               BIGINT       NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `template_id`      VARCHAR(100) NOT NULL COMMENT '模板ID',
  `template_name`    VARCHAR(100) NOT NULL COMMENT '模板名称',
  `product_id`       VARCHAR(100) NOT NULL COMMENT '产品ID',
  -- Template definition
  `command_type`     VARCHAR(50)  NOT NULL COMMENT '指令类型',
  `command_template` TEXT         NOT NULL COMMENT '指令模板',
  `param_schema`     JSON         DEFAULT NULL COMMENT '参数Schema',
  -- Defaults applied to commands created from the template
  `default_timeout`  INT          NOT NULL DEFAULT 30 COMMENT '默认超时时间',
  `default_priority` INT          NOT NULL DEFAULT 0 COMMENT '默认优先级',
  `enabled`          TINYINT(1)   NOT NULL DEFAULT 1 COMMENT '是否启用',
  `description`      VARCHAR(500) DEFAULT NULL COMMENT '描述',
  -- Audit timestamps
  `create_time`      DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time`      DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_template_id` (`template_id`),
  KEY `idx_product_id` (`product_id`),
  KEY `idx_command_type` (`command_type`),
  KEY `idx_enabled` (`enabled`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备指令模板表';
# 性能优化
# 指令队列优化
/**
 * Spring configuration for the command thread pool and the Redis template.
 */
@Configuration
public class CommandQueueConfig {

    /**
     * Thread pool used for command execution (bean name {@code commandExecutor}).
     *
     * <p>The concrete {@link ThreadPoolTaskExecutor} type is declared as the
     * return type so the bean can be injected as {@code TaskExecutor},
     * {@code AsyncTaskExecutor} or {@code Executor}; a factory method declared
     * as plain {@code TaskExecutor} may not be matched for the narrower types.
     * {@code initialize()} is not called here because the container initialises
     * the executor itself; calling it as well would build the pool twice.
     *
     * @return the configured executor (20 core / 50 max threads, queue of 2000,
     *         caller-runs when saturated)
     */
    @Bean("commandExecutor")
    public ThreadPoolTaskExecutor commandExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(2000);
        executor.setThreadNamePrefix("command-exec-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    /**
     * Redis template with string keys and JSON-serialized values
     * (for both plain and hash entries).
     *
     * @param connectionFactory the Redis connection factory
     * @return the configured template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}
# 监控指标
# 指令执行监控
/**
 * Micrometer metrics for command execution: sent/success/failed counters,
 * an execution timer and a gauge of pending commands.
 *
 * <p>Counters and timers are looked up (and registered on first use) per tag
 * combination, because a Micrometer {@code Counter} cannot be incremented with
 * ad-hoc tags after it has been registered.
 */
@Component
public class CommandMetrics {

    private static final String METRIC_SENT = "iot.command.sent.total";
    private static final String METRIC_SUCCESS = "iot.command.success.total";
    private static final String METRIC_FAILED = "iot.command.failed.total";
    private static final String METRIC_DURATION = "iot.command.execution.duration";
    private static final String METRIC_PENDING = "iot.command.pending.count";

    private final MeterRegistry meterRegistry;

    /**
     * @param meterRegistry     registry all meters are registered with
     * @param commandRepository repository queried by the pending-commands gauge
     *                          each time the gauge is sampled
     */
    public CommandMetrics(MeterRegistry meterRegistry, DeviceCommandRepository commandRepository) {
        this.meterRegistry = meterRegistry;
        Gauge.builder(METRIC_PENDING, commandRepository,
                        repository -> repository.countByStatus(CommandStatus.PENDING))
                .description("待执行指令数量")
                .register(meterRegistry);
    }

    /** Counts one sent command for the given device and command type. */
    public void incrementCommandSent(String deviceId, String commandType) {
        counter(METRIC_SENT, "发送的指令总数",
                "device_id", deviceId,
                "command_type", commandType).increment();
    }

    /** Counts one successfully executed command for the given device and command type. */
    public void incrementCommandSuccess(String deviceId, String commandType) {
        counter(METRIC_SUCCESS, "执行成功的指令总数",
                "device_id", deviceId,
                "command_type", commandType).increment();
    }

    /**
     * Counts one failed command.
     *
     * @param reason failure reason used as a tag; {@code null} is recorded as
     *               {@code "unknown"} because tag values must not be null
     */
    public void incrementCommandFailed(String deviceId, String commandType, String reason) {
        counter(METRIC_FAILED, "执行失败的指令总数",
                "device_id", deviceId,
                "command_type", commandType,
                "failure_reason", reason != null ? reason : "unknown").increment();
    }

    /**
     * Starts timing a command execution.
     *
     * @return a sample to pass to {@link #stopExecutionTimer}
     */
    public Timer.Sample startExecutionTimer() {
        return Timer.start(meterRegistry);
    }

    /** Stops the sample and records it under the given device and command type. */
    public void stopExecutionTimer(Timer.Sample sample, String deviceId, String commandType) {
        sample.stop(Timer.builder(METRIC_DURATION)
                .description("指令执行耗时")
                .tag("device_id", deviceId)
                .tag("command_type", commandType)
                .register(meterRegistry));
    }

    /**
     * Returns the counter for the given name and tags, registering it on first use.
     *
     * @param tags alternating tag keys and values
     */
    private Counter counter(String name, String description, String... tags) {
        return Counter.builder(name)
                .description(description)
                .tags(tags)
                .register(meterRegistry);
    }
}
# 总结
设备控制服务是物联网平台的重要组成部分,通过指令管理、协议适配、状态跟踪、安全控制等机制,实现了对设备的可靠控制。合理的架构设计、优先级调度、超时处理和重试机制保证了指令执行的高效性和可靠性。