数据处理层
# 数据处理层
## 📖 章节概述
数据处理层是物联网系统的“数据大脑”，负责对海量设备数据进行实时处理、分析和存储。本章将深入讲解数据处理架构、流式处理、批处理、数据清洗、数据分析以及数据存储策略。
## 🎯 学习目标
- 掌握物联网数据处理的核心架构和设计原则
- 理解流式处理和批处理的应用场景
- 学会实现数据清洗、转换和聚合算法
- 掌握时序数据库和分布式存储技术
## 1. 数据处理架构
### 1.1 Lambda 架构
graph TB
subgraph "数据源层 Data Sources"
DS1[设备传感器]
DS2[网关设备]
DS3[移动应用]
DS4[Web应用]
DS5[第三方API]
end
subgraph "数据接入层 Data Ingestion"
DI1[Kafka消息队列]
DI2[HTTP API网关]
DI3[MQTT Broker]
DI4[数据采集器]
end
subgraph "流处理层 Stream Processing"
SP1[Kafka Streams]
SP2[Apache Flink]
SP3[Storm]
SP4[实时计算引擎]
end
subgraph "批处理层 Batch Processing"
BP1[Apache Spark]
BP2[Hadoop MapReduce]
BP3[数据仓库ETL]
BP4[离线分析引擎]
end
subgraph "服务层 Serving Layer"
SL1[实时视图]
SL2[批处理视图]
SL3[查询服务]
SL4[API服务]
end
subgraph "存储层 Storage Layer"
ST1[时序数据库]
ST2[关系数据库]
ST3[NoSQL数据库]
ST4[数据湖]
ST5[缓存系统]
end
subgraph "应用层 Application Layer"
AL1[实时监控]
AL2[数据分析]
AL3[报表系统]
AL4[告警系统]
end
DS1 --> DI1
DS2 --> DI2
DS3 --> DI3
DS4 --> DI4
DS5 --> DI1
DI1 --> SP1
DI2 --> SP2
DI3 --> SP3
DI4 --> SP4
DI1 --> BP1
DI2 --> BP2
DI3 --> BP3
DI4 --> BP4
SP1 --> SL1
SP2 --> SL1
SP3 --> SL1
SP4 --> SL1
BP1 --> SL2
BP2 --> SL2
BP3 --> SL2
BP4 --> SL2
SL1 --> ST1
SL1 --> ST5
SL2 --> ST2
SL2 --> ST3
SL2 --> ST4
SL3 --> AL1
SL3 --> AL2
SL4 --> AL3
SL4 --> AL4
### 1.2 数据处理核心框架
// 示例:数据处理核心框架
public class DataProcessingFramework {
// Collaborators injected once through the constructor and shared by the nested
// pipeline/processor classes declared below.
private final StreamProcessor streamProcessor;
private final BatchProcessor batchProcessor;
private final DataValidator dataValidator;
private final DataTransformer dataTransformer;
private final DataRouter dataRouter;
private final MetricsCollector metricsCollector;
private final AlertManager alertManager;

/**
 * Creates the framework from its collaborators. Each argument is stored as-is; no
 * validation is performed here.
 *
 * @param streamProcessor  stream processing collaborator
 * @param batchProcessor   batch processing collaborator
 * @param dataValidator    validates incoming messages before processing
 * @param dataTransformer  transforms validated messages
 * @param dataRouter       routes processed messages using the context's routing hints
 * @param metricsCollector receives counters and timers
 * @param alertManager     receives alerts for processing errors
 */
public DataProcessingFramework(
        StreamProcessor streamProcessor,
        BatchProcessor batchProcessor,
        DataValidator dataValidator,
        DataTransformer dataTransformer,
        DataRouter dataRouter,
        MetricsCollector metricsCollector,
        AlertManager alertManager) {
    this.alertManager = alertManager;
    this.metricsCollector = metricsCollector;
    this.dataRouter = dataRouter;
    this.dataTransformer = dataTransformer;
    this.dataValidator = dataValidator;
    this.batchProcessor = batchProcessor;
    this.streamProcessor = streamProcessor;
}
// 数据处理管道
/**
 * Asynchronous message pipeline: validate -> transform -> processor chain -> route.
 *
 * <p>Uses the enclosing framework's validator, transformer, router, metrics collector and
 * alert manager. Work runs on a fixed-size thread pool owned by this pipeline; call
 * {@link #shutdown()} to release it.
 */
public class DataProcessingPipeline {
    private final List<DataProcessor> processors;
    private final ExecutorService executorService;

    /** Creates a pipeline backed by a 10-thread pool. */
    public DataProcessingPipeline() {
        this(10);
    }

    /**
     * Creates a pipeline backed by a fixed pool of {@code poolSize} threads.
     *
     * @param poolSize number of worker threads
     * @throws IllegalArgumentException if {@code poolSize <= 0} (raised by {@code Executors})
     */
    public DataProcessingPipeline(int poolSize) {
        this.processors = new ArrayList<>();
        this.executorService = Executors.newFixedThreadPool(poolSize);
    }

    /**
     * Appends a processor to the chain; processors run in insertion order.
     *
     * @return this pipeline, for chaining
     */
    public DataProcessingPipeline addProcessor(DataProcessor processor) {
        processors.add(processor);
        return this;
    }

    /**
     * Processes one message asynchronously on the pipeline's pool.
     *
     * @return a future holding the processing result; failures are reported as
     *     failure/error results rather than exceptional completion
     */
    public CompletableFuture<ProcessingResult> processData(IoTDataMessage data) {
        return CompletableFuture.supplyAsync(() -> processSync(data), executorService);
    }

    /**
     * Processes a list of messages sequentially in a single pool task.
     *
     * <p>Items are processed inline on the worker thread. The previous version submitted
     * each item back to the same bounded pool and blocked on {@code get()}; with enough
     * concurrent batches every worker was blocked waiting for tasks that could never be
     * scheduled (thread-starvation deadlock).
     */
    public CompletableFuture<BatchProcessingResult> processBatch(List<IoTDataMessage> dataList) {
        return CompletableFuture.supplyAsync(() -> {
            List<ProcessingResult> results = new ArrayList<>(dataList.size());
            List<String> errors = new ArrayList<>();
            for (IoTDataMessage data : dataList) {
                try {
                    results.add(processSync(data));
                } catch (Exception e) {
                    String deviceId = data != null ? data.getDeviceId() : null;
                    errors.add(String.format("设备 %s 处理失败: %s", deviceId, e.getMessage()));
                }
            }
            return new BatchProcessingResult(results.size(), errors.size(), results, errors);
        }, executorService);
    }

    /** Runs the full validate/transform/process/route sequence on the calling thread. */
    private ProcessingResult processSync(IoTDataMessage data) {
        try {
            ProcessingContext context = new ProcessingContext(data);

            ValidationResult validationResult = dataValidator.validate(data);
            if (!validationResult.isValid()) {
                metricsCollector.incrementCounter("data.validation.failure");
                return ProcessingResult.failure("数据验证失败: " + validationResult.getErrorMessage());
            }

            IoTDataMessage transformedData = dataTransformer.transform(data);
            context.setTransformedData(transformedData);

            // Stop at the first processor that reports a failure.
            for (DataProcessor processor : processors) {
                ProcessingResult result = processor.process(context);
                if (!result.isSuccess()) {
                    metricsCollector.incrementCounter("data.processing.failure");
                    return result;
                }
                context.addResult(processor.getClass().getSimpleName(), result);
            }

            dataRouter.route(context.getTransformedData(), context.getRoutingHints());

            metricsCollector.incrementCounter("data.processing.success");
            metricsCollector.recordTimer("data.processing.duration", context.getProcessingDuration());
            return ProcessingResult.success(context.getResults());
        } catch (Exception e) {
            metricsCollector.incrementCounter("data.processing.error");
            // data may be null here; avoid throwing a second NPE from the error handler.
            String deviceId = data != null ? data.getDeviceId() : null;
            alertManager.sendAlert(new ProcessingErrorAlert(deviceId, e));
            return ProcessingResult.error("数据处理异常: " + e.getMessage(), e);
        }
    }

    /**
     * Stops accepting work and waits up to 60 seconds for running tasks before forcing
     * shutdown. Restores the interrupt flag if interrupted while waiting.
     */
    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
// 流式数据处理器
/**
 * Kafka Streams based processor. It cleans messages from {@code iot-data-input},
 * aggregates them per record key in 5-minute windows and emits anomaly alerts. It also
 * implements the synchronous {@link DataProcessor} contract so it can run inside a pipeline.
 */
public class StreamDataProcessor implements DataProcessor {
    private final KafkaStreams kafkaStreams;
    private final StreamsBuilder streamsBuilder;

    /**
     * Builds the topology and creates (but does not start) the Kafka Streams client.
     *
     * @param kafkaConfig configuration passed to {@code KafkaStreams}
     */
    public StreamDataProcessor(Properties kafkaConfig) {
        this.streamsBuilder = new StreamsBuilder();
        initializeTopology();
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), kafkaConfig);
    }

