物联网数据接收服务
# 物联网数据接收服务
# 概述
数据接收服务是物联网平台的核心组件,负责接收、处理、存储来自各种设备的数据。本文档详细介绍数据接收服务的架构设计、实现方案和优化策略。
# 服务架构
数据接收服务
├── 数据接入层
│ ├── MQTT接入
│ ├── HTTP接入
│ ├── CoAP接入
│ └── TCP/UDP接入
├── 数据处理层
│ ├── 数据验证
│ ├── 数据解析
│ ├── 数据转换
│ └── 数据路由
├── 数据存储层
│ ├── 实时数据存储
│ ├── 历史数据存储
│ ├── 时序数据库
│ └── 数据分片
└── 数据分发层
├── 实时推送
├── 消息队列
├── 数据订阅
└── 告警触发
# 核心实体设计
# 设备数据实体
@Entity
@Table(name = "iot_device_data")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class IoTDeviceData {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "device_id", nullable = false, length = 100)
private String deviceId;
@Column(name = "product_id", nullable = false, length = 100)
private String productId;
@Column(name = "data_type", nullable = false, length = 50)
@Enumerated(EnumType.STRING)
private DataType dataType;
@Column(name = "data_content", columnDefinition = "JSON")
private String dataContent;
@Column(name = "raw_data", columnDefinition = "TEXT")
private String rawData;
@Column(name = "data_size")
private Integer dataSize;
@Column(name = "quality", nullable = false)
private Integer quality = 100; // 数据质量评分
@Column(name = "timestamp", nullable = false)
private LocalDateTime timestamp;
@Column(name = "receive_time", nullable = false)
private LocalDateTime receiveTime;
@Column(name = "process_time")
private LocalDateTime processTime;
@Column(name = "source_ip", length = 50)
private String sourceIp;
@Column(name = "protocol", length = 20)
private String protocol;
@Column(name = "status", nullable = false, length = 20)
@Enumerated(EnumType.STRING)
private DataStatus status = DataStatus.RECEIVED;
@Column(name = "error_message", length = 500)
private String errorMessage;
@Column(name = "create_time", nullable = false)
private LocalDateTime createTime;
@PrePersist
protected void onCreate() {
createTime = LocalDateTime.now();
if (receiveTime == null) {
receiveTime = LocalDateTime.now();
}
}
}
/**
* 数据类型枚举
*/
public enum DataType {
TELEMETRY("遥测数据"),
ATTRIBUTE("属性数据"),
EVENT("事件数据"),
ALARM("告警数据"),
COMMAND_RESPONSE("命令响应"),
HEARTBEAT("心跳数据");
private final String description;
DataType(String description) {
this.description = description;
}
}
/**
* 数据状态枚举
*/
public enum DataStatus {
RECEIVED("已接收"),
PROCESSING("处理中"),
PROCESSED("已处理"),
STORED("已存储"),
FAILED("处理失败"),
DISCARDED("已丢弃");
private final String description;
DataStatus(String description) {
this.description = description;
}
}
# 数据处理规则实体
@Entity
@Table(name = "iot_data_processing_rule")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DataProcessingRule {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "rule_name", nullable = false, length = 100)
private String ruleName;
@Column(name = "product_id", length = 100)
private String productId;
@Column(name = "device_id", length = 100)
private String deviceId;
@Column(name = "data_type", length = 50)
@Enumerated(EnumType.STRING)
private DataType dataType;
@Column(name = "filter_condition", columnDefinition = "JSON")
private String filterCondition;
@Column(name = "transformation_script", columnDefinition = "TEXT")
private String transformationScript;
@Column(name = "validation_rules", columnDefinition = "JSON")
private String validationRules;
@Column(name = "storage_config", columnDefinition = "JSON")
private String storageConfig;
@Column(name = "enabled", nullable = false)
private Boolean enabled = true;
@Column(name = "priority", nullable = false)
private Integer priority = 0;
@Column(name = "create_time", nullable = false)
private LocalDateTime createTime;
@Column(name = "update_time", nullable = false)
private LocalDateTime updateTime;
@PrePersist
protected void onCreate() {
createTime = LocalDateTime.now();
updateTime = LocalDateTime.now();
}
@PreUpdate
protected void onUpdate() {
updateTime = LocalDateTime.now();
}
}
# 数据接收服务实现
# MQTT数据接收
@Component
@Slf4j
public class MqttDataReceiver {
@Autowired
private DataProcessingService dataProcessingService;
@Autowired
private DeviceAuthService deviceAuthService;
@Autowired
private DataValidationService dataValidationService;
@Autowired
private MeterRegistry meterRegistry;
private final Counter receivedCounter;
private final Timer processingTimer;
public MqttDataReceiver(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.receivedCounter = Counter.builder("iot.data.received")
.description("接收到的数据条数")
.register(meterRegistry);
this.processingTimer = Timer.builder("iot.data.processing.duration")
.description("数据处理耗时")
.register(meterRegistry);
}
/**
* 处理MQTT消息
*/
@EventListener
public void handleMqttMessage(MqttMessageReceivedEvent event) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
String topic = event.getTopic();
String payload = event.getPayload();
String clientId = event.getClientId();
log.debug("接收到MQTT消息, topic: {}, clientId: {}, payload: {}", topic, clientId, payload);
// 1. 解析主题获取设备信息
DeviceTopicInfo topicInfo = parseDeviceTopic(topic);
// 2. 验证设备权限
if (!deviceAuthService.hasPermission(topicInfo.getDeviceId(), "data", "publish")) {
log.warn("设备无数据发布权限, deviceId: {}", topicInfo.getDeviceId());
return;
}
// 3. 构建数据对象
IoTDeviceData deviceData = buildDeviceData(topicInfo, payload, "MQTT", event.getSourceIp());
// 4. 数据验证
ValidationResult validationResult = dataValidationService.validate(deviceData);
if (!validationResult.isValid()) {
log.warn("数据验证失败, deviceId: {}, errors: {}",
topicInfo.getDeviceId(), validationResult.getErrors());
deviceData.setStatus(DataStatus.FAILED);
deviceData.setErrorMessage(String.join(",", validationResult.getErrors()));
}
// 5. 异步处理数据
dataProcessingService.processDataAsync(deviceData);
// 6. 更新指标
receivedCounter.increment(Tags.of(
"device_id", topicInfo.getDeviceId(),
"data_type", topicInfo.getDataType().name(),
"protocol", "MQTT"
));
} catch (Exception e) {
log.error("处理MQTT消息失败", e);
} finally {
sample.stop(processingTimer);
}
}
/**
* 解析设备主题
*/
private DeviceTopicInfo parseDeviceTopic(String topic) {
// 主题格式: /device/{deviceId}/data/{dataType}
String[] parts = topic.split("/");
if (parts.length < 5) {
throw new IllegalArgumentException("无效的主题格式: " + topic);
}
String deviceId = parts[2];
String dataTypeStr = parts[4];
DataType dataType = DataType.valueOf(dataTypeStr.toUpperCase());
return DeviceTopicInfo.builder()
.deviceId(deviceId)
.dataType(dataType)
.topic(topic)
.build();
}
/**
* 构建设备数据对象
*/
private IoTDeviceData buildDeviceData(DeviceTopicInfo topicInfo, String payload,
String protocol, String sourceIp) {
return IoTDeviceData.builder()
.deviceId(topicInfo.getDeviceId())
.dataType(topicInfo.getDataType())
.dataContent(payload)
.rawData(payload)
.dataSize(payload.getBytes().length)
.timestamp(LocalDateTime.now())
.receiveTime(LocalDateTime.now())
.sourceIp(sourceIp)
.protocol(protocol)
.status(DataStatus.RECEIVED)
.build();
}
}
# HTTP数据接收
@RestController
@RequestMapping("/api/v1/data")
@Slf4j
public class HttpDataReceiver {
@Autowired
private DataProcessingService dataProcessingService;
@Autowired
private DeviceAuthService deviceAuthService;
@Autowired
private DataValidationService dataValidationService;
@Autowired
private RateLimitService rateLimitService;
/**
* 接收设备数据
*/
@PostMapping("/upload")
public ResponseEntity<DataUploadResponse> uploadData(
@RequestHeader("Device-Id") String deviceId,
@RequestHeader("Authorization") String authorization,
@RequestBody DataUploadRequest request,
HttpServletRequest httpRequest) {
try {
// 1. 认证验证
if (!deviceAuthService.validateToken(deviceId, authorization)) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED)
.body(DataUploadResponse.failure("认证失败"));
}
// 2. 限流检查
if (!rateLimitService.isAllowed(deviceId, "data_upload")) {
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
.body(DataUploadResponse.failure("请求过于频繁"));
}
// 3. 权限检查
if (!deviceAuthService.hasPermission(deviceId, "data", "upload")) {
return ResponseEntity.status(HttpStatus.FORBIDDEN)
.body(DataUploadResponse.failure("无数据上传权限"));
}
// 4. 批量处理数据
List<String> successIds = new ArrayList<>();
List<String> failureIds = new ArrayList<>();
for (DeviceDataItem dataItem : request.getDataItems()) {
try {
IoTDeviceData deviceData = convertToDeviceData(deviceId, dataItem, httpRequest);
// 数据验证
ValidationResult validationResult = dataValidationService.validate(deviceData);
if (validationResult.isValid()) {
dataProcessingService.processDataAsync(deviceData);
successIds.add(dataItem.getId());
} else {
log.warn("数据验证失败, deviceId: {}, dataId: {}, errors: {}",
deviceId, dataItem.getId(), validationResult.getErrors());
failureIds.add(dataItem.getId());
}
} catch (Exception e) {
log.error("处理数据项失败, deviceId: {}, dataId: {}", deviceId, dataItem.getId(), e);
failureIds.add(dataItem.getId());
}
}
return ResponseEntity.ok(DataUploadResponse.builder()
.success(true)
.message("数据上传完成")
.successCount(successIds.size())
.failureCount(failureIds.size())
.successIds(successIds)
.failureIds(failureIds)
.build());
} catch (Exception e) {
log.error("数据上传失败, deviceId: {}", deviceId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(DataUploadResponse.failure("服务器内部错误"));
}
}
/**
* 转换为设备数据对象
*/
private IoTDeviceData convertToDeviceData(String deviceId, DeviceDataItem dataItem,
HttpServletRequest request) {
return IoTDeviceData.builder()
.deviceId(deviceId)
.dataType(DataType.valueOf(dataItem.getDataType().toUpperCase()))
.dataContent(dataItem.getData())
.rawData(dataItem.getData())
.dataSize(dataItem.getData().getBytes().length)
.timestamp(dataItem.getTimestamp() != null ? dataItem.getTimestamp() : LocalDateTime.now())
.receiveTime(LocalDateTime.now())
.sourceIp(getClientIpAddress(request))
.protocol("HTTP")
.status(DataStatus.RECEIVED)
.build();
}
/**
* 获取客户端IP地址
*/
private String getClientIpAddress(HttpServletRequest request) {
String xForwardedFor = request.getHeader("X-Forwarded-For");
if (StringUtils.isNotBlank(xForwardedFor)) {
return xForwardedFor.split(",")[0].trim();
}
String xRealIp = request.getHeader("X-Real-IP");
if (StringUtils.isNotBlank(xRealIp)) {
return xRealIp;
}
return request.getRemoteAddr();
}
}
# 数据处理服务
# 核心处理服务
@Service
@Slf4j
public class DataProcessingService {
@Autowired
private DataProcessingRuleRepository ruleRepository;
@Autowired
private DataStorageService dataStorageService;
@Autowired
private DataTransformationService transformationService;
@Autowired
private DataDistributionService distributionService;
@Autowired
private AsyncTaskExecutor taskExecutor;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 异步处理数据
*/
@Async("dataProcessingExecutor")
public CompletableFuture<Void> processDataAsync(IoTDeviceData deviceData) {
return CompletableFuture.runAsync(() -> {
try {
processData(deviceData);
} catch (Exception e) {
log.error("异步处理数据失败, deviceId: {}", deviceData.getDeviceId(), e);
}
}, taskExecutor);
}
/**
* 处理设备数据
*/
public void processData(IoTDeviceData deviceData) {
try {
deviceData.setStatus(DataStatus.PROCESSING);
deviceData.setProcessTime(LocalDateTime.now());
// 1. 获取处理规则
List<DataProcessingRule> rules = getProcessingRules(deviceData);
// 2. 应用处理规则
for (DataProcessingRule rule : rules) {
applyProcessingRule(deviceData, rule);
}
// 3. 数据转换
IoTDeviceData transformedData = transformationService.transform(deviceData);
// 4. 数据存储
dataStorageService.store(transformedData);
// 5. 数据分发
distributionService.distribute(transformedData);
transformedData.setStatus(DataStatus.PROCESSED);
log.debug("数据处理完成, deviceId: {}, dataId: {}",
deviceData.getDeviceId(), deviceData.getId());
} catch (Exception e) {
log.error("数据处理失败, deviceId: {}", deviceData.getDeviceId(), e);
deviceData.setStatus(DataStatus.FAILED);
deviceData.setErrorMessage(e.getMessage());
}
}
/**
* 获取处理规则
*/
private List<DataProcessingRule> getProcessingRules(IoTDeviceData deviceData) {
// 先从缓存获取
String cacheKey = "processing_rules:" + deviceData.getDeviceId() + ":" + deviceData.getDataType();
List<DataProcessingRule> cachedRules = getCachedRules(cacheKey);
if (cachedRules != null) {
return cachedRules;
}
// 从数据库查询
List<DataProcessingRule> rules = ruleRepository.findApplicableRules(
deviceData.getDeviceId(),
deviceData.getProductId(),
deviceData.getDataType()
);
// 缓存规则
cacheRules(cacheKey, rules);
return rules;
}
/**
* 应用处理规则
*/
private void applyProcessingRule(IoTDeviceData deviceData, DataProcessingRule rule) {
try {
// 1. 检查过滤条件
if (!matchesFilterCondition(deviceData, rule.getFilterCondition())) {
return;
}
// 2. 应用转换脚本
if (StringUtils.isNotBlank(rule.getTransformationScript())) {
transformationService.applyScript(deviceData, rule.getTransformationScript());
}
// 3. 应用验证规则
if (StringUtils.isNotBlank(rule.getValidationRules())) {
ValidationResult result = validateWithRules(deviceData, rule.getValidationRules());
if (!result.isValid()) {
deviceData.setQuality(deviceData.getQuality() - 10); // 降低数据质量评分
}
}
log.debug("应用处理规则完成, rule: {}, deviceId: {}",
rule.getRuleName(), deviceData.getDeviceId());
} catch (Exception e) {
log.error("应用处理规则失败, rule: {}, deviceId: {}",
rule.getRuleName(), deviceData.getDeviceId(), e);
}
}
/**
* 检查过滤条件
*/
private boolean matchesFilterCondition(IoTDeviceData deviceData, String filterCondition) {
if (StringUtils.isBlank(filterCondition)) {
return true;
}
try {
// 使用SpEL表达式评估过滤条件
SpelExpressionParser parser = new SpelExpressionParser();
Expression expression = parser.parseExpression(filterCondition);
StandardEvaluationContext context = new StandardEvaluationContext(deviceData);
return Boolean.TRUE.equals(expression.getValue(context, Boolean.class));
} catch (Exception e) {
log.warn("过滤条件评估失败, condition: {}, deviceId: {}",
filterCondition, deviceData.getDeviceId(), e);
return true; // 默认通过
}
}
/**
* 批量处理数据
*/
@Async("batchProcessingExecutor")
public CompletableFuture<BatchProcessingResult> batchProcessData(List<IoTDeviceData> dataList) {
return CompletableFuture.supplyAsync(() -> {
int successCount = 0;
int failureCount = 0;
List<String> errors = new ArrayList<>();
for (IoTDeviceData data : dataList) {
try {
processData(data);
successCount++;
} catch (Exception e) {
failureCount++;
errors.add("设备 " + data.getDeviceId() + " 处理失败: " + e.getMessage());
}
}
return BatchProcessingResult.builder()
.totalCount(dataList.size())
.successCount(successCount)
.failureCount(failureCount)
.errors(errors)
.build();
}, taskExecutor);
}
/**
* 缓存处理规则
*/
private void cacheRules(String cacheKey, List<DataProcessingRule> rules) {
try {
redisTemplate.opsForValue().set(cacheKey, rules, Duration.ofMinutes(30));
} catch (Exception e) {
log.warn("缓存处理规则失败, key: {}", cacheKey, e);
}
}
/**
* 获取缓存的处理规则
*/
@SuppressWarnings("unchecked")
private List<DataProcessingRule> getCachedRules(String cacheKey) {
try {
return (List<DataProcessingRule>) redisTemplate.opsForValue().get(cacheKey);
} catch (Exception e) {
log.warn("获取缓存处理规则失败, key: {}", cacheKey, e);
return null;
}
}
}
# 数据验证服务
@Service
@Slf4j
public class DataValidationService {
@Autowired
private DeviceManagementService deviceManagementService;
/**
* 验证设备数据
*/
public ValidationResult validate(IoTDeviceData deviceData) {
List<String> errors = new ArrayList<>();
try {
// 1. 基础字段验证
validateBasicFields(deviceData, errors);
// 2. 设备存在性验证
validateDeviceExists(deviceData, errors);
// 3. 数据格式验证
validateDataFormat(deviceData, errors);
// 4. 数据大小验证
validateDataSize(deviceData, errors);
// 5. 时间戳验证
validateTimestamp(deviceData, errors);
// 6. 数据类型验证
validateDataType(deviceData, errors);
} catch (Exception e) {
log.error("数据验证异常, deviceId: {}", deviceData.getDeviceId(), e);
errors.add("验证过程异常: " + e.getMessage());
}
return ValidationResult.builder()
.valid(errors.isEmpty())
.errors(errors)
.build();
}
/**
* 验证基础字段
*/
private void validateBasicFields(IoTDeviceData deviceData, List<String> errors) {
if (StringUtils.isBlank(deviceData.getDeviceId())) {
errors.add("设备ID不能为空");
}
if (deviceData.getDataType() == null) {
errors.add("数据类型不能为空");
}
if (StringUtils.isBlank(deviceData.getDataContent())) {
errors.add("数据内容不能为空");
}
if (deviceData.getTimestamp() == null) {
errors.add("时间戳不能为空");
}
}
/**
* 验证设备存在性
*/
private void validateDeviceExists(IoTDeviceData deviceData, List<String> errors) {
try {
IoTDevice device = deviceManagementService.getDeviceById(deviceData.getDeviceId());
if (device == null) {
errors.add("设备不存在: " + deviceData.getDeviceId());
return;
}
if (device.getStatus() == DeviceStatus.DISABLED) {
errors.add("设备已被禁用: " + deviceData.getDeviceId());
}
// 设置产品ID
deviceData.setProductId(device.getProductId());
} catch (Exception e) {
errors.add("验证设备存在性失败: " + e.getMessage());
}
}
/**
* 验证数据格式
*/
private void validateDataFormat(IoTDeviceData deviceData, List<String> errors) {
try {
String dataContent = deviceData.getDataContent();
// 尝试解析JSON格式
if (dataContent.trim().startsWith("{") || dataContent.trim().startsWith("[")) {
ObjectMapper mapper = new ObjectMapper();
mapper.readTree(dataContent);
}
} catch (JsonProcessingException e) {
errors.add("数据格式无效: " + e.getMessage());
}
}
/**
* 验证数据大小
*/
private void validateDataSize(IoTDeviceData deviceData, List<String> errors) {
int dataSize = deviceData.getDataSize();
// 单条数据不能超过1MB
if (dataSize > 1024 * 1024) {
errors.add("数据大小超限: " + dataSize + " bytes");
}
// 数据不能为空
if (dataSize == 0) {
errors.add("数据内容为空");
}
}
/**
* 验证时间戳
*/
private void validateTimestamp(IoTDeviceData deviceData, List<String> errors) {
LocalDateTime timestamp = deviceData.getTimestamp();
LocalDateTime now = LocalDateTime.now();
// 时间戳不能是未来时间(允许5分钟误差)
if (timestamp.isAfter(now.plusMinutes(5))) {
errors.add("时间戳不能是未来时间: " + timestamp);
}
// 时间戳不能太久远(不能超过7天)
if (timestamp.isBefore(now.minusDays(7))) {
errors.add("时间戳过于久远: " + timestamp);
}
}
/**
* 验证数据类型
*/
private void validateDataType(IoTDeviceData deviceData, List<String> errors) {
DataType dataType = deviceData.getDataType();
String dataContent = deviceData.getDataContent();
switch (dataType) {
case TELEMETRY:
validateTelemetryData(dataContent, errors);
break;
case ATTRIBUTE:
validateAttributeData(dataContent, errors);
break;
case EVENT:
validateEventData(dataContent, errors);
break;
case ALARM:
validateAlarmData(dataContent, errors);
break;
case HEARTBEAT:
validateHeartbeatData(dataContent, errors);
break;
}
}
/**
* 验证遥测数据
*/
private void validateTelemetryData(String dataContent, List<String> errors) {
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(dataContent);
// 遥测数据必须包含至少一个数值字段
boolean hasNumericField = false;
Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
if (field.getValue().isNumber()) {
hasNumericField = true;
break;
}
}
if (!hasNumericField) {
errors.add("遥测数据必须包含至少一个数值字段");
}
} catch (Exception e) {
errors.add("遥测数据格式验证失败: " + e.getMessage());
}
}
/**
* 验证属性数据
*/
private void validateAttributeData(String dataContent, List<String> errors) {
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(dataContent);
// 属性数据必须是对象格式
if (!node.isObject()) {
errors.add("属性数据必须是JSON对象格式");
}
} catch (Exception e) {
errors.add("属性数据格式验证失败: " + e.getMessage());
}
}
/**
* 验证事件数据
*/
private void validateEventData(String dataContent, List<String> errors) {
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(dataContent);
// 事件数据必须包含eventType字段
if (!node.has("eventType")) {
errors.add("事件数据必须包含eventType字段");
}
} catch (Exception e) {
errors.add("事件数据格式验证失败: " + e.getMessage());
}
}
/**
* 验证告警数据
*/
private void validateAlarmData(String dataContent, List<String> errors) {
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(dataContent);
// 告警数据必须包含alarmType和severity字段
if (!node.has("alarmType")) {
errors.add("告警数据必须包含alarmType字段");
}
if (!node.has("severity")) {
errors.add("告警数据必须包含severity字段");
}
} catch (Exception e) {
errors.add("告警数据格式验证失败: " + e.getMessage());
}
}
/**
* 验证心跳数据
*/
private void validateHeartbeatData(String dataContent, List<String> errors) {
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(dataContent);
// 心跳数据必须包含status字段
if (!node.has("status")) {
errors.add("心跳数据必须包含status字段");
}
} catch (Exception e) {
errors.add("心跳数据格式验证失败: " + e.getMessage());
}
}
}
# 数据存储服务
# 分层存储策略
@Service
@Slf4j
public class DataStorageService {
@Autowired
private IoTDeviceDataRepository deviceDataRepository;
@Autowired
private InfluxDBTemplate influxDBTemplate;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private AsyncTaskExecutor storageExecutor;
/**
* 存储设备数据
*/
public void store(IoTDeviceData deviceData) {
try {
// 1. 实时数据存储到Redis
storeToRedis(deviceData);
// 2. 时序数据存储到InfluxDB
storeToInfluxDB(deviceData);
// 3. 异步存储到MySQL(用于查询和分析)
storeToMySQLAsync(deviceData);
deviceData.setStatus(DataStatus.STORED);
} catch (Exception e) {
log.error("数据存储失败, deviceId: {}", deviceData.getDeviceId(), e);
throw new DataStorageException("数据存储失败", e);
}
}
/**
* 存储到Redis(实时数据)
*/
private void storeToRedis(IoTDeviceData deviceData) {
try {
String key = "device:data:latest:" + deviceData.getDeviceId();
// 存储最新数据
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("dataType", deviceData.getDataType().name());
dataMap.put("dataContent", deviceData.getDataContent());
dataMap.put("timestamp", deviceData.getTimestamp().toString());
dataMap.put("quality", deviceData.getQuality());
redisTemplate.opsForHash().putAll(key, dataMap);
redisTemplate.expire(key, Duration.ofHours(24)); // 24小时过期
// 存储到时序列表(用于趋势分析)
String timeSeriesKey = "device:data:timeseries:" + deviceData.getDeviceId();
String timeSeriesValue = deviceData.getTimestamp() + "|" + deviceData.getDataContent();
redisTemplate.opsForList().leftPush(timeSeriesKey, timeSeriesValue);
redisTemplate.opsForList().trim(timeSeriesKey, 0, 999); // 保留最近1000条
redisTemplate.expire(timeSeriesKey, Duration.ofDays(7));
} catch (Exception e) {
log.error("存储到Redis失败, deviceId: {}", deviceData.getDeviceId(), e);
}
}
/**
* 存储到InfluxDB(时序数据)
*/
private void storeToInfluxDB(IoTDeviceData deviceData) {
try {
Point.Builder pointBuilder = Point.measurement("device_data")
.tag("device_id", deviceData.getDeviceId())
.tag("product_id", deviceData.getProductId())
.tag("data_type", deviceData.getDataType().name())
.tag("protocol", deviceData.getProtocol())
.time(deviceData.getTimestamp().atZone(ZoneId.systemDefault()).toInstant(), TimeUnit.MILLISECONDS)
.field("data_content", deviceData.getDataContent())
.field("data_size", deviceData.getDataSize())
.field("quality", deviceData.getQuality());
// 如果是遥测数据,解析数值字段
if (deviceData.getDataType() == DataType.TELEMETRY) {
parseAndAddNumericFields(pointBuilder, deviceData.getDataContent());
}
Point point = pointBuilder.build();
influxDBTemplate.write(point);
} catch (Exception e) {
log.error("存储到InfluxDB失败, deviceId: {}", deviceData.getDeviceId(), e);
}
}
/**
* 异步存储到MySQL
*/
@Async("storageExecutor")
public void storeToMySQLAsync(IoTDeviceData deviceData) {
try {
deviceDataRepository.save(deviceData);
log.debug("数据存储到MySQL完成, deviceId: {}, dataId: {}",
deviceData.getDeviceId(), deviceData.getId());
} catch (Exception e) {
log.error("存储到MySQL失败, deviceId: {}", deviceData.getDeviceId(), e);
}
}
/**
* 解析并添加数值字段
*/
private void parseAndAddNumericFields(Point.Builder pointBuilder, String dataContent) {
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(dataContent);
Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
String fieldName = field.getKey();
JsonNode fieldValue = field.getValue();
if (fieldValue.isNumber()) {
if (fieldValue.isInt()) {
pointBuilder.field(fieldName, fieldValue.asInt());
} else if (fieldValue.isLong()) {
pointBuilder.field(fieldName, fieldValue.asLong());
} else {
pointBuilder.field(fieldName, fieldValue.asDouble());
}
} else if (fieldValue.isBoolean()) {
pointBuilder.field(fieldName, fieldValue.asBoolean());
} else if (fieldValue.isTextual()) {
pointBuilder.field(fieldName, fieldValue.asText());
}
}
} catch (Exception e) {
log.warn("解析数值字段失败, dataContent: {}", dataContent, e);
}
}
/**
* 批量存储数据
*/
@Async("batchStorageExecutor")
public CompletableFuture<Void> batchStore(List<IoTDeviceData> dataList) {
return CompletableFuture.runAsync(() -> {
try {
// 批量存储到MySQL
deviceDataRepository.saveAll(dataList);
// 批量存储到InfluxDB
List<Point> points = dataList.stream()
.map(this::convertToInfluxPoint)
.collect(Collectors.toList());
influxDBTemplate.write(points);
log.info("批量存储完成, 数据条数: {}", dataList.size());
} catch (Exception e) {
log.error("批量存储失败, 数据条数: {}", dataList.size(), e);
}
}, storageExecutor);
}
/**
* 转换为InfluxDB Point
*/
private Point convertToInfluxPoint(IoTDeviceData deviceData) {
Point.Builder pointBuilder = Point.measurement("device_data")
.tag("device_id", deviceData.getDeviceId())
.tag("product_id", deviceData.getProductId())
.tag("data_type", deviceData.getDataType().name())
.time(deviceData.getTimestamp().atZone(ZoneId.systemDefault()).toInstant(), TimeUnit.MILLISECONDS)
.field("data_content", deviceData.getDataContent())
.field("data_size", deviceData.getDataSize())
.field("quality", deviceData.getQuality());
if (deviceData.getDataType() == DataType.TELEMETRY) {
parseAndAddNumericFields(pointBuilder, deviceData.getDataContent());
}
return pointBuilder.build();
}
}
# 数据分发服务
# 实时数据分发
@Service
@Slf4j
public class DataDistributionService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private WebSocketMessageSender webSocketSender;
@Autowired
private DataSubscriptionService subscriptionService;
@Autowired
private AlarmService alarmService;
/**
* 分发设备数据
*/
public void distribute(IoTDeviceData deviceData) {
try {
// 1. 发送到消息队列
sendToMessageQueue(deviceData);
// 2. WebSocket实时推送
sendToWebSocket(deviceData);
// 3. 处理数据订阅
processDataSubscriptions(deviceData);
// 4. 触发告警检查
checkAndTriggerAlarms(deviceData);
log.debug("数据分发完成, deviceId: {}", deviceData.getDeviceId());
} catch (Exception e) {
log.error("数据分发失败, deviceId: {}", deviceData.getDeviceId(), e);
}
}
/**
* 发送到消息队列
*/
private void sendToMessageQueue(IoTDeviceData deviceData) {
try {
String routingKey = "device.data." + deviceData.getDataType().name().toLowerCase();
DeviceDataMessage message = DeviceDataMessage.builder()
.deviceId(deviceData.getDeviceId())
.productId(deviceData.getProductId())
.dataType(deviceData.getDataType())
.dataContent(deviceData.getDataContent())
.timestamp(deviceData.getTimestamp())
.quality(deviceData.getQuality())
.build();
rabbitTemplate.convertAndSend("iot.data.exchange", routingKey, message);
} catch (Exception e) {
log.error("发送到消息队列失败, deviceId: {}", deviceData.getDeviceId(), e);
}
}
/**
* WebSocket实时推送
*/
private void sendToWebSocket(IoTDeviceData deviceData) {
try {
// 推送给订阅该设备的用户
List<String> subscribers = subscriptionService.getDeviceSubscribers(deviceData.getDeviceId());
for (String userId : subscribers) {
WebSocketMessage wsMessage = WebSocketMessage.builder()
.type("DEVICE_DATA")
.deviceId(deviceData.getDeviceId())
.data(deviceData.getDataContent())
.timestamp(deviceData.getTimestamp())
.build();
webSocketSender.sendToUser(userId, wsMessage);
}
} catch (Exception e) {
log.error("WebSocket推送失败, deviceId: {}", deviceData.getDeviceId(), e);
}
}
/**
* 处理数据订阅
*/
private void processDataSubscriptions(IoTDeviceData deviceData) {
try {
List<DataSubscription> subscriptions = subscriptionService.getActiveSubscriptions(
deviceData.getDeviceId(), deviceData.getDataType());
for (DataSubscription subscription : subscriptions) {
processSubscription(deviceData, subscription);
}
} catch (Exception e) {
log.error("处理数据订阅失败, deviceId: {}", deviceData.getDeviceId(), e);
}
}
/**
* 处理单个订阅
*/
private void processSubscription(IoTDeviceData deviceData, DataSubscription subscription) {
try {
// 检查订阅条件
if (!matchesSubscriptionCondition(deviceData, subscription.getCondition())) {
return;
}
// 根据订阅类型处理
switch (subscription.getType()) {
case HTTP_CALLBACK:
sendHttpCallback(deviceData, subscription);
break;
case EMAIL:
sendEmailNotification(deviceData, subscription);
break;
case SMS:
sendSmsNotification(deviceData, subscription);
break;
case WEBHOOK:
sendWebhook(deviceData, subscription);
break;
}
} catch (Exception e) {
log.error("处理订阅失败, subscriptionId: {}, deviceId: {}",
subscription.getId(), deviceData.getDeviceId(), e);
}
}
/**
* 检查和触发告警
*/
private void checkAndTriggerAlarms(IoTDeviceData deviceData) {
try {
if (deviceData.getDataType() == DataType.ALARM) {
alarmService.processAlarmData(deviceData);
} else {
alarmService.checkAlarmRules(deviceData);
}
} catch (Exception e) {
log.error("告警检查失败, deviceId: {}", deviceData.getDeviceId(), e);
}
}
}
# 性能优化
# 数据接收优化配置
@Configuration
@EnableAsync
public class DataProcessingConfig {
/**
* 数据处理线程池
*/
@Bean("dataProcessingExecutor")
public TaskExecutor dataProcessingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("data-processing-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
/**
* 批量处理线程池
*/
@Bean("batchProcessingExecutor")
public TaskExecutor batchProcessingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("batch-processing-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
/**
* 存储线程池
*/
@Bean("storageExecutor")
public TaskExecutor storageExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(15);
executor.setMaxPoolSize(30);
executor.setQueueCapacity(2000);
executor.setThreadNamePrefix("storage-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
# 监控指标
# 数据接收监控
@Component
public class DataReceptionMetrics {
private final Counter dataReceivedCounter;
private final Counter dataProcessedCounter;
private final Counter dataFailedCounter;
private final Timer dataProcessingTimer;
private final Gauge dataQueueSize;
public DataReceptionMetrics(MeterRegistry meterRegistry) {
this.dataReceivedCounter = Counter.builder("iot.data.received.total")
.description("接收到的数据总数")
.register(meterRegistry);
this.dataProcessedCounter = Counter.builder("iot.data.processed.total")
.description("处理成功的数据总数")
.register(meterRegistry);
this.dataFailedCounter = Counter.builder("iot.data.failed.total")
.description("处理失败的数据总数")
.register(meterRegistry);
this.dataProcessingTimer = Timer.builder("iot.data.processing.duration")
.description("数据处理耗时")
.register(meterRegistry);
this.dataQueueSize = Gauge.builder("iot.data.queue.size")
.description("数据处理队列大小")
.register(meterRegistry, this, metrics -> getCurrentQueueSize());
}
public void incrementDataReceived(String protocol, String dataType) {
dataReceivedCounter.increment(Tags.of(
"protocol", protocol,
"data_type", dataType
));
}
public void incrementDataProcessed(String deviceId, String dataType) {
dataProcessedCounter.increment(Tags.of(
"device_id", deviceId,
"data_type", dataType
));
}
public void incrementDataFailed(String deviceId, String reason) {
dataFailedCounter.increment(Tags.of(
"device_id", deviceId,
"failure_reason", reason
));
}
public Timer.Sample startProcessingTimer() {
return Timer.start();
}
public void stopProcessingTimer(Timer.Sample sample, String deviceId) {
sample.stop(Timer.builder("iot.data.processing.duration")
.tag("device_id", deviceId)
.register(meterRegistry));
}
private double getCurrentQueueSize() {
// 实现获取当前队列大小的逻辑
return 0;
}
}
# 总结
数据接收服务是物联网平台的数据入口,通过多协议接入、实时处理、分层存储、智能分发等机制,确保设备数据的可靠接收和高效处理。合理的架构设计和性能优化策略保证了服务的高并发处理能力和数据一致性。