应用服务层设计与实现
# 应用服务层设计与实现
# 引言:智慧城市的大脑中枢
想象一下,如果物联网系统是一座智慧城市,那么应用服务层就是这座城市的"大脑中枢"。它不仅要处理来自千万设备的数据,还要做出智能决策、发出控制指令、监控异常情况,并从海量数据中挖掘出有价值的洞察。
就像一个经验丰富的城市管理者,应用服务层需要具备四大核心能力:
- 设备控制:就像交通指挥中心,能够精确控制每一个设备
- 规则引擎:就像智能决策系统,能够根据复杂规则自动做出判断
- 告警服务:就像城市监控系统,能够及时发现和处理异常情况
- 数据分析:就像城市规划师,能够从数据中发现规律和趋势
让我们一起深入探索这个"智慧大脑"是如何工作的。
# 1. 设备控制服务 (Device Control Service)
# 1.1 设计理念
设备控制服务就像一个智能的"遥控器管家",它需要能够:
- 统一管理不同类型设备的控制指令
- 确保指令的可靠传输和执行
- 提供灵活的控制策略和权限管理
- 支持批量操作和定时任务
# 1.2 核心接口设计
/**
* 设备控制服务接口
* 就像一个万能遥控器,能够控制各种不同的设备
*/
public interface DeviceControlService {
/**
* 发送单个设备控制指令
* 就像按下遥控器上的一个按钮
*/
ControlResult sendCommand(DeviceControlRequest request);
/**
* 批量发送控制指令
* 就像同时控制多个设备,比如一键关闭所有灯光
*/
BatchControlResult sendBatchCommands(List<DeviceControlRequest> requests);
/**
* 定时控制指令
* 就像设置闹钟,到时间自动执行某个操作
*/
ScheduledControlResult scheduleCommand(ScheduledControlRequest request);
/**
* 取消控制指令
* 就像撤销一个还没执行的操作
*/
boolean cancelCommand(String commandId);
/**
* 查询指令执行状态
* 就像查看遥控器发出的指令是否被设备接收并执行
*/
CommandStatus getCommandStatus(String commandId);
/**
* 获取设备支持的控制指令
* 就像查看遥控器上有哪些按钮可以用
*/
List<SupportedCommand> getSupportedCommands(String deviceId);
}
# 1.3 控制指令模型
/**
* 设备控制请求
* 封装了一次控制操作的所有信息
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceControlRequest {
/**
* 指令ID(用于追踪)
*/
private String commandId;
/**
* 目标设备ID
*/
private String deviceId;
/**
* 控制指令类型
*/
private CommandType commandType;
/**
* 指令参数(JSON格式)
* 比如:{"brightness": 80, "color": "#FF0000"}
*/
private Map<String, Object> parameters;
/**
* 指令优先级
*/
private CommandPriority priority;
/**
* 超时时间(毫秒)
*/
private Long timeoutMs;
/**
* 重试次数
*/
private Integer retryCount;
/**
* 发送者ID
*/
private String senderId;
/**
* 指令来源
*/
private CommandSource source;
/**
* 创建时间
*/
private LocalDateTime createdTime;
/**
* 期望执行时间
*/
private LocalDateTime expectedExecuteTime;
}
/**
* 指令类型枚举
*/
public enum CommandType {
// 基础控制指令
POWER_ON("开机", "device.power.on"),
POWER_OFF("关机", "device.power.off"),
RESTART("重启", "device.restart"),
// 配置指令
UPDATE_CONFIG("更新配置", "device.config.update"),
RESET_CONFIG("重置配置", "device.config.reset"),
// 数据指令
START_COLLECT("开始采集", "device.data.start"),
STOP_COLLECT("停止采集", "device.data.stop"),
// 维护指令
SELF_CHECK("自检", "device.maintenance.check"),
CALIBRATE("校准", "device.maintenance.calibrate"),
// 自定义指令
CUSTOM("自定义", "device.custom");
private final String displayName;
private final String commandCode;
CommandType(String displayName, String commandCode) {
this.displayName = displayName;
this.commandCode = commandCode;
}
public String getDisplayName() {
return displayName;
}
public String getCommandCode() {
return commandCode;
}
}
/**
* 指令优先级
*/
public enum CommandPriority {
LOW(1, "低优先级"),
NORMAL(5, "普通优先级"),
HIGH(8, "高优先级"),
URGENT(10, "紧急优先级");
private final int level;
private final String description;
CommandPriority(int level, String description) {
this.level = level;
this.description = description;
}
public int getLevel() {
return level;
}
public String getDescription() {
return description;
}
}
# 1.4 控制结果模型
/**
* 控制结果
* 记录指令执行的详细结果
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ControlResult {
/**
* 指令ID
*/
private String commandId;
/**
* 设备ID
*/
private String deviceId;
/**
* 执行状态
*/
private ExecutionStatus status;
/**
* 执行结果消息
*/
private String message;
/**
* 返回数据
*/
private Map<String, Object> responseData;
/**
* 执行开始时间
*/
private LocalDateTime startTime;
/**
* 执行结束时间
*/
private LocalDateTime endTime;
/**
* 执行耗时(毫秒)
*/
private Long executionTimeMs;
/**
* 错误代码(如果失败)
*/
private String errorCode;
/**
* 错误详情(如果失败)
*/
private String errorDetail;
}
/**
* 执行状态枚举
*/
public enum ExecutionStatus {
PENDING("等待执行"),
EXECUTING("执行中"),
SUCCESS("执行成功"),
FAILED("执行失败"),
TIMEOUT("执行超时"),
CANCELLED("已取消");
private final String description;
ExecutionStatus(String description) {
this.description = description;
}
public String getDescription() {
return description;
}
}
# 1.5 设备控制服务实现
/**
* 设备控制服务实现
* 就像一个智能的设备管家,负责协调和执行各种控制指令
*/
@Service
@Slf4j
@Transactional
public class DeviceControlServiceImpl implements DeviceControlService {
@Autowired
private DeviceInfoService deviceInfoService;
@Autowired
private CommandExecutor commandExecutor;
@Autowired
private CommandRepository commandRepository;
@Autowired
private MessageProducer messageProducer;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ApplicationEventPublisher eventPublisher;
/**
* 发送单个设备控制指令
* 就像精确地控制一个特定的设备
*/
@Override
public ControlResult sendCommand(DeviceControlRequest request) {
log.info("开始执行设备控制指令,设备ID: {}, 指令类型: {}",
request.getDeviceId(), request.getCommandType());
try {
// 1. 验证设备是否存在且在线
validateDeviceAvailability(request.getDeviceId());
// 2. 验证指令是否被设备支持
validateCommandSupport(request.getDeviceId(), request.getCommandType());
// 3. 检查权限
validateCommandPermission(request);
// 4. 生成指令ID(如果没有提供)
if (StringUtils.isEmpty(request.getCommandId())) {
request.setCommandId(generateCommandId());
}
// 5. 保存指令记录
saveCommandRecord(request);
// 6. 执行指令
ControlResult result = executeCommand(request);
// 7. 更新指令状态
updateCommandStatus(request.getCommandId(), result);
// 8. 发布指令执行事件
publishCommandExecutedEvent(request, result);
log.info("设备控制指令执行完成,指令ID: {}, 状态: {}",
request.getCommandId(), result.getStatus());
return result;
} catch (Exception e) {
log.error("设备控制指令执行失败,设备ID: {}, 指令类型: {}",
request.getDeviceId(), request.getCommandType(), e);
// 创建失败结果
ControlResult failureResult = ControlResult.builder()
.commandId(request.getCommandId())
.deviceId(request.getDeviceId())
.status(ExecutionStatus.FAILED)
.message("指令执行失败: " + e.getMessage())
.errorCode("EXECUTION_ERROR")
.errorDetail(e.toString())
.startTime(LocalDateTime.now())
.endTime(LocalDateTime.now())
.build();
// 更新指令状态
updateCommandStatus(request.getCommandId(), failureResult);
return failureResult;
}
}
/**
* 批量发送控制指令
* 就像同时控制多个设备,比如一键关闭整栋楼的所有灯光
*/
@Override
public BatchControlResult sendBatchCommands(List<DeviceControlRequest> requests) {
log.info("开始执行批量设备控制,指令数量: {}", requests.size());
String batchId = generateBatchId();
List<ControlResult> results = new ArrayList<>();
int successCount = 0;
int failureCount = 0;
// 使用并行流处理,提高执行效率
List<CompletableFuture<ControlResult>> futures = requests.stream()
.map(request -> {
// 设置批次ID
request.getParameters().put("batchId", batchId);
// 异步执行每个指令
return CompletableFuture.supplyAsync(() -> sendCommand(request));
})
.collect(Collectors.toList());
// 等待所有指令执行完成
for (CompletableFuture<ControlResult> future : futures) {
try {
ControlResult result = future.get(30, TimeUnit.SECONDS);
results.add(result);
if (result.getStatus() == ExecutionStatus.SUCCESS) {
successCount++;
} else {
failureCount++;
}
} catch (Exception e) {
log.error("批量指令执行异常", e);
failureCount++;
}
}
BatchControlResult batchResult = BatchControlResult.builder()
.batchId(batchId)
.totalCount(requests.size())
.successCount(successCount)
.failureCount(failureCount)
.results(results)
.executionTime(LocalDateTime.now())
.build();
log.info("批量设备控制执行完成,批次ID: {}, 成功: {}, 失败: {}",
batchId, successCount, failureCount);
return batchResult;
}
/**
* 定时控制指令
* 就像设置一个智能闹钟,到时间自动执行某个操作
*/
@Override
public ScheduledControlResult scheduleCommand(ScheduledControlRequest request) {
log.info("创建定时控制指令,设备ID: {}, 执行时间: {}",
request.getDeviceId(), request.getScheduledTime());
try {
// 1. 验证定时时间
if (request.getScheduledTime().isBefore(LocalDateTime.now())) {
throw new IllegalArgumentException("定时时间不能早于当前时间");
}
// 2. 创建定时任务
String scheduleId = generateScheduleId();
ScheduledTask task = ScheduledTask.builder()
.scheduleId(scheduleId)
.deviceId(request.getDeviceId())
.commandType(request.getCommandType())
.parameters(request.getParameters())
.scheduledTime(request.getScheduledTime())
.status(ScheduleStatus.PENDING)
.createdTime(LocalDateTime.now())
.build();
// 3. 保存定时任务
scheduledTaskRepository.save(task);
// 4. 注册到调度器
scheduleCommandExecution(task);
log.info("定时控制指令创建成功,调度ID: {}", scheduleId);
return ScheduledControlResult.builder()
.scheduleId(scheduleId)
.success(true)
.message("定时指令创建成功")
.scheduledTime(request.getScheduledTime())
.build();
} catch (Exception e) {
log.error("创建定时控制指令失败", e);
return ScheduledControlResult.builder()
.success(false)
.message("定时指令创建失败: " + e.getMessage())
.build();
}
}
/**
* 执行具体的控制指令
* 这是控制逻辑的核心,就像遥控器发出红外信号
*/
private ControlResult executeCommand(DeviceControlRequest request) {
LocalDateTime startTime = LocalDateTime.now();
try {
// 1. 根据设备类型选择合适的执行器
CommandExecutor executor = getCommandExecutor(request.getDeviceId());
// 2. 构建执行上下文
CommandExecutionContext context = CommandExecutionContext.builder()
.commandId(request.getCommandId())
.deviceId(request.getDeviceId())
.commandType(request.getCommandType())
.parameters(request.getParameters())
.timeout(request.getTimeoutMs())
.retryCount(request.getRetryCount())
.build();
// 3. 执行指令
CommandExecutionResult executionResult = executor.execute(context);
LocalDateTime endTime = LocalDateTime.now();
long executionTime = Duration.between(startTime, endTime).toMillis();
// 4. 构建控制结果
return ControlResult.builder()
.commandId(request.getCommandId())
.deviceId(request.getDeviceId())
.status(executionResult.isSuccess() ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILED)
.message(executionResult.getMessage())
.responseData(executionResult.getResponseData())
.startTime(startTime)
.endTime(endTime)
.executionTimeMs(executionTime)
.errorCode(executionResult.getErrorCode())
.errorDetail(executionResult.getErrorDetail())
.build();
} catch (Exception e) {
LocalDateTime endTime = LocalDateTime.now();
long executionTime = Duration.between(startTime, endTime).toMillis();
return ControlResult.builder()
.commandId(request.getCommandId())
.deviceId(request.getDeviceId())
.status(ExecutionStatus.FAILED)
.message("指令执行异常")
.startTime(startTime)
.endTime(endTime)
.executionTimeMs(executionTime)
.errorCode("EXECUTION_EXCEPTION")
.errorDetail(e.toString())
.build();
}
}
/**
* 验证设备可用性
*/
private void validateDeviceAvailability(String deviceId) {
DeviceInfo deviceInfo = deviceInfoService.getDeviceInfo(deviceId)
.orElseThrow(() -> new DeviceNotFoundException("设备不存在: " + deviceId));
if (deviceInfo.getDeviceStatus() != DeviceStatus.ACTIVE) {
throw new DeviceNotAvailableException("设备不在线或不可用: " + deviceId);
}
}
/**
* 验证指令支持
*/
private void validateCommandSupport(String deviceId, CommandType commandType) {
List<SupportedCommand> supportedCommands = getSupportedCommands(deviceId);
boolean isSupported = supportedCommands.stream()
.anyMatch(cmd -> cmd.getCommandType() == commandType);
if (!isSupported) {
throw new UnsupportedCommandException(
String.format("设备 %s 不支持指令类型 %s", deviceId, commandType));
}
}
/**
* 生成指令ID
*/
private String generateCommandId() {
return "CMD-" + System.currentTimeMillis() + "-" +
ThreadLocalRandom.current().nextInt(1000, 9999);
}
}
# 2. 规则引擎服务 (Rule Engine Service)
# 2.1 设计理念
规则引擎就像一个"智能决策大脑",它能够:
- 根据预定义的业务规则自动做出决策
- 支持复杂的条件判断和逻辑运算
- 实现业务逻辑与代码的分离
- 提供可视化的规则配置和管理
想象一下智能家居场景:当温度超过28度且时间在下午2点到6点之间时,自动开启空调并设置为制冷模式。这就是规则引擎的典型应用。
# 2.2 规则引擎核心接口
/**
* 规则引擎服务接口
* 就像一个智能的决策助手,能够根据各种条件自动做出判断
*/
public interface RuleEngineService {
/**
* 创建规则
* 就像制定一个新的决策标准
*/
RuleDefinition createRule(CreateRuleRequest request);
/**
* 更新规则
* 就像修改已有的决策标准
*/
RuleDefinition updateRule(String ruleId, UpdateRuleRequest request);
/**
* 删除规则
*/
boolean deleteRule(String ruleId);
/**
* 启用/禁用规则
*/
boolean toggleRule(String ruleId, boolean enabled);
/**
* 执行规则评估
* 就像让决策大脑分析当前情况并给出建议
*/
RuleExecutionResult executeRules(RuleExecutionContext context);
/**
* 批量执行规则
*/
List<RuleExecutionResult> executeBatchRules(List<RuleExecutionContext> contexts);
/**
* 获取规则列表
*/
List<RuleDefinition> getRules(RuleQueryRequest request);
/**
* 获取规则执行历史
*/
List<RuleExecutionRecord> getRuleExecutionHistory(String ruleId, LocalDateTime startTime, LocalDateTime endTime);
/**
* 验证规则语法
*/
RuleValidationResult validateRule(String ruleExpression);
}
# 2.3 规则定义模型
/**
* 规则定义
* 描述一个完整的业务规则
*/
@Entity
@Table(name = "rule_definition")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RuleDefinition {
/**
* 规则ID
*/
@Id
@Column(name = "rule_id", length = 64)
private String ruleId;
/**
* 规则名称
*/
@Column(name = "rule_name", nullable = false)
private String ruleName;
/**
* 规则描述
*/
@Column(name = "description")
private String description;
/**
* 规则类型
*/
@Enumerated(EnumType.STRING)
@Column(name = "rule_type", nullable = false)
private RuleType ruleType;
/**
* 规则表达式(支持多种格式:Groovy、SpEL、自定义DSL等)
*/
@Column(name = "rule_expression", columnDefinition = "TEXT")
private String ruleExpression;
/**
* 条件表达式
* 例如:"temperature > 28 && time.hour >= 14 && time.hour <= 18"
*/
@Column(name = "condition_expression", columnDefinition = "TEXT")
private String conditionExpression;
/**
* 动作配置(JSON格式)
* 定义规则触发后要执行的动作
*/
@Column(name = "action_config", columnDefinition = "JSON")
private String actionConfig;
/**
* 规则优先级
*/
@Column(name = "priority")
private Integer priority;
/**
* 是否启用
*/
@Column(name = "enabled")
private Boolean enabled;
/**
* 规则分组
*/
@Column(name = "rule_group")
private String ruleGroup;
/**
* 适用的设备类型
*/
@Column(name = "applicable_device_types", columnDefinition = "JSON")
private String applicableDeviceTypes;
/**
* 规则标签
*/
@Column(name = "tags", columnDefinition = "JSON")
private String tags;
/**
* 执行频率限制(防止频繁触发)
*/
@Column(name = "execution_limit")
private String executionLimit;
/**
* 有效期开始时间
*/
@Column(name = "valid_from")
private LocalDateTime validFrom;
/**
* 有效期结束时间
*/
@Column(name = "valid_to")
private LocalDateTime validTo;
/**
* 创建者ID
*/
@Column(name = "creator_id")
private String creatorId;
/**
* 创建时间
*/
@CreationTimestamp
@Column(name = "created_time", nullable = false)
private LocalDateTime createdTime;
/**
* 更新时间
*/
@UpdateTimestamp
@Column(name = "updated_time", nullable = false)
private LocalDateTime updatedTime;
/**
* 最后执行时间
*/
@Column(name = "last_executed_time")
private LocalDateTime lastExecutedTime;
/**
* 执行次数
*/
@Column(name = "execution_count")
private Long executionCount;
}
/**
* 规则类型枚举
*/
public enum RuleType {
/**
* 实时规则(数据到达时立即执行)
*/
REAL_TIME("实时规则", "数据到达时立即执行"),
/**
* 定时规则(按时间周期执行)
*/
SCHEDULED("定时规则", "按时间周期执行"),
/**
* 事件驱动规则(特定事件触发)
*/
EVENT_DRIVEN("事件驱动", "特定事件触发执行"),
/**
* 阈值规则(数值超过阈值时触发)
*/
THRESHOLD("阈值规则", "数值超过阈值时触发"),
/**
* 复合规则(多个条件组合)
*/
COMPOSITE("复合规则", "多个条件组合判断");
private final String displayName;
private final String description;
RuleType(String displayName, String description) {
this.displayName = displayName;
this.description = description;
}
public String getDisplayName() {
return displayName;
}
public String getDescription() {
return description;
}
}
# 2.4 规则执行上下文
/**
* 规则执行上下文
* 包含规则执行所需的所有数据和环境信息
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RuleExecutionContext {
/**
* 执行ID(用于追踪)
*/
private String executionId;
/**
* 设备ID
*/
private String deviceId;
/**
* 设备类型
*/
private String deviceType;
/**
* 当前设备数据
*/
private Map<String, Object> deviceData;
/**
* 历史数据(用于趋势分析)
*/
private List<Map<String, Object>> historicalData;
/**
* 环境变量(如时间、天气等)
*/
private Map<String, Object> environmentVariables;
/**
* 用户自定义变量
*/
private Map<String, Object> customVariables;
/**
* 执行时间
*/
private LocalDateTime executionTime;
/**
* 事件类型(如果是事件驱动)
*/
private String eventType;
/**
* 事件数据
*/
private Map<String, Object> eventData;
/**
* 规则分组过滤(只执行特定分组的规则)
*/
private List<String> ruleGroups;
}
/**
* 规则执行结果
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RuleExecutionResult {
/**
* 执行ID
*/
private String executionId;
/**
* 规则ID
*/
private String ruleId;
/**
* 规则名称
*/
private String ruleName;
/**
* 是否匹配条件
*/
private boolean matched;
/**
* 是否执行成功
*/
private boolean success;
/**
* 执行结果消息
*/
private String message;
/**
* 执行的动作列表
*/
private List<ActionExecutionResult> actionResults;
/**
* 执行开始时间
*/
private LocalDateTime startTime;
/**
* 执行结束时间
*/
private LocalDateTime endTime;
/**
* 执行耗时(毫秒)
*/
private Long executionTimeMs;
/**
* 错误信息(如果失败)
*/
private String errorMessage;
/**
* 调试信息
*/
private Map<String, Object> debugInfo;
}
# 2.5 规则引擎服务实现
/**
* 规则引擎服务实现
* 就像一个智能的决策中心,能够根据复杂的业务规则自动做出判断和执行相应的动作
*/
@Service
@Slf4j
@Transactional
public class RuleEngineServiceImpl implements RuleEngineService {
@Autowired
private RuleDefinitionRepository ruleRepository;
@Autowired
private RuleExecutionRecordRepository executionRecordRepository;
@Autowired
private RuleExpressionEvaluator expressionEvaluator;
@Autowired
private ActionExecutor actionExecutor;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ApplicationEventPublisher eventPublisher;
/**
* 执行规则评估
* 这是规则引擎的核心方法,就像大脑进行思考和决策的过程
*/
@Override
public RuleExecutionResult executeRules(RuleExecutionContext context) {
String executionId = generateExecutionId();
context.setExecutionId(executionId);
log.info("开始执行规则评估,执行ID: {}, 设备ID: {}", executionId, context.getDeviceId());
LocalDateTime startTime = LocalDateTime.now();
List<ActionExecutionResult> allActionResults = new ArrayList<>();
boolean hasMatchedRule = false;
boolean allSuccess = true;
StringBuilder messageBuilder = new StringBuilder();
try {
// 1. 获取适用的规则列表
List<RuleDefinition> applicableRules = getApplicableRules(context);
log.debug("找到适用规则数量: {}", applicableRules.size());
// 2. 按优先级排序
applicableRules.sort(Comparator.comparing(RuleDefinition::getPriority,
Comparator.nullsLast(Comparator.naturalOrder())).reversed());
// 3. 逐个评估规则
for (RuleDefinition rule : applicableRules) {
try {
RuleEvaluationResult evaluationResult = evaluateRule(rule, context);
if (evaluationResult.isMatched()) {
hasMatchedRule = true;
log.info("规则匹配成功,规则ID: {}, 规则名称: {}", rule.getRuleId(), rule.getRuleName());
// 执行规则动作
List<ActionExecutionResult> actionResults = executeRuleActions(rule, context);
allActionResults.addAll(actionResults);
// 检查动作执行结果
boolean actionSuccess = actionResults.stream().allMatch(ActionExecutionResult::isSuccess);
if (!actionSuccess) {
allSuccess = false;
}
// 更新规则执行统计
updateRuleExecutionStats(rule.getRuleId());
// 记录规则执行历史
recordRuleExecution(rule, context, evaluationResult, actionResults);
messageBuilder.append(String.format("规则[%s]执行成功; ", rule.getRuleName()));
// 如果规则配置为执行后停止,则不再评估后续规则
if (isStopAfterExecution(rule)) {
log.info("规则配置为执行后停止,终止后续规则评估");
break;
}
}
} catch (Exception e) {
log.error("规则评估异常,规则ID: {}", rule.getRuleId(), e);
allSuccess = false;
messageBuilder.append(String.format("规则[%s]执行异常: %s; ", rule.getRuleName(), e.getMessage()));
}
}
LocalDateTime endTime = LocalDateTime.now();
long executionTime = Duration.between(startTime, endTime).toMillis();
// 4. 构建执行结果
RuleExecutionResult result = RuleExecutionResult.builder()
.executionId(executionId)
.matched(hasMatchedRule)
.success(allSuccess)
.message(messageBuilder.length() > 0 ? messageBuilder.toString() : "无匹配规则")
.actionResults(allActionResults)
.startTime(startTime)
.endTime(endTime)
.executionTimeMs(executionTime)
.build();
// 5. 发布规则执行事件
publishRuleExecutionEvent(context, result);
log.info("规则评估执行完成,执行ID: {}, 匹配规则: {}, 执行动作: {}",
executionId, hasMatchedRule, allActionResults.size());
return result;
} catch (Exception e) {
log.error("规则评估执行异常,执行ID: {}", executionId, e);
LocalDateTime endTime = LocalDateTime.now();
long executionTime = Duration.between(startTime, endTime).toMillis();
return RuleExecutionResult.builder()
.executionId(executionId)
.matched(false)
.success(false)
.message("规则评估异常: " + e.getMessage())
.actionResults(allActionResults)
.startTime(startTime)
.endTime(endTime)
.executionTimeMs(executionTime)
.errorMessage(e.toString())
.build();
}
}
/**
* 获取适用的规则列表
* 就像筛选出适合当前情况的所有决策标准
*/
private List<RuleDefinition> getApplicableRules(RuleExecutionContext context) {
// 1. 基础查询条件
List<RuleDefinition> rules = ruleRepository.findByEnabledTrueAndValidTimeRange(
context.getExecutionTime());
// 2. 按设备类型过滤
if (StringUtils.hasText(context.getDeviceType())) {
rules = rules.stream()
.filter(rule -> isRuleApplicableToDeviceType(rule, context.getDeviceType()))
.collect(Collectors.toList());
}
// 3. 按规则分组过滤
if (context.getRuleGroups() != null && !context.getRuleGroups().isEmpty()) {
rules = rules.stream()
.filter(rule -> context.getRuleGroups().contains(rule.getRuleGroup()))
.collect(Collectors.toList());
}
// 4. 检查执行频率限制
rules = rules.stream()
.filter(rule -> checkExecutionLimit(rule, context))
.collect(Collectors.toList());
return rules;
}
/**
* 评估单个规则
* 就像判断一个特定的条件是否满足
*/
private RuleEvaluationResult evaluateRule(RuleDefinition rule, RuleExecutionContext context) {
log.debug("开始评估规则,规则ID: {}, 规则名称: {}", rule.getRuleId(), rule.getRuleName());
try {
// 1. 准备评估上下文
Map<String, Object> evaluationContext = prepareEvaluationContext(rule, context);
// 2. 评估条件表达式
boolean conditionResult = expressionEvaluator.evaluate(
rule.getConditionExpression(), evaluationContext);
log.debug("规则条件评估结果,规则ID: {}, 结果: {}", rule.getRuleId(), conditionResult);
return RuleEvaluationResult.builder()
.ruleId(rule.getRuleId())
.matched(conditionResult)
.evaluationContext(evaluationContext)
.evaluationTime(LocalDateTime.now())
.build();
} catch (Exception e) {
log.error("规则评估异常,规则ID: {}", rule.getRuleId(), e);
throw new RuleEvaluationException("规则评估失败: " + e.getMessage(), e);
}
}
/**
* 执行规则动作
* 就像执行决策后的具体行动
*/
private List<ActionExecutionResult> executeRuleActions(RuleDefinition rule, RuleExecutionContext context) {
log.info("开始执行规则动作,规则ID: {}", rule.getRuleId());
List<ActionExecutionResult> results = new ArrayList<>();
try {
// 1. 解析动作配置
List<ActionConfig> actionConfigs = parseActionConfig(rule.getActionConfig());
// 2. 逐个执行动作
for (ActionConfig actionConfig : actionConfigs) {
try {
ActionExecutionResult result = actionExecutor.execute(actionConfig, context);
results.add(result);
log.debug("动作执行完成,动作类型: {}, 结果: {}",
actionConfig.getActionType(), result.isSuccess());
} catch (Exception e) {
log.error("动作执行异常,动作类型: {}", actionConfig.getActionType(), e);
ActionExecutionResult errorResult = ActionExecutionResult.builder()
.actionType(actionConfig.getActionType())
.success(false)
.message("动作执行异常: " + e.getMessage())
.executionTime(LocalDateTime.now())
.build();
results.add(errorResult);
}
}
} catch (Exception e) {
log.error("解析动作配置异常,规则ID: {}", rule.getRuleId(), e);
}
return results;
}
/**
* 准备评估上下文
* 将所有相关数据整理成规则表达式可以使用的格式
*/
private Map<String, Object> prepareEvaluationContext(RuleDefinition rule, RuleExecutionContext context) {
Map<String, Object> evaluationContext = new HashMap<>();
// 1. 设备数据
if (context.getDeviceData() != null) {
evaluationContext.putAll(context.getDeviceData());
}
// 2. 环境变量
if (context.getEnvironmentVariables() != null) {
evaluationContext.putAll(context.getEnvironmentVariables());
}
// 3. 自定义变量
if (context.getCustomVariables() != null) {
evaluationContext.putAll(context.getCustomVariables());
}
// 4. 时间相关变量
LocalDateTime now = context.getExecutionTime();
evaluationContext.put("now", now);
evaluationContext.put("hour", now.getHour());
evaluationContext.put("minute", now.getMinute());
evaluationContext.put("dayOfWeek", now.getDayOfWeek().getValue());
evaluationContext.put("dayOfMonth", now.getDayOfMonth());
evaluationContext.put("month", now.getMonthValue());
evaluationContext.put("year", now.getYear());
// 5. 设备信息
evaluationContext.put("deviceId", context.getDeviceId());
evaluationContext.put("deviceType", context.getDeviceType());
// 6. 历史数据统计(如果有)
if (context.getHistoricalData() != null && !context.getHistoricalData().isEmpty()) {
addHistoricalDataStatistics(evaluationContext, context.getHistoricalData());
}
return evaluationContext;
}
/**
* 添加历史数据统计
* 计算平均值、最大值、最小值等统计信息
*/
private void addHistoricalDataStatistics(Map<String, Object> context, List<Map<String, Object>> historicalData) {
// 这里可以计算各种统计指标
// 例如:温度的平均值、最大值、最小值、趋势等
Map<String, List<Double>> numericData = new HashMap<>();
// 收集数值型数据
for (Map<String, Object> data : historicalData) {
for (Map.Entry<String, Object> entry : data.entrySet()) {
if (entry.getValue() instanceof Number) {
numericData.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
.add(((Number) entry.getValue()).doubleValue());
}
}
}
// 计算统计指标
for (Map.Entry<String, List<Double>> entry : numericData.entrySet()) {
String fieldName = entry.getKey();
List<Double> values = entry.getValue();
if (!values.isEmpty()) {
double avg = values.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
double max = values.stream().mapToDouble(Double::doubleValue).max().orElse(0.0);
double min = values.stream().mapToDouble(Double::doubleValue).min().orElse(0.0);
context.put(fieldName + "_avg", avg);
context.put(fieldName + "_max", max);
context.put(fieldName + "_min", min);
context.put(fieldName + "_count", values.size());
}
}
}
/**
* 生成执行ID
*/
private String generateExecutionId() {
return "RULE-EXEC-" + System.currentTimeMillis() + "-" +
ThreadLocalRandom.current().nextInt(1000, 9999);
}
}
# 2.6 实际应用示例
让我们通过一个智能温室的例子来看看规则引擎是如何工作的:
/**
* 智能温室规则应用示例
* 展示规则引擎在实际场景中的应用
*/
@Component
@Slf4j
public class SmartGreenhouseRuleExample {
@Autowired
private RuleEngineService ruleEngineService;
@Autowired
private DeviceControlService deviceControlService;
/**
* 初始化温室管理规则
*/
@PostConstruct
public void initGreenhouseRules() {
// 1. 创建温度控制规则
createTemperatureControlRule();
// 2. 创建湿度控制规则
createHumidityControlRule();
// 3. 创建光照控制规则
createLightControlRule();
// 4. 创建异常告警规则
createAlarmRule();
}
/**
* 创建温度控制规则
* 当温度过高或过低时,自动调节空调和加热器
*/
private void createTemperatureControlRule() {
CreateRuleRequest request = CreateRuleRequest.builder()
.ruleName("温室温度自动控制")
.description("根据温度自动控制空调和加热设备")
.ruleType(RuleType.REAL_TIME)
.conditionExpression("temperature > 28 || temperature < 18")
.actionConfig(buildTemperatureActionConfig())
.priority(100)
.enabled(true)
.ruleGroup("greenhouse_climate")
.applicableDeviceTypes(Arrays.asList("temperature_sensor"))
.build();
RuleDefinition rule = ruleEngineService.createRule(request);
log.info("创建温度控制规则成功,规则ID: {}", rule.getRuleId());
}
/**
* 构建温度控制动作配置
*/
private String buildTemperatureActionConfig() {
List<ActionConfig> actions = Arrays.asList(
// 温度过高时开启空调
ActionConfig.builder()
.actionType("DEVICE_CONTROL")
.condition("temperature > 28")
.targetDeviceType("air_conditioner")
.command("TURN_ON")
.parameters(Map.of("mode", "cooling", "targetTemp", 25))
.build(),
// 温度过低时开启加热器
ActionConfig.builder()
.actionType("DEVICE_CONTROL")
.condition("temperature < 18")
.targetDeviceType("heater")
.command("TURN_ON")
.parameters(Map.of("targetTemp", 22))
.build(),
// 发送通知
ActionConfig.builder()
.actionType("NOTIFICATION")
.message("温室温度异常,当前温度: ${temperature}°C")
.recipients(Arrays.asList("greenhouse_manager"))
.build()
);
return JsonUtils.toJson(actions);
}
/**
* 处理设备数据并触发规则评估
*/
public void processDeviceData(String deviceId, Map<String, Object> deviceData) {
// 构建规则执行上下文
RuleExecutionContext context = RuleExecutionContext.builder()
.deviceId(deviceId)
.deviceType("temperature_sensor")
.deviceData(deviceData)
.environmentVariables(getEnvironmentVariables())
.executionTime(LocalDateTime.now())
.ruleGroups(Arrays.asList("greenhouse_climate"))
.build();
// 执行规则评估
RuleExecutionResult result = ruleEngineService.executeRules(context);
log.info("规则执行完成,设备ID: {}, 匹配规则: {}, 执行动作: {}",
deviceId, result.isMatched(), result.getActionResults().size());
}
/**
* 获取环境变量
*/
private Map<String, Object> getEnvironmentVariables() {
Map<String, Object> env = new HashMap<>();
LocalDateTime now = LocalDateTime.now();
env.put("currentTime", now);
env.put("hour", now.getHour());
env.put("isWorkingHours", now.getHour() >= 8 && now.getHour() <= 18);
env.put("season", getCurrentSeason());
return env;
}
private String getCurrentSeason() {
int month = LocalDateTime.now().getMonthValue();
if (month >= 3 && month <= 5) return "spring";
if (month >= 6 && month <= 8) return "summer";
if (month >= 9 && month <= 11) return "autumn";
return "winter";
}
}
# 3. 告警服务 (Alarm Service)
# 3.1 设计理念
告警服务就像一个"智能哨兵",它能够:
- 实时监控设备状态和数据异常
- 根据不同的严重程度分级处理告警
- 支持多种告警通知方式
- 提供告警的生命周期管理
- 防止告警风暴和重复告警
想象一下工厂的安全监控系统:当检测到温度异常时,系统会立即发出告警,通知相关人员,并自动记录告警信息,直到问题解决。
# 3.2 告警服务核心接口
/**
* 告警服务接口
* 就像一个智能的安全卫士,时刻监控着系统的健康状态
*/
public interface AlarmService {
/**
* 创建告警
* 就像发现异常情况时拉响警报
*/
AlarmRecord createAlarm(CreateAlarmRequest request);
/**
* 批量创建告警
*/
List<AlarmRecord> createBatchAlarms(List<CreateAlarmRequest> requests);
/**
* 确认告警
* 就像值班人员确认已经知道了这个问题
*/
boolean acknowledgeAlarm(String alarmId, String operatorId, String remarks);
/**
* 处理告警
* 就像开始解决这个问题
*/
boolean processAlarm(String alarmId, String operatorId, String solution);
/**
* 关闭告警
* 就像问题解决后关闭警报
*/
boolean closeAlarm(String alarmId, String operatorId, String closeReason);
/**
* 批量操作告警
*/
BatchOperationResult batchOperateAlarms(List<String> alarmIds, AlarmOperation operation, String operatorId);
/**
* 查询告警列表
*/
PageResult<AlarmRecord> queryAlarms(AlarmQueryRequest request);
/**
* 获取告警统计信息
*/
AlarmStatistics getAlarmStatistics(AlarmStatisticsRequest request);
/**
* 获取实时告警数量
*/
Map<AlarmLevel, Long> getRealTimeAlarmCount();
/**
* 订阅告警通知
*/
void subscribeAlarmNotification(AlarmSubscription subscription);
/**
* 取消告警订阅
*/
void unsubscribeAlarmNotification(String subscriptionId);
}
# 3.3 告警数据模型
/**
* 告警记录
* 描述一个完整的告警事件
*/
@Entity
@Table(name = "alarm_record")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AlarmRecord {
/**
* 告警ID
*/
@Id
@Column(name = "alarm_id", length = 64)
private String alarmId;
/**
* 告警标题
*/
@Column(name = "alarm_title", nullable = false)
private String alarmTitle;
/**
* 告警描述
*/
@Column(name = "alarm_description", columnDefinition = "TEXT")
private String alarmDescription;
/**
* 告警级别
*/
@Enumerated(EnumType.STRING)
@Column(name = "alarm_level", nullable = false)
private AlarmLevel alarmLevel;
/**
* 告警类型
*/
@Column(name = "alarm_type", nullable = false)
private String alarmType;
/**
* 告警状态
*/
@Enumerated(EnumType.STRING)
@Column(name = "alarm_status", nullable = false)
private AlarmStatus alarmStatus;
/**
* 设备ID
*/
@Column(name = "device_id")
private String deviceId;
/**
* 设备名称
*/
@Column(name = "device_name")
private String deviceName;
/**
* 设备类型
*/
@Column(name = "device_type")
private String deviceType;
/**
* 告警源数据(JSON格式)
*/
@Column(name = "source_data", columnDefinition = "JSON")
private String sourceData;
/**
* 告警规则ID
*/
@Column(name = "rule_id")
private String ruleId;
/**
* 告警规则名称
*/
@Column(name = "rule_name")
private String ruleName;
/**
* 告警位置信息
*/
@Column(name = "location_info", columnDefinition = "JSON")
private String locationInfo;
/**
* 告警标签
*/
@Column(name = "tags", columnDefinition = "JSON")
private String tags;
/**
* 告警发生时间
*/
@Column(name = "alarm_time", nullable = false)
private LocalDateTime alarmTime;
/**
* 告警确认时间
*/
@Column(name = "acknowledge_time")
private LocalDateTime acknowledgeTime;
/**
* 告警确认人
*/
@Column(name = "acknowledge_user")
private String acknowledgeUser;
/**
* 告警处理时间
*/
@Column(name = "process_time")
private LocalDateTime processTime;
/**
* 告警处理人
*/
@Column(name = "process_user")
private String processUser;
/**
* 告警关闭时间
*/
@Column(name = "close_time")
private LocalDateTime closeTime;
/**
* 告警关闭人
*/
@Column(name = "close_user")
private String closeUser;
/**
* 处理方案
*/
@Column(name = "solution", columnDefinition = "TEXT")
private String solution;
/**
* 关闭原因
*/
@Column(name = "close_reason")
private String closeReason;
/**
* 告警持续时间(秒)
*/
@Column(name = "duration_seconds")
private Long durationSeconds;
/**
* 通知状态
*/
@Column(name = "notification_status", columnDefinition = "JSON")
private String notificationStatus;
/**
* 创建时间
*/
@CreationTimestamp
@Column(name = "created_time", nullable = false)
private LocalDateTime createdTime;
/**
* 更新时间
*/
@UpdateTimestamp
@Column(name = "updated_time", nullable = false)
private LocalDateTime updatedTime;
}
/**
* 告警级别枚举
*/
public enum AlarmLevel {
/**
* 紧急告警(需要立即处理)
*/
CRITICAL("紧急", 1, "#FF0000"),
/**
* 重要告警(需要尽快处理)
*/
MAJOR("重要", 2, "#FF8C00"),
/**
* 次要告警(需要关注)
*/
MINOR("次要", 3, "#FFD700"),
/**
* 警告信息(仅提醒)
*/
WARNING("警告", 4, "#32CD32"),
/**
* 信息提示
*/
INFO("信息", 5, "#87CEEB");
private final String displayName;
private final int priority;
private final String color;
AlarmLevel(String displayName, int priority, String color) {
this.displayName = displayName;
this.priority = priority;
this.color = color;
}
public String getDisplayName() {
return displayName;
}
public int getPriority() {
return priority;
}
public String getColor() {
return color;
}
}
/**
* 告警状态枚举
*/
public enum AlarmStatus {
/**
* 新建告警
*/
NEW("新建", "告警刚刚产生,等待处理"),
/**
* 已确认
*/
ACKNOWLEDGED("已确认", "告警已被确认,正在处理中"),
/**
* 处理中
*/
PROCESSING("处理中", "告警正在被处理"),
/**
* 已关闭
*/
CLOSED("已关闭", "告警已处理完成并关闭"),
/**
* 已忽略
*/
IGNORED("已忽略", "告警被标记为忽略");
private final String displayName;
private final String description;
AlarmStatus(String displayName, String description) {
this.displayName = displayName;
this.description = description;
}
public String getDisplayName() {
return displayName;
}
public String getDescription() {
return description;
}
}
# 3.4 告警服务实现
/**
* 告警服务实现
* 就像一个智能的安全监控中心,负责处理各种异常情况的告警
*/
@Service
@Slf4j
@Transactional
public class AlarmServiceImpl implements AlarmService {
@Autowired
private AlarmRecordRepository alarmRepository;
@Autowired
private AlarmRuleRepository alarmRuleRepository;
@Autowired
private NotificationService notificationService;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ApplicationEventPublisher eventPublisher;
private static final String ALARM_COUNTER_KEY = "alarm:counter:";
private static final String ALARM_SUPPRESSION_KEY = "alarm:suppression:";
/**
* 创建告警
* 这是告警系统的核心方法,就像安全系统检测到异常时触发警报
*/
@Override
public AlarmRecord createAlarm(CreateAlarmRequest request) {
log.info("开始创建告警,告警类型: {}, 设备ID: {}", request.getAlarmType(), request.getDeviceId());
try {
// 1. 检查告警抑制(防止重复告警)
if (isAlarmSuppressed(request)) {
log.debug("告警被抑制,跳过创建,设备ID: {}, 告警类型: {}",
request.getDeviceId(), request.getAlarmType());
return null;
}
// 2. 生成告警ID
String alarmId = generateAlarmId();
// 3. 构建告警记录
AlarmRecord alarm = AlarmRecord.builder()
.alarmId(alarmId)
.alarmTitle(request.getAlarmTitle())
.alarmDescription(request.getAlarmDescription())
.alarmLevel(request.getAlarmLevel())
.alarmType(request.getAlarmType())
.alarmStatus(AlarmStatus.NEW)
.deviceId(request.getDeviceId())
.deviceName(request.getDeviceName())
.deviceType(request.getDeviceType())
.sourceData(JsonUtils.toJson(request.getSourceData()))
.ruleId(request.getRuleId())
.ruleName(request.getRuleName())
.locationInfo(JsonUtils.toJson(request.getLocationInfo()))
.tags(JsonUtils.toJson(request.getTags()))
.alarmTime(LocalDateTime.now())
.build();
// 4. 保存告警记录
alarm = alarmRepository.save(alarm);
// 5. 更新告警统计
updateAlarmStatistics(alarm);
// 6. 设置告警抑制(防止短时间内重复告警)
setAlarmSuppression(request);
// 7. 发送告警通知
sendAlarmNotification(alarm);
// 8. 发布告警事件
publishAlarmEvent(alarm, "ALARM_CREATED");
log.info("告警创建成功,告警ID: {}, 级别: {}", alarmId, request.getAlarmLevel());
return alarm;
} catch (Exception e) {
log.error("创建告警异常,设备ID: {}", request.getDeviceId(), e);
throw new AlarmServiceException("创建告警失败: " + e.getMessage(), e);
}
}
/**
* 确认告警
* 就像值班人员看到告警后点击"我知道了"
*/
@Override
public boolean acknowledgeAlarm(String alarmId, String operatorId, String remarks) {
log.info("开始确认告警,告警ID: {}, 操作人: {}", alarmId, operatorId);
try {
Optional<AlarmRecord> alarmOpt = alarmRepository.findById(alarmId);
if (!alarmOpt.isPresent()) {
log.warn("告警不存在,告警ID: {}", alarmId);
return false;
}
AlarmRecord alarm = alarmOpt.get();
// 检查告警状态
if (alarm.getAlarmStatus() != AlarmStatus.NEW) {
log.warn("告警状态不允许确认,当前状态: {}", alarm.getAlarmStatus());
return false;
}
// 更新告警状态
alarm.setAlarmStatus(AlarmStatus.ACKNOWLEDGED);
alarm.setAcknowledgeTime(LocalDateTime.now());
alarm.setAcknowledgeUser(operatorId);
alarmRepository.save(alarm);
// 记录操作日志
recordAlarmOperation(alarmId, operatorId, "ACKNOWLEDGE", remarks);
// 发布事件
publishAlarmEvent(alarm, "ALARM_ACKNOWLEDGED");
log.info("告警确认成功,告警ID: {}", alarmId);
return true;
} catch (Exception e) {
log.error("确认告警异常,告警ID: {}", alarmId, e);
return false;
}
}
/**
* 处理告警
* 就像开始解决这个问题
*/
@Override
public boolean processAlarm(String alarmId, String operatorId, String solution) {
log.info("开始处理告警,告警ID: {}, 操作人: {}", alarmId, operatorId);
try {
Optional<AlarmRecord> alarmOpt = alarmRepository.findById(alarmId);
if (!alarmOpt.isPresent()) {
log.warn("告警不存在,告警ID: {}", alarmId);
return false;
}
AlarmRecord alarm = alarmOpt.get();
// 检查告警状态
if (alarm.getAlarmStatus() == AlarmStatus.CLOSED) {
log.warn("告警已关闭,无法处理,告警ID: {}", alarmId);
return false;
}
// 更新告警状态
alarm.setAlarmStatus(AlarmStatus.PROCESSING);
alarm.setProcessTime(LocalDateTime.now());
alarm.setProcessUser(operatorId);
alarm.setSolution(solution);
alarmRepository.save(alarm);
// 记录操作日志
recordAlarmOperation(alarmId, operatorId, "PROCESS", solution);
// 发布事件
publishAlarmEvent(alarm, "ALARM_PROCESSING");
log.info("告警处理成功,告警ID: {}", alarmId);
return true;
} catch (Exception e) {
log.error("处理告警异常,告警ID: {}", alarmId, e);
return false;
}
}
/**
* 关闭告警
* 就像问题解决后关闭警报
*/
@Override
public boolean closeAlarm(String alarmId, String operatorId, String closeReason) {
log.info("开始关闭告警,告警ID: {}, 操作人: {}", alarmId, operatorId);
try {
Optional<AlarmRecord> alarmOpt = alarmRepository.findById(alarmId);
if (!alarmOpt.isPresent()) {
log.warn("告警不存在,告警ID: {}", alarmId);
return false;
}
AlarmRecord alarm = alarmOpt.get();
// 检查告警状态
if (alarm.getAlarmStatus() == AlarmStatus.CLOSED) {
log.warn("告警已关闭,告警ID: {}", alarmId);
return false;
}
LocalDateTime closeTime = LocalDateTime.now();
// 计算告警持续时间
long durationSeconds = Duration.between(alarm.getAlarmTime(), closeTime).getSeconds();
// 更新告警状态
alarm.setAlarmStatus(AlarmStatus.CLOSED);
alarm.setCloseTime(closeTime);
alarm.setCloseUser(operatorId);
alarm.setCloseReason(closeReason);
alarm.setDurationSeconds(durationSeconds);
alarmRepository.save(alarm);
// 更新统计信息
updateAlarmCloseStatistics(alarm);
// 记录操作日志
recordAlarmOperation(alarmId, operatorId, "CLOSE", closeReason);
// 发布事件
publishAlarmEvent(alarm, "ALARM_CLOSED");
log.info("告警关闭成功,告警ID: {}, 持续时间: {}秒", alarmId, durationSeconds);
return true;
} catch (Exception e) {
log.error("关闭告警异常,告警ID: {}", alarmId, e);
return false;
}
}
/**
* 检查告警是否被抑制
* 防止短时间内产生大量重复告警
*/
private boolean isAlarmSuppressed(CreateAlarmRequest request) {
String suppressionKey = ALARM_SUPPRESSION_KEY +
request.getDeviceId() + ":" + request.getAlarmType();
return redisTemplate.hasKey(suppressionKey);
}
/**
* 设置告警抑制
* 在一定时间内防止相同类型的告警重复产生
*/
private void setAlarmSuppression(CreateAlarmRequest request) {
String suppressionKey = ALARM_SUPPRESSION_KEY +
request.getDeviceId() + ":" + request.getAlarmType();
// 根据告警级别设置不同的抑制时间
int suppressionMinutes = getSuppressionMinutes(request.getAlarmLevel());
redisTemplate.opsForValue().set(suppressionKey, "1",
Duration.ofMinutes(suppressionMinutes));
}
/**
* 根据告警级别获取抑制时间
*/
private int getSuppressionMinutes(AlarmLevel level) {
switch (level) {
case CRITICAL: return 5; // 紧急告警抑制5分钟
case MAJOR: return 10; // 重要告警抑制10分钟
case MINOR: return 15; // 次要告警抑制15分钟
case WARNING: return 30; // 警告抑制30分钟
case INFO: return 60; // 信息抑制60分钟
default: return 10;
}
}
/**
* 发送告警通知
* 根据告警级别和配置发送不同类型的通知
*/
private void sendAlarmNotification(AlarmRecord alarm) {
try {
// 构建通知内容
NotificationRequest notification = NotificationRequest.builder()
.title("【" + alarm.getAlarmLevel().getDisplayName() + "告警】" + alarm.getAlarmTitle())
.content(buildNotificationContent(alarm))
.level(mapAlarmLevelToNotificationLevel(alarm.getAlarmLevel()))
.targetType("ALARM")
.targetId(alarm.getAlarmId())
.tags(Arrays.asList("alarm", alarm.getAlarmType(), alarm.getAlarmLevel().name()))
.build();
// 获取通知接收者
List<String> recipients = getAlarmRecipients(alarm);
// 发送通知
for (String recipient : recipients) {
notification.setRecipient(recipient);
notificationService.sendNotification(notification);
}
log.debug("告警通知发送完成,告警ID: {}, 接收者数量: {}",
alarm.getAlarmId(), recipients.size());
} catch (Exception e) {
log.error("发送告警通知异常,告警ID: {}", alarm.getAlarmId(), e);
}
}
/**
* 构建通知内容
*/
private String buildNotificationContent(AlarmRecord alarm) {
StringBuilder content = new StringBuilder();
content.append("告警描述:").append(alarm.getAlarmDescription()).append("\n");
content.append("设备信息:").append(alarm.getDeviceName()).append("(").append(alarm.getDeviceId()).append(")\n");
content.append("告警时间:").append(alarm.getAlarmTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))).append("\n");
if (StringUtils.hasText(alarm.getLocationInfo())) {
content.append("位置信息:").append(alarm.getLocationInfo()).append("\n");
}
return content.toString();
}
/**
* 生成告警ID
*/
private String generateAlarmId() {
return "ALARM-" + System.currentTimeMillis() + "-" +
ThreadLocalRandom.current().nextInt(1000, 9999);
}
}
# 3.5 实际应用示例
让我们通过一个智能工厂的例子来看看告警服务是如何工作的:
/**
* 智能工厂告警应用示例
* 展示告警服务在实际场景中的应用
*/
@Component
@Slf4j
public class SmartFactoryAlarmExample {
@Autowired
private AlarmService alarmService;
@Autowired
private DeviceService deviceService;
/**
* 处理设备异常数据并创建告警
*/
public void handleDeviceAbnormalData(String deviceId, Map<String, Object> deviceData) {
// 检查温度异常
if (deviceData.containsKey("temperature")) {
double temperature = (Double) deviceData.get("temperature");
if (temperature > 80) {
createTemperatureAlarm(deviceId, temperature, deviceData);
}
}
// 检查压力异常
if (deviceData.containsKey("pressure")) {
double pressure = (Double) deviceData.get("pressure");
if (pressure > 10.0) {
createPressureAlarm(deviceId, pressure, deviceData);
}
}
// 检查设备离线
if (deviceData.containsKey("status") && "OFFLINE".equals(deviceData.get("status"))) {
createDeviceOfflineAlarm(deviceId, deviceData);
}
}
/**
* 创建温度异常告警
*/
private void createTemperatureAlarm(String deviceId, double temperature, Map<String, Object> sourceData) {
CreateAlarmRequest request = CreateAlarmRequest.builder()
.alarmTitle("设备温度异常")
.alarmDescription(String.format("设备温度过高,当前温度: %.2f°C,超过安全阈值80°C", temperature))
.alarmLevel(temperature > 100 ? AlarmLevel.CRITICAL : AlarmLevel.MAJOR)
.alarmType("TEMPERATURE_HIGH")
.deviceId(deviceId)
.deviceName(getDeviceName(deviceId))
.deviceType("production_equipment")
.sourceData(sourceData)
.locationInfo(getDeviceLocation(deviceId))
.tags(Arrays.asList("temperature", "safety", "production"))
.build();
AlarmRecord alarm = alarmService.createAlarm(request);
if (alarm != null) {
log.info("温度异常告警创建成功,设备ID: {}, 告警ID: {}", deviceId, alarm.getAlarmId());
}
}
/**
* 创建压力异常告警
*/
private void createPressureAlarm(String deviceId, double pressure, Map<String, Object> sourceData) {
CreateAlarmRequest request = CreateAlarmRequest.builder()
.alarmTitle("设备压力异常")
.alarmDescription(String.format("设备压力过高,当前压力: %.2f MPa,超过安全阈值10.0 MPa", pressure))
.alarmLevel(AlarmLevel.CRITICAL)
.alarmType("PRESSURE_HIGH")
.deviceId(deviceId)
.deviceName(getDeviceName(deviceId))
.deviceType("production_equipment")
.sourceData(sourceData)
.locationInfo(getDeviceLocation(deviceId))
.tags(Arrays.asList("pressure", "safety", "critical"))
.build();
AlarmRecord alarm = alarmService.createAlarm(request);
if (alarm != null) {
log.warn("压力异常告警创建成功,设备ID: {}, 告警ID: {}", deviceId, alarm.getAlarmId());
// 紧急情况,立即通知安全管理员
notifyEmergencyContact(alarm);
}
}
/**
* 创建设备离线告警
*/
private void createDeviceOfflineAlarm(String deviceId, Map<String, Object> sourceData) {
CreateAlarmRequest request = CreateAlarmRequest.builder()
.alarmTitle("设备离线")
.alarmDescription("设备失去连接,可能影响生产进度")
.alarmLevel(AlarmLevel.MINOR)
.alarmType("DEVICE_OFFLINE")
.deviceId(deviceId)
.deviceName(getDeviceName(deviceId))
.deviceType("production_equipment")
.sourceData(sourceData)
.locationInfo(getDeviceLocation(deviceId))
.tags(Arrays.asList("connectivity", "production"))
.build();
AlarmRecord alarm = alarmService.createAlarm(request);
if (alarm != null) {
log.info("设备离线告警创建成功,设备ID: {}, 告警ID: {}", deviceId, alarm.getAlarmId());
}
}
/**
* 批量处理告警
*/
public void batchProcessAlarms(List<String> alarmIds, String operatorId) {
BatchOperationResult result = alarmService.batchOperateAlarms(
alarmIds, AlarmOperation.ACKNOWLEDGE, operatorId);
log.info("批量确认告警完成,成功: {}, 失败: {}",
result.getSuccessCount(), result.getFailureCount());
}
private String getDeviceName(String deviceId) {
// 从设备服务获取设备名称
return "生产设备-" + deviceId.substring(deviceId.length() - 4);
}
private Map<String, Object> getDeviceLocation(String deviceId) {
// 获取设备位置信息
Map<String, Object> location = new HashMap<>();
location.put("workshop", "A车间");
location.put("line", "生产线1");
location.put("position", "工位3");
return location;
}
private void notifyEmergencyContact(AlarmRecord alarm) {
// 紧急联系人通知逻辑
log.warn("发送紧急告警通知,告警ID: {}", alarm.getAlarmId());
}
}
# 4. 数据分析服务 (Data Analysis Service)
# 4.1 设计理念
数据分析服务就像一个"智能数据科学家",它能够:
- 对海量的IoT设备数据进行实时和离线分析
- 发现数据中的模式、趋势和异常
- 提供预测性分析和智能洞察
- 支持多维度的数据统计和报表生成
- 为业务决策提供数据支撑
想象一下智能城市的交通分析系统:通过分析路口传感器的数据,预测交通拥堵,优化信号灯配时,提升整个城市的交通效率。
# 4.2 数据分析服务核心接口
/**
* 数据分析服务接口
* 就像一个智能的数据分析师,能够从海量数据中挖掘有价值的信息
*/
public interface DataAnalysisService {
/**
* 实时数据分析
* 就像实时监控数据流并立即给出分析结果
*/
AnalysisResult analyzeRealTimeData(RealTimeAnalysisRequest request);
/**
* 批量数据分析
* 就像对历史数据进行深度挖掘
*/
AnalysisResult analyzeBatchData(BatchAnalysisRequest request);
/**
* 趋势分析
* 分析数据的变化趋势和规律
*/
TrendAnalysisResult analyzeTrend(TrendAnalysisRequest request);
/**
* 异常检测
* 识别数据中的异常模式
*/
AnomalyDetectionResult detectAnomalies(AnomalyDetectionRequest request);
/**
* 预测分析
* 基于历史数据预测未来趋势
*/
PredictionResult predictFuture(PredictionRequest request);
/**
* 统计分析
* 生成各种统计指标和报表
*/
StatisticsResult generateStatistics(StatisticsRequest request);
/**
* 相关性分析
* 分析不同指标之间的关联关系
*/
CorrelationResult analyzeCorrelation(CorrelationRequest request);
/**
* 创建分析任务
* 创建长期运行的分析任务
*/
AnalysisTask createAnalysisTask(CreateAnalysisTaskRequest request);
/**
* 获取分析任务状态
*/
AnalysisTaskStatus getTaskStatus(String taskId);
/**
* 获取分析结果
*/
AnalysisResult getAnalysisResult(String taskId);
/**
* 导出分析报告
*/
ReportExportResult exportReport(ReportExportRequest request);
}
# 4.3 数据分析模型
/**
* 分析请求基类
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AnalysisRequest {
/**
* 分析ID
*/
private String analysisId;
/**
* 分析类型
*/
private AnalysisType analysisType;
/**
* 数据源配置
*/
private DataSourceConfig dataSource;
/**
* 分析参数
*/
private Map<String, Object> parameters;
/**
* 时间范围
*/
private TimeRange timeRange;
/**
* 过滤条件
*/
private List<FilterCondition> filters;
/**
* 分组字段
*/
private List<String> groupByFields;
/**
* 聚合函数
*/
private List<AggregationFunction> aggregations;
/**
* 输出格式
*/
private OutputFormat outputFormat;
}
/**
* 实时分析请求
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class RealTimeAnalysisRequest extends AnalysisRequest {
/**
* 实时数据
*/
private List<Map<String, Object>> realTimeData;
/**
* 窗口大小(秒)
*/
private Integer windowSize;
/**
* 滑动步长(秒)
*/
private Integer slideInterval;
/**
* 是否需要历史对比
*/
private Boolean needHistoricalComparison;
}
/**
* 批量分析请求
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class BatchAnalysisRequest extends AnalysisRequest {
/**
* 数据表名
*/
private String tableName;
/**
* SQL查询语句(可选)
*/
private String sqlQuery;
/**
* 分页参数
*/
private PageRequest pageRequest;
/**
* 是否异步执行
*/
private Boolean asyncExecution;
}
/**
* 分析结果
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AnalysisResult {
/**
* 分析ID
*/
private String analysisId;
/**
* 分析类型
*/
private AnalysisType analysisType;
/**
* 分析状态
*/
private AnalysisStatus status;
/**
* 分析结果数据
*/
private List<Map<String, Object>> resultData;
/**
* 统计摘要
*/
private Map<String, Object> summary;
/**
* 图表数据
*/
private List<ChartData> charts;
/**
* 洞察信息
*/
private List<Insight> insights;
/**
* 分析开始时间
*/
private LocalDateTime startTime;
/**
* 分析结束时间
*/
private LocalDateTime endTime;
/**
* 执行耗时(毫秒)
*/
private Long executionTimeMs;
/**
* 数据量
*/
private Long dataCount;
/**
* 错误信息
*/
private String errorMessage;
/**
* 元数据
*/
private Map<String, Object> metadata;
}
/**
* 分析类型枚举
*/
public enum AnalysisType {
/**
* 描述性统计分析
*/
DESCRIPTIVE_STATISTICS("描述性统计", "计算基本统计指标"),
/**
* 趋势分析
*/
TREND_ANALYSIS("趋势分析", "分析数据变化趋势"),
/**
* 异常检测
*/
ANOMALY_DETECTION("异常检测", "识别异常数据点"),
/**
* 预测分析
*/
PREDICTIVE_ANALYSIS("预测分析", "预测未来数据趋势"),
/**
* 相关性分析
*/
CORRELATION_ANALYSIS("相关性分析", "分析变量间关联关系"),
/**
* 聚类分析
*/
CLUSTERING_ANALYSIS("聚类分析", "数据分组和模式识别"),
/**
* 实时监控
*/
REAL_TIME_MONITORING("实时监控", "实时数据流分析");
private final String displayName;
private final String description;
AnalysisType(String displayName, String description) {
this.displayName = displayName;
this.description = description;
}
public String getDisplayName() {
return displayName;
}
public String getDescription() {
return description;
}
}
# 4.4 数据分析服务实现
/**
* 数据分析服务实现
* 就像一个智能的数据分析专家,能够从复杂的数据中提取有价值的洞察
*/
@Service
@Slf4j
public class DataAnalysisServiceImpl implements DataAnalysisService {
@Autowired
private DataQueryService dataQueryService;
@Autowired
private StatisticsCalculator statisticsCalculator;
@Autowired
private TrendAnalyzer trendAnalyzer;
@Autowired
private AnomalyDetector anomalyDetector;
@Autowired
private PredictionEngine predictionEngine;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private TaskExecutor analysisTaskExecutor;
/**
* 实时数据分析
* 这是实时分析的核心方法,就像一个实时监控大屏不断刷新数据
*/
@Override
public AnalysisResult analyzeRealTimeData(RealTimeAnalysisRequest request) {
String analysisId = generateAnalysisId();
log.info("开始实时数据分析,分析ID: {}, 数据量: {}", analysisId, request.getRealTimeData().size());
LocalDateTime startTime = LocalDateTime.now();
try {
// 1. 数据预处理
List<Map<String, Object>> processedData = preprocessData(request.getRealTimeData(), request.getFilters());
// 2. 应用时间窗口
List<Map<String, Object>> windowedData = applyTimeWindow(processedData, request.getWindowSize());
// 3. 执行分析
Map<String, Object> analysisResults = new HashMap<>();
// 基础统计分析
if (request.getAggregations() != null && !request.getAggregations().isEmpty()) {
Map<String, Object> statistics = calculateRealTimeStatistics(windowedData, request.getAggregations());
analysisResults.put("statistics", statistics);
}
// 趋势分析
if (request.getAnalysisType() == AnalysisType.TREND_ANALYSIS) {
TrendAnalysisResult trendResult = analyzeTrendInRealTime(windowedData, request);
analysisResults.put("trend", trendResult);
}
// 异常检测
if (request.getAnalysisType() == AnalysisType.ANOMALY_DETECTION) {
AnomalyDetectionResult anomalyResult = detectRealTimeAnomalies(windowedData, request);
analysisResults.put("anomalies", anomalyResult);
}
// 4. 历史对比(如果需要)
if (Boolean.TRUE.equals(request.getNeedHistoricalComparison())) {
Map<String, Object> historicalComparison = performHistoricalComparison(windowedData, request);
analysisResults.put("historicalComparison", historicalComparison);
}
// 5. 生成洞察
List<Insight> insights = generateRealTimeInsights(analysisResults, request);
// 6. 构建结果
LocalDateTime endTime = LocalDateTime.now();
long executionTime = Duration.between(startTime, endTime).toMillis();
AnalysisResult result = AnalysisResult.builder()
.analysisId(analysisId)
.analysisType(request.getAnalysisType())
.status(AnalysisStatus.COMPLETED)
.resultData(Arrays.asList(analysisResults))
.summary(buildSummary(analysisResults))
.insights(insights)
.startTime(startTime)
.endTime(endTime)
.executionTimeMs(executionTime)
.dataCount((long) windowedData.size())
.build();
log.info("实时数据分析完成,分析ID: {}, 耗时: {}ms", analysisId, executionTime);
return result;
} catch (Exception e) {
log.error("实时数据分析异常,分析ID: {}", analysisId, e);
LocalDateTime endTime = LocalDateTime.now();
long executionTime = Duration.between(startTime, endTime).toMillis();
return AnalysisResult.builder()
.analysisId(analysisId)
.analysisType(request.getAnalysisType())
.status(AnalysisStatus.FAILED)
.startTime(startTime)
.endTime(endTime)
.executionTimeMs(executionTime)
.errorMessage(e.getMessage())
.build();
}
}
/**
* 批量数据分析
* 就像对历史数据进行深度挖掘和分析
*/
@Override
public AnalysisResult analyzeBatchData(BatchAnalysisRequest request) {
String analysisId = generateAnalysisId();
log.info("开始批量数据分析,分析ID: {}, 表名: {}", analysisId, request.getTableName());
LocalDateTime startTime = LocalDateTime.now();
try {
// 1. 查询数据
List<Map<String, Object>> rawData = queryBatchData(request);
log.debug("查询到数据量: {}", rawData.size());
// 2. 数据预处理
List<Map<String, Object>> processedData = preprocessData(rawData, request.getFilters());
// 3. 分组聚合
Map<String, List<Map<String, Object>>> groupedData = groupData(processedData, request.getGroupByFields());
// 4. 执行分析
Map<String, Object> analysisResults = new HashMap<>();
for (Map.Entry<String, List<Map<String, Object>>> group : groupedData.entrySet()) {
String groupKey = group.getKey();
List<Map<String, Object>> groupData = group.getValue();
Map<String, Object> groupResult = new HashMap<>();
// 统计分析
if (request.getAggregations() != null) {
Map<String, Object> statistics = calculateStatistics(groupData, request.getAggregations());
groupResult.put("statistics", statistics);
}
// 趋势分析
if (request.getAnalysisType() == AnalysisType.TREND_ANALYSIS) {
TrendAnalysisResult trendResult = analyzeTrend(createTrendRequest(groupData, request));
groupResult.put("trend", trendResult);
}
// 异常检测
if (request.getAnalysisType() == AnalysisType.ANOMALY_DETECTION) {
AnomalyDetectionResult anomalyResult = detectAnomalies(createAnomalyRequest(groupData, request));
groupResult.put("anomalies", anomalyResult);
}
analysisResults.put(groupKey, groupResult);
}
// 5. 生成洞察
List<Insight> insights = generateBatchInsights(analysisResults, request);
// 6. 构建结果
LocalDateTime endTime = LocalDateTime.now();
long executionTime = Duration.between(startTime, endTime).toMillis();
AnalysisResult result = AnalysisResult.builder()
.analysisId(analysisId)
.analysisType(request.getAnalysisType())
.status(AnalysisStatus.COMPLETED)
.resultData(Arrays.asList(analysisResults))
.summary(buildSummary(analysisResults))
.insights(insights)
.startTime(startTime)
.endTime(endTime)
.executionTimeMs(executionTime)
.dataCount((long) processedData.size())
.build();
log.info("批量数据分析完成,分析ID: {}, 耗时: {}ms", analysisId, executionTime);
return result;
} catch (Exception e) {
log.error("批量数据分析异常,分析ID: {}", analysisId, e);
LocalDateTime endTime = LocalDateTime.now();
long executionTime = Duration.between(startTime, endTime).toMillis();
return AnalysisResult.builder()
.analysisId(analysisId)
.analysisType(request.getAnalysisType())
.status(AnalysisStatus.FAILED)
.startTime(startTime)
.endTime(endTime)
.executionTimeMs(executionTime)
.errorMessage(e.getMessage())
.build();
}
}
/**
* 趋势分析
* 分析数据的变化趋势和规律
*/
@Override
public TrendAnalysisResult analyzeTrend(TrendAnalysisRequest request) {
log.info("开始趋势分析,时间范围: {} - {}", request.getTimeRange().getStartTime(), request.getTimeRange().getEndTime());
try {
// 1. 查询时间序列数据
List<Map<String, Object>> timeSeriesData = queryTimeSeriesData(request);
// 2. 数据平滑处理
List<Map<String, Object>> smoothedData = smoothData(timeSeriesData, request.getSmoothingMethod());
// 3. 趋势计算
TrendCalculationResult trendCalculation = calculateTrend(smoothedData, request.getTrendFields());
// 4. 季节性分析
SeasonalityResult seasonality = analyzeSeasonality(smoothedData, request);
// 5. 周期性检测
CyclicalityResult cyclicality = detectCyclicality(smoothedData, request);
// 6. 构建结果
return TrendAnalysisResult.builder()
.analysisId(generateAnalysisId())
.trendCalculation(trendCalculation)
.seasonality(seasonality)
.cyclicality(cyclicality)
.dataPoints(smoothedData.size())
.analysisTime(LocalDateTime.now())
.build();
} catch (Exception e) {
log.error("趋势分析异常", e);
throw new DataAnalysisException("趋势分析失败: " + e.getMessage(), e);
}
}
/**
* 预测分析
* 基于历史数据预测未来趋势
*/
@Override
public PredictionResult predictFuture(PredictionRequest request) {
log.info("开始预测分析,预测字段: {}, 预测期间: {}个时间点",
request.getPredictFields(), request.getPredictPeriods());
try {
// 1. 准备训练数据
List<Map<String, Object>> trainingData = prepareTrainingData(request);
// 2. 特征工程
List<Map<String, Object>> features = extractFeatures(trainingData, request);
// 3. 模型训练
PredictionModel model = trainPredictionModel(features, request);
// 4. 执行预测
List<PredictionPoint> predictions = generatePredictions(model, request);
// 5. 计算置信区间
List<ConfidenceInterval> confidenceIntervals = calculateConfidenceIntervals(predictions, request);
// 6. 模型评估
ModelEvaluation evaluation = evaluateModel(model, features, request);
// 7. 构建结果
return PredictionResult.builder()
.analysisId(generateAnalysisId())
.predictions(predictions)
.confidenceIntervals(confidenceIntervals)
.modelEvaluation(evaluation)
.modelType(request.getModelType())
.trainingDataSize(trainingData.size())
.predictionTime(LocalDateTime.now())
.build();
} catch (Exception e) {
log.error("预测分析异常", e);
throw new DataAnalysisException("预测分析失败: " + e.getMessage(), e);
}
}
/**
* 数据预处理
* 清洗和转换原始数据
*/
private List<Map<String, Object>> preprocessData(List<Map<String, Object>> rawData, List<FilterCondition> filters) {
List<Map<String, Object>> processedData = new ArrayList<>(rawData);
// 1. 应用过滤条件
if (filters != null && !filters.isEmpty()) {
processedData = applyFilters(processedData, filters);
}
// 2. 数据清洗
processedData = cleanData(processedData);
// 3. 数据类型转换
processedData = convertDataTypes(processedData);
// 4. 缺失值处理
processedData = handleMissingValues(processedData);
return processedData;
}
/**
* 计算实时统计指标
*/
private Map<String, Object> calculateRealTimeStatistics(List<Map<String, Object>> data, List<AggregationFunction> aggregations) {
Map<String, Object> statistics = new HashMap<>();
for (AggregationFunction aggregation : aggregations) {
String field = aggregation.getField();
String function = aggregation.getFunction();
List<Double> values = extractNumericValues(data, field);
switch (function.toUpperCase()) {
case "COUNT":
statistics.put(field + "_count", values.size());
break;
case "SUM":
statistics.put(field + "_sum", values.stream().mapToDouble(Double::doubleValue).sum());
break;
case "AVG":
statistics.put(field + "_avg", values.stream().mapToDouble(Double::doubleValue).average().orElse(0.0));
break;
case "MAX":
statistics.put(field + "_max", values.stream().mapToDouble(Double::doubleValue).max().orElse(0.0));
break;
case "MIN":
statistics.put(field + "_min", values.stream().mapToDouble(Double::doubleValue).min().orElse(0.0));
break;
case "STDDEV":
statistics.put(field + "_stddev", calculateStandardDeviation(values));
break;
}
}
return statistics;
}
/**
* 生成实时洞察
*/
private List<Insight> generateRealTimeInsights(Map<String, Object> analysisResults, RealTimeAnalysisRequest request) {
List<Insight> insights = new ArrayList<>();
// 基于统计结果生成洞察
if (analysisResults.containsKey("statistics")) {
Map<String, Object> statistics = (Map<String, Object>) analysisResults.get("statistics");
insights.addAll(generateStatisticalInsights(statistics));
}
// 基于趋势结果生成洞察
if (analysisResults.containsKey("trend")) {
TrendAnalysisResult trend = (TrendAnalysisResult) analysisResults.get("trend");
insights.addAll(generateTrendInsights(trend));
}
// 基于异常检测结果生成洞察
if (analysisResults.containsKey("anomalies")) {
AnomalyDetectionResult anomalies = (AnomalyDetectionResult) analysisResults.get("anomalies");
insights.addAll(generateAnomalyInsights(anomalies));
}
return insights;
}
/**
* 生成分析ID
*/
private String generateAnalysisId() {
return "ANALYSIS-" + System.currentTimeMillis() + "-" +
ThreadLocalRandom.current().nextInt(1000, 9999);
}
}
# 4.5 实际应用示例
让我们通过一个智能制造工厂的例子来看看数据分析服务是如何工作的:
/**
* 智能制造数据分析应用示例
* 展示数据分析服务在实际场景中的应用
*/
@Component
@Slf4j
public class SmartManufacturingAnalysisExample {
@Autowired
private DataAnalysisService dataAnalysisService;
@Autowired
private DeviceDataService deviceDataService;
/**
* 生产线效率实时分析
* 分析生产线的实时效率和性能指标
*/
public void analyzeProductionLineEfficiency(String productionLineId) {
log.info("开始分析生产线效率,生产线ID: {}", productionLineId);
// 1. 获取实时生产数据
List<Map<String, Object>> realTimeData = deviceDataService.getRealtimeProductionData(productionLineId);
// 2. 构建实时分析请求
RealTimeAnalysisRequest request = new RealTimeAnalysisRequest();
request.setAnalysisType(AnalysisType.DESCRIPTIVE_STATISTICS);
request.setRealTimeData(realTimeData);
request.setWindowSize(300); // 5分钟窗口
request.setSlideInterval(60); // 1分钟滑动
// 设置聚合函数
List<AggregationFunction> aggregations = Arrays.asList(
AggregationFunction.builder().field("production_count").function("SUM").build(),
AggregationFunction.builder().field("cycle_time").function("AVG").build(),
AggregationFunction.builder().field("efficiency_rate").function("AVG").build(),
AggregationFunction.builder().field("defect_rate").function("AVG").build()
);
request.setAggregations(aggregations);
// 3. 执行分析
AnalysisResult result = dataAnalysisService.analyzeRealTimeData(request);
// 4. 处理分析结果
if (result.getStatus() == AnalysisStatus.COMPLETED) {
processProductionEfficiencyResult(result, productionLineId);
} else {
log.error("生产线效率分析失败: {}", result.getErrorMessage());
}
}
/**
* 设备故障预测分析
* 基于历史数据预测设备可能的故障
*/
public void predictEquipmentFailure(String equipmentId) {
log.info("开始设备故障预测分析,设备ID: {}", equipmentId);
// 1. 构建预测请求
PredictionRequest request = PredictionRequest.builder()
.equipmentId(equipmentId)
.predictFields(Arrays.asList("temperature", "vibration", "pressure", "current"))
.predictPeriods(24) // 预测未来24小时
.modelType("LSTM") // 使用LSTM模型
.timeRange(TimeRange.builder()
.startTime(LocalDateTime.now().minusDays(30))
.endTime(LocalDateTime.now())
.build())
.build();
// 2. 执行预测分析
PredictionResult result = dataAnalysisService.predictFuture(request);
// 3. 分析预测结果
if (result.getPredictions() != null && !result.getPredictions().isEmpty()) {
analyzeFailurePrediction(result, equipmentId);
}
}
/**
* 质量趋势分析
* 分析产品质量的变化趋势
*/
public void analyzeQualityTrend(String productType, LocalDateTime startTime, LocalDateTime endTime) {
log.info("开始质量趋势分析,产品类型: {}, 时间范围: {} - {}", productType, startTime, endTime);
// 1. 构建趋势分析请求
TrendAnalysisRequest request = TrendAnalysisRequest.builder()
.productType(productType)
.timeRange(TimeRange.builder()
.startTime(startTime)
.endTime(endTime)
.build())
.trendFields(Arrays.asList("defect_rate", "first_pass_yield", "customer_satisfaction"))
.smoothingMethod("MOVING_AVERAGE")
.seasonalityPeriod(7) // 按周分析季节性
.build();
// 2. 执行趋势分析
TrendAnalysisResult result = dataAnalysisService.analyzeTrend(request);
// 3. 处理趋势分析结果
processQualityTrendResult(result, productType);
}
/**
* 异常检测分析
* 检测生产过程中的异常情况
*/
public void detectProductionAnomalies(String workshopId) {
log.info("开始生产异常检测,车间ID: {}", workshopId);
// 1. 构建异常检测请求
AnomalyDetectionRequest request = AnomalyDetectionRequest.builder()
.workshopId(workshopId)
.detectionFields(Arrays.asList("temperature", "humidity", "noise_level", "air_quality"))
.detectionMethod("ISOLATION_FOREST")
.sensitivityLevel(0.1) // 敏感度
.timeRange(TimeRange.builder()
.startTime(LocalDateTime.now().minusHours(24))
.endTime(LocalDateTime.now())
.build())
.build();
// 2. 执行异常检测
AnomalyDetectionResult result = dataAnalysisService.detectAnomalies(request);
// 3. 处理异常检测结果
if (result.getAnomalies() != null && !result.getAnomalies().isEmpty()) {
handleDetectedAnomalies(result, workshopId);
}
}
/**
* 综合生产报表分析
* 生成综合的生产分析报表
*/
public void generateProductionReport(String factoryId, LocalDateTime reportDate) {
log.info("开始生成生产报表,工厂ID: {}, 报表日期: {}", factoryId, reportDate);
// 1. 构建批量分析请求
BatchAnalysisRequest request = new BatchAnalysisRequest();
request.setAnalysisType(AnalysisType.DESCRIPTIVE_STATISTICS);
request.setTableName("production_data");
request.setTimeRange(TimeRange.builder()
.startTime(reportDate.toLocalDate().atStartOfDay())
.endTime(reportDate.toLocalDate().atTime(23, 59, 59))
.build());
// 设置过滤条件
List<FilterCondition> filters = Arrays.asList(
FilterCondition.builder().field("factory_id").operator("=").value(factoryId).build(),
FilterCondition.builder().field("status").operator("=").value("COMPLETED").build()
);
request.setFilters(filters);
// 设置分组字段
request.setGroupByFields(Arrays.asList("production_line_id", "product_type"));
// 设置聚合函数
List<AggregationFunction> aggregations = Arrays.asList(
AggregationFunction.builder().field("production_count").function("SUM").build(),
AggregationFunction.builder().field("defect_count").function("SUM").build(),
AggregationFunction.builder().field("cycle_time").function("AVG").build(),
AggregationFunction.builder().field("energy_consumption").function("SUM").build()
);
request.setAggregations(aggregations);
// 2. 执行批量分析
AnalysisResult result = dataAnalysisService.analyzeBatchData(request);
// 3. 生成报表
if (result.getStatus() == AnalysisStatus.COMPLETED) {
generateAndSaveReport(result, factoryId, reportDate);
}
}
/**
* 处理生产效率分析结果
*/
private void processProductionEfficiencyResult(AnalysisResult result, String productionLineId) {
Map<String, Object> summary = result.getSummary();
// 提取关键指标
Double totalProduction = (Double) summary.get("production_count_sum");
Double avgCycleTime = (Double) summary.get("cycle_time_avg");
Double avgEfficiency = (Double) summary.get("efficiency_rate_avg");
Double avgDefectRate = (Double) summary.get("defect_rate_avg");
log.info("生产线 {} 效率分析结果: 总产量={}, 平均周期时间={}s, 平均效率={}%, 平均缺陷率={}%",
productionLineId, totalProduction, avgCycleTime, avgEfficiency * 100, avgDefectRate * 100);
// 生成洞察和建议
List<Insight> insights = result.getInsights();
for (Insight insight : insights) {
log.info("洞察: {} - {}", insight.getTitle(), insight.getDescription());
// 根据洞察类型采取行动
if (insight.getType() == InsightType.ALERT && insight.getSeverity() == InsightSeverity.HIGH) {
// 发送高优先级告警
sendProductionAlert(productionLineId, insight);
}
}
}
/**
* 分析故障预测结果
*/
private void analyzeFailurePrediction(PredictionResult result, String equipmentId) {
List<PredictionPoint> predictions = result.getPredictions();
// 查找可能的故障点
for (PredictionPoint prediction : predictions) {
if (prediction.getConfidence() > 0.8) { // 高置信度预测
Map<String, Object> values = prediction.getPredictedValues();
// 检查温度预测
if (values.containsKey("temperature")) {
Double predictedTemp = (Double) values.get("temperature");
if (predictedTemp > 80.0) { // 温度过高阈值
log.warn("设备 {} 预测在 {} 时温度将达到 {}°C,可能发生过热故障",
equipmentId, prediction.getTimestamp(), predictedTemp);
// 创建预防性维护任务
createPreventiveMaintenanceTask(equipmentId, "temperature_high", prediction.getTimestamp());
}
}
// 检查振动预测
if (values.containsKey("vibration")) {
Double predictedVibration = (Double) values.get("vibration");
if (predictedVibration > 5.0) { // 振动过大阈值
log.warn("设备 {} 预测在 {} 时振动将达到 {}mm/s,可能发生机械故障",
equipmentId, prediction.getTimestamp(), predictedVibration);
createPreventiveMaintenanceTask(equipmentId, "vibration_high", prediction.getTimestamp());
}
}
}
}
}
/**
* 处理质量趋势分析结果
*/
private void processQualityTrendResult(TrendAnalysisResult result, String productType) {
TrendCalculationResult trendCalculation = result.getTrendCalculation();
// 分析缺陷率趋势
if (trendCalculation.getTrends().containsKey("defect_rate")) {
TrendInfo defectTrend = trendCalculation.getTrends().get("defect_rate");
if (defectTrend.getDirection() == TrendDirection.INCREASING) {
log.warn("产品 {} 的缺陷率呈上升趋势,斜率: {}", productType, defectTrend.getSlope());
// 触发质量改进流程
triggerQualityImprovementProcess(productType, defectTrend);
} else if (defectTrend.getDirection() == TrendDirection.DECREASING) {
log.info("产品 {} 的缺陷率呈下降趋势,质量持续改善", productType);
}
}
// 分析季节性模式
SeasonalityResult seasonality = result.getSeasonality();
if (seasonality.hasSeasonality()) {
log.info("产品 {} 的质量指标存在 {} 周期的季节性模式",
productType, seasonality.getPeriod());
}
}
/**
* 处理检测到的异常
*/
private void handleDetectedAnomalies(AnomalyDetectionResult result, String workshopId) {
List<AnomalyPoint> anomalies = result.getAnomalies();
for (AnomalyPoint anomaly : anomalies) {
log.warn("检测到异常: 车间={}, 时间={}, 字段={}, 异常值={}, 异常分数={}",
workshopId, anomaly.getTimestamp(), anomaly.getField(),
anomaly.getValue(), anomaly.getAnomalyScore());
// 根据异常类型采取不同的处理措施
switch (anomaly.getField()) {
case "temperature":
handleTemperatureAnomaly(workshopId, anomaly);
break;
case "humidity":
handleHumidityAnomaly(workshopId, anomaly);
break;
case "air_quality":
handleAirQualityAnomaly(workshopId, anomaly);
break;
default:
handleGenericAnomaly(workshopId, anomaly);
break;
}
}
}
/**
* 生成并保存报表
*/
private void generateAndSaveReport(AnalysisResult result, String factoryId, LocalDateTime reportDate) {
// 构建报表导出请求
ReportExportRequest exportRequest = ReportExportRequest.builder()
.reportType("PRODUCTION_DAILY_REPORT")
.analysisResult(result)
.factoryId(factoryId)
.reportDate(reportDate)
.exportFormat("PDF")
.includeCharts(true)
.includeInsights(true)
.build();
// 导出报表
ReportExportResult exportResult = dataAnalysisService.exportReport(exportRequest);
if (exportResult.isSuccess()) {
log.info("生产报表生成成功,文件路径: {}", exportResult.getFilePath());
// 发送报表给相关人员
sendReportToStakeholders(exportResult, factoryId);
} else {
log.error("生产报表生成失败: {}", exportResult.getErrorMessage());
}
}
// 辅助方法
private void sendProductionAlert(String productionLineId, Insight insight) {
log.warn("发送生产告警: 生产线={}, 告警={}", productionLineId, insight.getTitle());
}
private void createPreventiveMaintenanceTask(String equipmentId, String issueType, LocalDateTime scheduledTime) {
log.info("创建预防性维护任务: 设备={}, 问题类型={}, 计划时间={}",
equipmentId, issueType, scheduledTime);
}
private void triggerQualityImprovementProcess(String productType, TrendInfo defectTrend) {
log.info("触发质量改进流程: 产品类型={}, 趋势斜率={}", productType, defectTrend.getSlope());
}
private void handleTemperatureAnomaly(String workshopId, AnomalyPoint anomaly) {
log.warn("处理温度异常: 车间={}, 异常温度={}°C", workshopId, anomaly.getValue());
}
private void handleHumidityAnomaly(String workshopId, AnomalyPoint anomaly) {
log.warn("处理湿度异常: 车间={}, 异常湿度={}%", workshopId, anomaly.getValue());
}
private void handleAirQualityAnomaly(String workshopId, AnomalyPoint anomaly) {
log.warn("处理空气质量异常: 车间={}, 异常指数={}", workshopId, anomaly.getValue());
}
private void handleGenericAnomaly(String workshopId, AnomalyPoint anomaly) {
log.warn("处理通用异常: 车间={}, 字段={}, 异常值={}",
workshopId, anomaly.getField(), anomaly.getValue());
}
private void sendReportToStakeholders(ReportExportResult exportResult, String factoryId) {
log.info("发送报表给相关人员: 工厂={}, 报表文件={}", factoryId, exportResult.getFilePath());
}
}
# 5. 应用服务层最佳实践和总结
# 5.1 设计原则
应用服务层的设计遵循以下核心原则:
- 单一职责原则:每个服务专注于特定的业务功能
- 开放封闭原则:对扩展开放,对修改封闭
- 依赖倒置原则:依赖抽象而不是具体实现
- 接口隔离原则:提供细粒度的接口定义
- 高内聚低耦合:服务内部高度内聚,服务间松散耦合
# 5.2 性能优化建议
/**
* 应用服务层性能优化配置
*/
@Configuration
@EnableAsync
@EnableCaching
public class ApplicationServiceOptimizationConfig {
/**
* 异步任务执行器
* 用于处理耗时的分析任务
*/
@Bean("analysisTaskExecutor")
public TaskExecutor analysisTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("analysis-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
/**
* Redis缓存配置
* 缓存分析结果和规则定义
*/
@Bean
public CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofHours(1)) // 缓存1小时
.serializeKeysWith(RedisSerializationContext.SerializationPair
.fromSerializer(new StringRedisSerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair
.fromSerializer(new GenericJackson2JsonRedisSerializer()));
return RedisCacheManager.builder(redisConnectionFactory)
.cacheDefaults(config)
.build();
}
/**
* 批量操作配置
*/
@Bean
public BatchOperationConfig batchOperationConfig() {
return BatchOperationConfig.builder()
.defaultBatchSize(100)
.maxBatchSize(1000)
.batchTimeout(Duration.ofSeconds(30))
.retryAttempts(3)
.build();
}
}
# 5.3 监控和指标
/**
* 应用服务层监控指标
*/
@Component
public class ApplicationServiceMetrics {
private final MeterRegistry meterRegistry;
private final Counter deviceControlCounter;
private final Timer ruleExecutionTimer;
private final Gauge activeAlarmsGauge;
private final Timer analysisExecutionTimer;
public ApplicationServiceMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
// 设备控制计数器
this.deviceControlCounter = Counter.builder("device.control.commands")
.description("设备控制命令总数")
.register(meterRegistry);
// 规则执行时间
this.ruleExecutionTimer = Timer.builder("rule.execution.time")
.description("规则执行耗时")
.register(meterRegistry);
// 活跃告警数量
this.activeAlarmsGauge = Gauge.builder("alarms.active.count")
.description("当前活跃告警数量")
.register(meterRegistry, this, ApplicationServiceMetrics::getActiveAlarmCount);
// 数据分析执行时间
this.analysisExecutionTimer = Timer.builder("analysis.execution.time")
.description("数据分析执行耗时")
.register(meterRegistry);
}
public void recordDeviceControl(String commandType, String result) {
deviceControlCounter.increment(
Tags.of(
Tag.of("command_type", commandType),
Tag.of("result", result)
)
);
}
public Timer.Sample startRuleExecution() {
return Timer.start(meterRegistry);
}
public void recordRuleExecution(Timer.Sample sample, String ruleType, String result) {
sample.stop(Timer.builder("rule.execution.time")
.tags(
Tag.of("rule_type", ruleType),
Tag.of("result", result)
)
.register(meterRegistry));
}
public Timer.Sample startAnalysisExecution() {
return Timer.start(meterRegistry);
}
public void recordAnalysisExecution(Timer.Sample sample, String analysisType, String result) {
sample.stop(Timer.builder("analysis.execution.time")
.tags(
Tag.of("analysis_type", analysisType),
Tag.of("result", result)
)
.register(meterRegistry));
}
private double getActiveAlarmCount() {
// 实际实现中从数据库或缓存获取活跃告警数量
return 0.0;
}
}
# 5.4 总结
应用服务层是IoT平台的"智能大脑",它将底层的设备数据转化为有价值的业务洞察和自动化操作。通过本文档,我们详细介绍了四个核心服务:
# 🎮 设备控制服务
- 核心价值:实现对IoT设备的远程控制和管理
- 关键特性:支持单个、批量、定时控制,具备完善的验证和错误处理机制
- 应用场景:智能家居控制、工业设备操作、农业灌溉系统等
# 🧠 规则引擎服务
- 核心价值:提供灵活的业务规则定义和自动化执行能力
- 关键特性:支持多种规则类型,具备强大的条件判断和动作执行能力
- 应用场景:智能温室自动化、安防系统联动、能源管理优化等
# 🚨 告警服务
- 核心价值:及时发现和处理系统异常,保障业务连续性
- 关键特性:多级告警、智能抑制、批量处理、完整的生命周期管理
- 应用场景:设备故障监控、环境异常检测、安全事件响应等
# 📊 数据分析服务
- 核心价值:从海量IoT数据中提取有价值的业务洞察
- 关键特性:实时分析、批量处理、趋势预测、异常检测、智能洞察
- 应用场景:生产效率优化、设备预测性维护、质量趋势分析等
# 🏗️ 架构优势
- 模块化设计:每个服务职责清晰,可独立开发和部署
- 可扩展性:支持水平扩展,能够处理大规模IoT场景
- 高可用性:具备完善的错误处理和恢复机制
- 易于维护:清晰的接口定义和完整的文档
- 性能优化:支持缓存、异步处理、批量操作等优化策略
# 🎯 最佳实践
- 统一的错误处理:使用统一的异常处理机制
- 完善的日志记录:记录关键操作和性能指标
- 缓存策略:合理使用缓存提升性能
- 异步处理:对于耗时操作使用异步处理
- 监控告警:建立完善的监控和告警体系
通过这四个核心服务的协同工作,应用服务层为IoT平台提供了强大的业务处理能力,使得复杂的IoT场景能够以简单、可靠的方式实现自动化管理和智能化决策。