数据处理层

# 数据处理层

## 📖 章节概述

数据处理层是物联网系统的数据大脑，负责对海量设备数据进行实时处理、分析和存储。本章将深入讲解数据处理架构、流式处理、批处理、数据清洗、数据分析和数据存储策略。

## 🎯 学习目标

- 掌握物联网数据处理的核心架构和设计原则
- 理解流式处理和批处理的应用场景
- 学会实现数据清洗、转换和聚合算法
- 掌握时序数据库和分布式存储技术

## 1. 数据处理架构

### 1.1 Lambda架构

graph TB
    subgraph "数据源层 Data Sources"
        DS1[设备传感器]
        DS2[网关设备]
        DS3[移动应用]
        DS4[Web应用]
        DS5[第三方API]
    end
    
    subgraph "数据接入层 Data Ingestion"
        DI1[Kafka消息队列]
        DI2[HTTP API网关]
        DI3[MQTT Broker]
        DI4[数据采集器]
    end
    
    subgraph "流处理层 Stream Processing"
        SP1[Kafka Streams]
        SP2[Apache Flink]
        SP3[Storm]
        SP4[实时计算引擎]
    end
    
    subgraph "批处理层 Batch Processing"
        BP1[Apache Spark]
        BP2[Hadoop MapReduce]
        BP3[数据仓库ETL]
        BP4[离线分析引擎]
    end
    
    subgraph "服务层 Serving Layer"
        SL1[实时视图]
        SL2[批处理视图]
        SL3[查询服务]
        SL4[API服务]
    end
    
    subgraph "存储层 Storage Layer"
        ST1[时序数据库]
        ST2[关系数据库]
        ST3[NoSQL数据库]
        ST4[数据湖]
        ST5[缓存系统]
    end
    
    subgraph "应用层 Application Layer"
        AL1[实时监控]
        AL2[数据分析]
        AL3[报表系统]
        AL4[告警系统]
    end
    
    DS1 --> DI1
    DS2 --> DI2
    DS3 --> DI3
    DS4 --> DI4
    DS5 --> DI1
    
    DI1 --> SP1
    DI2 --> SP2
    DI3 --> SP3
    DI4 --> SP4
    
    DI1 --> BP1
    DI2 --> BP2
    DI3 --> BP3
    DI4 --> BP4
    
    SP1 --> SL1
    SP2 --> SL1
    SP3 --> SL1
    SP4 --> SL1
    
    BP1 --> SL2
    BP2 --> SL2
    BP3 --> SL2
    BP4 --> SL2
    
    SL1 --> ST1
    SL1 --> ST5
    SL2 --> ST2
    SL2 --> ST3
    SL2 --> ST4
    
    %% Serving layer: queries merge the real-time view and the batch view;
    %% without these edges the query/API services had no data source.
    SL1 --> SL3
    SL2 --> SL3
    SL3 --> SL4
    
    SL3 --> AL1
    SL3 --> AL2
    SL4 --> AL3
    SL4 --> AL4

### 1.2 数据处理核心框架

// 示例:数据处理核心框架
public class DataProcessingFramework {
    
    // Engine-level components; stored as supplied (not referenced by the classes shown here).
    private final StreamProcessor streamProcessor;
    private final BatchProcessor batchProcessor;
    // Per-message pipeline stages used by DataProcessingPipeline.
    private final DataValidator dataValidator;
    private final DataTransformer dataTransformer;
    private final DataRouter dataRouter;
    // Observability collaborators used when recording outcomes and reporting errors.
    private final MetricsCollector metricsCollector;
    private final AlertManager alertManager;

    /**
     * Creates the framework from its collaborators. Every argument is stored as-is;
     * no null checks are performed.
     *
     * @param streamProcessor  stream-side processing component
     * @param batchProcessor   batch-side processing component
     * @param dataValidator    validates each incoming message
     * @param dataTransformer  transforms a message after validation
     * @param dataRouter       routes the processed message
     * @param metricsCollector receives counters and timers
     * @param alertManager     receives alerts for processing errors
     */
    public DataProcessingFramework(
            StreamProcessor streamProcessor,
            BatchProcessor batchProcessor,
            DataValidator dataValidator,
            DataTransformer dataTransformer,
            DataRouter dataRouter,
            MetricsCollector metricsCollector,
            AlertManager alertManager) {
        // Observability first ...
        this.alertManager = alertManager;
        this.metricsCollector = metricsCollector;
        // ... then the per-message stages ...
        this.dataRouter = dataRouter;
        this.dataTransformer = dataTransformer;
        this.dataValidator = dataValidator;
        // ... and finally the engine-level processors.
        this.batchProcessor = batchProcessor;
        this.streamProcessor = streamProcessor;
    }
    
    /**
     * Asynchronous per-message processing pipeline.
     *
     * <p>Each message is validated, transformed, passed through the registered
     * {@link DataProcessor} chain in insertion order, and finally routed. All work runs on
     * a fixed-size thread pool owned by this pipeline; call {@link #shutdown()} to release it.
     */
    public class DataProcessingPipeline {

        /** Worker count used by the no-arg constructor (the original hard-coded value). */
        private static final int DEFAULT_THREAD_COUNT = 10;

        private final List<DataProcessor> processors;
        private final ExecutorService executorService;

        /** Creates a pipeline backed by {@value #DEFAULT_THREAD_COUNT} worker threads. */
        public DataProcessingPipeline() {
            this(DEFAULT_THREAD_COUNT);
        }

        /**
         * Creates a pipeline backed by a fixed pool of {@code threadCount} worker threads.
         *
         * @param threadCount number of worker threads
         * @throws IllegalArgumentException if {@code threadCount <= 0}
         *         (raised by {@link Executors#newFixedThreadPool(int)})
         */
        public DataProcessingPipeline(int threadCount) {
            this.processors = new ArrayList<>();
            this.executorService = Executors.newFixedThreadPool(threadCount);
        }

        /**
         * Appends a processor to the end of the chain.
         *
         * @param processor processor to run after those already registered
         * @return this pipeline, for call chaining
         */
        public DataProcessingPipeline addProcessor(DataProcessor processor) {
            processors.add(processor);
            return this;
        }

        /**
         * Processes one message asynchronously on the pipeline's pool.
         *
         * @param data message to process
         * @return a future completing with: a validation failure; the result of the first
         *         processor that did not succeed; an error result if any step threw; or a
         *         success result carrying every processor's result
         */
        public CompletableFuture<ProcessingResult> processData(IoTDataMessage data) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    ProcessingContext context = new ProcessingContext(data);

                    // Validation
                    ValidationResult validationResult = dataValidator.validate(data);
                    if (!validationResult.isValid()) {
                        metricsCollector.incrementCounter("data.validation.failure");
                        return ProcessingResult.failure("数据验证失败: " + validationResult.getErrorMessage());
                    }

                    // Transformation
                    IoTDataMessage transformedData = dataTransformer.transform(data);
                    context.setTransformedData(transformedData);

                    // Processor chain: stop at the first processor that does not succeed.
                    for (DataProcessor processor : processors) {
                        ProcessingResult result = processor.process(context);
                        if (!result.isSuccess()) {
                            metricsCollector.incrementCounter("data.processing.failure");
                            return result;
                        }
                        context.addResult(processor.getClass().getSimpleName(), result);
                    }

                    // Routing
                    dataRouter.route(context.getTransformedData(), context.getRoutingHints());

                    // Metrics
                    metricsCollector.incrementCounter("data.processing.success");
                    metricsCollector.recordTimer("data.processing.duration", context.getProcessingDuration());

