设备层架构

# 设备层架构

# 📖 章节概述

设备层是物联网系统的基础层,负责数据采集、设备控制和边缘计算。本章将深入讲解传感器、执行器、嵌入式系统设计以及边缘计算节点的架构与实现。

# 🎯 学习目标

  • 掌握各类传感器的工作原理和应用场景
  • 学会设计高效的嵌入式系统架构
  • 理解边缘计算节点的部署和管理
  • 掌握设备固件开发的最佳实践

# 1. 设备层架构概览

# 1.1 设备层组成

graph TB
    subgraph "设备层 Device Layer"
        subgraph "感知设备 Sensing Devices"
            S1[温湿度传感器]
            S2[压力传感器]
            S3[光照传感器]
            S4[加速度传感器]
            S5[气体传感器]
        end
        
        subgraph "执行设备 Actuating Devices"
            A1[电机控制器]
            A2[继电器模块]
            A3[LED控制器]
            A4[阀门控制器]
            A5[蜂鸣器]
        end
        
        subgraph "计算设备 Computing Devices"
            C1[微控制器MCU]
            C2[单板计算机]
            C3[边缘网关]
            C4[工业PC]
        end
        
        subgraph "通信设备 Communication Devices"
            N1[WiFi模块]
            N2[蓝牙模块]
            N3[LoRa模块]
            N4[4G/5G模块]
        end
    end
    
    S1 --> C1
    S2 --> C1
    S3 --> C2
    S4 --> C2
    S5 --> C3
    
    C1 --> A1
    C1 --> A2
    C2 --> A3
    C2 --> A4
    C3 --> A5
    
    C1 --> N1
    C2 --> N2
    C3 --> N3
    C4 --> N4

# 1.2 设备分类

设备类型 功能特点 典型应用 技术要求
传感器节点 数据采集 环境监测 低功耗、高精度
执行器节点 设备控制 自动化控制 实时响应、可靠性
网关设备 协议转换 数据汇聚 多协议、高并发
边缘计算 本地处理 智能分析 计算能力、存储

# 2. 传感器系统设计

# 2.1 传感器选型原则

flowchart TD
    A[传感器选型] --> B{精度要求}
    B -->|高精度| C[工业级传感器]
    B -->|一般精度| D[商用传感器]
    B -->|低精度| E[消费级传感器]
    
    C --> F{功耗要求}
    D --> F
    E --> F
    
    F -->|超低功耗| G[间歇式采样]
    F -->|低功耗| H[优化采样频率]
    F -->|正常功耗| I[连续采样]
    
    G --> J[确定传感器型号]
    H --> J
    I --> J

# 2.2 多传感器融合

// 示例:多传感器数据融合
class SensorFusion {
private:
    struct SensorData {
        float temperature;
        float humidity;
        float pressure;
        float light;
        uint32_t timestamp;
    };
    
    std::queue<SensorData> dataBuffer;
    const size_t BUFFER_SIZE = 10;
    
public:
    // 卡尔曼滤波器用于数据融合
    class KalmanFilter {
    private:
        float Q; // 过程噪声协方差
        float R; // 测量噪声协方差
        float P; // 估计误差协方差
        float K; // 卡尔曼增益
        float X; // 状态估计
        
    public:
        KalmanFilter(float q, float r, float p, float initial_value) 
            : Q(q), R(r), P(p), X(initial_value) {}
        
        float update(float measurement) {
            // 预测步骤
            P = P + Q;
            
            // 更新步骤
            K = P / (P + R);
            X = X + K * (measurement - X);
            P = (1 - K) * P;
            
            return X;
        }
    };
    
    // 传感器数据采集
    SensorData collectSensorData() {
        SensorData data;
        data.temperature = readTemperature();
        data.humidity = readHumidity();
        data.pressure = readPressure();
        data.light = readLight();
        data.timestamp = millis();
        
        return data;
    }
    
    // 数据融合处理
    SensorData fuseSensorData() {
        SensorData rawData = collectSensorData();
        
        // 使用卡尔曼滤波器平滑数据
        static KalmanFilter tempFilter(0.1, 0.1, 1.0, 25.0);
        static KalmanFilter humidFilter(0.1, 0.1, 1.0, 50.0);
        
        SensorData fusedData = rawData;
        fusedData.temperature = tempFilter.update(rawData.temperature);
        fusedData.humidity = humidFilter.update(rawData.humidity);
        
        // 数据验证
        if (validateSensorData(fusedData)) {
            addToBuffer(fusedData);
            return fusedData;
        }
        
        // 返回上一次有效数据
        return getLastValidData();
    }
    
private:
    float readTemperature() {
        // DHT22传感器读取
        // 实际硬件接口调用
        return 25.5; // 示例值
    }
    
    float readHumidity() {
        // DHT22传感器读取
        return 60.2; // 示例值
    }
    
    float readPressure() {
        // BMP280传感器读取
        return 1013.25; // 示例值
    }
    
    float readLight() {
        // BH1750传感器读取
        return 500.0; // 示例值
    }
    
    bool validateSensorData(const SensorData& data) {
        // 数据范围验证
        if (data.temperature < -40 || data.temperature > 80) return false;
        if (data.humidity < 0 || data.humidity > 100) return false;
        if (data.pressure < 300 || data.pressure > 1200) return false;
        if (data.light < 0 || data.light > 65535) return false;
        
        return true;
    }
    
    void addToBuffer(const SensorData& data) {
        if (dataBuffer.size() >= BUFFER_SIZE) {
            dataBuffer.pop();
        }
        dataBuffer.push(data);
    }
    
    SensorData getLastValidData() {
        if (!dataBuffer.empty()) {
            return dataBuffer.back();
        }
        // 返回默认值
        return {25.0, 50.0, 1013.25, 500.0, millis()};
    }
};

# 2.3 传感器校准算法

// 示例:传感器自动校准系统
class SensorCalibration {
private:
    struct CalibrationPoint {
        float reference_value;
        float sensor_value;
    };
    
