应用服务层架构设计

# 应用服务层架构设计

# 1. 章节概述

应用服务层是IoT系统的核心业务逻辑层，负责处理业务规则、数据转换、服务编排和用户交互。本章将详细介绍应用服务层的架构设计、微服务实现、API网关配置和业务服务开发。

## 1.1 学习目标

  • 掌握应用服务层的架构设计原则
  • 理解微服务架构在IoT系统中的应用
  • 学习API网关的配置和管理
  • 实现核心业务服务和数据处理逻辑
  • 掌握服务间通信和数据一致性保证

# 2. 应用服务层架构

## 2.1 整体架构

%% Overall architecture of the application service layer, drawn top to bottom:
%% client layer -> API gateway layer -> application service layer -> infrastructure layer.
graph TB
    subgraph "客户端层"
        A[Web应用]
        B[移动应用]
        C[第三方系统]
    end
    
    subgraph "API网关层"
        D[API Gateway]
        E[负载均衡器]
        F[认证授权]
    end
    
    subgraph "应用服务层"
        G[设备管理服务]
        H[数据分析服务]
        I[告警服务]
        J[用户管理服务]
        K[配置管理服务]
        L[报表服务]
    end
    
    subgraph "基础设施层"
        M[服务注册中心]
        N[配置中心]
        O[消息队列]
        P[缓存服务]
        Q[数据库]
    end
    
    %% Every client enters through the API gateway (D), then passes the load
    %% balancer (E) and the authentication/authorization node (F) before
    %% reaching any application service.
    A --> D
    B --> D
    C --> D
    D --> E
    E --> F
    F --> G
    F --> H
    F --> I
    F --> J
    F --> K
    F --> L
    
    %% All six application services connect to the service registry (M).
    G --> M
    H --> M
    I --> M
    J --> M
    K --> M
    L --> M
    
    %% All six application services connect to the configuration center (N).
    G --> N
    H --> N
    I --> N
    J --> N
    K --> N
    L --> N
    
    %% Only the device (G), analytics (H) and alert (I) services use the message queue (O).
    G --> O
    H --> O
    I --> O
    
    %% The device, analytics, alert and user (J) services use the cache (P).
    G --> P
    H --> P
    I --> P
    J --> P
    
    %% All six application services connect to the database (Q).
    G --> Q
    H --> Q
    I --> Q
    J --> Q
    K --> Q
    L --> Q

## 2.2 微服务架构设计

// Example: base bootstrap class of a microservice
/**
 * Spring Boot entry point used as the skeleton for each IoT microservice.
 *
 * <p>The application registers itself as a Eureka client and enables circuit breaking.
 * It is deliberately <em>not</em> annotated with {@code @EnableConfigServer}: that
 * annotation turns the application itself into a Spring Cloud Config Server, whereas a
 * business microservice should only consume configuration from the dedicated
 * configuration center.
 */
@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
public class IoTMicroserviceApplication {

    /**
     * Starts the Spring application context.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(IoTMicroserviceApplication.class, args);
    }

    /**
     * Creates a {@link RestTemplate} marked {@code @LoadBalanced}, so that calls made
     * through it go through the client-side load balancer.
     *
     * @return a new load-balanced {@code RestTemplate}
     */
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    /**
     * Creates the shared {@link RedisTemplate}: keys are serialized as plain strings and
     * values as JSON.
     *
     * @param factory the Redis connection factory provided by Spring
     * @return the configured template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}

// Base configuration of a microservice
/**
 * Common infrastructure beans of a microservice: the service-instance supplier used for
 * client-side load balancing and a custom health indicator.
 */
@Configuration
public class MicroserviceConfig {

    /** Value of the {@code spring.application.name} property. */
    @Value("${spring.application.name}")
    private String serviceName;

    /** Value of the {@code server.port} property. */
    @Value("${server.port}")
    private int serverPort;

    /**
     * Builds a {@link ServiceInstanceListSupplier} backed by the discovery client, with
     * health checks and caching enabled.
     *
     * @param context the application context the supplier is built against
     * @return the configured supplier
     */
    @Bean
    public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
            ConfigurableApplicationContext context) {
        // NOTE(review): confirm that the Spring Cloud LoadBalancer version in use offers
        // a withCaching overload that accepts a Duration.
        return ServiceInstanceListSupplier.builder()
                .withDiscoveryClient()
                .withHealthChecks()
                .withCaching(Duration.ofSeconds(30))
                .build(context);
    }

    /**
     * Exposes a health indicator that reports UP or DOWN depending on
     * {@link #checkServiceHealth()}.
     *
     * @return the health indicator bean
     */
    @Bean
    public HealthIndicator serviceHealthIndicator() {
        return new AbstractHealthIndicator() {
            @Override
            protected void doHealthCheck(Health.Builder builder) throws Exception {
                // Unhealthy case first, so the healthy path reads straight through.
                if (!checkServiceHealth()) {
                    builder.down()
                           .withDetail("service", serviceName)
                           .withDetail("error", "Service health check failed");
                    return;
                }

                builder.up()
                       .withDetail("service", serviceName)
                       .withDetail("port", serverPort)
                       .withDetail("status", "UP");
            }
        };
    }

    /**
     * Placeholder for the concrete health check; currently always reports healthy.
     *
     * @return {@code true} when the service is considered healthy
     */
    private boolean checkServiceHealth() {
        boolean healthy;
        try {
            // Check the database connection
            // Check external dependencies
            // Check the state of critical components
            healthy = true;
        } catch (Exception e) {
            healthy = false;
        }
        return healthy;
    }
}

# 3. API网关实现

## 3.1 网关配置

// Example: Spring Cloud Gateway configuration
/**
 * Routing, rate-limiting and security configuration of the API gateway.
 */
@Configuration
@EnableWebFluxSecurity
public class GatewayConfig {

    /**
     * Declares the routes to the backend services.
     *
     * <p>The request path is forwarded unchanged. The backend controllers are themselves
     * mapped under the public prefix (for example {@code /api/devices} and
     * {@code /api/analytics}), so stripping the first two path segments at the gateway
     * would make every forwarded request miss its handler.
     *
     * @param builder the route builder supplied by Spring Cloud Gateway
     * @return the route table
     */
    @Bean
    public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
        return builder.routes()
                // Device management service: circuit breaker plus retry with backoff
                .route("device-service", r -> r.path("/api/devices/**")
                        .filters(f -> f
                                .addRequestHeader("X-Service", "device-service")
                                .circuitBreaker(config -> config
                                        .setName("device-service-cb")
                                        .setFallbackUri("forward:/fallback/device"))
                                .retry(config -> config
                                        .setRetries(3)
                                        .setBackoff(Duration.ofMillis(100), Duration.ofMillis(1000), 2, false)))
                        .uri("lb://device-service"))

                // Analytics service: rate limited per user, with circuit breaker
                .route("analytics-service", r -> r.path("/api/analytics/**")
                        .filters(f -> f
                                .addRequestHeader("X-Service", "analytics-service")
                                .requestRateLimiter(config -> config
                                        .setRateLimiter(redisRateLimiter())
                                        .setKeyResolver(userKeyResolver()))
                                .circuitBreaker(config -> config
                                        .setName("analytics-service-cb")
                                        .setFallbackUri("forward:/fallback/analytics")))
                        .uri("lb://analytics-service"))

                // Alert service
                .route("alert-service", r -> r.path("/api/alerts/**")
                        .filters(f -> f
                                .addRequestHeader("X-Service", "alert-service")
                                .circuitBreaker(config -> config
                                        .setName("alert-service-cb")
                                        .setFallbackUri("forward:/fallback/alert")))
                        .uri("lb://alert-service"))

                // User management service
                .route("user-service", r -> r.path("/api/users/**")
                        .filters(f -> f
                                .addRequestHeader("X-Service", "user-service")
                                .circuitBreaker(config -> config
                                        .setName("user-service-cb")
                                        .setFallbackUri("forward:/fallback/user")))
                        .uri("lb://user-service"))

