TCP/UDP设备接入层详细文档

# TCP/UDP设备接入层详细文档

# 1. 背景故事

在IoT世界的深处,有一群特殊的设备——它们来自工业现场、嵌入式系统和对性能要求极高的应用场景。这些设备不满足于HTTP的"请求-响应"模式,也不需要MQTT的发布订阅机制,它们需要的是最直接、最高效的通信方式。

想象一下:

  • 一台高速运转的工业机器人,每秒需要发送数千条位置数据
  • 一个车联网系统,需要实时传输车辆的GPS轨迹
  • 一套安防监控系统,要求毫秒级的响应时间
  • 一个智能电网,需要可靠地传输电力数据

这些场景催生了TCP/UDP设备接入层的诞生。TCP提供可靠的连接,确保数据不丢失;UDP提供极速的传输,追求最低的延迟。它们就像IoT世界的高速公路,为那些对性能有极致要求的设备提供专属通道。

# 2. TCP/UDP协议原理

# 2.1 TCP协议特点

**TCP(传输控制协议)**是一种面向连接的、可靠的传输协议:

  1. 面向连接:通信前需要建立连接(三次握手)
  2. 可靠传输:保证数据包的顺序和完整性
  3. 流量控制:防止发送方发送过快导致接收方缓冲区溢出
  4. 拥塞控制:根据网络状况调整发送速率
  5. 全双工通信:支持双向数据传输

TCP连接建立过程(三次握手):

客户端                    服务器
   |                        |
   |-------- SYN ---------->|
   |                        |
   |<------- SYN+ACK -------|
   |                        |
   |-------- ACK ---------->|
   |                        |
   |    连接建立成功         |

# 2.2 UDP协议特点

**UDP(用户数据报协议)**是一种无连接的、不可靠的传输协议:

  1. 无连接:发送数据前不需要建立连接
  2. 不可靠:不保证数据包的顺序和到达
  3. 低开销:协议头部简单,开销小
  4. 高效率:传输速度快,延迟低
  5. 支持广播:可以向多个目标发送数据

UDP数据传输过程:

客户端                    服务器
   |                        |
   |-------- 数据包 ------->|
   |-------- 数据包 ------->|
   |-------- 数据包 ------->|
   |                        |
   |    无需确认应答         |

# 2.3 TCP vs UDP 对比

特性 TCP UDP
连接性 面向连接 无连接
可靠性 可靠传输 不可靠传输
速度 较慢 较快
开销 较大 较小
应用场景 数据完整性要求高 实时性要求高
流量控制 支持 不支持
拥塞控制 支持 不支持

# 3. 系统架构

# 3.1 整体架构图

┌─────────────────────────────────────────────────────────────┐
│                    TCP/UDP设备接入层                        │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │
│  │  TCP服务器   │    │  UDP服务器   │    │  协议解析器  │     │
│  │             │    │             │    │             │     │
│  │ - 连接管理   │    │ - 数据接收   │    │ - 消息解码   │     │
│  │ - 数据接收   │    │ - 数据发送   │    │ - 消息编码   │     │
│  │ - 心跳检测   │    │ - 广播支持   │    │ - 协议适配   │     │
│  └─────────────┘    └─────────────┘    └─────────────┘     │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │
│  │  连接池管理  │    │  消息队列    │    │  数据处理    │     │
│  │             │    │             │    │             │     │
│  │ - TCP连接池  │    │ - 消息缓存   │    │ - 数据验证   │     │
│  │ - 连接监控   │    │ - 异步处理   │    │ - 数据转换   │     │
│  │ - 负载均衡   │    │ - 消息路由   │    │ - 数据存储   │     │
│  └─────────────┘    └─────────────┘    └─────────────┘     │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │
│  │  设备管理    │    │  安全认证    │    │  监控告警    │     │
│  │             │    │             │    │             │     │
│  │ - 设备注册   │    │ - 设备认证   │    │ - 性能监控   │     │
│  │ - 状态管理   │    │ - 数据加密   │    │ - 异常告警   │     │
│  │ - 配置管理   │    │ - 访问控制   │    │ - 日志记录   │     │
│  └─────────────┘    └─────────────┘    └─────────────┘     │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                      数据存储层                             │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │
│  │   MySQL     │    │    Redis    │    │  InfluxDB   │     │
│  │  设备信息    │    │   连接缓存   │    │   时序数据   │     │
│  │  配置数据    │    │   会话管理   │    │   性能指标   │     │
│  └─────────────┘    └─────────────┘    └─────────────┘     │
└─────────────────────────────────────────────────────────────┘

# 3.2 核心组件说明

  1. TCP服务器:处理TCP连接和数据传输
  2. UDP服务器:处理UDP数据包的接收和发送
  3. 协议解析器:解析和构造各种设备协议
  4. 连接池管理:管理TCP连接的生命周期
  5. 消息队列:异步处理接收到的数据
  6. 数据处理:验证、转换和存储设备数据
  7. 设备管理:管理设备的注册和状态
  8. 安全认证:确保通信安全
  9. 监控告警:监控系统性能和异常

# 4. 核心实体设计

# 4.1 TCP连接实体

/**
 * TCP连接实体
 * 代表一个TCP设备连接的完整信息
 */
@Entity
@Table(name = "tcp_connection")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TcpConnection {
    
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    /**
     * 连接ID - 唯一标识一个TCP连接
     */
    @Column(name = "connection_id", unique = true, nullable = false)
    private String connectionId;
    
    /**
     * 设备ID - 关联的设备标识
     */
    @Column(name = "device_id", nullable = false)
    private String deviceId;
    
    /**
     * 客户端IP地址
     */
    @Column(name = "client_ip", nullable = false)
    private String clientIp;
    
    /**
     * 客户端端口
     */
    @Column(name = "client_port", nullable = false)
    private Integer clientPort;
    
    /**
     * 服务器端口
     */
    @Column(name = "server_port", nullable = false)
    private Integer serverPort;
    
    /**
     * 连接状态
     */
    @Enumerated(EnumType.STRING)
    @Column(name = "status", nullable = false)
    private ConnectionStatus status;
    
    /**
     * 协议类型
     */
    @Column(name = "protocol_type")
    private String protocolType;
    
    /**
     * 协议版本
     */
    @Column(name = "protocol_version")
    private String protocolVersion;
    
    /**
     * 连接建立时间
     */
    @Column(name = "connect_time", nullable = false)
    private LocalDateTime connectTime;
    
    /**
     * 最后活跃时间
     */
    @Column(name = "last_active_time")
    private LocalDateTime lastActiveTime;
    
    /**
     * 最后心跳时间
     */
    @Column(name = "last_heartbeat_time")
    private LocalDateTime lastHeartbeatTime;
    
    /**
     * 连接属性(JSON格式)
     */
    @Column(name = "attributes", columnDefinition = "JSON")
    private String attributes;
    
    /**
     * 发送字节数
     */
    @Column(name = "bytes_sent")
    private Long bytesSent = 0L;
    
    /**
     * 接收字节数
     */
    @Column(name = "bytes_received")
    private Long bytesReceived = 0L;
    
    /**
     * 发送消息数
     */
    @Column(name = "messages_sent")
    private Long messagesSent = 0L;
    
    /**
     * 接收消息数
     */
    @Column(name = "messages_received")
    private Long messagesReceived = 0L;
    
    /**
     * 创建时间
     */
    @Column(name = "create_time", nullable = false)
    private LocalDateTime createTime;
    
    /**
     * 更新时间
     */
    @Column(name = "update_time", nullable = false)
    private LocalDateTime updateTime;
    
    /**
     * 连接状态枚举
     */
    public enum ConnectionStatus {
        CONNECTING,    // 连接中
        CONNECTED,     // 已连接
        AUTHENTICATED, // 已认证
        DISCONNECTED,  // 已断开
        ERROR         // 错误状态
    }
    
    @PrePersist
    protected void onCreate() {
        createTime = LocalDateTime.now();
        updateTime = LocalDateTime.now();
        if (connectTime == null) {
            connectTime = LocalDateTime.now();
        }
        if (status == null) {
            status = ConnectionStatus.CONNECTING;
        }
    }
    
    @PreUpdate
    protected void onUpdate() {
        updateTime = LocalDateTime.now();
    }
    
    /**
     * 更新活跃时间
     */
    public void updateActiveTime() {
        this.lastActiveTime = LocalDateTime.now();
    }
    
    /**
     * 更新心跳时间
     */
    public void updateHeartbeatTime() {
        this.lastHeartbeatTime = LocalDateTime.now();
        updateActiveTime();
    }
    
    /**
     * 增加发送统计
     */
    public void addSentStats(long bytes, long messages) {
        this.bytesSent += bytes;
        this.messagesSent += messages;
        updateActiveTime();
    }
    
    /**
     * 增加接收统计
     */
    public void addReceivedStats(long bytes, long messages) {
        this.bytesReceived += bytes;
        this.messagesReceived += messages;
        updateActiveTime();
    }
    
    /**
     * 检查连接是否超时
     */
    public boolean isTimeout(int timeoutMinutes) {
        if (lastActiveTime == null) {
            return false;
        }
        return lastActiveTime.isBefore(LocalDateTime.now().minusMinutes(timeoutMinutes));
    }
    
    /**
     * 检查心跳是否超时
     */
    public boolean isHeartbeatTimeout(int timeoutMinutes) {
        if (lastHeartbeatTime == null) {
            return false;
        }
        return lastHeartbeatTime.isBefore(LocalDateTime.now().minusMinutes(timeoutMinutes));
    }
}

# 5.2 UDP服务器实现

/**
 * UDP服务器实现
 * 基于Netty框架构建高性能UDP服务器
 */
@Component
@Slf4j
public class UdpServer {
    
    @Autowired
    private UdpSessionService sessionService;
    
    @Autowired
    private MessageProcessingService messageProcessingService;
    
    @Autowired
    private DeviceAuthenticationService authenticationService;
    
    @Value("${iot.udp.server.port:9999}")
    private int serverPort;
    
    @Value("${iot.udp.server.worker-threads:4}")
    private int workerThreads;
    
    @Value("${iot.udp.server.so-rcvbuf:65536}")
    private int soRcvBuf;
    
    @Value("${iot.udp.server.so-sndbuf:65536}")
    private int soSndBuf;
    