                    return ProcessingResult.success(context.getResults());

                } catch (Exception e) {
                    metricsCollector.incrementCounter("data.processing.error");
                    // Null-safe: a null message must still yield an error result rather than
                    // an NPE thrown from this handler (which would fail the future instead).
                    String deviceId = (data != null) ? data.getDeviceId() : null;
                    alertManager.sendAlert(new ProcessingErrorAlert(deviceId, e));
                    return ProcessingResult.error("数据处理异常: " + e.getMessage(), e);
                }
            }, executorService);
        }

        /**
         * Processes a list of messages and collects the individual outcomes.
         *
         * <p>All messages are submitted up front and the resulting futures are combined
         * without blocking a pool thread. The previous implementation ran a task on this
         * same fixed pool that blocked on {@code processData(...).get()}; enough concurrent
         * batches could occupy every worker while waiting on tasks queued behind them,
         * starving (deadlocking) the pool.
         *
         * @param dataList messages to process; {@code null} is treated as empty
         * @return a future with the collected results and one error string per message
         *         whose future completed exceptionally
         */
        public CompletableFuture<BatchProcessingResult> processBatch(List<IoTDataMessage> dataList) {
            List<IoTDataMessage> items = (dataList == null) ? List.of() : dataList;

            List<CompletableFuture<ProcessingResult>> futures = new ArrayList<>(items.size());
            for (IoTDataMessage data : items) {
                futures.add(processData(data));
            }

            return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .handle((ignoredResult, ignoredFailure) -> {
                    // Outcomes are inspected per future below, so the combined outcome is unused.
                    List<ProcessingResult> results = new ArrayList<>();
                    List<String> errors = new ArrayList<>();
                    int index = 0;
                    for (IoTDataMessage data : items) {
                        CompletableFuture<ProcessingResult> future = futures.get(index++);
                        try {
                            // Already complete here, so join() does not block.
                            results.add(future.join());
                        } catch (RuntimeException e) {
                            String deviceId = (data != null) ? data.getDeviceId() : null;
                            errors.add(String.format("设备 %s 处理失败: %s", deviceId, e.getMessage()));
                        }
                    }
                    return new BatchProcessingResult(results.size(), errors.size(), results, errors);
                });
        }

        /**
         * Stops accepting work, waits up to 60 seconds for running tasks, then forces
         * shutdown. Restores the interrupt flag if interrupted while waiting.
         */
        public void shutdown() {
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                executorService.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
    
    /**
     * Kafka Streams based processor.
     *
     * <p>The topology reads {@code iot-data-input}, drops records without a value or
     * device ID, cleans sensor readings, and writes three outputs: the cleaned stream
     * ({@code iot-data-cleaned}), 5-minute per-device aggregates ({@code iot-data-aggregated})
     * and anomaly alerts ({@code iot-alerts}). {@link #process} applies the same cleaning and
     * anomaly check synchronously to a single message.
     */
    public class StreamDataProcessor implements DataProcessor {

        private final KafkaStreams kafkaStreams;
        private final StreamsBuilder streamsBuilder;

        /**
         * Builds the topology and creates (but does not start) the streams instance.
         *
         * @param kafkaConfig Kafka Streams configuration
         */
        public StreamDataProcessor(Properties kafkaConfig) {
            this.streamsBuilder = new StreamsBuilder();
            initializeTopology();
            this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), kafkaConfig);
        }

        /** Defines the source, cleaning, aggregation, anomaly and sink steps. */
        private void initializeTopology() {
            KStream<String, IoTDataMessage> sourceStream = streamsBuilder
                .stream("iot-data-input", Consumed.with(Serdes.String(), new IoTDataMessageSerde()));

            // Cleaning and filtering
            KStream<String, IoTDataMessage> cleanedStream = sourceStream
                .filter((key, value) -> value != null && value.getDeviceId() != null)
                .mapValues(this::cleanData);

            // Per-device aggregation over 5-minute windows. Re-key by the device ID carried in
            // the payload: the input record key is not guaranteed to be the device ID, and
            // groupByKey() would also drop null-keyed records.
            // NOTE(review): this adds a repartition topic; existing deployments need an
            // application reset when the topology changes.
            KTable<Windowed<String>, AggregatedData> aggregatedData = cleanedStream
                .groupBy((key, value) -> value.getDeviceId(),
                    Grouped.with(Serdes.String(), new IoTDataMessageSerde()))
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
                .aggregate(
                    AggregatedData::new,
                    (deviceId, value, aggregate) -> {
                        aggregate.setDeviceId(deviceId);
                        return aggregate.add(value);
                    },
                    Materialized.with(Serdes.String(), new AggregatedDataSerde())
                );

            // Anomaly detection
            KStream<String, Alert> alertStream = cleanedStream
                .filter(this::isAnomalous)
                .mapValues(this::createAlert);

            // Sinks
            cleanedStream.to("iot-data-cleaned", Produced.with(Serdes.String(), new IoTDataMessageSerde()));
            aggregatedData.toStream().to("iot-data-aggregated", Produced.with(new WindowedSerde<>(Serdes.String()), new AggregatedDataSerde()));
            alertStream.to("iot-alerts", Produced.with(Serdes.String(), new AlertSerde()));
        }

        /**
         * Returns a copy of {@code data} whose sensor map keeps only readings accepted by
         * {@link #cleanSensorValue}; device ID, timestamp and message type are copied as-is.
         */
        private IoTDataMessage cleanData(IoTDataMessage data) {
            IoTDataMessage cleaned = new IoTDataMessage();
            cleaned.setDeviceId(data.getDeviceId());
            cleaned.setTimestamp(data.getTimestamp());
            cleaned.setMessageType(data.getMessageType());

            Map<String, Object> cleanedSensorData = new HashMap<>();
            Map<String, Object> sensorData = data.getSensorData();

            if (sensorData != null) {
                for (Map.Entry<String, Object> entry : sensorData.entrySet()) {
                    Object cleanedValue = cleanSensorValue(entry.getKey(), entry.getValue());
                    if (cleanedValue != null) {
                        cleanedSensorData.put(entry.getKey(), cleanedValue);
                    }
                }
            }

            cleaned.setSensorData(cleanedSensorData);
            return cleaned;
        }

        /**
         * Validates one reading.
         *
         * @return for known sensor types, the value as a {@code Double} if it parses and is in
         *         range; for unknown types, the original value if it is a finite number or a
         *         string parsing to one; otherwise {@code null} (reading is dropped)
         */
        private Object cleanSensorValue(String sensorType, Object value) {
            // A missing key or value cannot be validated. Drop it rather than let a
            // NullPointerException escape (which would kill the stream thread).
            if (sensorType == null || value == null) {
                return null;
            }
            try {
                // Locale.ROOT: default-locale lowercasing breaks matching in e.g. Turkish locales.
                switch (sensorType.toLowerCase(java.util.Locale.ROOT)) {
                    case "temperature":
                        return withinRange(Double.parseDouble(value.toString()), -50, 100);   // °C
                    case "humidity":
                        return withinRange(Double.parseDouble(value.toString()), 0, 100);     // %
                    case "pressure":
                        return withinRange(Double.parseDouble(value.toString()), 0, 2000);    // hPa
                    case "voltage":
                        return withinRange(Double.parseDouble(value.toString()), 0, 50);      // V
                    case "current":
                        return withinRange(Double.parseDouble(value.toString()), 0, 100);     // A
                    default:
                        // Unknown type: accept any finite number, keeping the original value.
                        // NaN/Infinity are rejected because one non-finite reading poisons
                        // downstream sums and averages.
                        double parsed = (value instanceof Number)
                            ? ((Number) value).doubleValue()
                            : Double.parseDouble(value.toString());
                        return Double.isFinite(parsed) ? value : null;
                }
            } catch (NumberFormatException e) {
                // Not numeric: treat as invalid data.
                return null;
            }
        }

        /** Returns {@code v} if it lies in {@code [min, max]} (NaN never does), else {@code null}. */
        private Double withinRange(double v, double min, double max) {
            return (v >= min && v <= max) ? v : null;
        }

        /**
         * Returns true if temperature is outside [-20, 80], humidity outside [5, 95] or voltage
         * outside [1, 30]. Readings that are absent are ignored.
         */
        private boolean isAnomalous(String key, IoTDataMessage data) {
            Map<String, Object> sensorData = data.getSensorData();
            if (sensorData == null) {
                return false;
            }
            return isOutside(sensorData.get("temperature"), -20, 80)
                || isOutside(sensorData.get("humidity"), 5, 95)
                || isOutside(sensorData.get("voltage"), 1, 30);
        }

        /** True if {@code raw} is present and its numeric value is below {@code low} or above {@code high}. */
        private boolean isOutside(Object raw, double low, double high) {
            if (raw == null) {
                return false;
            }
            double value = Double.parseDouble(raw.toString());
            return value > high || value < low;
        }

        /** Builds a HIGH-severity SENSOR_ANOMALY alert carrying the message's sensor data. */
        private Alert createAlert(IoTDataMessage data) {
            Alert alert = new Alert();
            alert.setDeviceId(data.getDeviceId());
            alert.setTimestamp(System.currentTimeMillis());
            alert.setAlertType("SENSOR_ANOMALY");
            alert.setSeverity("HIGH");
            alert.setMessage("检测到传感器数据异常");
            alert.setData(data.getSensorData());
            return alert;
        }

        /**
         * Synchronous variant: cleans the context's transformed data in place and, if it is
         * anomalous, attaches an alert as the {@code "alert"} routing hint.
         */
        @Override
        public ProcessingResult process(ProcessingContext context) {
            IoTDataMessage data = context.getTransformedData();

            try {
                IoTDataMessage cleanedData = cleanData(data);
                context.setTransformedData(cleanedData);

                if (isAnomalous(data.getDeviceId(), cleanedData)) {
                    Alert alert = createAlert(cleanedData);
                    context.addRoutingHint("alert", alert);
                }

                return ProcessingResult.success("流处理完成");

            } catch (Exception e) {
                return ProcessingResult.error("流处理失败: " + e.getMessage(), e);
            }
        }

        /** Starts the Kafka Streams instance. */
        public void start() {
            kafkaStreams.start();
        }

        /** Closes the Kafka Streams instance. */
        public void stop() {
            kafkaStreams.close();
        }
    }
    
    /**
     * Spark-based processor.
     *
     * <p>Turns one message into rows (one per sensor reading) and computes hourly
     * statistics per device and sensor type. The statistics are appended to a JDBC table
     * and to a Parquet data lake partitioned by device and hour.
     */
    public class BatchDataProcessor implements DataProcessor {

        // The former `private final DataFrameWriter<Row> dataFrameWriter` was never
        // assigned, which does not compile; it was also unused, so it was removed.
        private final SparkSession sparkSession;
        private final String jdbcUrl;
        private final String jdbcUser;
        private final String jdbcPassword;
        private final String lakePath;

        /**
         * Creates a processor with the built-in local settings.
         * NOTE(review): these credentials are development defaults; production code should
         * use {@link #BatchDataProcessor(String, String, String, String)} with values from
         * a secret store or environment.
         */
        public BatchDataProcessor() {
            this("jdbc:postgresql://localhost:5432/iot_analytics", "iot_user", "iot_password",
                "/data/lake/iot/hourly_stats");
        }

        /**
         * @param jdbcUrl      JDBC URL of the database receiving {@code hourly_sensor_stats}
         * @param jdbcUser     database user
         * @param jdbcPassword database password
         * @param lakePath     root path of the Parquet output
         */
        public BatchDataProcessor(String jdbcUrl, String jdbcUser, String jdbcPassword, String lakePath) {
            this.jdbcUrl = jdbcUrl;
            this.jdbcUser = jdbcUser;
            this.jdbcPassword = jdbcPassword;
            this.lakePath = lakePath;
            this.sparkSession = SparkSession.builder()
                .appName("IoT Data Batch Processing")
                .config("spark.sql.adaptive.enabled", "true")
                .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
                .getOrCreate();
        }

        @Override
        public ProcessingResult process(ProcessingContext context) {
            try {
                IoTDataMessage data = context.getTransformedData();
                Dataset<Row> dataFrame = convertToDataFrame(data);
                Dataset<Row> processedData = performBatchAnalysis(dataFrame);
                saveProcessedData(processedData);
                return ProcessingResult.success("批处理完成");
            } catch (Exception e) {
                return ProcessingResult.error("批处理失败: " + e.getMessage(), e);
            }
        }

        /** One row per sensor reading: (device_id, timestamp, sensor_type, sensor_value, message_type). */
        private Dataset<Row> convertToDataFrame(IoTDataMessage data) {
            List<Row> rows = new ArrayList<>();

            Map<String, Object> sensorData = data.getSensorData();
            if (sensorData != null) {
                for (Map.Entry<String, Object> entry : sensorData.entrySet()) {
                    if (entry.getKey() == null) {
                        continue; // sensor_type is declared non-nullable below
                    }
                    rows.add(RowFactory.create(
                        data.getDeviceId(),
                        data.getTimestamp(),
                        entry.getKey(),
                        // The schema declares DoubleType; Spark rejects e.g. Integer or String
                        // values at runtime, so coerce here.
                        toDoubleOrNull(entry.getValue()),
                        data.getMessageType()
                    ));
                }
            }

            StructType schema = new StructType(new StructField[]{
                new StructField("device_id", DataTypes.StringType, false, Metadata.empty()),
                new StructField("timestamp", DataTypes.LongType, false, Metadata.empty()),
                new StructField("sensor_type", DataTypes.StringType, false, Metadata.empty()),
                new StructField("sensor_value", DataTypes.DoubleType, true, Metadata.empty()),
                new StructField("message_type", DataTypes.StringType, true, Metadata.empty())
            });

            return sparkSession.createDataFrame(rows, schema);
        }

        /** Numbers are widened to double; numeric strings are parsed; anything else becomes SQL NULL. */
        private Double toDoubleOrNull(Object value) {
            if (value == null) {
                return null;
            }
            if (value instanceof Number) {
                return ((Number) value).doubleValue();
            }
            try {
                return Double.parseDouble(value.toString());
            } catch (NumberFormatException e) {
                return null; // filtered out by "sensor_value IS NOT NULL" in the analysis
            }
        }

        /**
         * Hourly avg/min/max/count/sample-stddev per (device_id, sensor_type, hour), ignoring
         * null values, ordered by those keys.
         */
        private Dataset<Row> performBatchAnalysis(Dataset<Row> dataFrame) {
            // Dataset API instead of createOrReplaceTempView("iot_data"): a temp view name is
            // session-wide, so concurrent process() calls could replace each other's view and
            // aggregate the wrong rows.
            Dataset<Row> hourly = dataFrame
                .where("sensor_value IS NOT NULL")
                .selectExpr(
                    "device_id",
                    "sensor_type",
                    "sensor_value",
                    // '/' yields DOUBLE in Spark SQL while FROM_UNIXTIME expects BIGINT seconds;
                    // cast explicitly (the implicit narrowing may be rejected in ANSI mode).
                    "DATE_FORMAT(FROM_UNIXTIME(CAST(timestamp / 1000 AS BIGINT)), 'yyyy-MM-dd HH:00:00') AS hour");

            return hourly
                .groupBy("device_id", "sensor_type", "hour")
                .agg(
                    org.apache.spark.sql.functions.avg("sensor_value").as("avg_value"),
                    org.apache.spark.sql.functions.min("sensor_value").as("min_value"),
                    org.apache.spark.sql.functions.max("sensor_value").as("max_value"),
                    org.apache.spark.sql.functions.count(org.apache.spark.sql.functions.lit(1)).as("data_count"),
                    org.apache.spark.sql.functions.stddev("sensor_value").as("std_dev"))
                .orderBy("device_id", "sensor_type", "hour");
        }

        /** Appends the statistics to the JDBC table and the Parquet lake. */
        private void saveProcessedData(Dataset<Row> processedData) {
            // Both writes consume the same result; cache it so the aggregation runs once.
            processedData.persist();
            try {
                processedData.write()
                    .mode(SaveMode.Append)
                    .format("jdbc")
                    .option("url", jdbcUrl)
                    .option("dbtable", "hourly_sensor_stats")
                    .option("user", jdbcUser)
                    .option("password", jdbcPassword)
                    .save();

                processedData.write()
                    .mode(SaveMode.Append)
                    .format("parquet")
                    .partitionBy("device_id", "hour")
                    .save(lakePath);
            } finally {
                processedData.unpersist();
            }
        }

        /** Stops the Spark session. */
        public void shutdown() {
            sparkSession.stop();
        }
    }
    
    /**
     * In-memory per-device aggregation.
     *
     * <p>Messages are buffered in one {@link AggregationWindow} per device. A scheduler checks
     * once a minute for windows at least {@code windowDuration} old, aggregates them and
     * resets them. Windows idle for more than two hours are evicted hourly.
     */
    public class DataAggregationProcessor implements DataProcessor {

        private final Map<String, AggregationWindow> aggregationWindows;
        private final ScheduledExecutorService scheduler;
        private final Duration windowDuration;

        /** Creates a processor with 5-minute windows (the original fixed setting). */
        public DataAggregationProcessor() {
            this(Duration.ofMinutes(5));
        }

        /**
         * @param windowDuration minimum age a window must reach before it is aggregated
         *                       (checked once per minute)
         * @throws NullPointerException if {@code windowDuration} is null
         */
        public DataAggregationProcessor(Duration windowDuration) {
            if (windowDuration == null) {
                throw new NullPointerException("windowDuration");
            }
            this.windowDuration = windowDuration;
            this.aggregationWindows = new ConcurrentHashMap<>();
            this.scheduler = Executors.newScheduledThreadPool(5);

            startAggregationTasks();
        }

        @Override
        public ProcessingResult process(ProcessingContext context) {
            try {
                IoTDataMessage data = context.getTransformedData();
                String deviceId = data.getDeviceId();

                // compute() is atomic per key with the computeIfPresent() eviction in
                // cleanupExpiredWindows(), so a window cannot be evicted between being looked
                // up and receiving this message (which would silently drop it).
                aggregationWindows.compute(deviceId, (id, window) -> {
                    AggregationWindow target = (window != null)
                        ? window
                        : new AggregationWindow(id, windowDuration);
                    target.addData(data);
                    return target;
                });

                return ProcessingResult.success("数据已添加到聚合窗口");

            } catch (Exception e) {
                return ProcessingResult.error("数据聚合失败: " + e.getMessage(), e);
            }
        }

        /** Schedules aggregation every minute and eviction of idle windows every hour. */
        private void startAggregationTasks() {
            scheduler.scheduleAtFixedRate(this::performAggregation, 1, 1, TimeUnit.MINUTES);
            scheduler.scheduleAtFixedRate(this::cleanupExpiredWindows, 1, 1, TimeUnit.HOURS);
        }

        /** Aggregates, sends and resets every window that is ready. */
        private void performAggregation() {
            for (AggregationWindow window : aggregationWindows.values()) {
                // Hold the window's monitor across aggregate -> send -> reset. The window's own
                // methods are synchronized on the same monitor, so a concurrent addData() now
                // waits instead of landing between aggregate() and reset() and being cleared
                // without ever being aggregated.
                // NOTE(review): addData() for this device blocks while the result is sent;
                // keep sendAggregatedData fast/asynchronous.
                synchronized (window) {
                    if (!window.isReadyForAggregation()) {
                        continue;
                    }
                    try {
                        AggregatedData aggregatedData = window.aggregate();
                        sendAggregatedData(aggregatedData);
                        // Reset only after a successful send so a failed send is retried next run.
                        window.reset();
                    } catch (Exception e) {
                        log.error("聚合数据失败, deviceId: {}", window.getDeviceId(), e);
                    }
                }
            }
        }

        /** Evicts windows whose last update is more than two hours old. */
        private void cleanupExpiredWindows() {
            long currentTime = System.currentTimeMillis();
            long maxIdleMillis = Duration.ofHours(2).toMillis();
            for (String deviceId : aggregationWindows.keySet()) {
                aggregationWindows.computeIfPresent(deviceId, (id, window) ->
                    (currentTime - window.getLastUpdateTime() > maxIdleMillis) ? null : window);
            }
        }

        /** Publishes one aggregate. Currently a placeholder; intended sinks are sketched below. */
        private void sendAggregatedData(AggregatedData aggregatedData) {
            // Send to a Kafka topic:
            // kafkaProducer.send("iot-aggregated-data", aggregatedData);

            // Persist to the database:
            // aggregatedDataRepository.save(aggregatedData);

            // Refresh the cache:
            // redisTemplate.opsForValue().set(
            //     "aggregated:" + aggregatedData.getDeviceId(),
            //     aggregatedData,
            //     Duration.ofMinutes(10)
            // );
        }

        /**
         * Stops the scheduler, waiting up to 60 seconds before forcing shutdown; restores the
         * interrupt flag if interrupted while waiting.
         */
        public void shutdown() {
            scheduler.shutdown();
            try {
                if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
                    scheduler.shutdownNow();
                }
            } catch (InterruptedException e) {
                scheduler.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
    
    /**
     * Buffers one device's messages until the window is old enough to aggregate.
     *
     * <p>Every access to mutable state goes through this object's monitor.
     */
    public static class AggregationWindow {
        private final String deviceId;
        private final Duration windowDuration;
        private final List<IoTDataMessage> dataBuffer;
        private long windowStartTime;
        private long lastUpdateTime;

        /**
         * @param deviceId       device whose messages this window buffers
         * @param windowDuration minimum window age before {@link #isReadyForAggregation()} is true
         */
        public AggregationWindow(String deviceId, Duration windowDuration) {
            long now = System.currentTimeMillis();
            this.deviceId = deviceId;
            this.windowDuration = windowDuration;
            this.dataBuffer = new ArrayList<>();
            this.windowStartTime = now;
            this.lastUpdateTime = now;
        }

        /** Buffers a message and records the time of this update. */
        public synchronized void addData(IoTDataMessage data) {
            dataBuffer.add(data);
            lastUpdateTime = System.currentTimeMillis();
        }

        /** True once the window is at least {@code windowDuration} old and holds data. */
        public synchronized boolean isReadyForAggregation() {
            long currentTime = System.currentTimeMillis();
            return currentTime - windowStartTime >= windowDuration.toMillis() && !dataBuffer.isEmpty();
        }

        /**
         * Aggregates the buffered messages without clearing them.
         *
         * @return the aggregate (window start = current window start, window end = now), or
         *         {@code null} if the buffer is empty
         */
        public synchronized AggregatedData aggregate() {
            if (dataBuffer.isEmpty()) {
                return null;
            }

            AggregatedData aggregated = new AggregatedData();
            aggregated.setDeviceId(deviceId);
            aggregated.setWindowStart(windowStartTime);
            aggregated.setWindowEnd(System.currentTimeMillis());

            // AggregatedData.add increments the message count and folds each Number-valued
            // reading into a per-sensor-type SensorAggregation, which is exactly the
            // accumulation this method previously duplicated inline.
            for (IoTDataMessage data : dataBuffer) {
                aggregated.add(data);
            }
            return aggregated;
        }

        /** Clears the buffer and starts a new window at the current time. */
        public synchronized void reset() {
            dataBuffer.clear();
            windowStartTime = System.currentTimeMillis();
        }

        public String getDeviceId() {
            return deviceId;
        }

        /**
         * Synchronized because {@code lastUpdateTime} is written under the monitor in
         * {@link #addData}. An unsynchronized read of a non-volatile {@code long} from another
         * thread has no visibility guarantee and may even observe a torn value.
         */
        public synchronized long getLastUpdateTime() {
            return lastUpdateTime;
        }
    }
    
    /**
     * Running statistics (sum, min, max, count, mean, sample standard deviation) for one
     * sensor type. Not thread-safe; callers confine each instance to one thread or lock.
     */
    public static class SensorAggregation {
        private final String sensorType;
        private double sum;
        private double min;
        private double max;
        private int count;
        // Welford running state: mean of the values seen so far and the sum of squared
        // deviations from it. Numerically stable, unlike sumOfSquares - n * mean^2, which can
        // cancel to a slightly negative number and make sqrt() return NaN.
        private double mean;
        private double m2;

        public SensorAggregation(String sensorType) {
            this.sensorType = sensorType;
            this.sum = 0.0;
            // Identity elements for min/max. The previous Double.MIN_VALUE start for max is the
            // smallest *positive* double, so an all-negative series reported a max above zero.
            this.min = Double.POSITIVE_INFINITY;
            this.max = Double.NEGATIVE_INFINITY;
            this.count = 0;
            this.mean = 0.0;
            this.m2 = 0.0;
        }

        /** Folds one value into the statistics. */
        public void addValue(double value) {
            sum += value;
            min = Math.min(min, value);
            max = Math.max(max, value);
            count++;
            double delta = value - mean;
            mean += delta / count;
            m2 += delta * (value - mean);
        }

        /** Arithmetic mean, or 0.0 when no values were added. */
        public double getAverage() {
            return count > 0 ? sum / count : 0.0;
        }

        /** Sample (n - 1) standard deviation, or 0.0 for fewer than two values. */
        public double getStandardDeviation() {
            if (count <= 1) {
                return 0.0;
            }
            return Math.sqrt(m2 / (count - 1));
        }

        public String getSensorType() { return sensorType; }
        public double getSum() { return sum; }
        /** Smallest value added; {@code +Infinity} if none. */
        public double getMin() { return min; }
        /** Largest value added; {@code -Infinity} if none. */
        public double getMax() { return max; }
        public int getCount() { return count; }
    }
    
    /**
     * Aggregate of one device's messages: a message count plus per-sensor-type running
     * statistics. Also serves as the accumulator of the windowed Kafka Streams aggregation.
     */
    public static class AggregatedData {
        private String deviceId;
        private long windowStart;
        private long windowEnd;
        private int dataCount;
        private Map<String, SensorAggregation> sensorAggregations = new HashMap<>();

        /** Creates an empty aggregate (zero count, no sensor statistics). */
        public AggregatedData() {
        }

        /**
         * Counts {@code data} and folds each of its {@code Number}-valued readings into the
         * statistics for that sensor type; non-numeric readings are ignored.
         *
         * @param data message to add
         * @return this aggregate, for use as a streams aggregator
         */
        public AggregatedData add(IoTDataMessage data) {
            dataCount++;

            Map<String, Object> readings = data.getSensorData();
            if (readings == null) {
                return this;
            }

            readings.forEach((sensorType, value) -> {
                if (value instanceof Number) {
                    sensorAggregations
                        .computeIfAbsent(sensorType, SensorAggregation::new)
                        .addValue(((Number) value).doubleValue());
                }
            });
            return this;
        }

        public String getDeviceId() {
            return deviceId;
        }

        public void setDeviceId(String deviceId) {
            this.deviceId = deviceId;
        }

        public long getWindowStart() {
            return windowStart;
        }

        public void setWindowStart(long windowStart) {
            this.windowStart = windowStart;
        }

        public long getWindowEnd() {
            return windowEnd;
        }

        public void setWindowEnd(long windowEnd) {
            this.windowEnd = windowEnd;
        }

        public int getDataCount() {
            return dataCount;
        }

        public void setDataCount(int dataCount) {
            this.dataCount = dataCount;
        }

        /** Returns the live (mutable) per-sensor map, not a copy. */
        public Map<String, SensorAggregation> getSensorAggregations() {
            return sensorAggregations;
        }

        public void setSensorAggregations(Map<String, SensorAggregation> sensorAggregations) {
            this.sensorAggregations = sensorAggregations;
        }
    }
}

## 2. 实时流处理

### 2.1 Kafka Streams实现

// 示例:Kafka Streams实时处理实现
public class IoTStreamProcessor {

    /** Name of the persistent key-value store used by {@link PredictionTransformer}. */
    private static final String PREDICTION_STORE_NAME = "prediction-state-store";

    /**
     * Shared JSON mapper. ObjectMapper is thread-safe once configured and expensive to build,
     * so it is created once instead of once per record.
     */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Readings more than this far in the future are rejected (tolerates small clock skew). */
    private static final long MAX_FUTURE_SKEW_MS = 60_000L;

    /** Readings older than this are rejected. */
    private static final long MAX_DATA_AGE_MS = 24L * 60 * 60 * 1000;

    private final Properties streamConfig;
    private final KafkaStreams streams;
    private final StreamsBuilder builder;

    /**
     * Builds the processing topology and creates, but does not start, the Kafka Streams instance.
     *
     * @param bootstrapServers Kafka bootstrap server list
     * @param applicationId    Kafka Streams application id
     */
    public IoTStreamProcessor(String bootstrapServers, String applicationId) {
        this.streamConfig = createStreamConfig(bootstrapServers, applicationId);
        this.builder = new StreamsBuilder();
        buildTopology();
        this.streams = new KafkaStreams(builder.build(), streamConfig);
    }

    /** Creates the Streams configuration: String default serdes, exactly-once v2, 1 s commits, 10 MiB cache. */
    private Properties createStreamConfig(String bootstrapServers, String applicationId) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        return props;
    }

    /**
     * Wires the topology: parse, clean, then fan out into status, alert, windowed-aggregation,
     * trend and prediction branches, each written to its own output topic.
     */
    private void buildTopology() {
        // The state store must be registered BEFORE any processor that names it;
        // otherwise StreamsBuilder#build() fails because the store is unknown.
        StoreBuilder<KeyValueStore<String, PredictionModel>> predictionStoreBuilder = Stores
            .keyValueStoreBuilder(
                Stores.persistentKeyValueStore(PREDICTION_STORE_NAME),
                Serdes.String(),
                new PredictionModelSerde()
            )
            .withCachingEnabled();
        builder.addStateStore(predictionStoreBuilder);

        // Input stream
        KStream<String, String> rawDataStream = builder.stream("iot-raw-data");

        // Parse, drop unparsable records, and re-key by device id
        KStream<String, IoTDataMessage> parsedStream = rawDataStream
            .mapValues(this::parseJsonMessage)
            .filter((key, value) -> value != null)
            .selectKey((key, value) -> value.getDeviceId());

        // Cleaning / validation
        KStream<String, IoTDataMessage> cleanedStream = parsedStream
            .mapValues(this::cleanAndValidateData)
            .filter((key, value) -> value != null);

        // Device status
        KStream<String, DeviceStatus> statusStream = cleanedStream
            .mapValues(this::extractDeviceStatus)
            .filter((key, value) -> value != null);

        // Anomaly alerts
        KStream<String, Alert> alertStream = cleanedStream
            .filter(this::detectAnomalies)
            .mapValues(this::createAlert);

        // Hopping-window aggregation: 5-minute windows advancing every minute.
        // selectKey() above forces a repartition here. The repartition topic carries
        // IoTDataMessage values, so the serdes must be given explicitly; the configured
        // default value serde is String and would fail on these values.
        KTable<Windowed<String>, AggregatedMetrics> aggregatedMetrics = cleanedStream
            .groupByKey(org.apache.kafka.streams.kstream.Grouped.with(Serdes.String(), new IoTDataMessageSerde()))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
            .aggregate(
                AggregatedMetrics::new,
                (key, value, aggregate) -> aggregate.update(value),
                Materialized.<String, AggregatedMetrics, WindowStore<Bytes, byte[]>>as("aggregated-metrics-store")
                    .withValueSerde(new AggregatedMetricsSerde())
            );

        // Trend analysis: unwrap the Windowed<String> key so the stream is keyed by device id
        KStream<String, TrendAnalysis> trendStream = aggregatedMetrics
            .toStream((windowedKey, metrics) -> windowedKey.key())
            .mapValues(this::analyzeTrend)
            .filter((key, value) -> value != null);

        // Predictions (stateful, backed by the store registered above)
        KStream<String, Prediction> predictionStream = cleanedStream
            .transform(() -> new PredictionTransformer(), PREDICTION_STORE_NAME);

        // Output topics
        cleanedStream.to("iot-cleaned-data", Produced.with(Serdes.String(), new IoTDataMessageSerde()));
        statusStream.to("device-status", Produced.with(Serdes.String(), new DeviceStatusSerde()));
        alertStream.to("iot-alerts", Produced.with(Serdes.String(), new AlertSerde()));
        aggregatedMetrics.toStream().to("aggregated-metrics", Produced.with(new WindowedSerde<>(Serdes.String()), new AggregatedMetricsSerde()));
        trendStream.to("trend-analysis", Produced.with(Serdes.String(), new TrendAnalysisSerde()));
        predictionStream.to("predictions", Produced.with(Serdes.String(), new PredictionSerde()));
    }

    /** Parses a JSON payload; returns null (after logging) when it cannot be parsed. */
    private IoTDataMessage parseJsonMessage(String jsonMessage) {
        try {
            return OBJECT_MAPPER.readValue(jsonMessage, IoTDataMessage.class);
        } catch (Exception e) {
            log.warn("解析JSON消息失败: {}", jsonMessage, e);
            return null;
        }
    }

    /**
     * Validates the envelope and sanitizes sensor readings in place.
     *
     * @return the same message with only valid numeric readings kept, or null if the
     *         device id is missing or the timestamp is non-positive, more than one minute
     *         in the future, or more than 24 hours old
     */
    private IoTDataMessage cleanAndValidateData(IoTDataMessage data) {
        if (data.getDeviceId() == null || data.getTimestamp() <= 0) {
            return null;
        }

        long currentTime = System.currentTimeMillis();
        if (data.getTimestamp() > currentTime + MAX_FUTURE_SKEW_MS
            || data.getTimestamp() < currentTime - MAX_DATA_AGE_MS) {
            return null;
        }

        Map<String, Object> sensorData = data.getSensorData();
        if (sensorData != null) {
            Map<String, Object> cleanedSensorData = new HashMap<>();

            for (Map.Entry<String, Object> entry : sensorData.entrySet()) {
                String sensorType = entry.getKey();
                Object cleanedValue = validateSensorValue(sensorType, entry.getValue());
                if (cleanedValue != null) {
                    cleanedSensorData.put(sensorType, cleanedValue);
                }
            }

            data.setSensorData(cleanedSensorData);
        }

        return data;
    }

    /**
     * Converts a reading to a Double and range-checks it for known sensor types.
     *
     * @return the reading as a Double, or null when it is absent, non-numeric, non-finite,
     *         or outside the plausible range for its sensor type
     */
    private Object validateSensorValue(String sensorType, Object value) {
        if (value == null) {
            // JSON null readings are dropped instead of throwing and killing the stream thread
            return null;
        }

        double numValue;
        try {
            numValue = Double.parseDouble(value.toString());
        } catch (NumberFormatException e) {
            return null;
        }
        if (Double.isNaN(numValue) || Double.isInfinite(numValue)) {
            return null;
        }

        switch (normalizeSensorType(sensorType)) {
            case "temperature":
                return (numValue >= -50 && numValue <= 100) ? numValue : null;
            case "humidity":
                return (numValue >= 0 && numValue <= 100) ? numValue : null;
            case "pressure":
                return (numValue >= 0 && numValue <= 2000) ? numValue : null;
            case "voltage":
                return (numValue >= 0 && numValue <= 50) ? numValue : null;
            case "current":
                return (numValue >= 0 && numValue <= 100) ? numValue : null;
            case "power":
                return (numValue >= 0 && numValue <= 10000) ? numValue : null;
            case "energy":
                return (numValue >= 0) ? numValue : null;
            default:
                return numValue;
        }
    }

    /** Derives device health (battery) and connectivity (signal strength) from a cleaned message. */
    private DeviceStatus extractDeviceStatus(IoTDataMessage data) {
        DeviceStatus status = new DeviceStatus();
        status.setDeviceId(data.getDeviceId());
        status.setTimestamp(data.getTimestamp());
        status.setOnline(true);

        Map<String, Object> sensorData = data.getSensorData();
        if (sensorData == null) {
            return status;
        }

        // Battery level -> health status
        Double batteryReading = toDouble(sensorData.get("battery"));
        if (batteryReading != null) {
            double battery = batteryReading;
            status.setBatteryLevel(battery);

            if (battery < 10) {
                status.setHealthStatus("LOW_BATTERY");
            } else if (battery < 20) {
                status.setHealthStatus("WARNING");
            } else {
                status.setHealthStatus("HEALTHY");
            }
        }

        // Signal strength -> connectivity status.
        // After cleanAndValidateData every reading is a Double (e.g. -75.0), which
        // Integer.parseInt rejects, so the value is converted numerically instead.
        Double signalReading = toDouble(sensorData.get("signal_strength"));
        if (signalReading != null) {
            int signal = (int) Math.round(signalReading);
            status.setSignalStrength(signal);

            if (signal < -90) {
                status.setConnectivityStatus("POOR");
            } else if (signal < -70) {
                status.setConnectivityStatus("FAIR");
            } else {
                status.setConnectivityStatus("GOOD");
            }
        }

        return status;
    }

    /**
     * Returns true when any reading is anomalous per {@link #isAnomalousValue}, so the alert
     * filter and the alert message always agree on which readings are anomalous.
     */
    private boolean detectAnomalies(String key, IoTDataMessage data) {
        Map<String, Object> sensorData = data.getSensorData();
        if (sensorData == null) {
            return false;
        }

        for (Map.Entry<String, Object> entry : sensorData.entrySet()) {
            if (isAnomalousValue(entry.getKey(), entry.getValue())) {
                return true;
            }
        }
        return false;
    }

    /** Builds a SENSOR_ANOMALY alert stamped with the current wall-clock time. */
    private Alert createAlert(IoTDataMessage data) {
        Alert alert = new Alert();
        alert.setDeviceId(data.getDeviceId());
        alert.setTimestamp(System.currentTimeMillis());
        alert.setAlertType("SENSOR_ANOMALY");
        alert.setSeverity(determineSeverity(data));
        alert.setMessage(generateAlertMessage(data));
        alert.setData(data.getSensorData());
        return alert;
    }

    /**
     * Returns "CRITICAL" when temperature is above 90 or below -30, or voltage is above 35
     * or below 0.5; otherwise "HIGH". Sensor names are matched case-insensitively, the same
     * way as in the anomaly check.
     */
    private String determineSeverity(IoTDataMessage data) {
        for (Map.Entry<String, Object> entry : data.getSensorData().entrySet()) {
            Double reading = toDouble(entry.getValue());
            if (reading == null) {
                continue;
            }

            switch (normalizeSensorType(entry.getKey())) {
                case "temperature":
                    if (reading > 90 || reading < -30) {
                        return "CRITICAL";
                    }
                    break;
                case "voltage":
                    if (reading > 35 || reading < 0.5) {
                        return "CRITICAL";
                    }
                    break;
                default:
                    break;
            }
        }

        return "HIGH";
    }

    /** Lists every anomalous reading as {@code name=value}. */
    private String generateAlertMessage(IoTDataMessage data) {
        StringBuilder message = new StringBuilder("检测到传感器数据异常: ");
        Map<String, Object> sensorData = data.getSensorData();

        for (Map.Entry<String, Object> entry : sensorData.entrySet()) {
            String sensorType = entry.getKey();
            Object value = entry.getValue();

            if (isAnomalousValue(sensorType, value)) {
                message.append(String.format("%s=%s ", sensorType, value));
            }
        }

        return message.toString();
    }

    /** Single source of truth for the per-sensor anomaly thresholds; null or non-numeric readings are never anomalous. */
    private boolean isAnomalousValue(String sensorType, Object value) {
        Double reading = toDouble(value);
        if (reading == null) {
            return false;
        }
        double numValue = reading;

        switch (normalizeSensorType(sensorType)) {
            case "temperature":
                return numValue > 80 || numValue < -20;
            case "humidity":
                return numValue > 95 || numValue < 5;
            case "voltage":
                return numValue > 30 || numValue < 1;
            case "power":
                return numValue > 5000;
            default:
                return false;
        }
    }

    /** Lower-cases a sensor name independently of the JVM default locale (e.g. Turkish dotless i). */
    private static String normalizeSensorType(String sensorType) {
        return sensorType.toLowerCase(java.util.Locale.ROOT);
    }

    /** Converts a reading to a Double, or returns null when it is absent or not numeric. */
    private static Double toDouble(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Number) {
            return ((Number) value).doubleValue();
        }
        try {
            return Double.parseDouble(value.toString());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Computes a per-sensor linear trend over one aggregation window; null if fewer than two data points. */
    private TrendAnalysis analyzeTrend(AggregatedMetrics metrics) {
        if (metrics == null || metrics.getDataPoints().size() < 2) {
            return null;
        }

        TrendAnalysis analysis = new TrendAnalysis();
        analysis.setDeviceId(metrics.getDeviceId());
        analysis.setTimestamp(System.currentTimeMillis());
        analysis.setWindowStart(metrics.getWindowStart());
        analysis.setWindowEnd(metrics.getWindowEnd());

        Map<String, Double> trends = new HashMap<>();
        Map<String, List<Double>> sensorValues = metrics.getSensorValues();

        for (Map.Entry<String, List<Double>> entry : sensorValues.entrySet()) {
            List<Double> values = entry.getValue();
            if (values.size() >= 2) {
                trends.put(entry.getKey(), calculateTrend(values));
            }
        }

        analysis.setTrends(trends);
        return analysis;
    }

    /**
     * Least-squares slope of the values against their index (0, 1, 2, ...).
     * With at least two points the denominator is strictly positive.
     */
    private double calculateTrend(List<Double> values) {
        if (values.size() < 2) {
            return 0.0;
        }

        int n = values.size();
        double sumX = 0, sumY = 0, sumXY = 0, sumX2 = 0;

        for (int i = 0; i < n; i++) {
            double x = i;
            double y = values.get(i);

            sumX += x;
            sumY += y;
            sumXY += x * y;
            sumX2 += x * x;
        }

        return (n * sumXY - sumX * sumY) / (n * sumX2 - sumX * sumX);
    }

    /** Maintains a per-device PredictionModel in the state store and emits its latest prediction. */
    private class PredictionTransformer implements Transformer<String, IoTDataMessage, KeyValue<String, Prediction>> {

        private KeyValueStore<String, PredictionModel> stateStore;

        @Override
        public void init(ProcessorContext context) {
            this.stateStore = context.getStateStore(PREDICTION_STORE_NAME);
        }

        @Override
        public KeyValue<String, Prediction> transform(String key, IoTDataMessage value) {
            if (value == null || value.getSensorData() == null) {
                return null;
            }

            PredictionModel model = stateStore.get(key);
            if (model == null) {
                model = new PredictionModel(key);
            }

            model.addDataPoint(value);
            Prediction prediction = model.predict();

            // Persist the updated model before emitting
            stateStore.put(key, model);

            return prediction != null ? KeyValue.pair(key, prediction) : null;
        }

        @Override
        public void close() {
            // Nothing to release: the state store is managed by Kafka Streams
        }
    }

    /** Starts stream processing. */
    public void start() {
        streams.start();
    }

    /** Closes the Kafka Streams instance. */
    public void stop() {
        streams.close();
    }

    /** Returns the current Kafka Streams lifecycle state. */
    public KafkaStreams.State getState() {
        return streams.state();
    }
}

# 3. 数据存储策略

# 3.1 时序数据库设计

// 示例:时序数据库操作实现
public class TimeSeriesDataManager {

    /**
     * Flux duration literal such as {@code 1m}, {@code 5m}, {@code 1h30m} or {@code 1mo}.
     * The aggregation window is inserted into the query unquoted, so it is whitelisted against this pattern.
     */
    private static final String FLUX_DURATION_REGEX = "(\\d+(ns|us|µs|ms|mo|s|m|h|d|w|y))+";

    private final InfluxDBClient influxDBClient;
    private final WriteApiBlocking writeApi;
    private final QueryApi queryApi;
    private final String bucket;
    private final String organization;

    /**
     * Opens an InfluxDB client and obtains its blocking write API and query API.
     *
     * @param url          InfluxDB server URL
     * @param token        API token
     * @param bucket       bucket to read from and write to
     * @param organization organization owning the bucket
     */
    public TimeSeriesDataManager(String url, String token, String bucket, String organization) {
        this.influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray());
        this.writeApi = influxDBClient.getWriteApiBlocking();
        this.queryApi = influxDBClient.getQueryApi();
        this.bucket = bucket;
        this.organization = organization;
    }

    /**
     * Writes every numeric reading of one message as a {@code sensor_data} point.
     * Messages without numeric readings are a no-op.
     *
     * @throws DataWriteException if the write fails
     */
    public void writeSensorData(IoTDataMessage data) {
        try {
            List<Point> points = new ArrayList<>();
            appendPoints(data, points);

            if (!points.isEmpty()) {
                writeApi.writePoints(bucket, organization, points);
            }

        } catch (Exception e) {
            log.error("写入传感器数据失败, deviceId: {}", data.getDeviceId(), e);
            throw new DataWriteException("写入时序数据失败", e);
        }
    }

    /**
     * Writes the numeric readings of all messages in a single request.
     *
     * @throws DataWriteException if the write fails
     */
    public void writeBatchSensorData(List<IoTDataMessage> dataList) {
        try {
            List<Point> allPoints = new ArrayList<>();
            for (IoTDataMessage data : dataList) {
                appendPoints(data, allPoints);
            }

            if (!allPoints.isEmpty()) {
                writeApi.writePoints(bucket, organization, allPoints);
            }

        } catch (Exception e) {
            log.error("批量写入传感器数据失败", e);
            throw new DataWriteException("批量写入时序数据失败", e);
        }
    }

    /**
     * Converts each numeric reading of {@code data} into a point tagged with device id,
     * sensor type and message type, and appends it to {@code target}. Non-numeric readings are skipped.
     */
    private static void appendPoints(IoTDataMessage data, List<Point> target) {
        Map<String, Object> sensorData = data.getSensorData();
        if (sensorData == null) {
            return;
        }

        for (Map.Entry<String, Object> entry : sensorData.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof Number) {
                target.add(Point.measurement("sensor_data")
                    .addTag("device_id", data.getDeviceId())
                    .addTag("sensor_type", entry.getKey())
                    .addTag("message_type", data.getMessageType())
                    .addField("value", ((Number) value).doubleValue())
                    .time(data.getTimestamp(), WritePrecision.MS));
            }
        }
    }

    /**
     * Returns up to {@code limit} of the device's most recent points from the last 24 hours.
     * Note that Flux {@code limit} applies per series (per sensor/message type), not globally.
     */
    public List<SensorDataPoint> getLatestSensorData(String deviceId, int limit) {
        String query = String.format(
            "from(bucket: \"%s\") " +
            "|> range(start: -24h) " +
            "|> filter(fn: (r) => r._measurement == \"sensor_data\") " +
            "|> filter(fn: (r) => r.device_id == \"%s\") " +
            "|> sort(columns: [\"_time\"], desc: true) " +
            "|> limit(n: %d)",
            fluxString(bucket), fluxString(deviceId), limit
        );

        return executeQuery(query);
    }

    /** Returns the raw points of one sensor in [startTime, endTime) (epoch millis), ordered by time. */
    public List<SensorDataPoint> getSensorDataByTimeRange(
            String deviceId,
            String sensorType,
            long startTime,
            long endTime) {

        String query = String.format(
            "from(bucket: \"%s\") " +
            "|> range(start: %s, stop: %s) " +
            "|> filter(fn: (r) => r._measurement == \"sensor_data\") " +
            "|> filter(fn: (r) => r.device_id == \"%s\") " +
            "|> filter(fn: (r) => r.sensor_type == \"%s\") " +
            "|> sort(columns: [\"_time\"])",
            fluxString(bucket),
            Instant.ofEpochMilli(startTime),
            Instant.ofEpochMilli(endTime),
            fluxString(deviceId),
            fluxString(sensorType)
        );

        return executeQuery(query);
    }

    /**
     * Returns windowed means of one sensor.
     *
     * @param aggregationWindow Flux duration literal, e.g. {@code "5m"} or {@code "1h"}
     * @throws IllegalArgumentException if {@code aggregationWindow} is not a valid Flux duration
     * @throws DataQueryException       if the query fails
     */
    public List<AggregatedSensorData> getAggregatedSensorData(
            String deviceId,
            String sensorType,
            long startTime,
            long endTime,
            String aggregationWindow) {

        if (aggregationWindow == null || !aggregationWindow.matches(FLUX_DURATION_REGEX)) {
            throw new IllegalArgumentException("Invalid Flux aggregation window: " + aggregationWindow);
        }

        String query = String.format(
            "from(bucket: \"%s\") " +
            "|> range(start: %s, stop: %s) " +
            "|> filter(fn: (r) => r._measurement == \"sensor_data\") " +
            "|> filter(fn: (r) => r.device_id == \"%s\") " +
            "|> filter(fn: (r) => r.sensor_type == \"%s\") " +
            "|> aggregateWindow(every: %s, fn: mean, createEmpty: false) " +
            "|> yield(name: \"mean\")",
            fluxString(bucket),
            Instant.ofEpochMilli(startTime),
            Instant.ofEpochMilli(endTime),
            fluxString(deviceId),
            fluxString(sensorType),
            aggregationWindow
        );

        return executeAggregationQuery(query);
    }

    /** Runs {@link #getAggregatedSensorData} once per sensor type and returns the results keyed by sensor type. */
    public Map<String, List<AggregatedSensorData>> getMultiMetricAggregatedData(
            String deviceId,
            List<String> sensorTypes,
            long startTime,
            long endTime,
            String aggregationWindow) {

        Map<String, List<AggregatedSensorData>> result = new HashMap<>();

        for (String sensorType : sensorTypes) {
            List<AggregatedSensorData> data = getAggregatedSensorData(
                deviceId, sensorType, startTime, endTime, aggregationWindow
            );
            result.put(sensorType, data);
        }

        return result;
    }

    /**
     * Computes mean/min/max/count/stddev of one sensor over a time range.
     * Series that differ only in {@code message_type} are merged so that a single set of statistics is produced.
     */
    public SensorStatistics getSensorStatistics(
            String deviceId,
            String sensorType,
            long startTime,
            long endTime) {

        // Each aggregate is an unnamed expression statement, so no Flux variable shadows
        // the built-in functions mean/min/max/count/stddev.
        String query = String.format(
            "data = from(bucket: \"%s\") " +
            "|> range(start: %s, stop: %s) " +
            "|> filter(fn: (r) => r._measurement == \"sensor_data\") " +
            "|> filter(fn: (r) => r.device_id == \"%s\") " +
            "|> filter(fn: (r) => r.sensor_type == \"%s\") " +
            "|> group(columns: [\"device_id\", \"sensor_type\"]) \n" +
            "data |> mean() |> yield(name: \"mean\") \n" +
            "data |> min() |> yield(name: \"min\") \n" +
            "data |> max() |> yield(name: \"max\") \n" +
            "data |> count() |> yield(name: \"count\") \n" +
            "data |> stddev() |> yield(name: \"stddev\")",
            fluxString(bucket),
            Instant.ofEpochMilli(startTime),
            Instant.ofEpochMilli(endTime),
            fluxString(deviceId),
            fluxString(sensorType)
        );

        return executeStatisticsQuery(query);
    }

    /**
     * Escapes text for use inside a double-quoted Flux string literal: backslashes, quotes,
     * and {@code ${} (string interpolation). A null value becomes the literal text "null",
     * matching the previous String.format behaviour.
     */
    private static String fluxString(String value) {
        return String.valueOf(value)
            .replace("\\", "\\\\")
            .replace("\"", "\\\"")
            .replace("${", "\\${");
    }

    /** Maps raw point rows to SensorDataPoint objects. */
    private List<SensorDataPoint> executeQuery(String query) {
        List<SensorDataPoint> dataPoints = new ArrayList<>();

        try {
            List<FluxTable> tables = queryApi.query(query, organization);

            for (FluxTable table : tables) {
                for (FluxRecord record : table.getRecords()) {
                    SensorDataPoint point = new SensorDataPoint();
                    point.setDeviceId((String) record.getValueByKey("device_id"));
                    point.setSensorType((String) record.getValueByKey("sensor_type"));
                    point.setValue(((Number) record.getValue()).doubleValue());
                    point.setTimestamp(record.getTime().toEpochMilli());

                    dataPoints.add(point);
                }
            }

        } catch (Exception e) {
            log.error("执行查询失败: {}", query, e);
            throw new DataQueryException("时序数据查询失败", e);
        }

        return dataPoints;
    }

    /** Maps windowed-mean rows to AggregatedSensorData objects. */
    private List<AggregatedSensorData> executeAggregationQuery(String query) {
        List<AggregatedSensorData> aggregatedData = new ArrayList<>();

        try {
            List<FluxTable> tables = queryApi.query(query, organization);

            for (FluxTable table : tables) {
                for (FluxRecord record : table.getRecords()) {
                    AggregatedSensorData data = new AggregatedSensorData();
                    data.setDeviceId((String) record.getValueByKey("device_id"));
                    data.setSensorType((String) record.getValueByKey("sensor_type"));
                    data.setAggregatedValue(((Number) record.getValue()).doubleValue());
                    data.setTimestamp(record.getTime().toEpochMilli());
                    data.setAggregationType("mean");

                    aggregatedData.add(data);
                }
            }

        } catch (Exception e) {
            log.error("执行聚合查询失败: {}", query, e);
            throw new DataQueryException("聚合数据查询失败", e);
        }

        return aggregatedData;
    }

    /** Distributes the yielded results into a SensorStatistics by their yield name. */
    private SensorStatistics executeStatisticsQuery(String query) {
        SensorStatistics statistics = new SensorStatistics();

        try {
            List<FluxTable> tables = queryApi.query(query, organization);

            for (FluxTable table : tables) {
                for (FluxRecord record : table.getRecords()) {
                    Object resultName = record.getValueByKey("result");
                    Object rawValue = record.getValue();
                    // count() produces an integer column while the other aggregates produce
                    // floats, so the value is read through Number rather than cast to Double.
                    if (!(resultName instanceof String) || !(rawValue instanceof Number)) {
                        continue;
                    }
                    Number value = (Number) rawValue;

                    switch ((String) resultName) {
                        case "mean":
                            statistics.setMean(value.doubleValue());
                            break;
                        case "min":
                            statistics.setMin(value.doubleValue());
                            break;
                        case "max":
                            statistics.setMax(value.doubleValue());
                            break;
                        case "count":
                            statistics.setCount(value.longValue());
                            break;
                        case "stddev":
                            statistics.setStandardDeviation(value.doubleValue());
                            break;
                        default:
                            break;
                    }
                }
            }

        } catch (Exception e) {
            log.error("执行统计查询失败: {}", query, e);
            throw new DataQueryException("统计数据查询失败", e);
        }

        return statistics;
    }

    /**
     * Deletes {@code sensor_data} points older than {@code retentionDays} days.
     *
     * <p>A Flux query (even one ending in {@code drop()}) only shapes a result set and never
     * removes stored data, so the InfluxDB delete API is used. For routine retention, a
     * retention period configured on the bucket is usually the better tool.
     *
     * @param retentionDays number of days of data to keep; must be positive
     * @throws IllegalArgumentException if {@code retentionDays} is not positive
     * @throws DataCleanupException     if the delete request fails
     */
    public void cleanupOldData(int retentionDays) {
        if (retentionDays <= 0) {
            throw new IllegalArgumentException("retentionDays must be positive: " + retentionDays);
        }

        try {
            influxDBClient.getDeleteApi().delete(
                Instant.EPOCH.atOffset(java.time.ZoneOffset.UTC),
                Instant.now().minusSeconds(retentionDays * 86_400L).atOffset(java.time.ZoneOffset.UTC),
                "_measurement=\"sensor_data\"",
                bucket,
                organization
            );
            log.info("清理了{}天前的历史数据", retentionDays);

        } catch (Exception e) {
            log.error("清理历史数据失败", e);
            throw new DataCleanupException("数据清理失败", e);
        }
    }

    /** Closes the underlying InfluxDB client. */
    public void close() {
        if (influxDBClient != null) {
            influxDBClient.close();
        }
    }

    /** One raw sensor reading. */
    public static class SensorDataPoint {
        private String deviceId;
        private String sensorType;
        private double value;
        private long timestamp;

        public String getDeviceId() { return deviceId; }
        public void setDeviceId(String deviceId) { this.deviceId = deviceId; }

        public String getSensorType() { return sensorType; }
        public void setSensorType(String sensorType) { this.sensorType = sensorType; }

        public double getValue() { return value; }
        public void setValue(double value) { this.value = value; }

        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    }

    /** One windowed aggregate of a sensor. */
    public static class AggregatedSensorData {
        private String deviceId;
        private String sensorType;
        private double aggregatedValue;
        private long timestamp;
        private String aggregationType;

        public String getDeviceId() { return deviceId; }
        public void setDeviceId(String deviceId) { this.deviceId = deviceId; }

        public String getSensorType() { return sensorType; }
        public void setSensorType(String sensorType) { this.sensorType = sensorType; }

        public double getAggregatedValue() { return aggregatedValue; }
        public void setAggregatedValue(double aggregatedValue) { this.aggregatedValue = aggregatedValue; }

        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

        public String getAggregationType() { return aggregationType; }
        public void setAggregationType(String aggregationType) { this.aggregationType = aggregationType; }
    }

    /** Summary statistics of a sensor over a time range. */
    public static class SensorStatistics {
        private double mean;
        private double min;
        private double max;
        private long count;
        private double standardDeviation;

        public double getMean() { return mean; }
        public void setMean(double mean) { this.mean = mean; }

        public double getMin() { return min; }
        public void setMin(double min) { this.min = min; }

        public double getMax() { return max; }
        public void setMax(double max) { this.max = max; }

        public long getCount() { return count; }
        public void setCount(long count) { this.count = count; }

        public double getStandardDeviation() { return standardDeviation; }
        public void setStandardDeviation(double standardDeviation) { this.standardDeviation = standardDeviation; }
    }
}