                .build();
    }

    /**
     * Redis-backed rate limiter.
     *
     * @return a limiter with replenish rate 10, burst capacity 20 and 1 token per request
     */
    @Bean
    public RedisRateLimiter redisRateLimiter() {
        return new RedisRateLimiter(10, 20, 1); // 10 requests per second, bursts of 20
    }

    /**
     * Resolves the rate-limiting key from the {@code X-User-Id} request header.
     *
     * @return a resolver yielding the user id, or {@code "anonymous"} when the header is absent
     */
    @Bean
    public KeyResolver userKeyResolver() {
        return exchange -> {
            // Rate limit per user id; requests without the header share one bucket
            String userId = exchange.getRequest().getHeaders().getFirst("X-User-Id");
            return Mono.just(userId != null ? userId : "anonymous");
        };
    }

    /**
     * Builds the reactive security filter chain: CSRF is disabled, the auth, public and
     * actuator paths are open, and everything else requires a valid JWT.
     *
     * @param http the security builder supplied by Spring Security
     * @return the configured filter chain
     */
    @Bean
    public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
        return http
                .csrf().disable()
                .authorizeExchange(exchanges -> exchanges
                        .pathMatchers("/api/auth/**").permitAll()
                        .pathMatchers("/api/public/**").permitAll()
                        .pathMatchers("/actuator/**").permitAll()
                        .anyExchange().authenticated()
                )
                .oauth2ResourceServer(oauth2 -> oauth2
                        .jwt(jwt -> jwt
                                .jwtDecoder(jwtDecoder())
                                // Wire the custom converter explicitly so the "authorities"
                                // claim is actually mapped to granted authorities.
                                .jwtAuthenticationConverter(jwtAuthenticationConverter()))
                )
                .build();
    }

    /**
     * Creates the JWT decoder that fetches signing keys from the auth service and
     * validates token timestamps.
     *
     * @return the reactive JWT decoder
     */
    @Bean
    public ReactiveJwtDecoder jwtDecoder() {
        // NOTE(review): "auth-service" is a logical service name; confirm that it is
        // resolvable by the HTTP client the decoder uses to download the JWK set.
        NimbusReactiveJwtDecoder jwtDecoder = NimbusReactiveJwtDecoder
                .withJwkSetUri("http://auth-service/oauth2/jwks")
                .build();

        // setJwtValidator expects a token validator; JwtTimestampValidator is one.
        jwtDecoder.setJwtValidator(new JwtTimestampValidator());
        return jwtDecoder;
    }

    /**
     * Converts a decoded JWT into an authentication token whose authorities are taken
     * from the {@code authorities} claim.
     *
     * @return the reactive authentication converter
     */
    @Bean
    public Converter<Jwt, Mono<AbstractAuthenticationToken>> jwtAuthenticationConverter() {
        JwtAuthenticationConverter converter = new JwtAuthenticationConverter();
        converter.setJwtGrantedAuthoritiesConverter(jwt -> {
            Collection<String> authorities = jwt.getClaimAsStringList("authorities");
            if (authorities == null) {
                // A token without the claim simply carries no authorities.
                return java.util.Collections.<GrantedAuthority>emptyList();
            }
            return authorities.stream()
                    .map(SimpleGrantedAuthority::new)
                    .collect(Collectors.toList());
        });

        return new ReactiveJwtAuthenticationConverterAdapter(converter);
    }
}

// 网关过滤器
@Component
public class GlobalAuthenticationFilter implements GlobalFilter, Ordered {
    
    private final ReactiveJwtDecoder jwtDecoder;
    private final RedisTemplate<String, Object> redisTemplate;
    
    public GlobalAuthenticationFilter(ReactiveJwtDecoder jwtDecoder, 
                                    RedisTemplate<String, Object> redisTemplate) {
        this.jwtDecoder = jwtDecoder;
        this.redisTemplate = redisTemplate;
    }
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String path = request.getURI().getPath();
        
        // 跳过不需要认证的路径
        if (isPublicPath(path)) {
            return chain.filter(exchange);
        }
        
        // 提取JWT令牌
        String token = extractToken(request);
        if (token == null) {
            return handleUnauthorized(exchange);
        }
        
        // 验证JWT令牌
        return jwtDecoder.decode(token)
                .flatMap(jwt -> {
                    // 检查令牌是否在黑名单中
                    String jti = jwt.getClaimAsString("jti");
                    if (isTokenBlacklisted(jti)) {
                        return handleUnauthorized(exchange);
                    }
                    
                    // 添加用户信息到请求头
                    ServerHttpRequest mutatedRequest = request.mutate()
                            .header("X-User-Id", jwt.getSubject())
                            .header("X-User-Roles", String.join(",", jwt.getClaimAsStringList("roles")))
                            .header("X-Tenant-Id", jwt.getClaimAsString("tenant_id"))
                            .build();
                    
                    return chain.filter(exchange.mutate().request(mutatedRequest).build());
                })
                .onErrorResume(JwtException.class, ex -> handleUnauthorized(exchange));
    }
    
    private boolean isPublicPath(String path) {
        return path.startsWith("/api/auth/") || 
               path.startsWith("/api/public/") || 
               path.startsWith("/actuator/");
    }
    
    private String extractToken(ServerHttpRequest request) {
        String authHeader = request.getHeaders().getFirst("Authorization");
        if (authHeader != null && authHeader.startsWith("Bearer ")) {
            return authHeader.substring(7);
        }
        return null;
    }
    
    private boolean isTokenBlacklisted(String jti) {
        return redisTemplate.hasKey("blacklist:" + jti);
    }
    
    private Mono<Void> handleUnauthorized(ServerWebExchange exchange) {
        ServerHttpResponse response = exchange.getResponse();
        response.setStatusCode(HttpStatus.UNAUTHORIZED);
        
        String body = "{\"error\":\"Unauthorized\",\"message\":\"Invalid or missing authentication token\"}";
        DataBuffer buffer = response.bufferFactory().wrap(body.getBytes(StandardCharsets.UTF_8));
        
        response.getHeaders().add("Content-Type", "application/json");
        return response.writeWith(Mono.just(buffer));
    }
    
    @Override
    public int getOrder() {
        return -100; // 高优先级
    }
}

## 3.2 网关监控和指标

// Example: gateway monitoring configuration
/**
 * Global gateway filter that records request metrics once each exchange has finished:
 * a timer, a request counter and a response-time distribution.
 *
 * <p>The elapsed time is measured from the moment the filter is entered until the chain
 * terminates, and that measured value is what the timer records. Meters are looked up
 * per request with fresh builders, so no mutable builder is shared between requests.
 */
@Component
public class GatewayMetricsFilter implements GlobalFilter, Ordered {

    private final MeterRegistry meterRegistry;