    std::vector<CalibrationPoint> calibration_points;
    float slope = 1.0;
    float offset = 0.0;
    
public:
    // 添加校准点
    void addCalibrationPoint(float reference, float sensor) {
        calibration_points.push_back({reference, sensor});
        
        // 当有足够的校准点时,计算校准参数
        if (calibration_points.size() >= 2) {
            calculateCalibrationParameters();
        }
    }
    
    // 应用校准
    float applyCalibratio(float raw_value) {
        return slope * raw_value + offset;
    }
    
    // 自动校准(基于已知环境条件)
    void autoCalibrate() {
        // 温度传感器自校准示例
        // 假设在室温环境下进行校准
        float room_temperature = 25.0; // 已知室温
        float sensor_reading = readRawTemperature();
        
        addCalibrationPoint(room_temperature, sensor_reading);
        
        // 可以添加更多已知条件的校准点
        // 例如:冰点、沸点等
    }
    
private:
    void calculateCalibrationParameters() {
        if (calibration_points.size() < 2) return;
        
        // 使用最小二乘法计算线性校准参数
        float sum_x = 0, sum_y = 0, sum_xy = 0, sum_x2 = 0;
        int n = calibration_points.size();
        
        for (const auto& point : calibration_points) {
            sum_x += point.sensor_value;
            sum_y += point.reference_value;
            sum_xy += point.sensor_value * point.reference_value;
            sum_x2 += point.sensor_value * point.sensor_value;
        }
        
        // 计算斜率和截距
        slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x);
        offset = (sum_y - slope * sum_x) / n;
    }
    
    float readRawTemperature() {
        // 读取原始传感器数据
        return 24.8; // 示例值
    }
};

# 3. 执行器控制系统

# 3.1 执行器类型与控制

// 示例:通用执行器控制框架
class ActuatorController {
public:
    enum ActuatorType {
        SERVO_MOTOR,
        STEPPER_MOTOR,
        DC_MOTOR,
        RELAY,
        PWM_OUTPUT
    };
    
    struct ActuatorConfig {
        ActuatorType type;
        int pin;
        float min_value;
        float max_value;
        bool inverted;
    };
    
private:
    std::map<int, ActuatorConfig> actuators;
    
public:
    // 注册执行器
    void registerActuator(int id, const ActuatorConfig& config) {
        actuators[id] = config;
        initializeActuator(id, config);
    }
    
    // 控制执行器
    bool controlActuator(int id, float value) {
        auto it = actuators.find(id);
        if (it == actuators.end()) {
            return false;
        }
        
        const ActuatorConfig& config = it->second;
        
        // 值范围检查
        value = constrain(value, config.min_value, config.max_value);
        
        // 反向控制支持
        if (config.inverted) {
            value = config.max_value - value + config.min_value;
        }
        
        // 根据执行器类型进行控制
        switch (config.type) {
            case SERVO_MOTOR:
                return controlServo(config.pin, value);
            case STEPPER_MOTOR:
                return controlStepper(config.pin, value);
            case DC_MOTOR:
                return controlDCMotor(config.pin, value);
            case RELAY:
                return controlRelay(config.pin, value > 0.5);
            case PWM_OUTPUT:
                return controlPWM(config.pin, value);
            default:
                return false;
        }
    }
    
    // 批量控制
    void controlMultipleActuators(const std::map<int, float>& commands) {
        for (const auto& command : commands) {
            controlActuator(command.first, command.second);
        }
    }
    
private:
    void initializeActuator(int id, const ActuatorConfig& config) {
        switch (config.type) {
            case SERVO_MOTOR:
                // 初始化舵机
                pinMode(config.pin, OUTPUT);
                break;
            case STEPPER_MOTOR:
                // 初始化步进电机
                pinMode(config.pin, OUTPUT);
                break;
            case DC_MOTOR:
                // 初始化直流电机
                pinMode(config.pin, OUTPUT);
                break;
            case RELAY:
                // 初始化继电器
                pinMode(config.pin, OUTPUT);
                digitalWrite(config.pin, LOW);
                break;
            case PWM_OUTPUT:
                // 初始化PWM输出
                pinMode(config.pin, OUTPUT);
                break;
        }
    }
    
    bool controlServo(int pin, float angle) {
        // 舵机控制(0-180度)
        int pulse_width = map(angle, 0, 180, 500, 2500);
        // 生成PWM信号
        return generateServoPWM(pin, pulse_width);
    }
    
    bool controlStepper(int pin, float steps) {
        // 步进电机控制
        int step_count = (int)steps;
        return moveStepperMotor(pin, step_count);
    }
    
    bool controlDCMotor(int pin, float speed) {
        // 直流电机控制(0-100%)
        int pwm_value = map(speed, 0, 100, 0, 255);
        analogWrite(pin, pwm_value);
        return true;
    }
    
    bool controlRelay(int pin, bool state) {
        // 继电器控制
        digitalWrite(pin, state ? HIGH : LOW);
        return true;
    }
    
    bool controlPWM(int pin, float duty_cycle) {
        // PWM输出控制(0-100%)
        int pwm_value = map(duty_cycle, 0, 100, 0, 255);
        analogWrite(pin, pwm_value);
        return true;
    }
};

# 3.2 PID控制算法

// 示例:PID控制器实现
class PIDController {
private:
    float kp, ki, kd;  // PID参数
    float setpoint;    // 目标值
    float integral;    // 积分项
    float previous_error; // 上次误差
    uint32_t last_time;   // 上次计算时间
    float output_min, output_max; // 输出限制
    
public:
    PIDController(float p, float i, float d) 
        : kp(p), ki(i), kd(d), setpoint(0), integral(0), 
          previous_error(0), last_time(0), 
          output_min(-255), output_max(255) {}
    
    // 设置目标值
    void setSetpoint(float target) {
        setpoint = target;
    }
    
    // 设置PID参数
    void setPIDParameters(float p, float i, float d) {
        kp = p;
        ki = i;
        kd = d;
    }
    
    // 设置输出限制
    void setOutputLimits(float min_output, float max_output) {
        output_min = min_output;
        output_max = max_output;
    }
    