# 4. 数据分析与机器学习

# 4.1 异常检测算法

// 示例:异常检测算法实现
public class AnomalyDetectionEngine {
    
    private final Map<String, AnomalyDetector> detectors;
    private final AlertManager alertManager;
    private final MetricsCollector metricsCollector;
    
    public AnomalyDetectionEngine(AlertManager alertManager, MetricsCollector metricsCollector) {
        this.detectors = new ConcurrentHashMap<>();
        this.alertManager = alertManager;
        this.metricsCollector = metricsCollector;
        
        initializeDetectors();
    }
    
    private void initializeDetectors() {
        // 统计异常检测器
        detectors.put("statistical", new StatisticalAnomalyDetector());
        
        // 基于机器学习的异常检测器
        detectors.put("isolation_forest", new IsolationForestDetector());
        
        // 时间序列异常检测器
        detectors.put("time_series", new TimeSeriesAnomalyDetector());
        
        // 阈值异常检测器
        detectors.put("threshold", new ThresholdAnomalyDetector());
    }
    
    // 检测异常
    public List<AnomalyResult> detectAnomalies(String deviceId, List<SensorDataPoint> dataPoints) {
        List<AnomalyResult> allResults = new ArrayList<>();
        
        for (AnomalyDetector detector : detectors.values()) {
            try {
                List<AnomalyResult> results = detector.detect(deviceId, dataPoints);
                allResults.addAll(results);
                
                // 记录检测指标
                metricsCollector.incrementCounter("anomaly.detection." + detector.getName());
                
            } catch (Exception e) {
                log.error("异常检测失败, detector: {}, deviceId: {}", detector.getName(), deviceId, e);
                metricsCollector.incrementCounter("anomaly.detection.error");
            }
        }
        
        // 处理检测结果
        processAnomalyResults(deviceId, allResults);
        
        return allResults;
    }
    