    public GatewayMetricsFilter(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /**
     * Runs the rest of the chain and records metrics when it terminates.
     *
     * @param exchange the current exchange
     * @param chain    the remaining filter chain
     * @return completion of the chain
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        long startTime = System.nanoTime();

        return chain.filter(exchange)
                .doFinally(signalType -> {
                    long elapsedNanos = System.nanoTime() - startTime;

                    ServerHttpRequest request = exchange.getRequest();
                    ServerHttpResponse response = exchange.getResponse();

                    String method = request.getMethodValue();
                    // NOTE(review): the raw path is used as a tag value; paths that
                    // embed ids will create one time series per id.
                    String path = request.getURI().getPath();
                    // The status may not be set, e.g. when the exchange was cancelled.
                    String status = response.getStatusCode() == null
                            ? "UNKNOWN"
                            : String.valueOf(response.getStatusCode().value());

                    // Request duration
                    Timer.builder("gateway.requests")
                            .description("Gateway request duration")
                            .tag("method", method)
                            .tag("uri", path)
                            .tag("status", status)
                            .register(meterRegistry)
                            .record(elapsedNanos, java.util.concurrent.TimeUnit.NANOSECONDS);

                    // Request counter
                    Counter.builder("gateway.requests.total")
                            .tag("method", method)
                            .tag("status", status)
                            .register(meterRegistry)
                            .increment();

                    // Response time distribution
                    DistributionSummary.builder("gateway.response.time")
                            .tag("uri", path)
                            .register(meterRegistry)
                            .record(elapsedNanos / 1_000_000.0); // converted to milliseconds
                });
    }

    @Override
    public int getOrder() {
        return 0;
    }
}

// Gateway health check
/**
 * Health indicator of the gateway. It reports the services known to the discovery
 * client, probes Redis with a read, and is UP only when each critical service has at
 * least one registered instance. Any exception marks the gateway DOWN.
 */
@Component
public class GatewayHealthIndicator implements HealthIndicator {

    /** Services that must have at least one instance for the gateway to be UP. */
    private static final List<String> CRITICAL_SERVICES =
            Arrays.asList("device-service", "analytics-service", "alert-service");

    private final DiscoveryClient discoveryClient;
    private final RedisTemplate<String, Object> redisTemplate;

    public GatewayHealthIndicator(DiscoveryClient discoveryClient,
                                 RedisTemplate<String, Object> redisTemplate) {
        this.discoveryClient = discoveryClient;
        this.redisTemplate = redisTemplate;
    }

    /**
     * Collects the health details and derives the overall status.
     *
     * @return the gateway health, including per-service details
     */
    @Override
    public Health health() {
        Health.Builder health = new Health.Builder();

        try {
            // Service discovery
            List<String> knownServices = discoveryClient.getServices();
            health.withDetail("discovery.services.count", knownServices.size());
            health.withDetail("discovery.services", knownServices);

            // Redis connectivity: the read throws when Redis is unreachable
            redisTemplate.opsForValue().get("health-check");
            health.withDetail("redis.status", "UP");

            // Critical services: DOWN when no instance is registered
            Map<String, String> statusByService = new HashMap<>();
            for (String name : CRITICAL_SERVICES) {
                boolean hasInstances = !discoveryClient.getInstances(name).isEmpty();
                statusByService.put(name, hasInstances ? "UP" : "DOWN");
            }
            health.withDetail("services.status", statusByService);

            // Overall status: values are only "UP" or "DOWN", so a single DOWN decides
            if (statusByService.containsValue("DOWN")) {
                health.down();
            } else {
                health.up();
            }

        } catch (Exception e) {
            health.down().withException(e);
        }

        return health.build();
    }
}

# 4. 核心业务服务

## 4.1 设备管理服务

// Example: device management service implementation
/**
 * REST endpoints for device management under {@code /api/devices}.
 *
 * <p>The caller's identity arrives in the {@code X-User-Id} and {@code X-Tenant-Id}
 * request headers. All business logic is delegated to {@link DeviceService}; create,
 * update and delete additionally publish a device event after the service call returns.
 */
@RestController
@RequestMapping("/api/devices")
@Validated
public class DeviceController {

    private final DeviceService deviceService;
    private final DeviceEventPublisher eventPublisher;

    public DeviceController(DeviceService deviceService,
                           DeviceEventPublisher eventPublisher) {
        this.deviceService = deviceService;
        this.eventPublisher = eventPublisher;
    }

    /** Creates a device, publishes the creation event and answers 201 with the device. */
    @PostMapping
    public ResponseEntity<DeviceDTO> createDevice(
            @Valid @RequestBody CreateDeviceRequest request,
            @RequestHeader("X-User-Id") String userId,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        final DeviceDTO created = deviceService.createDevice(request, userId, tenantId);
        eventPublisher.publishDeviceCreated(created);
        return ResponseEntity.status(HttpStatus.CREATED).body(created);
    }

    /** Returns a single device of the tenant. */
    @GetMapping("/{deviceId}")
    public ResponseEntity<DeviceDTO> getDevice(
            @PathVariable String deviceId,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        return ResponseEntity.ok(deviceService.getDevice(deviceId, tenantId));
    }

    /** Returns one page of the tenant's devices, optionally filtered by status, type and location. */
    @GetMapping
    public ResponseEntity<PagedResponse<DeviceDTO>> getDevices(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size,
            @RequestParam(required = false) String status,
            @RequestParam(required = false) String type,
            @RequestParam(required = false) String location,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        final DeviceQueryParams query = DeviceQueryParams.builder()
                .tenantId(tenantId)
                .status(status)
                .type(type)
                .location(location)
                .page(page)
                .size(size)
                .build();

        return ResponseEntity.ok(deviceService.getDevices(query));
    }

    /** Updates a device, publishes the update event and answers with the new state. */
    @PutMapping("/{deviceId}")
    public ResponseEntity<DeviceDTO> updateDevice(
            @PathVariable String deviceId,
            @Valid @RequestBody UpdateDeviceRequest request,
            @RequestHeader("X-User-Id") String userId,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        final DeviceDTO updated = deviceService.updateDevice(deviceId, request, userId, tenantId);
        eventPublisher.publishDeviceUpdated(updated);
        return ResponseEntity.ok(updated);
    }

    /** Deletes a device, publishes the deletion event and answers 204. */
    @DeleteMapping("/{deviceId}")
    public ResponseEntity<Void> deleteDevice(
            @PathVariable String deviceId,
            @RequestHeader("X-User-Id") String userId,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        deviceService.deleteDevice(deviceId, userId, tenantId);
        eventPublisher.publishDeviceDeleted(deviceId, tenantId);
        return ResponseEntity.noContent().build();
    }

    /** Sends a command to a device and answers with the command response. */
    @PostMapping("/{deviceId}/commands")
    public ResponseEntity<CommandResponse> sendCommand(
            @PathVariable String deviceId,
            @Valid @RequestBody SendCommandRequest request,
            @RequestHeader("X-User-Id") String userId,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        return ResponseEntity.ok(deviceService.sendCommand(deviceId, request, userId, tenantId));
    }

    /** Returns the current status of a device. */
    @GetMapping("/{deviceId}/status")
    public ResponseEntity<DeviceStatusDTO> getDeviceStatus(
            @PathVariable String deviceId,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        return ResponseEntity.ok(deviceService.getDeviceStatus(deviceId, tenantId));
    }

    /** Returns sensor data of a device, optionally filtered by sensor type and time range. */
    @GetMapping("/{deviceId}/data")
    public ResponseEntity<List<SensorDataDTO>> getDeviceData(
            @PathVariable String deviceId,
            @RequestParam(required = false) String sensorType,
            @RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime startTime,
            @RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime endTime,
            @RequestParam(defaultValue = "100") int limit,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        final DataQueryParams query = DataQueryParams.builder()
                .deviceId(deviceId)
                .tenantId(tenantId)
                .sensorType(sensorType)
                .startTime(startTime)
                .endTime(endTime)
                .limit(limit)
                .build();

        return ResponseEntity.ok(deviceService.getDeviceData(query));
    }
}

/**
 * Business logic for device management: create, read, update and soft-delete devices,
 * send commands and query status and sensor data. All operations are scoped to a tenant.
 *
 * <p>Two caching mechanisms are used side by side: Spring's cache abstraction (caches
 * {@code devices} and {@code device-status}, keyed {@code deviceId:tenantId}) and a
 * manually written Redis entry under {@code device:<deviceId>:<tenantId>}.
 */
@Service
@Transactional
public class DeviceService {

    private final DeviceRepository deviceRepository;
    private final DeviceStatusRepository statusRepository;
    private final SensorDataRepository dataRepository;
    private final DeviceCommandService commandService;
    private final RedisTemplate<String, Object> redisTemplate;
    private final DeviceMapper deviceMapper;

    public DeviceService(DeviceRepository deviceRepository,
                        DeviceStatusRepository statusRepository,
                        SensorDataRepository dataRepository,
                        DeviceCommandService commandService,
                        RedisTemplate<String, Object> redisTemplate,
                        DeviceMapper deviceMapper) {
        this.deviceRepository = deviceRepository;
        this.statusRepository = statusRepository;
        this.dataRepository = dataRepository;
        this.commandService = commandService;
        this.redisTemplate = redisTemplate;
        this.deviceMapper = deviceMapper;
    }

    /**
     * Creates a device together with its initial (inactive, offline) status record.
     *
     * @param request  the device attributes
     * @param userId   the creating user
     * @param tenantId the owning tenant
     * @return the created device
     * @throws DeviceAlreadyExistsException if the tenant already has a device with that id
     */
    public DeviceDTO createDevice(CreateDeviceRequest request, String userId, String tenantId) {
        // The device id must be unique within the tenant
        if (deviceRepository.existsByDeviceIdAndTenantId(request.getDeviceId(), tenantId)) {
            throw new DeviceAlreadyExistsException("Device already exists: " + request.getDeviceId());
        }

        // Build the device entity
        Device device = Device.builder()
                .deviceId(request.getDeviceId())
                .name(request.getName())
                .type(request.getType())
                .model(request.getModel())
                .manufacturer(request.getManufacturer())
                .location(request.getLocation())
                .description(request.getDescription())
                .configuration(request.getConfiguration())
                .tenantId(tenantId)
                .createdBy(userId)
                .createdAt(LocalDateTime.now())
                .status(DeviceStatus.INACTIVE)
                .build();

        device = deviceRepository.save(device);

        // Create the matching status record
        DeviceStatusEntity status = DeviceStatusEntity.builder()
                .deviceId(device.getDeviceId())
                .tenantId(tenantId)
                .status(DeviceStatus.INACTIVE)
                .lastHeartbeat(null)
                .lastDataTime(null)
                .isOnline(false)
                .build();

        statusRepository.save(status);

        // Cache the device
        cacheDevice(device);

        return deviceMapper.toDTO(device);
    }

    /**
     * Returns a device of the tenant; the result is cached in {@code devices}.
     *
     * @throws DeviceNotFoundException if no such device exists
     */
    @Cacheable(value = "devices", key = "#deviceId + ':' + #tenantId")
    public DeviceDTO getDevice(String deviceId, String tenantId) {
        Device device = deviceRepository.findByDeviceIdAndTenantId(deviceId, tenantId)
                .orElseThrow(() -> new DeviceNotFoundException("Device not found: " + deviceId));

        return deviceMapper.toDTO(device);
    }

    /**
     * Returns one page of devices matching the query, newest first.
     *
     * @param params filter and paging parameters
     * @return the requested page
     */
    public PagedResponse<DeviceDTO> getDevices(DeviceQueryParams params) {
        Specification<Device> spec = DeviceSpecifications.buildSpecification(params);

        Pageable pageable = PageRequest.of(params.getPage(), params.getSize(),
                Sort.by(Sort.Direction.DESC, "createdAt"));

        Page<Device> devicePage = deviceRepository.findAll(spec, pageable);

        List<DeviceDTO> devices = devicePage.getContent().stream()
                .map(deviceMapper::toDTO)
                .collect(Collectors.toList());

        return PagedResponse.<DeviceDTO>builder()
                .content(devices)
                .page(devicePage.getNumber())
                .size(devicePage.getSize())
                .totalElements(devicePage.getTotalElements())
                .totalPages(devicePage.getTotalPages())
                .build();
    }

    /**
     * Applies the non-null fields of the request to the device and evicts its
     * {@code devices} cache entry.
     *
     * @throws DeviceNotFoundException if no such device exists
     */
    @CacheEvict(value = "devices", key = "#deviceId + ':' + #tenantId")
    public DeviceDTO updateDevice(String deviceId, UpdateDeviceRequest request, String userId, String tenantId) {
        Device device = deviceRepository.findByDeviceIdAndTenantId(deviceId, tenantId)
                .orElseThrow(() -> new DeviceNotFoundException("Device not found: " + deviceId));

        // Only fields present in the request are changed
        if (request.getName() != null) {
            device.setName(request.getName());
        }
        if (request.getLocation() != null) {
            device.setLocation(request.getLocation());
        }
        if (request.getDescription() != null) {
            device.setDescription(request.getDescription());
        }
        if (request.getConfiguration() != null) {
            device.setConfiguration(request.getConfiguration());
        }

        device.setUpdatedBy(userId);
        device.setUpdatedAt(LocalDateTime.now());

        device = deviceRepository.save(device);

        // Refresh the manual cache entry
        cacheDevice(device);

        return deviceMapper.toDTO(device);
    }

    /**
     * Soft-deletes a device and removes its status record.
     *
     * <p>Both the {@code devices} and the {@code device-status} cache entries are
     * evicted. Without the second eviction, {@link #getDeviceStatus} would keep serving
     * the cached status of a device whose status record no longer exists.
     *
     * @throws DeviceNotFoundException if no such device exists
     */
    @CacheEvict(value = {"devices", "device-status"}, key = "#deviceId + ':' + #tenantId")
    public void deleteDevice(String deviceId, String userId, String tenantId) {
        Device device = deviceRepository.findByDeviceIdAndTenantId(deviceId, tenantId)
                .orElseThrow(() -> new DeviceNotFoundException("Device not found: " + deviceId));

        // Soft delete
        device.setDeleted(true);
        device.setDeletedBy(userId);
        device.setDeletedAt(LocalDateTime.now());

        deviceRepository.save(device);

        // Remove the status record
        statusRepository.deleteByDeviceIdAndTenantId(deviceId, tenantId);

        // Remove the manual cache entry
        clearDeviceCache(deviceId, tenantId);
    }

    /**
     * Sends a command to an online device.
     *
     * @throws DeviceNotFoundException       if no such device exists
     * @throws DeviceStatusNotFoundException if the device has no status record
     * @throws DeviceOfflineException        if the device is not online
     */
    public CommandResponse sendCommand(String deviceId, SendCommandRequest request, String userId, String tenantId) {
        // The device must exist; the entity itself is not needed afterwards
        deviceRepository.findByDeviceIdAndTenantId(deviceId, tenantId)
                .orElseThrow(() -> new DeviceNotFoundException("Device not found: " + deviceId));

        // The device must be online
        DeviceStatusEntity status = statusRepository.findByDeviceIdAndTenantId(deviceId, tenantId)
                .orElseThrow(() -> new DeviceStatusNotFoundException("Device status not found: " + deviceId));

        if (!status.isOnline()) {
            throw new DeviceOfflineException("Device is offline: " + deviceId);
        }

        // Send the command
        return commandService.sendCommand(deviceId, request, userId, tenantId);
    }

    /**
     * Returns the status of a device; the result is cached in {@code device-status}.
     *
     * @throws DeviceStatusNotFoundException if the device has no status record
     */
    @Cacheable(value = "device-status", key = "#deviceId + ':' + #tenantId")
    public DeviceStatusDTO getDeviceStatus(String deviceId, String tenantId) {
        DeviceStatusEntity status = statusRepository.findByDeviceIdAndTenantId(deviceId, tenantId)
                .orElseThrow(() -> new DeviceStatusNotFoundException("Device status not found: " + deviceId));

        return DeviceStatusDTO.builder()
                .deviceId(status.getDeviceId())
                .status(status.getStatus())
                .isOnline(status.isOnline())
                .lastHeartbeat(status.getLastHeartbeat())
                .lastDataTime(status.getLastDataTime())
                .build();
    }

    /**
     * Returns the sensor data of a device matching the query.
     *
     * @throws DeviceNotFoundException if no such device exists
     */
    public List<SensorDataDTO> getDeviceData(DataQueryParams params) {
        // The device must exist
        deviceRepository.findByDeviceIdAndTenantId(params.getDeviceId(), params.getTenantId())
                .orElseThrow(() -> new DeviceNotFoundException("Device not found: " + params.getDeviceId()));

        // Query the sensor data
        List<SensorData> dataList = dataRepository.findDeviceData(params);

        return dataList.stream()
                .map(this::mapToSensorDataDTO)
                .collect(Collectors.toList());
    }

    /** Writes the device DTO to Redis with a one-hour expiry. */
    private void cacheDevice(Device device) {
        String key = "device:" + device.getDeviceId() + ":" + device.getTenantId();
        redisTemplate.opsForValue().set(key, deviceMapper.toDTO(device), Duration.ofHours(1));
    }

    /** Deletes the manually written Redis entry of the device. */
    private void clearDeviceCache(String deviceId, String tenantId) {
        String key = "device:" + deviceId + ":" + tenantId;
        redisTemplate.delete(key);
    }

    /** Copies a sensor data entity into its DTO. */
    private SensorDataDTO mapToSensorDataDTO(SensorData data) {
        return SensorDataDTO.builder()
                .deviceId(data.getDeviceId())
                .sensorType(data.getSensorType())
                .value(data.getValue())
                .unit(data.getUnit())
                .timestamp(data.getTimestamp())
                .build();
    }
}

## 4.2 数据分析服务

// Example: analytics service implementation
/**
 * REST endpoints for analytics under {@code /api/analytics}: device statistics, trends,
 * anomalies, report generation and the dashboard. The tenant is taken from the
 * {@code X-Tenant-Id} header and all work is delegated to {@link AnalyticsService}.
 */
@RestController
@RequestMapping("/api/analytics")
public class AnalyticsController {

    private final AnalyticsService analyticsService;

    public AnalyticsController(AnalyticsService analyticsService) {
        this.analyticsService = analyticsService;
    }

    /** Returns statistics of a device for a time range, optionally for one sensor type. */
    @GetMapping("/devices/{deviceId}/statistics")
    public ResponseEntity<DeviceStatisticsDTO> getDeviceStatistics(
            @PathVariable String deviceId,
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime startTime,
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime endTime,
            @RequestParam(required = false) String sensorType,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        final StatisticsQueryParams query = StatisticsQueryParams.builder()
                .deviceId(deviceId)
                .tenantId(tenantId)
                .startTime(startTime)
                .endTime(endTime)
                .sensorType(sensorType)
                .build();

        return ResponseEntity.ok(analyticsService.getDeviceStatistics(query));
    }

    /** Returns trend data of one sensor type of a device, bucketed by the given interval. */
    @GetMapping("/devices/{deviceId}/trends")
    public ResponseEntity<List<TrendDataDTO>> getDeviceTrends(
            @PathVariable String deviceId,
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime startTime,
            @RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime endTime,
            @RequestParam String sensorType,
            @RequestParam(defaultValue = "1h") String interval,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        final TrendQueryParams query = TrendQueryParams.builder()
                .deviceId(deviceId)
                .tenantId(tenantId)
                .startTime(startTime)
                .endTime(endTime)
                .sensorType(sensorType)
                .interval(interval)
                .build();

        return ResponseEntity.ok(analyticsService.getDeviceTrends(query));
    }

    /** Returns one page of anomalies, optionally filtered by device, severity and time range. */
    @GetMapping("/anomalies")
    public ResponseEntity<PagedResponse<AnomalyDTO>> getAnomalies(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size,
            @RequestParam(required = false) String deviceId,
            @RequestParam(required = false) String severity,
            @RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime startTime,
            @RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime endTime,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        final AnomalyQueryParams query = AnomalyQueryParams.builder()
                .tenantId(tenantId)
                .deviceId(deviceId)
                .severity(severity)
                .startTime(startTime)
                .endTime(endTime)
                .page(page)
                .size(size)
                .build();

        return ResponseEntity.ok(analyticsService.getAnomalies(query));
    }

    /** Generates a report and answers 201 with it. */
    @PostMapping("/reports")
    public ResponseEntity<ReportDTO> generateReport(
            @Valid @RequestBody GenerateReportRequest request,
            @RequestHeader("X-User-Id") String userId,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        final ReportDTO generated = analyticsService.generateReport(request, userId, tenantId);
        return ResponseEntity.status(HttpStatus.CREATED).body(generated);
    }

    /** Returns the dashboard of the tenant, optionally restricted to the given devices. */
    @GetMapping("/dashboard")
    public ResponseEntity<DashboardDTO> getDashboard(
            @RequestParam(required = false) List<String> deviceIds,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        return ResponseEntity.ok(analyticsService.getDashboard(deviceIds, tenantId));
    }
}

/**
 * Analytics operations over device telemetry: per-device statistics, trend series,
 * anomaly listings, asynchronous report generation and the tenant dashboard.
 */
@Slf4j
@Service
public class AnalyticsService {

    /** Interval syntax accepted by {@link #parseInterval}: digits followed by one of s, m, h, d. */
    private static final Pattern INTERVAL_PATTERN = Pattern.compile("(\\d+)([hmsd])");

    private final SensorDataRepository dataRepository;
    private final AnomalyRepository anomalyRepository;
    private final DeviceRepository deviceRepository;
    private final ReportRepository reportRepository;
    private final AnalyticsEngine analyticsEngine;
    private final RedisTemplate<String, Object> redisTemplate;

    public AnalyticsService(SensorDataRepository dataRepository,
                           AnomalyRepository anomalyRepository,
                           DeviceRepository deviceRepository,
                           ReportRepository reportRepository,
                           AnalyticsEngine analyticsEngine,
                           RedisTemplate<String, Object> redisTemplate) {
        this.dataRepository = dataRepository;
        this.anomalyRepository = anomalyRepository;
        this.deviceRepository = deviceRepository;
        this.reportRepository = reportRepository;
        this.analyticsEngine = analyticsEngine;
        this.redisTemplate = redisTemplate;
    }

    /**
     * Computes per-sensor-type statistics for one device over a time range.
     *
     * <p>The cache key contains the tenant id and the sensor-type filter in addition to
     * device id and time range. Without the tenant id a cache hit would skip the ownership
     * check below and could serve one tenant's result to another; without the sensor type,
     * requests with different filters would share one entry.
     *
     * @param params device id, tenant id, time range and optional sensor-type filter
     * @return statistics grouped by sensor type, or {@code DeviceStatisticsDTO.empty(deviceId)}
     *         when no data point matches the range and filter
     * @throws DeviceNotFoundException if the device is not found for the given tenant
     */
    @Cacheable(value = "device-statistics",
            key = "#params.tenantId + ':' + #params.deviceId + ':' + #params.sensorType + ':' + #params.startTime + ':' + #params.endTime")
    public DeviceStatisticsDTO getDeviceStatistics(StatisticsQueryParams params) {
        // Verify the device exists for this tenant.
        deviceRepository.findByDeviceIdAndTenantId(params.getDeviceId(), params.getTenantId())
                .orElseThrow(() -> new DeviceNotFoundException("Device not found: " + params.getDeviceId()));

        List<SensorData> dataList = dataRepository.findByDeviceIdAndTimeRange(
                params.getDeviceId(), params.getStartTime(), params.getEndTime());

        // Apply the optional sensor-type filter up front so that the reported total
        // and the per-type statistics describe the same set of data points.
        String requestedType = params.getSensorType();
        List<SensorData> matching = requestedType == null
                ? dataList
                : dataList.stream()
                        .filter(data -> requestedType.equals(data.getSensorType()))
                        .collect(Collectors.toList());

        if (matching.isEmpty()) {
            return DeviceStatisticsDTO.empty(params.getDeviceId());
        }

        // Group by sensor type and reduce each group to its statistics.
        Map<String, SensorStatistics> statisticsMap = matching.stream()
                .collect(Collectors.groupingBy(
                        SensorData::getSensorType,
                        Collectors.collectingAndThen(Collectors.toList(), this::calculateStatistics)));

        return DeviceStatisticsDTO.builder()
                .deviceId(params.getDeviceId())
                .startTime(params.getStartTime())
                .endTime(params.getEndTime())
                .totalDataPoints(matching.size())
                .sensorStatistics(statisticsMap)
                .build();
    }

    /**
     * Builds a trend series: the time range is cut into windows of the requested interval
     * and each non-empty window contributes its average value and data-point count.
     *
     * @param params device id, tenant id, sensor type, time range and interval string
     * @return one entry per window that contains data, in chronological order
     * @throws DeviceNotFoundException  if the device is not found for the given tenant
     * @throws IllegalArgumentException if the interval string is missing, malformed or zero
     */
    public List<TrendDataDTO> getDeviceTrends(TrendQueryParams params) {
        // Verify the device exists for this tenant.
        deviceRepository.findByDeviceIdAndTenantId(params.getDeviceId(), params.getTenantId())
                .orElseThrow(() -> new DeviceNotFoundException("Device not found: " + params.getDeviceId()));

        Duration interval = parseInterval(params.getInterval());
        List<TimeWindow> windows = generateTimeWindows(params.getStartTime(), params.getEndTime(), interval);

        List<TrendDataDTO> trends = new ArrayList<>();

        // One repository query is issued per window.
        for (TimeWindow window : windows) {
            List<SensorData> windowData = dataRepository.findByDeviceIdAndSensorTypeAndTimeRange(
                    params.getDeviceId(), params.getSensorType(), window.getStart(), window.getEnd());

            if (windowData.isEmpty()) {
                continue; // windows without data are omitted from the series
            }

            double avgValue = windowData.stream()
                    .mapToDouble(SensorData::getValue)
                    .average()
                    .orElse(0.0);

            trends.add(TrendDataDTO.builder()
                    .timestamp(window.getStart())
                    .value(avgValue)
                    .count(windowData.size())
                    .build());
        }

        return trends;
    }

    /**
     * Returns one page of anomalies matching the query, newest first.
     *
     * @param params filters plus page index and size
     * @return the page content mapped to DTOs together with paging metadata
     */
    public PagedResponse<AnomalyDTO> getAnomalies(AnomalyQueryParams params) {
        Specification<Anomaly> spec = AnomalySpecifications.buildSpecification(params);

        Pageable pageable = PageRequest.of(params.getPage(), params.getSize(),
                Sort.by(Sort.Direction.DESC, "timestamp"));

        Page<Anomaly> anomalyPage = anomalyRepository.findAll(spec, pageable);

        List<AnomalyDTO> anomalies = anomalyPage.getContent().stream()
                .map(this::mapToAnomalyDTO)
                .collect(Collectors.toList());

        return PagedResponse.<AnomalyDTO>builder()
                .content(anomalies)
                .page(anomalyPage.getNumber())
                .size(anomalyPage.getSize())
                .totalElements(anomalyPage.getTotalElements())
                .totalPages(anomalyPage.getTotalPages())
                .build();
    }

    /**
     * Persists a report record in {@code GENERATING} state and starts generation in the
     * background.
     *
     * @param request  report name, type and parameters
     * @param userId   user recorded as the creator
     * @param tenantId tenant that owns the report
     * @return a descriptor of the report as it was saved, before generation has run
     */
    public ReportDTO generateReport(GenerateReportRequest request, String userId, String tenantId) {
        Report draft = Report.builder()
                .id(UUID.randomUUID().toString())
                .name(request.getName())
                .type(request.getType())
                .parameters(request.getParameters())
                .tenantId(tenantId)
                .createdBy(userId)
                .createdAt(LocalDateTime.now())
                .status(ReportStatus.GENERATING)
                .build();

        // A separate final variable is required: the lambda below may only capture
        // effectively-final locals, so the saved entity must not reuse a reassigned name.
        final Report saved = reportRepository.save(draft);
        final String reportId = saved.getId();

        // Snapshot the response before the background task starts mutating the entity,
        // otherwise the DTO could observe a half-updated report.
        ReportDTO response = mapToReportDTO(saved);

        // NOTE(review): runAsync without an executor uses the common ForkJoinPool; consider
        // passing a dedicated executor if report generation blocks on I/O.
        CompletableFuture.runAsync(() -> {
            try {
                generateReportAsync(saved);
            } catch (Exception e) {
                log.error("报表生成失败: {}", reportId, e);
                try {
                    updateReportStatus(reportId, ReportStatus.FAILED, e.getMessage());
                } catch (Exception statusError) {
                    // Nothing observes this future, so log instead of losing the failure.
                    log.error("Failed to mark report {} as FAILED", reportId, statusError);
                }
            }
        });

        return response;
    }

    /**
     * Builds the dashboard for a tenant, optionally restricted to the given devices.
     *
     * <p>The cache key uses the textual form of the id list rather than its hash code,
     * because distinct lists can share a hash code. {@code null} and an empty list map to
     * the same key since both select all of the tenant's devices.
     *
     * @param deviceIds device ids to include; {@code null} or empty selects all non-deleted
     *                  devices of the tenant
     * @param tenantId  owning tenant
     * @return dashboard figures, or {@code DashboardDTO.empty()} when no device matches
     */
    @Cacheable(value = "dashboard",
            key = "#tenantId + ':' + ((#deviceIds != null && !#deviceIds.isEmpty()) ? #deviceIds.toString() : 'all')")
    public DashboardDTO getDashboard(List<String> deviceIds, String tenantId) {
        List<Device> devices;
        if (deviceIds != null && !deviceIds.isEmpty()) {
            devices = deviceRepository.findByDeviceIdInAndTenantId(deviceIds, tenantId);
        } else {
            devices = deviceRepository.findByTenantIdAndDeletedFalse(tenantId);
        }

        if (devices.isEmpty()) {
            return DashboardDTO.empty();
        }

        DashboardMetrics metrics = calculateDashboardMetrics(devices, tenantId);

        return DashboardDTO.builder()
                .totalDevices(devices.size())
                .onlineDevices(metrics.getOnlineDevices())
                .offlineDevices(metrics.getOfflineDevices())
                .activeAlerts(metrics.getActiveAlerts())
                .totalDataPoints(metrics.getTotalDataPoints())
                .deviceStatusDistribution(metrics.getDeviceStatusDistribution())
                .recentAlerts(metrics.getRecentAlerts())
                .topDevicesByActivity(metrics.getTopDevicesByActivity())
                .build();
    }

    /** Reduces a list of readings to count, min, max, mean and standard deviation. */
    private SensorStatistics calculateStatistics(List<SensorData> dataList) {
        double[] values = dataList.stream()
                .mapToDouble(SensorData::getValue)
                .toArray();

        return SensorStatistics.builder()
                .count(values.length)
                .min(Arrays.stream(values).min().orElse(0.0))
                .max(Arrays.stream(values).max().orElse(0.0))
                .mean(Arrays.stream(values).average().orElse(0.0))
                .standardDeviation(calculateStandardDeviation(values))
                .build();
    }

    /**
     * Population standard deviation (divides by n) of the given values.
     *
     * @return the standard deviation, or 0.0 for an empty array
     */
    private double calculateStandardDeviation(double[] values) {
        if (values.length == 0) {
            return 0.0; // avoid 0/0 = NaN; matches the 0.0 fallback used for min/max/mean
        }
        double mean = Arrays.stream(values).average().orElse(0.0);
        double sumSquaredDiffs = Arrays.stream(values)
                .map(value -> Math.pow(value - mean, 2))
                .sum();
        return Math.sqrt(sumSquaredDiffs / values.length);
    }

    /**
     * Parses an interval string such as "1h", "30m" or "1d".
     *
     * @param interval digits followed by a unit: s (seconds), m (minutes), h (hours), d (days)
     * @return the corresponding strictly positive duration
     * @throws IllegalArgumentException if the string is null, malformed, out of range or zero
     */
    private Duration parseInterval(String interval) {
        if (interval == null) {
            throw new IllegalArgumentException("Invalid interval format: " + interval);
        }

        Matcher matcher = INTERVAL_PATTERN.matcher(interval);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Invalid interval format: " + interval);
        }

        int value;
        try {
            value = Integer.parseInt(matcher.group(1));
        } catch (NumberFormatException e) {
            // The digits matched the pattern but do not fit in an int.
            throw new IllegalArgumentException("Invalid interval format: " + interval, e);
        }
        if (value <= 0) {
            // A zero-length interval would never advance the window cursor.
            throw new IllegalArgumentException("Interval must be positive: " + interval);
        }

        String unit = matcher.group(2);
        switch (unit) {
            case "s": return Duration.ofSeconds(value);
            case "m": return Duration.ofMinutes(value);
            case "h": return Duration.ofHours(value);
            case "d": return Duration.ofDays(value);
            default: throw new IllegalArgumentException("Unsupported time unit: " + unit);
        }
    }

    /**
     * Splits {@code [start, end)} into consecutive windows of length {@code interval};
     * the last window is truncated at {@code end}.
     *
     * @throws IllegalArgumentException if {@code interval} is zero or negative
     */
    private List<TimeWindow> generateTimeWindows(LocalDateTime start, LocalDateTime end, Duration interval) {
        if (interval.isZero() || interval.isNegative()) {
            throw new IllegalArgumentException("Interval must be positive: " + interval);
        }

        List<TimeWindow> windows = new ArrayList<>();
        LocalDateTime current = start;

        while (current.isBefore(end)) {
            LocalDateTime windowEnd = current.plus(interval);
            if (windowEnd.isAfter(end)) {
                windowEnd = end;
            }

            windows.add(new TimeWindow(current, windowEnd));
            current = windowEnd;
        }

        return windows;
    }

    /**
     * Runs the analytics engine for the report and stores the result as COMPLETED.
     * Exceptions propagate to the caller, which logs them and marks the report FAILED.
     */
    private void generateReportAsync(Report report) {
        ReportData reportData = analyticsEngine.generateReport(report);

        report.setData(reportData);
        report.setStatus(ReportStatus.COMPLETED);
        report.setCompletedAt(LocalDateTime.now());

        reportRepository.save(report);
    }

    /** Reloads the report by id and stores the new status and, if given, the error message. */
    private void updateReportStatus(String reportId, ReportStatus status, String errorMessage) {
        reportRepository.findById(reportId).ifPresent(report -> {
            report.setStatus(status);
            if (errorMessage != null) {
                report.setErrorMessage(errorMessage);
            }
            reportRepository.save(report);
        });
    }

    /**
     * Collects the figures shown on the dashboard for the given devices.
     * Simplified implementation: everything is read from local repositories and Redis.
     */
    private DashboardMetrics calculateDashboardMetrics(List<Device> devices, String tenantId) {
        long onlineDevices = devices.stream()
                .filter(device -> isDeviceOnline(device.getDeviceId(), tenantId))
                .count();

        long offlineDevices = devices.size() - onlineDevices;

        return DashboardMetrics.builder()
                .onlineDevices((int) onlineDevices)
                .offlineDevices((int) offlineDevices)
                .activeAlerts(getActiveAlertsCount(tenantId))
                .totalDataPoints(getTotalDataPointsCount(tenantId))
                .deviceStatusDistribution(getDeviceStatusDistribution(devices))
                .recentAlerts(getRecentAlerts(tenantId, 10))
                .topDevicesByActivity(getTopDevicesByActivity(devices, tenantId, 5))
                .build();
    }

    /** A device counts as online when Redis holds the value "online" under its status key. */
    private boolean isDeviceOnline(String deviceId, String tenantId) {
        String key = "device:status:" + deviceId + ":" + tenantId;
        Object status = redisTemplate.opsForValue().get(key);
        return status != null && "online".equals(status.toString());
    }

    /** Number of unresolved anomalies for the tenant, capped at {@code Integer.MAX_VALUE}. */
    private int getActiveAlertsCount(String tenantId) {
        long unresolved = anomalyRepository.countByTenantIdAndResolvedFalse(tenantId);
        // Clamp instead of a plain cast so a very large count cannot wrap to a negative number.
        return (int) Math.min(unresolved, Integer.MAX_VALUE);
    }

    /** Total number of stored data points for the tenant. */
    private long getTotalDataPointsCount(String tenantId) {
        return dataRepository.countByTenantId(tenantId);
    }

    /** Counts devices per status, keyed by the status' string form. */
    private Map<String, Integer> getDeviceStatusDistribution(List<Device> devices) {
        return devices.stream()
                .collect(Collectors.groupingBy(
                        device -> device.getStatus().toString(),
                        Collectors.collectingAndThen(Collectors.counting(), Math::toIntExact)
                ));
    }

    /** Loads the most recent anomalies for the tenant and maps them to DTOs. */
    private List<AnomalyDTO> getRecentAlerts(String tenantId, int limit) {
        List<Anomaly> recentAnomalies = anomalyRepository.findTopByTenantIdOrderByTimestampDesc(tenantId, limit);
        return recentAnomalies.stream()
                .map(this::mapToAnomalyDTO)
                .collect(Collectors.toList());
    }

    /**
     * Ranks devices by stored data-point count, highest first, and keeps the top {@code limit}.
     * One count query is issued per device.
     */
    private List<DeviceActivityDTO> getTopDevicesByActivity(List<Device> devices, String tenantId, int limit) {
        return devices.stream()
                .map(device -> {
                    long dataCount = dataRepository.countByDeviceIdAndTenantId(device.getDeviceId(), tenantId);
                    return DeviceActivityDTO.builder()
                            .deviceId(device.getDeviceId())
                            .deviceName(device.getName())
                            .dataPointCount(dataCount)
                            .build();
                })
                .sorted(Comparator.comparing(DeviceActivityDTO::getDataPointCount).reversed())
                .limit(limit)
                .collect(Collectors.toList());
    }

    /** Copies an anomaly entity into its DTO. */
    private AnomalyDTO mapToAnomalyDTO(Anomaly anomaly) {
        return AnomalyDTO.builder()
                .id(anomaly.getId())
                .deviceId(anomaly.getDeviceId())
                .sensorType(anomaly.getSensorType())
                .anomalyType(anomaly.getAnomalyType())
                .severity(anomaly.getSeverity())
                .description(anomaly.getDescription())
                .value(anomaly.getValue())
                .threshold(anomaly.getThreshold())
                .timestamp(anomaly.getTimestamp())
                .resolved(anomaly.isResolved())
                .build();
    }

    /** Copies a report entity into its DTO; the download URL is set only for COMPLETED reports. */
    private ReportDTO mapToReportDTO(Report report) {
        return ReportDTO.builder()
                .id(report.getId())
                .name(report.getName())
                .type(report.getType())
                .status(report.getStatus())
                .createdAt(report.getCreatedAt())
                .completedAt(report.getCompletedAt())
                .downloadUrl(report.getStatus() == ReportStatus.COMPLETED ?
                    "/api/reports/" + report.getId() + "/download" : null)
                .build();
    }

    /** Time span with a start and an end, used to bucket trend data. */
    @Data
    @Builder
    public static class TimeWindow {
        private final LocalDateTime start;
        private final LocalDateTime end;
    }

    /** Intermediate holder for the figures shown on the dashboard. */
    @Data
    @Builder
    public static class DashboardMetrics {
        private int onlineDevices;
        private int offlineDevices;
        private int activeAlerts;
        private long totalDataPoints;
        private Map<String, Integer> deviceStatusDistribution;
        private List<AnomalyDTO> recentAlerts;
        private List<DeviceActivityDTO> topDevicesByActivity;
    }
}

# 4.3 告警服务

// 示例:告警服务实现
/**
 * REST endpoints under {@code /api/alerts} for managing alert rules and for listing,
 * acknowledging and resolving alerts. Every handler delegates to {@code AlertService}.
 */
@RestController
@RequestMapping("/api/alerts")
public class AlertController {