    // 计算PID输出
    float compute(float input) {
        uint32_t current_time = millis();
        float dt = (current_time - last_time) / 1000.0; // 转换为秒
        
        if (dt <= 0) {
            return 0; // 避免除零错误
        }
        
        // 计算误差
        float error = setpoint - input;
        
        // 比例项
        float proportional = kp * error;
        
        // 积分项
        integral += error * dt;
        float integral_term = ki * integral;
        
        // 微分项
        float derivative = (error - previous_error) / dt;
        float derivative_term = kd * derivative;
        
        // PID输出
        float output = proportional + integral_term + derivative_term;
        
        // 输出限制
        output = constrain(output, output_min, output_max);
        
        // 积分饱和处理
        if (output >= output_max || output <= output_min) {
            integral -= error * dt; // 回退积分项
        }
        
        // 保存当前值用于下次计算
        previous_error = error;
        last_time = current_time;
        
        return output;
    }
    
    // 重置PID控制器
    void reset() {
        integral = 0;
        previous_error = 0;
        last_time = millis();
    }
    
    // 自动调参(Ziegler-Nichols方法)
    void autoTune() {
        // 简化的自动调参实现
        // 实际应用中需要更复杂的算法
        
        // 1. 设置ki=0, kd=0,逐渐增加kp直到系统振荡
        // 2. 记录临界增益Kc和振荡周期Tc
        // 3. 根据Ziegler-Nichols公式计算PID参数
        
        float Kc = findCriticalGain();
        float Tc = findOscillationPeriod();
        
        // Ziegler-Nichols PID调参公式
        kp = 0.6 * Kc;
        ki = 2.0 * kp / Tc;
        kd = kp * Tc / 8.0;
    }
    
private:
    float findCriticalGain() {
        // 寻找临界增益的简化实现
        // 实际需要通过实验确定
        return 1.0; // 示例值
    }
    
    float findOscillationPeriod() {
        // 寻找振荡周期的简化实现
        // 实际需要通过实验确定
        return 1.0; // 示例值
    }
};

// 示例:温度控制应用
class TemperatureController {
private:
    PIDController pid;
    SensorFusion sensor;
    ActuatorController actuator;
    int heater_id = 1;
    int fan_id = 2;
    
public:
    TemperatureController() : pid(2.0, 0.5, 0.1) {
        // 配置加热器
        ActuatorController::ActuatorConfig heater_config;
        heater_config.type = ActuatorController::PWM_OUTPUT;
        heater_config.pin = 9;
        heater_config.min_value = 0;
        heater_config.max_value = 100;
        heater_config.inverted = false;
        actuator.registerActuator(heater_id, heater_config);
        
        // 配置风扇
        ActuatorController::ActuatorConfig fan_config;
        fan_config.type = ActuatorController::PWM_OUTPUT;
        fan_config.pin = 10;
        fan_config.min_value = 0;
        fan_config.max_value = 100;
        fan_config.inverted = false;
        actuator.registerActuator(fan_id, fan_config);
        
        // 设置PID输出限制
        pid.setOutputLimits(-100, 100);
    }
    
    void setTargetTemperature(float target) {
        pid.setSetpoint(target);
    }
    
    void controlLoop() {
        // 读取当前温度
        auto sensor_data = sensor.fuseSensorData();
        float current_temp = sensor_data.temperature;
        
        // 计算PID输出
        float pid_output = pid.compute(current_temp);
        
        // 根据PID输出控制加热器和风扇
        if (pid_output > 0) {
            // 需要加热
            actuator.controlActuator(heater_id, pid_output);
            actuator.controlActuator(fan_id, 0);
        } else {
            // 需要冷却
            actuator.controlActuator(heater_id, 0);
            actuator.controlActuator(fan_id, -pid_output);
        }
    }
};

# 4. 嵌入式系统架构

# 4.1 系统架构设计

graph TB
    subgraph "应用层 Application Layer"
        A1[业务逻辑]
        A2[用户接口]
        A3[通信协议]
        A4[数据处理]
    end
    
    subgraph "中间件层 Middleware Layer"
        M1[实时操作系统]
        M2[设备驱动]
        M3[文件系统]
        M4[网络协议栈]
    end
    
    subgraph "硬件抽象层 HAL"
        H1[GPIO控制]
        H2[串口通信]
        H3[SPI/I2C]
        H4[定时器]
    end
    
    subgraph "硬件层 Hardware Layer"
        HW1[微控制器]
        HW2[存储器]
        HW3[通信模块]
        HW4[电源管理]
    end
    
    A1 --> M1
    A2 --> M2
    A3 --> M3
    A4 --> M4
    
    M1 --> H1
    M2 --> H2
    M3 --> H3
    M4 --> H4
    
    H1 --> HW1
    H2 --> HW2
    H3 --> HW3
    H4 --> HW4

# 4.2 实时任务调度

// 示例:基于FreeRTOS的任务调度
#include "FreeRTOS.h"
#include "task.h"
#include "queue.h"
#include "semphr.h"

class IoTTaskScheduler {
private:
    // 任务句柄
    TaskHandle_t sensor_task_handle;
    TaskHandle_t actuator_task_handle;
    TaskHandle_t communication_task_handle;
    TaskHandle_t watchdog_task_handle;
    
    // 队列和信号量
    QueueHandle_t sensor_data_queue;
    QueueHandle_t command_queue;
    SemaphoreHandle_t i2c_mutex;
    SemaphoreHandle_t spi_mutex;
    
    // 任务优先级定义
    static const int SENSOR_TASK_PRIORITY = 3;
    static const int ACTUATOR_TASK_PRIORITY = 4;
    static const int COMMUNICATION_TASK_PRIORITY = 2;
    static const int WATCHDOG_TASK_PRIORITY = 5;
    
public:
    void initializeScheduler() {
        // 创建队列
        sensor_data_queue = xQueueCreate(10, sizeof(SensorData));
        command_queue = xQueueCreate(5, sizeof(ActuatorCommand));
        
        // 创建互斥量
        i2c_mutex = xSemaphoreCreateMutex();
        spi_mutex = xSemaphoreCreateMutex();
        
        // 创建任务
        xTaskCreate(sensorTaskWrapper, "SensorTask", 
                   configMINIMAL_STACK_SIZE * 2, this, 
                   SENSOR_TASK_PRIORITY, &sensor_task_handle);
        
        xTaskCreate(actuatorTaskWrapper, "ActuatorTask", 
                   configMINIMAL_STACK_SIZE * 2, this, 
                   ACTUATOR_TASK_PRIORITY, &actuator_task_handle);
        
        xTaskCreate(communicationTaskWrapper, "CommTask", 
                   configMINIMAL_STACK_SIZE * 4, this, 
                   COMMUNICATION_TASK_PRIORITY, &communication_task_handle);
        
        xTaskCreate(watchdogTaskWrapper, "WatchdogTask", 
                   configMINIMAL_STACK_SIZE, this, 
                   WATCHDOG_TASK_PRIORITY, &watchdog_task_handle);
    }
    