    /** Defines the topology: input -> cleaned / aggregated / alert output topics. */
    private void initializeTopology() {
        KStream<String, IoTDataMessage> sourceStream = streamsBuilder
            .stream("iot-data-input", Consumed.with(Serdes.String(), new IoTDataMessageSerde()));

        // Drop records without payload or device id, then clean each payload.
        KStream<String, IoTDataMessage> cleanedStream = sourceStream
            .filter((key, value) -> value != null && value.getDeviceId() != null)
            .mapValues(this::cleanData);

        // Groups by the record key in 5-minute windows.
        // NOTE(review): assumes input records are keyed by device id — confirm with the producer.
        KTable<Windowed<String>, AggregatedData> aggregatedData = cleanedStream
            .groupByKey(Grouped.with(Serdes.String(), new IoTDataMessageSerde()))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .aggregate(
                AggregatedData::new,
                (key, value, aggregate) -> aggregate.add(value),
                Materialized.with(Serdes.String(), new AggregatedDataSerde())
            );

        KStream<String, Alert> alertStream = cleanedStream
            .filter(this::isAnomalous)
            .mapValues(this::createAlert);

        cleanedStream.to("iot-data-cleaned", Produced.with(Serdes.String(), new IoTDataMessageSerde()));
        aggregatedData.toStream().to("iot-data-aggregated", Produced.with(new WindowedSerde<>(Serdes.String()), new AggregatedDataSerde()));
        alertStream.to("iot-alerts", Produced.with(Serdes.String(), new AlertSerde()));
    }

    /**
     * Returns a copy of {@code data} with the same device id, timestamp and message type.
     * Its sensor map keeps only the readings accepted by {@link #cleanSensorValue}.
     */
    private IoTDataMessage cleanData(IoTDataMessage data) {
        IoTDataMessage cleaned = new IoTDataMessage();
        cleaned.setDeviceId(data.getDeviceId());
        cleaned.setTimestamp(data.getTimestamp());
        cleaned.setMessageType(data.getMessageType());

        Map<String, Object> cleanedSensorData = new HashMap<>();
        Map<String, Object> sensorData = data.getSensorData();
        if (sensorData != null) {
            for (Map.Entry<String, Object> entry : sensorData.entrySet()) {
                Object cleanedValue = cleanSensorValue(entry.getKey(), entry.getValue());
                if (cleanedValue != null) {
                    cleanedSensorData.put(entry.getKey(), cleanedValue);
                }
            }
        }
        cleaned.setSensorData(cleanedSensorData);
        return cleaned;
    }