    private final AlertService alertService;

    public AlertController(AlertService alertService) {
        this.alertService = alertService;
    }

    /**
     * Creates an alert rule for the calling tenant.
     *
     * @return 201 Created with the rule returned by the service
     */
    @PostMapping("/rules")
    public ResponseEntity<AlertRuleDTO> createAlertRule(
            @Valid @RequestBody CreateAlertRuleRequest request,
            @RequestHeader("X-User-Id") String userId,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        AlertRuleDTO created = alertService.createAlertRule(request, userId, tenantId);
        return ResponseEntity
                .status(HttpStatus.CREATED)
                .body(created);
    }

    /**
     * Lists alert rules of the calling tenant, optionally filtered by device and enabled flag.
     *
     * @return 200 OK with one page of rules
     */
    @GetMapping("/rules")
    public ResponseEntity<PagedResponse<AlertRuleDTO>> getAlertRules(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size,
            @RequestParam(required = false) String deviceId,
            @RequestParam(required = false) Boolean enabled,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        AlertRuleQueryParams query = AlertRuleQueryParams.builder()
                .page(page)
                .size(size)
                .tenantId(tenantId)
                .deviceId(deviceId)
                .enabled(enabled)
                .build();

        return ResponseEntity.ok(alertService.getAlertRules(query));
    }