    void startScheduler() {
        vTaskStartScheduler();
    }
    
private:
    // 传感器任务
    void sensorTask() {
        SensorData data;
        TickType_t last_wake_time = xTaskGetTickCount();
        
        while (1) {
            // 获取I2C互斥量
            if (xSemaphoreTake(i2c_mutex, pdMS_TO_TICKS(100)) == pdTRUE) {
                // 读取传感器数据
                data = readAllSensors();
                
                // 释放互斥量
                xSemaphoreGive(i2c_mutex);
                
                // 发送数据到队列
                if (xQueueSend(sensor_data_queue, &data, 0) != pdTRUE) {
                    // 队列满,记录错误
                    logError("Sensor data queue full");
                }
            }
            
            // 周期性执行(100ms)
            vTaskDelayUntil(&last_wake_time, pdMS_TO_TICKS(100));
        }
    }
    
    // 执行器任务
    void actuatorTask() {
        ActuatorCommand command;
        
        while (1) {
            // 等待命令
            if (xQueueReceive(command_queue, &command, portMAX_DELAY) == pdTRUE) {
                // 执行命令
                executeActuatorCommand(command);
                
                // 发送执行结果
                sendCommandResult(command.id, true);
            }
        }
    }
    
    // 通信任务
    void communicationTask() {
        SensorData sensor_data;
        TickType_t last_wake_time = xTaskGetTickCount();
        
        while (1) {
            // 处理接收到的数据
            processIncomingMessages();
            
            // 发送传感器数据
            if (xQueueReceive(sensor_data_queue, &sensor_data, 0) == pdTRUE) {
                sendSensorData(sensor_data);
            }
            
            // 周期性执行(500ms)
            vTaskDelayUntil(&last_wake_time, pdMS_TO_TICKS(500));
        }
    }
    
    // 看门狗任务
    void watchdogTask() {
        while (1) {
            // 检查系统状态
            if (checkSystemHealth()) {
                // 喂狗
                feedWatchdog();
            } else {
                // 系统异常,记录日志并重启
                logError("System health check failed");
                systemReset();
            }
            
            // 每秒检查一次
            vTaskDelay(pdMS_TO_TICKS(1000));
        }
    }
    
    // 静态包装函数
    static void sensorTaskWrapper(void* parameter) {
        static_cast<IoTTaskScheduler*>(parameter)->sensorTask();
    }
    
    static void actuatorTaskWrapper(void* parameter) {
        static_cast<IoTTaskScheduler*>(parameter)->actuatorTask();
    }
    
    static void communicationTaskWrapper(void* parameter) {
        static_cast<IoTTaskScheduler*>(parameter)->communicationTask();
    }
    
    static void watchdogTaskWrapper(void* parameter) {
        static_cast<IoTTaskScheduler*>(parameter)->watchdogTask();
    }
};

# 4.3 内存管理

// 示例:嵌入式系统内存管理
class MemoryManager {
private:
    static const size_t HEAP_SIZE = 32768; // 32KB堆空间
    static const size_t POOL_COUNT = 4;
    
    struct MemoryPool {
        void* pool_start;
        size_t block_size;
        size_t block_count;
        uint8_t* free_blocks;
        size_t free_count;
    };
    
    uint8_t heap_memory[HEAP_SIZE];
    MemoryPool memory_pools[POOL_COUNT];
    bool initialized;
    
public:
    MemoryManager() : initialized(false) {}
    
    void initialize() {
        if (initialized) return;
        
        // 初始化内存池
        // 小块内存池 (32字节 x 64块)
        initializePool(0, 32, 64);
        
        // 中块内存池 (128字节 x 32块)
        initializePool(1, 128, 32);
        
        // 大块内存池 (512字节 x 16块)
        initializePool(2, 512, 16);
        
        // 超大块内存池 (1024字节 x 8块)
        initializePool(3, 1024, 8);
        
        initialized = true;
    }
    
    void* allocate(size_t size) {
        if (!initialized) {
            initialize();
        }
        
        // 选择合适的内存池
        int pool_index = selectPool(size);
        if (pool_index >= 0) {
            return allocateFromPool(pool_index);
        }
        
        // 如果没有合适的内存池,使用系统malloc
        return malloc(size);
    }
    
    void deallocate(void* ptr) {
        if (!ptr) return;
        
        // 检查是否属于内存池
        for (int i = 0; i < POOL_COUNT; i++) {
            if (belongsToPool(ptr, i)) {
                deallocateFromPool(ptr, i);
                return;
            }
        }
        
        // 使用系统free
        free(ptr);
    }
    
    // 内存使用统计
    void getMemoryStats(size_t& total_free, size_t& largest_free) {
        total_free = 0;
        largest_free = 0;
        
        for (int i = 0; i < POOL_COUNT; i++) {
            size_t pool_free = memory_pools[i].free_count * memory_pools[i].block_size;
            total_free += pool_free;
            
            if (memory_pools[i].free_count > 0) {
                largest_free = std::max(largest_free, memory_pools[i].block_size);
            }
        }
    }
    
private:
    void initializePool(int index, size_t block_size, size_t block_count) {
        MemoryPool& pool = memory_pools[index];
        
        // 计算所需内存大小
        size_t total_size = block_size * block_count;
        size_t bitmap_size = (block_count + 7) / 8; // 位图大小
        
        // 分配内存
        static size_t heap_offset = 0;
        pool.pool_start = &heap_memory[heap_offset];
        heap_offset += total_size;
        
        pool.free_blocks = &heap_memory[heap_offset];
        heap_offset += bitmap_size;
        
        pool.block_size = block_size;
        pool.block_count = block_count;
        pool.free_count = block_count;
        
        // 初始化位图(全部标记为空闲)
        memset(pool.free_blocks, 0xFF, bitmap_size);
    }
    