    private void processAnomalyResults(String deviceId, List<AnomalyResult> results) {
        for (AnomalyResult result : results) {
            if (result.isAnomaly()) {
                // 发送告警
                Alert alert = createAnomalyAlert(deviceId, result);
                alertManager.sendAlert(alert);
                
                // 记录异常指标
                metricsCollector.incrementCounter("anomaly.detected");
                metricsCollector.recordGauge("anomaly.score", result.getAnomalyScore());
            }
        }
    }
    
    private Alert createAnomalyAlert(String deviceId, AnomalyResult result) {
        Alert alert = new Alert();
        alert.setDeviceId(deviceId);
        alert.setTimestamp(System.currentTimeMillis());
        alert.setAlertType("ANOMALY_DETECTED");
        alert.setSeverity(determineSeverity(result.getAnomalyScore()));
        alert.setMessage(String.format("检测到异常: %s, 异常分数: %.2f", 
            result.getDescription(), result.getAnomalyScore()));
        alert.setData(Map.of(
            "detector", result.getDetectorName(),
            "anomaly_score", result.getAnomalyScore(),
            "sensor_type", result.getSensorType(),
            "value", result.getValue()
        ));
        return alert;
    }
    
    private String determineSeverity(double anomalyScore) {
        if (anomalyScore >= 0.8) {
            return "CRITICAL";
        } else if (anomalyScore >= 0.6) {
            return "HIGH";
        } else if (anomalyScore >= 0.4) {
            return "MEDIUM";
        } else {
            return "LOW";
        }
    }
    