    private EventLoopGroup workerGroup;
    private Channel serverChannel;
    
    /**
     * 启动UDP服务器
     */
    @PostConstruct
    public void start() {
        try {
            // 创建事件循环组
            workerGroup = new NioEventLoopGroup(workerThreads);
            
            // 创建服务器启动器
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .option(ChannelOption.SO_RCVBUF, soRcvBuf)
                    .option(ChannelOption.SO_SNDBUF, soSndBuf)
                    .handler(new UdpServerHandler());
            
            // 绑定端口并启动服务器
            ChannelFuture future = bootstrap.bind(serverPort).sync();
            serverChannel = future.channel();
            
            log.info("UDP服务器启动成功,监听端口: {}", serverPort);
            
            // 监听服务器关闭
            serverChannel.closeFuture().addListener(closeFuture -> {
                log.info("UDP服务器已关闭");
            });
            
        } catch (Exception e) {
            log.error("UDP服务器启动失败", e);
            shutdown();
        }
    }
    
    /**
     * 关闭UDP服务器
     */
    @PreDestroy
    public void shutdown() {
        try {
            if (serverChannel != null) {
                serverChannel.close().sync();
            }
        } catch (InterruptedException e) {
            log.error("关闭服务器通道时发生异常", e);
        } finally {
            if (workerGroup != null) {
                workerGroup.shutdownGracefully();
            }
            log.info("UDP服务器资源已释放");
        }
    }
    
    /**
     * UDP服务器处理器
     */
    private class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
        
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
            // 获取客户端地址信息
            InetSocketAddress sender = packet.sender();
            String clientIp = sender.getAddress().getHostAddress();
            int clientPort = sender.getPort();
            
            // 获取数据内容
            ByteBuf content = packet.content();
            byte[] data = new byte[content.readableBytes()];
            content.readBytes(data);
            
            try {
                // 获取或创建UDP会话
                String sessionKey = clientIp + ":" + clientPort;
                UdpSession session = sessionService.getOrCreateSession(sessionKey, clientIp, clientPort, serverPort);
                
                // 更新会话统计
                session.addReceivedStats(data.length, 1);
                sessionService.updateSession(session);
                
                // 创建设备消息
                DeviceMessage message = new DeviceMessage();
                message.setConnectionId(session.getSessionId());
                message.setTransportProtocol(DeviceMessage.TransportProtocol.UDP);
                message.setDirection(DeviceMessage.MessageDirection.INBOUND);
                message.setRawContent(bytesToHex(data));
                message.setMessageLength(data.length);
                message.setClientIp(clientIp);
                message.setClientPort(clientPort);
                message.setServerPort(serverPort);
                
                // 异步处理消息
                messageProcessingService.processMessage(message, ctx.channel(), sender);
                
            } catch (Exception e) {
                log.error("处理UDP消息时发生异常: {}:{}", clientIp, clientPort, e);
            }
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("UDP服务器发生异常", cause);
        }
        
        /**
         * 字节数组转十六进制字符串
         */
        private String bytesToHex(byte[] bytes) {
            StringBuilder sb = new StringBuilder();
            for (byte b : bytes) {
                sb.append(String.format("%02X", b));
            }
            return sb.toString();
        }
    }
    
    /**
     * 向指定地址发送UDP数据
     */
    public void sendData(String clientIp, int clientPort, byte[] data) {
        if (serverChannel == null || !serverChannel.isActive()) {
            log.warn("UDP服务器通道不可用");
            return;
        }
        
        try {
            // 创建目标地址
            InetSocketAddress recipient = new InetSocketAddress(clientIp, clientPort);
            
            // 创建数据包
            ByteBuf buffer = Unpooled.copiedBuffer(data);
            DatagramPacket packet = new DatagramPacket(buffer, recipient);
            
            // 发送数据包
            serverChannel.writeAndFlush(packet).addListener(future -> {
                if (future.isSuccess()) {
                    // 更新会话发送统计
                    String sessionKey = clientIp + ":" + clientPort;
                    UdpSession session = sessionService.getSession(sessionKey);
                    if (session != null) {
                        session.addSentStats(data.length, 1);
                        sessionService.updateSession(session);
                    }
                    
                    log.debug("UDP数据发送成功: {}:{} - {} bytes", clientIp, clientPort, data.length);
                } else {
                    log.error("UDP数据发送失败: {}:{}", clientIp, clientPort, future.cause());
                }
            });
            
        } catch (Exception e) {
            log.error("发送UDP数据时发生异常: {}:{}", clientIp, clientPort, e);
        }
    }
    
    /**
     * 广播UDP数据
     */
    public void broadcastData(String broadcastAddress, int port, byte[] data) {
        sendData(broadcastAddress, port, data);
    }
    
    /**
     * 向活跃会话发送数据
     */
    public void sendToActiveSessions(byte[] data) {
        List<UdpSession> activeSessions = sessionService.getActiveSessions();
        for (UdpSession session : activeSessions) {
            sendData(session.getClientIp(), session.getClientPort(), data);
        }
    }
    
    /**
     * 获取服务器状态
     */
    public Map<String, Object> getServerStatus() {
        Map<String, Object> status = new HashMap<>();
        status.put("port", serverPort);
        status.put("running", serverChannel != null && serverChannel.isActive());
        status.put("activeSessions", sessionService.getActiveSessionCount());
        status.put("totalSessions", sessionService.getTotalSessionCount());
        return status;
    }
}

# 5.3 消息处理服务

/**
 * 消息处理服务
 * 负责处理TCP/UDP接收到的消息
 */
@Service
@Slf4j
public class MessageProcessingService {
    
    @Autowired
    private DeviceMessageRepository messageRepository;
    
    @Autowired
    private ProtocolParserService protocolParserService;
    
    @Autowired
    private DeviceService deviceService;
    
    @Autowired
    private DeviceDataService deviceDataService;
    
    @Autowired
    private CommandResponseService commandResponseService;
    
    @Autowired
    @Qualifier("messageProcessingExecutor")
    private ThreadPoolTaskExecutor messageProcessingExecutor;
    
    /**
     * 处理TCP消息
     */
    public void processMessage(DeviceMessage message, Channel channel) {
        messageProcessingExecutor.submit(() -> {
            try {
                // 保存原始消息
                message = messageRepository.save(message);
                
                // 解析消息协议
                parseMessageProtocol(message);
                
                // 根据消息类型进行处理
                processMessageByType(message, channel, null);
                
            } catch (Exception e) {
                log.error("处理TCP消息失败: {}", message.getMessageId(), e);
                message.markAsFailed(e.getMessage());
                messageRepository.save(message);
            }
        });
    }
    
    /**
     * 处理UDP消息
     */
    public void processMessage(DeviceMessage message, Channel channel, InetSocketAddress sender) {
        messageProcessingExecutor.submit(() -> {
            try {
                // 保存原始消息
                message = messageRepository.save(message);
                
                // 解析消息协议
                parseMessageProtocol(message);
                
                // 根据消息类型进行处理
                processMessageByType(message, channel, sender);
                
            } catch (Exception e) {
                log.error("处理UDP消息失败: {}", message.getMessageId(), e);
                message.markAsFailed(e.getMessage());
                messageRepository.save(message);
            }
        });
    }
    
    /**
     * 解析消息协议
     */
    private void parseMessageProtocol(DeviceMessage message) {
        try {
            // 标记为处理中
            message.markAsProcessing();
            messageRepository.save(message);
            
            // 将十六进制字符串转换为字节数组
            byte[] rawData = hexToBytes(message.getRawContent());
            
            // 使用协议解析器解析消息
            ProtocolParseResult parseResult = protocolParserService.parseMessage(rawData);
            
            if (parseResult.isSuccess()) {
                // 解析成功,更新消息信息
                message.setProtocolType(parseResult.getProtocolType());
                message.setMessageType(parseResult.getMessageType());
                message.setDeviceId(parseResult.getDeviceId());
                message.setParsedContent(parseResult.getParsedContentJson());
                message.setSequenceNumber(parseResult.getSequenceNumber());
                
                log.debug("消息解析成功: {} - 协议: {}, 类型: {}, 设备: {}", 
                        message.getMessageId(), 
                        parseResult.getProtocolType(),
                        parseResult.getMessageType(),
                        parseResult.getDeviceId());
            } else {
                // 解析失败
                log.warn("消息解析失败: {} - {}", message.getMessageId(), parseResult.getErrorMessage());
                message.setMessageType(DeviceMessage.MessageType.OTHER);
                message.setErrorMessage(parseResult.getErrorMessage());
            }
            
        } catch (Exception e) {
            log.error("解析消息协议时发生异常: {}", message.getMessageId(), e);
            message.setMessageType(DeviceMessage.MessageType.OTHER);
            message.setErrorMessage(e.getMessage());
        }
    }
    
    /**
     * 根据消息类型进行处理
     */
    private void processMessageByType(DeviceMessage message, Channel channel, InetSocketAddress sender) {
        try {
            switch (message.getMessageType()) {
                case HEARTBEAT:
                    processHeartbeatMessage(message, channel, sender);
                    break;
                case DATA:
                    processDataMessage(message);
                    break;
                case AUTHENTICATION:
                    processAuthenticationMessage(message, channel, sender);
                    break;
                case COMMAND:
                    processCommandMessage(message);
                    break;
                case RESPONSE:
                    processResponseMessage(message);
                    break;
                case EVENT:
                    processEventMessage(message);
                    break;
                case CONFIGURATION:
                    processConfigurationMessage(message);
                    break;
                case FIRMWARE:
                    processFirmwareMessage(message);
                    break;
                default:
                    processOtherMessage(message);
                    break;
            }
            
            // 标记为已处理
            message.markAsProcessed();
            messageRepository.save(message);
            
        } catch (Exception e) {
            log.error("处理消息类型时发生异常: {} - {}", message.getMessageId(), message.getMessageType(), e);
            throw e;
        }
    }
    