    int selectPool(size_t size) {
        for (int i = 0; i < POOL_COUNT; i++) {
            if (size <= memory_pools[i].block_size && 
                memory_pools[i].free_count > 0) {
                return i;
            }
        }
        return -1; // 没有合适的内存池
    }
    
    void* allocateFromPool(int pool_index) {
        MemoryPool& pool = memory_pools[pool_index];
        
        // 查找空闲块
        for (size_t i = 0; i < pool.block_count; i++) {
            size_t byte_index = i / 8;
            size_t bit_index = i % 8;
            
            if (pool.free_blocks[byte_index] & (1 << bit_index)) {
                // 标记为已使用
                pool.free_blocks[byte_index] &= ~(1 << bit_index);
                pool.free_count--;
                
                // 返回块地址
                return static_cast<uint8_t*>(pool.pool_start) + i * pool.block_size;
            }
        }
        
        return nullptr; // 没有空闲块
    }
    
    void deallocateFromPool(void* ptr, int pool_index) {
        MemoryPool& pool = memory_pools[pool_index];
        
        // 计算块索引
        size_t offset = static_cast<uint8_t*>(ptr) - static_cast<uint8_t*>(pool.pool_start);
        size_t block_index = offset / pool.block_size;
        
        if (block_index < pool.block_count) {
            size_t byte_index = block_index / 8;
            size_t bit_index = block_index % 8;
            
            // 标记为空闲
            pool.free_blocks[byte_index] |= (1 << bit_index);
            pool.free_count++;
        }
    }
    
    bool belongsToPool(void* ptr, int pool_index) {
        MemoryPool& pool = memory_pools[pool_index];
        uint8_t* start = static_cast<uint8_t*>(pool.pool_start);
        uint8_t* end = start + pool.block_size * pool.block_count;
        
        return ptr >= start && ptr < end;
    }
};

# 5. 边缘计算节点

# 5.1 边缘网关架构

graph TB
    subgraph "边缘网关 Edge Gateway"
        subgraph "设备接入层"
            D1[Modbus设备]
            D2[CAN总线设备]
            D3[串口设备]
            D4[以太网设备]
        end
        
        subgraph "协议转换层"
            P1[Modbus适配器]
            P2[CAN适配器]
            P3[串口适配器]
            P4[以太网适配器]
        end
        
        subgraph "数据处理层"
            DP1[数据采集]
            DP2[数据过滤]
            DP3[数据聚合]
            DP4[边缘计算]
        end
        
        subgraph "云端连接层"
            C1[MQTT客户端]
            C2[HTTP客户端]
            C3[WebSocket客户端]
            C4[自定义协议]
        end
    end
    
    D1 --> P1
    D2 --> P2
    D3 --> P3
    D4 --> P4
    
    P1 --> DP1
    P2 --> DP2
    P3 --> DP3
    P4 --> DP4
    
    DP1 --> C1
    DP2 --> C2
    DP3 --> C3
    DP4 --> C4

# 5.2 边缘计算实现

# 示例:边缘计算节点实现
import asyncio
import json
import time
from typing import Dict, List, Any
from dataclasses import dataclass
from abc import ABC, abstractmethod

@dataclass
class SensorReading:
    device_id: str
    sensor_type: str
    value: float
    timestamp: float
    quality: int  # 数据质量 0-100

class EdgeProcessor(ABC):
    @abstractmethod
    async def process(self, data: List[SensorReading]) -> Dict[str, Any]:
        pass

class AnomalyDetector(EdgeProcessor):
    def __init__(self, threshold_multiplier: float = 2.0):
        self.threshold_multiplier = threshold_multiplier
        self.history: Dict[str, List[float]] = {}
        self.window_size = 50
    
    async def process(self, data: List[SensorReading]) -> Dict[str, Any]:
        anomalies = []
        
        for reading in data:
            key = f"{reading.device_id}_{reading.sensor_type}"
            
            # 初始化历史数据
            if key not in self.history:
                self.history[key] = []
            
            history = self.history[key]
            
            # 异常检测(基于统计方法)
            if len(history) >= 10:
                mean = sum(history) / len(history)
                variance = sum((x - mean) ** 2 for x in history) / len(history)
                std_dev = variance ** 0.5
                
                threshold = std_dev * self.threshold_multiplier
                
                if abs(reading.value - mean) > threshold:
                    anomalies.append({
                        'device_id': reading.device_id,
                        'sensor_type': reading.sensor_type,
                        'value': reading.value,
                        'expected_range': [mean - threshold, mean + threshold],
                        'severity': min(abs(reading.value - mean) / threshold, 5.0)
                    })
            
            # 更新历史数据
            history.append(reading.value)
            if len(history) > self.window_size:
                history.pop(0)
        
        return {
            'type': 'anomaly_detection',
            'timestamp': time.time(),
            'anomalies': anomalies
        }

class DataAggregator(EdgeProcessor):
    def __init__(self, aggregation_window: int = 60):
        self.aggregation_window = aggregation_window
        self.data_buffer: Dict[str, List[SensorReading]] = {}
    
    async def process(self, data: List[SensorReading]) -> Dict[str, Any]:
        current_time = time.time()
        aggregated_data = {}
        
        # 按设备和传感器类型分组
        for reading in data:
            key = f"{reading.device_id}_{reading.sensor_type}"
            
            if key not in self.data_buffer:
                self.data_buffer[key] = []
            
            self.data_buffer[key].append(reading)
            
            # 清理过期数据
            self.data_buffer[key] = [
                r for r in self.data_buffer[key] 
                if current_time - r.timestamp <= self.aggregation_window
            ]
        
        # 计算聚合统计
        for key, readings in self.data_buffer.items():
            if readings:
                values = [r.value for r in readings]
                aggregated_data[key] = {
                    'count': len(values),
                    'min': min(values),
                    'max': max(values),
                    'avg': sum(values) / len(values),
                    'latest': readings[-1].value,
                    'quality_avg': sum(r.quality for r in readings) / len(readings)
                }
        
        return {
            'type': 'data_aggregation',
            'timestamp': current_time,
            'window_seconds': self.aggregation_window,
            'aggregated_data': aggregated_data
        }