    // 统计异常检测器
    public static class StatisticalAnomalyDetector implements AnomalyDetector {
        
        private static final double Z_SCORE_THRESHOLD = 3.0;
        
        @Override
        public String getName() {
            return "statistical";
        }
        
        @Override
        public List<AnomalyResult> detect(String deviceId, List<SensorDataPoint> dataPoints) {
            List<AnomalyResult> results = new ArrayList<>();
            
            // 按传感器类型分组
            Map<String, List<SensorDataPoint>> groupedData = dataPoints.stream()
                .collect(Collectors.groupingBy(SensorDataPoint::getSensorType));
            
            for (Map.Entry<String, List<SensorDataPoint>> entry : groupedData.entrySet()) {
                String sensorType = entry.getKey();
                List<SensorDataPoint> sensorData = entry.getValue();
                
                if (sensorData.size() < 10) {
                    continue; // 数据点太少,跳过
                }
                
                // 计算统计指标
                double[] values = sensorData.stream()
                    .mapToDouble(SensorDataPoint::getValue)
                    .toArray();
                
                double mean = Arrays.stream(values).average().orElse(0.0);
                double stdDev = calculateStandardDeviation(values, mean);
                
                // 检测异常
                for (SensorDataPoint point : sensorData) {
                    double zScore = Math.abs((point.getValue() - mean) / stdDev);
                    
                    if (zScore > Z_SCORE_THRESHOLD) {
                        AnomalyResult result = new AnomalyResult();
                        result.setDeviceId(deviceId);
                        result.setSensorType(sensorType);
                        result.setValue(point.getValue());
                        result.setTimestamp(point.getTimestamp());
                        result.setAnomaly(true);
                        result.setAnomalyScore(Math.min(zScore / 5.0, 1.0)); // 归一化到0-1
                        result.setDetectorName(getName());
                        result.setDescription(String.format("Z-Score异常: %.2f (阈值: %.1f)", zScore, Z_SCORE_THRESHOLD));
                        
                        results.add(result);
                    }
                }
            }
            
            return results;
        }
        