    /**
     * Updates the alert rule with the given id.
     *
     * @return 200 OK with the rule returned by the service
     */
    @PutMapping("/rules/{ruleId}")
    public ResponseEntity<AlertRuleDTO> updateAlertRule(
            @PathVariable String ruleId,
            @Valid @RequestBody UpdateAlertRuleRequest request,
            @RequestHeader("X-User-Id") String userId,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        return ResponseEntity.ok(alertService.updateAlertRule(ruleId, request, userId, tenantId));
    }

    /**
     * Deletes the alert rule with the given id.
     *
     * @return 204 No Content
     */
    @DeleteMapping("/rules/{ruleId}")
    public ResponseEntity<Void> deleteAlertRule(
            @PathVariable String ruleId,
            @RequestHeader("X-User-Id") String userId,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        alertService.deleteAlertRule(ruleId, userId, tenantId);
        ResponseEntity<Void> noContent = ResponseEntity.noContent().build();
        return noContent;
    }

    /**
     * Lists alerts of the calling tenant; all filters are optional.
     *
     * @return 200 OK with one page of alerts
     */
    @GetMapping
    public ResponseEntity<PagedResponse<AlertDTO>> getAlerts(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size,
            @RequestParam(required = false) String deviceId,
            @RequestParam(required = false) String severity,
            @RequestParam(required = false) Boolean acknowledged,
            @RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime startTime,
            @RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime endTime,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        AlertQueryParams query = AlertQueryParams.builder()
                .page(page)
                .size(size)
                .tenantId(tenantId)
                .deviceId(deviceId)
                .severity(severity)
                .acknowledged(acknowledged)
                .startTime(startTime)
                .endTime(endTime)
                .build();

        return ResponseEntity.ok(alertService.getAlerts(query));
    }