class PredictiveAnalyzer(EdgeProcessor):
    def __init__(self):
        self.models = {}  # 简化的预测模型
    
    async def process(self, data: List[SensorReading]) -> Dict[str, Any]:
        predictions = []
        
        for reading in data:
            # 简单的线性预测模型
            prediction = await self.predict_next_value(reading)
            if prediction is not None:
                predictions.append({
                    'device_id': reading.device_id,
                    'sensor_type': reading.sensor_type,
                    'current_value': reading.value,
                    'predicted_value': prediction,
                    'prediction_horizon': 300,  # 5分钟预测
                    'confidence': 0.8  # 简化的置信度
                })
        
        return {
            'type': 'predictive_analysis',
            'timestamp': time.time(),
            'predictions': predictions
        }
    
    async def predict_next_value(self, reading: SensorReading) -> float:
        # 简化的预测逻辑
        # 实际应用中会使用机器学习模型
        key = f"{reading.device_id}_{reading.sensor_type}"
        
        if key not in self.models:
            self.models[key] = {'history': [], 'trend': 0}
        
        model = self.models[key]
        model['history'].append(reading.value)
        
        if len(model['history']) > 10:
            model['history'].pop(0)
            
            # 计算简单趋势
            if len(model['history']) >= 2:
                recent_trend = model['history'][-1] - model['history'][-2]
                model['trend'] = 0.7 * model['trend'] + 0.3 * recent_trend
        
        return reading.value + model['trend']

class EdgeComputingNode:
    def __init__(self, node_id: str):
        self.node_id = node_id
        self.processors: List[EdgeProcessor] = []
        self.data_queue = asyncio.Queue(maxsize=1000)
        self.running = False
        
        # 默认处理器
        self.add_processor(AnomalyDetector())
        self.add_processor(DataAggregator())
        self.add_processor(PredictiveAnalyzer())
    
    def add_processor(self, processor: EdgeProcessor):
        self.processors.append(processor)
    
    async def add_sensor_data(self, readings: List[SensorReading]):
        try:
            await self.data_queue.put(readings)
        except asyncio.QueueFull:
            print(f"Warning: Data queue full, dropping {len(readings)} readings")
    
    async def start_processing(self):
        self.running = True
        
        while self.running:
            try:
                # 获取传感器数据
                readings = await asyncio.wait_for(
                    self.data_queue.get(), timeout=1.0
                )
                
                # 并行处理数据
                tasks = [
                    processor.process(readings) 
                    for processor in self.processors
                ]
                
                results = await asyncio.gather(*tasks, return_exceptions=True)
                
                # 处理结果
                for i, result in enumerate(results):
                    if isinstance(result, Exception):
                        print(f"Processor {i} error: {result}")
                    else:
                        await self.handle_processing_result(result)
                
            except asyncio.TimeoutError:
                # 超时,继续下一轮
                continue
            except Exception as e:
                print(f"Processing error: {e}")
    
    async def handle_processing_result(self, result: Dict[str, Any]):
        # 处理计算结果
        result_type = result.get('type')
        
        if result_type == 'anomaly_detection':
            anomalies = result.get('anomalies', [])
            if anomalies:
                await self.send_alert({
                    'type': 'anomaly_alert',
                    'node_id': self.node_id,
                    'anomalies': anomalies
                })
        
        elif result_type == 'data_aggregation':
            # 发送聚合数据到云端
            await self.send_to_cloud({
                'type': 'aggregated_data',
                'node_id': self.node_id,
                'data': result
            })
        
        elif result_type == 'predictive_analysis':
            predictions = result.get('predictions', [])
            # 基于预测结果进行本地决策
            await self.make_local_decisions(predictions)
    
    async def send_alert(self, alert: Dict[str, Any]):
        # 发送告警(实际实现会连接到告警系统)
        print(f"ALERT: {json.dumps(alert, indent=2)}")
    
    async def send_to_cloud(self, data: Dict[str, Any]):
        # 发送数据到云端(实际实现会使用MQTT等协议)
        print(f"TO_CLOUD: {data['type']} from {self.node_id}")
    
    async def make_local_decisions(self, predictions: List[Dict[str, Any]]):
        # 基于预测进行本地决策
        for prediction in predictions:
            predicted_value = prediction['predicted_value']
            current_value = prediction['current_value']
            
            # 简单的决策逻辑
            if abs(predicted_value - current_value) > 10:
                print(f"Local decision: Adjust {prediction['device_id']} "
                     f"based on prediction {predicted_value}")
    
    def stop_processing(self):
        self.running = False

# 使用示例
async def main():
    # 创建边缘计算节点
    edge_node = EdgeComputingNode("edge_001")
    
    # 启动处理任务
    processing_task = asyncio.create_task(edge_node.start_processing())
    
    # 模拟传感器数据
    for i in range(100):
        readings = [
            SensorReading(
                device_id="sensor_001",
                sensor_type="temperature",
                value=25.0 + (i % 10) * 0.5,
                timestamp=time.time(),
                quality=95
            ),
            SensorReading(
                device_id="sensor_002",
                sensor_type="humidity",
                value=60.0 + (i % 20) * 0.3,
                timestamp=time.time(),
                quality=90
            )
        ]
        
        await edge_node.add_sensor_data(readings)
        await asyncio.sleep(0.1)
    
    # 停止处理
    edge_node.stop_processing()
    await processing_task

if __name__ == "__main__":
    asyncio.run(main())

# 6. 设备固件开发

# 6.1 固件架构设计

// 示例:模块化固件架构
class IoTFirmware {
private:
    // 系统组件
    SystemConfig config;
    HardwareAbstraction hal;
    CommunicationManager comm;
    SensorManager sensors;
    ActuatorManager actuators;
    PowerManager power;
    SecurityManager security;
    
    // 状态管理
    enum SystemState {
        INITIALIZING,
        NORMAL_OPERATION,
        LOW_POWER_MODE,
        ERROR_STATE,
        FIRMWARE_UPDATE
    };
    