        private double calculateStandardDeviation(double[] values, double mean) {
            double sumSquaredDiffs = Arrays.stream(values)
                .map(value -> Math.pow(value - mean, 2))
                .sum();
            
            return Math.sqrt(sumSquaredDiffs / values.length);
        }
    }
    
    // 孤立森林异常检测器
    public static class IsolationForestDetector implements AnomalyDetector {
        
        private static final int NUM_TREES = 100;
        private static final int SAMPLE_SIZE = 256;
        private static final double ANOMALY_THRESHOLD = 0.6;
        
        @Override
        public String getName() {
            return "isolation_forest";
        }
        
        @Override
        public List<AnomalyResult> detect(String deviceId, List<SensorDataPoint> dataPoints) {
            List<AnomalyResult> results = new ArrayList<>();
            
            if (dataPoints.size() < 50) {
                return results; // 数据点太少
            }
            
            // 准备特征数据
            double[][] features = prepareFeatures(dataPoints);
            
            // 构建孤立森林
            IsolationForest forest = new IsolationForest(NUM_TREES, SAMPLE_SIZE);
            forest.fit(features);
            
            // 计算异常分数
            double[] anomalyScores = forest.anomalyScore(features);
            
            // 检测异常
            for (int i = 0; i < dataPoints.size(); i++) {
                SensorDataPoint point = dataPoints.get(i);
                double score = anomalyScores[i];
                
                if (score > ANOMALY_THRESHOLD) {
                    AnomalyResult result = new AnomalyResult();
                    result.setDeviceId(deviceId);
                    result.setSensorType(point.getSensorType());
                    result.setValue(point.getValue());
                    result.setTimestamp(point.getTimestamp());
                    result.setAnomaly(true);
                    result.setAnomalyScore(score);
                    result.setDetectorName(getName());
                    result.setDescription(String.format("孤立森林异常: %.2f (阈值: %.1f)", score, ANOMALY_THRESHOLD));
                    
                    results.add(result);
                }
            }
            
            return results;
        }
        