    /**
     * Acknowledges an alert; the request body (an optional comment) may be omitted.
     *
     * @return 200 OK with an empty body
     */
    @PostMapping("/{alertId}/acknowledge")
    public ResponseEntity<Void> acknowledgeAlert(
            @PathVariable String alertId,
            @RequestBody(required = false) AcknowledgeAlertRequest request,
            @RequestHeader("X-User-Id") String userId,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        alertService.acknowledgeAlert(alertId, request, userId, tenantId);
        ResponseEntity<Void> ok = ResponseEntity.ok().build();
        return ok;
    }

    /**
     * Resolves an alert; the request body (an optional comment) may be omitted.
     *
     * @return 200 OK with an empty body
     */
    @PostMapping("/{alertId}/resolve")
    public ResponseEntity<Void> resolveAlert(
            @PathVariable String alertId,
            @RequestBody(required = false) ResolveAlertRequest request,
            @RequestHeader("X-User-Id") String userId,
            @RequestHeader("X-Tenant-Id") String tenantId) {

        alertService.resolveAlert(alertId, request, userId, tenantId);
        ResponseEntity<Void> ok = ResponseEntity.ok().build();
        return ok;
    }
}

/**
 * Manages alert rules and alerts: rule CRUD kept in sync with the evaluation engine,
 * alert queries, acknowledge/resolve transitions, and rule evaluation for incoming
 * sensor data events.
 */
@Slf4j
@Service
@Transactional
public class AlertService {

