设备层架构
# 设备层架构
# 📖 章节概述
设备层是物联网系统的基础层,负责数据采集、设备控制和边缘计算。本章将深入讲解传感器、执行器、嵌入式系统设计以及边缘计算节点的架构与实现。
# 🎯 学习目标
- 掌握各类传感器的工作原理和应用场景
- 学会设计高效的嵌入式系统架构
- 理解边缘计算节点的部署和管理
- 掌握设备固件开发的最佳实践
# 1. 设备层架构概览
# 1.1 设备层组成
graph TB
subgraph "设备层 Device Layer"
subgraph "感知设备 Sensing Devices"
S1[温湿度传感器]
S2[压力传感器]
S3[光照传感器]
S4[加速度传感器]
S5[气体传感器]
end
subgraph "执行设备 Actuating Devices"
A1[电机控制器]
A2[继电器模块]
A3[LED控制器]
A4[阀门控制器]
A5[蜂鸣器]
end
subgraph "计算设备 Computing Devices"
C1[微控制器MCU]
C2[单板计算机]
C3[边缘网关]
C4[工业PC]
end
subgraph "通信设备 Communication Devices"
N1[WiFi模块]
N2[蓝牙模块]
N3[LoRa模块]
N4[4G/5G模块]
end
end
S1 --> C1
S2 --> C1
S3 --> C2
S4 --> C2
S5 --> C3
C1 --> A1
C1 --> A2
C2 --> A3
C2 --> A4
C3 --> A5
C1 --> N1
C2 --> N2
C3 --> N3
C4 --> N4
# 1.2 设备分类
设备类型 | 功能特点 | 典型应用 | 技术要求 |
---|---|---|---|
传感器节点 | 数据采集 | 环境监测 | 低功耗、高精度 |
执行器节点 | 设备控制 | 自动化控制 | 实时响应、可靠性 |
网关设备 | 协议转换 | 数据汇聚 | 多协议、高并发 |
边缘计算 | 本地处理 | 智能分析 | 计算能力、存储 |
# 2. 传感器系统设计
# 2.1 传感器选型原则
flowchart TD
A[传感器选型] --> B{精度要求}
B -->|高精度| C[工业级传感器]
B -->|一般精度| D[商用传感器]
B -->|低精度| E[消费级传感器]
C --> F{功耗要求}
D --> F
E --> F
F -->|超低功耗| G[间歇式采样]
F -->|低功耗| H[优化采样频率]
F -->|正常功耗| I[连续采样]
G --> J[确定传感器型号]
H --> J
I --> J
# 2.2 多传感器融合
// 示例:多传感器数据融合
class SensorFusion {
private:
struct SensorData {
float temperature;
float humidity;
float pressure;
float light;
uint32_t timestamp;
};
std::queue<SensorData> dataBuffer;
const size_t BUFFER_SIZE = 10;
public:
// 卡尔曼滤波器用于数据融合
class KalmanFilter {
private:
float Q; // 过程噪声协方差
float R; // 测量噪声协方差
float P; // 估计误差协方差
float K; // 卡尔曼增益
float X; // 状态估计
public:
KalmanFilter(float q, float r, float p, float initial_value)
: Q(q), R(r), P(p), X(initial_value) {}
float update(float measurement) {
// 预测步骤
P = P + Q;
// 更新步骤
K = P / (P + R);
X = X + K * (measurement - X);
P = (1 - K) * P;
return X;
}
};
// 传感器数据采集
SensorData collectSensorData() {
SensorData data;
data.temperature = readTemperature();
data.humidity = readHumidity();
data.pressure = readPressure();
data.light = readLight();
data.timestamp = millis();
return data;
}
// 数据融合处理
SensorData fuseSensorData() {
SensorData rawData = collectSensorData();
// 使用卡尔曼滤波器平滑数据
static KalmanFilter tempFilter(0.1, 0.1, 1.0, 25.0);
static KalmanFilter humidFilter(0.1, 0.1, 1.0, 50.0);
SensorData fusedData = rawData;
fusedData.temperature = tempFilter.update(rawData.temperature);
fusedData.humidity = humidFilter.update(rawData.humidity);
// 数据验证
if (validateSensorData(fusedData)) {
addToBuffer(fusedData);
return fusedData;
}
// 返回上一次有效数据
return getLastValidData();
}
private:
float readTemperature() {
// DHT22传感器读取
// 实际硬件接口调用
return 25.5; // 示例值
}
float readHumidity() {
// DHT22传感器读取
return 60.2; // 示例值
}
float readPressure() {
// BMP280传感器读取
return 1013.25; // 示例值
}
float readLight() {
// BH1750传感器读取
return 500.0; // 示例值
}
bool validateSensorData(const SensorData& data) {
// 数据范围验证
if (data.temperature < -40 || data.temperature > 80) return false;
if (data.humidity < 0 || data.humidity > 100) return false;
if (data.pressure < 300 || data.pressure > 1200) return false;
if (data.light < 0 || data.light > 65535) return false;
return true;
}
void addToBuffer(const SensorData& data) {
if (dataBuffer.size() >= BUFFER_SIZE) {
dataBuffer.pop();
}
dataBuffer.push(data);
}
SensorData getLastValidData() {
if (!dataBuffer.empty()) {
return dataBuffer.back();
}
// 返回默认值
return {25.0, 50.0, 1013.25, 500.0, millis()};
}
};
# 2.3 传感器校准算法
// 示例:传感器自动校准系统
class SensorCalibration {
private:
struct CalibrationPoint {
float reference_value;
float sensor_value;
};
std::vector<CalibrationPoint> calibration_points;
float slope = 1.0;
float offset = 0.0;
public:
// 添加校准点
void addCalibrationPoint(float reference, float sensor) {
calibration_points.push_back({reference, sensor});
// 当有足够的校准点时,计算校准参数
if (calibration_points.size() >= 2) {
calculateCalibrationParameters();
}
}
// 应用校准
float applyCalibratio(float raw_value) {
return slope * raw_value + offset;
}
// 自动校准(基于已知环境条件)
void autoCalibrate() {
// 温度传感器自校准示例
// 假设在室温环境下进行校准
float room_temperature = 25.0; // 已知室温
float sensor_reading = readRawTemperature();
addCalibrationPoint(room_temperature, sensor_reading);
// 可以添加更多已知条件的校准点
// 例如:冰点、沸点等
}
private:
void calculateCalibrationParameters() {
if (calibration_points.size() < 2) return;
// 使用最小二乘法计算线性校准参数
float sum_x = 0, sum_y = 0, sum_xy = 0, sum_x2 = 0;
int n = calibration_points.size();
for (const auto& point : calibration_points) {
sum_x += point.sensor_value;
sum_y += point.reference_value;
sum_xy += point.sensor_value * point.reference_value;
sum_x2 += point.sensor_value * point.sensor_value;
}
// 计算斜率和截距
slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x);
offset = (sum_y - slope * sum_x) / n;
}
float readRawTemperature() {
// 读取原始传感器数据
return 24.8; // 示例值
}
};
# 3. 执行器控制系统
# 3.1 执行器类型与控制
// 示例:通用执行器控制框架
class ActuatorController {
public:
enum ActuatorType {
SERVO_MOTOR,
STEPPER_MOTOR,
DC_MOTOR,
RELAY,
PWM_OUTPUT
};
struct ActuatorConfig {
ActuatorType type;
int pin;
float min_value;
float max_value;
bool inverted;
};
private:
std::map<int, ActuatorConfig> actuators;
public:
// 注册执行器
void registerActuator(int id, const ActuatorConfig& config) {
actuators[id] = config;
initializeActuator(id, config);
}
// 控制执行器
bool controlActuator(int id, float value) {
auto it = actuators.find(id);
if (it == actuators.end()) {
return false;
}
const ActuatorConfig& config = it->second;
// 值范围检查
value = constrain(value, config.min_value, config.max_value);
// 反向控制支持
if (config.inverted) {
value = config.max_value - value + config.min_value;
}
// 根据执行器类型进行控制
switch (config.type) {
case SERVO_MOTOR:
return controlServo(config.pin, value);
case STEPPER_MOTOR:
return controlStepper(config.pin, value);
case DC_MOTOR:
return controlDCMotor(config.pin, value);
case RELAY:
return controlRelay(config.pin, value > 0.5);
case PWM_OUTPUT:
return controlPWM(config.pin, value);
default:
return false;
}
}
// 批量控制
void controlMultipleActuators(const std::map<int, float>& commands) {
for (const auto& command : commands) {
controlActuator(command.first, command.second);
}
}
private:
void initializeActuator(int id, const ActuatorConfig& config) {
switch (config.type) {
case SERVO_MOTOR:
// 初始化舵机
pinMode(config.pin, OUTPUT);
break;
case STEPPER_MOTOR:
// 初始化步进电机
pinMode(config.pin, OUTPUT);
break;
case DC_MOTOR:
// 初始化直流电机
pinMode(config.pin, OUTPUT);
break;
case RELAY:
// 初始化继电器
pinMode(config.pin, OUTPUT);
digitalWrite(config.pin, LOW);
break;
case PWM_OUTPUT:
// 初始化PWM输出
pinMode(config.pin, OUTPUT);
break;
}
}
bool controlServo(int pin, float angle) {
// 舵机控制(0-180度)
int pulse_width = map(angle, 0, 180, 500, 2500);
// 生成PWM信号
return generateServoPWM(pin, pulse_width);
}
bool controlStepper(int pin, float steps) {
// 步进电机控制
int step_count = (int)steps;
return moveStepperMotor(pin, step_count);
}
bool controlDCMotor(int pin, float speed) {
// 直流电机控制(0-100%)
int pwm_value = map(speed, 0, 100, 0, 255);
analogWrite(pin, pwm_value);
return true;
}
bool controlRelay(int pin, bool state) {
// 继电器控制
digitalWrite(pin, state ? HIGH : LOW);
return true;
}
bool controlPWM(int pin, float duty_cycle) {
// PWM输出控制(0-100%)
int pwm_value = map(duty_cycle, 0, 100, 0, 255);
analogWrite(pin, pwm_value);
return true;
}
};
# 3.2 PID控制算法
// 示例:PID控制器实现
class PIDController {
private:
float kp, ki, kd; // PID参数
float setpoint; // 目标值
float integral; // 积分项
float previous_error; // 上次误差
uint32_t last_time; // 上次计算时间
float output_min, output_max; // 输出限制
public:
PIDController(float p, float i, float d)
: kp(p), ki(i), kd(d), setpoint(0), integral(0),
previous_error(0), last_time(0),
output_min(-255), output_max(255) {}
// 设置目标值
void setSetpoint(float target) {
setpoint = target;
}
// 设置PID参数
void setPIDParameters(float p, float i, float d) {
kp = p;
ki = i;
kd = d;
}
// 设置输出限制
void setOutputLimits(float min_output, float max_output) {
output_min = min_output;
output_max = max_output;
}
// 计算PID输出
float compute(float input) {
uint32_t current_time = millis();
float dt = (current_time - last_time) / 1000.0; // 转换为秒
if (dt <= 0) {
return 0; // 避免除零错误
}
// 计算误差
float error = setpoint - input;
// 比例项
float proportional = kp * error;
// 积分项
integral += error * dt;
float integral_term = ki * integral;
// 微分项
float derivative = (error - previous_error) / dt;
float derivative_term = kd * derivative;
// PID输出
float output = proportional + integral_term + derivative_term;
// 输出限制
output = constrain(output, output_min, output_max);
// 积分饱和处理
if (output >= output_max || output <= output_min) {
integral -= error * dt; // 回退积分项
}
// 保存当前值用于下次计算
previous_error = error;
last_time = current_time;
return output;
}
// 重置PID控制器
void reset() {
integral = 0;
previous_error = 0;
last_time = millis();
}
// 自动调参(Ziegler-Nichols方法)
void autoTune() {
// 简化的自动调参实现
// 实际应用中需要更复杂的算法
// 1. 设置ki=0, kd=0,逐渐增加kp直到系统振荡
// 2. 记录临界增益Kc和振荡周期Tc
// 3. 根据Ziegler-Nichols公式计算PID参数
float Kc = findCriticalGain();
float Tc = findOscillationPeriod();
// Ziegler-Nichols PID调参公式
kp = 0.6 * Kc;
ki = 2.0 * kp / Tc;
kd = kp * Tc / 8.0;
}
private:
float findCriticalGain() {
// 寻找临界增益的简化实现
// 实际需要通过实验确定
return 1.0; // 示例值
}
float findOscillationPeriod() {
// 寻找振荡周期的简化实现
// 实际需要通过实验确定
return 1.0; // 示例值
}
};
// 示例:温度控制应用
class TemperatureController {
private:
PIDController pid;
SensorFusion sensor;
ActuatorController actuator;
int heater_id = 1;
int fan_id = 2;
public:
TemperatureController() : pid(2.0, 0.5, 0.1) {
// 配置加热器
ActuatorController::ActuatorConfig heater_config;
heater_config.type = ActuatorController::PWM_OUTPUT;
heater_config.pin = 9;
heater_config.min_value = 0;
heater_config.max_value = 100;
heater_config.inverted = false;
actuator.registerActuator(heater_id, heater_config);
// 配置风扇
ActuatorController::ActuatorConfig fan_config;
fan_config.type = ActuatorController::PWM_OUTPUT;
fan_config.pin = 10;
fan_config.min_value = 0;
fan_config.max_value = 100;
fan_config.inverted = false;
actuator.registerActuator(fan_id, fan_config);
// 设置PID输出限制
pid.setOutputLimits(-100, 100);
}
void setTargetTemperature(float target) {
pid.setSetpoint(target);
}
void controlLoop() {
// 读取当前温度
auto sensor_data = sensor.fuseSensorData();
float current_temp = sensor_data.temperature;
// 计算PID输出
float pid_output = pid.compute(current_temp);
// 根据PID输出控制加热器和风扇
if (pid_output > 0) {
// 需要加热
actuator.controlActuator(heater_id, pid_output);
actuator.controlActuator(fan_id, 0);
} else {
// 需要冷却
actuator.controlActuator(heater_id, 0);
actuator.controlActuator(fan_id, -pid_output);
}
}
};
# 4. 嵌入式系统架构
# 4.1 系统架构设计
graph TB
subgraph "应用层 Application Layer"
A1[业务逻辑]
A2[用户接口]
A3[通信协议]
A4[数据处理]
end
subgraph "中间件层 Middleware Layer"
M1[实时操作系统]
M2[设备驱动]
M3[文件系统]
M4[网络协议栈]
end
subgraph "硬件抽象层 HAL"
H1[GPIO控制]
H2[串口通信]
H3[SPI/I2C]
H4[定时器]
end
subgraph "硬件层 Hardware Layer"
HW1[微控制器]
HW2[存储器]
HW3[通信模块]
HW4[电源管理]
end
A1 --> M1
A2 --> M2
A3 --> M3
A4 --> M4
M1 --> H1
M2 --> H2
M3 --> H3
M4 --> H4
H1 --> HW1
H2 --> HW2
H3 --> HW3
H4 --> HW4
# 4.2 实时任务调度
// 示例:基于FreeRTOS的任务调度
#include "FreeRTOS.h"
#include "task.h"
#include "queue.h"
#include "semphr.h"
class IoTTaskScheduler {
private:
// 任务句柄
TaskHandle_t sensor_task_handle;
TaskHandle_t actuator_task_handle;
TaskHandle_t communication_task_handle;
TaskHandle_t watchdog_task_handle;
// 队列和信号量
QueueHandle_t sensor_data_queue;
QueueHandle_t command_queue;
SemaphoreHandle_t i2c_mutex;
SemaphoreHandle_t spi_mutex;
// 任务优先级定义
static const int SENSOR_TASK_PRIORITY = 3;
static const int ACTUATOR_TASK_PRIORITY = 4;
static const int COMMUNICATION_TASK_PRIORITY = 2;
static const int WATCHDOG_TASK_PRIORITY = 5;
public:
void initializeScheduler() {
// 创建队列
sensor_data_queue = xQueueCreate(10, sizeof(SensorData));
command_queue = xQueueCreate(5, sizeof(ActuatorCommand));
// 创建互斥量
i2c_mutex = xSemaphoreCreateMutex();
spi_mutex = xSemaphoreCreateMutex();
// 创建任务
xTaskCreate(sensorTaskWrapper, "SensorTask",
configMINIMAL_STACK_SIZE * 2, this,
SENSOR_TASK_PRIORITY, &sensor_task_handle);
xTaskCreate(actuatorTaskWrapper, "ActuatorTask",
configMINIMAL_STACK_SIZE * 2, this,
ACTUATOR_TASK_PRIORITY, &actuator_task_handle);
xTaskCreate(communicationTaskWrapper, "CommTask",
configMINIMAL_STACK_SIZE * 4, this,
COMMUNICATION_TASK_PRIORITY, &communication_task_handle);
xTaskCreate(watchdogTaskWrapper, "WatchdogTask",
configMINIMAL_STACK_SIZE, this,
WATCHDOG_TASK_PRIORITY, &watchdog_task_handle);
}
void startScheduler() {
vTaskStartScheduler();
}
private:
// 传感器任务
void sensorTask() {
SensorData data;
TickType_t last_wake_time = xTaskGetTickCount();
while (1) {
// 获取I2C互斥量
if (xSemaphoreTake(i2c_mutex, pdMS_TO_TICKS(100)) == pdTRUE) {
// 读取传感器数据
data = readAllSensors();
// 释放互斥量
xSemaphoreGive(i2c_mutex);
// 发送数据到队列
if (xQueueSend(sensor_data_queue, &data, 0) != pdTRUE) {
// 队列满,记录错误
logError("Sensor data queue full");
}
}
// 周期性执行(100ms)
vTaskDelayUntil(&last_wake_time, pdMS_TO_TICKS(100));
}
}
// 执行器任务
void actuatorTask() {
ActuatorCommand command;
while (1) {
// 等待命令
if (xQueueReceive(command_queue, &command, portMAX_DELAY) == pdTRUE) {
// 执行命令
executeActuatorCommand(command);
// 发送执行结果
sendCommandResult(command.id, true);
}
}
}
// 通信任务
void communicationTask() {
SensorData sensor_data;
TickType_t last_wake_time = xTaskGetTickCount();
while (1) {
// 处理接收到的数据
processIncomingMessages();
// 发送传感器数据
if (xQueueReceive(sensor_data_queue, &sensor_data, 0) == pdTRUE) {
sendSensorData(sensor_data);
}
// 周期性执行(500ms)
vTaskDelayUntil(&last_wake_time, pdMS_TO_TICKS(500));
}
}
// 看门狗任务
void watchdogTask() {
while (1) {
// 检查系统状态
if (checkSystemHealth()) {
// 喂狗
feedWatchdog();
} else {
// 系统异常,记录日志并重启
logError("System health check failed");
systemReset();
}
// 每秒检查一次
vTaskDelay(pdMS_TO_TICKS(1000));
}
}
// 静态包装函数
static void sensorTaskWrapper(void* parameter) {
static_cast<IoTTaskScheduler*>(parameter)->sensorTask();
}
static void actuatorTaskWrapper(void* parameter) {
static_cast<IoTTaskScheduler*>(parameter)->actuatorTask();
}
static void communicationTaskWrapper(void* parameter) {
static_cast<IoTTaskScheduler*>(parameter)->communicationTask();
}
static void watchdogTaskWrapper(void* parameter) {
static_cast<IoTTaskScheduler*>(parameter)->watchdogTask();
}
};
# 4.3 内存管理
// 示例:嵌入式系统内存管理
class MemoryManager {
private:
static const size_t HEAP_SIZE = 32768; // 32KB堆空间
static const size_t POOL_COUNT = 4;
struct MemoryPool {
void* pool_start;
size_t block_size;
size_t block_count;
uint8_t* free_blocks;
size_t free_count;
};
uint8_t heap_memory[HEAP_SIZE];
MemoryPool memory_pools[POOL_COUNT];
bool initialized;
public:
MemoryManager() : initialized(false) {}
void initialize() {
if (initialized) return;
// 初始化内存池
// 小块内存池 (32字节 x 64块)
initializePool(0, 32, 64);
// 中块内存池 (128字节 x 32块)
initializePool(1, 128, 32);
// 大块内存池 (512字节 x 16块)
initializePool(2, 512, 16);
// 超大块内存池 (1024字节 x 8块)
initializePool(3, 1024, 8);
initialized = true;
}
void* allocate(size_t size) {
if (!initialized) {
initialize();
}
// 选择合适的内存池
int pool_index = selectPool(size);
if (pool_index >= 0) {
return allocateFromPool(pool_index);
}
// 如果没有合适的内存池,使用系统malloc
return malloc(size);
}
void deallocate(void* ptr) {
if (!ptr) return;
// 检查是否属于内存池
for (int i = 0; i < POOL_COUNT; i++) {
if (belongsToPool(ptr, i)) {
deallocateFromPool(ptr, i);
return;
}
}
// 使用系统free
free(ptr);
}
// 内存使用统计
void getMemoryStats(size_t& total_free, size_t& largest_free) {
total_free = 0;
largest_free = 0;
for (int i = 0; i < POOL_COUNT; i++) {
size_t pool_free = memory_pools[i].free_count * memory_pools[i].block_size;
total_free += pool_free;
if (memory_pools[i].free_count > 0) {
largest_free = std::max(largest_free, memory_pools[i].block_size);
}
}
}
private:
void initializePool(int index, size_t block_size, size_t block_count) {
MemoryPool& pool = memory_pools[index];
// 计算所需内存大小
size_t total_size = block_size * block_count;
size_t bitmap_size = (block_count + 7) / 8; // 位图大小
// 分配内存
static size_t heap_offset = 0;
pool.pool_start = &heap_memory[heap_offset];
heap_offset += total_size;
pool.free_blocks = &heap_memory[heap_offset];
heap_offset += bitmap_size;
pool.block_size = block_size;
pool.block_count = block_count;
pool.free_count = block_count;
// 初始化位图(全部标记为空闲)
memset(pool.free_blocks, 0xFF, bitmap_size);
}
int selectPool(size_t size) {
for (int i = 0; i < POOL_COUNT; i++) {
if (size <= memory_pools[i].block_size &&
memory_pools[i].free_count > 0) {
return i;
}
}
return -1; // 没有合适的内存池
}
void* allocateFromPool(int pool_index) {
MemoryPool& pool = memory_pools[pool_index];
// 查找空闲块
for (size_t i = 0; i < pool.block_count; i++) {
size_t byte_index = i / 8;
size_t bit_index = i % 8;
if (pool.free_blocks[byte_index] & (1 << bit_index)) {
// 标记为已使用
pool.free_blocks[byte_index] &= ~(1 << bit_index);
pool.free_count--;
// 返回块地址
return static_cast<uint8_t*>(pool.pool_start) + i * pool.block_size;
}
}
return nullptr; // 没有空闲块
}
void deallocateFromPool(void* ptr, int pool_index) {
MemoryPool& pool = memory_pools[pool_index];
// 计算块索引
size_t offset = static_cast<uint8_t*>(ptr) - static_cast<uint8_t*>(pool.pool_start);
size_t block_index = offset / pool.block_size;
if (block_index < pool.block_count) {
size_t byte_index = block_index / 8;
size_t bit_index = block_index % 8;
// 标记为空闲
pool.free_blocks[byte_index] |= (1 << bit_index);
pool.free_count++;
}
}
bool belongsToPool(void* ptr, int pool_index) {
MemoryPool& pool = memory_pools[pool_index];
uint8_t* start = static_cast<uint8_t*>(pool.pool_start);
uint8_t* end = start + pool.block_size * pool.block_count;
return ptr >= start && ptr < end;
}
};
# 5. 边缘计算节点
# 5.1 边缘网关架构
graph TB
subgraph "边缘网关 Edge Gateway"
subgraph "设备接入层"
D1[Modbus设备]
D2[CAN总线设备]
D3[串口设备]
D4[以太网设备]
end
subgraph "协议转换层"
P1[Modbus适配器]
P2[CAN适配器]
P3[串口适配器]
P4[以太网适配器]
end
subgraph "数据处理层"
DP1[数据采集]
DP2[数据过滤]
DP3[数据聚合]
DP4[边缘计算]
end
subgraph "云端连接层"
C1[MQTT客户端]
C2[HTTP客户端]
C3[WebSocket客户端]
C4[自定义协议]
end
end
D1 --> P1
D2 --> P2
D3 --> P3
D4 --> P4
P1 --> DP1
P2 --> DP2
P3 --> DP3
P4 --> DP4
DP1 --> C1
DP2 --> C2
DP3 --> C3
DP4 --> C4
# 5.2 边缘计算实现
# 示例:边缘计算节点实现
import asyncio
import json
import time
from typing import Dict, List, Any
from dataclasses import dataclass
from abc import ABC, abstractmethod
@dataclass
class SensorReading:
device_id: str
sensor_type: str
value: float
timestamp: float
quality: int # 数据质量 0-100
class EdgeProcessor(ABC):
@abstractmethod
async def process(self, data: List[SensorReading]) -> Dict[str, Any]:
pass
class AnomalyDetector(EdgeProcessor):
def __init__(self, threshold_multiplier: float = 2.0):
self.threshold_multiplier = threshold_multiplier
self.history: Dict[str, List[float]] = {}
self.window_size = 50
async def process(self, data: List[SensorReading]) -> Dict[str, Any]:
anomalies = []
for reading in data:
key = f"{reading.device_id}_{reading.sensor_type}"
# 初始化历史数据
if key not in self.history:
self.history[key] = []
history = self.history[key]
# 异常检测(基于统计方法)
if len(history) >= 10:
mean = sum(history) / len(history)
variance = sum((x - mean) ** 2 for x in history) / len(history)
std_dev = variance ** 0.5
threshold = std_dev * self.threshold_multiplier
if abs(reading.value - mean) > threshold:
anomalies.append({
'device_id': reading.device_id,
'sensor_type': reading.sensor_type,
'value': reading.value,
'expected_range': [mean - threshold, mean + threshold],
'severity': min(abs(reading.value - mean) / threshold, 5.0)
})
# 更新历史数据
history.append(reading.value)
if len(history) > self.window_size:
history.pop(0)
return {
'type': 'anomaly_detection',
'timestamp': time.time(),
'anomalies': anomalies
}
class DataAggregator(EdgeProcessor):
def __init__(self, aggregation_window: int = 60):
self.aggregation_window = aggregation_window
self.data_buffer: Dict[str, List[SensorReading]] = {}
async def process(self, data: List[SensorReading]) -> Dict[str, Any]:
current_time = time.time()
aggregated_data = {}
# 按设备和传感器类型分组
for reading in data:
key = f"{reading.device_id}_{reading.sensor_type}"
if key not in self.data_buffer:
self.data_buffer[key] = []
self.data_buffer[key].append(reading)
# 清理过期数据
self.data_buffer[key] = [
r for r in self.data_buffer[key]
if current_time - r.timestamp <= self.aggregation_window
]
# 计算聚合统计
for key, readings in self.data_buffer.items():
if readings:
values = [r.value for r in readings]
aggregated_data[key] = {
'count': len(values),
'min': min(values),
'max': max(values),
'avg': sum(values) / len(values),
'latest': readings[-1].value,
'quality_avg': sum(r.quality for r in readings) / len(readings)
}
return {
'type': 'data_aggregation',
'timestamp': current_time,
'window_seconds': self.aggregation_window,
'aggregated_data': aggregated_data
}
class PredictiveAnalyzer(EdgeProcessor):
def __init__(self):
self.models = {} # 简化的预测模型
async def process(self, data: List[SensorReading]) -> Dict[str, Any]:
predictions = []
for reading in data:
# 简单的线性预测模型
prediction = await self.predict_next_value(reading)
if prediction is not None:
predictions.append({
'device_id': reading.device_id,
'sensor_type': reading.sensor_type,
'current_value': reading.value,
'predicted_value': prediction,
'prediction_horizon': 300, # 5分钟预测
'confidence': 0.8 # 简化的置信度
})
return {
'type': 'predictive_analysis',
'timestamp': time.time(),
'predictions': predictions
}
async def predict_next_value(self, reading: SensorReading) -> float:
# 简化的预测逻辑
# 实际应用中会使用机器学习模型
key = f"{reading.device_id}_{reading.sensor_type}"
if key not in self.models:
self.models[key] = {'history': [], 'trend': 0}
model = self.models[key]
model['history'].append(reading.value)
if len(model['history']) > 10:
model['history'].pop(0)
# 计算简单趋势
if len(model['history']) >= 2:
recent_trend = model['history'][-1] - model['history'][-2]
model['trend'] = 0.7 * model['trend'] + 0.3 * recent_trend
return reading.value + model['trend']
class EdgeComputingNode:
def __init__(self, node_id: str):
self.node_id = node_id
self.processors: List[EdgeProcessor] = []
self.data_queue = asyncio.Queue(maxsize=1000)
self.running = False
# 默认处理器
self.add_processor(AnomalyDetector())
self.add_processor(DataAggregator())
self.add_processor(PredictiveAnalyzer())
def add_processor(self, processor: EdgeProcessor):
self.processors.append(processor)
async def add_sensor_data(self, readings: List[SensorReading]):
try:
await self.data_queue.put(readings)
except asyncio.QueueFull:
print(f"Warning: Data queue full, dropping {len(readings)} readings")
async def start_processing(self):
self.running = True
while self.running:
try:
# 获取传感器数据
readings = await asyncio.wait_for(
self.data_queue.get(), timeout=1.0
)
# 并行处理数据
tasks = [
processor.process(readings)
for processor in self.processors
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Processor {i} error: {result}")
else:
await self.handle_processing_result(result)
except asyncio.TimeoutError:
# 超时,继续下一轮
continue
except Exception as e:
print(f"Processing error: {e}")
async def handle_processing_result(self, result: Dict[str, Any]):
# 处理计算结果
result_type = result.get('type')
if result_type == 'anomaly_detection':
anomalies = result.get('anomalies', [])
if anomalies:
await self.send_alert({
'type': 'anomaly_alert',
'node_id': self.node_id,
'anomalies': anomalies
})
elif result_type == 'data_aggregation':
# 发送聚合数据到云端
await self.send_to_cloud({
'type': 'aggregated_data',
'node_id': self.node_id,
'data': result
})
elif result_type == 'predictive_analysis':
predictions = result.get('predictions', [])
# 基于预测结果进行本地决策
await self.make_local_decisions(predictions)
async def send_alert(self, alert: Dict[str, Any]):
# 发送告警(实际实现会连接到告警系统)
print(f"ALERT: {json.dumps(alert, indent=2)}")
async def send_to_cloud(self, data: Dict[str, Any]):
# 发送数据到云端(实际实现会使用MQTT等协议)
print(f"TO_CLOUD: {data['type']} from {self.node_id}")
async def make_local_decisions(self, predictions: List[Dict[str, Any]]):
# 基于预测进行本地决策
for prediction in predictions:
predicted_value = prediction['predicted_value']
current_value = prediction['current_value']
# 简单的决策逻辑
if abs(predicted_value - current_value) > 10:
print(f"Local decision: Adjust {prediction['device_id']} "
f"based on prediction {predicted_value}")
def stop_processing(self):
self.running = False
# 使用示例
async def main():
# 创建边缘计算节点
edge_node = EdgeComputingNode("edge_001")
# 启动处理任务
processing_task = asyncio.create_task(edge_node.start_processing())
# 模拟传感器数据
for i in range(100):
readings = [
SensorReading(
device_id="sensor_001",
sensor_type="temperature",
value=25.0 + (i % 10) * 0.5,
timestamp=time.time(),
quality=95
),
SensorReading(
device_id="sensor_002",
sensor_type="humidity",
value=60.0 + (i % 20) * 0.3,
timestamp=time.time(),
quality=90
)
]
await edge_node.add_sensor_data(readings)
await asyncio.sleep(0.1)
# 停止处理
edge_node.stop_processing()
await processing_task
if __name__ == "__main__":
asyncio.run(main())
# 6. 设备固件开发
# 6.1 固件架构设计
// 示例:模块化固件架构
class IoTFirmware {
private:
// 系统组件
SystemConfig config;
HardwareAbstraction hal;
CommunicationManager comm;
SensorManager sensors;
ActuatorManager actuators;
PowerManager power;
SecurityManager security;
// 状态管理
enum SystemState {
INITIALIZING,
NORMAL_OPERATION,
LOW_POWER_MODE,
ERROR_STATE,
FIRMWARE_UPDATE
};
SystemState current_state;
uint32_t last_heartbeat;
public:
void initialize() {
// 1. 硬件初始化
hal.initialize();
// 2. 加载配置
config.load();
// 3. 初始化安全模块
security.initialize(config.getSecurityConfig());
// 4. 初始化通信
comm.initialize(config.getCommunicationConfig());
// 5. 初始化传感器
sensors.initialize(config.getSensorConfig());
// 6. 初始化执行器
actuators.initialize(config.getActuatorConfig());
// 7. 初始化电源管理
power.initialize(config.getPowerConfig());
current_state = NORMAL_OPERATION;
last_heartbeat = millis();
}
void mainLoop() {
while (true) {
uint32_t loop_start = millis();
switch (current_state) {
case NORMAL_OPERATION:
normalOperationLoop();
break;
case LOW_POWER_MODE:
lowPowerLoop();
break;
case ERROR_STATE:
errorHandlingLoop();
break;
case FIRMWARE_UPDATE:
firmwareUpdateLoop();
break;
default:
current_state = ERROR_STATE;
break;
}
// 看门狗喂狗
if (millis() - last_heartbeat > 30000) { // 30秒
systemReset();
}
// 控制循环频率
uint32_t loop_time = millis() - loop_start;
if (loop_time < 100) { // 100ms循环
delay(100 - loop_time);
}
}
}
private:
void normalOperationLoop() {
// 1. 读取传感器数据
auto sensor_data = sensors.readAll();
// 2. 处理数据
auto processed_data = processData(sensor_data);
// 3. 执行控制逻辑
auto control_commands = executeControlLogic(processed_data);
// 4. 控制执行器
actuators.executeCommands(control_commands);
// 5. 发送数据
comm.sendData(processed_data);
// 6. 处理接收到的命令
auto received_commands = comm.receiveCommands();
processReceivedCommands(received_commands);
// 7. 检查系统状态
checkSystemHealth();
// 8. 更新心跳
last_heartbeat = millis();
}
void lowPowerLoop() {
// 低功耗模式下的简化操作
// 1. 减少传感器采样频率
if (millis() % 10000 == 0) { // 每10秒采样一次
auto sensor_data = sensors.readCriticalSensors();
// 2. 检查是否需要唤醒
if (shouldWakeUp(sensor_data)) {
current_state = NORMAL_OPERATION;
power.exitLowPowerMode();
return;
}
// 3. 发送关键数据
comm.sendCriticalData(sensor_data);
}
// 4. 进入深度睡眠
power.enterDeepSleep(1000); // 睡眠1秒
}
void errorHandlingLoop() {
// 错误处理逻辑
// 1. 诊断系统问题
auto error_info = diagnoseProblem();
// 2. 尝试恢复
if (attemptRecovery(error_info)) {
current_state = NORMAL_OPERATION;
return;
}
// 3. 发送错误报告
comm.sendErrorReport(error_info);
// 4. 等待外部干预或自动重启
delay(5000);
// 5. 如果无法恢复,重启系统
systemReset();
}
void firmwareUpdateLoop() {
// 固件更新逻辑
// 1. 接收固件数据
auto firmware_chunk = comm.receiveFirmwareChunk();
if (firmware_chunk.isValid()) {
// 2. 验证数据完整性
if (security.verifyFirmwareChunk(firmware_chunk)) {
// 3. 写入Flash
hal.writeFirmwareChunk(firmware_chunk);
// 4. 发送确认
comm.sendUpdateProgress(firmware_chunk.sequence);
} else {
// 数据损坏,请求重传
comm.requestRetransmission(firmware_chunk.sequence);
}
}
// 5. 检查更新是否完成
if (hal.isFirmwareUpdateComplete()) {
// 验证完整固件
if (security.verifyCompleteFirmware()) {
// 重启到新固件
hal.rebootToNewFirmware();
} else {
// 固件损坏,回滚
hal.rollbackFirmware();
current_state = ERROR_STATE;
}
}
}
};
# 6.2 OTA固件更新
// 示例:OTA固件更新实现
class OTAManager {
private:
struct FirmwareInfo {
uint32_t version;
uint32_t size;
uint32_t crc32;
char description[64];
};
struct UpdateProgress {
uint32_t total_chunks;
uint32_t received_chunks;
uint32_t failed_chunks;
bool update_in_progress;
};
FirmwareInfo current_firmware;
FirmwareInfo target_firmware;
UpdateProgress progress;
static const uint32_t CHUNK_SIZE = 1024;
static const uint32_t MAX_RETRIES = 3;
public:
bool checkForUpdate() {
// 1. 查询服务器是否有新固件
auto server_info = queryServerForUpdate();
if (server_info.version > current_firmware.version) {
target_firmware = server_info;
return true;
}
return false;
}
bool startUpdate() {
if (target_firmware.version <= current_firmware.version) {
return false;
}
// 初始化更新进度
progress.total_chunks = (target_firmware.size + CHUNK_SIZE - 1) / CHUNK_SIZE;
progress.received_chunks = 0;
progress.failed_chunks = 0;
progress.update_in_progress = true;
// 请求开始更新
return requestUpdateStart();
}
bool processUpdateChunk(const uint8_t* data, uint32_t chunk_id, uint32_t size) {
if (!progress.update_in_progress) {
return false;
}
// 验证chunk数据
if (!validateChunk(data, chunk_id, size)) {
progress.failed_chunks++;
return false;
}
// 写入Flash
if (writeChunkToFlash(data, chunk_id, size)) {
progress.received_chunks++;
// 检查是否完成
if (progress.received_chunks >= progress.total_chunks) {
return finalizeUpdate();
}
return true;
}
progress.failed_chunks++;
return false;
}
float getUpdateProgress() {
if (!progress.update_in_progress) {
return 0.0;
}
return (float)progress.received_chunks / progress.total_chunks * 100.0;
}
private:
bool finalizeUpdate() {
// 验证完整固件
if (verifyCompleteFirmware()) {
// 标记新固件为有效
markFirmwareValid();
// 重启到新固件
scheduleReboot();
progress.update_in_progress = false;
return true;
}
// 固件验证失败,清理
cleanupFailedUpdate();
progress.update_in_progress = false;
return false;
}
};
# 7. 电源管理
# 7.1 低功耗设计
// 示例:智能电源管理系统
class PowerManager {
private:
enum PowerMode {
ACTIVE_MODE,
IDLE_MODE,
SLEEP_MODE,
DEEP_SLEEP_MODE,
HIBERNATION_MODE
};
PowerMode current_mode;
uint32_t battery_level;
uint32_t last_activity_time;
bool external_power_available;
public:
void initialize() {
current_mode = ACTIVE_MODE;
battery_level = readBatteryLevel();
last_activity_time = millis();
external_power_available = checkExternalPower();
// 配置低功耗模式
configureLowPowerModes();
}
void updatePowerState() {
uint32_t current_time = millis();
uint32_t idle_time = current_time - last_activity_time;
// 更新电池状态
battery_level = readBatteryLevel();
external_power_available = checkExternalPower();
// 根据条件选择功耗模式
PowerMode target_mode = selectOptimalPowerMode(idle_time);
if (target_mode != current_mode) {
transitionToPowerMode(target_mode);
}
}
void recordActivity() {
last_activity_time = millis();
// 如果在低功耗模式,唤醒到活跃模式
if (current_mode != ACTIVE_MODE) {
transitionToPowerMode(ACTIVE_MODE);
}
}
private:
PowerMode selectOptimalPowerMode(uint32_t idle_time) {
// 电池电量过低,进入休眠模式
if (battery_level < 10 && !external_power_available) {
return HIBERNATION_MODE;
}
// 根据空闲时间选择模式
if (idle_time < 5000) { // 5秒内
return ACTIVE_MODE;
} else if (idle_time < 30000) { // 30秒内
return IDLE_MODE;
} else if (idle_time < 300000) { // 5分钟内
return SLEEP_MODE;
} else { // 超过5分钟
return DEEP_SLEEP_MODE;
}
}
void transitionToPowerMode(PowerMode new_mode) {
// 退出当前模式
exitCurrentMode();
// 进入新模式
switch (new_mode) {
case ACTIVE_MODE:
enterActiveMode();
break;
case IDLE_MODE:
enterIdleMode();
break;
case SLEEP_MODE:
enterSleepMode();
break;
case DEEP_SLEEP_MODE:
enterDeepSleepMode();
break;
case HIBERNATION_MODE:
enterHibernationMode();
break;
}
current_mode = new_mode;
}
void enterActiveMode() {
// 启用所有外设
enableAllPeripherals();
// 设置最高时钟频率
setCPUFrequency(80000000); // 80MHz
// 启用所有传感器
enableAllSensors();
}
void enterIdleMode() {
// 降低CPU频率
setCPUFrequency(40000000); // 40MHz
// 关闭非必要外设
disableNonEssentialPeripherals();
}
void enterSleepMode() {
// 进一步降低频率
setCPUFrequency(8000000); // 8MHz
// 关闭更多外设
disableMorePeripherals();
// 减少传感器采样频率
reduceSensorSampling();
}
void enterDeepSleepMode() {
// 保存关键状态
saveSystemState();
// 关闭大部分外设
disableMostPeripherals();
// 只保留关键传感器
enableOnlyCriticalSensors();
// 设置唤醒条件
configureWakeupSources();
// 进入深度睡眠
esp_deep_sleep(60000000); // 睡眠60秒
}
void enterHibernationMode() {
// 保存所有状态到非易失性存储
saveAllStateToNVS();
// 关闭所有非必要功能
shutdownAllNonEssential();
// 设置最小唤醒条件
configureMinimalWakeup();
// 进入最深度睡眠
esp_deep_sleep(3600000000); // 睡眠1小时
}
};
# 8. 最佳实践总结
# 8.1 设计原则
- 模块化设计: 将功能分解为独立模块
- 低功耗优先: 在设计阶段就考虑功耗优化
- 可靠性保证: 实现故障检测和恢复机制
- 安全性内置: 从硬件到软件的全方位安全
- 可维护性: 支持远程诊断和固件更新
# 8.2 开发建议
- 硬件选型: 根据应用需求选择合适的MCU和传感器
- 实时性保证: 使用RTOS确保任务调度的实时性
- 内存优化: 合理管理内存,避免内存泄漏
- 通信可靠: 实现重传机制和错误检测
- 测试充分: 进行压力测试和长期稳定性测试
# 9. 下一步学习
完成本章学习后,建议继续学习:
恭喜您完成了设备层架构学习! 🎉
💡 下一步: 继续学习网络通信层,深入了解物联网设备间的通信机制。