应用服务层架构设计
# 应用服务层架构设计
# 1. 章节概述
应用服务层是IoT系统的核心业务逻辑层,负责处理业务规则、数据转换、服务编排和用户交互。本章将详细介绍应用服务层的架构设计、微服务实现、API网关配置和业务服务开发。
# 1.1 学习目标
- 掌握应用服务层的架构设计原则
- 理解微服务架构在IoT系统中的应用
- 学习API网关的配置和管理
- 实现核心业务服务和数据处理逻辑
- 掌握服务间通信和数据一致性保证
# 2. 应用服务层架构
# 2.1 整体架构
graph TB
subgraph "客户端层"
A[Web应用]
B[移动应用]
C[第三方系统]
end
subgraph "API网关层"
D[API Gateway]
E[负载均衡器]
F[认证授权]
end
subgraph "应用服务层"
G[设备管理服务]
H[数据分析服务]
I[告警服务]
J[用户管理服务]
K[配置管理服务]
L[报表服务]
end
subgraph "基础设施层"
M[服务注册中心]
N[配置中心]
O[消息队列]
P[缓存服务]
Q[数据库]
end
A --> D
B --> D
C --> D
D --> E
E --> F
F --> G
F --> H
F --> I
F --> J
F --> K
F --> L
G --> M
H --> M
I --> M
J --> M
K --> M
L --> M
G --> N
H --> N
I --> N
J --> N
K --> N
L --> N
G --> O
H --> O
I --> O
G --> P
H --> P
I --> P
J --> P
G --> Q
H --> Q
I --> Q
J --> Q
K --> Q
L --> Q
# 2.2 微服务架构设计
// 示例:微服务基础框架
@SpringBootApplication
@EnableEurekaClient
@EnableConfigServer
@EnableCircuitBreaker
public class IoTMicroserviceApplication {
public static void main(String[] args) {
SpringApplication.run(IoTMicroserviceApplication.class, args);
}
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
return template;
}
}
// 微服务基础配置
@Configuration
public class MicroserviceConfig {
@Value("${spring.application.name}")
private String serviceName;
@Value("${server.port}")
private int serverPort;
@Bean
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder()
.withDiscoveryClient()
.withHealthChecks()
.withCaching(Duration.ofSeconds(30))
.build(context);
}
@Bean
public HealthIndicator serviceHealthIndicator() {
return new AbstractHealthIndicator() {
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
// 检查服务健康状态
boolean isHealthy = checkServiceHealth();
if (isHealthy) {
builder.up()
.withDetail("service", serviceName)
.withDetail("port", serverPort)
.withDetail("status", "UP");
} else {
builder.down()
.withDetail("service", serviceName)
.withDetail("error", "Service health check failed");
}
}
};
}
private boolean checkServiceHealth() {
// 实现具体的健康检查逻辑
try {
// 检查数据库连接
// 检查外部依赖
// 检查关键组件状态
return true;
} catch (Exception e) {
return false;
}
}
}
# 3. API网关实现
# 3.1 网关配置
// 示例:Spring Cloud Gateway配置
@Configuration
@EnableWebFluxSecurity
public class GatewayConfig {
@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
return builder.routes()
// 设备管理服务路由
.route("device-service", r -> r.path("/api/devices/**")
.filters(f -> f
.stripPrefix(2)
.addRequestHeader("X-Service", "device-service")
.circuitBreaker(config -> config
.setName("device-service-cb")
.setFallbackUri("forward:/fallback/device"))
.retry(config -> config
.setRetries(3)
.setBackoff(Duration.ofMillis(100), Duration.ofMillis(1000), 2, false)))
.uri("lb://device-service"))
// 数据分析服务路由
.route("analytics-service", r -> r.path("/api/analytics/**")
.filters(f -> f
.stripPrefix(2)
.addRequestHeader("X-Service", "analytics-service")
.requestRateLimiter(config -> config
.setRateLimiter(redisRateLimiter())
.setKeyResolver(userKeyResolver()))
.circuitBreaker(config -> config
.setName("analytics-service-cb")
.setFallbackUri("forward:/fallback/analytics")))
.uri("lb://analytics-service"))
// 告警服务路由
.route("alert-service", r -> r.path("/api/alerts/**")
.filters(f -> f
.stripPrefix(2)
.addRequestHeader("X-Service", "alert-service")
.circuitBreaker(config -> config
.setName("alert-service-cb")
.setFallbackUri("forward:/fallback/alert")))
.uri("lb://alert-service"))
// 用户管理服务路由
.route("user-service", r -> r.path("/api/users/**")
.filters(f -> f
.stripPrefix(2)
.addRequestHeader("X-Service", "user-service")
.circuitBreaker(config -> config
.setName("user-service-cb")
.setFallbackUri("forward:/fallback/user")))
.uri("lb://user-service"))
.build();
}
@Bean
public RedisRateLimiter redisRateLimiter() {
return new RedisRateLimiter(10, 20, 1); // 每秒10个请求,突发20个
}
@Bean
public KeyResolver userKeyResolver() {
return exchange -> {
// 基于用户ID进行限流
String userId = exchange.getRequest().getHeaders().getFirst("X-User-Id");
return Mono.just(userId != null ? userId : "anonymous");
};
}
@Bean
public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
return http
.csrf().disable()
.authorizeExchange(exchanges -> exchanges
.pathMatchers("/api/auth/**").permitAll()
.pathMatchers("/api/public/**").permitAll()
.pathMatchers("/actuator/**").permitAll()
.anyExchange().authenticated()
)
.oauth2ResourceServer(oauth2 -> oauth2
.jwt(jwt -> jwt.jwtDecoder(jwtDecoder()))
)
.build();
}
@Bean
public ReactiveJwtDecoder jwtDecoder() {
NimbusReactiveJwtDecoder jwtDecoder = NimbusReactiveJwtDecoder
.withJwkSetUri("http://auth-service/oauth2/jwks")
.build();
jwtDecoder.setJwtValidator(jwtValidator());
return jwtDecoder;
}
@Bean
public Converter<Jwt, Mono<AbstractAuthenticationToken>> jwtAuthenticationConverter() {
JwtAuthenticationConverter converter = new JwtAuthenticationConverter();
converter.setJwtGrantedAuthoritiesConverter(jwt -> {
Collection<String> authorities = jwt.getClaimAsStringList("authorities");
return authorities.stream()
.map(SimpleGrantedAuthority::new)
.collect(Collectors.toList());
});
return new ReactiveJwtAuthenticationConverterAdapter(converter);
}
private Converter<Jwt, Collection<GrantedAuthority>> jwtValidator() {
return new JwtTimestampValidator();
}
}
// 网关过滤器
@Component
public class GlobalAuthenticationFilter implements GlobalFilter, Ordered {
private final ReactiveJwtDecoder jwtDecoder;
private final RedisTemplate<String, Object> redisTemplate;
public GlobalAuthenticationFilter(ReactiveJwtDecoder jwtDecoder,
RedisTemplate<String, Object> redisTemplate) {
this.jwtDecoder = jwtDecoder;
this.redisTemplate = redisTemplate;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String path = request.getURI().getPath();
// 跳过不需要认证的路径
if (isPublicPath(path)) {
return chain.filter(exchange);
}
// 提取JWT令牌
String token = extractToken(request);
if (token == null) {
return handleUnauthorized(exchange);
}
// 验证JWT令牌
return jwtDecoder.decode(token)
.flatMap(jwt -> {
// 检查令牌是否在黑名单中
String jti = jwt.getClaimAsString("jti");
if (isTokenBlacklisted(jti)) {
return handleUnauthorized(exchange);
}
// 添加用户信息到请求头
ServerHttpRequest mutatedRequest = request.mutate()
.header("X-User-Id", jwt.getSubject())
.header("X-User-Roles", String.join(",", jwt.getClaimAsStringList("roles")))
.header("X-Tenant-Id", jwt.getClaimAsString("tenant_id"))
.build();
return chain.filter(exchange.mutate().request(mutatedRequest).build());
})
.onErrorResume(JwtException.class, ex -> handleUnauthorized(exchange));
}
private boolean isPublicPath(String path) {
return path.startsWith("/api/auth/") ||
path.startsWith("/api/public/") ||
path.startsWith("/actuator/");
}
private String extractToken(ServerHttpRequest request) {
String authHeader = request.getHeaders().getFirst("Authorization");
if (authHeader != null && authHeader.startsWith("Bearer ")) {
return authHeader.substring(7);
}
return null;
}
private boolean isTokenBlacklisted(String jti) {
return redisTemplate.hasKey("blacklist:" + jti);
}
private Mono<Void> handleUnauthorized(ServerWebExchange exchange) {
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.UNAUTHORIZED);
String body = "{\"error\":\"Unauthorized\",\"message\":\"Invalid or missing authentication token\"}";
DataBuffer buffer = response.bufferFactory().wrap(body.getBytes(StandardCharsets.UTF_8));
response.getHeaders().add("Content-Type", "application/json");
return response.writeWith(Mono.just(buffer));
}
@Override
public int getOrder() {
return -100; // 高优先级
}
}
# 3.2 网关监控和指标
// 示例:网关监控配置
@Component
public class GatewayMetricsFilter implements GlobalFilter, Ordered {
private final MeterRegistry meterRegistry;
private final Timer.Builder timerBuilder;
public GatewayMetricsFilter(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.timerBuilder = Timer.builder("gateway.requests")
.description("Gateway request duration");
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
long startTime = System.nanoTime();
return chain.filter(exchange)
.doFinally(signalType -> {
long duration = System.nanoTime() - startTime;
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
// 记录请求指标
Timer.Sample sample = Timer.start(meterRegistry);
sample.stop(timerBuilder
.tag("method", request.getMethodValue())
.tag("uri", request.getURI().getPath())
.tag("status", String.valueOf(response.getStatusCode().value()))
.register(meterRegistry));
// 记录计数器
Counter.builder("gateway.requests.total")
.tag("method", request.getMethodValue())
.tag("status", String.valueOf(response.getStatusCode().value()))
.register(meterRegistry)
.increment();
// 记录响应时间分布
DistributionSummary.builder("gateway.response.time")
.tag("uri", request.getURI().getPath())
.register(meterRegistry)
.record(duration / 1_000_000.0); // 转换为毫秒
});
}
@Override
public int getOrder() {
return 0;
}
}
// 网关健康检查
@Component
public class GatewayHealthIndicator implements HealthIndicator {
private final DiscoveryClient discoveryClient;
private final RedisTemplate<String, Object> redisTemplate;
public GatewayHealthIndicator(DiscoveryClient discoveryClient,
RedisTemplate<String, Object> redisTemplate) {
this.discoveryClient = discoveryClient;
this.redisTemplate = redisTemplate;
}
@Override
public Health health() {
Health.Builder builder = new Health.Builder();
try {
// 检查服务发现
List<String> services = discoveryClient.getServices();
builder.withDetail("discovery.services.count", services.size());
builder.withDetail("discovery.services", services);
// 检查Redis连接
redisTemplate.opsForValue().get("health-check");
builder.withDetail("redis.status", "UP");
// 检查关键服务状态
Map<String, String> serviceStatus = new HashMap<>();
for (String service : Arrays.asList("device-service", "analytics-service", "alert-service")) {
List<ServiceInstance> instances = discoveryClient.getInstances(service);
serviceStatus.put(service, instances.isEmpty() ? "DOWN" : "UP");
}
builder.withDetail("services.status", serviceStatus);
// 判断整体健康状态
boolean allServicesUp = serviceStatus.values().stream()
.allMatch("UP"::equals);
if (allServicesUp) {
builder.up();
} else {
builder.down();
}
} catch (Exception e) {
builder.down().withException(e);
}
return builder.build();
}
}
# 4. 核心业务服务
# 4.1 设备管理服务
// 示例:设备管理服务实现
@RestController
@RequestMapping("/api/devices")
@Validated
public class DeviceController {
private final DeviceService deviceService;
private final DeviceEventPublisher eventPublisher;
public DeviceController(DeviceService deviceService,
DeviceEventPublisher eventPublisher) {
this.deviceService = deviceService;
this.eventPublisher = eventPublisher;
}
@PostMapping
public ResponseEntity<DeviceDTO> createDevice(
@Valid @RequestBody CreateDeviceRequest request,
@RequestHeader("X-User-Id") String userId,
@RequestHeader("X-Tenant-Id") String tenantId) {
DeviceDTO device = deviceService.createDevice(request, userId, tenantId);
// 发布设备创建事件
eventPublisher.publishDeviceCreated(device);
return ResponseEntity.status(HttpStatus.CREATED).body(device);
}
@GetMapping("/{deviceId}")
public ResponseEntity<DeviceDTO> getDevice(
@PathVariable String deviceId,
@RequestHeader("X-Tenant-Id") String tenantId) {
DeviceDTO device = deviceService.getDevice(deviceId, tenantId);
return ResponseEntity.ok(device);
}
@GetMapping
public ResponseEntity<PagedResponse<DeviceDTO>> getDevices(
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size,
@RequestParam(required = false) String status,
@RequestParam(required = false) String type,
@RequestParam(required = false) String location,
@RequestHeader("X-Tenant-Id") String tenantId) {
DeviceQueryParams params = DeviceQueryParams.builder()
.tenantId(tenantId)
.status(status)
.type(type)
.location(location)
.page(page)
.size(size)
.build();
PagedResponse<DeviceDTO> devices = deviceService.getDevices(params);
return ResponseEntity.ok(devices);
}
@PutMapping("/{deviceId}")
public ResponseEntity<DeviceDTO> updateDevice(
@PathVariable String deviceId,
@Valid @RequestBody UpdateDeviceRequest request,
@RequestHeader("X-User-Id") String userId,
@RequestHeader("X-Tenant-Id") String tenantId) {
DeviceDTO device = deviceService.updateDevice(deviceId, request, userId, tenantId);
// 发布设备更新事件
eventPublisher.publishDeviceUpdated(device);
return ResponseEntity.ok(device);
}
@DeleteMapping("/{deviceId}")
public ResponseEntity<Void> deleteDevice(
@PathVariable String deviceId,
@RequestHeader("X-User-Id") String userId,
@RequestHeader("X-Tenant-Id") String tenantId) {
deviceService.deleteDevice(deviceId, userId, tenantId);
// 发布设备删除事件
eventPublisher.publishDeviceDeleted(deviceId, tenantId);
return ResponseEntity.noContent().build();
}
@PostMapping("/{deviceId}/commands")
public ResponseEntity<CommandResponse> sendCommand(
@PathVariable String deviceId,
@Valid @RequestBody SendCommandRequest request,
@RequestHeader("X-User-Id") String userId,
@RequestHeader("X-Tenant-Id") String tenantId) {
CommandResponse response = deviceService.sendCommand(deviceId, request, userId, tenantId);
return ResponseEntity.ok(response);
}
@GetMapping("/{deviceId}/status")
public ResponseEntity<DeviceStatusDTO> getDeviceStatus(
@PathVariable String deviceId,
@RequestHeader("X-Tenant-Id") String tenantId) {
DeviceStatusDTO status = deviceService.getDeviceStatus(deviceId, tenantId);
return ResponseEntity.ok(status);
}
@GetMapping("/{deviceId}/data")
public ResponseEntity<List<SensorDataDTO>> getDeviceData(
@PathVariable String deviceId,
@RequestParam(required = false) String sensorType,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime startTime,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime endTime,
@RequestParam(defaultValue = "100") int limit,
@RequestHeader("X-Tenant-Id") String tenantId) {
DataQueryParams params = DataQueryParams.builder()
.deviceId(deviceId)
.tenantId(tenantId)
.sensorType(sensorType)
.startTime(startTime)
.endTime(endTime)
.limit(limit)
.build();
List<SensorDataDTO> data = deviceService.getDeviceData(params);
return ResponseEntity.ok(data);
}
}
@Service
@Transactional
public class DeviceService {
private final DeviceRepository deviceRepository;
private final DeviceStatusRepository statusRepository;
private final SensorDataRepository dataRepository;
private final DeviceCommandService commandService;
private final RedisTemplate<String, Object> redisTemplate;
private final DeviceMapper deviceMapper;
public DeviceService(DeviceRepository deviceRepository,
DeviceStatusRepository statusRepository,
SensorDataRepository dataRepository,
DeviceCommandService commandService,
RedisTemplate<String, Object> redisTemplate,
DeviceMapper deviceMapper) {
this.deviceRepository = deviceRepository;
this.statusRepository = statusRepository;
this.dataRepository = dataRepository;
this.commandService = commandService;
this.redisTemplate = redisTemplate;
this.deviceMapper = deviceMapper;
}
public DeviceDTO createDevice(CreateDeviceRequest request, String userId, String tenantId) {
// 验证设备ID唯一性
if (deviceRepository.existsByDeviceIdAndTenantId(request.getDeviceId(), tenantId)) {
throw new DeviceAlreadyExistsException("Device already exists: " + request.getDeviceId());
}
// 创建设备实体
Device device = Device.builder()
.deviceId(request.getDeviceId())
.name(request.getName())
.type(request.getType())
.model(request.getModel())
.manufacturer(request.getManufacturer())
.location(request.getLocation())
.description(request.getDescription())
.configuration(request.getConfiguration())
.tenantId(tenantId)
.createdBy(userId)
.createdAt(LocalDateTime.now())
.status(DeviceStatus.INACTIVE)
.build();
device = deviceRepository.save(device);
// 创建设备状态记录
DeviceStatusEntity status = DeviceStatusEntity.builder()
.deviceId(device.getDeviceId())
.tenantId(tenantId)
.status(DeviceStatus.INACTIVE)
.lastHeartbeat(null)
.lastDataTime(null)
.isOnline(false)
.build();
statusRepository.save(status);
// 缓存设备信息
cacheDevice(device);
return deviceMapper.toDTO(device);
}
@Cacheable(value = "devices", key = "#deviceId + ':' + #tenantId")
public DeviceDTO getDevice(String deviceId, String tenantId) {
Device device = deviceRepository.findByDeviceIdAndTenantId(deviceId, tenantId)
.orElseThrow(() -> new DeviceNotFoundException("Device not found: " + deviceId));
return deviceMapper.toDTO(device);
}
public PagedResponse<DeviceDTO> getDevices(DeviceQueryParams params) {
Specification<Device> spec = DeviceSpecifications.buildSpecification(params);
Pageable pageable = PageRequest.of(params.getPage(), params.getSize(),
Sort.by(Sort.Direction.DESC, "createdAt"));
Page<Device> devicePage = deviceRepository.findAll(spec, pageable);
List<DeviceDTO> devices = devicePage.getContent().stream()
.map(deviceMapper::toDTO)
.collect(Collectors.toList());
return PagedResponse.<DeviceDTO>builder()
.content(devices)
.page(devicePage.getNumber())
.size(devicePage.getSize())
.totalElements(devicePage.getTotalElements())
.totalPages(devicePage.getTotalPages())
.build();
}
@CacheEvict(value = "devices", key = "#deviceId + ':' + #tenantId")
public DeviceDTO updateDevice(String deviceId, UpdateDeviceRequest request, String userId, String tenantId) {
Device device = deviceRepository.findByDeviceIdAndTenantId(deviceId, tenantId)
.orElseThrow(() -> new DeviceNotFoundException("Device not found: " + deviceId));
// 更新设备信息
if (request.getName() != null) {
device.setName(request.getName());
}
if (request.getLocation() != null) {
device.setLocation(request.getLocation());
}
if (request.getDescription() != null) {
device.setDescription(request.getDescription());
}
if (request.getConfiguration() != null) {
device.setConfiguration(request.getConfiguration());
}
device.setUpdatedBy(userId);
device.setUpdatedAt(LocalDateTime.now());
device = deviceRepository.save(device);
// 更新缓存
cacheDevice(device);
return deviceMapper.toDTO(device);
}
@CacheEvict(value = "devices", key = "#deviceId + ':' + #tenantId")
public void deleteDevice(String deviceId, String userId, String tenantId) {
Device device = deviceRepository.findByDeviceIdAndTenantId(deviceId, tenantId)
.orElseThrow(() -> new DeviceNotFoundException("Device not found: " + deviceId));
// 软删除
device.setDeleted(true);
device.setDeletedBy(userId);
device.setDeletedAt(LocalDateTime.now());
deviceRepository.save(device);
// 删除状态记录
statusRepository.deleteByDeviceIdAndTenantId(deviceId, tenantId);
// 清除缓存
clearDeviceCache(deviceId, tenantId);
}
public CommandResponse sendCommand(String deviceId, SendCommandRequest request, String userId, String tenantId) {
// 验证设备存在
Device device = deviceRepository.findByDeviceIdAndTenantId(deviceId, tenantId)
.orElseThrow(() -> new DeviceNotFoundException("Device not found: " + deviceId));
// 检查设备是否在线
DeviceStatusEntity status = statusRepository.findByDeviceIdAndTenantId(deviceId, tenantId)
.orElseThrow(() -> new DeviceStatusNotFoundException("Device status not found: " + deviceId));
if (!status.isOnline()) {
throw new DeviceOfflineException("Device is offline: " + deviceId);
}
// 发送命令
return commandService.sendCommand(deviceId, request, userId, tenantId);
}
@Cacheable(value = "device-status", key = "#deviceId + ':' + #tenantId")
public DeviceStatusDTO getDeviceStatus(String deviceId, String tenantId) {
DeviceStatusEntity status = statusRepository.findByDeviceIdAndTenantId(deviceId, tenantId)
.orElseThrow(() -> new DeviceStatusNotFoundException("Device status not found: " + deviceId));
return DeviceStatusDTO.builder()
.deviceId(status.getDeviceId())
.status(status.getStatus())
.isOnline(status.isOnline())
.lastHeartbeat(status.getLastHeartbeat())
.lastDataTime(status.getLastDataTime())
.build();
}
public List<SensorDataDTO> getDeviceData(DataQueryParams params) {
// 验证设备存在
deviceRepository.findByDeviceIdAndTenantId(params.getDeviceId(), params.getTenantId())
.orElseThrow(() -> new DeviceNotFoundException("Device not found: " + params.getDeviceId()));
// 查询传感器数据
List<SensorData> dataList = dataRepository.findDeviceData(params);
return dataList.stream()
.map(this::mapToSensorDataDTO)
.collect(Collectors.toList());
}
private void cacheDevice(Device device) {
String key = "device:" + device.getDeviceId() + ":" + device.getTenantId();
redisTemplate.opsForValue().set(key, deviceMapper.toDTO(device), Duration.ofHours(1));
}
private void clearDeviceCache(String deviceId, String tenantId) {
String key = "device:" + deviceId + ":" + tenantId;
redisTemplate.delete(key);
}
private SensorDataDTO mapToSensorDataDTO(SensorData data) {
return SensorDataDTO.builder()
.deviceId(data.getDeviceId())
.sensorType(data.getSensorType())
.value(data.getValue())
.unit(data.getUnit())
.timestamp(data.getTimestamp())
.build();
}
}
# 4.2 数据分析服务
// 示例:数据分析服务实现
@RestController
@RequestMapping("/api/analytics")
public class AnalyticsController {
private final AnalyticsService analyticsService;
public AnalyticsController(AnalyticsService analyticsService) {
this.analyticsService = analyticsService;
}
@GetMapping("/devices/{deviceId}/statistics")
public ResponseEntity<DeviceStatisticsDTO> getDeviceStatistics(
@PathVariable String deviceId,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime startTime,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime endTime,
@RequestParam(required = false) String sensorType,
@RequestHeader("X-Tenant-Id") String tenantId) {
StatisticsQueryParams params = StatisticsQueryParams.builder()
.deviceId(deviceId)
.tenantId(tenantId)
.startTime(startTime)
.endTime(endTime)
.sensorType(sensorType)
.build();
DeviceStatisticsDTO statistics = analyticsService.getDeviceStatistics(params);
return ResponseEntity.ok(statistics);
}
@GetMapping("/devices/{deviceId}/trends")
public ResponseEntity<List<TrendDataDTO>> getDeviceTrends(
@PathVariable String deviceId,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime startTime,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime endTime,
@RequestParam String sensorType,
@RequestParam(defaultValue = "1h") String interval,
@RequestHeader("X-Tenant-Id") String tenantId) {
TrendQueryParams params = TrendQueryParams.builder()
.deviceId(deviceId)
.tenantId(tenantId)
.startTime(startTime)
.endTime(endTime)
.sensorType(sensorType)
.interval(interval)
.build();
List<TrendDataDTO> trends = analyticsService.getDeviceTrends(params);
return ResponseEntity.ok(trends);
}
@GetMapping("/anomalies")
public ResponseEntity<PagedResponse<AnomalyDTO>> getAnomalies(
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size,
@RequestParam(required = false) String deviceId,
@RequestParam(required = false) String severity,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime startTime,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime endTime,
@RequestHeader("X-Tenant-Id") String tenantId) {
AnomalyQueryParams params = AnomalyQueryParams.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.severity(severity)
.startTime(startTime)
.endTime(endTime)
.page(page)
.size(size)
.build();
PagedResponse<AnomalyDTO> anomalies = analyticsService.getAnomalies(params);
return ResponseEntity.ok(anomalies);
}
@PostMapping("/reports")
public ResponseEntity<ReportDTO> generateReport(
@Valid @RequestBody GenerateReportRequest request,
@RequestHeader("X-User-Id") String userId,
@RequestHeader("X-Tenant-Id") String tenantId) {
ReportDTO report = analyticsService.generateReport(request, userId, tenantId);
return ResponseEntity.status(HttpStatus.CREATED).body(report);
}
@GetMapping("/dashboard")
public ResponseEntity<DashboardDTO> getDashboard(
@RequestParam(required = false) List<String> deviceIds,
@RequestHeader("X-Tenant-Id") String tenantId) {
DashboardDTO dashboard = analyticsService.getDashboard(deviceIds, tenantId);
return ResponseEntity.ok(dashboard);
}
}
@Service
public class AnalyticsService {
private final SensorDataRepository dataRepository;
private final AnomalyRepository anomalyRepository;
private final DeviceRepository deviceRepository;
private final ReportRepository reportRepository;
private final AnalyticsEngine analyticsEngine;
private final RedisTemplate<String, Object> redisTemplate;
public AnalyticsService(SensorDataRepository dataRepository,
AnomalyRepository anomalyRepository,
DeviceRepository deviceRepository,
ReportRepository reportRepository,
AnalyticsEngine analyticsEngine,
RedisTemplate<String, Object> redisTemplate) {
this.dataRepository = dataRepository;
this.anomalyRepository = anomalyRepository;
this.deviceRepository = deviceRepository;
this.reportRepository = reportRepository;
this.analyticsEngine = analyticsEngine;
this.redisTemplate = redisTemplate;
}
@Cacheable(value = "device-statistics", key = "#params.deviceId + ':' + #params.startTime + ':' + #params.endTime")
public DeviceStatisticsDTO getDeviceStatistics(StatisticsQueryParams params) {
// 验证设备存在
deviceRepository.findByDeviceIdAndTenantId(params.getDeviceId(), params.getTenantId())
.orElseThrow(() -> new DeviceNotFoundException("Device not found: " + params.getDeviceId()));
// 查询统计数据
List<SensorData> dataList = dataRepository.findByDeviceIdAndTimeRange(
params.getDeviceId(), params.getStartTime(), params.getEndTime());
if (dataList.isEmpty()) {
return DeviceStatisticsDTO.empty(params.getDeviceId());
}
// 按传感器类型分组计算统计信息
Map<String, List<SensorData>> groupedData = dataList.stream()
.filter(data -> params.getSensorType() == null || params.getSensorType().equals(data.getSensorType()))
.collect(Collectors.groupingBy(SensorData::getSensorType));
Map<String, SensorStatistics> statisticsMap = new HashMap<>();
for (Map.Entry<String, List<SensorData>> entry : groupedData.entrySet()) {
String sensorType = entry.getKey();
List<SensorData> sensorDataList = entry.getValue();
SensorStatistics stats = calculateStatistics(sensorDataList);
statisticsMap.put(sensorType, stats);
}
return DeviceStatisticsDTO.builder()
.deviceId(params.getDeviceId())
.startTime(params.getStartTime())
.endTime(params.getEndTime())
.totalDataPoints(dataList.size())
.sensorStatistics(statisticsMap)
.build();
}
public List<TrendDataDTO> getDeviceTrends(TrendQueryParams params) {
// 验证设备存在
deviceRepository.findByDeviceIdAndTenantId(params.getDeviceId(), params.getTenantId())
.orElseThrow(() -> new DeviceNotFoundException("Device not found: " + params.getDeviceId()));
// 解析时间间隔
Duration interval = parseInterval(params.getInterval());
// 生成时间窗口
List<TimeWindow> windows = generateTimeWindows(params.getStartTime(), params.getEndTime(), interval);
List<TrendDataDTO> trends = new ArrayList<>();
for (TimeWindow window : windows) {
List<SensorData> windowData = dataRepository.findByDeviceIdAndSensorTypeAndTimeRange(
params.getDeviceId(), params.getSensorType(), window.getStart(), window.getEnd());
if (!windowData.isEmpty()) {
double avgValue = windowData.stream()
.mapToDouble(SensorData::getValue)
.average()
.orElse(0.0);
TrendDataDTO trend = TrendDataDTO.builder()
.timestamp(window.getStart())
.value(avgValue)
.count(windowData.size())
.build();
trends.add(trend);
}
}
return trends;
}
public PagedResponse<AnomalyDTO> getAnomalies(AnomalyQueryParams params) {
Specification<Anomaly> spec = AnomalySpecifications.buildSpecification(params);
Pageable pageable = PageRequest.of(params.getPage(), params.getSize(),
Sort.by(Sort.Direction.DESC, "timestamp"));
Page<Anomaly> anomalyPage = anomalyRepository.findAll(spec, pageable);
List<AnomalyDTO> anomalies = anomalyPage.getContent().stream()
.map(this::mapToAnomalyDTO)
.collect(Collectors.toList());
return PagedResponse.<AnomalyDTO>builder()
.content(anomalies)
.page(anomalyPage.getNumber())
.size(anomalyPage.getSize())
.totalElements(anomalyPage.getTotalElements())
.totalPages(anomalyPage.getTotalPages())
.build();
}
public ReportDTO generateReport(GenerateReportRequest request, String userId, String tenantId) {
// 创建报表记录
Report report = Report.builder()
.id(UUID.randomUUID().toString())
.name(request.getName())
.type(request.getType())
.parameters(request.getParameters())
.tenantId(tenantId)
.createdBy(userId)
.createdAt(LocalDateTime.now())
.status(ReportStatus.GENERATING)
.build();
report = reportRepository.save(report);
// 异步生成报表
CompletableFuture.runAsync(() -> {
try {
generateReportAsync(report);
} catch (Exception e) {
log.error("报表生成失败: {}", report.getId(), e);
updateReportStatus(report.getId(), ReportStatus.FAILED, e.getMessage());
}
});
return mapToReportDTO(report);
}
@Cacheable(value = "dashboard", key = "#tenantId + ':' + (#deviceIds != null ? #deviceIds.hashCode() : 'all')")
public DashboardDTO getDashboard(List<String> deviceIds, String tenantId) {
// 获取设备列表
List<Device> devices;
if (deviceIds != null && !deviceIds.isEmpty()) {
devices = deviceRepository.findByDeviceIdInAndTenantId(deviceIds, tenantId);
} else {
devices = deviceRepository.findByTenantIdAndDeletedFalse(tenantId);
}
if (devices.isEmpty()) {
return DashboardDTO.empty();
}
// 计算仪表板指标
DashboardMetrics metrics = calculateDashboardMetrics(devices, tenantId);
return DashboardDTO.builder()
.totalDevices(devices.size())
.onlineDevices(metrics.getOnlineDevices())
.offlineDevices(metrics.getOfflineDevices())
.activeAlerts(metrics.getActiveAlerts())
.totalDataPoints(metrics.getTotalDataPoints())
.deviceStatusDistribution(metrics.getDeviceStatusDistribution())
.recentAlerts(metrics.getRecentAlerts())
.topDevicesByActivity(metrics.getTopDevicesByActivity())
.build();
}
private SensorStatistics calculateStatistics(List<SensorData> dataList) {
double[] values = dataList.stream()
.mapToDouble(SensorData::getValue)
.toArray();
return SensorStatistics.builder()
.count(values.length)
.min(Arrays.stream(values).min().orElse(0.0))
.max(Arrays.stream(values).max().orElse(0.0))
.mean(Arrays.stream(values).average().orElse(0.0))
.standardDeviation(calculateStandardDeviation(values))
.build();
}
private double calculateStandardDeviation(double[] values) {
double mean = Arrays.stream(values).average().orElse(0.0);
double sumSquaredDiffs = Arrays.stream(values)
.map(value -> Math.pow(value - mean, 2))
.sum();
return Math.sqrt(sumSquaredDiffs / values.length);
}
private Duration parseInterval(String interval) {
// 解析间隔字符串,如 "1h", "30m", "1d"
Pattern pattern = Pattern.compile("(\\d+)([hmsd])");
Matcher matcher = pattern.matcher(interval);
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid interval format: " + interval);
}
int value = Integer.parseInt(matcher.group(1));
String unit = matcher.group(2);
switch (unit) {
case "s": return Duration.ofSeconds(value);
case "m": return Duration.ofMinutes(value);
case "h": return Duration.ofHours(value);
case "d": return Duration.ofDays(value);
default: throw new IllegalArgumentException("Unsupported time unit: " + unit);
}
}
private List<TimeWindow> generateTimeWindows(LocalDateTime start, LocalDateTime end, Duration interval) {
List<TimeWindow> windows = new ArrayList<>();
LocalDateTime current = start;
while (current.isBefore(end)) {
LocalDateTime windowEnd = current.plus(interval);
if (windowEnd.isAfter(end)) {
windowEnd = end;
}
windows.add(new TimeWindow(current, windowEnd));
current = windowEnd;
}
return windows;
}
private void generateReportAsync(Report report) {
try {
// 根据报表类型生成不同的报表
ReportData reportData = analyticsEngine.generateReport(report);
// 更新报表状态和数据
report.setData(reportData);
report.setStatus(ReportStatus.COMPLETED);
report.setCompletedAt(LocalDateTime.now());
reportRepository.save(report);
} catch (Exception e) {
log.error("报表生成异常: {}", report.getId(), e);
throw e;
}
}
private void updateReportStatus(String reportId, ReportStatus status, String errorMessage) {
reportRepository.findById(reportId).ifPresent(report -> {
report.setStatus(status);
if (errorMessage != null) {
report.setErrorMessage(errorMessage);
}
reportRepository.save(report);
});
}
private DashboardMetrics calculateDashboardMetrics(List<Device> devices, String tenantId) {
// 实现仪表板指标计算逻辑
// 这里简化实现,实际应该从各个服务获取数据
long onlineDevices = devices.stream()
.mapToLong(device -> isDeviceOnline(device.getDeviceId(), tenantId) ? 1 : 0)
.sum();
long offlineDevices = devices.size() - onlineDevices;
return DashboardMetrics.builder()
.onlineDevices((int) onlineDevices)
.offlineDevices((int) offlineDevices)
.activeAlerts(getActiveAlertsCount(tenantId))
.totalDataPoints(getTotalDataPointsCount(tenantId))
.deviceStatusDistribution(getDeviceStatusDistribution(devices))
.recentAlerts(getRecentAlerts(tenantId, 10))
.topDevicesByActivity(getTopDevicesByActivity(devices, tenantId, 5))
.build();
}
private boolean isDeviceOnline(String deviceId, String tenantId) {
// 从缓存或数据库检查设备在线状态
String key = "device:status:" + deviceId + ":" + tenantId;
Object status = redisTemplate.opsForValue().get(key);
return status != null && "online".equals(status.toString());
}
private int getActiveAlertsCount(String tenantId) {
// 获取活跃告警数量
return (int) anomalyRepository.countByTenantIdAndResolvedFalse(tenantId);
}
private long getTotalDataPointsCount(String tenantId) {
// 获取总数据点数量
return dataRepository.countByTenantId(tenantId);
}
private Map<String, Integer> getDeviceStatusDistribution(List<Device> devices) {
return devices.stream()
.collect(Collectors.groupingBy(
device -> device.getStatus().toString(),
Collectors.collectingAndThen(Collectors.counting(), Math::toIntExact)
));
}
private List<AnomalyDTO> getRecentAlerts(String tenantId, int limit) {
List<Anomaly> recentAnomalies = anomalyRepository.findTopByTenantIdOrderByTimestampDesc(tenantId, limit);
return recentAnomalies.stream()
.map(this::mapToAnomalyDTO)
.collect(Collectors.toList());
}
private List<DeviceActivityDTO> getTopDevicesByActivity(List<Device> devices, String tenantId, int limit) {
// 计算设备活跃度(基于数据点数量)
return devices.stream()
.map(device -> {
long dataCount = dataRepository.countByDeviceIdAndTenantId(device.getDeviceId(), tenantId);
return DeviceActivityDTO.builder()
.deviceId(device.getDeviceId())
.deviceName(device.getName())
.dataPointCount(dataCount)
.build();
})
.sorted(Comparator.comparing(DeviceActivityDTO::getDataPointCount).reversed())
.limit(limit)
.collect(Collectors.toList());
}
private AnomalyDTO mapToAnomalyDTO(Anomaly anomaly) {
return AnomalyDTO.builder()
.id(anomaly.getId())
.deviceId(anomaly.getDeviceId())
.sensorType(anomaly.getSensorType())
.anomalyType(anomaly.getAnomalyType())
.severity(anomaly.getSeverity())
.description(anomaly.getDescription())
.value(anomaly.getValue())
.threshold(anomaly.getThreshold())
.timestamp(anomaly.getTimestamp())
.resolved(anomaly.isResolved())
.build();
}
private ReportDTO mapToReportDTO(Report report) {
return ReportDTO.builder()
.id(report.getId())
.name(report.getName())
.type(report.getType())
.status(report.getStatus())
.createdAt(report.getCreatedAt())
.completedAt(report.getCompletedAt())
.downloadUrl(report.getStatus() == ReportStatus.COMPLETED ?
"/api/reports/" + report.getId() + "/download" : null)
.build();
}
// 内部类
@Data
@Builder
public static class TimeWindow {
private final LocalDateTime start;
private final LocalDateTime end;
}
@Data
@Builder
public static class DashboardMetrics {
private int onlineDevices;
private int offlineDevices;
private int activeAlerts;
private long totalDataPoints;
private Map<String, Integer> deviceStatusDistribution;
private List<AnomalyDTO> recentAlerts;
private List<DeviceActivityDTO> topDevicesByActivity;
}
}
# 4.3 告警服务
// 示例:告警服务实现
@RestController
@RequestMapping("/api/alerts")
public class AlertController {
private final AlertService alertService;
public AlertController(AlertService alertService) {
this.alertService = alertService;
}
@PostMapping("/rules")
public ResponseEntity<AlertRuleDTO> createAlertRule(
@Valid @RequestBody CreateAlertRuleRequest request,
@RequestHeader("X-User-Id") String userId,
@RequestHeader("X-Tenant-Id") String tenantId) {
AlertRuleDTO rule = alertService.createAlertRule(request, userId, tenantId);
return ResponseEntity.status(HttpStatus.CREATED).body(rule);
}
@GetMapping("/rules")
public ResponseEntity<PagedResponse<AlertRuleDTO>> getAlertRules(
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size,
@RequestParam(required = false) String deviceId,
@RequestParam(required = false) Boolean enabled,
@RequestHeader("X-Tenant-Id") String tenantId) {
AlertRuleQueryParams params = AlertRuleQueryParams.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.enabled(enabled)
.page(page)
.size(size)
.build();
PagedResponse<AlertRuleDTO> rules = alertService.getAlertRules(params);
return ResponseEntity.ok(rules);
}
@PutMapping("/rules/{ruleId}")
public ResponseEntity<AlertRuleDTO> updateAlertRule(
@PathVariable String ruleId,
@Valid @RequestBody UpdateAlertRuleRequest request,
@RequestHeader("X-User-Id") String userId,
@RequestHeader("X-Tenant-Id") String tenantId) {
AlertRuleDTO rule = alertService.updateAlertRule(ruleId, request, userId, tenantId);
return ResponseEntity.ok(rule);
}
@DeleteMapping("/rules/{ruleId}")
public ResponseEntity<Void> deleteAlertRule(
@PathVariable String ruleId,
@RequestHeader("X-User-Id") String userId,
@RequestHeader("X-Tenant-Id") String tenantId) {
alertService.deleteAlertRule(ruleId, userId, tenantId);
return ResponseEntity.noContent().build();
}
@GetMapping
public ResponseEntity<PagedResponse<AlertDTO>> getAlerts(
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size,
@RequestParam(required = false) String deviceId,
@RequestParam(required = false) String severity,
@RequestParam(required = false) Boolean acknowledged,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime startTime,
@RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime endTime,
@RequestHeader("X-Tenant-Id") String tenantId) {
AlertQueryParams params = AlertQueryParams.builder()
.tenantId(tenantId)
.deviceId(deviceId)
.severity(severity)
.acknowledged(acknowledged)
.startTime(startTime)
.endTime(endTime)
.page(page)
.size(size)
.build();
PagedResponse<AlertDTO> alerts = alertService.getAlerts(params);
return ResponseEntity.ok(alerts);
}
@PostMapping("/{alertId}/acknowledge")
public ResponseEntity<Void> acknowledgeAlert(
@PathVariable String alertId,
@RequestBody(required = false) AcknowledgeAlertRequest request,
@RequestHeader("X-User-Id") String userId,
@RequestHeader("X-Tenant-Id") String tenantId) {
alertService.acknowledgeAlert(alertId, request, userId, tenantId);
return ResponseEntity.ok().build();
}
@PostMapping("/{alertId}/resolve")
public ResponseEntity<Void> resolveAlert(
@PathVariable String alertId,
@RequestBody(required = false) ResolveAlertRequest request,
@RequestHeader("X-User-Id") String userId,
@RequestHeader("X-Tenant-Id") String tenantId) {
alertService.resolveAlert(alertId, request, userId, tenantId);
return ResponseEntity.ok().build();
}
}
@Service
@Transactional
public class AlertService {
private final AlertRuleRepository ruleRepository;
private final AlertRepository alertRepository;
private final AlertNotificationService notificationService;
private final AlertEvaluationEngine evaluationEngine;
private final RedisTemplate<String, Object> redisTemplate;
public AlertService(AlertRuleRepository ruleRepository,
AlertRepository alertRepository,
AlertNotificationService notificationService,
AlertEvaluationEngine evaluationEngine,
RedisTemplate<String, Object> redisTemplate) {
this.ruleRepository = ruleRepository;
this.alertRepository = alertRepository;
this.notificationService = notificationService;
this.evaluationEngine = evaluationEngine;
this.redisTemplate = redisTemplate;
}
public AlertRuleDTO createAlertRule(CreateAlertRuleRequest request, String userId, String tenantId) {
// 验证规则配置
validateAlertRuleConfig(request);
AlertRule rule = AlertRule.builder()
.id(UUID.randomUUID().toString())
.name(request.getName())
.description(request.getDescription())
.deviceId(request.getDeviceId())
.sensorType(request.getSensorType())
.condition(request.getCondition())
.threshold(request.getThreshold())
.severity(request.getSeverity())
.enabled(request.isEnabled())
.notificationChannels(request.getNotificationChannels())
.tenantId(tenantId)
.createdBy(userId)
.createdAt(LocalDateTime.now())
.build();
rule = ruleRepository.save(rule);
// 如果规则启用,添加到评估引擎
if (rule.isEnabled()) {
evaluationEngine.addRule(rule);
}
return mapToAlertRuleDTO(rule);
}
public PagedResponse<AlertRuleDTO> getAlertRules(AlertRuleQueryParams params) {
Specification<AlertRule> spec = AlertRuleSpecifications.buildSpecification(params);
Pageable pageable = PageRequest.of(params.getPage(), params.getSize(),
Sort.by(Sort.Direction.DESC, "createdAt"));
Page<AlertRule> rulePage = ruleRepository.findAll(spec, pageable);
List<AlertRuleDTO> rules = rulePage.getContent().stream()
.map(this::mapToAlertRuleDTO)
.collect(Collectors.toList());
return PagedResponse.<AlertRuleDTO>builder()
.content(rules)
.page(rulePage.getNumber())
.size(rulePage.getSize())
.totalElements(rulePage.getTotalElements())
.totalPages(rulePage.getTotalPages())
.build();
}
public AlertRuleDTO updateAlertRule(String ruleId, UpdateAlertRuleRequest request, String userId, String tenantId) {
AlertRule rule = ruleRepository.findByIdAndTenantId(ruleId, tenantId)
.orElseThrow(() -> new AlertRuleNotFoundException("Alert rule not found: " + ruleId));
// 更新规则属性
if (request.getName() != null) {
rule.setName(request.getName());
}
if (request.getDescription() != null) {
rule.setDescription(request.getDescription());
}
if (request.getCondition() != null) {
rule.setCondition(request.getCondition());
}
if (request.getThreshold() != null) {
rule.setThreshold(request.getThreshold());
}
if (request.getSeverity() != null) {
rule.setSeverity(request.getSeverity());
}
if (request.getEnabled() != null) {
boolean wasEnabled = rule.isEnabled();
rule.setEnabled(request.getEnabled());
// 更新评估引擎中的规则
if (wasEnabled && !request.getEnabled()) {
evaluationEngine.removeRule(ruleId);
} else if (!wasEnabled && request.getEnabled()) {
evaluationEngine.addRule(rule);
} else if (request.getEnabled()) {
evaluationEngine.updateRule(rule);
}
}
if (request.getNotificationChannels() != null) {
rule.setNotificationChannels(request.getNotificationChannels());
}
rule.setUpdatedBy(userId);
rule.setUpdatedAt(LocalDateTime.now());
rule = ruleRepository.save(rule);
return mapToAlertRuleDTO(rule);
}
public void deleteAlertRule(String ruleId, String userId, String tenantId) {
AlertRule rule = ruleRepository.findByIdAndTenantId(ruleId, tenantId)
.orElseThrow(() -> new AlertRuleNotFoundException("Alert rule not found: " + ruleId));
// 从评估引擎中移除规则
if (rule.isEnabled()) {
evaluationEngine.removeRule(ruleId);
}
// 软删除
rule.setDeleted(true);
rule.setDeletedBy(userId);
rule.setDeletedAt(LocalDateTime.now());
ruleRepository.save(rule);
}
public PagedResponse<AlertDTO> getAlerts(AlertQueryParams params) {
Specification<Alert> spec = AlertSpecifications.buildSpecification(params);
Pageable pageable = PageRequest.of(params.getPage(), params.getSize(),
Sort.by(Sort.Direction.DESC, "createdAt"));
Page<Alert> alertPage = alertRepository.findAll(spec, pageable);
List<AlertDTO> alerts = alertPage.getContent().stream()
.map(this::mapToAlertDTO)
.collect(Collectors.toList());
return PagedResponse.<AlertDTO>builder()
.content(alerts)
.page(alertPage.getNumber())
.size(alertPage.getSize())
.totalElements(alertPage.getTotalElements())
.totalPages(alertPage.getTotalPages())
.build();
}
public void acknowledgeAlert(String alertId, AcknowledgeAlertRequest request, String userId, String tenantId) {
Alert alert = alertRepository.findByIdAndTenantId(alertId, tenantId)
.orElseThrow(() -> new AlertNotFoundException("Alert not found: " + alertId));
if (alert.isAcknowledged()) {
throw new AlertAlreadyAcknowledgedException("Alert already acknowledged: " + alertId);
}
alert.setAcknowledged(true);
alert.setAcknowledgedBy(userId);
alert.setAcknowledgedAt(LocalDateTime.now());
if (request != null && request.getComment() != null) {
alert.setAcknowledgeComment(request.getComment());
}
alertRepository.save(alert);
// 发送确认通知
notificationService.sendAcknowledgmentNotification(alert, userId);
}
public void resolveAlert(String alertId, ResolveAlertRequest request, String userId, String tenantId) {
Alert alert = alertRepository.findByIdAndTenantId(alertId, tenantId)
.orElseThrow(() -> new AlertNotFoundException("Alert not found: " + alertId));
if (alert.isResolved()) {
throw new AlertAlreadyResolvedException("Alert already resolved: " + alertId);
}
alert.setResolved(true);
alert.setResolvedBy(userId);
alert.setResolvedAt(LocalDateTime.now());
if (request != null && request.getComment() != null) {
alert.setResolveComment(request.getComment());
}
alertRepository.save(alert);
// 发送解决通知
notificationService.sendResolutionNotification(alert, userId);
}
// 处理传感器数据,评估告警规则
@EventListener
public void handleSensorData(SensorDataEvent event) {
SensorData data = event.getSensorData();
// 获取相关的告警规则
List<AlertRule> rules = ruleRepository.findByDeviceIdAndSensorTypeAndEnabledTrue(
data.getDeviceId(), data.getSensorType());
for (AlertRule rule : rules) {
try {
// 评估规则
boolean triggered = evaluationEngine.evaluateRule(rule, data);
if (triggered) {
// 检查是否已存在未解决的告警
boolean existingAlert = alertRepository.existsByRuleIdAndDeviceIdAndResolvedFalse(
rule.getId(), data.getDeviceId());
if (!existingAlert) {
// 创建新告警
createAlert(rule, data);
}
}
} catch (Exception e) {
log.error("告警规则评估失败: ruleId={}, deviceId={}", rule.getId(), data.getDeviceId(), e);
}
}
}
private void createAlert(AlertRule rule, SensorData data) {
Alert alert = Alert.builder()
.id(UUID.randomUUID().toString())
.ruleId(rule.getId())
.ruleName(rule.getName())
.deviceId(data.getDeviceId())
.sensorType(data.getSensorType())
.severity(rule.getSeverity())
.message(generateAlertMessage(rule, data))
.value(data.getValue())
.threshold(rule.getThreshold())
.condition(rule.getCondition())
.tenantId(rule.getTenantId())
.createdAt(LocalDateTime.now())
.acknowledged(false)
.resolved(false)
.build();
alert = alertRepository.save(alert);
// 发送告警通知
notificationService.sendAlertNotification(alert, rule.getNotificationChannels());
// 发布告警事件
publishAlertEvent(alert);
}
private String generateAlertMessage(AlertRule rule, SensorData data) {
return String.format("设备 %s 的 %s 传感器值 %.2f %s 阈值 %.2f",
data.getDeviceId(),
data.getSensorType(),
data.getValue(),
getConditionDescription(rule.getCondition()),
rule.getThreshold());
}
private String getConditionDescription(AlertCondition condition) {
switch (condition) {
case GREATER_THAN: return "超过";
case LESS_THAN: return "低于";
case EQUALS: return "等于";
case NOT_EQUALS: return "不等于";
default: return "满足条件";
}
}
private void validateAlertRuleConfig(CreateAlertRuleRequest request) {
// 验证规则配置的有效性
if (request.getThreshold() == null) {
throw new InvalidAlertRuleException("Threshold is required");
}
if (request.getCondition() == null) {
throw new InvalidAlertRuleException("Condition is required");
}
if (request.getSeverity() == null) {
throw new InvalidAlertRuleException("Severity is required");
}
}
private void publishAlertEvent(Alert alert) {
AlertCreatedEvent event = AlertCreatedEvent.builder()
.alertId(alert.getId())
.deviceId(alert.getDeviceId())
.severity(alert.getSeverity())
.message(alert.getMessage())
.tenantId(alert.getTenantId())
.timestamp(alert.getCreatedAt())
.build();
// 发布到消息队列
// applicationEventPublisher.publishEvent(event);
}
private AlertRuleDTO mapToAlertRuleDTO(AlertRule rule) {
return AlertRuleDTO.builder()
.id(rule.getId())
.name(rule.getName())
.description(rule.getDescription())
.deviceId(rule.getDeviceId())
.sensorType(rule.getSensorType())
.condition(rule.getCondition())
.threshold(rule.getThreshold())
.severity(rule.getSeverity())
.enabled(rule.isEnabled())
.notificationChannels(rule.getNotificationChannels())
.createdAt(rule.getCreatedAt())
.updatedAt(rule.getUpdatedAt())
.build();
}
private AlertDTO mapToAlertDTO(Alert alert) {
return AlertDTO.builder()
.id(alert.getId())
.ruleId(alert.getRuleId())
.ruleName(alert.getRuleName())
.deviceId(alert.getDeviceId())
.sensorType(alert.getSensorType())
.severity(alert.getSeverity())
.message(alert.getMessage())
.value(alert.getValue())
.threshold(alert.getThreshold())
.condition(alert.getCondition())
.acknowledged(alert.isAcknowledged())
.acknowledgedBy(alert.getAcknowledgedBy())
.acknowledgedAt(alert.getAcknowledgedAt())
.resolved(alert.isResolved())
.resolvedBy(alert.getResolvedBy())
.resolvedAt(alert.getResolvedAt())
.createdAt(alert.getCreatedAt())
.build();
}
}
# 5. 最佳实践总结
# 5.1 设计原则
- 单一职责: 每个微服务专注于特定的业务领域
- 松耦合: 服务间通过API和消息队列通信
- 高内聚: 相关功能组织在同一个服务内
- 可扩展性: 支持水平扩展和负载均衡
- 容错性: 实现熔断、重试和降级机制
# 5.2 开发建议
- API设计: 遵循RESTful设计原则,使用统一的响应格式
- 数据一致性: 使用分布式事务或最终一致性模式
- 缓存策略: 合理使用缓存提高性能
- 监控告警: 实现全面的服务监控和告警
- 安全防护: 实现认证授权和数据加密
# 6. 下一步学习
- 学习前端展示层的设计和实现
- 深入了解容器化部署和运维
- 掌握微服务治理和服务网格
- 研究分布式系统的设计模式