    private final AlertRuleRepository ruleRepository;
    private final AlertRepository alertRepository;
    private final AlertNotificationService notificationService;
    private final AlertEvaluationEngine evaluationEngine;
    // Injected but not read anywhere in this class at present.
    private final RedisTemplate<String, Object> redisTemplate;

    public AlertService(AlertRuleRepository ruleRepository,
                       AlertRepository alertRepository,
                       AlertNotificationService notificationService,
                       AlertEvaluationEngine evaluationEngine,
                       RedisTemplate<String, Object> redisTemplate) {
        this.ruleRepository = ruleRepository;
        this.alertRepository = alertRepository;
        this.notificationService = notificationService;
        this.evaluationEngine = evaluationEngine;
        this.redisTemplate = redisTemplate;
    }

    /**
     * Validates and stores a new alert rule; an enabled rule is also registered with the
     * evaluation engine.
     *
     * @param request  rule definition
     * @param userId   user recorded as the creator
     * @param tenantId tenant that owns the rule
     * @return the stored rule
     * @throws InvalidAlertRuleException if threshold, condition or severity is missing
     */
    public AlertRuleDTO createAlertRule(CreateAlertRuleRequest request, String userId, String tenantId) {
        validateAlertRuleConfig(request);

        AlertRule rule = AlertRule.builder()
                .id(UUID.randomUUID().toString())
                .name(request.getName())
                .description(request.getDescription())
                .deviceId(request.getDeviceId())
                .sensorType(request.getSensorType())
                .condition(request.getCondition())
                .threshold(request.getThreshold())
                .severity(request.getSeverity())
                .enabled(request.isEnabled())
                .notificationChannels(request.getNotificationChannels())
                .tenantId(tenantId)
                .createdBy(userId)
                .createdAt(LocalDateTime.now())
                .build();

        rule = ruleRepository.save(rule);

        // Only enabled rules are handed to the evaluation engine.
        if (rule.isEnabled()) {
            evaluationEngine.addRule(rule);
        }

        return mapToAlertRuleDTO(rule);
    }

    /**
     * Returns one page of alert rules matching the query, newest first.
     *
     * @param params filters plus page index and size
     * @return the page content mapped to DTOs together with paging metadata
     */
    public PagedResponse<AlertRuleDTO> getAlertRules(AlertRuleQueryParams params) {
        Specification<AlertRule> spec = AlertRuleSpecifications.buildSpecification(params);

        Pageable pageable = PageRequest.of(params.getPage(), params.getSize(),
                Sort.by(Sort.Direction.DESC, "createdAt"));

        Page<AlertRule> rulePage = ruleRepository.findAll(spec, pageable);

        List<AlertRuleDTO> rules = rulePage.getContent().stream()
                .map(this::mapToAlertRuleDTO)
                .collect(Collectors.toList());

        return PagedResponse.<AlertRuleDTO>builder()
                .content(rules)
                .page(rulePage.getNumber())
                .size(rulePage.getSize())
                .totalElements(rulePage.getTotalElements())
                .totalPages(rulePage.getTotalPages())
                .build();
    }

    /**
     * Applies the non-null fields of the request to an existing rule, saves it, and then
     * brings the evaluation engine in line with the saved state.
     *
     * <p>The engine is synchronised after the save and regardless of whether the request
     * carried an {@code enabled} flag, so that edits to threshold, condition or severity
     * of an already-enabled rule also reach the engine.
     *
     * @param ruleId   id of the rule to update
     * @param request  fields to change; {@code null} fields are left untouched
     * @param userId   user recorded as the updater
     * @param tenantId tenant that must own the rule
     * @return the updated rule
     * @throws AlertRuleNotFoundException if no rule with that id exists for the tenant
     */
    public AlertRuleDTO updateAlertRule(String ruleId, UpdateAlertRuleRequest request, String userId, String tenantId) {
        AlertRule rule = ruleRepository.findByIdAndTenantId(ruleId, tenantId)
                .orElseThrow(() -> new AlertRuleNotFoundException("Alert rule not found: " + ruleId));

        boolean wasEnabled = rule.isEnabled();

        if (request.getName() != null) {
            rule.setName(request.getName());
        }
        if (request.getDescription() != null) {
            rule.setDescription(request.getDescription());
        }
        if (request.getCondition() != null) {
            rule.setCondition(request.getCondition());
        }
        if (request.getThreshold() != null) {
            rule.setThreshold(request.getThreshold());
        }
        if (request.getSeverity() != null) {
            rule.setSeverity(request.getSeverity());
        }
        if (request.getEnabled() != null) {
            rule.setEnabled(request.getEnabled());
        }
        if (request.getNotificationChannels() != null) {
            rule.setNotificationChannels(request.getNotificationChannels());
        }

        rule.setUpdatedBy(userId);
        rule.setUpdatedAt(LocalDateTime.now());

        rule = ruleRepository.save(rule);

        // Sync the engine with the fully updated, persisted rule.
        boolean enabledNow = rule.isEnabled();
        if (wasEnabled && !enabledNow) {
            evaluationEngine.removeRule(ruleId);
        } else if (!wasEnabled && enabledNow) {
            evaluationEngine.addRule(rule);
        } else if (enabledNow) {
            evaluationEngine.updateRule(rule);
        }

        return mapToAlertRuleDTO(rule);
    }

    /**
     * Soft-deletes a rule: removes it from the evaluation engine, disables it and marks it
     * deleted.
     *
     * @param ruleId   id of the rule to delete
     * @param userId   user recorded as the deleter
     * @param tenantId tenant that must own the rule
     * @throws AlertRuleNotFoundException if no rule with that id exists for the tenant
     */
    public void deleteAlertRule(String ruleId, String userId, String tenantId) {
        AlertRule rule = ruleRepository.findByIdAndTenantId(ruleId, tenantId)
                .orElseThrow(() -> new AlertRuleNotFoundException("Alert rule not found: " + ruleId));

        if (rule.isEnabled()) {
            evaluationEngine.removeRule(ruleId);
        }

        // handleSensorData selects rules by the enabled flag only, so a deleted rule must
        // also be disabled or it would keep producing alerts.
        rule.setEnabled(false);
        rule.setDeleted(true);
        rule.setDeletedBy(userId);
        rule.setDeletedAt(LocalDateTime.now());

        ruleRepository.save(rule);
    }

    /**
     * Returns one page of alerts matching the query, newest first.
     *
     * @param params filters plus page index and size
     * @return the page content mapped to DTOs together with paging metadata
     */
    public PagedResponse<AlertDTO> getAlerts(AlertQueryParams params) {
        Specification<Alert> spec = AlertSpecifications.buildSpecification(params);

        Pageable pageable = PageRequest.of(params.getPage(), params.getSize(),
                Sort.by(Sort.Direction.DESC, "createdAt"));

        Page<Alert> alertPage = alertRepository.findAll(spec, pageable);

        List<AlertDTO> alerts = alertPage.getContent().stream()
                .map(this::mapToAlertDTO)
                .collect(Collectors.toList());

        return PagedResponse.<AlertDTO>builder()
                .content(alerts)
                .page(alertPage.getNumber())
                .size(alertPage.getSize())
                .totalElements(alertPage.getTotalElements())
                .totalPages(alertPage.getTotalPages())
                .build();
    }