    SystemState current_state;
    uint32_t last_heartbeat;
    
public:
    void initialize() {
        // 1. 硬件初始化
        hal.initialize();
        
        // 2. 加载配置
        config.load();
        
        // 3. 初始化安全模块
        security.initialize(config.getSecurityConfig());
        
        // 4. 初始化通信
        comm.initialize(config.getCommunicationConfig());
        
        // 5. 初始化传感器
        sensors.initialize(config.getSensorConfig());
        
        // 6. 初始化执行器
        actuators.initialize(config.getActuatorConfig());
        
        // 7. 初始化电源管理
        power.initialize(config.getPowerConfig());
        
        current_state = NORMAL_OPERATION;
        last_heartbeat = millis();
    }
    
    void mainLoop() {
        while (true) {
            uint32_t loop_start = millis();
            
            switch (current_state) {
                case NORMAL_OPERATION:
                    normalOperationLoop();
                    break;
                case LOW_POWER_MODE:
                    lowPowerLoop();
                    break;
                case ERROR_STATE:
                    errorHandlingLoop();
                    break;
                case FIRMWARE_UPDATE:
                    firmwareUpdateLoop();
                    break;
                default:
                    current_state = ERROR_STATE;
                    break;
            }
            
            // 看门狗喂狗
            if (millis() - last_heartbeat > 30000) { // 30秒
                systemReset();
            }
            
            // 控制循环频率
            uint32_t loop_time = millis() - loop_start;
            if (loop_time < 100) { // 100ms循环
                delay(100 - loop_time);
            }
        }
    }
    
private:
    void normalOperationLoop() {
        // 1. 读取传感器数据
        auto sensor_data = sensors.readAll();
        
        // 2. 处理数据
        auto processed_data = processData(sensor_data);
        
        // 3. 执行控制逻辑
        auto control_commands = executeControlLogic(processed_data);
        
        // 4. 控制执行器
        actuators.executeCommands(control_commands);
        
        // 5. 发送数据
        comm.sendData(processed_data);
        
        // 6. 处理接收到的命令
        auto received_commands = comm.receiveCommands();
        processReceivedCommands(received_commands);
        
        // 7. 检查系统状态
        checkSystemHealth();
        
        // 8. 更新心跳
        last_heartbeat = millis();
    }
    
    void lowPowerLoop() {
        // 低功耗模式下的简化操作
        
        // 1. 减少传感器采样频率
        if (millis() % 10000 == 0) { // 每10秒采样一次
            auto sensor_data = sensors.readCriticalSensors();
            
            // 2. 检查是否需要唤醒
            if (shouldWakeUp(sensor_data)) {
                current_state = NORMAL_OPERATION;
                power.exitLowPowerMode();
                return;
            }
            
            // 3. 发送关键数据
            comm.sendCriticalData(sensor_data);
        }
        
        // 4. 进入深度睡眠
        power.enterDeepSleep(1000); // 睡眠1秒
    }
    
    void errorHandlingLoop() {
        // 错误处理逻辑
        
        // 1. 诊断系统问题
        auto error_info = diagnoseProblem();
        
        // 2. 尝试恢复
        if (attemptRecovery(error_info)) {
            current_state = NORMAL_OPERATION;
            return;
        }
        
        // 3. 发送错误报告
        comm.sendErrorReport(error_info);
        
        // 4. 等待外部干预或自动重启
        delay(5000);
        
        // 5. 如果无法恢复,重启系统
        systemReset();
    }
    
    void firmwareUpdateLoop() {
        // 固件更新逻辑
        
        // 1. 接收固件数据
        auto firmware_chunk = comm.receiveFirmwareChunk();
        
        if (firmware_chunk.isValid()) {
            // 2. 验证数据完整性
            if (security.verifyFirmwareChunk(firmware_chunk)) {
                // 3. 写入Flash
                hal.writeFirmwareChunk(firmware_chunk);
                
                // 4. 发送确认
                comm.sendUpdateProgress(firmware_chunk.sequence);
            } else {
                // 数据损坏,请求重传
                comm.requestRetransmission(firmware_chunk.sequence);
            }
        }
        
        // 5. 检查更新是否完成
        if (hal.isFirmwareUpdateComplete()) {
            // 验证完整固件
            if (security.verifyCompleteFirmware()) {
                // 重启到新固件
                hal.rebootToNewFirmware();
            } else {
                // 固件损坏,回滚
                hal.rollbackFirmware();
                current_state = ERROR_STATE;
            }
        }
    }
};

# 6.2 OTA固件更新

// 示例:OTA固件更新实现
class OTAManager {
private:
    struct FirmwareInfo {
        uint32_t version;
        uint32_t size;
        uint32_t crc32;
        char description[64];
    };
    
    struct UpdateProgress {
        uint32_t total_chunks;
        uint32_t received_chunks;
        uint32_t failed_chunks;
        bool update_in_progress;
    };
    
    FirmwareInfo current_firmware;
    FirmwareInfo target_firmware;
    UpdateProgress progress;
    
    static const uint32_t CHUNK_SIZE = 1024;
    static const uint32_t MAX_RETRIES = 3;
    
public:
    bool checkForUpdate() {
        // 1. 查询服务器是否有新固件
        auto server_info = queryServerForUpdate();
        
        if (server_info.version > current_firmware.version) {
            target_firmware = server_info;
            return true;
        }
        
        return false;
    }
    
    bool startUpdate() {
        if (target_firmware.version <= current_firmware.version) {
            return false;
        }
        
        // 初始化更新进度
        progress.total_chunks = (target_firmware.size + CHUNK_SIZE - 1) / CHUNK_SIZE;
        progress.received_chunks = 0;
        progress.failed_chunks = 0;
        progress.update_in_progress = true;
        
        // 请求开始更新
        return requestUpdateStart();
    }
    