    /**
     * 处理心跳消息
     */
    private void processHeartbeatMessage(DeviceMessage message, Channel channel, InetSocketAddress sender) {
        log.debug("处理心跳消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
        
        // 更新设备最后心跳时间
        if (StringUtils.hasText(message.getDeviceId())) {
            deviceService.updateLastHeartbeat(message.getDeviceId());
            
            // 更新连接心跳时间
            if (message.getTransportProtocol() == DeviceMessage.TransportProtocol.TCP) {
                // TCP连接心跳更新在TcpConnectionService中处理
            }
        }
        
        // 发送心跳响应(如果需要)
        sendHeartbeatResponse(message, channel, sender);
    }
    
    /**
     * 处理数据消息
     */
    private void processDataMessage(DeviceMessage message) {
        log.debug("处理数据消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
        
        if (StringUtils.hasText(message.getDeviceId()) && StringUtils.hasText(message.getParsedContent())) {
            try {
                // 解析设备数据
                DeviceData deviceData = parseDeviceData(message);
                
                // 保存设备数据
                deviceDataService.saveDeviceData(deviceData);
                
                // 更新设备最后数据上报时间
                deviceService.updateLastDataReport(message.getDeviceId());
                
                log.debug("设备数据保存成功: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
                
            } catch (Exception e) {
                log.error("处理设备数据时发生异常: {} - 设备: {}", message.getMessageId(), message.getDeviceId(), e);
            }
        }
    }
    
    /**
     * 处理认证消息
     */
    private void processAuthenticationMessage(DeviceMessage message, Channel channel, InetSocketAddress sender) {
        log.debug("处理认证消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
        
        // 实现设备认证逻辑
        // ...
    }
    
    /**
     * 处理命令消息
     */
    private void processCommandMessage(DeviceMessage message) {
        log.debug("处理命令消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
        
        // 实现命令处理逻辑
        // ...
    }
    
    /**
     * 处理响应消息
     */
    private void processResponseMessage(DeviceMessage message) {
        log.debug("处理响应消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
        
        // 处理命令响应
        if (StringUtils.hasText(message.getParsedContent())) {
            commandResponseService.handleCommandResponse(message);
        }
    }
    
    /**
     * 处理事件消息
     */
    private void processEventMessage(DeviceMessage message) {
        log.debug("处理事件消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
        
        // 实现事件处理逻辑
        // ...
    }
    
    /**
     * 处理配置消息
     */
    private void processConfigurationMessage(DeviceMessage message) {
        log.debug("处理配置消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
        
        // 实现配置处理逻辑
        // ...
    }
    
    /**
     * 处理固件消息
     */
    private void processFirmwareMessage(DeviceMessage message) {
        log.debug("处理固件消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
        
        // 实现固件处理逻辑
        // ...
    }
    
    /**
     * 处理其他消息
     */
    private void processOtherMessage(DeviceMessage message) {
        log.debug("处理其他消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
        
        // 记录未知消息类型
        log.warn("收到未知类型消息: {} - 原始内容: {}", message.getMessageId(), message.getRawContent());
    }
    
    /**
     * 发送心跳响应
     */
    private void sendHeartbeatResponse(DeviceMessage message, Channel channel, InetSocketAddress sender) {
        // 根据协议类型构造心跳响应
        byte[] responseData = protocolParserService.buildHeartbeatResponse(message.getProtocolType());
        
        if (responseData != null && responseData.length > 0) {
            if (message.getTransportProtocol() == DeviceMessage.TransportProtocol.TCP) {
                // TCP响应
                if (channel != null && channel.isActive()) {
                    channel.writeAndFlush(responseData);
                }
            } else if (message.getTransportProtocol() == DeviceMessage.TransportProtocol.UDP) {
                // UDP响应
                if (channel != null && channel.isActive() && sender != null) {
                    ByteBuf buffer = Unpooled.copiedBuffer(responseData);
                    DatagramPacket packet = new DatagramPacket(buffer, sender);
                    channel.writeAndFlush(packet);
                }
            }
        }
    }
    
    /**
     * 解析设备数据
     */
    private DeviceData parseDeviceData(DeviceMessage message) {
        // 根据解析后的内容创建设备数据对象
        DeviceData deviceData = new DeviceData();
        deviceData.setDeviceId(message.getDeviceId());
        deviceData.setDataType("sensor"); // 根据实际情况设置
        deviceData.setTimestamp(message.getTimestamp());
        deviceData.setData(message.getParsedContent());
        
        return deviceData;
    }
    
    /**
     * 十六进制字符串转字节数组
     */
    private byte[] hexToBytes(String hex) {
        int len = hex.length();
        byte[] data = new byte[len / 2];
        for (int i = 0; i < len; i += 2) {
            data[i / 2] = (byte) ((Character.digit(hex.charAt(i), 16) << 4)
                    + Character.digit(hex.charAt(i + 1), 16));
        }
        return data;
    }
}

# 5.4 协议解析服务

/**
 * 协议解析服务
 * 负责解析各种设备协议
 */
@Service
@Slf4j
public class ProtocolParserService {
    
    @Autowired
    private List<ProtocolParser> protocolParsers;
    
    /**
     * 解析消息
     */
    public ProtocolParseResult parseMessage(byte[] data) {
        // 尝试使用各种协议解析器解析消息
        for (ProtocolParser parser : protocolParsers) {
            if (parser.canParse(data)) {
                try {
                    return parser.parse(data);
                } catch (Exception e) {
                    log.warn("协议解析器 {} 解析失败", parser.getProtocolType(), e);
                }
            }
        }
        
        // 如果所有解析器都无法解析,返回失败结果
        return ProtocolParseResult.failure("无法识别的协议格式");
    }
    
    /**
     * 构造心跳响应
     */
    public byte[] buildHeartbeatResponse(String protocolType) {
        for (ProtocolParser parser : protocolParsers) {
            if (parser.getProtocolType().equals(protocolType)) {
                return parser.buildHeartbeatResponse();
            }
        }
        return null;
    }
    
    /**
     * 构造命令消息
     */
    public byte[] buildCommandMessage(String protocolType, String deviceId, String command, Object params) {
        for (ProtocolParser parser : protocolParsers) {
            if (parser.getProtocolType().equals(protocolType)) {
                return parser.buildCommandMessage(deviceId, command, params);
            }
        }
        return null;
    }
}

/**
 * 协议解析器接口
 */
public interface ProtocolParser {
    
    /**
     * 获取协议类型
     */
    String getProtocolType();
    
    /**
     * 判断是否可以解析该数据
     */
    boolean canParse(byte[] data);
    
    /**
     * 解析数据
     */
    ProtocolParseResult parse(byte[] data);
    
    /**
     * 构造心跳响应
     */
    byte[] buildHeartbeatResponse();
    
    /**
     * 构造命令消息
     */
    byte[] buildCommandMessage(String deviceId, String command, Object params);
}

/**
 * 协议解析结果
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ProtocolParseResult {
    
    private boolean success;
    private String protocolType;
    private DeviceMessage.MessageType messageType;
    private String deviceId;
    private String parsedContentJson;
    private Long sequenceNumber;
    private String errorMessage;
    
    public static ProtocolParseResult success(String protocolType, 
                                            DeviceMessage.MessageType messageType,
                                            String deviceId, 
                                            String parsedContentJson) {
        ProtocolParseResult result = new ProtocolParseResult();
        result.setSuccess(true);
        result.setProtocolType(protocolType);
        result.setMessageType(messageType);
        result.setDeviceId(deviceId);
        result.setParsedContentJson(parsedContentJson);
        return result;
    }
    
    public static ProtocolParseResult failure(String errorMessage) {
        ProtocolParseResult result = new ProtocolParseResult();
        result.setSuccess(false);
        result.setErrorMessage(errorMessage);
        return result;
    }
}

/**
 * 示例协议解析器 - 简单的TLV协议
 */
@Component
@Slf4j
public class SimpleTlvProtocolParser implements ProtocolParser {
    
    private static final String PROTOCOL_TYPE = "SIMPLE_TLV";
    private static final byte HEARTBEAT_TYPE = 0x01;
    private static final byte DATA_TYPE = 0x02;
    private static final byte COMMAND_TYPE = 0x03;
    private static final byte RESPONSE_TYPE = 0x04;
    
    @Override
    public String getProtocolType() {
        return PROTOCOL_TYPE;
    }
    
    @Override
    public boolean canParse(byte[] data) {
        // 简单的TLV协议:至少需要3个字节(Type + Length + Value)
        if (data == null || data.length < 3) {
            return false;
        }
        
        // 检查类型字段是否为已知类型
        byte type = data[0];
        return type == HEARTBEAT_TYPE || type == DATA_TYPE || type == COMMAND_TYPE || type == RESPONSE_TYPE;
    }
    
    @Override
    public ProtocolParseResult parse(byte[] data) {
        try {
            // 解析TLV结构
            byte type = data[0];
            int length = data[1] & 0xFF;
            
            if (data.length < 2 + length) {
                return ProtocolParseResult.failure("数据长度不足");
            }
            
            byte[] value = new byte[length];
            System.arraycopy(data, 2, value, 0, length);
            
            // 根据类型解析消息
            DeviceMessage.MessageType messageType;
            String deviceId = "";
            Map<String, Object> parsedData = new HashMap<>();
            
            switch (type) {
                case HEARTBEAT_TYPE:
                    messageType = DeviceMessage.MessageType.HEARTBEAT;
                    deviceId = new String(value, StandardCharsets.UTF_8);
                    parsedData.put("type", "heartbeat");
                    parsedData.put("deviceId", deviceId);
                    break;
                    
                case DATA_TYPE:
                    messageType = DeviceMessage.MessageType.DATA;
                    // 假设前8字节是设备ID,后面是数据
                    if (value.length >= 8) {
                        deviceId = new String(value, 0, 8, StandardCharsets.UTF_8).trim();
                        byte[] sensorData = new byte[value.length - 8];
                        System.arraycopy(value, 8, sensorData, 0, sensorData.length);
                        
                        parsedData.put("type", "data");
                        parsedData.put("deviceId", deviceId);
                        parsedData.put("sensorData", bytesToHex(sensorData));
                    }
                    break;
                    
                case COMMAND_TYPE:
                    messageType = DeviceMessage.MessageType.COMMAND;
                    // 解析命令数据
                    parsedData.put("type", "command");
                    parsedData.put("commandData", bytesToHex(value));
                    break;
                    
                case RESPONSE_TYPE:
                    messageType = DeviceMessage.MessageType.RESPONSE;
                    // 解析响应数据
                    parsedData.put("type", "response");
                    parsedData.put("responseData", bytesToHex(value));
                    break;
                    
                default:
                    return ProtocolParseResult.failure("未知的消息类型: " + type);
            }
            
            // 转换为JSON字符串
            ObjectMapper objectMapper = new ObjectMapper();
            String parsedContentJson = objectMapper.writeValueAsString(parsedData);
            
            return ProtocolParseResult.success(PROTOCOL_TYPE, messageType, deviceId, parsedContentJson);
            
        } catch (Exception e) {
            log.error("解析SimpleTLV协议时发生异常", e);
            return ProtocolParseResult.failure("解析异常: " + e.getMessage());
        }
    }
    
    @Override
    public byte[] buildHeartbeatResponse() {
        // 构造心跳响应:Type(1) + Length(1) + Value("OK")
        byte[] response = new byte[4];
        response[0] = HEARTBEAT_TYPE;
        response[1] = 2; // "OK"的长度
        response[2] = 'O';
        response[3] = 'K';
        return response;
    }
    
    @Override
    public byte[] buildCommandMessage(String deviceId, String command, Object params) {
        try {
            // 构造命令消息
            Map<String, Object> commandData = new HashMap<>();
            commandData.put("deviceId", deviceId);
            commandData.put("command", command);
            commandData.put("params", params);
            
            ObjectMapper objectMapper = new ObjectMapper();
            String jsonData = objectMapper.writeValueAsString(commandData);
            byte[] valueBytes = jsonData.getBytes(StandardCharsets.UTF_8);
            
            byte[] message = new byte[2 + valueBytes.length];
            message[0] = COMMAND_TYPE;
            message[1] = (byte) valueBytes.length;
            System.arraycopy(valueBytes, 0, message, 2, valueBytes.length);
            
            return message;
            
        } catch (Exception e) {
            log.error("构造命令消息时发生异常", e);
            return null;
        }
    }
    
    /**
     * 字节数组转十六进制字符串
     */
    private String bytesToHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02X", b));
        }
        return sb.toString();
    }
}

# 6. 数据库设计

# 6.1 TCP连接表

-- TCP连接表
CREATE TABLE tcp_connection (
    connection_id VARCHAR(64) PRIMARY KEY COMMENT '连接ID',
    device_id VARCHAR(64) COMMENT '设备ID',
    client_ip VARCHAR(45) NOT NULL COMMENT '客户端IP地址',
    client_port INT NOT NULL COMMENT '客户端端口',
    server_port INT NOT NULL COMMENT '服务器端口',
    connection_status VARCHAR(20) NOT NULL DEFAULT 'ACTIVE' COMMENT '连接状态:ACTIVE, INACTIVE, CLOSED',
    protocol_type VARCHAR(50) COMMENT '协议类型',
    connect_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '连接时间',
    last_heartbeat TIMESTAMP COMMENT '最后心跳时间',
    last_activity TIMESTAMP COMMENT '最后活动时间',
    disconnect_time TIMESTAMP COMMENT '断开连接时间',
    disconnect_reason VARCHAR(255) COMMENT '断开原因',
    bytes_received BIGINT DEFAULT 0 COMMENT '接收字节数',
    bytes_sent BIGINT DEFAULT 0 COMMENT '发送字节数',
    messages_received INT DEFAULT 0 COMMENT '接收消息数',
    messages_sent INT DEFAULT 0 COMMENT '发送消息数',
    error_count INT DEFAULT 0 COMMENT '错误次数',
    created_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    updated_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    
    INDEX idx_device_id (device_id),
    INDEX idx_client_ip (client_ip),
    INDEX idx_connection_status (connection_status),
    INDEX idx_connect_time (connect_time),
    INDEX idx_last_activity (last_activity)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='TCP连接信息表';

# 6.2 UDP会话表

-- UDP会话表
CREATE TABLE udp_session (
    session_id VARCHAR(64) PRIMARY KEY COMMENT '会话ID',
    session_key VARCHAR(128) UNIQUE NOT NULL COMMENT '会话键(IP:Port)',
    device_id VARCHAR(64) COMMENT '设备ID',
    client_ip VARCHAR(45) NOT NULL COMMENT '客户端IP地址',
    client_port INT NOT NULL COMMENT '客户端端口',
    server_port INT NOT NULL COMMENT '服务器端口',
    session_status VARCHAR(20) NOT NULL DEFAULT 'ACTIVE' COMMENT '会话状态:ACTIVE, INACTIVE, EXPIRED',
    protocol_type VARCHAR(50) COMMENT '协议类型',
    first_contact TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '首次联系时间',
    last_contact TIMESTAMP COMMENT '最后联系时间',
    expire_time TIMESTAMP COMMENT '过期时间',
    bytes_received BIGINT DEFAULT 0 COMMENT '接收字节数',
    bytes_sent BIGINT DEFAULT 0 COMMENT '发送字节数',
    packets_received INT DEFAULT 0 COMMENT '接收数据包数',
    packets_sent INT DEFAULT 0 COMMENT '发送数据包数',
    error_count INT DEFAULT 0 COMMENT '错误次数',
    created_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    updated_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    
    INDEX idx_session_key (session_key),
    INDEX idx_device_id (device_id),
    INDEX idx_client_ip (client_ip),
    INDEX idx_session_status (session_status),
    INDEX idx_last_contact (last_contact)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='UDP会话信息表';

# 6.3 设备消息表

-- 设备消息表
CREATE TABLE device_message (
    message_id VARCHAR(64) PRIMARY KEY COMMENT '消息ID',
    connection_id VARCHAR(64) COMMENT '连接/会话ID',
    device_id VARCHAR(64) COMMENT '设备ID',
    transport_protocol VARCHAR(10) NOT NULL COMMENT '传输协议:TCP, UDP',
    protocol_type VARCHAR(50) COMMENT '应用协议类型',
    message_type VARCHAR(20) COMMENT '消息类型:HEARTBEAT, DATA, COMMAND, RESPONSE, EVENT等',
    direction VARCHAR(10) NOT NULL COMMENT '消息方向:INBOUND, OUTBOUND',
    raw_content TEXT COMMENT '原始消息内容(十六进制)',
    parsed_content TEXT COMMENT '解析后的消息内容(JSON)',
    message_length INT COMMENT '消息长度',
    sequence_number BIGINT COMMENT '序列号',
    client_ip VARCHAR(45) COMMENT '客户端IP',
    client_port INT COMMENT '客户端端口',
    server_port INT COMMENT '服务器端口',
    processing_status VARCHAR(20) DEFAULT 'PENDING' COMMENT '处理状态:PENDING, PROCESSING, PROCESSED, FAILED',
    error_message TEXT COMMENT '错误信息',
    processing_time BIGINT COMMENT '处理耗时(毫秒)',
    timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '消息时间戳',
    created_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    
    INDEX idx_connection_id (connection_id),
    INDEX idx_device_id (device_id),
    INDEX idx_transport_protocol (transport_protocol),
    INDEX idx_message_type (message_type),
    INDEX idx_direction (direction),
    INDEX idx_processing_status (processing_status),
    INDEX idx_timestamp (timestamp),
    INDEX idx_created_time (created_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备消息表';

-- 按月分表(示例)
CREATE TABLE device_message_202401 LIKE device_message;
CREATE TABLE device_message_202402 LIKE device_message;
-- ... 其他月份表

# 6.4 连接统计表

-- 连接统计表
CREATE TABLE connection_statistics (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    stat_date DATE NOT NULL COMMENT '统计日期',
    stat_hour TINYINT COMMENT '统计小时(0-23)',
    transport_protocol VARCHAR(10) NOT NULL COMMENT '传输协议:TCP, UDP',
    total_connections INT DEFAULT 0 COMMENT '总连接数',
    active_connections INT DEFAULT 0 COMMENT '活跃连接数',
    new_connections INT DEFAULT 0 COMMENT '新建连接数',
    closed_connections INT DEFAULT 0 COMMENT '关闭连接数',
    total_messages BIGINT DEFAULT 0 COMMENT '总消息数',
    inbound_messages BIGINT DEFAULT 0 COMMENT '入站消息数',
    outbound_messages BIGINT DEFAULT 0 COMMENT '出站消息数',
    total_bytes BIGINT DEFAULT 0 COMMENT '总字节数',
    inbound_bytes BIGINT DEFAULT 0 COMMENT '入站字节数',
    outbound_bytes BIGINT DEFAULT 0 COMMENT '出站字节数',
    error_count INT DEFAULT 0 COMMENT '错误次数',
    avg_processing_time DECIMAL(10,2) COMMENT '平均处理时间(毫秒)',
    created_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    
    UNIQUE KEY uk_stat_date_hour_protocol (stat_date, stat_hour, transport_protocol),
    INDEX idx_stat_date (stat_date),
    INDEX idx_transport_protocol (transport_protocol)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='连接统计表';

# 7. 性能优化策略

# 7.1 连接池配置

# application.yml
iot:
  tcp:
    server:
      # TCP服务器配置
      port: 8888
      boss-threads: 1
      worker-threads: 8
      # 连接配置
      so-backlog: 1024
      so-keepalive: true
      tcp-nodelay: true
      so-reuseaddr: true
      # 缓冲区配置
      so-rcvbuf: 65536
      so-sndbuf: 65536
      # 连接超时配置
      connect-timeout: 30000
      read-timeout: 300000
      write-timeout: 30000
      # 连接池配置
      max-connections: 10000
      connection-idle-timeout: 600000
      
  udp:
    server:
      # UDP服务器配置
      port: 9999
      worker-threads: 4
      # 缓冲区配置
      so-rcvbuf: 65536
      so-sndbuf: 65536
      # 会话配置
      session-timeout: 300000
      max-sessions: 50000
      
  message:
    processing:
      # 消息处理线程池配置
      core-pool-size: 10
      max-pool-size: 50
      queue-capacity: 1000
      keep-alive-seconds: 60
      thread-name-prefix: "message-processor-"
      # 批处理配置
      batch-size: 100
      batch-timeout: 1000
      
  cache:
    # 缓存配置
    connection-cache-size: 10000
    connection-cache-ttl: 3600
    session-cache-size: 50000
    session-cache-ttl: 1800
    device-cache-size: 100000
    device-cache-ttl: 7200

# 7.2 缓存策略配置

/**
 * 缓存配置
 */
@Configuration
@EnableCaching
public class CacheConfig {
    
    /**
     * 连接缓存管理器
     */
    @Bean("connectionCacheManager")
    public CacheManager connectionCacheManager() {
        RedisCacheManager.Builder builder = RedisCacheManager
                .RedisCacheManagerBuilder
                .fromConnectionFactory(redisConnectionFactory())
                .cacheDefaults(connectionCacheConfiguration());
        
        return builder.build();
    }
    
    /**
     * 连接缓存配置
     */
    private RedisCacheConfiguration connectionCacheConfiguration() {
        return RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofHours(1)) // 1小时过期
                .serializeKeysWith(RedisSerializationContext.SerializationPair
                        .fromSerializer(new StringRedisSerializer()))
                .serializeValuesWith(RedisSerializationContext.SerializationPair
                        .fromSerializer(new GenericJackson2JsonRedisSerializer()))
                .disableCachingNullValues();
    }
    
    /**
     * 会话缓存管理器
     */
    @Bean("sessionCacheManager")
    public CacheManager sessionCacheManager() {
        RedisCacheManager.Builder builder = RedisCacheManager
                .RedisCacheManagerBuilder
                .fromConnectionFactory(redisConnectionFactory())
                .cacheDefaults(sessionCacheConfiguration());
        
        return builder.build();
    }
    
    /**
     * 会话缓存配置
     */
    private RedisCacheConfiguration sessionCacheConfiguration() {
        return RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofMinutes(30)) // 30分钟过期
                .serializeKeysWith(RedisSerializationContext.SerializationPair
                        .fromSerializer(new StringRedisSerializer()))
                .serializeValuesWith(RedisSerializationContext.SerializationPair
                        .fromSerializer(new GenericJackson2JsonRedisSerializer()))
                .disableCachingNullValues();
    }
}

# 7.3 异步处理配置

/**
 * 异步处理配置
 */
@Configuration
@EnableAsync
public class AsyncConfig {
    
    /**
     * 消息处理线程池
     */
    @Bean("messageProcessingExecutor")
    public ThreadPoolTaskExecutor messageProcessingExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(1000);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("message-processor-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(30);
        executor.initialize();
        return executor;
    }
    
    /**
     * 数据库操作线程池
     */
    @Bean("databaseExecutor")
    public ThreadPoolTaskExecutor databaseExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(500);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("database-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
    
    /**
     * 统计任务线程池
     */
    @Bean("statisticsExecutor")
    public ThreadPoolTaskExecutor statisticsExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("statistics-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        executor.initialize();
        return executor;
    }
}

# 7.4 数据库优化配置

# application.yml
spring:
  datasource:
    # 数据库连接池配置
    hikari:
      minimum-idle: 10
      maximum-pool-size: 50
      idle-timeout: 300000
      max-lifetime: 1800000
      connection-timeout: 30000
      validation-timeout: 5000
      leak-detection-threshold: 60000
      
  jpa:
    hibernate:
      ddl-auto: none
    properties:
      hibernate:
        # 批处理配置
        jdbc:
          batch_size: 50
          batch_versioned_data: true
        order_inserts: true
        order_updates: true
        # 查询优化
        query:
          plan_cache_max_size: 2048
        # 缓存配置
        cache:
          use_second_level_cache: true
          use_query_cache: true
          region:
            factory_class: org.hibernate.cache.jcache.JCacheRegionFactory
            
# MyBatis配置
mybatis:
  configuration:
    # 开启二级缓存
    cache-enabled: true
    # 延迟加载
    lazy-loading-enabled: true
    aggressive-lazy-loading: false
    # 批处理
    default-executor-type: batch

# 8. 监控和告警

# 8.1 性能监控服务

/**
 * 性能监控服务
 */
@Service
@Slf4j
public class PerformanceMonitoringService {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    @Autowired
    private TcpConnectionService tcpConnectionService;
    
    @Autowired
    private UdpSessionService udpSessionService;
    
    // 性能指标
    private final Counter tcpConnectionCounter;
    private final Counter udpSessionCounter;
    private final Counter messageCounter;
    private final Timer messageProcessingTimer;
    private final Gauge activeConnectionsGauge;
    private final Gauge activeSessionsGauge;
    
    public PerformanceMonitoringService(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        
        // 初始化计数器
        this.tcpConnectionCounter = Counter.builder("tcp.connections.total")
                .description("Total TCP connections")
                .register(meterRegistry);
                
        this.udpSessionCounter = Counter.builder("udp.sessions.total")
                .description("Total UDP sessions")
                .register(meterRegistry);
                
        this.messageCounter = Counter.builder("messages.total")
                .description("Total messages processed")
                .tag("protocol", "tcp")
                .register(meterRegistry);
                
        // 初始化计时器
        this.messageProcessingTimer = Timer.builder("message.processing.time")
                .description("Message processing time")
                .register(meterRegistry);
                
        // 初始化仪表
        this.activeConnectionsGauge = Gauge.builder("tcp.connections.active")
                .description("Active TCP connections")
                .register(meterRegistry, this, PerformanceMonitoringService::getActiveTcpConnections);
                
        this.activeSessionsGauge = Gauge.builder("udp.sessions.active")
                .description("Active UDP sessions")
                .register(meterRegistry, this, PerformanceMonitoringService::getActiveUdpSessions);
    }
    
    /**
     * 记录TCP连接
     */
    public void recordTcpConnection() {
        tcpConnectionCounter.increment();
    }
    
    /**
     * 记录UDP会话
     */
    public void recordUdpSession() {
        udpSessionCounter.increment();
    }
    
    /**
     * 记录消息处理
     */
    public void recordMessageProcessing(String protocol, String messageType, long processingTime) {
        messageCounter.increment(
                Tags.of(
                        "protocol", protocol,
                        "type", messageType
                )
        );
        
        messageProcessingTimer.record(processingTime, TimeUnit.MILLISECONDS);
    }
    
    /**
     * 获取活跃TCP连接数
     */
    public double getActiveTcpConnections() {
        return tcpConnectionService.getActiveConnectionCount();
    }
    
    /**
     * 获取活跃UDP会话数
     */
    public double getActiveUdpSessions() {
        return udpSessionService.getActiveSessionCount();
    }
    
    /**
     * 获取系统性能指标
     */
    public Map<String, Object> getPerformanceMetrics() {
        Map<String, Object> metrics = new HashMap<>();
        
        // 连接指标
        metrics.put("tcp_connections_total", tcpConnectionCounter.count());
        metrics.put("tcp_connections_active", getActiveTcpConnections());
        metrics.put("udp_sessions_total", udpSessionCounter.count());
        metrics.put("udp_sessions_active", getActiveUdpSessions());
        
        // 消息指标
        metrics.put("messages_total", messageCounter.count());
        metrics.put("message_processing_avg_time", messageProcessingTimer.mean(TimeUnit.MILLISECONDS));
        metrics.put("message_processing_max_time", messageProcessingTimer.max(TimeUnit.MILLISECONDS));
        
        // 系统指标
        Runtime runtime = Runtime.getRuntime();
        metrics.put("memory_used", runtime.totalMemory() - runtime.freeMemory());
        metrics.put("memory_total", runtime.totalMemory());
        metrics.put("memory_max", runtime.maxMemory());
        metrics.put("cpu_cores", runtime.availableProcessors());
        
        return metrics;
    }
}

# 8.2 健康检查服务

/**
 * 健康检查服务
 */
@Component
public class HealthCheckService {
    
    @Autowired
    private TcpServer tcpServer;
    
    @Autowired
    private UdpServer udpServer;
    
    @Autowired
    private DataSource dataSource;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * TCP服务器健康检查
     */
    @Bean
    public HealthIndicator tcpServerHealthIndicator() {
        return () -> {
            try {
                Map<String, Object> status = tcpServer.getServerStatus();
                boolean isRunning = (Boolean) status.get("running");
                
                if (isRunning) {
                    return Health.up()
                            .withDetails(status)
                            .build();
                } else {
                    return Health.down()
                            .withDetail("reason", "TCP server is not running")
                            .withDetails(status)
                            .build();
                }
            } catch (Exception e) {
                return Health.down()
                        .withDetail("error", e.getMessage())
                        .build();
            }
        };
    }
    
    /**
     * UDP服务器健康检查
     */
    @Bean
    public HealthIndicator udpServerHealthIndicator() {
        return () -> {
            try {
                Map<String, Object> status = udpServer.getServerStatus();
                boolean isRunning = (Boolean) status.get("running");
                
                if (isRunning) {
                    return Health.up()
                            .withDetails(status)
                            .build();
                } else {
                    return Health.down()
                            .withDetail("reason", "UDP server is not running")
                            .withDetails(status)
                            .build();
                }
            } catch (Exception e) {
                return Health.down()
                        .withDetail("error", e.getMessage())
                        .build();
            }
        };
    }
    
    /**
     * 数据库健康检查
     */
    @Bean
    public HealthIndicator databaseHealthIndicator() {
        return () -> {
            try (Connection connection = dataSource.getConnection()) {
                if (connection.isValid(5)) {
                    return Health.up()
                            .withDetail("database", "Available")
                            .build();
                } else {
                    return Health.down()
                            .withDetail("database", "Connection invalid")
                            .build();
                }
            } catch (Exception e) {
                return Health.down()
                        .withDetail("database", "Connection failed: " + e.getMessage())
                        .build();
            }
        };
    }
    
    /**
     * Redis健康检查
     */
    @Bean
    public HealthIndicator redisHealthIndicator() {
        return () -> {
            try {
                String pong = redisTemplate.getConnectionFactory()
                        .getConnection()
                        .ping();
                        
                if ("PONG".equals(pong)) {
                    return Health.up()
                            .withDetail("redis", "Available")
                            .build();
                } else {
                    return Health.down()
                            .withDetail("redis", "Ping failed")
                            .build();
                }
            } catch (Exception e) {
                return Health.down()
                        .withDetail("redis", "Connection failed: " + e.getMessage())
                        .build();
            }
        };
    }
}

# 8.3 告警配置

/**
 * 告警服务
 */
@Service
@Slf4j
public class AlertService {
    
    @Autowired
    private PerformanceMonitoringService monitoringService;
    
    @Autowired
    private NotificationService notificationService;
    
    @Value("${iot.alert.tcp.max-connections:8000}")
    private int maxTcpConnections;
    
    @Value("${iot.alert.udp.max-sessions:40000}")
    private int maxUdpSessions;
    
    @Value("${iot.alert.message.max-processing-time:5000}")
    private long maxMessageProcessingTime;
    
    @Value("${iot.alert.memory.max-usage-percent:80}")
    private double maxMemoryUsagePercent;
    
    /**
     * 检查连接数告警
     */
    @Scheduled(fixedRate = 60000) // 每分钟检查一次
    public void checkConnectionAlerts() {
        try {
            // 检查TCP连接数
            double activeTcpConnections = monitoringService.getActiveTcpConnections();
            if (activeTcpConnections > maxTcpConnections) {
                sendAlert("TCP连接数过高", 
                        String.format("当前TCP连接数: %.0f, 阈值: %d", activeTcpConnections, maxTcpConnections),
                        AlertLevel.WARNING);
            }
            
            // 检查UDP会话数
            double activeUdpSessions = monitoringService.getActiveUdpSessions();
            if (activeUdpSessions > maxUdpSessions) {
                sendAlert("UDP会话数过高", 
                        String.format("当前UDP会话数: %.0f, 阈值: %d", activeUdpSessions, maxUdpSessions),
                        AlertLevel.WARNING);
            }
            
        } catch (Exception e) {
            log.error("检查连接告警时发生异常", e);
        }
    }
    
    /**
     * 检查性能告警
     */
    @Scheduled(fixedRate = 300000) // 每5分钟检查一次
    public void checkPerformanceAlerts() {
        try {
            Map<String, Object> metrics = monitoringService.getPerformanceMetrics();
            
            // 检查消息处理时间
            Double avgProcessingTime = (Double) metrics.get("message_processing_avg_time");
            if (avgProcessingTime != null && avgProcessingTime > maxMessageProcessingTime) {
                sendAlert("消息处理时间过长", 
                        String.format("平均处理时间: %.2f ms, 阈值: %d ms", avgProcessingTime, maxMessageProcessingTime),
                        AlertLevel.WARNING);
            }
            
            // 检查内存使用率
            Long memoryUsed = (Long) metrics.get("memory_used");
            Long memoryMax = (Long) metrics.get("memory_max");
            if (memoryUsed != null && memoryMax != null) {
                double usagePercent = (double) memoryUsed / memoryMax * 100;
                if (usagePercent > maxMemoryUsagePercent) {
                    sendAlert("内存使用率过高", 
                            String.format("内存使用率: %.2f%%, 阈值: %.2f%%", usagePercent, maxMemoryUsagePercent),
                            AlertLevel.CRITICAL);
                }
            }
            
        } catch (Exception e) {
            log.error("检查性能告警时发生异常", e);
        }
    }
    
    /**
     * 发送告警
     */
    private void sendAlert(String title, String message, AlertLevel level) {
        Alert alert = new Alert();
        alert.setTitle(title);
        alert.setMessage(message);
        alert.setLevel(level);
        alert.setTimestamp(LocalDateTime.now());
        alert.setSource("TCP/UDP Access Layer");
        
        notificationService.sendAlert(alert);
        log.warn("发送告警: {} - {}", title, message);
    }
    
    /**
     * 告警级别
     */
    public enum AlertLevel {
        INFO, WARNING, CRITICAL
    }
    
    /**
     * 告警对象
     */
    @Data
    public static class Alert {
        private String title;
        private String message;
        private AlertLevel level;
        private LocalDateTime timestamp;
        private String source;
    }
}

# 9. 配置文件

# 9.1 主配置文件 (application.yml)

# 应用基础配置
server:
  port: 8080
  servlet:
    context-path: /iot

spring:
  application:
    name: iot-tcp-udp-access-layer
  profiles:
    active: dev
    
  # 数据库配置
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/iot_platform?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
    username: ${DB_USERNAME:root}
    password: ${DB_PASSWORD:123456}
    hikari:
      minimum-idle: 10
      maximum-pool-size: 50
      idle-timeout: 300000
      max-lifetime: 1800000
      connection-timeout: 30000
      validation-timeout: 5000
      leak-detection-threshold: 60000
      
  # Redis配置
  redis:
    host: ${REDIS_HOST:localhost}
    port: ${REDIS_PORT:6379}
    password: ${REDIS_PASSWORD:}
    database: 0
    timeout: 5000ms
    lettuce:
      pool:
        max-active: 20
        max-idle: 10
        min-idle: 5
        max-wait: 2000ms
        
  # JPA配置
  jpa:
    hibernate:
      ddl-auto: none
    show-sql: false
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQL8Dialect
        format_sql: true
        jdbc:
          batch_size: 50
          batch_versioned_data: true
        order_inserts: true
        order_updates: true
        cache:
          use_second_level_cache: true
          use_query_cache: true
          region:
            factory_class: org.hibernate.cache.jcache.JCacheRegionFactory
            
# 物联网TCP/UDP接入层配置
iot:
  tcp:
    server:
      # TCP服务器基础配置
      enabled: true
      port: 8888
      boss-threads: 1
      worker-threads: 8
      
      # 网络配置
      so-backlog: 1024
      so-keepalive: true
      tcp-nodelay: true
      so-reuseaddr: true
      so-rcvbuf: 65536
      so-sndbuf: 65536
      
      # 连接管理配置
      max-connections: 10000
      connection-idle-timeout: 600000
      connect-timeout: 30000
      read-timeout: 300000
      write-timeout: 30000
      
      # 心跳配置
      heartbeat-interval: 60000
      heartbeat-timeout: 180000
      
  udp:
    server:
      # UDP服务器基础配置
      enabled: true
      port: 9999
      worker-threads: 4
      
      # 网络配置
      so-rcvbuf: 65536
      so-sndbuf: 65536
      
      # 会话管理配置
      max-sessions: 50000
      session-timeout: 300000
      session-cleanup-interval: 60000
      
  message:
    processing:
      # 消息处理线程池配置
      core-pool-size: 10
      max-pool-size: 50
      queue-capacity: 1000
      keep-alive-seconds: 60
      thread-name-prefix: "message-processor-"
      
      # 批处理配置
      batch-enabled: true
      batch-size: 100
      batch-timeout: 1000
      
      # 消息大小限制
      max-message-size: 1048576  # 1MB
      
  cache:
    # 缓存配置
    connection-cache-size: 10000
    connection-cache-ttl: 3600
    session-cache-size: 50000
    session-cache-ttl: 1800
    device-cache-size: 100000
    device-cache-ttl: 7200
    
  protocol:
    # 协议解析配置
    parsers:
      - name: "simple-tlv"
        class: "com.iot.protocol.SimpleTlvProtocolParser"
        enabled: true
      - name: "modbus"
        class: "com.iot.protocol.ModbusProtocolParser"
        enabled: false
        
  alert:
    # 告警配置
    tcp:
      max-connections: 8000
    udp:
      max-sessions: 40000
    message:
      max-processing-time: 5000
    memory:
      max-usage-percent: 80
      
# 监控配置
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus
  endpoint:
    health:
      show-details: always
  metrics:
    export:
      prometheus:
        enabled: true
        
# 日志配置
logging:
  level:
    com.iot: DEBUG
    io.netty: INFO
    org.springframework: INFO
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"
    file: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"
  file:
    name: logs/iot-tcp-udp-access.log
    max-size: 100MB
    max-history: 30

# 9.2 开发环境配置 (application-dev.yml)

# 开发环境配置
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/iot_platform_dev?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
    username: dev_user
    password: dev_password
    
  redis:
    host: localhost
    port: 6379
    password: 
    
  jpa:
    show-sql: true
    hibernate:
      ddl-auto: update
      
iot:
  tcp:
    server:
      port: 8888
      max-connections: 1000
  udp:
    server:
      port: 9999
      max-sessions: 5000
      
logging:
  level:
    com.iot: DEBUG
    io.netty: DEBUG
    org.springframework.cache: DEBUG

# 9.3 生产环境配置 (application-prod.yml)

# 生产环境配置
spring:
  datasource:
    url: jdbc:mysql://${DB_HOST:mysql-cluster}:${DB_PORT:3306}/${DB_NAME:iot_platform}?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=Asia/Shanghai
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    hikari:
      minimum-idle: 20
      maximum-pool-size: 100
      
  redis:
    host: ${REDIS_HOST:redis-cluster}
    port: ${REDIS_PORT:6379}
    password: ${REDIS_PASSWORD}
    cluster:
      nodes: ${REDIS_CLUSTER_NODES}
      
  jpa:
    show-sql: false
    hibernate:
      ddl-auto: none
      
iot:
  tcp:
    server:
      port: ${TCP_PORT:8888}
      worker-threads: 16
      max-connections: 20000
  udp:
    server:
      port: ${UDP_PORT:9999}
      worker-threads: 8
      max-sessions: 100000
      
  message:
    processing:
      core-pool-size: 20
      max-pool-size: 100
      
logging:
  level:
    com.iot: INFO
    root: WARN
  file:
    name: /var/log/iot/tcp-udp-access.log

# 9.4 测试环境配置 (application-test.yml)

# 测试环境配置
spring:
  datasource:
    url: jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
    driver-class-name: org.h2.Driver
    username: sa
    password: 
    
  h2:
    console:
      enabled: true
      
  jpa:
    hibernate:
      ddl-auto: create-drop
    show-sql: true
    
iot:
  tcp:
    server:
      enabled: false  # 测试时禁用TCP服务器
  udp:
    server:
      enabled: false  # 测试时禁用UDP服务器
      
logging:
  level:
    com.iot: DEBUG

# 10. 总结

# 10.1 核心特性

本TCP/UDP设备接入层具备以下核心特性:

  1. 高性能网络通信

    • 基于Netty框架,支持高并发连接
    • TCP支持万级连接,UDP支持十万级会话
    • 优化的线程模型和内存管理
  2. 灵活的协议支持

    • 可插拔的协议解析器架构
    • 支持多种设备通信协议
    • 简单易扩展的协议适配机制
  3. 完善的连接管理

    • 自动连接状态监控
    • 心跳检测和超时处理
    • 连接池和会话管理
  4. 强大的消息处理

    • 异步消息处理机制
    • 批量处理优化
    • 消息路由和分发
  5. 全面的监控告警

    • 实时性能指标监控
    • 健康检查机制
    • 智能告警系统

# 10.2 适用场景

  1. 工业物联网

    • 工厂设备数据采集
    • 生产线监控系统
    • 设备状态管理
  2. 智能家居

    • 家电设备控制
    • 传感器数据收集
    • 安防系统集成
  3. 车联网

    • 车载设备通信
    • 位置信息上报
    • 远程诊断系统
  4. 智慧城市

    • 环境监测设备
    • 交通信号控制
    • 公共设施管理

# 10.3 性能指标

  • TCP连接数: 支持10,000+并发连接
  • UDP会话数: 支持50,000+并发会话
  • 消息吞吐量: 100,000+消息/秒
  • 响应时间: 平均<10ms
  • 可用性: 99.9%+

# 10.4 最佳实践

  1. 部署建议

    • 使用负载均衡分散连接压力
    • 配置合适的线程池大小
    • 监控系统资源使用情况
  2. 安全建议

    • 实施设备认证机制
    • 加密敏感数据传输
    • 定期更新安全策略
  3. 运维建议

    • 建立完善的监控体系
    • 制定故障应急预案
    • 定期进行性能调优
  4. 开发建议

    • 遵循协议解析器接口规范
    • 实现幂等的消息处理逻辑
    • 充分利用缓存机制

通过本文档的详细介绍,开发者可以深入理解TCP/UDP设备接入层的设计原理和实现细节,为构建高性能、高可靠的物联网设备接入系统提供有力支撑。


本文档持续更新中,如有问题或建议,请联系开发团队。

# 4.2 UDP会话实体

/**
 * UDP会话实体
 * 由于UDP是无连接的,我们通过会话来跟踪设备通信
 */
@Entity
@Table(name = "udp_session")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UdpSession {
    
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    /**
     * 会话ID - 唯一标识一个UDP会话
     */
    @Column(name = "session_id", unique = true, nullable = false)
    private String sessionId;
    
    /**
     * 设备ID - 关联的设备标识
     */
    @Column(name = "device_id", nullable = false)
    private String deviceId;
    
    /**
     * 客户端IP地址
     */
    @Column(name = "client_ip", nullable = false)
    private String clientIp;
    
    /**
     * 客户端端口
     */
    @Column(name = "client_port", nullable = false)
    private Integer clientPort;
    
    /**
     * 服务器端口
     */
    @Column(name = "server_port", nullable = false)
    private Integer serverPort;
    
    /**
     * 会话状态
     */
    @Enumerated(EnumType.STRING)
    @Column(name = "status", nullable = false)
    private SessionStatus status;
    
    /**
     * 协议类型
     */
    @Column(name = "protocol_type")
    private String protocolType;
    
    /**
     * 协议版本
     */
    @Column(name = "protocol_version")
    private String protocolVersion;
    
    /**
     * 会话开始时间
     */
    @Column(name = "start_time", nullable = false)
    private LocalDateTime startTime;
    
    /**
     * 最后活跃时间
     */
    @Column(name = "last_active_time")
    private LocalDateTime lastActiveTime;
    
    /**
     * 会话过期时间
     */
    @Column(name = "expire_time")
    private LocalDateTime expireTime;
    
    /**
     * 会话属性(JSON格式)
     */
    @Column(name = "attributes", columnDefinition = "JSON")
    private String attributes;
    
    /**
     * 发送包数
     */
    @Column(name = "packets_sent")
    private Long packetsSent = 0L;
    
    /**
     * 接收包数
     */
    @Column(name = "packets_received")
    private Long packetsReceived = 0L;
    
    /**
     * 发送字节数
     */
    @Column(name = "bytes_sent")
    private Long bytesSent = 0L;
    
    /**
     * 接收字节数
     */
    @Column(name = "bytes_received")
    private Long bytesReceived = 0L;
    
    /**
     * 丢包数
     */
    @Column(name = "packets_lost")
    private Long packetsLost = 0L;
    
    /**
     * 创建时间
     */
    @Column(name = "create_time", nullable = false)
    private LocalDateTime createTime;
    
    /**
     * 更新时间
     */
    @Column(name = "update_time", nullable = false)
    private LocalDateTime updateTime;
    
    /**
     * 会话状态枚举
     */
    public enum SessionStatus {
        ACTIVE,    // 活跃
        INACTIVE,  // 非活跃
        EXPIRED,   // 已过期
        CLOSED     // 已关闭
    }
    
    @PrePersist
    protected void onCreate() {
        createTime = LocalDateTime.now();
        updateTime = LocalDateTime.now();
        if (startTime == null) {
            startTime = LocalDateTime.now();
        }
        if (status == null) {
            status = SessionStatus.ACTIVE;
        }
    }
    
    @PreUpdate
    protected void onUpdate() {
        updateTime = LocalDateTime.now();
    }
    
    /**
     * 更新活跃时间
     */
    public void updateActiveTime() {
        this.lastActiveTime = LocalDateTime.now();
    }
    
    /**
     * 增加发送统计
     */
    public void addSentStats(long bytes, long packets) {
        this.bytesSent += bytes;
        this.packetsSent += packets;
        updateActiveTime();
    }
    
    /**
     * 增加接收统计
     */
    public void addReceivedStats(long bytes, long packets) {
        this.bytesReceived += bytes;
        this.packetsReceived += packets;
        updateActiveTime();
    }
    
    /**
     * 增加丢包统计
     */
    public void addLostPackets(long packets) {
        this.packetsLost += packets;
    }
    
    /**
     * 检查会话是否过期
     */
    public boolean isExpired() {
        if (expireTime == null) {
            return false;
        }
        return LocalDateTime.now().isAfter(expireTime);
    }
    
    /**
     * 检查会话是否超时
     */
    public boolean isTimeout(int timeoutMinutes) {
        if (lastActiveTime == null) {
            return false;
        }
        return lastActiveTime.isBefore(LocalDateTime.now().minusMinutes(timeoutMinutes));
    }
    
    /**
     * 计算丢包率
     */
    public double getPacketLossRate() {
        long totalPackets = packetsReceived + packetsLost;
        if (totalPackets == 0) {
            return 0.0;
        }
        return (double) packetsLost / totalPackets * 100;
    }
}

# 4.3 设备消息实体

/**
 * 设备消息实体
 * 存储通过TCP/UDP接收到的设备消息
 */
@Entity
@Table(name = "device_message")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DeviceMessage {
    
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    /**
     * 消息ID - 唯一标识一条消息
     */
    @Column(name = "message_id", unique = true, nullable = false)
    private String messageId;
    
    /**
     * 设备ID
     */
    @Column(name = "device_id", nullable = false)
    private String deviceId;
    
    /**
     * 连接ID(TCP)或会话ID(UDP)
     */
    @Column(name = "connection_id")
    private String connectionId;
    
    /**
     * 传输协议类型
     */
    @Enumerated(EnumType.STRING)
    @Column(name = "transport_protocol", nullable = false)
    private TransportProtocol transportProtocol;
    
    /**
     * 应用协议类型
     */
    @Column(name = "protocol_type")
    private String protocolType;
    
    /**
     * 消息类型
     */
    @Enumerated(EnumType.STRING)
    @Column(name = "message_type", nullable = false)
    private MessageType messageType;
    
    /**
     * 消息方向
     */
    @Enumerated(EnumType.STRING)
    @Column(name = "direction", nullable = false)
    private MessageDirection direction;
    
    /**
     * 原始消息内容(十六进制)
     */
    @Column(name = "raw_content", columnDefinition = "TEXT")
    private String rawContent;
    
    /**
     * 解析后的消息内容(JSON格式)
     */
    @Column(name = "parsed_content", columnDefinition = "JSON")
    private String parsedContent;
    
    /**
     * 消息长度(字节)
     */
    @Column(name = "message_length")
    private Integer messageLength;
    
    /**
     * 消息序号
     */
    @Column(name = "sequence_number")
    private Long sequenceNumber;
    
    /**
     * 消息时间戳
     */
    @Column(name = "timestamp", nullable = false)
    private LocalDateTime timestamp;
    
    /**
     * 客户端IP
     */
    @Column(name = "client_ip")
    private String clientIp;
    
    /**
     * 客户端端口
     */
    @Column(name = "client_port")
    private Integer clientPort;
    
    /**
     * 服务器端口
     */
    @Column(name = "server_port")
    private Integer serverPort;
    
    /**
     * 处理状态
     */
    @Enumerated(EnumType.STRING)
    @Column(name = "process_status")
    private ProcessStatus processStatus;
    
    /**
     * 处理时间
     */
    @Column(name = "process_time")
    private LocalDateTime processTime;
    
    /**
     * 错误信息
     */
    @Column(name = "error_message")
    private String errorMessage;
    
    /**
     * 创建时间
     */
    @Column(name = "create_time", nullable = false)
    private LocalDateTime createTime;
    
    /**
     * 传输协议枚举
     */
    public enum TransportProtocol {
        TCP, UDP
    }
    
    /**
     * 消息类型枚举
     */
    public enum MessageType {
        HEARTBEAT,      // 心跳消息
        DATA,           // 数据消息
        COMMAND,        // 命令消息
        RESPONSE,       // 响应消息
        EVENT,          // 事件消息
        AUTHENTICATION, // 认证消息
        CONFIGURATION,  // 配置消息
        FIRMWARE,       // 固件消息
        OTHER          // 其他消息
    }
    
    /**
     * 消息方向枚举
     */
    public enum MessageDirection {
        INBOUND,  // 入站(设备到服务器)
        OUTBOUND  // 出站(服务器到设备)
    }
    
    /**
     * 处理状态枚举
     */
    public enum ProcessStatus {
        PENDING,    // 待处理
        PROCESSING, // 处理中
        PROCESSED,  // 已处理
        FAILED,     // 处理失败
        IGNORED     // 已忽略
    }
    
    @PrePersist
    protected void onCreate() {
        createTime = LocalDateTime.now();
        if (timestamp == null) {
            timestamp = LocalDateTime.now();
        }
        if (processStatus == null) {
            processStatus = ProcessStatus.PENDING;
        }
        if (messageId == null) {
            messageId = UUID.randomUUID().toString();
        }
    }
    
    /**
     * 标记为处理中
     */
    public void markAsProcessing() {
        this.processStatus = ProcessStatus.PROCESSING;
        this.processTime = LocalDateTime.now();
    }
    
    /**
     * 标记为已处理
     */
    public void markAsProcessed() {
        this.processStatus = ProcessStatus.PROCESSED;
        this.processTime = LocalDateTime.now();
    }
    
    /**
     * 标记为处理失败
     */
    public void markAsFailed(String errorMessage) {
        this.processStatus = ProcessStatus.FAILED;
        this.processTime = LocalDateTime.now();
        this.errorMessage = errorMessage;
    }
    
    /**
     * 检查是否为心跳消息
     */
    public boolean isHeartbeat() {
        return MessageType.HEARTBEAT.equals(this.messageType);
    }
    
    /**
     * 检查是否为数据消息
     */
    public boolean isDataMessage() {
        return MessageType.DATA.equals(this.messageType);
    }
    
    /**
     * 检查是否为入站消息
     */
    public boolean isInbound() {
        return MessageDirection.INBOUND.equals(this.direction);
    }
    
    /**
     * 检查是否为出站消息
     */
    public boolean isOutbound() {
        return MessageDirection.OUTBOUND.equals(this.direction);
    }
}

# 5. 核心服务实现

# 5.1 TCP服务器实现

/**
 * TCP服务器实现
 * 基于Netty框架构建高性能TCP服务器
 */
@Component
@Slf4j
public class TcpServer {
    
    @Autowired
    private TcpConnectionService connectionService;
    
    @Autowired
    private MessageProcessingService messageProcessingService;
    
    @Autowired
    private DeviceAuthenticationService authenticationService;
    
    @Value("${iot.tcp.server.port:8888}")
    private int serverPort;
    
    @Value("${iot.tcp.server.boss-threads:1}")
    private int bossThreads;
    
    @Value("${iot.tcp.server.worker-threads:8}")
    private int workerThreads;
    
    @Value("${iot.tcp.server.so-backlog:128}")
    private int soBacklog;
    
    @Value("${iot.tcp.server.so-keepalive:true}")
    private boolean soKeepAlive;
    
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private Channel serverChannel;
    
    /**
     * 启动TCP服务器
     */
    @PostConstruct
    public void start() {
        try {
            // 创建事件循环组
            bossGroup = new NioEventLoopGroup(bossThreads);
            workerGroup = new NioEventLoopGroup(workerThreads);
            
            // 创建服务器启动器
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, soBacklog)
                    .childOption(ChannelOption.SO_KEEPALIVE, soKeepAlive)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new TcpChannelInitializer());
            
            // 绑定端口并启动服务器
            ChannelFuture future = bootstrap.bind(serverPort).sync();
            serverChannel = future.channel();
            
            log.info("TCP服务器启动成功,监听端口: {}", serverPort);
            
            // 监听服务器关闭
            serverChannel.closeFuture().addListener(closeFuture -> {
                log.info("TCP服务器已关闭");
            });
            
        } catch (Exception e) {
            log.error("TCP服务器启动失败", e);
            shutdown();
        }
    }
    
    /**
     * 关闭TCP服务器
     */
    @PreDestroy
    public void shutdown() {
        try {
            if (serverChannel != null) {
                serverChannel.close().sync();
            }
        } catch (InterruptedException e) {
            log.error("关闭服务器通道时发生异常", e);
        } finally {
            if (workerGroup != null) {
                workerGroup.shutdownGracefully();
            }
            if (bossGroup != null) {
                bossGroup.shutdownGracefully();
            }
            log.info("TCP服务器资源已释放");
        }
    }
    
    /**
     * TCP通道初始化器
     */
    private class TcpChannelInitializer extends ChannelInitializer<SocketChannel> {
        
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            
            // 添加编解码器
            pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                    65536, 0, 4, 0, 4));
            pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
            
            // 添加字节数组编解码器
            pipeline.addLast("decoder", new ByteArrayDecoder());
            pipeline.addLast("encoder", new ByteArrayEncoder());
            
            // 添加空闲状态处理器(心跳检测)
            pipeline.addLast("idleStateHandler", new IdleStateHandler(
                    300, 0, 0, TimeUnit.SECONDS)); // 5分钟读超时
            
            // 添加业务处理器
            pipeline.addLast("tcpHandler", new TcpServerHandler());
        }
    }
    
    /**
     * TCP服务器处理器
     */
    @ChannelHandler.Sharable
    private class TcpServerHandler extends SimpleChannelInboundHandler<byte[]> {
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // 客户端连接建立
            String connectionId = generateConnectionId(ctx.channel());
            String clientIp = getClientIp(ctx.channel());
            int clientPort = getClientPort(ctx.channel());
            
            log.info("TCP客户端连接建立: {} - {}:{}", connectionId, clientIp, clientPort);
            
            // 创建连接记录
            TcpConnection connection = new TcpConnection();
            connection.setConnectionId(connectionId);
            connection.setClientIp(clientIp);
            connection.setClientPort(clientPort);
            connection.setServerPort(serverPort);
            connection.setStatus(TcpConnection.ConnectionStatus.CONNECTED);
            connection.setConnectTime(LocalDateTime.now());
            
            // 保存连接到上下文
            ctx.channel().attr(AttributeKey.valueOf("connection")).set(connection);
            
            // 保存连接到服务
            connectionService.saveConnection(connection);
            
            super.channelActive(ctx);
        }
        
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            // 客户端连接断开
            TcpConnection connection = getConnection(ctx.channel());
            if (connection != null) {
                log.info("TCP客户端连接断开: {}", connection.getConnectionId());
                
                // 更新连接状态
                connection.setStatus(TcpConnection.ConnectionStatus.DISCONNECTED);
                connectionService.updateConnection(connection);
            }
            
            super.channelInactive(ctx);
        }
        
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, byte[] data) throws Exception {
            // 接收到客户端数据
            TcpConnection connection = getConnection(ctx.channel());
            if (connection == null) {
                log.warn("未找到连接信息,忽略消息");
                return;
            }
            
            try {
                // 更新连接统计
                connection.addReceivedStats(data.length, 1);
                connectionService.updateConnection(connection);
                
                // 创建设备消息
                DeviceMessage message = new DeviceMessage();
                message.setConnectionId(connection.getConnectionId());
                message.setTransportProtocol(DeviceMessage.TransportProtocol.TCP);
                message.setDirection(DeviceMessage.MessageDirection.INBOUND);
                message.setRawContent(bytesToHex(data));
                message.setMessageLength(data.length);
                message.setClientIp(connection.getClientIp());
                message.setClientPort(connection.getClientPort());
                message.setServerPort(connection.getServerPort());
                
                // 异步处理消息
                messageProcessingService.processMessage(message, ctx.channel());
                
            } catch (Exception e) {
                log.error("处理TCP消息时发生异常", e);
            }
        }
        
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state() == IdleState.READER_IDLE) {
                    // 读超时,关闭连接
                    TcpConnection connection = getConnection(ctx.channel());
                    if (connection != null) {
                        log.warn("TCP连接读超时,关闭连接: {}", connection.getConnectionId());
                    }
                    ctx.close();
                }
            }
            super.userEventTriggered(ctx, evt);
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            TcpConnection connection = getConnection(ctx.channel());
            if (connection != null) {
                log.error("TCP连接发生异常: {}", connection.getConnectionId(), cause);
                
                // 更新连接状态
                connection.setStatus(TcpConnection.ConnectionStatus.ERROR);
                connectionService.updateConnection(connection);
            } else {
                log.error("TCP连接发生异常", cause);
            }
            
            ctx.close();
        }
        
        /**
         * 获取连接信息
         */
        private TcpConnection getConnection(Channel channel) {
            return (TcpConnection) channel.attr(AttributeKey.valueOf("connection")).get();
        }
        
        /**
         * 生成连接ID
         */
        private String generateConnectionId(Channel channel) {
            return "tcp_" + channel.id().asShortText() + "_" + System.currentTimeMillis();
        }
        
        /**
         * 获取客户端IP
         */
        private String getClientIp(Channel channel) {
            InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
            return remoteAddress.getAddress().getHostAddress();
        }
        
        /**
         * 获取客户端端口
         */
        private int getClientPort(Channel channel) {
            InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
            return remoteAddress.getPort();
        }
        
        /**
         * 字节数组转十六进制字符串
         */
        private String bytesToHex(byte[] bytes) {
            StringBuilder sb = new StringBuilder();
            for (byte b : bytes) {
                sb.append(String.format("%02X", b));
            }
            return sb.toString();
        }
    }
    
    /**
     * 向指定连接发送数据
     */
    public void sendData(String connectionId, byte[] data) {
        TcpConnection connection = connectionService.getConnection(connectionId);
        if (connection == null) {
            log.warn("未找到连接: {}", connectionId);
            return;
        }
        
        Channel channel = connectionService.getChannel(connectionId);
        if (channel == null || !channel.isActive()) {
            log.warn("连接通道不可用: {}", connectionId);
            return;
        }
        
        try {
            // 发送数据
            channel.writeAndFlush(data).addListener(future -> {
                if (future.isSuccess()) {
                    // 更新发送统计
                    connection.addSentStats(data.length, 1);
                    connectionService.updateConnection(connection);
                    
                    log.debug("TCP数据发送成功: {} - {} bytes", connectionId, data.length);
                } else {
                    log.error("TCP数据发送失败: {}", connectionId, future.cause());
                }
            });
            
        } catch (Exception e) {
            log.error("发送TCP数据时发生异常: {}", connectionId, e);
        }
    }
    
    /**
     * 广播数据到所有连接
     */
    public void broadcastData(byte[] data) {
        List<TcpConnection> connections = connectionService.getActiveConnections();
        for (TcpConnection connection : connections) {
            sendData(connection.getConnectionId(), data);
        }
    }
    
    /**
     * 获取服务器状态
     */
    public Map<String, Object> getServerStatus() {
        Map<String, Object> status = new HashMap<>();
        status.put("port", serverPort);
        status.put("running", serverChannel != null && serverChannel.isActive());
        status.put("activeConnections", connectionService.getActiveConnectionCount());
        status.put("totalConnections", connectionService.getTotalConnectionCount());
        return status;
    }
}