    /**
     * Validates one reading.
     *
     * <p>Known sensor types are range-checked and returned as {@code Double}. Unknown types
     * must be numeric. {@code Number} values are returned unchanged, and numeric strings are
     * converted to {@code Double} so that downstream aggregation (which only counts
     * {@code Number} values) sees them.
     *
     * @return the cleaned value, or {@code null} when the reading is invalid or absent
     */
    private Object cleanSensorValue(String sensorType, Object value) {
        // Null keys or readings are invalid. Checking first avoids an NPE: only
        // NumberFormatException is caught below, so an NPE would escape and kill the
        // stream thread.
        if (sensorType == null || value == null) {
            return null;
        }
        try {
            // Locale.ROOT: default-locale lowercasing breaks ASCII keys in e.g. Turkish locales.
            switch (sensorType.toLowerCase(java.util.Locale.ROOT)) {
                case "temperature":
                    return withinRange(Double.parseDouble(value.toString()), -50, 100);  // -50°C .. 100°C
                case "humidity":
                    return withinRange(Double.parseDouble(value.toString()), 0, 100);    // 0% .. 100%
                case "pressure":
                    return withinRange(Double.parseDouble(value.toString()), 0, 2000);   // 0 .. 2000 hPa
                case "voltage":
                    return withinRange(Double.parseDouble(value.toString()), 0, 50);     // 0V .. 50V
                case "current":
                    return withinRange(Double.parseDouble(value.toString()), 0, 100);    // 0A .. 100A
                default:
                    if (value instanceof Number) {
                        return value;
                    }
                    return Double.parseDouble(value.toString());
            }
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Returns {@code v} boxed when {@code min <= v <= max}, else {@code null} (NaN fails both checks). */
    private Double withinRange(double v, double min, double max) {
        return (v >= min && v <= max) ? Double.valueOf(v) : null;
    }

    /**
     * Returns true when temperature, humidity or voltage falls outside its alert band.
     * Absent or non-numeric readings are ignored.
     */
    private boolean isAnomalous(String key, IoTDataMessage data) {
        Map<String, Object> sensorData = data.getSensorData();
        if (sensorData == null) {
            return false;
        }
        Double temperature = numericReading(sensorData, "temperature");
        if (temperature != null && (temperature > 80 || temperature < -20)) {
            return true;
        }
        Double humidity = numericReading(sensorData, "humidity");
        if (humidity != null && (humidity > 95 || humidity < 5)) {
            return true;
        }
        Double voltage = numericReading(sensorData, "voltage");
        return voltage != null && (voltage > 30 || voltage < 1);
    }

    /** Parses {@code sensorData.get(sensorType)} as a double; null when absent or non-numeric. */
    private Double numericReading(Map<String, Object> sensorData, String sensorType) {
        Object raw = sensorData.get(sensorType);
        if (raw == null) {
            return null;
        }
        try {
            return Double.parseDouble(raw.toString());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Builds a HIGH-severity SENSOR_ANOMALY alert carrying the message's sensor map. */
    private Alert createAlert(IoTDataMessage data) {
        Alert alert = new Alert();
        alert.setDeviceId(data.getDeviceId());
        alert.setTimestamp(System.currentTimeMillis());
        alert.setAlertType("SENSOR_ANOMALY");
        alert.setSeverity("HIGH");
        alert.setMessage("检测到传感器数据异常");
        alert.setData(data.getSensorData());
        return alert;
    }

    /**
     * Synchronous entry point: cleans the context's transformed data in place and, when it
     * is anomalous, attaches an alert as the {@code "alert"} routing hint.
     */
    @Override
    public ProcessingResult process(ProcessingContext context) {
        IoTDataMessage data = context.getTransformedData();
        try {
            IoTDataMessage cleanedData = cleanData(data);
            context.setTransformedData(cleanedData);
            if (isAnomalous(data.getDeviceId(), cleanedData)) {
                Alert alert = createAlert(cleanedData);
                context.addRoutingHint("alert", alert);
            }
            return ProcessingResult.success("流处理完成");
        } catch (Exception e) {
            return ProcessingResult.error("流处理失败: " + e.getMessage(), e);
        }
    }

    /** Starts the Kafka Streams client. */
    public void start() {
        kafkaStreams.start();
    }

    /** Closes the Kafka Streams client. */
    public void stop() {
        kafkaStreams.close();
    }
}
// 批处理数据处理器
/**
 * Spark-based batch stage. It turns one message into rows (one per sensor reading) and
 * computes hourly per-device, per-sensor statistics with Spark SQL. The results are
 * appended to a JDBC table and to a Parquet data lake.
 *
 * <p>NOTE(review): this launches Spark jobs for every single message, which is very
 * expensive; real deployments would accumulate messages before invoking Spark.
 */
public class BatchDataProcessor implements DataProcessor {
    private final SparkSession sparkSession;
    private final String jdbcUrl;
    private final String jdbcUser;
    private final String jdbcPassword;
    private final String lakePath;

    /**
     * Creates a processor using the original local defaults.
     * NOTE(review): these credentials are hard-coded; prefer the 4-argument constructor fed
     * from configuration or a secret store.
     */
    public BatchDataProcessor() {
        this("jdbc:postgresql://localhost:5432/iot_analytics", "iot_user", "iot_password",
            "/data/lake/iot/hourly_stats");
    }

    /**
     * @param jdbcUrl      JDBC URL of the statistics database
     * @param jdbcUser     database user
     * @param jdbcPassword database password
     * @param lakePath     root path of the Parquet output
     */
    public BatchDataProcessor(String jdbcUrl, String jdbcUser, String jdbcPassword, String lakePath) {
        this.jdbcUrl = jdbcUrl;
        this.jdbcUser = jdbcUser;
        this.jdbcPassword = jdbcPassword;
        this.lakePath = lakePath;
        this.sparkSession = SparkSession.builder()
            .appName("IoT Data Batch Processing")
            .config("spark.sql.adaptive.enabled", "true")
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
            .getOrCreate();
    }

    @Override
    public ProcessingResult process(ProcessingContext context) {
        // Per-call temp view name. The pipeline may call process() from several threads
        // at once, and a shared name ("iot_data") could be replaced by another call before
        // this call's query resolves it, so this call would read someone else's rows.
        String viewName = "iot_data_" + java.util.UUID.randomUUID().toString().replace("-", "");
        try {
            IoTDataMessage data = context.getTransformedData();
            Dataset<Row> dataFrame = convertToDataFrame(data);
            Dataset<Row> processedData = performBatchAnalysis(dataFrame, viewName);
            saveProcessedData(processedData);
            return ProcessingResult.success("批处理完成");
        } catch (Exception e) {
            return ProcessingResult.error("批处理失败: " + e.getMessage(), e);
        } finally {
            // Drop only after both writes ran; a no-op if the view was never created.
            sparkSession.catalog().dropTempView(viewName);
        }
    }

    /** Builds one row per sensor reading: (device_id, timestamp, sensor_type, sensor_value, message_type). */
    private Dataset<Row> convertToDataFrame(IoTDataMessage data) {
        List<Row> rows = new ArrayList<>();
        Map<String, Object> sensorData = data.getSensorData();
        if (sensorData != null) {
            for (Map.Entry<String, Object> entry : sensorData.entrySet()) {
                rows.add(RowFactory.create(
                    data.getDeviceId(),
                    data.getTimestamp(),
                    entry.getKey(),
                    // The schema declares DoubleType; inserting raw Integer/String objects
                    // makes Spark reject the row at runtime, so normalize to Double here.
                    toSensorDouble(entry.getValue()),
                    data.getMessageType()
                ));
            }
        }

        StructType schema = new StructType(new StructField[]{
            new StructField("device_id", DataTypes.StringType, false, Metadata.empty()),
            new StructField("timestamp", DataTypes.LongType, false, Metadata.empty()),
            new StructField("sensor_type", DataTypes.StringType, false, Metadata.empty()),
            new StructField("sensor_value", DataTypes.DoubleType, true, Metadata.empty()),
            new StructField("message_type", DataTypes.StringType, true, Metadata.empty())
        });
        return sparkSession.createDataFrame(rows, schema);
    }

    /** Converts a reading to Double; null for null or non-numeric values (filtered by the SQL). */
    private Double toSensorDouble(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).doubleValue();
        }
        try {
            return Double.parseDouble(value.toString());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /**
     * Registers {@code dataFrame} as {@code viewName} and returns hourly avg/min/max/count/
     * stddev of non-null values, grouped by device, sensor type and hour.
     */
    private Dataset<Row> performBatchAnalysis(Dataset<Row> dataFrame, String viewName) {
        dataFrame.createOrReplaceTempView(viewName);
        String sql = """
            SELECT
                device_id,
                sensor_type,
                DATE_FORMAT(FROM_UNIXTIME(timestamp/1000), 'yyyy-MM-dd HH:00:00') as hour,
                AVG(sensor_value) as avg_value,
                MIN(sensor_value) as min_value,
                MAX(sensor_value) as max_value,
                COUNT(*) as data_count,
                STDDEV(sensor_value) as std_dev
            FROM %s
            WHERE sensor_value IS NOT NULL
            GROUP BY device_id, sensor_type, DATE_FORMAT(FROM_UNIXTIME(timestamp/1000), 'yyyy-MM-dd HH:00:00')
            ORDER BY device_id, sensor_type, hour
            """.formatted(viewName);
        return sparkSession.sql(sql);
    }

    /** Appends the statistics to the JDBC table and to the Parquet lake partitioned by device/hour. */
    private void saveProcessedData(Dataset<Row> processedData) {
        processedData.write()
            .mode(SaveMode.Append)
            .format("jdbc")
            .option("url", jdbcUrl)
            .option("dbtable", "hourly_sensor_stats")
            .option("user", jdbcUser)
            .option("password", jdbcPassword)
            .save();

        processedData.write()
            .mode(SaveMode.Append)
            .format("parquet")
            .partitionBy("device_id", "hour")
            .save(lakePath);
    }

    /** Stops the underlying Spark session. */
    public void shutdown() {
        sparkSession.stop();
    }
}
// 数据聚合处理器
public class DataAggregationProcessor implements DataProcessor {
private final Map<String, AggregationWindow> aggregationWindows;
private final ScheduledExecutorService scheduler;
public DataAggregationProcessor() {
this.aggregationWindows = new ConcurrentHashMap<>();
this.scheduler = Executors.newScheduledThreadPool(5);
// 启动定时聚合任务
startAggregationTasks();
}
@Override
public ProcessingResult process(ProcessingContext context) {
try {
IoTDataMessage data = context.getTransformedData();
String deviceId = data.getDeviceId();
// 获取或创建聚合窗口
AggregationWindow window = aggregationWindows.computeIfAbsent(
deviceId,
k -> new AggregationWindow(deviceId, Duration.ofMinutes(5))
);
// 添加数据到聚合窗口
window.addData(data);
return ProcessingResult.success("数据已添加到聚合窗口");
} catch (Exception e) {
return ProcessingResult.error("数据聚合失败: " + e.getMessage(), e);
}
}
private void startAggregationTasks() {
// 每分钟执行一次聚合
scheduler.scheduleAtFixedRate(this::performAggregation, 1, 1, TimeUnit.MINUTES);
// 每小时清理过期窗口
scheduler.scheduleAtFixedRate(this::cleanupExpiredWindows, 1, 1, TimeUnit.HOURS);
}
private void performAggregation() {
for (AggregationWindow window : aggregationWindows.values()) {
if (window.isReadyForAggregation()) {
try {
AggregatedData aggregatedData = window.aggregate();
// 发送聚合结果
sendAggregatedData(aggregatedData);
// 重置窗口
window.reset();
} catch (Exception e) {
log.error("聚合数据失败, deviceId: {}", window.getDeviceId(), e);
}
}
}
}
private void cleanupExpiredWindows() {
long currentTime = System.currentTimeMillis();
aggregationWindows.entrySet().removeIf(entry -> {
AggregationWindow window = entry.getValue();
return currentTime - window.getLastUpdateTime() > Duration.ofHours(2).toMillis();
});
}
private void sendAggregatedData(AggregatedData aggregatedData) {
// 发送到Kafka主题
// kafkaProducer.send("iot-aggregated-data", aggregatedData);
// 保存到数据库
// aggregatedDataRepository.save(aggregatedData);
// 更新缓存
// redisTemplate.opsForValue().set(
// "aggregated:" + aggregatedData.getDeviceId(),
// aggregatedData,
// Duration.ofMinutes(10)
// );
}
public void shutdown() {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// 聚合窗口类
/**
 * Per-device buffer of messages that is aggregated and cleared periodically.
 * All methods except the two getters synchronize on this instance.
 */
public static class AggregationWindow {
    private final String deviceId;
    private final Duration windowDuration;
    private final List<IoTDataMessage> dataBuffer;
    private long windowStartTime;
    // volatile: getLastUpdateTime() reads this without holding the monitor (from another
    // thread). Plain long reads/writes are neither guaranteed atomic (JLS 17.7) nor visible.
    private volatile long lastUpdateTime;

    /** Creates an empty window starting now. */
    public AggregationWindow(String deviceId, Duration windowDuration) {
        this.deviceId = deviceId;
        this.windowDuration = windowDuration;
        this.dataBuffer = new ArrayList<>();
        this.windowStartTime = System.currentTimeMillis();
        this.lastUpdateTime = System.currentTimeMillis();
    }

    /** Buffers a message and records the update time. */
    public synchronized void addData(IoTDataMessage data) {
        dataBuffer.add(data);
        lastUpdateTime = System.currentTimeMillis();
    }

    /** True once the window duration has elapsed and at least one message is buffered. */
    public synchronized boolean isReadyForAggregation() {
        long currentTime = System.currentTimeMillis();
        return currentTime - windowStartTime >= windowDuration.toMillis() && !dataBuffer.isEmpty();
    }

    /**
     * Summarizes buffered messages; only {@code Number} readings contribute to statistics.
     *
     * @return the aggregate (window end = now), or {@code null} when the buffer is empty
     */
    public synchronized AggregatedData aggregate() {
        if (dataBuffer.isEmpty()) {
            return null;
        }
        AggregatedData aggregated = new AggregatedData();
        aggregated.setDeviceId(deviceId);
        aggregated.setWindowStart(windowStartTime);
        aggregated.setWindowEnd(System.currentTimeMillis());
        aggregated.setDataCount(dataBuffer.size());

        Map<String, SensorAggregation> sensorAggregations = new HashMap<>();
        for (IoTDataMessage data : dataBuffer) {
            Map<String, Object> sensorData = data.getSensorData();
            if (sensorData == null) {
                continue;
            }
            for (Map.Entry<String, Object> entry : sensorData.entrySet()) {
                String sensorType = entry.getKey();
                Object value = entry.getValue();
                if (value instanceof Number) {
                    double numValue = ((Number) value).doubleValue();
                    sensorAggregations
                        .computeIfAbsent(sensorType, k -> new SensorAggregation(sensorType))
                        .addValue(numValue);
                }
            }
        }
        aggregated.setSensorAggregations(sensorAggregations);
        return aggregated;
    }

    /** Clears the buffer and starts a new window now (the last-update time is kept). */
    public synchronized void reset() {
        dataBuffer.clear();
        windowStartTime = System.currentTimeMillis();
    }

    public String getDeviceId() {
        return deviceId;
    }

    /** Epoch millis of the last {@link #addData}, or of construction if none. */
    public long getLastUpdateTime() {
        return lastUpdateTime;
    }
}
// 传感器聚合类
/**
 * Running statistics (sum, min, max, count, mean, sample standard deviation) for one
 * sensor type. Not thread-safe.
 */
public static class SensorAggregation {
    private final String sensorType;
    private double sum;
    private double min;
    private double max;
    private int count;
    // Welford's running mean and sum of squared deviations. Unlike sumOfSquares - n*mean^2,
    // this does not suffer catastrophic cancellation and never goes negative (no NaN std).
    private double mean;
    private double m2;

    public SensorAggregation(String sensorType) {
        this.sensorType = sensorType;
        this.sum = 0.0;
        this.min = Double.MAX_VALUE;
        // Double.MIN_VALUE is the smallest *positive* double, not the most negative one;
        // seeding max with it made an all-negative series report max ≈ 4.9e-324.
        this.max = -Double.MAX_VALUE;
        this.count = 0;
        this.mean = 0.0;
        this.m2 = 0.0;
    }

    /** Folds one value into the statistics. */
    public void addValue(double value) {
        sum += value;
        min = Math.min(min, value);
        max = Math.max(max, value);
        count++;
        double delta = value - mean;
        mean += delta / count;
        m2 += delta * (value - mean);
    }

    /** Arithmetic mean, or 0.0 when no values were added. */
    public double getAverage() {
        return count > 0 ? sum / count : 0.0;
    }

    /** Sample standard deviation (n - 1 denominator); 0.0 for fewer than two values. */
    public double getStandardDeviation() {
        if (count <= 1) {
            return 0.0;
        }
        return Math.sqrt(m2 / (count - 1));
    }

    public String getSensorType() { return sensorType; }
    public double getSum() { return sum; }
    public double getMin() { return min; }
    public double getMax() { return max; }
    public int getCount() { return count; }
}
// 聚合数据类
/**
 * Mutable aggregate of one device's readings over one window.
 *
 * <p>{@link #add} mutates and returns {@code this}, matching the aggregator shape used with
 * Kafka Streams. Only {@link Number} readings contribute to the per-sensor statistics.
 */
public static class AggregatedData {
    private String deviceId;
    private long windowStart;
    private long windowEnd;
    private int dataCount;
    private Map<String, SensorAggregation> sensorAggregations;

    /** Creates an empty aggregate with no sensor statistics. */
    public AggregatedData() {
        sensorAggregations = new HashMap<>();
    }

    /**
     * Counts one message and folds its numeric readings into the per-sensor statistics.
     *
     * @param data message to add; its sensor map may be null (only the count changes)
     * @return this aggregate
     */
    public AggregatedData add(IoTDataMessage data) {
        dataCount += 1;
        Map<String, Object> readings = data.getSensorData();
        if (readings == null) {
            return this;
        }
        readings.forEach((sensorType, reading) -> {
            if (reading instanceof Number) {
                double numeric = ((Number) reading).doubleValue();
                sensorAggregations.computeIfAbsent(sensorType, SensorAggregation::new).addValue(numeric);
            }
        });
        return this;
    }

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public long getWindowStart() {
        return windowStart;
    }

    public void setWindowStart(long windowStart) {
        this.windowStart = windowStart;
    }

    public long getWindowEnd() {
        return windowEnd;
    }

    public void setWindowEnd(long windowEnd) {
        this.windowEnd = windowEnd;
    }

    public int getDataCount() {
        return dataCount;
    }

    public void setDataCount(int dataCount) {
        this.dataCount = dataCount;
    }

    /** Returns the live (mutable) map of per-sensor statistics. */
    public Map<String, SensorAggregation> getSensorAggregations() {
        return sensorAggregations;
    }

    /** Replaces the per-sensor map; the given map is stored without copying. */
    public void setSensorAggregations(Map<String, SensorAggregation> sensorAggregations) {
        this.sensorAggregations = sensorAggregations;
    }
}
}
## 2. 实时流处理
### 2.1 Kafka Streams 实现
// 示例:Kafka Streams实时处理实现
public class IoTStreamProcessor {
// Streams configuration, DSL builder and the client built from them.
private final Properties streamConfig;
private final KafkaStreams streams;
private final StreamsBuilder builder;

/**
 * Builds the topology and creates (but does not start) the Kafka Streams client;
 * call {@link #start()} to begin processing.
 *
 * @param bootstrapServers Kafka bootstrap servers
 * @param applicationId    Kafka Streams application id
 */
public IoTStreamProcessor(String bootstrapServers, String applicationId) {
    streamConfig = createStreamConfig(bootstrapServers, applicationId);
    builder = new StreamsBuilder();
    buildTopology();
    var topology = builder.build();
    streams = new KafkaStreams(topology, streamConfig);
}
/**
 * Builds the Kafka Streams configuration: String default serdes, exactly-once (v2)
 * processing, 1 s commit interval and a 10 MiB record cache.
 */
private Properties createStreamConfig(String bootstrapServers, String applicationId) {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    Class<?> stringSerdeClass = Serdes.String().getClass();
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeClass);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeClass);

    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
    config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10L * 1024 * 1024);
    return config;
}
/**
 * Defines the topology. Raw JSON from {@code iot-raw-data} is parsed, re-keyed by device
 * id and cleaned. From the cleaned stream it derives device status, alerts, windowed
 * metrics, trends and predictions, each written to its own topic.
 */
private void buildTopology() {
    // Register the prediction store first: transform() below connects to it by name, and
    // Kafka Streams rejects a processor that references a store not yet added.
    StoreBuilder<KeyValueStore<String, PredictionModel>> predictionStoreBuilder = Stores
        .keyValueStoreBuilder(
            Stores.persistentKeyValueStore("prediction-state-store"),
            Serdes.String(),
            new PredictionModelSerde()
        )
        .withCachingEnabled();
    builder.addStateStore(predictionStoreBuilder);

    KStream<String, String> rawDataStream = builder.stream("iot-raw-data");

    // Parse, drop unparseable records, key by device id.
    KStream<String, IoTDataMessage> parsedStream = rawDataStream
        .mapValues(this::parseJsonMessage)
        .filter((key, value) -> value != null)
        .selectKey((key, value) -> value.getDeviceId());

    KStream<String, IoTDataMessage> cleanedStream = parsedStream
        .mapValues(this::cleanAndValidateData)
        .filter((key, value) -> value != null);

    KStream<String, DeviceStatus> statusStream = cleanedStream
        .mapValues(this::extractDeviceStatus)
        .filter((key, value) -> value != null);

    KStream<String, Alert> alertStream = cleanedStream
        .filter(this::detectAnomalies)
        .mapValues(this::createAlert);

    // Hopping windows: 5 minutes long, advancing every minute. Serdes are given explicitly
    // because selectKey() forces a repartition topic, and the configured default value
    // serde (String) cannot serialize IoTDataMessage.
    KTable<Windowed<String>, AggregatedMetrics> aggregatedMetrics = cleanedStream
        .groupByKey(Grouped.with(Serdes.String(), new IoTDataMessageSerde()))
        .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
        .aggregate(
            AggregatedMetrics::new,
            (key, value, aggregate) -> aggregate.update(value),
            Materialized.<String, AggregatedMetrics, WindowStore<Bytes, byte[]>>as("aggregated-metrics-store")
                .withValueSerde(new AggregatedMetricsSerde())
        );

    // Unwrap Windowed<String> to the plain device id: the stream is declared with String
    // keys and is produced with a String key serde.
    KStream<String, TrendAnalysis> trendStream = aggregatedMetrics
        .toStream((windowedKey, metrics) -> windowedKey.key())
        .mapValues(this::analyzeTrend)
        .filter((key, value) -> value != null);

    // NOTE(review): transform() does not repartition after selectKey(); unless the raw topic
    // is already keyed by device id, one device's model state may be split across tasks.
    KStream<String, Prediction> predictionStream = cleanedStream
        .transform(() -> new PredictionTransformer(), "prediction-state-store");

    cleanedStream.to("iot-cleaned-data", Produced.with(Serdes.String(), new IoTDataMessageSerde()));
    statusStream.to("device-status", Produced.with(Serdes.String(), new DeviceStatusSerde()));
    alertStream.to("iot-alerts", Produced.with(Serdes.String(), new AlertSerde()));
    aggregatedMetrics.toStream().to("aggregated-metrics", Produced.with(new WindowedSerde<>(Serdes.String()), new AggregatedMetricsSerde()));
    trendStream.to("trend-analysis", Produced.with(Serdes.String(), new TrendAnalysisSerde()));
    predictionStream.to("predictions", Produced.with(Serdes.String(), new PredictionSerde()));
}
// ObjectMapper is thread-safe once configured and costly to build, so one shared
// instance replaces the per-record allocation.
private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

/**
 * Deserializes one raw record into an {@link IoTDataMessage}.
 *
 * @return the parsed message, or {@code null} for null (tombstone) or unparseable input
 *     (the latter is logged at WARN)
 */
private IoTDataMessage parseJsonMessage(String jsonMessage) {
    if (jsonMessage == null) {
        return null;
    }
    try {
        return JSON_MAPPER.readValue(jsonMessage, IoTDataMessage.class);
    } catch (Exception e) {
        log.warn("解析JSON消息失败: {}", jsonMessage, e);
        return null;
    }
}
/**
 * Validates identity and timestamp, then filters the sensor map in place.
 *
 * <p>The message is rejected when its device id is missing, its timestamp is not positive,
 * it is more than 1 minute in the future, or it is more than 24 hours in the past.
 *
 * @return the same (mutated) message, or {@code null} when rejected
 */
private IoTDataMessage cleanAndValidateData(IoTDataMessage data) {
    if (data.getDeviceId() == null) {
        return null;
    }
    if (data.getTimestamp() <= 0) {
        return null;
    }

    long now = System.currentTimeMillis();
    long latestAccepted = now + 60000;            // tolerate 1 minute of clock skew
    long earliestAccepted = now - 86_400_000L;    // 24 hours
    boolean outsideWindow = data.getTimestamp() > latestAccepted
        || data.getTimestamp() < earliestAccepted;
    if (outsideWindow) {
        return null;
    }

    Map<String, Object> readings = data.getSensorData();
    if (readings != null) {
        Map<String, Object> kept = new HashMap<>();
        readings.forEach((sensorType, value) -> {
            Object validated = validateSensorValue(sensorType, value);
            if (validated != null) {
                kept.put(sensorType, validated);
            }
        });
        data.setSensorData(kept);
    }
    return data;
}
/**
 * Parses a reading and range-checks known sensor types.
 *
 * @return the reading as a {@code Double}, or {@code null} when absent, non-numeric,
 *     non-finite, or outside the valid range for its type
 */
private Object validateSensorValue(String sensorType, Object value) {
    // Null keys or readings are invalid. Without this guard, toString()/toLowerCase() threw
    // an NPE; only NumberFormatException was caught, so the NPE killed the stream thread.
    if (sensorType == null || value == null) {
        return null;
    }
    final double numValue;
    try {
        numValue = Double.parseDouble(value.toString());
    } catch (NumberFormatException e) {
        return null;
    }
    // "NaN"/"Infinity" parse successfully but are not meaningful readings; previously they
    // slipped through the "energy" and default branches.
    if (!Double.isFinite(numValue)) {
        return null;
    }
    // Locale.ROOT keeps ASCII keys stable under locales such as Turkish.
    switch (sensorType.toLowerCase(java.util.Locale.ROOT)) {
        case "temperature":
            return boundedOrNull(numValue, -50, 100);
        case "humidity":
            return boundedOrNull(numValue, 0, 100);
        case "pressure":
            return boundedOrNull(numValue, 0, 2000);
        case "voltage":
            return boundedOrNull(numValue, 0, 50);
        case "current":
            return boundedOrNull(numValue, 0, 100);
        case "power":
            return boundedOrNull(numValue, 0, 10000);
        case "energy":
            return boundedOrNull(numValue, 0, Double.POSITIVE_INFINITY);
        default:
            return Double.valueOf(numValue);
    }
}

/** Returns {@code v} boxed when {@code lo <= v <= hi}, otherwise {@code null}. */
private static Double boundedOrNull(double v, double lo, double hi) {
    return (v >= lo && v <= hi) ? Double.valueOf(v) : null;
}
/**
 * Derives a device status from a cleaned message. The device is marked online.
 *
 * <p>When a battery reading is present, health is LOW_BATTERY below 10, WARNING below 20,
 * and HEALTHY otherwise. When a signal_strength reading is present, connectivity is POOR
 * below -90, FAIR below -70, and GOOD otherwise.
 */
private DeviceStatus extractDeviceStatus(IoTDataMessage data) {
    DeviceStatus status = new DeviceStatus();
    status.setDeviceId(data.getDeviceId());
    status.setTimestamp(data.getTimestamp());
    status.setOnline(true);

    Map<String, Object> sensorData = data.getSensorData();
    if (sensorData == null) {
        return status;
    }

    Double batteryReading = statusReading(sensorData, "battery");
    if (batteryReading != null) {
        double battery = batteryReading;
        status.setBatteryLevel(battery);
        if (battery < 10) {
            status.setHealthStatus("LOW_BATTERY");
        } else if (battery < 20) {
            status.setHealthStatus("WARNING");
        } else {
            status.setHealthStatus("HEALTHY");
        }
    }

    // validateSensorValue turns every reading into a Double (e.g. -75.0), so the previous
    // Integer.parseInt(value.toString()) threw NumberFormatException and killed the stream
    // thread. Parse as double and round instead.
    Double signalReading = statusReading(sensorData, "signal_strength");
    if (signalReading != null) {
        int signal = (int) Math.round(signalReading);
        status.setSignalStrength(signal);
        if (signal < -90) {
            status.setConnectivityStatus("POOR");
        } else if (signal < -70) {
            status.setConnectivityStatus("FAIR");
        } else {
            status.setConnectivityStatus("GOOD");
        }
    }
    return status;
}

/** Reads {@code key} as a double; null when absent or non-numeric. */
private static Double statusReading(Map<String, Object> sensorData, String key) {
    Object raw = sensorData.get(key);
    if (raw == null) {
        return null;
    }
    try {
        return Double.parseDouble(raw.toString());
    } catch (NumberFormatException e) {
        return null;
    }
}
/**
 * Returns true when any reading is outside its alert band, as judged by
 * {@link #isAnomalousValue}.
 *
 * <p>This delegates to the same predicate that {@code generateAlertMessage} uses. The
 * previous duplicated checks used exact-case keys while the message builder lowercased
 * them, so the two could disagree. The old checks also threw on non-numeric readings.
 */
private boolean detectAnomalies(String key, IoTDataMessage data) {
    Map<String, Object> sensorData = data.getSensorData();
    if (sensorData == null) {
        return false;
    }
    for (Map.Entry<String, Object> entry : sensorData.entrySet()) {
        if (entry.getKey() != null && entry.getValue() != null
                && isAnomalousValue(entry.getKey(), entry.getValue())) {
            return true;
        }
    }
    return false;
}
/** Builds a SENSOR_ANOMALY alert whose severity and message are derived from the readings. */
private Alert createAlert(IoTDataMessage data) {
    Alert result = new Alert();
    result.setDeviceId(data.getDeviceId());
    long raisedAt = System.currentTimeMillis();
    result.setTimestamp(raisedAt);
    result.setAlertType("SENSOR_ANOMALY");
    String severity = determineSeverity(data);
    result.setSeverity(severity);
    String text = generateAlertMessage(data);
    result.setMessage(text);
    result.setData(data.getSensorData());
    return result;
}
/**
 * Classifies an anomaly. The result is CRITICAL for temperature above 90 or below -30, or
 * voltage above 35 or below 0.5; otherwise it is HIGH. Temperature is checked first.
 * Assumes the sensor map is non-null (callers only pass messages that tripped detection).
 */
private String determineSeverity(IoTDataMessage data) {
    Map<String, Object> readings = data.getSensorData();

    Object rawTemperature = readings.get("temperature");
    if (rawTemperature != null) {
        double temperature = Double.parseDouble(rawTemperature.toString());
        if (temperature > 90 || temperature < -30) {
            return "CRITICAL";
        }
    }

    Object rawVoltage = readings.get("voltage");
    if (rawVoltage == null) {
        return "HIGH";
    }
    double voltage = Double.parseDouble(rawVoltage.toString());
    return (voltage > 35 || voltage < 0.5) ? "CRITICAL" : "HIGH";
}
/**
 * Builds the alert text: a fixed prefix followed by "type=value " for each reading that
 * {@link #isAnomalousValue} flags. Assumes the sensor map is non-null.
 */
private String generateAlertMessage(IoTDataMessage data) {
    StringBuilder text = new StringBuilder();
    text.append("检测到传感器数据异常: ");
    data.getSensorData().forEach((sensorType, value) -> {
        if (isAnomalousValue(sensorType, value)) {
            text.append(String.format("%s=%s ", sensorType, value));
        }
    });
    return text.toString();
}
/**
 * Returns true when a reading is outside its alert band. The bands are: temperature
 * outside [-20, 80], humidity outside [5, 95], voltage outside [1, 30], power above 5000.
 * Unknown types, null inputs and non-numeric values are never anomalous.
 */
private boolean isAnomalousValue(String sensorType, Object value) {
    // Previously a null value or key threw an NPE (only NumberFormatException was caught).
    if (sensorType == null || value == null) {
        return false;
    }
    final double numValue;
    try {
        numValue = Double.parseDouble(value.toString());
    } catch (NumberFormatException e) {
        return false;
    }
    // Locale.ROOT keeps ASCII keys stable under locales such as Turkish.
    switch (sensorType.toLowerCase(java.util.Locale.ROOT)) {
        case "temperature":
            return numValue > 80 || numValue < -20;
        case "humidity":
            return numValue > 95 || numValue < 5;
        case "voltage":
            return numValue > 30 || numValue < 1;
        case "power":
            return numValue > 5000;
        default:
            return false;
    }
}
/**
 * Computes a per-sensor least-squares slope for one window of metrics.
 *
 * @return the analysis, or {@code null} when metrics are missing or have fewer than two
 *     data points; sensors with fewer than two values are omitted from the trends map
 */
private TrendAnalysis analyzeTrend(AggregatedMetrics metrics) {
    boolean tooFewPoints = metrics == null || metrics.getDataPoints().size() < 2;
    if (tooFewPoints) {
        return null;
    }

    TrendAnalysis result = new TrendAnalysis();
    result.setDeviceId(metrics.getDeviceId());
    result.setTimestamp(System.currentTimeMillis());
    result.setWindowStart(metrics.getWindowStart());
    result.setWindowEnd(metrics.getWindowEnd());

    Map<String, Double> slopes = new HashMap<>();
    metrics.getSensorValues().forEach((sensorType, series) -> {
        if (series.size() >= 2) {
            slopes.put(sensorType, calculateTrend(series));
        }
    });
    result.setTrends(slopes);
    return result;
}
/**
 * Ordinary least-squares slope of the values against their index (x = 0, 1, 2, ...).
 *
 * @return the slope, or 0.0 for fewer than two values
 */
private double calculateTrend(List<Double> values) {
    int n = values.size();
    if (n < 2) {
        return 0.0;
    }
    double sx = 0, sy = 0, sxy = 0, sxx = 0;
    int index = 0;
    for (Double y : values) {
        double x = index++;
        sx += x;
        sy += y;
        sxy += x * y;
        sxx += x * x;
    }
    // Denominator n*Σx² - (Σx)² is positive for n >= 2 with distinct integer x.
    return (n * sxy - sx * sy) / (n * sxx - sx * sx);
}
// 预测转换器
/**
 * Keeps a per-key {@link PredictionModel} in the {@code prediction-state-store} and emits
 * the model's prediction after each message.
 */
private class PredictionTransformer implements Transformer<String, IoTDataMessage, KeyValue<String, Prediction>> {
    private KeyValueStore<String, PredictionModel> modelStore;

    @Override
    public void init(ProcessorContext context) {
        modelStore = context.getStateStore("prediction-state-store");
    }

    /**
     * Updates the key's model with {@code value}, persists it and returns its prediction.
     *
     * @return the prediction pair, or {@code null} (no output) when the message has no
     *     readings or the model produces no prediction
     */
    @Override
    public KeyValue<String, Prediction> transform(String key, IoTDataMessage value) {
        boolean hasReadings = value != null && value.getSensorData() != null;
        if (!hasReadings) {
            return null;
        }
        PredictionModel stored = modelStore.get(key);
        PredictionModel model = (stored != null) ? stored : new PredictionModel(key);

        model.addDataPoint(value);
        Prediction prediction = model.predict();
        modelStore.put(key, model);

        if (prediction == null) {
            return null;
        }
        return KeyValue.pair(key, prediction);
    }

    @Override
    public void close() {
        // Nothing to release; the state store is managed by Kafka Streams.
    }
}
/** Starts the Kafka Streams client built in the constructor. */
public void start() {
    this.streams.start();
}

/** Closes the Kafka Streams client. */
public void stop() {
    this.streams.close();
}

/** @return the current lifecycle state of the Kafka Streams client */
public KafkaStreams.State getState() {
    KafkaStreams.State current = this.streams.state();
    return current;
}
}
## 3. 数据存储策略
### 3.1 时序数据库设计
// 示例:时序数据库操作实现
public class TimeSeriesDataManager {
private final InfluxDBClient influxDBClient;
private final WriteApiBlocking writeApi;
private final QueryApi queryApi;
private final String bucket;
private final String organization;
public TimeSeriesDataManager(String url, String token, String bucket, String organization) {
this.influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray());
this.writeApi = influxDBClient.getWriteApiBlocking();
this.queryApi = influxDBClient.getQueryApi();
this.bucket = bucket;
this.organization = organization;
}
// 写入传感器数据
public void writeSensorData(IoTDataMessage data) {
try {
Map<String, Object> sensorData = data.getSensorData();
if (sensorData == null || sensorData.isEmpty()) {
return;
}
List<Point> points = new ArrayList<>();
for (Map.Entry<String, Object> entry : sensorData.entrySet()) {
String sensorType = entry.getKey();
Object value = entry.getValue();
if (value instanceof Number) {
Point point = Point.measurement("sensor_data")
.addTag("device_id", data.getDeviceId())
.addTag("sensor_type", sensorType)
.addTag("message_type", data.getMessageType())
.addField("value", ((Number) value).doubleValue())
.time(data.getTimestamp(), WritePrecision.MS);
points.add(point);
}
}
if (!points.isEmpty()) {
writeApi.writePoints(bucket, organization, points);
}
} catch (Exception e) {
log.error("写入传感器数据失败, deviceId: {}", data.getDeviceId(), e);
throw new DataWriteException("写入时序数据失败", e);
}
}
// 批量写入数据
public void writeBatchSensorData(List<IoTDataMessage> dataList) {
try {
List<Point> allPoints = new ArrayList<>();
for (IoTDataMessage data : dataList) {
Map<String, Object> sensorData = data.getSensorData();
if (sensorData != null) {
for (Map.Entry<String, Object> entry : sensorData.entrySet()) {
String sensorType = entry.getKey();
Object value = entry.getValue();
if (value instanceof Number) {
Point point = Point.measurement("sensor_data")
.addTag("device_id", data.getDeviceId())
.addTag("sensor_type", sensorType)
.addTag("message_type", data.getMessageType())
.addField("value", ((Number) value).doubleValue())
.time(data.getTimestamp(), WritePrecision.MS);
allPoints.add(point);
}
}
}
}
if (!allPoints.isEmpty()) {
writeApi.writePoints(bucket, organization, allPoints);
}
} catch (Exception e) {
log.error("批量写入传感器数据失败", e);
throw new DataWriteException("批量写入时序数据失败", e);
}
}
// 查询设备最新数据
public List<SensorDataPoint> getLatestSensorData(String deviceId, int limit) {
String query = String.format(
"from(bucket: \"%s\") " +
"|> range(start: -24h) " +
"|> filter(fn: (r) => r._measurement == \"sensor_data\") " +
"|> filter(fn: (r) => r.device_id == \"%s\") " +
"|> sort(columns: [\"_time\"], desc: true) " +
"|> limit(n: %d)",
bucket, deviceId, limit
);
return executeQuery(query);
}
// 查询时间范围内的数据
public List<SensorDataPoint> getSensorDataByTimeRange(
String deviceId,
String sensorType,
long startTime,
long endTime) {
String query = String.format(
"from(bucket: \"%s\") " +
"|> range(start: %s, stop: %s) " +
"|> filter(fn: (r) => r._measurement == \"sensor_data\") " +
"|> filter(fn: (r) => r.device_id == \"%s\") " +
"|> filter(fn: (r) => r.sensor_type == \"%s\") " +
"|> sort(columns: [\"_time\"])",
bucket,
Instant.ofEpochMilli(startTime),
Instant.ofEpochMilli(endTime),
deviceId,
sensorType
);
return executeQuery(query);
}
// 聚合查询
public List<AggregatedSensorData> getAggregatedSensorData(
String deviceId,
String sensorType,
long startTime,
long endTime,
String aggregationWindow) {
String query = String.format(
"from(bucket: \"%s\") " +
"|> range(start: %s, stop: %s) " +
"|> filter(fn: (r) => r._measurement == \"sensor_data\") " +
"|> filter(fn: (r) => r.device_id == \"%s\") " +
"|> filter(fn: (r) => r.sensor_type == \"%s\") " +
"|> aggregateWindow(every: %s, fn: mean, createEmpty: false) " +
"|> yield(name: \"mean\")",
bucket,
Instant.ofEpochMilli(startTime),
Instant.ofEpochMilli(endTime),
deviceId,
sensorType,
aggregationWindow
);
return executeAggregationQuery(query);
}
// 多指标聚合查询
public Map<String, List<AggregatedSensorData>> getMultiMetricAggregatedData(
String deviceId,
List<String> sensorTypes,
long startTime,
long endTime,
String aggregationWindow) {
Map<String, List<AggregatedSensorData>> result = new HashMap<>();
for (String sensorType : sensorTypes) {
List<AggregatedSensorData> data = getAggregatedSensorData(
deviceId, sensorType, startTime, endTime, aggregationWindow
);
result.put(sensorType, data);
}
return result;
}
// 统计分析查询
public SensorStatistics getSensorStatistics(
String deviceId,
String sensorType,
long startTime,
long endTime) {
String query = String.format(
"data = from(bucket: \"%s\") " +
"|> range(start: %s, stop: %s) " +
"|> filter(fn: (r) => r._measurement == \"sensor_data\") " +
"|> filter(fn: (r) => r.device_id == \"%s\") " +
"|> filter(fn: (r) => r.sensor_type == \"%s\") \n" +
"mean = data |> mean() |> yield(name: \"mean\") \n" +
"min = data |> min() |> yield(name: \"min\") \n" +
"max = data |> max() |> yield(name: \"max\") \n" +
"count = data |> count() |> yield(name: \"count\") \n" +
"stddev = data |> stddev() |> yield(name: \"stddev\")",
bucket,
Instant.ofEpochMilli(startTime),
Instant.ofEpochMilli(endTime),
deviceId,
sensorType
);
return executeStatisticsQuery(query);
}
private List<SensorDataPoint> executeQuery(String query) {
List<SensorDataPoint> dataPoints = new ArrayList<>();
try {
List<FluxTable> tables = queryApi.query(query, organization);
for (FluxTable table : tables) {
for (FluxRecord record : table.getRecords()) {
SensorDataPoint point = new SensorDataPoint();
point.setDeviceId((String) record.getValueByKey("device_id"));
point.setSensorType((String) record.getValueByKey("sensor_type"));
point.setValue((Double) record.getValue());
point.setTimestamp(record.getTime().toEpochMilli());
dataPoints.add(point);
}
}
} catch (Exception e) {
log.error("执行查询失败: {}", query, e);
throw new DataQueryException("时序数据查询失败", e);
}
return dataPoints;
}
private List<AggregatedSensorData> executeAggregationQuery(String query) {
List<AggregatedSensorData> aggregatedData = new ArrayList<>();
try {
List<FluxTable> tables = queryApi.query(query, organization);
for (FluxTable table : tables) {
for (FluxRecord record : table.getRecords()) {
AggregatedSensorData data = new AggregatedSensorData();
data.setDeviceId((String) record.getValueByKey("device_id"));
data.setSensorType((String) record.getValueByKey("sensor_type"));
data.setAggregatedValue((Double) record.getValue());
data.setTimestamp(record.getTime().toEpochMilli());
data.setAggregationType("mean");
aggregatedData.add(data);
}
}
} catch (Exception e) {
log.error("执行聚合查询失败: {}", query, e);
throw new DataQueryException("聚合数据查询失败", e);
}
return aggregatedData;
}
private SensorStatistics executeStatisticsQuery(String query) {
SensorStatistics statistics = new SensorStatistics();
try {
List<FluxTable> tables = queryApi.query(query, organization);
for (FluxTable table : tables) {
for (FluxRecord record : table.getRecords()) {
String resultName = (String) record.getValueByKey("result");
Double value = (Double) record.getValue();
switch (resultName) {
case "mean":
statistics.setMean(value);
break;
case "min":
statistics.setMin(value);
break;
case "max":
statistics.setMax(value);
break;
case "count":
statistics.setCount(value.longValue());
break;
case "stddev":
statistics.setStandardDeviation(value);
break;
}
}
}
} catch (Exception e) {
log.error("执行统计查询失败: {}", query, e);
throw new DataQueryException("统计数据查询失败", e);
}
return statistics;
}
// 数据清理
public void cleanupOldData(int retentionDays) {
try {
String deleteQuery = String.format(
"from(bucket: \"%s\") " +
"|> range(start: -365d, stop: -%dd) " +
"|> filter(fn: (r) => r._measurement == \"sensor_data\") " +
"|> drop()",
bucket, retentionDays
);
queryApi.query(deleteQuery, organization);
log.info("清理了{}天前的历史数据", retentionDays);
} catch (Exception e) {
log.error("清理历史数据失败", e);
throw new DataCleanupException("数据清理失败", e);
}
}
public void close() {
if (influxDBClient != null) {
influxDBClient.close();
}
}
// 数据点类
public static class SensorDataPoint {
private String deviceId;
private String sensorType;
private double value;
private long timestamp;
// Getter和Setter方法
public String getDeviceId() { return deviceId; }
public void setDeviceId(String deviceId) { this.deviceId = deviceId; }
public String getSensorType() { return sensorType; }
public void setSensorType(String sensorType) { this.sensorType = sensorType; }
public double getValue() { return value; }
public void setValue(double value) { this.value = value; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}
// 聚合数据类
public static class AggregatedSensorData {
private String deviceId;
private String sensorType;
private double aggregatedValue;
private long timestamp;
private String aggregationType;
// Getter和Setter方法
public String getDeviceId() { return deviceId; }
public void setDeviceId(String deviceId) { this.deviceId = deviceId; }
public String getSensorType() { return sensorType; }
public void setSensorType(String sensorType) { this.sensorType = sensorType; }
public double getAggregatedValue() { return aggregatedValue; }
public void setAggregatedValue(double aggregatedValue) { this.aggregatedValue = aggregatedValue; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
public String getAggregationType() { return aggregationType; }
public void setAggregationType(String aggregationType) { this.aggregationType = aggregationType; }
}
// 统计数据类
public static class SensorStatistics {
private double mean;
private double min;
private double max;
private long count;
private double standardDeviation;
// Getter和Setter方法
public double getMean() { return mean; }
public void setMean(double mean) { this.mean = mean; }
public double getMin() { return min; }
public void setMin(double min) { this.min = min; }
public double getMax() { return max; }
public void setMax(double max) { this.max = max; }
public long getCount() { return count; }
public void setCount(long count) { this.count = count; }
public double getStandardDeviation() { return standardDeviation; }
public void setStandardDeviation(double standardDeviation) { this.standardDeviation = standardDeviation; }
}
}
# 4. 数据分析与机器学习
# 4.1 异常检测算法
// 示例:异常检测算法实现
public class AnomalyDetectionEngine {
private final Map<String, AnomalyDetector> detectors;
private final AlertManager alertManager;
private final MetricsCollector metricsCollector;
public AnomalyDetectionEngine(AlertManager alertManager, MetricsCollector metricsCollector) {
this.detectors = new ConcurrentHashMap<>();
this.alertManager = alertManager;
this.metricsCollector = metricsCollector;
initializeDetectors();
}
private void initializeDetectors() {
// 统计异常检测器
detectors.put("statistical", new StatisticalAnomalyDetector());
// 基于机器学习的异常检测器
detectors.put("isolation_forest", new IsolationForestDetector());
// 时间序列异常检测器
detectors.put("time_series", new TimeSeriesAnomalyDetector());
// 阈值异常检测器
detectors.put("threshold", new ThresholdAnomalyDetector());
}
// 检测异常
public List<AnomalyResult> detectAnomalies(String deviceId, List<SensorDataPoint> dataPoints) {
List<AnomalyResult> allResults = new ArrayList<>();
for (AnomalyDetector detector : detectors.values()) {
try {
List<AnomalyResult> results = detector.detect(deviceId, dataPoints);
allResults.addAll(results);
// 记录检测指标
metricsCollector.incrementCounter("anomaly.detection." + detector.getName());
} catch (Exception e) {
log.error("异常检测失败, detector: {}, deviceId: {}", detector.getName(), deviceId, e);
metricsCollector.incrementCounter("anomaly.detection.error");
}
}
// 处理检测结果
processAnomalyResults(deviceId, allResults);
return allResults;
}
private void processAnomalyResults(String deviceId, List<AnomalyResult> results) {
for (AnomalyResult result : results) {
if (result.isAnomaly()) {
// 发送告警
Alert alert = createAnomalyAlert(deviceId, result);
alertManager.sendAlert(alert);
// 记录异常指标
metricsCollector.incrementCounter("anomaly.detected");
metricsCollector.recordGauge("anomaly.score", result.getAnomalyScore());
}
}
}
private Alert createAnomalyAlert(String deviceId, AnomalyResult result) {
Alert alert = new Alert();
alert.setDeviceId(deviceId);
alert.setTimestamp(System.currentTimeMillis());
alert.setAlertType("ANOMALY_DETECTED");
alert.setSeverity(determineSeverity(result.getAnomalyScore()));
alert.setMessage(String.format("检测到异常: %s, 异常分数: %.2f",
result.getDescription(), result.getAnomalyScore()));
alert.setData(Map.of(
"detector", result.getDetectorName(),
"anomaly_score", result.getAnomalyScore(),
"sensor_type", result.getSensorType(),
"value", result.getValue()
));
return alert;
}
private String determineSeverity(double anomalyScore) {
if (anomalyScore >= 0.8) {
return "CRITICAL";
} else if (anomalyScore >= 0.6) {
return "HIGH";
} else if (anomalyScore >= 0.4) {
return "MEDIUM";
} else {
return "LOW";
}
}
// 统计异常检测器
public static class StatisticalAnomalyDetector implements AnomalyDetector {
private static final double Z_SCORE_THRESHOLD = 3.0;
@Override
public String getName() {
return "statistical";
}
@Override
public List<AnomalyResult> detect(String deviceId, List<SensorDataPoint> dataPoints) {
List<AnomalyResult> results = new ArrayList<>();
// 按传感器类型分组
Map<String, List<SensorDataPoint>> groupedData = dataPoints.stream()
.collect(Collectors.groupingBy(SensorDataPoint::getSensorType));
for (Map.Entry<String, List<SensorDataPoint>> entry : groupedData.entrySet()) {
String sensorType = entry.getKey();
List<SensorDataPoint> sensorData = entry.getValue();
if (sensorData.size() < 10) {
continue; // 数据点太少,跳过
}
// 计算统计指标
double[] values = sensorData.stream()
.mapToDouble(SensorDataPoint::getValue)
.toArray();
double mean = Arrays.stream(values).average().orElse(0.0);
double stdDev = calculateStandardDeviation(values, mean);
// 检测异常
for (SensorDataPoint point : sensorData) {
double zScore = Math.abs((point.getValue() - mean) / stdDev);
if (zScore > Z_SCORE_THRESHOLD) {
AnomalyResult result = new AnomalyResult();
result.setDeviceId(deviceId);
result.setSensorType(sensorType);
result.setValue(point.getValue());
result.setTimestamp(point.getTimestamp());
result.setAnomaly(true);
result.setAnomalyScore(Math.min(zScore / 5.0, 1.0)); // 归一化到0-1
result.setDetectorName(getName());
result.setDescription(String.format("Z-Score异常: %.2f (阈值: %.1f)", zScore, Z_SCORE_THRESHOLD));
results.add(result);
}
}
}
return results;
}
private double calculateStandardDeviation(double[] values, double mean) {
double sumSquaredDiffs = Arrays.stream(values)
.map(value -> Math.pow(value - mean, 2))
.sum();
return Math.sqrt(sumSquaredDiffs / values.length);
}
}
// 孤立森林异常检测器
public static class IsolationForestDetector implements AnomalyDetector {
private static final int NUM_TREES = 100;
private static final int SAMPLE_SIZE = 256;
private static final double ANOMALY_THRESHOLD = 0.6;
@Override
public String getName() {
return "isolation_forest";
}
@Override
public List<AnomalyResult> detect(String deviceId, List<SensorDataPoint> dataPoints) {
List<AnomalyResult> results = new ArrayList<>();
if (dataPoints.size() < 50) {
return results; // 数据点太少
}
// 准备特征数据
double[][] features = prepareFeatures(dataPoints);
// 构建孤立森林
IsolationForest forest = new IsolationForest(NUM_TREES, SAMPLE_SIZE);
forest.fit(features);
// 计算异常分数
double[] anomalyScores = forest.anomalyScore(features);
// 检测异常
for (int i = 0; i < dataPoints.size(); i++) {
SensorDataPoint point = dataPoints.get(i);
double score = anomalyScores[i];
if (score > ANOMALY_THRESHOLD) {
AnomalyResult result = new AnomalyResult();
result.setDeviceId(deviceId);
result.setSensorType(point.getSensorType());
result.setValue(point.getValue());
result.setTimestamp(point.getTimestamp());
result.setAnomaly(true);
result.setAnomalyScore(score);
result.setDetectorName(getName());
result.setDescription(String.format("孤立森林异常: %.2f (阈值: %.1f)", score, ANOMALY_THRESHOLD));
results.add(result);
}
}
return results;
}
private double[][] prepareFeatures(List<SensorDataPoint> dataPoints) {
// 提取特征:值、时间差、移动平均等
int numFeatures = 4;
double[][] features = new double[dataPoints.size()][numFeatures];
for (int i = 0; i < dataPoints.size(); i++) {
SensorDataPoint point = dataPoints.get(i);
// 特征1: 传感器值
features[i][0] = point.getValue();
// 特征2: 时间差(与前一个点)
if (i > 0) {
features[i][1] = point.getTimestamp() - dataPoints.get(i - 1).getTimestamp();
} else {
features[i][1] = 0;
}
// 特征3: 移动平均(窗口大小5)
int windowStart = Math.max(0, i - 4);
double sum = 0;
int count = 0;
for (int j = windowStart; j <= i; j++) {
sum += dataPoints.get(j).getValue();
count++;
}
features[i][2] = sum / count;
// 特征4: 变化率
if (i > 0) {
double prevValue = dataPoints.get(i - 1).getValue();
features[i][3] = (point.getValue() - prevValue) / Math.max(Math.abs(prevValue), 1e-6);
} else {
features[i][3] = 0;
}
}
return features;
}
}
// 时间序列异常检测器
public static class TimeSeriesAnomalyDetector implements AnomalyDetector {
private static final int SEASONAL_PERIOD = 24; // 24小时周期
private static final double ANOMALY_THRESHOLD = 2.5;
@Override
public String getName() {
return "time_series";
}
@Override
public List<AnomalyResult> detect(String deviceId, List<SensorDataPoint> dataPoints) {
List<AnomalyResult> results = new ArrayList<>();
// 按传感器类型分组
Map<String, List<SensorDataPoint>> groupedData = dataPoints.stream()
.collect(Collectors.groupingBy(SensorDataPoint::getSensorType));
for (Map.Entry<String, List<SensorDataPoint>> entry : groupedData.entrySet()) {
String sensorType = entry.getKey();
List<SensorDataPoint> sensorData = entry.getValue();
if (sensorData.size() < SEASONAL_PERIOD * 2) {
continue; // 数据点太少
}
// 排序数据点
sensorData.sort(Comparator.comparing(SensorDataPoint::getTimestamp));
// 季节性分解
SeasonalDecomposition decomposition = performSeasonalDecomposition(sensorData);
// 检测异常
for (int i = 0; i < sensorData.size(); i++) {
SensorDataPoint point = sensorData.get(i);
double residual = decomposition.getResiduals()[i];
double threshold = ANOMALY_THRESHOLD * decomposition.getResidualStdDev();
if (Math.abs(residual) > threshold) {
AnomalyResult result = new AnomalyResult();
result.setDeviceId(deviceId);
result.setSensorType(sensorType);
result.setValue(point.getValue());
result.setTimestamp(point.getTimestamp());
result.setAnomaly(true);
result.setAnomalyScore(Math.min(Math.abs(residual) / threshold / 2.0, 1.0));
result.setDetectorName(getName());
result.setDescription(String.format("时间序列异常: 残差=%.2f, 阈值=%.2f", residual, threshold));
results.add(result);
}
}
}
return results;
}
private SeasonalDecomposition performSeasonalDecomposition(List<SensorDataPoint> dataPoints) {
double[] values = dataPoints.stream()
.mapToDouble(SensorDataPoint::getValue)
.toArray();
// 简化的季节性分解
double[] trend = calculateTrend(values);
double[] seasonal = calculateSeasonal(values, trend);
double[] residuals = calculateResiduals(values, trend, seasonal);
double residualStdDev = calculateStandardDeviation(residuals);
return new SeasonalDecomposition(trend, seasonal, residuals, residualStdDev);
}
private double[] calculateTrend(double[] values) {
// 使用移动平均计算趋势
int windowSize = SEASONAL_PERIOD;
double[] trend = new double[values.length];
for (int i = 0; i < values.length; i++) {
int start = Math.max(0, i - windowSize / 2);
int end = Math.min(values.length - 1, i + windowSize / 2);
double sum = 0;
int count = 0;
for (int j = start; j <= end; j++) {
sum += values[j];
count++;
}
trend[i] = sum / count;
}
return trend;
}
private double[] calculateSeasonal(double[] values, double[] trend) {
double[] seasonal = new double[values.length];
double[] seasonalPattern = new double[SEASONAL_PERIOD];
// 计算季节性模式
for (int i = 0; i < SEASONAL_PERIOD; i++) {
double sum = 0;
int count = 0;
for (int j = i; j < values.length; j += SEASONAL_PERIOD) {
sum += values[j] - trend[j];
count++;
}
seasonalPattern[i] = count > 0 ? sum / count : 0;
}
// 应用季节性模式
for (int i = 0; i < values.length; i++) {
seasonal[i] = seasonalPattern[i % SEASONAL_PERIOD];
}
return seasonal;
}
private double[] calculateResiduals(double[] values, double[] trend, double[] seasonal) {
double[] residuals = new double[values.length];
for (int i = 0; i < values.length; i++) {
residuals[i] = values[i] - trend[i] - seasonal[i];
}
return residuals;
}
private double calculateStandardDeviation(double[] values) {
double mean = Arrays.stream(values).average().orElse(0.0);
double sumSquaredDiffs = Arrays.stream(values)
.map(value -> Math.pow(value - mean, 2))
.sum();
return Math.sqrt(sumSquaredDiffs / values.length);
}
}
// 季节性分解结果类
public static class SeasonalDecomposition {
private final double[] trend;
private final double[] seasonal;
private final double[] residuals;
private final double residualStdDev;
public SeasonalDecomposition(double[] trend, double[] seasonal, double[] residuals, double residualStdDev) {
this.trend = trend;
this.seasonal = seasonal;
this.residuals = residuals;
this.residualStdDev = residualStdDev;
}
public double[] getTrend() { return trend; }
public double[] getSeasonal() { return seasonal; }
public double[] getResiduals() { return residuals; }
public double getResidualStdDev() { return residualStdDev; }
}
}
# 5. 最佳实践总结
# 5.1 设计原则
- 可扩展性: 采用微服务架构,支持水平扩展
- 高可用性: 实现多副本部署和故障转移
- 数据一致性: 确保数据处理的准确性和完整性
- 实时性: 优化处理延迟,满足实时性要求
- 容错性: 实现优雅降级和错误恢复
# 5.2 开发建议
- 数据建模: 合理设计数据模型,支持高效查询
- 性能优化: 使用缓存、批处理和并行处理
- 监控告警: 实现全面的监控和告警机制
- 数据治理: 建立数据质量管控流程
- 安全保护: 实现数据加密和访问控制
# 6. 下一步学习
- 学习应用服务层的设计和实现
- 深入了解微服务架构模式
- 掌握容器化部署和运维
- 研究边缘计算和雾计算技术