        private double[][] prepareFeatures(List<SensorDataPoint> dataPoints) {
            // 提取特征:值、时间差、移动平均等
            int numFeatures = 4;
            double[][] features = new double[dataPoints.size()][numFeatures];
            
            for (int i = 0; i < dataPoints.size(); i++) {
                SensorDataPoint point = dataPoints.get(i);
                
                // 特征1: 传感器值
                features[i][0] = point.getValue();
                
                // 特征2: 时间差(与前一个点)
                if (i > 0) {
                    features[i][1] = point.getTimestamp() - dataPoints.get(i - 1).getTimestamp();
                } else {
                    features[i][1] = 0;
                }
                
                // 特征3: 移动平均(窗口大小5)
                int windowStart = Math.max(0, i - 4);
                double sum = 0;
                int count = 0;
                for (int j = windowStart; j <= i; j++) {
                    sum += dataPoints.get(j).getValue();
                    count++;
                }
                features[i][2] = sum / count;
                
                // 特征4: 变化率
                if (i > 0) {
                    double prevValue = dataPoints.get(i - 1).getValue();
                    features[i][3] = (point.getValue() - prevValue) / Math.max(Math.abs(prevValue), 1e-6);
                } else {
                    features[i][3] = 0;
                }
            }
            
            return features;
        }
    }
    