    bool processUpdateChunk(const uint8_t* data, uint32_t chunk_id, uint32_t size) {
        if (!progress.update_in_progress) {
            return false;
        }
        
        // 验证chunk数据
        if (!validateChunk(data, chunk_id, size)) {
            progress.failed_chunks++;
            return false;
        }
        
        // 写入Flash
        if (writeChunkToFlash(data, chunk_id, size)) {
            progress.received_chunks++;
            
            // 检查是否完成
            if (progress.received_chunks >= progress.total_chunks) {
                return finalizeUpdate();
            }
            
            return true;
        }
        
        progress.failed_chunks++;
        return false;
    }
    
    float getUpdateProgress() {
        if (!progress.update_in_progress) {
            return 0.0;
        }
        
        return (float)progress.received_chunks / progress.total_chunks * 100.0;
    }
    
private:
    bool finalizeUpdate() {
        // 验证完整固件
        if (verifyCompleteFirmware()) {
            // 标记新固件为有效
            markFirmwareValid();
            
            // 重启到新固件
            scheduleReboot();
            
            progress.update_in_progress = false;
            return true;
        }
        
        // 固件验证失败,清理
        cleanupFailedUpdate();
        progress.update_in_progress = false;
        return false;
    }
};

# 7. 电源管理

# 7.1 低功耗设计

// 示例:智能电源管理系统
class PowerManager {
private:
    enum PowerMode {
        ACTIVE_MODE,
        IDLE_MODE,
        SLEEP_MODE,
        DEEP_SLEEP_MODE,
        HIBERNATION_MODE
    };
    
    PowerMode current_mode;
    uint32_t battery_level;
    uint32_t last_activity_time;
    bool external_power_available;
    
public:
    void initialize() {
        current_mode = ACTIVE_MODE;
        battery_level = readBatteryLevel();
        last_activity_time = millis();
        external_power_available = checkExternalPower();
        
        // 配置低功耗模式
        configureLowPowerModes();
    }
    
    void updatePowerState() {
        uint32_t current_time = millis();
        uint32_t idle_time = current_time - last_activity_time;
        
        // 更新电池状态
        battery_level = readBatteryLevel();
        external_power_available = checkExternalPower();
        
        // 根据条件选择功耗模式
        PowerMode target_mode = selectOptimalPowerMode(idle_time);
        
        if (target_mode != current_mode) {
            transitionToPowerMode(target_mode);
        }
    }
    
    void recordActivity() {
        last_activity_time = millis();
        
        // 如果在低功耗模式,唤醒到活跃模式
        if (current_mode != ACTIVE_MODE) {
            transitionToPowerMode(ACTIVE_MODE);
        }
    }
    
private:
    PowerMode selectOptimalPowerMode(uint32_t idle_time) {
        // 电池电量过低,进入休眠模式
        if (battery_level < 10 && !external_power_available) {
            return HIBERNATION_MODE;
        }
        
        // 根据空闲时间选择模式
        if (idle_time < 5000) {        // 5秒内
            return ACTIVE_MODE;
        } else if (idle_time < 30000) { // 30秒内
            return IDLE_MODE;
        } else if (idle_time < 300000) { // 5分钟内
            return SLEEP_MODE;
        } else {                        // 超过5分钟
            return DEEP_SLEEP_MODE;
        }
    }
    
    void transitionToPowerMode(PowerMode new_mode) {
        // 退出当前模式
        exitCurrentMode();
        
        // 进入新模式
        switch (new_mode) {
            case ACTIVE_MODE:
                enterActiveMode();
                break;
            case IDLE_MODE:
                enterIdleMode();
                break;
            case SLEEP_MODE:
                enterSleepMode();
                break;
            case DEEP_SLEEP_MODE:
                enterDeepSleepMode();
                break;
            case HIBERNATION_MODE:
                enterHibernationMode();
                break;
        }
        
        current_mode = new_mode;
    }
    
    void enterActiveMode() {
        // 启用所有外设
        enableAllPeripherals();
        
        // 设置最高时钟频率
        setCPUFrequency(80000000); // 80MHz
        
        // 启用所有传感器
        enableAllSensors();
    }
    
    void enterIdleMode() {
        // 降低CPU频率
        setCPUFrequency(40000000); // 40MHz
        
        // 关闭非必要外设
        disableNonEssentialPeripherals();
    }
    
    void enterSleepMode() {
        // 进一步降低频率
        setCPUFrequency(8000000); // 8MHz
        
        // 关闭更多外设
        disableMorePeripherals();
        
        // 减少传感器采样频率
        reduceSensorSampling();
    }
    
    void enterDeepSleepMode() {
        // 保存关键状态
        saveSystemState();
        
        // 关闭大部分外设
        disableMostPeripherals();
        
        // 只保留关键传感器
        enableOnlyCriticalSensors();
        
        // 设置唤醒条件
        configureWakeupSources();
        
        // 进入深度睡眠
        esp_deep_sleep(60000000); // 睡眠60秒
    }
    
    void enterHibernationMode() {
        // 保存所有状态到非易失性存储
        saveAllStateToNVS();
        
        // 关闭所有非必要功能
        shutdownAllNonEssential();
        
        // 设置最小唤醒条件
        configureMinimalWakeup();
        
        // 进入最深度睡眠
        esp_deep_sleep(3600000000); // 睡眠1小时
    }
};

# 8. 最佳实践总结

# 8.1 设计原则

  1. 模块化设计: 将功能分解为独立模块
  2. 低功耗优先: 在设计阶段就考虑功耗优化
  3. 可靠性保证: 实现故障检测和恢复机制
  4. 安全性内置: 从硬件到软件的全方位安全
  5. 可维护性: 支持远程诊断和固件更新

# 8.2 开发建议

  1. 硬件选型: 根据应用需求选择合适的MCU和传感器
  2. 实时性保证: 使用RTOS确保任务调度的实时性
  3. 内存优化: 合理管理内存,避免内存泄漏
  4. 通信可靠: 实现重传机制和错误检测
  5. 测试充分: 进行压力测试和长期稳定性测试

# 9. 下一步学习

完成本章学习后,建议继续学习:

  1. 网络通信层 - 了解设备间通信
  2. MQTT协议与实现 - 掌握IoT通信协议
  3. 设备接入与认证 - 学习设备安全接入

恭喜您完成了设备层架构学习! 🎉

💡 下一步: 继续学习网络通信层,深入了解物联网设备间的通信机制。