    /**
     * Marks an alert as acknowledged and sends an acknowledgment notification.
     *
     * @param alertId  id of the alert
     * @param request  optional body; its comment, if present, is stored on the alert
     * @param userId   user recorded as the acknowledger
     * @param tenantId tenant that must own the alert
     * @throws AlertNotFoundException            if no such alert exists for the tenant
     * @throws AlertAlreadyAcknowledgedException if the alert is already acknowledged
     */
    public void acknowledgeAlert(String alertId, AcknowledgeAlertRequest request, String userId, String tenantId) {
        Alert alert = alertRepository.findByIdAndTenantId(alertId, tenantId)
                .orElseThrow(() -> new AlertNotFoundException("Alert not found: " + alertId));

        if (alert.isAcknowledged()) {
            throw new AlertAlreadyAcknowledgedException("Alert already acknowledged: " + alertId);
        }

        alert.setAcknowledged(true);
        alert.setAcknowledgedBy(userId);
        alert.setAcknowledgedAt(LocalDateTime.now());
        if (request != null && request.getComment() != null) {
            alert.setAcknowledgeComment(request.getComment());
        }

        alertRepository.save(alert);

        notificationService.sendAcknowledgmentNotification(alert, userId);
    }

    /**
     * Marks an alert as resolved and sends a resolution notification.
     *
     * @param alertId  id of the alert
     * @param request  optional body; its comment, if present, is stored on the alert
     * @param userId   user recorded as the resolver
     * @param tenantId tenant that must own the alert
     * @throws AlertNotFoundException        if no such alert exists for the tenant
     * @throws AlertAlreadyResolvedException if the alert is already resolved
     */
    public void resolveAlert(String alertId, ResolveAlertRequest request, String userId, String tenantId) {
        Alert alert = alertRepository.findByIdAndTenantId(alertId, tenantId)
                .orElseThrow(() -> new AlertNotFoundException("Alert not found: " + alertId));

        if (alert.isResolved()) {
            throw new AlertAlreadyResolvedException("Alert already resolved: " + alertId);
        }

        alert.setResolved(true);
        alert.setResolvedBy(userId);
        alert.setResolvedAt(LocalDateTime.now());
        if (request != null && request.getComment() != null) {
            alert.setResolveComment(request.getComment());
        }

        alertRepository.save(alert);

        notificationService.sendResolutionNotification(alert, userId);
    }

    /**
     * Evaluates every enabled rule for the event's device and sensor type and raises an
     * alert for each triggered rule that has no unresolved alert yet.
     *
     * <p>A failure while handling one rule is logged and does not stop the remaining rules.
     * NOTE(review): the class is {@code @Transactional}; a persistence exception swallowed
     * here may still leave the surrounding transaction marked rollback-only — confirm.
     * NOTE(review): the exists-then-create sequence is not atomic, so concurrent events for
     * the same rule could presumably create duplicate alerts — confirm.
     */
    @EventListener
    public void handleSensorData(SensorDataEvent event) {
        SensorData data = event.getSensorData();

        List<AlertRule> rules = ruleRepository.findByDeviceIdAndSensorTypeAndEnabledTrue(
                data.getDeviceId(), data.getSensorType());

        for (AlertRule rule : rules) {
            try {
                boolean triggered = evaluationEngine.evaluateRule(rule, data);

                if (triggered) {
                    // Suppress a new alert while an unresolved one exists for this rule and device.
                    boolean existingAlert = alertRepository.existsByRuleIdAndDeviceIdAndResolvedFalse(
                            rule.getId(), data.getDeviceId());

                    if (!existingAlert) {
                        createAlert(rule, data);
                    }
                }

            } catch (Exception e) {
                log.error("告警规则评估失败: ruleId={}, deviceId={}", rule.getId(), data.getDeviceId(), e);
            }
        }
    }

    /** Stores a new unacknowledged, unresolved alert, then notifies the rule's channels. */
    private void createAlert(AlertRule rule, SensorData data) {
        Alert alert = Alert.builder()
                .id(UUID.randomUUID().toString())
                .ruleId(rule.getId())
                .ruleName(rule.getName())
                .deviceId(data.getDeviceId())
                .sensorType(data.getSensorType())
                .severity(rule.getSeverity())
                .message(generateAlertMessage(rule, data))
                .value(data.getValue())
                .threshold(rule.getThreshold())
                .condition(rule.getCondition())
                .tenantId(rule.getTenantId())
                .createdAt(LocalDateTime.now())
                .acknowledged(false)
                .resolved(false)
                .build();

        alert = alertRepository.save(alert);

        notificationService.sendAlertNotification(alert, rule.getNotificationChannels());

        publishAlertEvent(alert);
    }

    /** Formats the alert text from device id, sensor type, measured value, condition and threshold. */
    private String generateAlertMessage(AlertRule rule, SensorData data) {
        return String.format("设备 %s 的 %s 传感器值 %.2f %s 阈值 %.2f",
                data.getDeviceId(),
                data.getSensorType(),
                data.getValue(),
                getConditionDescription(rule.getCondition()),
                rule.getThreshold());
    }

    /**
     * Maps a condition to the phrase used in alert messages; {@code null} and unlisted
     * conditions yield the generic phrase.
     */
    private String getConditionDescription(AlertCondition condition) {
        if (condition == null) {
            // switch on a null enum would throw NullPointerException.
            return "满足条件";
        }
        switch (condition) {
            case GREATER_THAN: return "超过";
            case LESS_THAN: return "低于";
            case EQUALS: return "等于";
            case NOT_EQUALS: return "不等于";
            default: return "满足条件";
        }
    }

    /**
     * Checks that threshold, condition and severity are present on a create request.
     *
     * @throws InvalidAlertRuleException naming the first missing field
     */
    private void validateAlertRuleConfig(CreateAlertRuleRequest request) {
        if (request.getThreshold() == null) {
            throw new InvalidAlertRuleException("Threshold is required");
        }

        if (request.getCondition() == null) {
            throw new InvalidAlertRuleException("Condition is required");
        }

        if (request.getSeverity() == null) {
            throw new InvalidAlertRuleException("Severity is required");
        }
    }

    /**
     * Builds the {@code AlertCreatedEvent} for an alert.
     *
     * <p>TODO: the event is built but not published — this class has no event publisher
     * or message-queue client injected. Wire one in and publish {@code event} here.
     */
    private void publishAlertEvent(Alert alert) {
        AlertCreatedEvent event = AlertCreatedEvent.builder()
                .alertId(alert.getId())
                .deviceId(alert.getDeviceId())
                .severity(alert.getSeverity())
                .message(alert.getMessage())
                .tenantId(alert.getTenantId())
                .timestamp(alert.getCreatedAt())
                .build();
    }

    /** Copies an alert rule entity into its DTO. */
    private AlertRuleDTO mapToAlertRuleDTO(AlertRule rule) {
        return AlertRuleDTO.builder()
                .id(rule.getId())
                .name(rule.getName())
                .description(rule.getDescription())
                .deviceId(rule.getDeviceId())
                .sensorType(rule.getSensorType())
                .condition(rule.getCondition())
                .threshold(rule.getThreshold())
                .severity(rule.getSeverity())
                .enabled(rule.isEnabled())
                .notificationChannels(rule.getNotificationChannels())
                .createdAt(rule.getCreatedAt())
                .updatedAt(rule.getUpdatedAt())
                .build();
    }

    /** Copies an alert entity into its DTO. */
    private AlertDTO mapToAlertDTO(Alert alert) {
        return AlertDTO.builder()
                .id(alert.getId())
                .ruleId(alert.getRuleId())
                .ruleName(alert.getRuleName())
                .deviceId(alert.getDeviceId())
                .sensorType(alert.getSensorType())
                .severity(alert.getSeverity())
                .message(alert.getMessage())
                .value(alert.getValue())
                .threshold(alert.getThreshold())
                .condition(alert.getCondition())
                .acknowledged(alert.isAcknowledged())
                .acknowledgedBy(alert.getAcknowledgedBy())
                .acknowledgedAt(alert.getAcknowledgedAt())
                .resolved(alert.isResolved())
                .resolvedBy(alert.getResolvedBy())
                .resolvedAt(alert.getResolvedAt())
                .createdAt(alert.getCreatedAt())
                .build();
    }
}

# 5. 最佳实践总结

# 5.1 设计原则

  • 单一职责: 每个微服务专注于特定的业务领域
  • 松耦合: 服务间通过API和消息队列通信
  • 高内聚: 相关功能组织在同一个服务内
  • 可扩展性: 支持水平扩展和负载均衡
  • 容错性: 实现熔断、重试和降级机制

# 5.2 开发建议

  • API设计: 遵循RESTful设计原则,使用统一的响应格式
  • 数据一致性: 使用分布式事务或最终一致性模式
  • 缓存策略: 合理使用缓存提高性能
  • 监控告警: 实现全面的服务监控和告警
  • 安全防护: 实现认证授权和数据加密

# 6. 下一步学习

  • 学习前端展示层的设计和实现
  • 深入了解容器化部署和运维
  • 掌握微服务治理和服务网格
  • 研究分布式系统的设计模式