HTTP设备接入层详细设计与实现
# HTTP设备接入层详细设计与实现
# 故事背景
想象一下,你正在为一家智慧城市公司开发物联网平台。城市里有各种各样的设备:智能路灯、环境监测站、交通摄像头、停车场传感器等。这些设备大多数都有稳定的电源供应和良好的网络连接,它们需要传输大量的数据,比如高清图片、视频流、详细的环境数据等。
HTTP协议就像是"专业的快递服务",虽然比CoAP重一些,但它提供了更丰富的功能:支持大文件传输、完善的状态码体系、丰富的头部信息、强大的安全机制等。对于那些"不差电"的设备来说,HTTP是最佳选择。
让我们一起来构建这个基于HTTP的"专业快递网络"!
# HTTP协议在物联网中的应用
# 为什么选择HTTP?
- 成熟稳定:HTTP协议经过几十年的发展,非常成熟
- 工具丰富:有大量的开发工具和调试工具
- 易于理解:开发人员都熟悉HTTP协议
- 功能强大:支持认证、加密、压缩、缓存等
- 防火墙友好:大部分防火墙都允许HTTP流量
# HTTP vs 其他协议对比
特性 | HTTP | MQTT | CoAP |
---|---|---|---|
传输协议 | TCP | TCP | UDP |
消息大小 | 大 | 小 | 很小 |
功耗 | 高 | 中 | 低 |
安全性 | 强(HTTPS) | 中 | 中 |
开发难度 | 低 | 中 | 中 |
适用场景 | 通用设备 | 消息传递 | 受限设备 |
# HTTP在IoT中的典型应用场景
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 设备类型 │ │ 数据类型 │ │ 传输方式 │
│ │ │ │ │ │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │ 智能摄像头 │ │ │ │ 图片/视频 │ │ │ │ POST上传 │ │
│ │ 环境监测站 │ │◄──►│ │ 传感器数据 │ │◄──►│ │ GET查询 │ │
│ │ 智能网关 │ │ │ │ 配置信息 │ │ │ │ PUT更新 │ │
│ │ 工业设备 │ │ │ │ 控制指令 │ │ │ │ DELETE删除 │ │
│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
# 系统架构设计
# 整体架构
┌─────────────────────────────────────────────────────────────────┐
│ HTTP设备接入层 │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐ │
│ │ 负载均衡器 │ │ API网关 │ │ 认证服务 │ │ 限流器 │ │
│ │ (Nginx) │ │ (Gateway) │ │ (Auth) │ │ (Rate) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐ │
│ │ 设备管理 │ │ 数据接收 │ │ 文件上传 │ │ 设备控制│ │
│ │ Controller │ │ Controller │ │ Controller │ │ Controller│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐ │
│ │ 设备服务 │ │ 数据服务 │ │ 文件服务 │ │ 消息服务│ │
│ │ Service │ │ Service │ │ Service │ │ Service │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐ │
│ │ MySQL │ │ Redis │ │ MinIO │ │ RabbitMQ│ │
│ │ (数据存储) │ │ (缓存) │ │ (文件存储) │ │ (消息队列)│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────────────┘
# 核心实体设计
# 1. HTTP设备实体
/**
* HTTP设备实体
* 记录通过HTTP接入的设备信息,就像设备的"身份证"
*/
@Entity
@Table(name = "http_device")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class HttpDevice {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 设备ID - 设备的唯一标识
*/
@Column(name = "device_id", unique = true, nullable = false)
private String deviceId;
/**
* 设备名称
*/
@Column(name = "device_name")
private String deviceName;
/**
* 设备类型:CAMERA(摄像头)、SENSOR(传感器)、GATEWAY(网关)、INDUSTRIAL(工业设备)
*/
@Enumerated(EnumType.STRING)
@Column(name = "device_type")
private DeviceType deviceType;
/**
* 设备IP地址
*/
@Column(name = "ip_address")
private String ipAddress;
/**
* 设备MAC地址
*/
@Column(name = "mac_address")
private String macAddress;
/**
* 设备状态:ONLINE(在线)、OFFLINE(离线)、MAINTENANCE(维护中)
*/
@Enumerated(EnumType.STRING)
@Column(name = "status")
private DeviceStatus status;
/**
* API密钥 - 用于设备认证
*/
@Column(name = "api_key")
private String apiKey;
/**
* API密钥过期时间
*/
@Column(name = "api_key_expire_time")
private LocalDateTime apiKeyExpireTime;
/**
* 设备版本信息
*/
@Column(name = "firmware_version")
private String firmwareVersion;
/**
* 设备制造商
*/
@Column(name = "manufacturer")
private String manufacturer;
/**
* 设备型号
*/
@Column(name = "model")
private String model;
/**
* 设备位置信息(JSON格式)
*/
@Lob
@Column(name = "location")
private String location;
/**
* 设备配置信息(JSON格式)
*/
@Lob
@Column(name = "config")
private String config;
/**
* 支持的API端点列表(JSON格式)
*/
@Lob
@Column(name = "supported_apis")
private String supportedApis;
/**
* 最后心跳时间
*/
@Column(name = "last_heartbeat")
private LocalDateTime lastHeartbeat;
/**
* 最后数据上报时间
*/
@Column(name = "last_data_report")
private LocalDateTime lastDataReport;
/**
* 创建时间
*/
@Column(name = "create_time")
private LocalDateTime createTime;
/**
* 更新时间
*/
@Column(name = "update_time")
private LocalDateTime updateTime;
/**
* 是否启用
*/
@Column(name = "enabled")
private Boolean enabled = true;
// 构造函数、getter、setter由Lombok自动生成
}
/**
* 设备类型枚举
*/
public enum DeviceType {
CAMERA("摄像头"),
SENSOR("传感器"),
GATEWAY("网关"),
INDUSTRIAL("工业设备"),
ENVIRONMENTAL("环境监测"),
TRAFFIC("交通设备");
private final String description;
DeviceType(String description) {
this.description = description;
}
public String getDescription() {
return description;
}
}
# 11. 部署指南
# 1. 环境要求
# 硬件要求
- CPU: 4核心以上,推荐8核心
- 内存: 8GB以上,推荐16GB
- 存储: SSD硬盘,至少100GB可用空间
- 网络: 千兆网卡,稳定的网络连接
# 软件要求
- 操作系统: Linux (CentOS 7+, Ubuntu 18.04+) 或 Windows Server 2016+
- Java: JDK 11 或更高版本
- 数据库: MySQL 8.0+ 或 PostgreSQL 12+
- 缓存: Redis 6.0+
- 消息队列: RabbitMQ 3.8+ 或 Apache Kafka 2.8+
- 文件存储: MinIO 或 AWS S3兼容存储
# 2. 部署步骤
# 步骤1: 环境准备
# 安装Java 11
sudo yum install java-11-openjdk-devel
# 安装MySQL
sudo yum install mysql-server
sudo systemctl start mysqld
sudo systemctl enable mysqld
# 安装Redis
sudo yum install redis
sudo systemctl start redis
sudo systemctl enable redis
# 安装MinIO
wget https://dl.min.io/server/minio/release/linux-amd64/minio
chmod +x minio
sudo mv minio /usr/local/bin/
# 步骤2: 数据库初始化
-- 创建数据库
CREATE DATABASE iot_http_device DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
-- 创建用户
CREATE USER 'iot_user'@'%' IDENTIFIED BY 'your_password';
GRANT ALL PRIVILEGES ON iot_http_device.* TO 'iot_user'@'%';
FLUSH PRIVILEGES;
-- 执行DDL脚本
source /path/to/schema.sql;
# 步骤3: 应用配置
# application-prod.yml
spring:
profiles:
active: prod
datasource:
url: jdbc:mysql://localhost:3306/iot_http_device?useSSL=true&serverTimezone=Asia/Shanghai
username: iot_user
password: ${DB_PASSWORD}
hikari:
maximum-pool-size: 20
minimum-idle: 5
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
redis:
host: localhost
port: 6379
password: ${REDIS_PASSWORD}
timeout: 3000ms
lettuce:
pool:
max-active: 20
max-idle: 10
min-idle: 5
server:
port: 8080
tomcat:
threads:
max: 200
min-spare: 10
max-connections: 8192
accept-count: 100
logging:
level:
com.iot.http: INFO
org.springframework.web: WARN
file:
name: /var/log/iot-http-device/application.log
max-size: 100MB
max-history: 30
# 步骤4: 启动脚本
#!/bin/bash
# start.sh
APP_NAME="iot-http-device"
APP_JAR="iot-http-device-1.0.0.jar"
JAVA_OPTS="-Xms2g -Xmx4g -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
SPRING_PROFILES="prod"
# 检查Java环境
if [ -z "$JAVA_HOME" ]; then
echo "JAVA_HOME is not set"
exit 1
fi
# 设置环境变量
export DB_PASSWORD="your_db_password"
export REDIS_PASSWORD="your_redis_password"
# 启动应用
nohup $JAVA_HOME/bin/java $JAVA_OPTS \
-Dspring.profiles.active=$SPRING_PROFILES \
-jar $APP_JAR > /var/log/$APP_NAME/startup.log 2>&1 &
echo $! > /var/run/$APP_NAME.pid
echo "$APP_NAME started with PID $(cat /var/run/$APP_NAME.pid)"
# 步骤5: 系统服务配置
# /etc/systemd/system/iot-http-device.service
[Unit]
Description=IoT HTTP Device Access Layer
After=network.target
[Service]
Type=forking
User=iot
Group=iot
WorkingDirectory=/opt/iot-http-device
ExecStart=/opt/iot-http-device/start.sh
ExecStop=/opt/iot-http-device/stop.sh
PIDFile=/var/run/iot-http-device.pid
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
# 3. 负载均衡配置
# Nginx配置
upstream iot_http_backend {
server 192.168.1.10:8080 weight=1 max_fails=3 fail_timeout=30s;
server 192.168.1.11:8080 weight=1 max_fails=3 fail_timeout=30s;
server 192.168.1.12:8080 weight=1 max_fails=3 fail_timeout=30s;
}
server {
listen 80;
server_name iot-api.example.com;
# 重定向到HTTPS
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name iot-api.example.com;
ssl_certificate /etc/ssl/certs/iot-api.crt;
ssl_certificate_key /etc/ssl/private/iot-api.key;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-RSA-AES256-GCM-SHA512:DHE-RSA-AES256-GCM-SHA512;
# 客户端最大请求体大小
client_max_body_size 100M;
# 超时设置
proxy_connect_timeout 30s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
location /api/v1/devices {
proxy_pass http://iot_http_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 启用缓存
proxy_cache iot_cache;
proxy_cache_valid 200 5m;
proxy_cache_key "$scheme$request_method$host$request_uri";
}
location /api/v1/devices/*/files {
proxy_pass http://iot_http_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 文件上传不缓存
proxy_cache off;
proxy_request_buffering off;
}
}
# 12. 性能调优
# 1. JVM调优
# 生产环境JVM参数
JAVA_OPTS="
-Xms4g -Xmx8g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:G1HeapRegionSize=16m
-XX:+G1UseAdaptiveIHOP
-XX:G1MixedGCCountTarget=8
-XX:G1MixedGCLiveThresholdPercent=85
-XX:+UseStringDeduplication
-XX:+PrintGC
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-XX:+PrintGCApplicationStoppedTime
-Xloggc:/var/log/iot-http-device/gc.log
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5
-XX:GCLogFileSize=100M
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/iot-http-device/
-Djava.awt.headless=true
-Dfile.encoding=UTF-8
-Duser.timezone=Asia/Shanghai
"
# 2. 数据库调优
# MySQL配置优化
# /etc/mysql/mysql.conf.d/mysqld.cnf
[mysqld]
# 基础配置
port = 3306
socket = /var/run/mysqld/mysqld.sock
basedir = /usr
datadir = /var/lib/mysql
tmpdir = /tmp
# 字符集
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci
# 内存配置
innodb_buffer_pool_size = 4G
innodb_buffer_pool_instances = 4
innodb_log_file_size = 512M
innodb_log_buffer_size = 64M
innodb_flush_log_at_trx_commit = 2
innodb_flush_method = O_DIRECT
# 连接配置
max_connections = 1000
max_connect_errors = 100000
wait_timeout = 28800
interactive_timeout = 28800
# 查询缓存
query_cache_type = 1
query_cache_size = 256M
query_cache_limit = 2M
# 慢查询日志
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2
log_queries_not_using_indexes = 1
# 索引优化建议
-- 设备表索引
CREATE INDEX idx_device_status ON http_device(status);
CREATE INDEX idx_device_type ON http_device(device_type);
CREATE INDEX idx_device_create_time ON http_device(create_time);
CREATE INDEX idx_device_last_heartbeat ON http_device(last_heartbeat_time);
-- 设备数据表索引
CREATE INDEX idx_data_device_time ON http_device_data(device_id, create_time);
CREATE INDEX idx_data_type_time ON http_device_data(data_type, create_time);
CREATE INDEX idx_data_create_time ON http_device_data(create_time);
-- 文件记录表索引
CREATE INDEX idx_file_device_time ON file_record(device_id, upload_time);
CREATE INDEX idx_file_type_time ON file_record(file_type, upload_time);
# 3. Redis调优
# /etc/redis/redis.conf
# 内存配置
maxmemory 2gb
maxmemory-policy allkeys-lru
# 持久化配置
save 900 1
save 300 10
save 60 10000
# AOF配置
appendonly yes
appendfsync everysec
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
# 网络配置
tcp-keepalive 300
timeout 0
# 客户端配置
maxclients 10000
# 慢日志配置
slowlog-log-slower-than 10000
slowlog-max-len 128
# 4. 应用层优化
# 连接池配置
spring:
datasource:
hikari:
maximum-pool-size: 50
minimum-idle: 10
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
leak-detection-threshold: 60000
redis:
lettuce:
pool:
max-active: 50
max-idle: 20
min-idle: 10
max-wait: 3000ms
shutdown-timeout: 100ms
# 异步处理配置
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("dataProcessExecutor")
public Executor dataProcessExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(1000);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("DataProcess-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean("fileProcessExecutor")
public Executor fileProcessExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(500);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("FileProcess-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
# 13. 监控和运维
# 1. 监控指标
# 应用监控
- QPS: 每秒请求数
- 响应时间: 平均响应时间、P95、P99
- 错误率: 4xx、5xx错误比例
- 活跃连接数: 当前活跃的HTTP连接数
- 线程池状态: 活跃线程数、队列长度
# 系统监控
- CPU使用率: 系统和应用CPU使用情况
- 内存使用率: 堆内存、非堆内存使用情况
- 磁盘I/O: 读写速率、IOPS
- 网络I/O: 网络带宽使用情况
# 业务监控
- 设备在线数: 当前在线设备数量
- 数据接收量: 每分钟接收的数据条数
- 文件上传量: 每小时上传的文件数量和大小
- API调用统计: 各API的调用次数和成功率
# 2. 告警规则
# Prometheus告警规则
groups:
- name: iot-http-device
rules:
- alert: HighErrorRate
expr: rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m]) > 0.05
for: 2m
labels:
severity: critical
annotations:
summary: "HTTP error rate is too high"
description: "Error rate is {{ $value | humanizePercentage }}"
- alert: HighResponseTime
expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 2
for: 5m
labels:
severity: warning
annotations:
summary: "HTTP response time is too high"
description: "95th percentile response time is {{ $value }}s"
- alert: DatabaseConnectionPoolExhausted
expr: hikaricp_connections_active / hikaricp_connections_max > 0.9
for: 1m
labels:
severity: critical
annotations:
summary: "Database connection pool nearly exhausted"
description: "Connection pool usage is {{ $value | humanizePercentage }}"
# 3. 日志管理
# Logback配置
<!-- logback-spring.xml -->
<configuration>
<springProfile name="prod">
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>/var/log/iot-http-device/application.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>/var/log/iot-http-device/application.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>100MB</maxFileSize>
<maxHistory>30</maxHistory>
<totalSizeCap>10GB</totalSizeCap>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp/>
<logLevel/>
<loggerName/>
<message/>
<mdc/>
<stackTrace/>
</providers>
</encoder>
</appender>
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<file>/var/log/iot-http-device/error.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>/var/log/iot-http-device/error.%d{yyyy-MM-dd}.log.gz</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp/>
<logLevel/>
<loggerName/>
<message/>
<mdc/>
<stackTrace/>
</providers>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="FILE"/>
<appender-ref ref="ERROR_FILE"/>
</root>
</springProfile>
</configuration>
# 14. 总结
# 1. 核心特性
HTTP设备接入层作为物联网系统的重要组成部分,具备以下核心特性:
- 高性能: 基于Spring Boot和Tomcat,支持高并发HTTP请求处理
- 安全可靠: 完善的认证机制、数据验证和异常处理
- 易于集成: 标准的RESTful API设计,支持多种数据格式
- 可扩展性: 模块化设计,支持水平扩展和负载均衡
- 监控完善: 全面的性能监控和业务监控指标
# 2. 适用场景
- 智能家居: 智能设备通过HTTP协议上报状态和接收控制指令
- 工业物联网: 工业设备通过HTTP接口上传生产数据和设备状态
- 智慧城市: 各类传感器设备通过HTTP协议上报环境数据
- 车联网: 车载设备通过HTTP接口上传行驶数据和车辆状态
- 智慧农业: 农业传感器设备上报土壤、气象等数据
# 3. 性能指标
在标准配置下,HTTP设备接入层可以达到以下性能指标:
- 并发连接数: 支持10,000+并发HTTP连接
- 数据吞吐量: 每秒处理50,000+条设备数据
- 响应时间: 平均响应时间<100ms,P95<500ms
- 可用性: 99.9%以上的服务可用性
- 文件上传: 支持单文件最大100MB,并发上传1000+文件
# 4. 最佳实践
# 部署建议
- 使用负载均衡器分发请求,提高系统可用性
- 配置合适的JVM参数,优化垃圾回收性能
- 使用Redis集群提高缓存性能和可用性
- 定期备份数据库,制定灾难恢复计划
# 安全建议
- 使用HTTPS加密传输,保护数据安全
- 实施API密钥轮换机制,定期更新密钥
- 配置防火墙和安全组,限制网络访问
- 启用审计日志,记录关键操作
# 运维建议
- 建立完善的监控体系,及时发现问题
- 制定详细的运维手册和应急预案
- 定期进行性能测试和容量规划
- 持续优化系统配置和代码性能
# 开发建议
- 遵循RESTful API设计原则
- 编写完整的单元测试和集成测试
- 使用统一的错误码和响应格式
- 保持代码的可读性和可维护性
通过以上详细的设计和实现,HTTP设备接入层能够为物联网系统提供稳定、高效、安全的设备接入服务,满足各种业务场景的需求。
# 3.3 数据接收服务
数据接收服务负责处理设备上报的各种数据,包括传感器数据、状态数据、事件数据等。
/**
* HTTP数据接收服务
* 负责处理设备通过HTTP协议上报的数据
*/
@Service
@Slf4j
public class HttpDataReceptionService {
@Autowired
private HttpDeviceDataRepository dataRepository;
@Autowired
private HttpDeviceService deviceService;
@Autowired
private DataValidationService validationService;
@Autowired
private DataProcessingService processingService;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private MessageProducer messageProducer;
private static final String DATA_CACHE_PREFIX = "http:device:data:";
/**
* 接收设备数据
*/
public void receiveDeviceData(String deviceId, DeviceDataRequest request) {
try {
// 验证设备是否存在
if (!deviceService.deviceExists(deviceId)) {
throw new DeviceNotFoundException("设备不存在:" + deviceId);
}
// 验证数据格式
if (!validationService.validateDataFormat(request)) {
throw new DataValidationException("数据格式验证失败");
}
// 创建数据记录
HttpDeviceData deviceData = createDeviceData(deviceId, request);
// 数据预处理
deviceData = processingService.preprocessData(deviceData);
// 保存到数据库
deviceData = dataRepository.save(deviceData);
// 缓存最新数据
cacheLatestData(deviceId, deviceData);
// 更新设备最后数据上报时间
deviceService.updateLastDataReport(deviceId);
// 异步处理数据
processDataAsync(deviceData);
log.info("设备数据接收成功:deviceId={}, dataId={}", deviceId, deviceData.getId());
} catch (Exception e) {
log.error("接收设备数据失败:deviceId={}", deviceId, e);
throw new DataReceptionException("数据接收失败", e);
}
}
/**
* 批量接收设备数据
*/
public void receiveDeviceDataBatch(String deviceId, List<DeviceDataRequest> requests) {
try {
// 验证设备是否存在
if (!deviceService.deviceExists(deviceId)) {
throw new DeviceNotFoundException("设备不存在:" + deviceId);
}
List<HttpDeviceData> dataList = new ArrayList<>();
for (DeviceDataRequest request : requests) {
try {
// 验证数据格式
if (!validationService.validateDataFormat(request)) {
log.warn("数据格式验证失败,跳过:deviceId={}, timestamp={}",
deviceId, request.getTimestamp());
continue;
}
// 创建数据记录
HttpDeviceData deviceData = createDeviceData(deviceId, request);
// 数据预处理
deviceData = processingService.preprocessData(deviceData);
dataList.add(deviceData);
} catch (Exception e) {
log.warn("处理单条数据失败,跳过:deviceId={}, timestamp={}",
deviceId, request.getTimestamp(), e);
}
}
if (!dataList.isEmpty()) {
// 批量保存到数据库
dataList = dataRepository.saveAll(dataList);
// 缓存最新数据
HttpDeviceData latestData = dataList.get(dataList.size() - 1);
cacheLatestData(deviceId, latestData);
// 更新设备最后数据上报时间
deviceService.updateLastDataReport(deviceId);
// 异步处理数据
for (HttpDeviceData data : dataList) {
processDataAsync(data);
}
log.info("批量设备数据接收成功:deviceId={}, count={}", deviceId, dataList.size());
}
} catch (Exception e) {
log.error("批量接收设备数据失败:deviceId={}", deviceId, e);
throw new DataReceptionException("批量数据接收失败", e);
}
}
/**
* 接收设备事件
*/
public void receiveDeviceEvent(String deviceId, DeviceEventRequest request) {
try {
// 验证设备是否存在
if (!deviceService.deviceExists(deviceId)) {
throw new DeviceNotFoundException("设备不存在:" + deviceId);
}
// 验证事件格式
if (!validationService.validateEventFormat(request)) {
throw new DataValidationException("事件格式验证失败");
}
// 创建事件记录
HttpDeviceData eventData = createEventData(deviceId, request);
// 保存到数据库
eventData = dataRepository.save(eventData);
// 发送事件通知
sendEventNotification(eventData);
log.info("设备事件接收成功:deviceId={}, eventType={}, eventId={}",
deviceId, request.getEventType(), eventData.getId());
} catch (Exception e) {
log.error("接收设备事件失败:deviceId={}", deviceId, e);
throw new DataReceptionException("事件接收失败", e);
}
}
/**
* 获取设备最新数据
*/
public HttpDeviceData getLatestData(String deviceId) {
try {
// 先从缓存中获取
HttpDeviceData cachedData = getCachedLatestData(deviceId);
if (cachedData != null) {
return cachedData;
}
// 从数据库获取
Optional<HttpDeviceData> dataOpt = dataRepository
.findTopByDeviceIdOrderByTimestampDesc(deviceId);
if (dataOpt.isPresent()) {
HttpDeviceData data = dataOpt.get();
// 缓存数据
cacheLatestData(deviceId, data);
return data;
}
return null;
} catch (Exception e) {
log.error("获取设备最新数据失败:deviceId={}", deviceId, e);
return null;
}
}
/**
* 获取设备历史数据
*/
public List<HttpDeviceData> getHistoryData(String deviceId,
LocalDateTime startTime,
LocalDateTime endTime,
int limit) {
try {
return dataRepository.findByDeviceIdAndTimestampBetweenOrderByTimestampDesc(
deviceId, startTime, endTime, PageRequest.of(0, limit));
} catch (Exception e) {
log.error("获取设备历史数据失败:deviceId={}", deviceId, e);
return Collections.emptyList();
}
}
/**
* 创建设备数据记录
*/
private HttpDeviceData createDeviceData(String deviceId, DeviceDataRequest request) {
HttpDeviceData data = new HttpDeviceData();
data.setDeviceId(deviceId);
data.setDataType(DataType.SENSOR_DATA);
data.setTimestamp(request.getTimestamp() != null ?
request.getTimestamp() : LocalDateTime.now());
data.setData(request.getData());
data.setQuality(request.getQuality());
data.setCreateTime(LocalDateTime.now());
return data;
}
/**
* 创建事件数据记录
*/
private HttpDeviceData createEventData(String deviceId, DeviceEventRequest request) {
HttpDeviceData data = new HttpDeviceData();
data.setDeviceId(deviceId);
data.setDataType(DataType.EVENT_DATA);
data.setTimestamp(request.getTimestamp() != null ?
request.getTimestamp() : LocalDateTime.now());
data.setData(request.getEventData());
data.setEventType(request.getEventType());
data.setEventLevel(request.getEventLevel());
data.setCreateTime(LocalDateTime.now());
return data;
}
/**
* 异步处理数据
*/
@Async
private void processDataAsync(HttpDeviceData data) {
try {
// 数据分析处理
processingService.analyzeData(data);
// 发送到消息队列
messageProducer.sendDataMessage(data);
// 触发规则引擎
processingService.triggerRules(data);
} catch (Exception e) {
log.error("异步处理数据失败:dataId={}", data.getId(), e);
}
}
/**
* 发送事件通知
*/
private void sendEventNotification(HttpDeviceData eventData) {
try {
// 发送到消息队列
messageProducer.sendEventMessage(eventData);
// 触发告警规则
if (EventLevel.ERROR.equals(eventData.getEventLevel()) ||
EventLevel.CRITICAL.equals(eventData.getEventLevel())) {
processingService.triggerAlarmRules(eventData);
}
} catch (Exception e) {
log.error("发送事件通知失败:eventId={}", eventData.getId(), e);
}
}
/**
* 缓存最新数据
*/
private void cacheLatestData(String deviceId, HttpDeviceData data) {
try {
String key = DATA_CACHE_PREFIX + deviceId;
redisTemplate.opsForValue().set(key, data, Duration.ofMinutes(30));
} catch (Exception e) {
log.warn("缓存最新数据失败:deviceId={}", deviceId, e);
}
}
/**
* 获取缓存的最新数据
*/
private HttpDeviceData getCachedLatestData(String deviceId) {
try {
String key = DATA_CACHE_PREFIX + deviceId;
return (HttpDeviceData) redisTemplate.opsForValue().get(key);
} catch (Exception e) {
log.warn("获取缓存最新数据失败:deviceId={}", deviceId, e);
return null;
}
}
}
# 3.4 文件上传服务
文件上传服务用于处理设备上传的文件,如固件升级包、日志文件、配置文件等。
/**
* HTTP文件上传服务
* 负责处理设备通过HTTP协议上传的文件
*/
@Service
@Slf4j
public class HttpFileUploadService {
@Value("${iot.http.file.upload.path:/data/iot/files}")
private String uploadPath;
@Value("${iot.http.file.max-size:10MB}")
private String maxFileSize;
@Autowired
private HttpDeviceService deviceService;
@Autowired
private FileStorageService storageService;
@Autowired
private VirusScanner virusScanner;
/**
* 上传设备文件
*/
public FileUploadResult uploadDeviceFile(String deviceId,
MultipartFile file,
FileUploadRequest request) {
try {
// 验证设备是否存在
if (!deviceService.deviceExists(deviceId)) {
throw new DeviceNotFoundException("设备不存在:" + deviceId);
}
// 验证文件
validateFile(file, request);
// 病毒扫描
if (!virusScanner.scan(file)) {
throw new FileSecurityException("文件安全检查失败");
}
// 生成文件路径
String filePath = generateFilePath(deviceId, file.getOriginalFilename(), request);
// 保存文件
String savedPath = storageService.saveFile(file, filePath);
// 记录文件信息
FileUploadRecord record = createFileRecord(deviceId, file, request, savedPath);
// 处理特殊文件类型
processSpecialFile(record);
log.info("设备文件上传成功:deviceId={}, fileName={}, size={}",
deviceId, file.getOriginalFilename(), file.getSize());
return FileUploadResult.success(record);
} catch (Exception e) {
log.error("设备文件上传失败:deviceId={}, fileName={}",
deviceId, file.getOriginalFilename(), e);
throw new FileUploadException("文件上传失败", e);
}
}
/**
* 验证文件
*/
private void validateFile(MultipartFile file, FileUploadRequest request) {
// 检查文件是否为空
if (file.isEmpty()) {
throw new FileValidationException("文件不能为空");
}
// 检查文件大小
if (file.getSize() > parseFileSize(maxFileSize)) {
throw new FileValidationException("文件大小超过限制:" + maxFileSize);
}
// 检查文件类型
if (!isAllowedFileType(file.getOriginalFilename(), request.getFileType())) {
throw new FileValidationException("不支持的文件类型");
}
// 检查文件名
if (!isValidFileName(file.getOriginalFilename())) {
throw new FileValidationException("文件名格式不正确");
}
}
/**
* 生成文件路径
*/
private String generateFilePath(String deviceId, String fileName, FileUploadRequest request) {
String dateDir = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy/MM/dd"));
String fileType = request.getFileType().toString().toLowerCase();
String uuid = UUID.randomUUID().toString();
String extension = getFileExtension(fileName);
return String.format("%s/%s/%s/%s_%s.%s",
uploadPath, deviceId, fileType, dateDir, uuid, extension);
}
/**
* 创建文件记录
*/
private FileUploadRecord createFileRecord(String deviceId,
MultipartFile file,
FileUploadRequest request,
String savedPath) {
FileUploadRecord record = new FileUploadRecord();
record.setDeviceId(deviceId);
record.setOriginalFileName(file.getOriginalFilename());
record.setFileType(request.getFileType());
record.setFileSize(file.getSize());
record.setFilePath(savedPath);
record.setMd5Hash(calculateMD5(file));
record.setUploadTime(LocalDateTime.now());
record.setDescription(request.getDescription());
return record;
}
/**
* 处理特殊文件类型
*/
private void processSpecialFile(FileUploadRecord record) {
switch (record.getFileType()) {
case FIRMWARE:
// 处理固件文件
processFirmwareFile(record);
break;
case LOG:
// 处理日志文件
processLogFile(record);
break;
case CONFIG:
// 处理配置文件
processConfigFile(record);
break;
default:
// 默认处理
break;
}
}
/**
* 处理固件文件
*/
private void processFirmwareFile(FileUploadRecord record) {
try {
// 验证固件文件格式
if (!validateFirmwareFormat(record.getFilePath())) {
throw new FileValidationException("固件文件格式不正确");
}
// 提取固件信息
FirmwareInfo firmwareInfo = extractFirmwareInfo(record.getFilePath());
record.setFirmwareVersion(firmwareInfo.getVersion());
record.setFirmwareType(firmwareInfo.getType());
// 通知固件管理服务
notifyFirmwareService(record);
} catch (Exception e) {
log.error("处理固件文件失败:filePath={}", record.getFilePath(), e);
}
}
/**
* 处理日志文件
*/
private void processLogFile(FileUploadRecord record) {
try {
// 解析日志文件
LogAnalysisResult result = analyzeLogFile(record.getFilePath());
// 检查是否有错误日志
if (result.hasErrors()) {
// 发送告警
sendLogErrorAlert(record.getDeviceId(), result);
}
// 存储日志分析结果
storeLogAnalysisResult(record, result);
} catch (Exception e) {
log.error("处理日志文件失败:filePath={}", record.getFilePath(), e);
}
}
/**
* 处理配置文件
*/
private void processConfigFile(FileUploadRecord record) {
try {
// 验证配置文件格式
if (!validateConfigFormat(record.getFilePath())) {
throw new FileValidationException("配置文件格式不正确");
}
// 解析配置文件
DeviceConfig config = parseConfigFile(record.getFilePath());
// 更新设备配置
deviceService.updateDeviceConfig(record.getDeviceId(),
new DeviceConfigUpdateRequest(config));
} catch (Exception e) {
log.error("处理配置文件失败:filePath={}", record.getFilePath(), e);
}
}
// 辅助方法
private boolean isAllowedFileType(String fileName, FileType fileType) {
String extension = getFileExtension(fileName).toLowerCase();
switch (fileType) {
case FIRMWARE:
return Arrays.asList("bin", "hex", "fw").contains(extension);
case LOG:
return Arrays.asList("log", "txt").contains(extension);
case CONFIG:
return Arrays.asList("json", "xml", "yaml", "yml", "properties").contains(extension);
case IMAGE:
return Arrays.asList("jpg", "jpeg", "png", "gif").contains(extension);
default:
return true;
}
}
private boolean isValidFileName(String fileName) {
return fileName != null &&
fileName.matches("^[a-zA-Z0-9._-]+$") &&
fileName.length() <= 255;
}
private String getFileExtension(String fileName) {
int lastDotIndex = fileName.lastIndexOf('.');
return lastDotIndex > 0 ? fileName.substring(lastDotIndex + 1) : "";
}
private long parseFileSize(String sizeStr) {
// 解析文件大小字符串,如 "10MB", "1GB" 等
// 实现省略...
return 10 * 1024 * 1024; // 默认10MB
}
private String calculateMD5(MultipartFile file) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] digest = md.digest(file.getBytes());
return DatatypeConverter.printHexBinary(digest).toLowerCase();
} catch (Exception e) {
log.warn("计算文件MD5失败", e);
return null;
}
}
}
# 4. 数据库设计
# 4.1 HTTP设备表
-- HTTP设备信息表
CREATE TABLE http_device (
id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
device_id VARCHAR(64) NOT NULL UNIQUE COMMENT '设备ID',
device_name VARCHAR(128) NOT NULL COMMENT '设备名称',
device_type VARCHAR(32) NOT NULL COMMENT '设备类型',
manufacturer VARCHAR(64) COMMENT '制造商',
model VARCHAR(64) COMMENT '设备型号',
firmware_version VARCHAR(32) COMMENT '固件版本',
api_key VARCHAR(128) COMMENT 'API密钥',
api_key_expire_time DATETIME COMMENT 'API密钥过期时间',
status VARCHAR(16) NOT NULL DEFAULT 'OFFLINE' COMMENT '设备状态',
enabled BOOLEAN NOT NULL DEFAULT TRUE COMMENT '是否启用',
location VARCHAR(256) COMMENT '设备位置',
config JSON COMMENT '设备配置',
supported_apis JSON COMMENT '支持的API列表',
last_heartbeat DATETIME COMMENT '最后心跳时间',
last_data_report DATETIME COMMENT '最后数据上报时间',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
INDEX idx_device_id (device_id),
INDEX idx_device_type (device_type),
INDEX idx_status (status),
INDEX idx_last_heartbeat (last_heartbeat),
INDEX idx_create_time (create_time)
) COMMENT='HTTP设备信息表';
# 4.2 HTTP设备数据表
-- HTTP设备数据表
CREATE TABLE http_device_data (
id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
device_id VARCHAR(64) NOT NULL COMMENT '设备ID',
data_type VARCHAR(16) NOT NULL COMMENT '数据类型',
timestamp DATETIME NOT NULL COMMENT '数据时间戳',
data JSON NOT NULL COMMENT '数据内容',
quality TINYINT COMMENT '数据质量',
event_type VARCHAR(32) COMMENT '事件类型',
event_level VARCHAR(16) COMMENT '事件级别',
processed BOOLEAN NOT NULL DEFAULT FALSE COMMENT '是否已处理',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
INDEX idx_device_id (device_id),
INDEX idx_timestamp (timestamp),
INDEX idx_data_type (data_type),
INDEX idx_event_type (event_type),
INDEX idx_processed (processed),
INDEX idx_device_timestamp (device_id, timestamp),
INDEX idx_create_time (create_time)
) COMMENT='HTTP设备数据表'
PARTITION BY RANGE (TO_DAYS(create_time)) (
PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),
PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),
PARTITION p202403 VALUES LESS THAN (TO_DAYS('2024-04-01')),
PARTITION p_future VALUES LESS THAN MAXVALUE
);
# 4.3 HTTP请求日志表
-- HTTP请求日志表
CREATE TABLE http_request_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
device_id VARCHAR(64) COMMENT '设备ID',
request_id VARCHAR(64) NOT NULL COMMENT '请求ID',
method VARCHAR(8) NOT NULL COMMENT 'HTTP方法',
uri VARCHAR(256) NOT NULL COMMENT '请求URI',
headers JSON COMMENT '请求头',
request_body TEXT COMMENT '请求体',
response_status INT COMMENT '响应状态码',
response_body TEXT COMMENT '响应体',
processing_time INT COMMENT '处理时间(毫秒)',
client_ip VARCHAR(45) COMMENT '客户端IP',
user_agent VARCHAR(512) COMMENT 'User-Agent',
error_message TEXT COMMENT '错误信息',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
INDEX idx_device_id (device_id),
INDEX idx_request_id (request_id),
INDEX idx_method (method),
INDEX idx_response_status (response_status),
INDEX idx_create_time (create_time)
) COMMENT='HTTP请求日志表'
PARTITION BY RANGE (TO_DAYS(create_time)) (
PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),
PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),
PARTITION p202403 VALUES LESS THAN (TO_DAYS('2024-04-01')),
PARTITION p_future VALUES LESS THAN MAXVALUE
);
# 5. 性能优化策略
# 5.1 连接池配置
# application.yml
server:
tomcat:
# 最大连接数
max-connections: 10000
# 最大线程数
threads:
max: 200
min-spare: 10
# 连接超时时间
connection-timeout: 20000
# 保持连接时间
keep-alive-timeout: 60000
# 最大保持连接请求数
max-keep-alive-requests: 100
# HTTP/2支持
http2:
enabled: true
# 5.2 缓存策略
/**
* HTTP设备接入层缓存配置
*/
@Configuration
@EnableCaching
public class HttpCacheConfig {
@Bean
public CacheManager cacheManager(RedisConnectionFactory connectionFactory) {
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(30)) // 默认过期时间30分钟
.serializeKeysWith(RedisSerializationContext.SerializationPair
.fromSerializer(new StringRedisSerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair
.fromSerializer(new GenericJackson2JsonRedisSerializer()))
.disableCachingNullValues();
// 不同缓存的配置
Map<String, RedisCacheConfiguration> cacheConfigurations = new HashMap<>();
// 设备信息缓存 - 1小时
cacheConfigurations.put("device-info", config.entryTtl(Duration.ofHours(1)));
// API密钥缓存 - 6小时
cacheConfigurations.put("api-key", config.entryTtl(Duration.ofHours(6)));
// 设备数据缓存 - 30分钟
cacheConfigurations.put("device-data", config.entryTtl(Duration.ofMinutes(30)));
// 设备配置缓存 - 2小时
cacheConfigurations.put("device-config", config.entryTtl(Duration.ofHours(2)));
return RedisCacheManager.builder(connectionFactory)
.cacheDefaults(config)
.withInitialCacheConfigurations(cacheConfigurations)
.build();
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
// 设置序列化器
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
template.afterPropertiesSet();
return template;
}
}
# 5.3 异步处理配置
/**
* 异步处理配置
*/
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(10);
// 最大线程数
executor.setMaxPoolSize(50);
// 队列容量
executor.setQueueCapacity(200);
// 线程名前缀
executor.setThreadNamePrefix("http-async-");
// 拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待任务完成后关闭
executor.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
/**
* 数据处理线程池
*/
@Bean("dataProcessingExecutor")
public ThreadPoolTaskExecutor dataProcessingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("data-processing-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
/**
* 文件处理线程池
*/
@Bean("fileProcessingExecutor")
public ThreadPoolTaskExecutor fileProcessingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("file-processing-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
# 5.4 数据库优化
/**
* 数据库配置优化
*/
@Configuration
public class DatabaseConfig {
@Bean
@Primary
public DataSource dataSource() {
HikariConfig config = new HikariConfig();
// 基本配置
config.setJdbcUrl("jdbc:mysql://localhost:3306/iot_platform");
config.setUsername("iot_user");
config.setPassword("iot_password");
config.setDriverClassName("com.mysql.cj.jdbc.Driver");
// 连接池配置
config.setMaximumPoolSize(20); // 最大连接数
config.setMinimumIdle(5); // 最小空闲连接数
config.setConnectionTimeout(30000); // 连接超时时间
config.setIdleTimeout(600000); // 空闲超时时间
config.setMaxLifetime(1800000); // 连接最大生命周期
config.setLeakDetectionThreshold(60000); // 连接泄漏检测阈值
// 性能优化
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.addDataSourceProperty("useServerPrepStmts", "true");
config.addDataSourceProperty("useLocalSessionState", "true");
config.addDataSourceProperty("rewriteBatchedStatements", "true");
config.addDataSourceProperty("cacheResultSetMetadata", "true");
config.addDataSourceProperty("cacheServerConfiguration", "true");
config.addDataSourceProperty("elideSetAutoCommits", "true");
config.addDataSourceProperty("maintainTimeStats", "false");
return new HikariDataSource(config);
}
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}
# 6. 监控和告警
# 6.1 性能监控
/**
* HTTP设备接入层性能监控
*/
@Component
@Slf4j
public class HttpAccessLayerMonitor {
private final MeterRegistry meterRegistry;
private final Counter requestCounter;
private final Timer requestTimer;
private final Gauge activeDevicesGauge;
private final Counter errorCounter;
public HttpAccessLayerMonitor(MeterRegistry meterRegistry,
HttpDeviceService deviceService) {
this.meterRegistry = meterRegistry;
// 请求计数器
this.requestCounter = Counter.builder("http.device.requests.total")
.description("Total HTTP device requests")
.register(meterRegistry);
// 请求处理时间
this.requestTimer = Timer.builder("http.device.request.duration")
.description("HTTP device request processing time")
.register(meterRegistry);
// 活跃设备数量
this.activeDevicesGauge = Gauge.builder("http.device.active.count")
.description("Number of active HTTP devices")
.register(meterRegistry, this, HttpAccessLayerMonitor::getActiveDeviceCount);
// 错误计数器
this.errorCounter = Counter.builder("http.device.errors.total")
.description("Total HTTP device errors")
.register(meterRegistry);
}
/**
* 记录请求
*/
public void recordRequest(String endpoint, String method) {
requestCounter.increment(
Tags.of(
"endpoint", endpoint,
"method", method
)
);
}
/**
* 记录请求处理时间
*/
public void recordRequestTime(String endpoint, String method, Duration duration) {
requestTimer.record(duration,
Tags.of(
"endpoint", endpoint,
"method", method
)
);
}
/**
* 记录错误
*/
public void recordError(String errorType, String endpoint) {
errorCounter.increment(
Tags.of(
"error_type", errorType,
"endpoint", endpoint
)
);
}
/**
* 获取活跃设备数量
*/
private double getActiveDeviceCount() {
// 实现获取活跃设备数量的逻辑
return 0.0;
}
/**
* 记录数据接收量
*/
public void recordDataReceived(String deviceId, long dataSize) {
Counter.builder("http.device.data.received.bytes")
.description("Total bytes of data received from HTTP devices")
.tag("device_id", deviceId)
.register(meterRegistry)
.increment(dataSize);
}
/**
* 记录文件上传
*/
public void recordFileUpload(String deviceId, String fileType, long fileSize) {
Counter.builder("http.device.file.uploads.total")
.description("Total file uploads from HTTP devices")
.tags(
"device_id", deviceId,
"file_type", fileType
)
.register(meterRegistry)
.increment();
Counter.builder("http.device.file.uploads.bytes")
.description("Total bytes of files uploaded from HTTP devices")
.tags(
"device_id", deviceId,
"file_type", fileType
)
.register(meterRegistry)
.increment(fileSize);
}
}
# 6.2 健康检查
/**
* HTTP设备接入层健康检查
*/
@Component
public class HttpAccessLayerHealthIndicator implements HealthIndicator {
@Autowired
private HttpDeviceService deviceService;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private DataSource dataSource;
@Override
public Health health() {
Health.Builder builder = new Health.Builder();
try {
// 检查数据库连接
checkDatabase(builder);
// 检查Redis连接
checkRedis(builder);
// 检查设备服务
checkDeviceService(builder);
// 检查系统资源
checkSystemResources(builder);
builder.up();
} catch (Exception e) {
builder.down(e);
}
return builder.build();
}
private void checkDatabase(Health.Builder builder) {
try (Connection connection = dataSource.getConnection()) {
if (connection.isValid(5)) {
builder.withDetail("database", "UP");
} else {
builder.withDetail("database", "DOWN");
}
} catch (Exception e) {
builder.withDetail("database", "DOWN - " + e.getMessage());
}
}
private void checkRedis(Health.Builder builder) {
try {
String pong = redisTemplate.getConnectionFactory()
.getConnection().ping();
if ("PONG".equals(pong)) {
builder.withDetail("redis", "UP");
} else {
builder.withDetail("redis", "DOWN");
}
} catch (Exception e) {
builder.withDetail("redis", "DOWN - " + e.getMessage());
}
}
private void checkDeviceService(Health.Builder builder) {
try {
// 检查设备服务是否正常
long deviceCount = deviceService.getActiveDeviceCount();
builder.withDetail("device_service", "UP")
.withDetail("active_devices", deviceCount);
} catch (Exception e) {
builder.withDetail("device_service", "DOWN - " + e.getMessage());
}
}
private void checkSystemResources(Health.Builder builder) {
// 检查内存使用情况
Runtime runtime = Runtime.getRuntime();
long maxMemory = runtime.maxMemory();
long totalMemory = runtime.totalMemory();
long freeMemory = runtime.freeMemory();
long usedMemory = totalMemory - freeMemory;
double memoryUsagePercent = (double) usedMemory / maxMemory * 100;
builder.withDetail("memory_usage_percent", String.format("%.2f%%", memoryUsagePercent))
.withDetail("max_memory_mb", maxMemory / 1024 / 1024)
.withDetail("used_memory_mb", usedMemory / 1024 / 1024);
if (memoryUsagePercent > 90) {
builder.withDetail("memory_status", "CRITICAL");
} else if (memoryUsagePercent > 80) {
builder.withDetail("memory_status", "WARNING");
} else {
builder.withDetail("memory_status", "OK");
}
}
}
# 7. 配置文件
# 7.1 应用配置
# application.yml
spring:
application:
name: iot-http-access-layer
# 数据库配置
datasource:
url: jdbc:mysql://localhost:3306/iot_platform?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
username: ${DB_USERNAME:iot_user}
password: ${DB_PASSWORD:iot_password}
driver-class-name: com.mysql.cj.jdbc.Driver
hikari:
maximum-pool-size: 20
minimum-idle: 5
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
# Redis配置
redis:
host: ${REDIS_HOST:localhost}
port: ${REDIS_PORT:6379}
password: ${REDIS_PASSWORD:}
database: ${REDIS_DATABASE:0}
timeout: 5000
lettuce:
pool:
max-active: 20
max-idle: 10
min-idle: 5
max-wait: 5000
# JPA配置
jpa:
hibernate:
ddl-auto: validate
show-sql: false
properties:
hibernate:
dialect: org.hibernate.dialect.MySQL8Dialect
format_sql: true
use_sql_comments: true
# 文件上传配置
servlet:
multipart:
max-file-size: 10MB
max-request-size: 50MB
enabled: true
# 异步配置
task:
execution:
pool:
core-size: 10
max-size: 50
queue-capacity: 200
keep-alive: 60s
thread-name-prefix: async-task-
# 服务器配置
server:
port: ${SERVER_PORT:8080}
servlet:
context-path: /api/v1
tomcat:
max-connections: 10000
threads:
max: 200
min-spare: 10
connection-timeout: 20000
keep-alive-timeout: 60000
max-keep-alive-requests: 100
http2:
enabled: true
# 日志配置
logging:
level:
com.iot.http: DEBUG
org.springframework.web: INFO
org.hibernate.SQL: DEBUG
org.hibernate.type.descriptor.sql.BasicBinder: TRACE
pattern:
console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"
file: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"
file:
name: logs/iot-http-access-layer.log
max-size: 100MB
max-history: 30
# 管理端点配置
management:
endpoints:
web:
exposure:
include: health,info,metrics,prometheus
endpoint:
health:
show-details: always
metrics:
export:
prometheus:
enabled: true
# IoT HTTP接入层配置
iot:
http:
# 设备配置
device:
# API密钥过期时间(天)
api-key-expire-days: 365
# 心跳超时时间(分钟)
heartbeat-timeout-minutes: 10
# 设备离线检查间隔(分钟)
offline-check-interval-minutes: 5
# 数据配置
data:
# 批量处理大小
batch-size: 100
# 数据保留天数
retention-days: 90
# 数据压缩阈值(天)
compression-threshold-days: 7
# 文件配置
file:
# 上传路径
upload-path: ${FILE_UPLOAD_PATH:/data/iot/files}
# 最大文件大小
max-size: 10MB
# 允许的文件类型
allowed-types: bin,hex,fw,log,txt,json,xml,yaml,yml,properties,jpg,jpeg,png,gif
# 病毒扫描启用
virus-scan-enabled: true
# 安全配置
security:
# API限流配置
rate-limit:
# 每分钟请求数限制
requests-per-minute: 1000
# 每小时请求数限制
requests-per-hour: 10000
# IP白名单
ip-whitelist:
enabled: false
addresses: []
# 请求签名验证
signature:
enabled: true
algorithm: HmacSHA256
# 监控配置
monitoring:
# 性能指标收集间隔(秒)
metrics-interval-seconds: 60
# 告警阈值
alert:
# 错误率阈值(%)
error-rate-threshold: 5.0
# 响应时间阈值(毫秒)
response-time-threshold: 5000
# 内存使用率阈值(%)
memory-usage-threshold: 80.0
# 7.2 开发环境配置
# application-dev.yml
spring:
datasource:
url: jdbc:mysql://localhost:3306/iot_platform_dev?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
redis:
host: localhost
port: 6379
database: 1
jpa:
hibernate:
ddl-auto: update
show-sql: true
logging:
level:
com.iot.http: DEBUG
root: INFO
iot:
http:
security:
rate-limit:
requests-per-minute: 10000
signature:
enabled: false
monitoring:
alert:
error-rate-threshold: 10.0
# 7.3 生产环境配置
# application-prod.yml
spring:
datasource:
url: jdbc:mysql://${DB_HOST}:${DB_PORT}/${DB_NAME}?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=Asia/Shanghai
hikari:
maximum-pool-size: 50
minimum-idle: 10
redis:
host: ${REDIS_HOST}
port: ${REDIS_PORT}
password: ${REDIS_PASSWORD}
database: ${REDIS_DATABASE}
lettuce:
pool:
max-active: 50
max-idle: 20
min-idle: 10
jpa:
hibernate:
ddl-auto: validate
show-sql: false
server:
tomcat:
max-connections: 20000
threads:
max: 500
min-spare: 50
logging:
level:
com.iot.http: INFO
root: WARN
file:
name: /var/log/iot/http-access-layer.log
iot:
http:
security:
ip-whitelist:
enabled: true
addresses: ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"]
signature:
enabled: true
file:
upload-path: /data/iot/files
virus-scan-enabled: true
# 8. 总结
HTTP设备接入层作为IoT平台的重要组成部分,提供了简单易用、兼容性强的设备接入方式。通过RESTful API设计,设备可以方便地进行注册、数据上报、文件上传等操作。
# 8.1 主要特性
- 简单易用:基于标准HTTP协议,设备端实现简单
- 安全可靠:支持API密钥认证、请求签名验证
- 高性能:采用连接池、缓存、异步处理等优化策略
- 可扩展:支持多种数据格式,易于扩展新功能
- 可监控:完善的监控指标和健康检查机制
# 8.2 适用场景
- 智能家居设备接入
- 工业设备数据采集
- 移动设备数据上报
- 第三方系统集成
- 临时或测试设备接入
# 8.3 最佳实践
- 安全性:始终启用API密钥认证和HTTPS
- 性能:合理配置连接池和缓存策略
- 监控:建立完善的监控和告警机制
- 容错:实现优雅的错误处理和重试机制
- 文档:提供清晰的API文档和示例代码
通过合理的架构设计和优化配置,HTTP设备接入层能够为IoT平台提供稳定、高效的设备接入服务。
设备状态枚举 */ public enum DeviceStatus { ONLINE("在线"), OFFLINE("离线"), MAINTENANCE("维护中"), ERROR("故障");
private final String description;
DeviceStatus(String description) { this.description = description; }
public String getDescription() { return description; } }
### 2. HTTP设备数据实体
```java
/**
* HTTP设备数据实体
* 记录设备通过HTTP上报的数据,就像"数据档案"
*/
@Entity
@Table(name = "http_device_data")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class HttpDeviceData {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 数据ID - 数据的唯一标识
*/
@Column(name = "data_id", unique = true)
private String dataId;
/**
* 设备ID
*/
@Column(name = "device_id", nullable = false)
private String deviceId;
/**
* 数据类型:SENSOR_DATA(传感器数据)、IMAGE(图片)、VIDEO(视频)、LOG(日志)
*/
@Enumerated(EnumType.STRING)
@Column(name = "data_type")
private DataType dataType;
/**
* 数据内容(JSON格式)
*/
@Lob
@Column(name = "data_content")
private String dataContent;
/**
* 文件路径(如果是文件数据)
*/
@Column(name = "file_path")
private String filePath;
/**
* 文件大小(字节)
*/
@Column(name = "file_size")
private Long fileSize;
/**
* 文件类型(MIME类型)
*/
@Column(name = "content_type")
private String contentType;
/**
* 数据时间戳(设备端时间)
*/
@Column(name = "data_timestamp")
private LocalDateTime dataTimestamp;
/**
* 接收时间(服务器时间)
*/
@Column(name = "receive_time")
private LocalDateTime receiveTime;
/**
* 处理状态:PENDING(待处理)、PROCESSING(处理中)、COMPLETED(已完成)、FAILED(失败)
*/
@Enumerated(EnumType.STRING)
@Column(name = "process_status")
private ProcessStatus processStatus;
/**
* 处理结果
*/
@Column(name = "process_result")
private String processResult;
/**
* 数据质量评分(0-100)
*/
@Column(name = "quality_score")
private Integer qualityScore;
/**
* 标签信息(JSON格式)
*/
@Lob
@Column(name = "tags")
private String tags;
/**
* 创建时间
*/
@Column(name = "create_time")
private LocalDateTime createTime;
// 构造函数、getter、setter由Lombok自动生成
}
/**
* 数据类型枚举
*/
public enum DataType {
SENSOR_DATA("传感器数据"),
IMAGE("图片"),
VIDEO("视频"),
LOG("日志"),
CONFIG("配置"),
STATUS("状态"),
ALARM("告警");
private final String description;
DataType(String description) {
this.description = description;
}
public String getDescription() {
return description;
}
}
/**
* 处理状态枚举
*/
public enum ProcessStatus {
PENDING("待处理"),
PROCESSING("处理中"),
COMPLETED("已完成"),
FAILED("失败");
private final String description;
ProcessStatus(String description) {
this.description = description;
}
public String getDescription() {
return description;
}
}
# 3. HTTP请求日志实体
/**
* HTTP请求日志实体
* 记录所有HTTP请求的详细信息,就像"通话记录"
*/
@Entity
@Table(name = "http_request_log")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class HttpRequestLog {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 请求ID
*/
@Column(name = "request_id", unique = true)
private String requestId;
/**
* 设备ID
*/
@Column(name = "device_id")
private String deviceId;
/**
* 请求方法:GET、POST、PUT、DELETE
*/
@Column(name = "method")
private String method;
/**
* 请求URL
*/
@Column(name = "url")
private String url;
/**
* 请求头信息(JSON格式)
*/
@Lob
@Column(name = "headers")
private String headers;
/**
* 请求参数(JSON格式)
*/
@Lob
@Column(name = "parameters")
private String parameters;
/**
* 请求体大小(字节)
*/
@Column(name = "request_size")
private Long requestSize;
/**
* 响应状态码
*/
@Column(name = "response_code")
private Integer responseCode;
/**
* 响应体大小(字节)
*/
@Column(name = "response_size")
private Long responseSize;
/**
* 处理时间(毫秒)
*/
@Column(name = "process_time")
private Long processTime;
/**
* 客户端IP地址
*/
@Column(name = "client_ip")
private String clientIp;
/**
* User-Agent
*/
@Column(name = "user_agent")
private String userAgent;
/**
* 错误信息
*/
@Column(name = "error_message")
private String errorMessage;
/**
* 请求时间
*/
@Column(name = "request_time")
private LocalDateTime requestTime;
/**
* 响应时间
*/
@Column(name = "response_time")
private LocalDateTime responseTime;
// 构造函数、getter、setter由Lombok自动生成
}
# 核心控制器实现
# 1. 设备管理控制器
/**
* 设备管理控制器
* 处理设备的注册、查询、更新等操作,就像"设备管理员"
*/
@RestController
@RequestMapping("/api/v1/devices")
@Slf4j
@Validated
public class DeviceManagementController {
@Autowired
private HttpDeviceService deviceService;
@Autowired
private HttpRequestLogService requestLogService;
/**
* 设备注册
* POST /api/v1/devices/register
*/
@PostMapping("/register")
public ResponseEntity<ApiResponse<DeviceRegistrationResponse>> registerDevice(
@Valid @RequestBody DeviceRegistrationRequest request,
HttpServletRequest httpRequest) {
String requestId = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
try {
log.info("收到设备注册请求:requestId={}, deviceId={}", requestId, request.getDeviceId());
// 验证设备信息
if (!deviceService.validateDeviceInfo(request)) {
return ResponseEntity.badRequest()
.body(ApiResponse.error("设备信息验证失败"));
}
// 检查设备是否已存在
if (deviceService.deviceExists(request.getDeviceId())) {
return ResponseEntity.status(HttpStatus.CONFLICT)
.body(ApiResponse.error("设备已存在"));
}
// 注册设备
String clientIp = getClientIpAddress(httpRequest);
HttpDevice device = deviceService.registerDevice(request, clientIp);
// 生成API密钥
String apiKey = deviceService.generateApiKey(device.getDeviceId());
// 构建响应
DeviceRegistrationResponse response = DeviceRegistrationResponse.builder()
.deviceId(device.getDeviceId())
.apiKey(apiKey)
.status("SUCCESS")
.message("设备注册成功")
.serverTime(LocalDateTime.now())
.build();
log.info("设备注册成功:deviceId={}, apiKey={}", device.getDeviceId(),
apiKey.substring(0, 8) + "...");
return ResponseEntity.status(HttpStatus.CREATED)
.body(ApiResponse.success(response));
} catch (Exception e) {
log.error("设备注册失败:requestId={}", requestId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("设备注册失败:" + e.getMessage()));
} finally {
// 记录请求日志
long processTime = System.currentTimeMillis() - startTime;
requestLogService.logRequest(requestId, request.getDeviceId(),
"POST", "/api/v1/devices/register", httpRequest, processTime);
}
}
/**
* 设备心跳
* POST /api/v1/devices/{deviceId}/heartbeat
*/
@PostMapping("/{deviceId}/heartbeat")
public ResponseEntity<ApiResponse<HeartbeatResponse>> deviceHeartbeat(
@PathVariable String deviceId,
@Valid @RequestBody HeartbeatRequest request,
@RequestHeader("X-API-Key") String apiKey,
HttpServletRequest httpRequest) {
String requestId = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
try {
log.debug("收到设备心跳:deviceId={}, requestId={}", deviceId, requestId);
// 验证API密钥
if (!deviceService.validateApiKey(deviceId, apiKey)) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(ApiResponse.error("API密钥验证失败"));
}
// 更新设备心跳
deviceService.updateHeartbeat(deviceId, request);
// 获取服务器配置(如果有更新)
DeviceConfig config = deviceService.getDeviceConfig(deviceId);
// 构建响应
HeartbeatResponse response = HeartbeatResponse.builder()
.status("SUCCESS")
.serverTime(LocalDateTime.now())
.config(config)
.nextHeartbeatInterval(60) // 下次心跳间隔(秒)
.build();
return ResponseEntity.ok(ApiResponse.success(response));
} catch (Exception e) {
log.error("处理设备心跳失败:deviceId={}, requestId={}", deviceId, requestId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("心跳处理失败:" + e.getMessage()));
} finally {
long processTime = System.currentTimeMillis() - startTime;
requestLogService.logRequest(requestId, deviceId,
"POST", "/api/v1/devices/" + deviceId + "/heartbeat", httpRequest, processTime);
}
}
/**
* 查询设备信息
* GET /api/v1/devices/{deviceId}
*/
@GetMapping("/{deviceId}")
public ResponseEntity<ApiResponse<DeviceInfoResponse>> getDeviceInfo(
@PathVariable String deviceId,
@RequestHeader("X-API-Key") String apiKey,
HttpServletRequest httpRequest) {
String requestId = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
try {
// 验证API密钥
if (!deviceService.validateApiKey(deviceId, apiKey)) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(ApiResponse.error("API密钥验证失败"));
}
// 获取设备信息
HttpDevice device = deviceService.getDevice(deviceId);
if (device == null) {
return ResponseEntity.notFound().build();
}
// 构建响应
DeviceInfoResponse response = DeviceInfoResponse.builder()
.deviceId(device.getDeviceId())
.deviceName(device.getDeviceName())
.deviceType(device.getDeviceType())
.status(device.getStatus())
.firmwareVersion(device.getFirmwareVersion())
.lastHeartbeat(device.getLastHeartbeat())
.lastDataReport(device.getLastDataReport())
.build();
return ResponseEntity.ok(ApiResponse.success(response));
} catch (Exception e) {
log.error("查询设备信息失败:deviceId={}, requestId={}", deviceId, requestId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("查询失败:" + e.getMessage()));
} finally {
long processTime = System.currentTimeMillis() - startTime;
requestLogService.logRequest(requestId, deviceId,
"GET", "/api/v1/devices/" + deviceId, httpRequest, processTime);
}
}
/**
* 更新设备配置
* PUT /api/v1/devices/{deviceId}/config
*/
@PutMapping("/{deviceId}/config")
public ResponseEntity<ApiResponse<String>> updateDeviceConfig(
@PathVariable String deviceId,
@Valid @RequestBody DeviceConfigUpdateRequest request,
@RequestHeader("X-API-Key") String apiKey,
HttpServletRequest httpRequest) {
String requestId = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
try {
log.info("收到设备配置更新请求:deviceId={}, requestId={}", deviceId, requestId);
// 验证API密钥
if (!deviceService.validateApiKey(deviceId, apiKey)) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(ApiResponse.error("API密钥验证失败"));
}
// 更新设备配置
deviceService.updateDeviceConfig(deviceId, request);
log.info("设备配置更新成功:deviceId={}", deviceId);
return ResponseEntity.ok(ApiResponse.success("配置更新成功"));
} catch (Exception e) {
log.error("更新设备配置失败:deviceId={}, requestId={}", deviceId, requestId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("配置更新失败:" + e.getMessage()));
} finally {
long processTime = System.currentTimeMillis() - startTime;
requestLogService.logRequest(requestId, deviceId,
"PUT", "/api/v1/devices/" + deviceId + "/config", httpRequest, processTime);
}
}
/**
* 获取客户端IP地址
*/
private String getClientIpAddress(HttpServletRequest request) {
String xForwardedFor = request.getHeader("X-Forwarded-For");
if (xForwardedFor != null && !xForwardedFor.isEmpty()) {
return xForwardedFor.split(",")[0].trim();
}
String xRealIp = request.getHeader("X-Real-IP");
if (xRealIp != null && !xRealIp.isEmpty()) {
return xRealIp;
}
return request.getRemoteAddr();
}
}
# 2. 数据接收控制器
/**
* 数据接收控制器
* 处理设备上报的各种数据,就像"数据收集站"
*/
@RestController
@RequestMapping("/api/v1/data")
@Slf4j
@Validated
public class DataReceptionController {
@Autowired
private HttpDeviceDataService dataService;
@Autowired
private HttpDeviceService deviceService;
@Autowired
private HttpRequestLogService requestLogService;
@Autowired
private DataValidationService validationService;
/**
* 接收传感器数据
* POST /api/v1/data/sensor
*/
@PostMapping("/sensor")
public ResponseEntity<ApiResponse<DataReceptionResponse>> receiveSensorData(
@Valid @RequestBody SensorDataRequest request,
@RequestHeader("X-API-Key") String apiKey,
@RequestHeader("X-Device-ID") String deviceId,
HttpServletRequest httpRequest) {
String requestId = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
try {
log.info("收到传感器数据:deviceId={}, dataType={}, requestId={}",
deviceId, request.getDataType(), requestId);
// 验证API密钥
if (!deviceService.validateApiKey(deviceId, apiKey)) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(ApiResponse.error("API密钥验证失败"));
}
// 验证数据格式
ValidationResult validationResult = validationService.validateSensorData(request);
if (!validationResult.isValid()) {
return ResponseEntity.badRequest()
.body(ApiResponse.error("数据验证失败:" + validationResult.getErrorMessage()));
}
// 处理传感器数据
String dataId = dataService.processSensorData(deviceId, request);
// 更新设备最后数据上报时间
deviceService.updateLastDataReport(deviceId);
// 构建响应
DataReceptionResponse response = DataReceptionResponse.builder()
.dataId(dataId)
.status("SUCCESS")
.message("数据接收成功")
.receiveTime(LocalDateTime.now())
.qualityScore(validationResult.getQualityScore())
.build();
log.info("传感器数据处理完成:deviceId={}, dataId={}, qualityScore={}",
deviceId, dataId, validationResult.getQualityScore());
return ResponseEntity.ok(ApiResponse.success(response));
} catch (Exception e) {
log.error("处理传感器数据失败:deviceId={}, requestId={}", deviceId, requestId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("数据处理失败:" + e.getMessage()));
} finally {
long processTime = System.currentTimeMillis() - startTime;
requestLogService.logRequest(requestId, deviceId,
"POST", "/api/v1/data/sensor", httpRequest, processTime);
}
}
/**
* 批量接收传感器数据
* POST /api/v1/data/sensor/batch
*/
@PostMapping("/sensor/batch")
public ResponseEntity<ApiResponse<BatchDataReceptionResponse>> receiveBatchSensorData(
@Valid @RequestBody BatchSensorDataRequest request,
@RequestHeader("X-API-Key") String apiKey,
@RequestHeader("X-Device-ID") String deviceId,
HttpServletRequest httpRequest) {
String requestId = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
try {
log.info("收到批量传感器数据:deviceId={}, count={}, requestId={}",
deviceId, request.getData().size(), requestId);
// 验证API密钥
if (!deviceService.validateApiKey(deviceId, apiKey)) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(ApiResponse.error("API密钥验证失败"));
}
// 检查批量数据大小限制
if (request.getData().size() > 100) {
return ResponseEntity.badRequest()
.body(ApiResponse.error("批量数据数量不能超过100条"));
}
// 批量处理传感器数据
BatchProcessResult result = dataService.processBatchSensorData(deviceId, request);
// 更新设备最后数据上报时间
deviceService.updateLastDataReport(deviceId);
// 构建响应
BatchDataReceptionResponse response = BatchDataReceptionResponse.builder()
.totalCount(result.getTotalCount())
.successCount(result.getSuccessCount())
.failedCount(result.getFailedCount())
.failedItems(result.getFailedItems())
.status("SUCCESS")
.message("批量数据处理完成")
.receiveTime(LocalDateTime.now())
.build();
log.info("批量传感器数据处理完成:deviceId={}, success={}, failed={}",
deviceId, result.getSuccessCount(), result.getFailedCount());
return ResponseEntity.ok(ApiResponse.success(response));
} catch (Exception e) {
log.error("处理批量传感器数据失败:deviceId={}, requestId={}", deviceId, requestId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("批量数据处理失败:" + e.getMessage()));
} finally {
long processTime = System.currentTimeMillis() - startTime;
requestLogService.logRequest(requestId, deviceId,
"POST", "/api/v1/data/sensor/batch", httpRequest, processTime);
}
}
/**
* 接收日志数据
* POST /api/v1/data/log
*/
@PostMapping("/log")
public ResponseEntity<ApiResponse<DataReceptionResponse>> receiveLogData(
@Valid @RequestBody LogDataRequest request,
@RequestHeader("X-API-Key") String apiKey,
@RequestHeader("X-Device-ID") String deviceId,
HttpServletRequest httpRequest) {
String requestId = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
try {
log.debug("收到日志数据:deviceId={}, level={}, requestId={}",
deviceId, request.getLevel(), requestId);
// 验证API密钥
if (!deviceService.validateApiKey(deviceId, apiKey)) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(ApiResponse.error("API密钥验证失败"));
}
// 处理日志数据
String dataId = dataService.processLogData(deviceId, request);
// 构建响应
DataReceptionResponse response = DataReceptionResponse.builder()
.dataId(dataId)
.status("SUCCESS")
.message("日志接收成功")
.receiveTime(LocalDateTime.now())
.build();
return ResponseEntity.ok(ApiResponse.success(response));
} catch (Exception e) {
log.error("处理日志数据失败:deviceId={}, requestId={}", deviceId, requestId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("日志处理失败:" + e.getMessage()));
} finally {
long processTime = System.currentTimeMillis() - startTime;
requestLogService.logRequest(requestId, deviceId,
"POST", "/api/v1/data/log", httpRequest, processTime);
}
}
/**
* 查询设备数据
* GET /api/v1/data/{deviceId}
*/
@GetMapping("/{deviceId}")
public ResponseEntity<ApiResponse<PageResult<HttpDeviceData>>> getDeviceData(
@PathVariable String deviceId,
@RequestParam(defaultValue = "1") int page,
@RequestParam(defaultValue = "20") int size,
@RequestParam(required = false) DataType dataType,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime startTime,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime endTime,
@RequestHeader("X-API-Key") String apiKey,
HttpServletRequest httpRequest) {
String requestId = UUID.randomUUID().toString();
long startTimeMs = System.currentTimeMillis();
try {
// 验证API密钥
if (!deviceService.validateApiKey(deviceId, apiKey)) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(ApiResponse.error("API密钥验证失败"));
}
// 构建查询条件
DataQueryCondition condition = DataQueryCondition.builder()
.deviceId(deviceId)
.dataType(dataType)
.startTime(startTime)
.endTime(endTime)
.page(page)
.size(size)
.build();
// 查询数据
PageResult<HttpDeviceData> result = dataService.queryDeviceData(condition);
return ResponseEntity.ok(ApiResponse.success(result));
} catch (Exception e) {
log.error("查询设备数据失败:deviceId={}, requestId={}", deviceId, requestId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("数据查询失败:" + e.getMessage()));
} finally {
long processTime = System.currentTimeMillis() - startTimeMs;
requestLogService.logRequest(requestId, deviceId,
"GET", "/api/v1/data/" + deviceId, httpRequest, processTime);
}
}
}
# 3. 文件上传控制器
/**
* 文件上传控制器
* 处理设备上传的图片、视频等文件,就像"文件管理员"
*/
@RestController
@RequestMapping("/api/v1/files")
@Slf4j
@Validated
public class FileUploadController {
@Autowired
private FileStorageService fileStorageService;
@Autowired
private HttpDeviceService deviceService;
@Autowired
private HttpDeviceDataService dataService;
@Autowired
private HttpRequestLogService requestLogService;
@Value("${file.upload.max-size:10485760}") // 10MB
private long maxFileSize;
@Value("${file.upload.allowed-types:image/jpeg,image/png,video/mp4,application/json}")
private String allowedTypes;
/**
* 上传图片文件
* POST /api/v1/files/image
*/
@PostMapping("/image")
public ResponseEntity<ApiResponse<FileUploadResponse>> uploadImage(
@RequestParam("file") MultipartFile file,
@RequestParam("deviceId") String deviceId,
@RequestParam(required = false) String description,
@RequestParam(required = false) String tags,
@RequestHeader("X-API-Key") String apiKey,
HttpServletRequest httpRequest) {
String requestId = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
try {
log.info("收到图片上传请求:deviceId={}, fileName={}, size={}, requestId={}",
deviceId, file.getOriginalFilename(), file.getSize(), requestId);
// 验证API密钥
if (!deviceService.validateApiKey(deviceId, apiKey)) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(ApiResponse.error("API密钥验证失败"));
}
// 验证文件
ValidationResult validationResult = validateFile(file, "image");
if (!validationResult.isValid()) {
return ResponseEntity.badRequest()
.body(ApiResponse.error(validationResult.getErrorMessage()));
}
// 上传文件
FileUploadResult uploadResult = fileStorageService.uploadFile(file, deviceId, "image");
// 保存文件记录
String dataId = dataService.saveFileData(deviceId, DataType.IMAGE, uploadResult, description, tags);
// 更新设备最后数据上报时间
deviceService.updateLastDataReport(deviceId);
// 构建响应
FileUploadResponse response = FileUploadResponse.builder()
.dataId(dataId)
.fileName(uploadResult.getFileName())
.filePath(uploadResult.getFilePath())
.fileSize(uploadResult.getFileSize())
.fileUrl(uploadResult.getFileUrl())
.status("SUCCESS")
.message("图片上传成功")
.uploadTime(LocalDateTime.now())
.build();
log.info("图片上传成功:deviceId={}, dataId={}, filePath={}",
deviceId, dataId, uploadResult.getFilePath());
return ResponseEntity.ok(ApiResponse.success(response));
} catch (Exception e) {
log.error("图片上传失败:deviceId={}, requestId={}", deviceId, requestId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("图片上传失败:" + e.getMessage()));
} finally {
long processTime = System.currentTimeMillis() - startTime;
requestLogService.logRequest(requestId, deviceId,
"POST", "/api/v1/files/image", httpRequest, processTime);
}
}
/**
* 上传视频文件
* POST /api/v1/files/video
*/
@PostMapping("/video")
public ResponseEntity<ApiResponse<FileUploadResponse>> uploadVideo(
@RequestParam("file") MultipartFile file,
@RequestParam("deviceId") String deviceId,
@RequestParam(required = false) String description,
@RequestParam(required = false) String tags,
@RequestHeader("X-API-Key") String apiKey,
HttpServletRequest httpRequest) {
String requestId = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
try {
log.info("收到视频上传请求:deviceId={}, fileName={}, size={}, requestId={}",
deviceId, file.getOriginalFilename(), file.getSize(), requestId);
// 验证API密钥
if (!deviceService.validateApiKey(deviceId, apiKey)) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(ApiResponse.error("API密钥验证失败"));
}
// 验证文件
ValidationResult validationResult = validateFile(file, "video");
if (!validationResult.isValid()) {
return ResponseEntity.badRequest()
.body(ApiResponse.error(validationResult.getErrorMessage()));
}
// 上传文件
FileUploadResult uploadResult = fileStorageService.uploadFile(file, deviceId, "video");
// 保存文件记录
String dataId = dataService.saveFileData(deviceId, DataType.VIDEO, uploadResult, description, tags);
// 更新设备最后数据上报时间
deviceService.updateLastDataReport(deviceId);
// 异步处理视频(如生成缩略图、转码等)
fileStorageService.processVideoAsync(uploadResult.getFilePath());
// 构建响应
FileUploadResponse response = FileUploadResponse.builder()
.dataId(dataId)
.fileName(uploadResult.getFileName())
.filePath(uploadResult.getFilePath())
.fileSize(uploadResult.getFileSize())
.fileUrl(uploadResult.getFileUrl())
.status("SUCCESS")
.message("视频上传成功")
.uploadTime(LocalDateTime.now())
.build();
log.info("视频上传成功:deviceId={}, dataId={}, filePath={}",
deviceId, dataId, uploadResult.getFilePath());
return ResponseEntity.ok(ApiResponse.success(response));
} catch (Exception e) {
log.error("视频上传失败:deviceId={}, requestId={}", deviceId, requestId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("视频上传失败:" + e.getMessage()));
} finally {
long processTime = System.currentTimeMillis() - startTime;
requestLogService.logRequest(requestId, deviceId,
"POST", "/api/v1/files/video", httpRequest, processTime);
}
}
/**
* 分片上传大文件
* POST /api/v1/files/chunk
*/
@PostMapping("/chunk")
public ResponseEntity<ApiResponse<ChunkUploadResponse>> uploadChunk(
@RequestParam("file") MultipartFile chunk,
@RequestParam("deviceId") String deviceId,
@RequestParam("fileName") String fileName,
@RequestParam("chunkIndex") int chunkIndex,
@RequestParam("totalChunks") int totalChunks,
@RequestParam("fileHash") String fileHash,
@RequestHeader("X-API-Key") String apiKey,
HttpServletRequest httpRequest) {
String requestId = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
try {
log.debug("收到分片上传请求:deviceId={}, fileName={}, chunk={}/{}, requestId={}",
deviceId, fileName, chunkIndex + 1, totalChunks, requestId);
// 验证API密钥
if (!deviceService.validateApiKey(deviceId, apiKey)) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(ApiResponse.error("API密钥验证失败"));
}
// 上传分片
ChunkUploadResult result = fileStorageService.uploadChunk(
chunk, deviceId, fileName, chunkIndex, totalChunks, fileHash);
// 构建响应
ChunkUploadResponse response = ChunkUploadResponse.builder()
.chunkIndex(chunkIndex)
.uploaded(true)
.completed(result.isCompleted())
.fileUrl(result.getFileUrl())
.message(result.isCompleted() ? "文件上传完成" : "分片上传成功")
.build();
// 如果文件上传完成,保存文件记录
if (result.isCompleted()) {
String dataId = dataService.saveFileData(deviceId,
getDataTypeByFileName(fileName), result.getUploadResult(), null, null);
response.setDataId(dataId);
deviceService.updateLastDataReport(deviceId);
log.info("分片文件上传完成:deviceId={}, fileName={}, dataId={}",
deviceId, fileName, dataId);
}
return ResponseEntity.ok(ApiResponse.success(response));
} catch (Exception e) {
log.error("分片上传失败:deviceId={}, fileName={}, chunk={}, requestId={}",
deviceId, fileName, chunkIndex, requestId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ApiResponse.error("分片上传失败:" + e.getMessage()));
} finally {
long processTime = System.currentTimeMillis() - startTime;
requestLogService.logRequest(requestId, deviceId,
"POST", "/api/v1/files/chunk", httpRequest, processTime);
}
}
/**
* 下载文件
* GET /api/v1/files/{dataId}/download
*/
@GetMapping("/{dataId}/download")
public ResponseEntity<Resource> downloadFile(
@PathVariable String dataId,
@RequestHeader("X-API-Key") String apiKey,
HttpServletRequest httpRequest) {
try {
// 获取文件数据
HttpDeviceData fileData = dataService.getDeviceData(dataId);
if (fileData == null) {
return ResponseEntity.notFound().build();
}
// 验证API密钥
if (!deviceService.validateApiKey(fileData.getDeviceId(), apiKey)) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build();
}
// 获取文件资源
Resource resource = fileStorageService.getFileResource(fileData.getFilePath());
if (!resource.exists()) {
return ResponseEntity.notFound().build();
}
// 构建响应头
String fileName = Paths.get(fileData.getFilePath()).getFileName().toString();
return ResponseEntity.ok()
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + fileName + "\"")
.header(HttpHeaders.CONTENT_TYPE, fileData.getContentType())
.header(HttpHeaders.CONTENT_LENGTH, String.valueOf(fileData.getFileSize()))
.body(resource);
} catch (Exception e) {
log.error("文件下载失败:dataId={}", dataId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
}
}
/**
* 验证文件
*/
private ValidationResult validateFile(MultipartFile file, String category) {
// 检查文件是否为空
if (file.isEmpty()) {
return ValidationResult.invalid("文件不能为空");
}
// 检查文件大小
if (file.getSize() > maxFileSize) {
return ValidationResult.invalid("文件大小不能超过" + (maxFileSize / 1024 / 1024) + "MB");
}
// 检查文件类型
String contentType = file.getContentType();
if (contentType == null || !isAllowedContentType(contentType, category)) {
return ValidationResult.invalid("不支持的文件类型:" + contentType);
}
return ValidationResult.valid();
}
/**
* 检查是否为允许的文件类型
*/
private boolean isAllowedContentType(String contentType, String category) {
String[] allowedTypeArray = allowedTypes.split(",");
for (String allowedType : allowedTypeArray) {
if (contentType.equals(allowedType.trim())) {
// 进一步检查类别
if ("image".equals(category) && contentType.startsWith("image/")) {
return true;
} else if ("video".equals(category) && contentType.startsWith("video/")) {
return true;
} else if ("document".equals(category) &&
(contentType.startsWith("application/") || contentType.startsWith("text/"))) {
return true;
}
}
}
return false;
}
/**
* 根据文件名获取数据类型
*/
private DataType getDataTypeByFileName(String fileName) {
String extension = fileName.substring(fileName.lastIndexOf(".") + 1).toLowerCase();
switch (extension) {
case "jpg":
case "jpeg":
case "png":
case "gif":
return DataType.IMAGE;
case "mp4":
case "avi":
case "mov":
return DataType.VIDEO;
default:
return DataType.LOG;
}
}
}
# 核心服务实现
# 1. HTTP设备服务
/**
* HTTP设备服务
* 处理设备的核心业务逻辑,就像"设备管家"
*/
@Service
@Slf4j
@Transactional
public class HttpDeviceService {
@Autowired
private HttpDeviceRepository deviceRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ApiKeyGenerator apiKeyGenerator;
@Autowired
private DeviceConfigService configService;
private static final String DEVICE_CACHE_PREFIX = "http:device:";
private static final String API_KEY_CACHE_PREFIX = "http:apikey:";
/**
* 注册设备
*/
public HttpDevice registerDevice(DeviceRegistrationRequest request, String clientIp) {
try {
log.info("开始注册设备:deviceId={}, ip={}", request.getDeviceId(), clientIp);
// 创建设备实体
HttpDevice device = new HttpDevice();
device.setDeviceId(request.getDeviceId());
device.setDeviceName(request.getDeviceName());
device.setDeviceType(request.getDeviceType());
device.setIpAddress(clientIp);
device.setMacAddress(request.getMacAddress());
device.setFirmwareVersion(request.getFirmwareVersion());
device.setManufacturer(request.getManufacturer());
device.setModel(request.getModel());
device.setLocation(request.getLocation());
device.setConfig(request.getConfig());
device.setSupportedApis(request.getSupportedApis());
device.setStatus(DeviceStatus.ONLINE);
device.setEnabled(true);
device.setCreateTime(LocalDateTime.now());
device.setUpdateTime(LocalDateTime.now());
device.setLastHeartbeat(LocalDateTime.now());
// 保存到数据库
device = deviceRepository.save(device);
// 缓存设备信息
cacheDevice(device);
log.info("设备注册成功:deviceId={}, id={}", device.getDeviceId(), device.getId());
return device;
} catch (Exception e) {
log.error("设备注册失败:deviceId={}", request.getDeviceId(), e);
throw new RuntimeException("设备注册失败", e);
}
}
/**
* 生成API密钥
*/
public String generateApiKey(String deviceId) {
try {
// 生成API密钥
String apiKey = apiKeyGenerator.generateApiKey(deviceId);
// 设置过期时间(默认1年)
LocalDateTime expireTime = LocalDateTime.now().plusYears(1);
// 更新设备的API密钥
HttpDevice device = getDevice(deviceId);
if (device != null) {
device.setApiKey(apiKey);
device.setApiKeyExpireTime(expireTime);
device.setUpdateTime(LocalDateTime.now());
deviceRepository.save(device);
// 缓存API密钥
cacheApiKey(deviceId, apiKey, expireTime);
}
log.info("API密钥生成成功:deviceId={}", deviceId);
return apiKey;
} catch (Exception e) {
log.error("生成API密钥失败:deviceId={}", deviceId, e);
throw new RuntimeException("API密钥生成失败", e);
}
}
/**
* 验证API密钥
*/
public boolean validateApiKey(String deviceId, String apiKey) {
try {
// 先从缓存中查找
String cachedApiKey = getCachedApiKey(deviceId);
if (cachedApiKey != null) {
return cachedApiKey.equals(apiKey);
}
// 从数据库查找
HttpDevice device = getDevice(deviceId);
if (device == null || !device.getEnabled()) {
return false;
}
// 检查API密钥是否匹配
if (!apiKey.equals(device.getApiKey())) {
return false;
}
// 检查API密钥是否过期
if (device.getApiKeyExpireTime() != null &&
device.getApiKeyExpireTime().isBefore(LocalDateTime.now())) {
log.warn("API密钥已过期:deviceId={}", deviceId);
return false;
}
// 缓存有效的API密钥
cacheApiKey(deviceId, apiKey, device.getApiKeyExpireTime());
return true;
} catch (Exception e) {
log.error("验证API密钥失败:deviceId={}", deviceId, e);
return false;
}
}
/**
* 更新设备心跳
*/
public void updateHeartbeat(String deviceId, HeartbeatRequest request) {
try {
HttpDevice device = getDevice(deviceId);
if (device != null) {
device.setLastHeartbeat(LocalDateTime.now());
device.setStatus(DeviceStatus.ONLINE);
// 更新设备状态信息
if (request.getStatus() != null) {
updateDeviceStatus(device, request);
}
device.setUpdateTime(LocalDateTime.now());
deviceRepository.save(device);
// 更新缓存
cacheDevice(device);
log.debug("设备心跳更新成功:deviceId={}", deviceId);
}
} catch (Exception e) {
log.error("更新设备心跳失败:deviceId={}", deviceId, e);
throw new RuntimeException("心跳更新失败", e);
}
}
/**
* 更新设备最后数据上报时间
*/
public void updateLastDataReport(String deviceId) {
try {
HttpDevice device = getDevice(deviceId);
if (device != null) {
device.setLastDataReport(LocalDateTime.now());
device.setUpdateTime(LocalDateTime.now());
deviceRepository.save(device);
// 更新缓存
cacheDevice(device);
}
} catch (Exception e) {
log.error("更新设备数据上报时间失败:deviceId={}", deviceId, e);
}
}
/**
* 获取设备信息
*/
public HttpDevice getDevice(String deviceId) {
try {
// 先从缓存中查找
HttpDevice cachedDevice = getCachedDevice(deviceId);
if (cachedDevice != null) {
return cachedDevice;
}
// 从数据库查找
Optional<HttpDevice> deviceOpt = deviceRepository.findByDeviceId(deviceId);
if (deviceOpt.isPresent()) {
HttpDevice device = deviceOpt.get();
// 缓存设备信息
cacheDevice(device);
return device;
}
return null;
} catch (Exception e) {
log.error("获取设备信息失败:deviceId={}", deviceId, e);
return null;
}
}
/**
* 检查设备是否存在
*/
public boolean deviceExists(String deviceId) {
return getDevice(deviceId) != null;
}
/**
* 验证设备信息
*/
public boolean validateDeviceInfo(DeviceRegistrationRequest request) {
// 检查必填字段
if (request.getDeviceId() == null || request.getDeviceId().trim().isEmpty()) {
return false;
}
if (request.getDeviceType() == null) {
return false;
}
// 检查设备ID格式
if (!request.getDeviceId().matches("^[a-zA-Z0-9_-]{6,32}$")) {
return false;
}
return true;
}
/**
* 获取设备配置
*/
public DeviceConfig getDeviceConfig(String deviceId) {
return configService.getDeviceConfig(deviceId);
}
/**
* 更新设备配置
*/
public void updateDeviceConfig(String deviceId, DeviceConfigUpdateRequest request) {
try {
HttpDevice device = getDevice(deviceId);
if (device != null) {
device.setConfig(request.getConfig());
device.setUpdateTime(LocalDateTime.now());
deviceRepository.save(device);
// 更新缓存
cacheDevice(device);
// 更新配置服务
configService.updateDeviceConfig(deviceId, request);
log.info("设备配置更新成功:deviceId={}", deviceId);
}
} catch (Exception e) {
log.error("更新设备配置失败:deviceId={}", deviceId, e);
throw new RuntimeException("配置更新失败", e);
}
}
// 缓存相关方法
/**
* 缓存设备信息
*/
private void cacheDevice(HttpDevice device) {
try {
String key = DEVICE_CACHE_PREFIX + device.getDeviceId();
redisTemplate.opsForValue().set(key, device, Duration.ofHours(1));
} catch (Exception e) {
log.warn("缓存设备信息失败:deviceId={}", device.getDeviceId(), e);
}
}
/**
* 获取缓存的设备信息
*/
private HttpDevice getCachedDevice(String deviceId) {
try {
String key = DEVICE_CACHE_PREFIX + deviceId;
return (HttpDevice) redisTemplate.opsForValue().get(key);
} catch (Exception e) {
log.warn("获取缓存设备信息失败:deviceId={}", deviceId, e);
return null;
}
}
/**
* 缓存API密钥
*/
private void cacheApiKey(String deviceId, String apiKey, LocalDateTime expireTime) {
try {
String key = API_KEY_CACHE_PREFIX + deviceId;
Duration duration = expireTime != null ?
Duration.between(LocalDateTime.now(), expireTime) : Duration.ofDays(1);
redisTemplate.opsForValue().set(key, apiKey, duration);
} catch (Exception e) {
log.warn("缓存API密钥失败:deviceId={}", deviceId, e);
}
}
/**
* 获取缓存的API密钥
*/
private String getCachedApiKey(String deviceId) {
try {
String key = API_KEY_CACHE_PREFIX + deviceId;
return (String) redisTemplate.opsForValue().get(key);
} catch (Exception e) {
log.warn("获取缓存API密钥失败:deviceId={}", deviceId, e);
return null;
}
}
/**
* 更新设备状态信息
*/
private void updateDeviceStatus(HttpDevice device, HeartbeatRequest request) {
// 更新固件版本
if (request.getFirmwareVersion() != null) {
device.setFirmwareVersion(request.getFirmwareVersion());
}
// 更新位置信息
if (request.getLocation() != null) {
device.setLocation(request.getLocation());
}
// 更新支持的API列表
if (request.getSupportedApis() != null) {
device.setSupportedApis(request.getSupportedApis());
}
}
}
# 2. 数据接收服务
/**
* HTTP数据接收服务
* 处理设备通过HTTP上报的各种数据
*/
@Service
@Slf4j
public class HttpDataReceiveService {
@Autowired
private HttpDeviceDataRepository dataRepository;
@Autowired
private HttpDeviceService deviceService;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private DataValidationService validationService;
private static final String DATA_CACHE_PREFIX = "http:data:";
private static final String DEVICE_DATA_QUEUE = "device.data.queue";
/**
* 接收单条数据
*/
public DataReceiveResponse receiveSingleData(String deviceId, SingleDataRequest request) {
try {
// 验证设备
if (!deviceService.deviceExists(deviceId)) {
return DataReceiveResponse.error("设备不存在");
}
// 验证数据格式
if (!validationService.validateSingleData(request)) {
return DataReceiveResponse.error("数据格式无效");
}
// 创建数据记录
HttpDeviceData data = new HttpDeviceData();
data.setDeviceId(deviceId);
data.setDataType(request.getDataType());
data.setDataValue(request.getDataValue());
data.setTimestamp(request.getTimestamp() != null ?
request.getTimestamp() : LocalDateTime.now());
data.setQuality(request.getQuality());
data.setUnit(request.getUnit());
data.setCreateTime(LocalDateTime.now());
// 保存到数据库
dataRepository.save(data);
// 缓存最新数据
cacheLatestData(deviceId, data);
// 异步处理数据
processDataAsync(data);
// 更新设备最后数据上报时间
deviceService.updateLastDataReport(deviceId);
log.info("接收单条数据成功:deviceId={}, dataType={}",
deviceId, request.getDataType());
return DataReceiveResponse.success("数据接收成功", data.getId());
} catch (Exception e) {
log.error("接收单条数据失败:deviceId={}", deviceId, e);
return DataReceiveResponse.error("数据接收失败:" + e.getMessage());
}
}
/**
* 接收批量数据
*/
public DataReceiveResponse receiveBatchData(String deviceId, BatchDataRequest request) {
try {
// 验证设备
if (!deviceService.deviceExists(deviceId)) {
return DataReceiveResponse.error("设备不存在");
}
// 验证批量数据
if (!validationService.validateBatchData(request)) {
return DataReceiveResponse.error("批量数据格式无效");
}
List<HttpDeviceData> dataList = new ArrayList<>();
List<Long> dataIds = new ArrayList<>();
// 处理每条数据
for (SingleDataRequest dataRequest : request.getDataList()) {
HttpDeviceData data = new HttpDeviceData();
data.setDeviceId(deviceId);
data.setDataType(dataRequest.getDataType());
data.setDataValue(dataRequest.getDataValue());
data.setTimestamp(dataRequest.getTimestamp() != null ?
dataRequest.getTimestamp() : LocalDateTime.now());
data.setQuality(dataRequest.getQuality());
data.setUnit(dataRequest.getUnit());
data.setCreateTime(LocalDateTime.now());
dataList.add(data);
}
// 批量保存
List<HttpDeviceData> savedData = dataRepository.saveAll(dataList);
savedData.forEach(data -> dataIds.add(data.getId()));
// 缓存最新数据(只缓存最后一条)
if (!savedData.isEmpty()) {
HttpDeviceData latestData = savedData.get(savedData.size() - 1);
cacheLatestData(deviceId, latestData);
}
// 异步处理批量数据
processBatchDataAsync(savedData);
// 更新设备最后数据上报时间
deviceService.updateLastDataReport(deviceId);
log.info("接收批量数据成功:deviceId={}, count={}",
deviceId, dataList.size());
return DataReceiveResponse.success("批量数据接收成功", dataIds);
} catch (Exception e) {
log.error("接收批量数据失败:deviceId={}", deviceId, e);
return DataReceiveResponse.error("批量数据接收失败:" + e.getMessage());
}
}
/**
* 接收事件数据
*/
public DataReceiveResponse receiveEventData(String deviceId, EventDataRequest request) {
try {
// 验证设备
if (!deviceService.deviceExists(deviceId)) {
return DataReceiveResponse.error("设备不存在");
}
// 验证事件数据
if (!validationService.validateEventData(request)) {
return DataReceiveResponse.error("事件数据格式无效");
}
// 创建事件数据记录
HttpDeviceData data = new HttpDeviceData();
data.setDeviceId(deviceId);
data.setDataType("EVENT");
data.setDataValue(request.getEventData());
data.setTimestamp(request.getTimestamp() != null ?
request.getTimestamp() : LocalDateTime.now());
data.setEventType(request.getEventType());
data.setEventLevel(request.getEventLevel());
data.setEventDescription(request.getEventDescription());
data.setCreateTime(LocalDateTime.now());
// 保存事件数据
dataRepository.save(data);
// 处理紧急事件
if ("CRITICAL".equals(request.getEventLevel()) ||
"ERROR".equals(request.getEventLevel())) {
processUrgentEvent(data);
}
// 异步处理事件
processEventAsync(data);
// 更新设备最后数据上报时间
deviceService.updateLastDataReport(deviceId);
log.info("接收事件数据成功:deviceId={}, eventType={}, level={}",
deviceId, request.getEventType(), request.getEventLevel());
return DataReceiveResponse.success("事件数据接收成功", data.getId());
} catch (Exception e) {
log.error("接收事件数据失败:deviceId={}", deviceId, e);
return DataReceiveResponse.error("事件数据接收失败:" + e.getMessage());
}
}
/**
* 缓存最新数据
*/
private void cacheLatestData(String deviceId, HttpDeviceData data) {
try {
String key = DATA_CACHE_PREFIX + deviceId + ":latest";
redisTemplate.opsForValue().set(key, data, Duration.ofHours(24));
} catch (Exception e) {
log.warn("缓存最新数据失败:deviceId={}", deviceId, e);
}
}
/**
* 异步处理单条数据
*/
@Async
private void processDataAsync(HttpDeviceData data) {
try {
// 发送到消息队列进行进一步处理
rabbitTemplate.convertAndSend(DEVICE_DATA_QUEUE, data);
// 数据质量检查
checkDataQuality(data);
// 数据聚合处理
aggregateData(data);
} catch (Exception e) {
log.error("异步处理数据失败:dataId={}", data.getId(), e);
}
}
/**
* 异步处理批量数据
*/
@Async
private void processBatchDataAsync(List<HttpDeviceData> dataList) {
try {
// 批量发送到消息队列
rabbitTemplate.convertAndSend(DEVICE_DATA_QUEUE + ".batch", dataList);
// 批量数据质量检查
dataList.forEach(this::checkDataQuality);
// 批量数据聚合
aggregateBatchData(dataList);
} catch (Exception e) {
log.error("异步处理批量数据失败:count={}", dataList.size(), e);
}
}
/**
* 异步处理事件
*/
@Async
private void processEventAsync(HttpDeviceData eventData) {
try {
// 发送事件到专门的事件队列
rabbitTemplate.convertAndSend("device.event.queue", eventData);
// 事件规则引擎处理
processEventRules(eventData);
} catch (Exception e) {
log.error("异步处理事件失败:eventId={}", eventData.getId(), e);
}
}
/**
* 处理紧急事件
*/
private void processUrgentEvent(HttpDeviceData eventData) {
try {
// 立即发送告警
sendUrgentAlert(eventData);
// 记录紧急事件日志
log.error("紧急事件:deviceId={}, eventType={}, description={}",
eventData.getDeviceId(), eventData.getEventType(),
eventData.getEventDescription());
} catch (Exception e) {
log.error("处理紧急事件失败:eventId={}", eventData.getId(), e);
}
}
/**
* 数据质量检查
*/
private void checkDataQuality(HttpDeviceData data) {
// 检查数据范围
if (!validationService.isDataInValidRange(data)) {
log.warn("数据超出有效范围:deviceId={}, dataType={}, value={}",
data.getDeviceId(), data.getDataType(), data.getDataValue());
}
// 检查数据连续性
if (!validationService.isDataContinuous(data)) {
log.warn("数据不连续:deviceId={}, dataType={}",
data.getDeviceId(), data.getDataType());
}
}
/**
* 数据聚合处理
*/
private void aggregateData(HttpDeviceData data) {
// 实现数据聚合逻辑
// 例如:计算平均值、最大值、最小值等
}
/**
* 批量数据聚合
*/
private void aggregateBatchData(List<HttpDeviceData> dataList) {
// 实现批量数据聚合逻辑
}
/**
* 事件规则引擎处理
*/
private void processEventRules(HttpDeviceData eventData) {
// 实现事件规则处理逻辑
}
/**
* 发送紧急告警
*/
private void sendUrgentAlert(HttpDeviceData eventData) {
// 实现紧急告警发送逻辑
}
}
# 3. 文件上传服务
/**
* HTTP文件上传服务
* 处理设备上传的各种文件:固件、日志、配置等
*/
@Service
@Slf4j
public class HttpFileUploadService {
@Autowired
private HttpDeviceService deviceService;
@Autowired
private MinioClient minioClient;
@Autowired
private FileValidationService fileValidationService;
@Autowired
private VirusScanService virusScanService;
@Value("${minio.bucket.device-files}")
private String deviceFilesBucket;
@Value("${file.upload.max-size}")
private long maxFileSize;
/**
* 上传设备文件
*/
public FileUploadResponse uploadFile(String deviceId, MultipartFile file,
FileUploadRequest request) {
try {
// 验证设备
if (!deviceService.deviceExists(deviceId)) {
return FileUploadResponse.error("设备不存在");
}
// 验证文件
FileValidationResult validation = validateFile(file, request);
if (!validation.isValid()) {
return FileUploadResponse.error(validation.getErrorMessage());
}
// 生成文件路径
String filePath = generateFilePath(deviceId, file.getOriginalFilename(),
request.getFileType());
// 病毒扫描
if (request.isVirusScanRequired()) {
VirusScanResult scanResult = virusScanService.scanFile(file);
if (!scanResult.isClean()) {
log.warn("文件病毒扫描失败:deviceId={}, fileName={}, threat={}",
deviceId, file.getOriginalFilename(), scanResult.getThreatName());
return FileUploadResponse.error("文件包含恶意内容");
}
}
// 上传到MinIO
String objectName = uploadToMinio(file, filePath);
// 记录文件信息
FileRecord fileRecord = createFileRecord(deviceId, file, request, objectName);
// 处理特殊文件类型
processSpecialFile(fileRecord, file);
log.info("文件上传成功:deviceId={}, fileName={}, size={}",
deviceId, file.getOriginalFilename(), file.getSize());
return FileUploadResponse.success("文件上传成功", fileRecord);
} catch (Exception e) {
log.error("文件上传失败:deviceId={}, fileName={}",
deviceId, file.getOriginalFilename(), e);
return FileUploadResponse.error("文件上传失败:" + e.getMessage());
}
}
/**
* 验证文件
*/
private FileValidationResult validateFile(MultipartFile file, FileUploadRequest request) {
// 检查文件是否为空
if (file.isEmpty()) {
return FileValidationResult.invalid("文件不能为空");
}
// 检查文件大小
if (file.getSize() > maxFileSize) {
return FileValidationResult.invalid("文件大小超过限制");
}
// 检查文件类型
if (!fileValidationService.isAllowedFileType(file, request.getFileType())) {
return FileValidationResult.invalid("不支持的文件类型");
}
// 检查文件名
if (!fileValidationService.isValidFileName(file.getOriginalFilename())) {
return FileValidationResult.invalid("文件名格式无效");
}
return FileValidationResult.valid();
}
/**
* 生成文件路径
*/
private String generateFilePath(String deviceId, String originalFilename, String fileType) {
String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"));
String extension = getFileExtension(originalFilename);
return String.format("%s/%s/%s_%s.%s", deviceId, fileType, fileType, timestamp, extension);
}
/**
* 上传到MinIO
*/
private String uploadToMinio(MultipartFile file, String filePath) throws Exception {
try (InputStream inputStream = file.getInputStream()) {
minioClient.putObject(
PutObjectArgs.builder()
.bucket(deviceFilesBucket)
.object(filePath)
.stream(inputStream, file.getSize(), -1)
.contentType(file.getContentType())
.build()
);
return filePath;
}
}
/**
* 创建文件记录
*/
private FileRecord createFileRecord(String deviceId, MultipartFile file,
FileUploadRequest request, String objectName) {
FileRecord record = new FileRecord();
record.setDeviceId(deviceId);
record.setOriginalFileName(file.getOriginalFilename());
record.setFileType(request.getFileType());
record.setFileSize(file.getSize());
record.setContentType(file.getContentType());
record.setObjectName(objectName);
record.setUploadTime(LocalDateTime.now());
record.setDescription(request.getDescription());
record.setVersion(request.getVersion());
return record;
}
/**
* 处理特殊文件类型
*/
private void processSpecialFile(FileRecord fileRecord, MultipartFile file) {
switch (fileRecord.getFileType().toUpperCase()) {
case "FIRMWARE":
processFirmwareFile(fileRecord, file);
break;
case "LOG":
processLogFile(fileRecord, file);
break;
case "CONFIG":
processConfigFile(fileRecord, file);
break;
case "IMAGE":
processImageFile(fileRecord, file);
break;
default:
// 普通文件,无需特殊处理
break;
}
}
/**
* 处理固件文件
*/
private void processFirmwareFile(FileRecord fileRecord, MultipartFile file) {
try {
// 验证固件文件格式
if (!fileValidationService.isValidFirmware(file)) {
log.warn("无效的固件文件:deviceId={}, fileName={}",
fileRecord.getDeviceId(), fileRecord.getOriginalFileName());
return;
}
// 提取固件信息
FirmwareInfo firmwareInfo = extractFirmwareInfo(file);
fileRecord.setMetadata(firmwareInfo.toJson());
// 通知固件管理服务
notifyFirmwareService(fileRecord, firmwareInfo);
} catch (Exception e) {
log.error("处理固件文件失败:deviceId={}", fileRecord.getDeviceId(), e);
}
}
/**
* 处理日志文件
*/
private void processLogFile(FileRecord fileRecord, MultipartFile file) {
try {
// 解析日志文件
LogAnalysisResult analysisResult = analyzeLogFile(file);
fileRecord.setMetadata(analysisResult.toJson());
// 如果发现错误日志,发送告警
if (analysisResult.hasErrors()) {
sendLogErrorAlert(fileRecord, analysisResult);
}
} catch (Exception e) {
log.error("处理日志文件失败:deviceId={}", fileRecord.getDeviceId(), e);
}
}
/**
* 处理配置文件
*/
private void processConfigFile(FileRecord fileRecord, MultipartFile file) {
try {
// 验证配置文件格式
if (!fileValidationService.isValidConfig(file)) {
log.warn("无效的配置文件:deviceId={}, fileName={}",
fileRecord.getDeviceId(), fileRecord.getOriginalFileName());
return;
}
// 解析配置文件
ConfigInfo configInfo = parseConfigFile(file);
fileRecord.setMetadata(configInfo.toJson());
// 更新设备配置
updateDeviceConfig(fileRecord.getDeviceId(), configInfo);
} catch (Exception e) {
log.error("处理配置文件失败:deviceId={}", fileRecord.getDeviceId(), e);
}
}
/**
* 处理图片文件
*/
private void processImageFile(FileRecord fileRecord, MultipartFile file) {
try {
// 提取图片信息
ImageInfo imageInfo = extractImageInfo(file);
fileRecord.setMetadata(imageInfo.toJson());
// 生成缩略图
generateThumbnail(fileRecord, file);
// 图片内容分析(如果需要)
if (shouldAnalyzeImage(fileRecord)) {
analyzeImageContent(fileRecord, file);
}
} catch (Exception e) {
log.error("处理图片文件失败:deviceId={}", fileRecord.getDeviceId(), e);
}
}
// 辅助方法
private String getFileExtension(String filename) {
if (filename == null || !filename.contains(".")) {
return "";
}
return filename.substring(filename.lastIndexOf(".") + 1);
}
private FirmwareInfo extractFirmwareInfo(MultipartFile file) {
// 实现固件信息提取逻辑
return new FirmwareInfo();
}
private void notifyFirmwareService(FileRecord fileRecord, FirmwareInfo firmwareInfo) {
// 实现固件服务通知逻辑
}
private LogAnalysisResult analyzeLogFile(MultipartFile file) {
// 实现日志分析逻辑
return new LogAnalysisResult();
}
private void sendLogErrorAlert(FileRecord fileRecord, LogAnalysisResult analysisResult) {
// 实现日志错误告警逻辑
}
private ConfigInfo parseConfigFile(MultipartFile file) {
// 实现配置文件解析逻辑
return new ConfigInfo();
}
private void updateDeviceConfig(String deviceId, ConfigInfo configInfo) {
// 实现设备配置更新逻辑
}
private ImageInfo extractImageInfo(MultipartFile file) {
// 实现图片信息提取逻辑
return new ImageInfo();
}
private void generateThumbnail(FileRecord fileRecord, MultipartFile file) {
// 实现缩略图生成逻辑
}
private boolean shouldAnalyzeImage(FileRecord fileRecord) {
// 判断是否需要分析图片内容
return false;
}
private void analyzeImageContent(FileRecord fileRecord, MultipartFile file) {
// 实现图片内容分析逻辑
}
}
# 9. 异常处理和安全机制
# 1. 全局异常处理
/**
* HTTP设备接入层全局异常处理器
*/
@RestControllerAdvice
@Slf4j
public class HttpDeviceExceptionHandler {
/**
* 处理设备不存在异常
*/
@ExceptionHandler(DeviceNotFoundException.class)
public ResponseEntity<ErrorResponse> handleDeviceNotFound(DeviceNotFoundException e) {
log.warn("设备不存在:{}", e.getMessage());
ErrorResponse error = ErrorResponse.builder()
.code("DEVICE_NOT_FOUND")
.message("设备不存在")
.timestamp(LocalDateTime.now())
.build();
return ResponseEntity.status(HttpStatus.NOT_FOUND).body(error);
}
/**
* 处理认证失败异常
*/
@ExceptionHandler(AuthenticationException.class)
public ResponseEntity<ErrorResponse> handleAuthenticationFailed(AuthenticationException e) {
log.warn("认证失败:{}", e.getMessage());
ErrorResponse error = ErrorResponse.builder()
.code("AUTHENTICATION_FAILED")
.message("认证失败")
.timestamp(LocalDateTime.now())
.build();
return ResponseEntity.status(HttpStatus.UNAUTHORIZED).body(error);
}
/**
* 处理数据验证异常
*/
@ExceptionHandler(DataValidationException.class)
public ResponseEntity<ErrorResponse> handleDataValidation(DataValidationException e) {
log.warn("数据验证失败:{}", e.getMessage());
ErrorResponse error = ErrorResponse.builder()
.code("DATA_VALIDATION_FAILED")
.message(e.getMessage())
.details(e.getValidationErrors())
.timestamp(LocalDateTime.now())
.build();
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(error);
}
/**
* 处理文件上传异常
*/
@ExceptionHandler(FileUploadException.class)
public ResponseEntity<ErrorResponse> handleFileUpload(FileUploadException e) {
log.error("文件上传失败:{}", e.getMessage());
ErrorResponse error = ErrorResponse.builder()
.code("FILE_UPLOAD_FAILED")
.message("文件上传失败")
.timestamp(LocalDateTime.now())
.build();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(error);
}
/**
* 处理限流异常
*/
@ExceptionHandler(RateLimitException.class)
public ResponseEntity<ErrorResponse> handleRateLimit(RateLimitException e) {
log.warn("请求频率超限:{}", e.getMessage());
ErrorResponse error = ErrorResponse.builder()
.code("RATE_LIMIT_EXCEEDED")
.message("请求频率超限,请稍后重试")
.timestamp(LocalDateTime.now())
.build();
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(error);
}
/**
* 处理系统异常
*/
@ExceptionHandler(Exception.class)
public ResponseEntity<ErrorResponse> handleGeneral(Exception e) {
log.error("系统异常:", e);
ErrorResponse error = ErrorResponse.builder()
.code("INTERNAL_ERROR")
.message("系统内部错误")
.timestamp(LocalDateTime.now())
.build();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(error);
}
}
# 2. 安全认证拦截器
/**
* HTTP设备认证拦截器
*/
@Component
@Slf4j
public class HttpDeviceAuthInterceptor implements HandlerInterceptor {
@Autowired
private HttpDeviceService deviceService;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String API_KEY_HEADER = "X-API-Key";
private static final String DEVICE_ID_HEADER = "X-Device-Id";
private static final String AUTH_CACHE_PREFIX = "http:auth:";
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler) throws Exception {
// 跳过OPTIONS请求
if ("OPTIONS".equals(request.getMethod())) {
return true;
}
// 获取认证信息
String apiKey = request.getHeader(API_KEY_HEADER);
String deviceId = request.getHeader(DEVICE_ID_HEADER);
if (apiKey == null || deviceId == null) {
log.warn("缺少认证信息:deviceId={}, apiKey={}", deviceId, apiKey != null ? "***" : null);
sendAuthError(response, "缺少认证信息");
return false;
}
// 验证API密钥
if (!validateApiKey(deviceId, apiKey)) {
log.warn("API密钥验证失败:deviceId={}", deviceId);
sendAuthError(response, "认证失败");
return false;
}
// 检查设备状态
if (!checkDeviceStatus(deviceId)) {
log.warn("设备状态异常:deviceId={}", deviceId);
sendAuthError(response, "设备状态异常");
return false;
}
// 限流检查
if (!checkRateLimit(deviceId, request)) {
log.warn("请求频率超限:deviceId={}", deviceId);
sendRateLimitError(response);
return false;
}
// 将设备ID存储到请求属性中
request.setAttribute("deviceId", deviceId);
return true;
}
/**
* 验证API密钥
*/
private boolean validateApiKey(String deviceId, String apiKey) {
try {
// 先从缓存中查找
String cacheKey = AUTH_CACHE_PREFIX + deviceId;
String cachedApiKey = (String) redisTemplate.opsForValue().get(cacheKey);
if (cachedApiKey != null) {
return cachedApiKey.equals(apiKey);
}
// 从数据库验证
boolean isValid = deviceService.validateApiKey(deviceId, apiKey);
// 缓存验证结果
if (isValid) {
redisTemplate.opsForValue().set(cacheKey, apiKey, Duration.ofMinutes(30));
}
return isValid;
} catch (Exception e) {
log.error("验证API密钥失败:deviceId={}", deviceId, e);
return false;
}
}
/**
* 检查设备状态
*/
private boolean checkDeviceStatus(String deviceId) {
try {
HttpDevice device = deviceService.getDevice(deviceId);
return device != null && DeviceStatus.ONLINE.equals(device.getStatus());
} catch (Exception e) {
log.error("检查设备状态失败:deviceId={}", deviceId, e);
return false;
}
}
/**
* 限流检查
*/
private boolean checkRateLimit(String deviceId, HttpServletRequest request) {
try {
String rateLimitKey = "rate_limit:" + deviceId;
String currentCount = (String) redisTemplate.opsForValue().get(rateLimitKey);
int count = currentCount != null ? Integer.parseInt(currentCount) : 0;
int maxRequests = getMaxRequestsPerMinute(deviceId);
if (count >= maxRequests) {
return false;
}
// 增加计数
redisTemplate.opsForValue().increment(rateLimitKey);
redisTemplate.expire(rateLimitKey, Duration.ofMinutes(1));
return true;
} catch (Exception e) {
log.error("限流检查失败:deviceId={}", deviceId, e);
return true; // 出错时允许通过
}
}
/**
* 获取设备每分钟最大请求数
*/
private int getMaxRequestsPerMinute(String deviceId) {
// 根据设备类型返回不同的限制
// 这里简化为固定值
return 100;
}
/**
* 发送认证错误响应
*/
private void sendAuthError(HttpServletResponse response, String message) throws IOException {
response.setStatus(HttpStatus.UNAUTHORIZED.value());
response.setContentType("application/json;charset=UTF-8");
ErrorResponse error = ErrorResponse.builder()
.code("AUTHENTICATION_FAILED")
.message(message)
.timestamp(LocalDateTime.now())
.build();
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
response.getWriter().write(mapper.writeValueAsString(error));
}
/**
* 发送限流错误响应
*/
private void sendRateLimitError(HttpServletResponse response) throws IOException {
response.setStatus(HttpStatus.TOO_MANY_REQUESTS.value());
response.setContentType("application/json;charset=UTF-8");
ErrorResponse error = ErrorResponse.builder()
.code("RATE_LIMIT_EXCEEDED")
.message("请求频率超限,请稍后重试")
.timestamp(LocalDateTime.now())
.build();
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
response.getWriter().write(mapper.writeValueAsString(error));
}
}
# 3. 数据验证服务
/**
* 数据验证服务
*/
@Service
@Slf4j
public class DataValidationService {
@Autowired
private HttpDeviceService deviceService;
/**
* 验证单条数据
*/
public boolean validateSingleData(SingleDataRequest request) {
List<String> errors = new ArrayList<>();
// 验证数据类型
if (request.getDataType() == null || request.getDataType().trim().isEmpty()) {
errors.add("数据类型不能为空");
}
// 验证数据值
if (request.getDataValue() == null) {
errors.add("数据值不能为空");
}
// 验证时间戳
if (request.getTimestamp() != null) {
LocalDateTime now = LocalDateTime.now();
if (request.getTimestamp().isAfter(now.plusMinutes(5))) {
errors.add("时间戳不能超过当前时间5分钟");
}
if (request.getTimestamp().isBefore(now.minusDays(7))) {
errors.add("时间戳不能早于7天前");
}
}
// 验证数据质量
if (request.getQuality() != null) {
if (request.getQuality() < 0 || request.getQuality() > 100) {
errors.add("数据质量必须在0-100之间");
}
}
if (!errors.isEmpty()) {
throw new DataValidationException("数据验证失败", errors);
}
return true;
}
/**
* 验证批量数据
*/
public boolean validateBatchData(BatchDataRequest request) {
if (request.getDataList() == null || request.getDataList().isEmpty()) {
throw new DataValidationException("批量数据不能为空",
Collections.singletonList("dataList不能为空"));
}
if (request.getDataList().size() > 1000) {
throw new DataValidationException("批量数据数量超限",
Collections.singletonList("单次最多上传1000条数据"));
}
// 验证每条数据
for (int i = 0; i < request.getDataList().size(); i++) {
try {
validateSingleData(request.getDataList().get(i));
} catch (DataValidationException e) {
throw new DataValidationException(
String.format("第%d条数据验证失败", i + 1), e.getValidationErrors());
}
}
return true;
}
/**
* 验证事件数据
*/
public boolean validateEventData(EventDataRequest request) {
List<String> errors = new ArrayList<>();
// 验证事件类型
if (request.getEventType() == null || request.getEventType().trim().isEmpty()) {
errors.add("事件类型不能为空");
}
// 验证事件级别
if (request.getEventLevel() == null || request.getEventLevel().trim().isEmpty()) {
errors.add("事件级别不能为空");
} else {
String[] validLevels = {"INFO", "WARN", "ERROR", "CRITICAL"};
if (!Arrays.asList(validLevels).contains(request.getEventLevel().toUpperCase())) {
errors.add("事件级别必须是:INFO、WARN、ERROR、CRITICAL之一");
}
}
// 验证事件数据
if (request.getEventData() == null) {
errors.add("事件数据不能为空");
}
if (!errors.isEmpty()) {
throw new DataValidationException("事件数据验证失败", errors);
}
return true;
}
/**
* 检查数据是否在有效范围内
*/
public boolean isDataInValidRange(HttpDeviceData data) {
try {
// 根据数据类型检查范围
switch (data.getDataType().toUpperCase()) {
case "TEMPERATURE":
double temp = Double.parseDouble(data.getDataValue());
return temp >= -50 && temp <= 100;
case "HUMIDITY":
double humidity = Double.parseDouble(data.getDataValue());
return humidity >= 0 && humidity <= 100;
case "PRESSURE":
double pressure = Double.parseDouble(data.getDataValue());
return pressure >= 0 && pressure <= 2000;
default:
return true; // 未知类型默认有效
}
} catch (NumberFormatException e) {
return false;
}
}
/**
* 检查数据连续性
*/
public boolean isDataContinuous(HttpDeviceData data) {
// 实现数据连续性检查逻辑
// 例如:检查与上一条数据的时间间隔是否合理
return true;
}
}
# 10. 测试用例
# 1. 设备管理测试
/**
* HTTP设备管理服务测试
*/
@SpringBootTest
@TestMethodOrder(OrderAnnotation.class)
class HttpDeviceServiceTest {
@Autowired
private HttpDeviceService deviceService;
@Autowired
private TestRestTemplate restTemplate;
@MockBean
private HttpDeviceRepository deviceRepository;
@MockBean
private RedisTemplate<String, Object> redisTemplate;
private HttpDevice testDevice;
@BeforeEach
void setUp() {
testDevice = new HttpDevice();
testDevice.setDeviceId("TEST_DEVICE_001");
testDevice.setDeviceName("测试设备");
testDevice.setDeviceType(DeviceType.SENSOR);
testDevice.setIpAddress("192.168.1.100");
testDevice.setStatus(DeviceStatus.ONLINE);
testDevice.setCreateTime(LocalDateTime.now());
}
@Test
@Order(1)
@DisplayName("设备注册测试")
void testRegisterDevice() {
// 准备测试数据
DeviceRegistrationRequest request = new DeviceRegistrationRequest();
request.setDeviceId("TEST_DEVICE_001");
request.setDeviceName("测试设备");
request.setDeviceType(DeviceType.SENSOR);
request.setIpAddress("192.168.1.100");
request.setMacAddress("00:11:22:33:44:55");
// Mock数据库操作
when(deviceRepository.findByDeviceId("TEST_DEVICE_001"))
.thenReturn(Optional.empty())
.thenReturn(Optional.of(testDevice));
when(deviceRepository.save(any(HttpDevice.class))).thenReturn(testDevice);
// 执行测试
DeviceRegistrationResponse response = deviceService.registerDevice(request);
// 验证结果
assertThat(response.isSuccess()).isTrue();
assertThat(response.getDeviceId()).isEqualTo("TEST_DEVICE_001");
assertThat(response.getApiKey()).isNotNull();
// 验证方法调用
verify(deviceRepository, times(1)).save(any(HttpDevice.class));
}
@Test
@Order(2)
@DisplayName("设备心跳测试")
void testUpdateHeartbeat() {
// 准备测试数据
HeartbeatRequest request = new HeartbeatRequest();
request.setFirmwareVersion("v1.2.0");
request.setLocation("{\"lat\":39.9042,\"lng\":116.4074}");
// Mock数据库操作
when(deviceRepository.findByDeviceId("TEST_DEVICE_001"))
.thenReturn(Optional.of(testDevice));
when(deviceRepository.save(any(HttpDevice.class))).thenReturn(testDevice);
// 执行测试
assertDoesNotThrow(() -> {
deviceService.updateHeartbeat("TEST_DEVICE_001", request);
});
// 验证方法调用
verify(deviceRepository, times(1)).save(any(HttpDevice.class));
}
@Test
@Order(3)
@DisplayName("API密钥验证测试")
void testValidateApiKey() {
// 准备测试数据
String apiKey = "test-api-key-123";
testDevice.setApiKey(apiKey);
testDevice.setApiKeyExpireTime(LocalDateTime.now().plusDays(30));
// Mock数据库操作
when(deviceRepository.findByDeviceId("TEST_DEVICE_001"))
.thenReturn(Optional.of(testDevice));
// 执行测试 - 有效密钥
boolean isValid = deviceService.validateApiKey("TEST_DEVICE_001", apiKey);
assertThat(isValid).isTrue();
// 执行测试 - 无效密钥
boolean isInvalid = deviceService.validateApiKey("TEST_DEVICE_001", "invalid-key");
assertThat(isInvalid).isFalse();
}
@Test
@Order(4)
@DisplayName("设备不存在测试")
void testDeviceNotFound() {
// Mock数据库操作
when(deviceRepository.findByDeviceId("NON_EXISTENT_DEVICE"))
.thenReturn(Optional.empty());
// 执行测试
HttpDevice device = deviceService.getDevice("NON_EXISTENT_DEVICE");
// 验证结果
assertThat(device).isNull();
}
}
# 2. 数据接收测试
/**
* HTTP数据接收服务测试
*/
@SpringBootTest
class HttpDataReceiveServiceTest {
@Autowired
private HttpDataReceiveService dataReceiveService;
@MockBean
private HttpDeviceDataRepository dataRepository;
@MockBean
private HttpDeviceService deviceService;
@MockBean
private DataValidationService validationService;
@Test
@DisplayName("单条数据接收测试")
void testReceiveSingleData() {
// 准备测试数据
SingleDataRequest request = new SingleDataRequest();
request.setDataType("TEMPERATURE");
request.setDataValue("25.5");
request.setTimestamp(LocalDateTime.now());
request.setQuality(95);
request.setUnit("°C");
HttpDeviceData savedData = new HttpDeviceData();
savedData.setId(1L);
savedData.setDeviceId("TEST_DEVICE_001");
savedData.setDataType("TEMPERATURE");
savedData.setDataValue("25.5");
// Mock服务调用
when(deviceService.deviceExists("TEST_DEVICE_001")).thenReturn(true);
when(validationService.validateSingleData(request)).thenReturn(true);
when(dataRepository.save(any(HttpDeviceData.class))).thenReturn(savedData);
// 执行测试
DataReceiveResponse response = dataReceiveService
.receiveSingleData("TEST_DEVICE_001", request);
// 验证结果
assertThat(response.isSuccess()).isTrue();
assertThat(response.getMessage()).isEqualTo("数据接收成功");
assertThat(response.getDataId()).isEqualTo(1L);
// 验证方法调用
verify(dataRepository, times(1)).save(any(HttpDeviceData.class));
verify(deviceService, times(1)).updateLastDataReport("TEST_DEVICE_001");
}
@Test
@DisplayName("批量数据接收测试")
void testReceiveBatchData() {
// 准备测试数据
List<SingleDataRequest> dataList = Arrays.asList(
createDataRequest("TEMPERATURE", "25.5"),
createDataRequest("HUMIDITY", "60.0"),
createDataRequest("PRESSURE", "1013.25")
);
BatchDataRequest request = new BatchDataRequest();
request.setDataList(dataList);
List<HttpDeviceData> savedDataList = Arrays.asList(
createDeviceData(1L, "TEMPERATURE", "25.5"),
createDeviceData(2L, "HUMIDITY", "60.0"),
createDeviceData(3L, "PRESSURE", "1013.25")
);
// Mock服务调用
when(deviceService.deviceExists("TEST_DEVICE_001")).thenReturn(true);
when(validationService.validateBatchData(request)).thenReturn(true);
when(dataRepository.saveAll(anyList())).thenReturn(savedDataList);
// 执行测试
DataReceiveResponse response = dataReceiveService
.receiveBatchData("TEST_DEVICE_001", request);
// 验证结果
assertThat(response.isSuccess()).isTrue();
assertThat(response.getMessage()).isEqualTo("批量数据接收成功");
assertThat(response.getDataIds()).hasSize(3);
// 验证方法调用
verify(dataRepository, times(1)).saveAll(anyList());
}
@Test
@DisplayName("设备不存在时数据接收测试")
void testReceiveDataWithNonExistentDevice() {
// 准备测试数据
SingleDataRequest request = createDataRequest("TEMPERATURE", "25.5");
// Mock服务调用
when(deviceService.deviceExists("NON_EXISTENT_DEVICE")).thenReturn(false);
// 执行测试
DataReceiveResponse response = dataReceiveService
.receiveSingleData("NON_EXISTENT_DEVICE", request);
// 验证结果
assertThat(response.isSuccess()).isFalse();
assertThat(response.getMessage()).isEqualTo("设备不存在");
// 验证没有保存数据
verify(dataRepository, never()).save(any(HttpDeviceData.class));
}
// 辅助方法
private SingleDataRequest createDataRequest(String dataType, String dataValue) {
SingleDataRequest request = new SingleDataRequest();
request.setDataType(dataType);
request.setDataValue(dataValue);
request.setTimestamp(LocalDateTime.now());
request.setQuality(95);
return request;
}
private HttpDeviceData createDeviceData(Long id, String dataType, String dataValue) {
HttpDeviceData data = new HttpDeviceData();
data.setId(id);
data.setDeviceId("TEST_DEVICE_001");
data.setDataType(dataType);
data.setDataValue(dataValue);
data.setCreateTime(LocalDateTime.now());
return data;
}
}
# 3. 集成测试
/**
* HTTP设备接入层集成测试
*/
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@TestMethodOrder(OrderAnnotation.class)
class HttpDeviceIntegrationTest {
@Autowired
private TestRestTemplate restTemplate;
@LocalServerPort
private int port;
private String baseUrl;
private String apiKey;
private final String deviceId = "INTEGRATION_TEST_DEVICE";
@BeforeEach
void setUp() {
baseUrl = "http://localhost:" + port + "/api/v1/devices";
}
@Test
@Order(1)
@DisplayName("完整设备注册流程测试")
void testCompleteDeviceRegistration() {
// 准备注册请求
DeviceRegistrationRequest request = new DeviceRegistrationRequest();
request.setDeviceId(deviceId);
request.setDeviceName("集成测试设备");
request.setDeviceType(DeviceType.SENSOR);
request.setIpAddress("192.168.1.200");
request.setMacAddress("00:11:22:33:44:66");
request.setManufacturer("TestManufacturer");
request.setModel("TestModel");
// 发送注册请求
ResponseEntity<DeviceRegistrationResponse> response = restTemplate
.postForEntity(baseUrl + "/register", request, DeviceRegistrationResponse.class);
// 验证响应
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.getBody()).isNotNull();
assertThat(response.getBody().isSuccess()).isTrue();
assertThat(response.getBody().getDeviceId()).isEqualTo(deviceId);
assertThat(response.getBody().getApiKey()).isNotNull();
// 保存API密钥用于后续测试
apiKey = response.getBody().getApiKey();
}
@Test
@Order(2)
@DisplayName("设备心跳测试")
void testDeviceHeartbeat() {
// 准备心跳请求
HeartbeatRequest request = new HeartbeatRequest();
request.setFirmwareVersion("v1.0.0");
request.setLocation("{\"lat\":39.9042,\"lng\":116.4074}");
// 设置请求头
HttpHeaders headers = new HttpHeaders();
headers.set("X-API-Key", apiKey);
headers.set("X-Device-Id", deviceId);
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<HeartbeatRequest> entity = new HttpEntity<>(request, headers);
// 发送心跳请求
ResponseEntity<String> response = restTemplate
.postForEntity(baseUrl + "/" + deviceId + "/heartbeat", entity, String.class);
// 验证响应
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
}
@Test
@Order(3)
@DisplayName("数据上报测试")
void testDataUpload() {
// 准备数据请求
SingleDataRequest request = new SingleDataRequest();
request.setDataType("TEMPERATURE");
request.setDataValue("23.5");
request.setTimestamp(LocalDateTime.now());
request.setQuality(98);
request.setUnit("°C");
// 设置请求头
HttpHeaders headers = new HttpHeaders();
headers.set("X-API-Key", apiKey);
headers.set("X-Device-Id", deviceId);
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<SingleDataRequest> entity = new HttpEntity<>(request, headers);
// 发送数据请求
ResponseEntity<DataReceiveResponse> response = restTemplate
.postForEntity(baseUrl + "/" + deviceId + "/data", entity, DataReceiveResponse.class);
// 验证响应
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.getBody()).isNotNull();
assertThat(response.getBody().isSuccess()).isTrue();
assertThat(response.getBody().getDataId()).isNotNull();
}
@Test
@Order(4)
@DisplayName("认证失败测试")
void testAuthenticationFailure() {
// 准备数据请求
SingleDataRequest request = new SingleDataRequest();
request.setDataType("TEMPERATURE");
request.setDataValue("23.5");
// 设置错误的请求头
HttpHeaders headers = new HttpHeaders();
headers.set("X-API-Key", "invalid-api-key");
headers.set("X-Device-Id", deviceId);
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<SingleDataRequest> entity = new HttpEntity<>(request, headers);
// 发送数据请求
ResponseEntity<ErrorResponse> response = restTemplate
.postForEntity(baseUrl + "/" + deviceId + "/data", entity, ErrorResponse.class);
// 验证响应
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.UNAUTHORIZED);
assertThat(response.getBody()).isNotNull();
assertThat(response.getBody().getCode()).isEqualTo("AUTHENTICATION_FAILED");
}
}