    // 时间序列异常检测器
    public static class TimeSeriesAnomalyDetector implements AnomalyDetector {
        
        private static final int SEASONAL_PERIOD = 24; // 24小时周期
        private static final double ANOMALY_THRESHOLD = 2.5;
        
        @Override
        public String getName() {
            return "time_series";
        }
        
        @Override
        public List<AnomalyResult> detect(String deviceId, List<SensorDataPoint> dataPoints) {
            List<AnomalyResult> results = new ArrayList<>();
            
            // 按传感器类型分组
            Map<String, List<SensorDataPoint>> groupedData = dataPoints.stream()
                .collect(Collectors.groupingBy(SensorDataPoint::getSensorType));
            
            for (Map.Entry<String, List<SensorDataPoint>> entry : groupedData.entrySet()) {
                String sensorType = entry.getKey();
                List<SensorDataPoint> sensorData = entry.getValue();
                
                if (sensorData.size() < SEASONAL_PERIOD * 2) {
                    continue; // 数据点太少
                }
                
                // 排序数据点
                sensorData.sort(Comparator.comparing(SensorDataPoint::getTimestamp));
                
                // 季节性分解
                SeasonalDecomposition decomposition = performSeasonalDecomposition(sensorData);
                
                // 检测异常
                for (int i = 0; i < sensorData.size(); i++) {
                    SensorDataPoint point = sensorData.get(i);
                    double residual = decomposition.getResiduals()[i];
                    double threshold = ANOMALY_THRESHOLD * decomposition.getResidualStdDev();
                    
                    if (Math.abs(residual) > threshold) {
                        AnomalyResult result = new AnomalyResult();
                        result.setDeviceId(deviceId);
                        result.setSensorType(sensorType);
                        result.setValue(point.getValue());
                        result.setTimestamp(point.getTimestamp());
                        result.setAnomaly(true);
                        result.setAnomalyScore(Math.min(Math.abs(residual) / threshold / 2.0, 1.0));
                        result.setDetectorName(getName());
                        result.setDescription(String.format("时间序列异常: 残差=%.2f, 阈值=%.2f", residual, threshold));
                        
                        results.add(result);
                    }
                }
            }
            
            return results;
        }
        
        private SeasonalDecomposition performSeasonalDecomposition(List<SensorDataPoint> dataPoints) {
            double[] values = dataPoints.stream()
                .mapToDouble(SensorDataPoint::getValue)
                .toArray();
            
            // 简化的季节性分解
            double[] trend = calculateTrend(values);
            double[] seasonal = calculateSeasonal(values, trend);
            double[] residuals = calculateResiduals(values, trend, seasonal);
            
            double residualStdDev = calculateStandardDeviation(residuals);
            
            return new SeasonalDecomposition(trend, seasonal, residuals, residualStdDev);
        }
        
        private double[] calculateTrend(double[] values) {
            // 使用移动平均计算趋势
            int windowSize = SEASONAL_PERIOD;
            double[] trend = new double[values.length];
            
            for (int i = 0; i < values.length; i++) {
                int start = Math.max(0, i - windowSize / 2);
                int end = Math.min(values.length - 1, i + windowSize / 2);
                
                double sum = 0;
                int count = 0;
                for (int j = start; j <= end; j++) {
                    sum += values[j];
                    count++;
                }
                
                trend[i] = sum / count;
            }
            
            return trend;
        }
        
        private double[] calculateSeasonal(double[] values, double[] trend) {
            double[] seasonal = new double[values.length];
            double[] seasonalPattern = new double[SEASONAL_PERIOD];
            
            // 计算季节性模式
            for (int i = 0; i < SEASONAL_PERIOD; i++) {
                double sum = 0;
                int count = 0;
                
                for (int j = i; j < values.length; j += SEASONAL_PERIOD) {
                    sum += values[j] - trend[j];
                    count++;
                }
                
                seasonalPattern[i] = count > 0 ? sum / count : 0;
            }
            
            // 应用季节性模式
            for (int i = 0; i < values.length; i++) {
                seasonal[i] = seasonalPattern[i % SEASONAL_PERIOD];
            }
            
            return seasonal;
        }
        
        private double[] calculateResiduals(double[] values, double[] trend, double[] seasonal) {
            double[] residuals = new double[values.length];
            
            for (int i = 0; i < values.length; i++) {
                residuals[i] = values[i] - trend[i] - seasonal[i];
            }
            
            return residuals;
        }
        
        private double calculateStandardDeviation(double[] values) {
            double mean = Arrays.stream(values).average().orElse(0.0);
            double sumSquaredDiffs = Arrays.stream(values)
                .map(value -> Math.pow(value - mean, 2))
                .sum();
            
            return Math.sqrt(sumSquaredDiffs / values.length);
        }
    }
    
    // 季节性分解结果类
    public static class SeasonalDecomposition {
        private final double[] trend;
        private final double[] seasonal;
        private final double[] residuals;
        private final double residualStdDev;
        
        public SeasonalDecomposition(double[] trend, double[] seasonal, double[] residuals, double residualStdDev) {
            this.trend = trend;
            this.seasonal = seasonal;
            this.residuals = residuals;
            this.residualStdDev = residualStdDev;
        }
        
        public double[] getTrend() { return trend; }
        public double[] getSeasonal() { return seasonal; }
        public double[] getResiduals() { return residuals; }
        public double getResidualStdDev() { return residualStdDev; }
    }
}

# 5. 最佳实践总结

# 5.1 设计原则

  • 可扩展性: 采用微服务架构,支持水平扩展
  • 高可用性: 实现多副本部署和故障转移
  • 数据一致性: 确保数据处理的准确性和完整性
  • 实时性: 优化处理延迟,满足实时性要求
  • 容错性: 实现优雅降级和错误恢复

# 5.2 开发建议

  • 数据建模: 合理设计数据模型,支持高效查询
  • 性能优化: 使用缓存、批处理和并行处理
  • 监控告警: 实现全面的监控和告警机制
  • 数据治理: 建立数据质量管控流程
  • 安全保护: 实现数据加密和访问控制

# 6. 下一步学习

  • 学习应用服务层的设计和实现
  • 深入了解微服务架构模式
  • 掌握容器化部署和运维
  • 研究边缘计算和雾计算技术