TCP/UDP设备接入层详细文档
# TCP/UDP设备接入层详细文档
# 1. 背景故事
在IoT世界的深处,有一群特殊的设备——它们来自工业现场、嵌入式系统和对性能要求极高的应用场景。这些设备不满足于HTTP的"请求-响应"模式,也不需要MQTT的发布订阅机制,它们需要的是最直接、最高效的通信方式。
想象一下:
- 一台高速运转的工业机器人,每秒需要发送数千条位置数据
- 一个车联网系统,需要实时传输车辆的GPS轨迹
- 一套安防监控系统,要求毫秒级的响应时间
- 一个智能电网,需要可靠地传输电力数据
这些场景催生了TCP/UDP设备接入层的诞生。TCP提供可靠的连接,确保数据不丢失;UDP提供极速的传输,追求最低的延迟。它们就像IoT世界的高速公路,为那些对性能有极致要求的设备提供专属通道。
# 2. TCP/UDP协议原理
# 2.1 TCP协议特点
**TCP(传输控制协议)**是一种面向连接的、可靠的传输协议:
- 面向连接:通信前需要建立连接(三次握手)
- 可靠传输:保证数据包的顺序和完整性
- 流量控制:防止发送方发送过快导致接收方缓冲区溢出
- 拥塞控制:根据网络状况调整发送速率
- 全双工通信:支持双向数据传输
TCP连接建立过程(三次握手):
客户端 服务器
| |
|-------- SYN ---------->|
| |
|<------- SYN+ACK -------|
| |
|-------- ACK ---------->|
| |
| 连接建立成功 |
# 2.2 UDP协议特点
**UDP(用户数据报协议)**是一种无连接的、不可靠的传输协议:
- 无连接:发送数据前不需要建立连接
- 不可靠:不保证数据包的顺序和到达
- 低开销:协议头部简单,开销小
- 高效率:传输速度快,延迟低
- 支持广播:可以向多个目标发送数据
UDP数据传输过程:
客户端 服务器
| |
|-------- 数据包 ------->|
|-------- 数据包 ------->|
|-------- 数据包 ------->|
| |
| 无需确认应答 |
# 2.3 TCP vs UDP 对比
特性 | TCP | UDP |
---|---|---|
连接性 | 面向连接 | 无连接 |
可靠性 | 可靠传输 | 不可靠传输 |
速度 | 较慢 | 较快 |
开销 | 较大 | 较小 |
应用场景 | 数据完整性要求高 | 实时性要求高 |
流量控制 | 支持 | 不支持 |
拥塞控制 | 支持 | 不支持 |
# 3. 系统架构
# 3.1 整体架构图
┌─────────────────────────────────────────────────────────────┐
│ TCP/UDP设备接入层 │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ TCP服务器 │ │ UDP服务器 │ │ 协议解析器 │ │
│ │ │ │ │ │ │ │
│ │ - 连接管理 │ │ - 数据接收 │ │ - 消息解码 │ │
│ │ - 数据接收 │ │ - 数据发送 │ │ - 消息编码 │ │
│ │ - 心跳检测 │ │ - 广播支持 │ │ - 协议适配 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 连接池管理 │ │ 消息队列 │ │ 数据处理 │ │
│ │ │ │ │ │ │ │
│ │ - TCP连接池 │ │ - 消息缓存 │ │ - 数据验证 │ │
│ │ - 连接监控 │ │ - 异步处理 │ │ - 数据转换 │ │
│ │ - 负载均衡 │ │ - 消息路由 │ │ - 数据存储 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 设备管理 │ │ 安全认证 │ │ 监控告警 │ │
│ │ │ │ │ │ │ │
│ │ - 设备注册 │ │ - 设备认证 │ │ - 性能监控 │ │
│ │ - 状态管理 │ │ - 数据加密 │ │ - 异常告警 │ │
│ │ - 配置管理 │ │ - 访问控制 │ │ - 日志记录 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 数据存储层 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ MySQL │ │ Redis │ │ InfluxDB │ │
│ │ 设备信息 │ │ 连接缓存 │ │ 时序数据 │ │
│ │ 配置数据 │ │ 会话管理 │ │ 性能指标 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
# 3.2 核心组件说明
- TCP服务器:处理TCP连接和数据传输
- UDP服务器:处理UDP数据包的接收和发送
- 协议解析器:解析和构造各种设备协议
- 连接池管理:管理TCP连接的生命周期
- 消息队列:异步处理接收到的数据
- 数据处理:验证、转换和存储设备数据
- 设备管理:管理设备的注册和状态
- 安全认证:确保通信安全
- 监控告警:监控系统性能和异常
# 4. 核心实体设计
# 4.1 TCP连接实体
/**
* TCP连接实体
* 代表一个TCP设备连接的完整信息
*/
@Entity
@Table(name = "tcp_connection")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TcpConnection {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 连接ID - 唯一标识一个TCP连接
*/
@Column(name = "connection_id", unique = true, nullable = false)
private String connectionId;
/**
* 设备ID - 关联的设备标识
*/
@Column(name = "device_id", nullable = false)
private String deviceId;
/**
* 客户端IP地址
*/
@Column(name = "client_ip", nullable = false)
private String clientIp;
/**
* 客户端端口
*/
@Column(name = "client_port", nullable = false)
private Integer clientPort;
/**
* 服务器端口
*/
@Column(name = "server_port", nullable = false)
private Integer serverPort;
/**
* 连接状态
*/
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false)
private ConnectionStatus status;
/**
* 协议类型
*/
@Column(name = "protocol_type")
private String protocolType;
/**
* 协议版本
*/
@Column(name = "protocol_version")
private String protocolVersion;
/**
* 连接建立时间
*/
@Column(name = "connect_time", nullable = false)
private LocalDateTime connectTime;
/**
* 最后活跃时间
*/
@Column(name = "last_active_time")
private LocalDateTime lastActiveTime;
/**
* 最后心跳时间
*/
@Column(name = "last_heartbeat_time")
private LocalDateTime lastHeartbeatTime;
/**
* 连接属性(JSON格式)
*/
@Column(name = "attributes", columnDefinition = "JSON")
private String attributes;
/**
* 发送字节数
*/
@Column(name = "bytes_sent")
private Long bytesSent = 0L;
/**
* 接收字节数
*/
@Column(name = "bytes_received")
private Long bytesReceived = 0L;
/**
* 发送消息数
*/
@Column(name = "messages_sent")
private Long messagesSent = 0L;
/**
* 接收消息数
*/
@Column(name = "messages_received")
private Long messagesReceived = 0L;
/**
* 创建时间
*/
@Column(name = "create_time", nullable = false)
private LocalDateTime createTime;
/**
* 更新时间
*/
@Column(name = "update_time", nullable = false)
private LocalDateTime updateTime;
/**
* 连接状态枚举
*/
public enum ConnectionStatus {
CONNECTING, // 连接中
CONNECTED, // 已连接
AUTHENTICATED, // 已认证
DISCONNECTED, // 已断开
ERROR // 错误状态
}
@PrePersist
protected void onCreate() {
createTime = LocalDateTime.now();
updateTime = LocalDateTime.now();
if (connectTime == null) {
connectTime = LocalDateTime.now();
}
if (status == null) {
status = ConnectionStatus.CONNECTING;
}
}
@PreUpdate
protected void onUpdate() {
updateTime = LocalDateTime.now();
}
/**
* 更新活跃时间
*/
public void updateActiveTime() {
this.lastActiveTime = LocalDateTime.now();
}
/**
* 更新心跳时间
*/
public void updateHeartbeatTime() {
this.lastHeartbeatTime = LocalDateTime.now();
updateActiveTime();
}
/**
* 增加发送统计
*/
public void addSentStats(long bytes, long messages) {
this.bytesSent += bytes;
this.messagesSent += messages;
updateActiveTime();
}
/**
* 增加接收统计
*/
public void addReceivedStats(long bytes, long messages) {
this.bytesReceived += bytes;
this.messagesReceived += messages;
updateActiveTime();
}
/**
* 检查连接是否超时
*/
public boolean isTimeout(int timeoutMinutes) {
if (lastActiveTime == null) {
return false;
}
return lastActiveTime.isBefore(LocalDateTime.now().minusMinutes(timeoutMinutes));
}
/**
* 检查心跳是否超时
*/
public boolean isHeartbeatTimeout(int timeoutMinutes) {
if (lastHeartbeatTime == null) {
return false;
}
return lastHeartbeatTime.isBefore(LocalDateTime.now().minusMinutes(timeoutMinutes));
}
}
# 5.2 UDP服务器实现
/**
* UDP服务器实现
* 基于Netty框架构建高性能UDP服务器
*/
@Component
@Slf4j
public class UdpServer {
@Autowired
private UdpSessionService sessionService;
@Autowired
private MessageProcessingService messageProcessingService;
@Autowired
private DeviceAuthenticationService authenticationService;
@Value("${iot.udp.server.port:9999}")
private int serverPort;
@Value("${iot.udp.server.worker-threads:4}")
private int workerThreads;
@Value("${iot.udp.server.so-rcvbuf:65536}")
private int soRcvBuf;
@Value("${iot.udp.server.so-sndbuf:65536}")
private int soSndBuf;
private EventLoopGroup workerGroup;
private Channel serverChannel;
/**
* 启动UDP服务器
*/
@PostConstruct
public void start() {
try {
// 创建事件循环组
workerGroup = new NioEventLoopGroup(workerThreads);
// 创建服务器启动器
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.option(ChannelOption.SO_RCVBUF, soRcvBuf)
.option(ChannelOption.SO_SNDBUF, soSndBuf)
.handler(new UdpServerHandler());
// 绑定端口并启动服务器
ChannelFuture future = bootstrap.bind(serverPort).sync();
serverChannel = future.channel();
log.info("UDP服务器启动成功,监听端口: {}", serverPort);
// 监听服务器关闭
serverChannel.closeFuture().addListener(closeFuture -> {
log.info("UDP服务器已关闭");
});
} catch (Exception e) {
log.error("UDP服务器启动失败", e);
shutdown();
}
}
/**
* 关闭UDP服务器
*/
@PreDestroy
public void shutdown() {
try {
if (serverChannel != null) {
serverChannel.close().sync();
}
} catch (InterruptedException e) {
log.error("关闭服务器通道时发生异常", e);
} finally {
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
log.info("UDP服务器资源已释放");
}
}
/**
* UDP服务器处理器
*/
private class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
// 获取客户端地址信息
InetSocketAddress sender = packet.sender();
String clientIp = sender.getAddress().getHostAddress();
int clientPort = sender.getPort();
// 获取数据内容
ByteBuf content = packet.content();
byte[] data = new byte[content.readableBytes()];
content.readBytes(data);
try {
// 获取或创建UDP会话
String sessionKey = clientIp + ":" + clientPort;
UdpSession session = sessionService.getOrCreateSession(sessionKey, clientIp, clientPort, serverPort);
// 更新会话统计
session.addReceivedStats(data.length, 1);
sessionService.updateSession(session);
// 创建设备消息
DeviceMessage message = new DeviceMessage();
message.setConnectionId(session.getSessionId());
message.setTransportProtocol(DeviceMessage.TransportProtocol.UDP);
message.setDirection(DeviceMessage.MessageDirection.INBOUND);
message.setRawContent(bytesToHex(data));
message.setMessageLength(data.length);
message.setClientIp(clientIp);
message.setClientPort(clientPort);
message.setServerPort(serverPort);
// 异步处理消息
messageProcessingService.processMessage(message, ctx.channel(), sender);
} catch (Exception e) {
log.error("处理UDP消息时发生异常: {}:{}", clientIp, clientPort, e);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("UDP服务器发生异常", cause);
}
/**
* 字节数组转十六进制字符串
*/
private String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02X", b));
}
return sb.toString();
}
}
/**
* 向指定地址发送UDP数据
*/
public void sendData(String clientIp, int clientPort, byte[] data) {
if (serverChannel == null || !serverChannel.isActive()) {
log.warn("UDP服务器通道不可用");
return;
}
try {
// 创建目标地址
InetSocketAddress recipient = new InetSocketAddress(clientIp, clientPort);
// 创建数据包
ByteBuf buffer = Unpooled.copiedBuffer(data);
DatagramPacket packet = new DatagramPacket(buffer, recipient);
// 发送数据包
serverChannel.writeAndFlush(packet).addListener(future -> {
if (future.isSuccess()) {
// 更新会话发送统计
String sessionKey = clientIp + ":" + clientPort;
UdpSession session = sessionService.getSession(sessionKey);
if (session != null) {
session.addSentStats(data.length, 1);
sessionService.updateSession(session);
}
log.debug("UDP数据发送成功: {}:{} - {} bytes", clientIp, clientPort, data.length);
} else {
log.error("UDP数据发送失败: {}:{}", clientIp, clientPort, future.cause());
}
});
} catch (Exception e) {
log.error("发送UDP数据时发生异常: {}:{}", clientIp, clientPort, e);
}
}
/**
* 广播UDP数据
*/
public void broadcastData(String broadcastAddress, int port, byte[] data) {
sendData(broadcastAddress, port, data);
}
/**
* 向活跃会话发送数据
*/
public void sendToActiveSessions(byte[] data) {
List<UdpSession> activeSessions = sessionService.getActiveSessions();
for (UdpSession session : activeSessions) {
sendData(session.getClientIp(), session.getClientPort(), data);
}
}
/**
* 获取服务器状态
*/
public Map<String, Object> getServerStatus() {
Map<String, Object> status = new HashMap<>();
status.put("port", serverPort);
status.put("running", serverChannel != null && serverChannel.isActive());
status.put("activeSessions", sessionService.getActiveSessionCount());
status.put("totalSessions", sessionService.getTotalSessionCount());
return status;
}
}
# 5.3 消息处理服务
/**
* 消息处理服务
* 负责处理TCP/UDP接收到的消息
*/
@Service
@Slf4j
public class MessageProcessingService {
@Autowired
private DeviceMessageRepository messageRepository;
@Autowired
private ProtocolParserService protocolParserService;
@Autowired
private DeviceService deviceService;
@Autowired
private DeviceDataService deviceDataService;
@Autowired
private CommandResponseService commandResponseService;
@Autowired
@Qualifier("messageProcessingExecutor")
private ThreadPoolTaskExecutor messageProcessingExecutor;
/**
* 处理TCP消息
*/
public void processMessage(DeviceMessage message, Channel channel) {
messageProcessingExecutor.submit(() -> {
try {
// 保存原始消息
message = messageRepository.save(message);
// 解析消息协议
parseMessageProtocol(message);
// 根据消息类型进行处理
processMessageByType(message, channel, null);
} catch (Exception e) {
log.error("处理TCP消息失败: {}", message.getMessageId(), e);
message.markAsFailed(e.getMessage());
messageRepository.save(message);
}
});
}
/**
* 处理UDP消息
*/
public void processMessage(DeviceMessage message, Channel channel, InetSocketAddress sender) {
messageProcessingExecutor.submit(() -> {
try {
// 保存原始消息
message = messageRepository.save(message);
// 解析消息协议
parseMessageProtocol(message);
// 根据消息类型进行处理
processMessageByType(message, channel, sender);
} catch (Exception e) {
log.error("处理UDP消息失败: {}", message.getMessageId(), e);
message.markAsFailed(e.getMessage());
messageRepository.save(message);
}
});
}
/**
* 解析消息协议
*/
private void parseMessageProtocol(DeviceMessage message) {
try {
// 标记为处理中
message.markAsProcessing();
messageRepository.save(message);
// 将十六进制字符串转换为字节数组
byte[] rawData = hexToBytes(message.getRawContent());
// 使用协议解析器解析消息
ProtocolParseResult parseResult = protocolParserService.parseMessage(rawData);
if (parseResult.isSuccess()) {
// 解析成功,更新消息信息
message.setProtocolType(parseResult.getProtocolType());
message.setMessageType(parseResult.getMessageType());
message.setDeviceId(parseResult.getDeviceId());
message.setParsedContent(parseResult.getParsedContentJson());
message.setSequenceNumber(parseResult.getSequenceNumber());
log.debug("消息解析成功: {} - 协议: {}, 类型: {}, 设备: {}",
message.getMessageId(),
parseResult.getProtocolType(),
parseResult.getMessageType(),
parseResult.getDeviceId());
} else {
// 解析失败
log.warn("消息解析失败: {} - {}", message.getMessageId(), parseResult.getErrorMessage());
message.setMessageType(DeviceMessage.MessageType.OTHER);
message.setErrorMessage(parseResult.getErrorMessage());
}
} catch (Exception e) {
log.error("解析消息协议时发生异常: {}", message.getMessageId(), e);
message.setMessageType(DeviceMessage.MessageType.OTHER);
message.setErrorMessage(e.getMessage());
}
}
/**
* 根据消息类型进行处理
*/
private void processMessageByType(DeviceMessage message, Channel channel, InetSocketAddress sender) {
try {
switch (message.getMessageType()) {
case HEARTBEAT:
processHeartbeatMessage(message, channel, sender);
break;
case DATA:
processDataMessage(message);
break;
case AUTHENTICATION:
processAuthenticationMessage(message, channel, sender);
break;
case COMMAND:
processCommandMessage(message);
break;
case RESPONSE:
processResponseMessage(message);
break;
case EVENT:
processEventMessage(message);
break;
case CONFIGURATION:
processConfigurationMessage(message);
break;
case FIRMWARE:
processFirmwareMessage(message);
break;
default:
processOtherMessage(message);
break;
}
// 标记为已处理
message.markAsProcessed();
messageRepository.save(message);
} catch (Exception e) {
log.error("处理消息类型时发生异常: {} - {}", message.getMessageId(), message.getMessageType(), e);
throw e;
}
}
/**
* 处理心跳消息
*/
private void processHeartbeatMessage(DeviceMessage message, Channel channel, InetSocketAddress sender) {
log.debug("处理心跳消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
// 更新设备最后心跳时间
if (StringUtils.hasText(message.getDeviceId())) {
deviceService.updateLastHeartbeat(message.getDeviceId());
// 更新连接心跳时间
if (message.getTransportProtocol() == DeviceMessage.TransportProtocol.TCP) {
// TCP连接心跳更新在TcpConnectionService中处理
}
}
// 发送心跳响应(如果需要)
sendHeartbeatResponse(message, channel, sender);
}
/**
* 处理数据消息
*/
private void processDataMessage(DeviceMessage message) {
log.debug("处理数据消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
if (StringUtils.hasText(message.getDeviceId()) && StringUtils.hasText(message.getParsedContent())) {
try {
// 解析设备数据
DeviceData deviceData = parseDeviceData(message);
// 保存设备数据
deviceDataService.saveDeviceData(deviceData);
// 更新设备最后数据上报时间
deviceService.updateLastDataReport(message.getDeviceId());
log.debug("设备数据保存成功: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
} catch (Exception e) {
log.error("处理设备数据时发生异常: {} - 设备: {}", message.getMessageId(), message.getDeviceId(), e);
}
}
}
/**
* 处理认证消息
*/
private void processAuthenticationMessage(DeviceMessage message, Channel channel, InetSocketAddress sender) {
log.debug("处理认证消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
// 实现设备认证逻辑
// ...
}
/**
* 处理命令消息
*/
private void processCommandMessage(DeviceMessage message) {
log.debug("处理命令消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
// 实现命令处理逻辑
// ...
}
/**
* 处理响应消息
*/
private void processResponseMessage(DeviceMessage message) {
log.debug("处理响应消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
// 处理命令响应
if (StringUtils.hasText(message.getParsedContent())) {
commandResponseService.handleCommandResponse(message);
}
}
/**
* 处理事件消息
*/
private void processEventMessage(DeviceMessage message) {
log.debug("处理事件消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
// 实现事件处理逻辑
// ...
}
/**
* 处理配置消息
*/
private void processConfigurationMessage(DeviceMessage message) {
log.debug("处理配置消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
// 实现配置处理逻辑
// ...
}
/**
* 处理固件消息
*/
private void processFirmwareMessage(DeviceMessage message) {
log.debug("处理固件消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
// 实现固件处理逻辑
// ...
}
/**
* 处理其他消息
*/
private void processOtherMessage(DeviceMessage message) {
log.debug("处理其他消息: {} - 设备: {}", message.getMessageId(), message.getDeviceId());
// 记录未知消息类型
log.warn("收到未知类型消息: {} - 原始内容: {}", message.getMessageId(), message.getRawContent());
}
/**
* 发送心跳响应
*/
private void sendHeartbeatResponse(DeviceMessage message, Channel channel, InetSocketAddress sender) {
// 根据协议类型构造心跳响应
byte[] responseData = protocolParserService.buildHeartbeatResponse(message.getProtocolType());
if (responseData != null && responseData.length > 0) {
if (message.getTransportProtocol() == DeviceMessage.TransportProtocol.TCP) {
// TCP响应
if (channel != null && channel.isActive()) {
channel.writeAndFlush(responseData);
}
} else if (message.getTransportProtocol() == DeviceMessage.TransportProtocol.UDP) {
// UDP响应
if (channel != null && channel.isActive() && sender != null) {
ByteBuf buffer = Unpooled.copiedBuffer(responseData);
DatagramPacket packet = new DatagramPacket(buffer, sender);
channel.writeAndFlush(packet);
}
}
}
}
/**
* 解析设备数据
*/
private DeviceData parseDeviceData(DeviceMessage message) {
// 根据解析后的内容创建设备数据对象
DeviceData deviceData = new DeviceData();
deviceData.setDeviceId(message.getDeviceId());
deviceData.setDataType("sensor"); // 根据实际情况设置
deviceData.setTimestamp(message.getTimestamp());
deviceData.setData(message.getParsedContent());
return deviceData;
}
/**
* 十六进制字符串转字节数组
*/
private byte[] hexToBytes(String hex) {
int len = hex.length();
byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
data[i / 2] = (byte) ((Character.digit(hex.charAt(i), 16) << 4)
+ Character.digit(hex.charAt(i + 1), 16));
}
return data;
}
}
# 5.4 协议解析服务
/**
* 协议解析服务
* 负责解析各种设备协议
*/
@Service
@Slf4j
public class ProtocolParserService {
@Autowired
private List<ProtocolParser> protocolParsers;
/**
* 解析消息
*/
public ProtocolParseResult parseMessage(byte[] data) {
// 尝试使用各种协议解析器解析消息
for (ProtocolParser parser : protocolParsers) {
if (parser.canParse(data)) {
try {
return parser.parse(data);
} catch (Exception e) {
log.warn("协议解析器 {} 解析失败", parser.getProtocolType(), e);
}
}
}
// 如果所有解析器都无法解析,返回失败结果
return ProtocolParseResult.failure("无法识别的协议格式");
}
/**
* 构造心跳响应
*/
public byte[] buildHeartbeatResponse(String protocolType) {
for (ProtocolParser parser : protocolParsers) {
if (parser.getProtocolType().equals(protocolType)) {
return parser.buildHeartbeatResponse();
}
}
return null;
}
/**
* 构造命令消息
*/
public byte[] buildCommandMessage(String protocolType, String deviceId, String command, Object params) {
for (ProtocolParser parser : protocolParsers) {
if (parser.getProtocolType().equals(protocolType)) {
return parser.buildCommandMessage(deviceId, command, params);
}
}
return null;
}
}
/**
* 协议解析器接口
*/
public interface ProtocolParser {
/**
* 获取协议类型
*/
String getProtocolType();
/**
* 判断是否可以解析该数据
*/
boolean canParse(byte[] data);
/**
* 解析数据
*/
ProtocolParseResult parse(byte[] data);
/**
* 构造心跳响应
*/
byte[] buildHeartbeatResponse();
/**
* 构造命令消息
*/
byte[] buildCommandMessage(String deviceId, String command, Object params);
}
/**
* 协议解析结果
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ProtocolParseResult {
private boolean success;
private String protocolType;
private DeviceMessage.MessageType messageType;
private String deviceId;
private String parsedContentJson;
private Long sequenceNumber;
private String errorMessage;
public static ProtocolParseResult success(String protocolType,
DeviceMessage.MessageType messageType,
String deviceId,
String parsedContentJson) {
ProtocolParseResult result = new ProtocolParseResult();
result.setSuccess(true);
result.setProtocolType(protocolType);
result.setMessageType(messageType);
result.setDeviceId(deviceId);
result.setParsedContentJson(parsedContentJson);
return result;
}
public static ProtocolParseResult failure(String errorMessage) {
ProtocolParseResult result = new ProtocolParseResult();
result.setSuccess(false);
result.setErrorMessage(errorMessage);
return result;
}
}
/**
* 示例协议解析器 - 简单的TLV协议
*/
@Component
@Slf4j
public class SimpleTlvProtocolParser implements ProtocolParser {
private static final String PROTOCOL_TYPE = "SIMPLE_TLV";
private static final byte HEARTBEAT_TYPE = 0x01;
private static final byte DATA_TYPE = 0x02;
private static final byte COMMAND_TYPE = 0x03;
private static final byte RESPONSE_TYPE = 0x04;
@Override
public String getProtocolType() {
return PROTOCOL_TYPE;
}
@Override
public boolean canParse(byte[] data) {
// 简单的TLV协议:至少需要3个字节(Type + Length + Value)
if (data == null || data.length < 3) {
return false;
}
// 检查类型字段是否为已知类型
byte type = data[0];
return type == HEARTBEAT_TYPE || type == DATA_TYPE || type == COMMAND_TYPE || type == RESPONSE_TYPE;
}
@Override
public ProtocolParseResult parse(byte[] data) {
try {
// 解析TLV结构
byte type = data[0];
int length = data[1] & 0xFF;
if (data.length < 2 + length) {
return ProtocolParseResult.failure("数据长度不足");
}
byte[] value = new byte[length];
System.arraycopy(data, 2, value, 0, length);
// 根据类型解析消息
DeviceMessage.MessageType messageType;
String deviceId = "";
Map<String, Object> parsedData = new HashMap<>();
switch (type) {
case HEARTBEAT_TYPE:
messageType = DeviceMessage.MessageType.HEARTBEAT;
deviceId = new String(value, StandardCharsets.UTF_8);
parsedData.put("type", "heartbeat");
parsedData.put("deviceId", deviceId);
break;
case DATA_TYPE:
messageType = DeviceMessage.MessageType.DATA;
// 假设前8字节是设备ID,后面是数据
if (value.length >= 8) {
deviceId = new String(value, 0, 8, StandardCharsets.UTF_8).trim();
byte[] sensorData = new byte[value.length - 8];
System.arraycopy(value, 8, sensorData, 0, sensorData.length);
parsedData.put("type", "data");
parsedData.put("deviceId", deviceId);
parsedData.put("sensorData", bytesToHex(sensorData));
}
break;
case COMMAND_TYPE:
messageType = DeviceMessage.MessageType.COMMAND;
// 解析命令数据
parsedData.put("type", "command");
parsedData.put("commandData", bytesToHex(value));
break;
case RESPONSE_TYPE:
messageType = DeviceMessage.MessageType.RESPONSE;
// 解析响应数据
parsedData.put("type", "response");
parsedData.put("responseData", bytesToHex(value));
break;
default:
return ProtocolParseResult.failure("未知的消息类型: " + type);
}
// 转换为JSON字符串
ObjectMapper objectMapper = new ObjectMapper();
String parsedContentJson = objectMapper.writeValueAsString(parsedData);
return ProtocolParseResult.success(PROTOCOL_TYPE, messageType, deviceId, parsedContentJson);
} catch (Exception e) {
log.error("解析SimpleTLV协议时发生异常", e);
return ProtocolParseResult.failure("解析异常: " + e.getMessage());
}
}
@Override
public byte[] buildHeartbeatResponse() {
// 构造心跳响应:Type(1) + Length(1) + Value("OK")
byte[] response = new byte[4];
response[0] = HEARTBEAT_TYPE;
response[1] = 2; // "OK"的长度
response[2] = 'O';
response[3] = 'K';
return response;
}
@Override
public byte[] buildCommandMessage(String deviceId, String command, Object params) {
try {
// 构造命令消息
Map<String, Object> commandData = new HashMap<>();
commandData.put("deviceId", deviceId);
commandData.put("command", command);
commandData.put("params", params);
ObjectMapper objectMapper = new ObjectMapper();
String jsonData = objectMapper.writeValueAsString(commandData);
byte[] valueBytes = jsonData.getBytes(StandardCharsets.UTF_8);
byte[] message = new byte[2 + valueBytes.length];
message[0] = COMMAND_TYPE;
message[1] = (byte) valueBytes.length;
System.arraycopy(valueBytes, 0, message, 2, valueBytes.length);
return message;
} catch (Exception e) {
log.error("构造命令消息时发生异常", e);
return null;
}
}
/**
* 字节数组转十六进制字符串
*/
private String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02X", b));
}
return sb.toString();
}
}
# 6. 数据库设计
# 6.1 TCP连接表
-- TCP连接表
CREATE TABLE tcp_connection (
connection_id VARCHAR(64) PRIMARY KEY COMMENT '连接ID',
device_id VARCHAR(64) COMMENT '设备ID',
client_ip VARCHAR(45) NOT NULL COMMENT '客户端IP地址',
client_port INT NOT NULL COMMENT '客户端端口',
server_port INT NOT NULL COMMENT '服务器端口',
connection_status VARCHAR(20) NOT NULL DEFAULT 'ACTIVE' COMMENT '连接状态:ACTIVE, INACTIVE, CLOSED',
protocol_type VARCHAR(50) COMMENT '协议类型',
connect_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '连接时间',
last_heartbeat TIMESTAMP COMMENT '最后心跳时间',
last_activity TIMESTAMP COMMENT '最后活动时间',
disconnect_time TIMESTAMP COMMENT '断开连接时间',
disconnect_reason VARCHAR(255) COMMENT '断开原因',
bytes_received BIGINT DEFAULT 0 COMMENT '接收字节数',
bytes_sent BIGINT DEFAULT 0 COMMENT '发送字节数',
messages_received INT DEFAULT 0 COMMENT '接收消息数',
messages_sent INT DEFAULT 0 COMMENT '发送消息数',
error_count INT DEFAULT 0 COMMENT '错误次数',
created_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
INDEX idx_device_id (device_id),
INDEX idx_client_ip (client_ip),
INDEX idx_connection_status (connection_status),
INDEX idx_connect_time (connect_time),
INDEX idx_last_activity (last_activity)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='TCP连接信息表';
# 6.2 UDP会话表
-- UDP会话表
CREATE TABLE udp_session (
session_id VARCHAR(64) PRIMARY KEY COMMENT '会话ID',
session_key VARCHAR(128) UNIQUE NOT NULL COMMENT '会话键(IP:Port)',
device_id VARCHAR(64) COMMENT '设备ID',
client_ip VARCHAR(45) NOT NULL COMMENT '客户端IP地址',
client_port INT NOT NULL COMMENT '客户端端口',
server_port INT NOT NULL COMMENT '服务器端口',
session_status VARCHAR(20) NOT NULL DEFAULT 'ACTIVE' COMMENT '会话状态:ACTIVE, INACTIVE, EXPIRED',
protocol_type VARCHAR(50) COMMENT '协议类型',
first_contact TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '首次联系时间',
last_contact TIMESTAMP COMMENT '最后联系时间',
expire_time TIMESTAMP COMMENT '过期时间',
bytes_received BIGINT DEFAULT 0 COMMENT '接收字节数',
bytes_sent BIGINT DEFAULT 0 COMMENT '发送字节数',
packets_received INT DEFAULT 0 COMMENT '接收数据包数',
packets_sent INT DEFAULT 0 COMMENT '发送数据包数',
error_count INT DEFAULT 0 COMMENT '错误次数',
created_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
INDEX idx_session_key (session_key),
INDEX idx_device_id (device_id),
INDEX idx_client_ip (client_ip),
INDEX idx_session_status (session_status),
INDEX idx_last_contact (last_contact)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='UDP会话信息表';
# 6.3 设备消息表
-- 设备消息表
CREATE TABLE device_message (
message_id VARCHAR(64) PRIMARY KEY COMMENT '消息ID',
connection_id VARCHAR(64) COMMENT '连接/会话ID',
device_id VARCHAR(64) COMMENT '设备ID',
transport_protocol VARCHAR(10) NOT NULL COMMENT '传输协议:TCP, UDP',
protocol_type VARCHAR(50) COMMENT '应用协议类型',
message_type VARCHAR(20) COMMENT '消息类型:HEARTBEAT, DATA, COMMAND, RESPONSE, EVENT等',
direction VARCHAR(10) NOT NULL COMMENT '消息方向:INBOUND, OUTBOUND',
raw_content TEXT COMMENT '原始消息内容(十六进制)',
parsed_content TEXT COMMENT '解析后的消息内容(JSON)',
message_length INT COMMENT '消息长度',
sequence_number BIGINT COMMENT '序列号',
client_ip VARCHAR(45) COMMENT '客户端IP',
client_port INT COMMENT '客户端端口',
server_port INT COMMENT '服务器端口',
processing_status VARCHAR(20) DEFAULT 'PENDING' COMMENT '处理状态:PENDING, PROCESSING, PROCESSED, FAILED',
error_message TEXT COMMENT '错误信息',
processing_time BIGINT COMMENT '处理耗时(毫秒)',
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '消息时间戳',
created_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
INDEX idx_connection_id (connection_id),
INDEX idx_device_id (device_id),
INDEX idx_transport_protocol (transport_protocol),
INDEX idx_message_type (message_type),
INDEX idx_direction (direction),
INDEX idx_processing_status (processing_status),
INDEX idx_timestamp (timestamp),
INDEX idx_created_time (created_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='设备消息表';
-- 按月分表(示例)
CREATE TABLE device_message_202401 LIKE device_message;
CREATE TABLE device_message_202402 LIKE device_message;
-- ... 其他月份表
# 6.4 连接统计表
-- 连接统计表
CREATE TABLE connection_statistics (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
stat_date DATE NOT NULL COMMENT '统计日期',
stat_hour TINYINT COMMENT '统计小时(0-23)',
transport_protocol VARCHAR(10) NOT NULL COMMENT '传输协议:TCP, UDP',
total_connections INT DEFAULT 0 COMMENT '总连接数',
active_connections INT DEFAULT 0 COMMENT '活跃连接数',
new_connections INT DEFAULT 0 COMMENT '新建连接数',
closed_connections INT DEFAULT 0 COMMENT '关闭连接数',
total_messages BIGINT DEFAULT 0 COMMENT '总消息数',
inbound_messages BIGINT DEFAULT 0 COMMENT '入站消息数',
outbound_messages BIGINT DEFAULT 0 COMMENT '出站消息数',
total_bytes BIGINT DEFAULT 0 COMMENT '总字节数',
inbound_bytes BIGINT DEFAULT 0 COMMENT '入站字节数',
outbound_bytes BIGINT DEFAULT 0 COMMENT '出站字节数',
error_count INT DEFAULT 0 COMMENT '错误次数',
avg_processing_time DECIMAL(10,2) COMMENT '平均处理时间(毫秒)',
created_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
UNIQUE KEY uk_stat_date_hour_protocol (stat_date, stat_hour, transport_protocol),
INDEX idx_stat_date (stat_date),
INDEX idx_transport_protocol (transport_protocol)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='连接统计表';
# 7. 性能优化策略
# 7.1 连接池配置
# application.yml
iot:
tcp:
server:
# TCP服务器配置
port: 8888
boss-threads: 1
worker-threads: 8
# 连接配置
so-backlog: 1024
so-keepalive: true
tcp-nodelay: true
so-reuseaddr: true
# 缓冲区配置
so-rcvbuf: 65536
so-sndbuf: 65536
# 连接超时配置
connect-timeout: 30000
read-timeout: 300000
write-timeout: 30000
# 连接池配置
max-connections: 10000
connection-idle-timeout: 600000
udp:
server:
# UDP服务器配置
port: 9999
worker-threads: 4
# 缓冲区配置
so-rcvbuf: 65536
so-sndbuf: 65536
# 会话配置
session-timeout: 300000
max-sessions: 50000
message:
processing:
# 消息处理线程池配置
core-pool-size: 10
max-pool-size: 50
queue-capacity: 1000
keep-alive-seconds: 60
thread-name-prefix: "message-processor-"
# 批处理配置
batch-size: 100
batch-timeout: 1000
cache:
# 缓存配置
connection-cache-size: 10000
connection-cache-ttl: 3600
session-cache-size: 50000
session-cache-ttl: 1800
device-cache-size: 100000
device-cache-ttl: 7200
# 7.2 缓存策略配置
/**
* 缓存配置
*/
@Configuration
@EnableCaching
public class CacheConfig {
/**
* 连接缓存管理器
*/
@Bean("connectionCacheManager")
public CacheManager connectionCacheManager() {
RedisCacheManager.Builder builder = RedisCacheManager
.RedisCacheManagerBuilder
.fromConnectionFactory(redisConnectionFactory())
.cacheDefaults(connectionCacheConfiguration());
return builder.build();
}
/**
* 连接缓存配置
*/
private RedisCacheConfiguration connectionCacheConfiguration() {
return RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofHours(1)) // 1小时过期
.serializeKeysWith(RedisSerializationContext.SerializationPair
.fromSerializer(new StringRedisSerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair
.fromSerializer(new GenericJackson2JsonRedisSerializer()))
.disableCachingNullValues();
}
/**
* 会话缓存管理器
*/
@Bean("sessionCacheManager")
public CacheManager sessionCacheManager() {
RedisCacheManager.Builder builder = RedisCacheManager
.RedisCacheManagerBuilder
.fromConnectionFactory(redisConnectionFactory())
.cacheDefaults(sessionCacheConfiguration());
return builder.build();
}
/**
* 会话缓存配置
*/
private RedisCacheConfiguration sessionCacheConfiguration() {
return RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(30)) // 30分钟过期
.serializeKeysWith(RedisSerializationContext.SerializationPair
.fromSerializer(new StringRedisSerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair
.fromSerializer(new GenericJackson2JsonRedisSerializer()))
.disableCachingNullValues();
}
}
# 7.3 异步处理配置
/**
* 异步处理配置
*/
@Configuration
@EnableAsync
public class AsyncConfig {
/**
* 消息处理线程池
*/
@Bean("messageProcessingExecutor")
public ThreadPoolTaskExecutor messageProcessingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(1000);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("message-processor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
/**
* 数据库操作线程池
*/
@Bean("databaseExecutor")
public ThreadPoolTaskExecutor databaseExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(500);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("database-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
/**
* 统计任务线程池
*/
@Bean("statisticsExecutor")
public ThreadPoolTaskExecutor statisticsExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("statistics-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
executor.initialize();
return executor;
}
}
# 7.4 数据库优化配置
# application.yml
spring:
datasource:
# 数据库连接池配置
hikari:
minimum-idle: 10
maximum-pool-size: 50
idle-timeout: 300000
max-lifetime: 1800000
connection-timeout: 30000
validation-timeout: 5000
leak-detection-threshold: 60000
jpa:
hibernate:
ddl-auto: none
properties:
hibernate:
# 批处理配置
jdbc:
batch_size: 50
batch_versioned_data: true
order_inserts: true
order_updates: true
# 查询优化
query:
plan_cache_max_size: 2048
# 缓存配置
cache:
use_second_level_cache: true
use_query_cache: true
region:
factory_class: org.hibernate.cache.jcache.JCacheRegionFactory
# MyBatis配置
mybatis:
configuration:
# 开启二级缓存
cache-enabled: true
# 延迟加载
lazy-loading-enabled: true
aggressive-lazy-loading: false
# 批处理
default-executor-type: batch
# 8. 监控和告警
# 8.1 性能监控服务
/**
* 性能监控服务
*/
@Service
@Slf4j
public class PerformanceMonitoringService {
@Autowired
private MeterRegistry meterRegistry;
@Autowired
private TcpConnectionService tcpConnectionService;
@Autowired
private UdpSessionService udpSessionService;
// 性能指标
private final Counter tcpConnectionCounter;
private final Counter udpSessionCounter;
private final Counter messageCounter;
private final Timer messageProcessingTimer;
private final Gauge activeConnectionsGauge;
private final Gauge activeSessionsGauge;
public PerformanceMonitoringService(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
// 初始化计数器
this.tcpConnectionCounter = Counter.builder("tcp.connections.total")
.description("Total TCP connections")
.register(meterRegistry);
this.udpSessionCounter = Counter.builder("udp.sessions.total")
.description("Total UDP sessions")
.register(meterRegistry);
this.messageCounter = Counter.builder("messages.total")
.description("Total messages processed")
.tag("protocol", "tcp")
.register(meterRegistry);
// 初始化计时器
this.messageProcessingTimer = Timer.builder("message.processing.time")
.description("Message processing time")
.register(meterRegistry);
// 初始化仪表
this.activeConnectionsGauge = Gauge.builder("tcp.connections.active")
.description("Active TCP connections")
.register(meterRegistry, this, PerformanceMonitoringService::getActiveTcpConnections);
this.activeSessionsGauge = Gauge.builder("udp.sessions.active")
.description("Active UDP sessions")
.register(meterRegistry, this, PerformanceMonitoringService::getActiveUdpSessions);
}
/**
* 记录TCP连接
*/
public void recordTcpConnection() {
tcpConnectionCounter.increment();
}
/**
* 记录UDP会话
*/
public void recordUdpSession() {
udpSessionCounter.increment();
}
/**
* 记录消息处理
*/
public void recordMessageProcessing(String protocol, String messageType, long processingTime) {
messageCounter.increment(
Tags.of(
"protocol", protocol,
"type", messageType
)
);
messageProcessingTimer.record(processingTime, TimeUnit.MILLISECONDS);
}
/**
* 获取活跃TCP连接数
*/
public double getActiveTcpConnections() {
return tcpConnectionService.getActiveConnectionCount();
}
/**
* 获取活跃UDP会话数
*/
public double getActiveUdpSessions() {
return udpSessionService.getActiveSessionCount();
}
/**
* 获取系统性能指标
*/
public Map<String, Object> getPerformanceMetrics() {
Map<String, Object> metrics = new HashMap<>();
// 连接指标
metrics.put("tcp_connections_total", tcpConnectionCounter.count());
metrics.put("tcp_connections_active", getActiveTcpConnections());
metrics.put("udp_sessions_total", udpSessionCounter.count());
metrics.put("udp_sessions_active", getActiveUdpSessions());
// 消息指标
metrics.put("messages_total", messageCounter.count());
metrics.put("message_processing_avg_time", messageProcessingTimer.mean(TimeUnit.MILLISECONDS));
metrics.put("message_processing_max_time", messageProcessingTimer.max(TimeUnit.MILLISECONDS));
// 系统指标
Runtime runtime = Runtime.getRuntime();
metrics.put("memory_used", runtime.totalMemory() - runtime.freeMemory());
metrics.put("memory_total", runtime.totalMemory());
metrics.put("memory_max", runtime.maxMemory());
metrics.put("cpu_cores", runtime.availableProcessors());
return metrics;
}
}
# 8.2 健康检查服务
/**
* 健康检查服务
*/
@Component
public class HealthCheckService {
@Autowired
private TcpServer tcpServer;
@Autowired
private UdpServer udpServer;
@Autowired
private DataSource dataSource;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* TCP服务器健康检查
*/
@Bean
public HealthIndicator tcpServerHealthIndicator() {
return () -> {
try {
Map<String, Object> status = tcpServer.getServerStatus();
boolean isRunning = (Boolean) status.get("running");
if (isRunning) {
return Health.up()
.withDetails(status)
.build();
} else {
return Health.down()
.withDetail("reason", "TCP server is not running")
.withDetails(status)
.build();
}
} catch (Exception e) {
return Health.down()
.withDetail("error", e.getMessage())
.build();
}
};
}
/**
* UDP服务器健康检查
*/
@Bean
public HealthIndicator udpServerHealthIndicator() {
return () -> {
try {
Map<String, Object> status = udpServer.getServerStatus();
boolean isRunning = (Boolean) status.get("running");
if (isRunning) {
return Health.up()
.withDetails(status)
.build();
} else {
return Health.down()
.withDetail("reason", "UDP server is not running")
.withDetails(status)
.build();
}
} catch (Exception e) {
return Health.down()
.withDetail("error", e.getMessage())
.build();
}
};
}
/**
* 数据库健康检查
*/
@Bean
public HealthIndicator databaseHealthIndicator() {
return () -> {
try (Connection connection = dataSource.getConnection()) {
if (connection.isValid(5)) {
return Health.up()
.withDetail("database", "Available")
.build();
} else {
return Health.down()
.withDetail("database", "Connection invalid")
.build();
}
} catch (Exception e) {
return Health.down()
.withDetail("database", "Connection failed: " + e.getMessage())
.build();
}
};
}
/**
* Redis健康检查
*/
@Bean
public HealthIndicator redisHealthIndicator() {
return () -> {
try {
String pong = redisTemplate.getConnectionFactory()
.getConnection()
.ping();
if ("PONG".equals(pong)) {
return Health.up()
.withDetail("redis", "Available")
.build();
} else {
return Health.down()
.withDetail("redis", "Ping failed")
.build();
}
} catch (Exception e) {
return Health.down()
.withDetail("redis", "Connection failed: " + e.getMessage())
.build();
}
};
}
}
# 8.3 告警配置
/**
* 告警服务
*/
@Service
@Slf4j
public class AlertService {
@Autowired
private PerformanceMonitoringService monitoringService;
@Autowired
private NotificationService notificationService;
@Value("${iot.alert.tcp.max-connections:8000}")
private int maxTcpConnections;
@Value("${iot.alert.udp.max-sessions:40000}")
private int maxUdpSessions;
@Value("${iot.alert.message.max-processing-time:5000}")
private long maxMessageProcessingTime;
@Value("${iot.alert.memory.max-usage-percent:80}")
private double maxMemoryUsagePercent;
/**
* 检查连接数告警
*/
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void checkConnectionAlerts() {
try {
// 检查TCP连接数
double activeTcpConnections = monitoringService.getActiveTcpConnections();
if (activeTcpConnections > maxTcpConnections) {
sendAlert("TCP连接数过高",
String.format("当前TCP连接数: %.0f, 阈值: %d", activeTcpConnections, maxTcpConnections),
AlertLevel.WARNING);
}
// 检查UDP会话数
double activeUdpSessions = monitoringService.getActiveUdpSessions();
if (activeUdpSessions > maxUdpSessions) {
sendAlert("UDP会话数过高",
String.format("当前UDP会话数: %.0f, 阈值: %d", activeUdpSessions, maxUdpSessions),
AlertLevel.WARNING);
}
} catch (Exception e) {
log.error("检查连接告警时发生异常", e);
}
}
/**
* 检查性能告警
*/
@Scheduled(fixedRate = 300000) // 每5分钟检查一次
public void checkPerformanceAlerts() {
try {
Map<String, Object> metrics = monitoringService.getPerformanceMetrics();
// 检查消息处理时间
Double avgProcessingTime = (Double) metrics.get("message_processing_avg_time");
if (avgProcessingTime != null && avgProcessingTime > maxMessageProcessingTime) {
sendAlert("消息处理时间过长",
String.format("平均处理时间: %.2f ms, 阈值: %d ms", avgProcessingTime, maxMessageProcessingTime),
AlertLevel.WARNING);
}
// 检查内存使用率
Long memoryUsed = (Long) metrics.get("memory_used");
Long memoryMax = (Long) metrics.get("memory_max");
if (memoryUsed != null && memoryMax != null) {
double usagePercent = (double) memoryUsed / memoryMax * 100;
if (usagePercent > maxMemoryUsagePercent) {
sendAlert("内存使用率过高",
String.format("内存使用率: %.2f%%, 阈值: %.2f%%", usagePercent, maxMemoryUsagePercent),
AlertLevel.CRITICAL);
}
}
} catch (Exception e) {
log.error("检查性能告警时发生异常", e);
}
}
/**
* 发送告警
*/
private void sendAlert(String title, String message, AlertLevel level) {
Alert alert = new Alert();
alert.setTitle(title);
alert.setMessage(message);
alert.setLevel(level);
alert.setTimestamp(LocalDateTime.now());
alert.setSource("TCP/UDP Access Layer");
notificationService.sendAlert(alert);
log.warn("发送告警: {} - {}", title, message);
}
/**
* 告警级别
*/
public enum AlertLevel {
INFO, WARNING, CRITICAL
}
/**
* 告警对象
*/
@Data
public static class Alert {
private String title;
private String message;
private AlertLevel level;
private LocalDateTime timestamp;
private String source;
}
}
# 9. 配置文件
# 9.1 主配置文件 (application.yml)
# 应用基础配置
server:
port: 8080
servlet:
context-path: /iot
spring:
application:
name: iot-tcp-udp-access-layer
profiles:
active: dev
# 数据库配置
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/iot_platform?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
username: ${DB_USERNAME:root}
password: ${DB_PASSWORD:123456}
hikari:
minimum-idle: 10
maximum-pool-size: 50
idle-timeout: 300000
max-lifetime: 1800000
connection-timeout: 30000
validation-timeout: 5000
leak-detection-threshold: 60000
# Redis配置
redis:
host: ${REDIS_HOST:localhost}
port: ${REDIS_PORT:6379}
password: ${REDIS_PASSWORD:}
database: 0
timeout: 5000ms
lettuce:
pool:
max-active: 20
max-idle: 10
min-idle: 5
max-wait: 2000ms
# JPA配置
jpa:
hibernate:
ddl-auto: none
show-sql: false
properties:
hibernate:
dialect: org.hibernate.dialect.MySQL8Dialect
format_sql: true
jdbc:
batch_size: 50
batch_versioned_data: true
order_inserts: true
order_updates: true
cache:
use_second_level_cache: true
use_query_cache: true
region:
factory_class: org.hibernate.cache.jcache.JCacheRegionFactory
# 物联网TCP/UDP接入层配置
iot:
tcp:
server:
# TCP服务器基础配置
enabled: true
port: 8888
boss-threads: 1
worker-threads: 8
# 网络配置
so-backlog: 1024
so-keepalive: true
tcp-nodelay: true
so-reuseaddr: true
so-rcvbuf: 65536
so-sndbuf: 65536
# 连接管理配置
max-connections: 10000
connection-idle-timeout: 600000
connect-timeout: 30000
read-timeout: 300000
write-timeout: 30000
# 心跳配置
heartbeat-interval: 60000
heartbeat-timeout: 180000
udp:
server:
# UDP服务器基础配置
enabled: true
port: 9999
worker-threads: 4
# 网络配置
so-rcvbuf: 65536
so-sndbuf: 65536
# 会话管理配置
max-sessions: 50000
session-timeout: 300000
session-cleanup-interval: 60000
message:
processing:
# 消息处理线程池配置
core-pool-size: 10
max-pool-size: 50
queue-capacity: 1000
keep-alive-seconds: 60
thread-name-prefix: "message-processor-"
# 批处理配置
batch-enabled: true
batch-size: 100
batch-timeout: 1000
# 消息大小限制
max-message-size: 1048576 # 1MB
cache:
# 缓存配置
connection-cache-size: 10000
connection-cache-ttl: 3600
session-cache-size: 50000
session-cache-ttl: 1800
device-cache-size: 100000
device-cache-ttl: 7200
protocol:
# 协议解析配置
parsers:
- name: "simple-tlv"
class: "com.iot.protocol.SimpleTlvProtocolParser"
enabled: true
- name: "modbus"
class: "com.iot.protocol.ModbusProtocolParser"
enabled: false
alert:
# 告警配置
tcp:
max-connections: 8000
udp:
max-sessions: 40000
message:
max-processing-time: 5000
memory:
max-usage-percent: 80
# 监控配置
management:
endpoints:
web:
exposure:
include: health,info,metrics,prometheus
endpoint:
health:
show-details: always
metrics:
export:
prometheus:
enabled: true
# 日志配置
logging:
level:
com.iot: DEBUG
io.netty: INFO
org.springframework: INFO
pattern:
console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"
file: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"
file:
name: logs/iot-tcp-udp-access.log
max-size: 100MB
max-history: 30
# 9.2 开发环境配置 (application-dev.yml)
# 开发环境配置
spring:
datasource:
url: jdbc:mysql://localhost:3306/iot_platform_dev?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
username: dev_user
password: dev_password
redis:
host: localhost
port: 6379
password:
jpa:
show-sql: true
hibernate:
ddl-auto: update
iot:
tcp:
server:
port: 8888
max-connections: 1000
udp:
server:
port: 9999
max-sessions: 5000
logging:
level:
com.iot: DEBUG
io.netty: DEBUG
org.springframework.cache: DEBUG
# 9.3 生产环境配置 (application-prod.yml)
# 生产环境配置
spring:
datasource:
url: jdbc:mysql://${DB_HOST:mysql-cluster}:${DB_PORT:3306}/${DB_NAME:iot_platform}?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=Asia/Shanghai
username: ${DB_USERNAME}
password: ${DB_PASSWORD}
hikari:
minimum-idle: 20
maximum-pool-size: 100
redis:
host: ${REDIS_HOST:redis-cluster}
port: ${REDIS_PORT:6379}
password: ${REDIS_PASSWORD}
cluster:
nodes: ${REDIS_CLUSTER_NODES}
jpa:
show-sql: false
hibernate:
ddl-auto: none
iot:
tcp:
server:
port: ${TCP_PORT:8888}
worker-threads: 16
max-connections: 20000
udp:
server:
port: ${UDP_PORT:9999}
worker-threads: 8
max-sessions: 100000
message:
processing:
core-pool-size: 20
max-pool-size: 100
logging:
level:
com.iot: INFO
root: WARN
file:
name: /var/log/iot/tcp-udp-access.log
# 9.4 测试环境配置 (application-test.yml)
# 测试环境配置
spring:
datasource:
url: jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
driver-class-name: org.h2.Driver
username: sa
password:
h2:
console:
enabled: true
jpa:
hibernate:
ddl-auto: create-drop
show-sql: true
iot:
tcp:
server:
enabled: false # 测试时禁用TCP服务器
udp:
server:
enabled: false # 测试时禁用UDP服务器
logging:
level:
com.iot: DEBUG
# 10. 总结
# 10.1 核心特性
本TCP/UDP设备接入层具备以下核心特性:
高性能网络通信
- 基于Netty框架,支持高并发连接
- TCP支持万级连接,UDP支持十万级会话
- 优化的线程模型和内存管理
灵活的协议支持
- 可插拔的协议解析器架构
- 支持多种设备通信协议
- 简单易扩展的协议适配机制
完善的连接管理
- 自动连接状态监控
- 心跳检测和超时处理
- 连接池和会话管理
强大的消息处理
- 异步消息处理机制
- 批量处理优化
- 消息路由和分发
全面的监控告警
- 实时性能指标监控
- 健康检查机制
- 智能告警系统
# 10.2 适用场景
工业物联网
- 工厂设备数据采集
- 生产线监控系统
- 设备状态管理
智能家居
- 家电设备控制
- 传感器数据收集
- 安防系统集成
车联网
- 车载设备通信
- 位置信息上报
- 远程诊断系统
智慧城市
- 环境监测设备
- 交通信号控制
- 公共设施管理
# 10.3 性能指标
- TCP连接数: 支持10,000+并发连接
- UDP会话数: 支持50,000+并发会话
- 消息吞吐量: 100,000+消息/秒
- 响应时间: 平均<10ms
- 可用性: 99.9%+
# 10.4 最佳实践
部署建议
- 使用负载均衡分散连接压力
- 配置合适的线程池大小
- 监控系统资源使用情况
安全建议
- 实施设备认证机制
- 加密敏感数据传输
- 定期更新安全策略
运维建议
- 建立完善的监控体系
- 制定故障应急预案
- 定期进行性能调优
开发建议
- 遵循协议解析器接口规范
- 实现幂等的消息处理逻辑
- 充分利用缓存机制
通过本文档的详细介绍,开发者可以深入理解TCP/UDP设备接入层的设计原理和实现细节,为构建高性能、高可靠的物联网设备接入系统提供有力支撑。
本文档持续更新中,如有问题或建议,请联系开发团队。
# 4.2 UDP会话实体
/**
* UDP会话实体
* 由于UDP是无连接的,我们通过会话来跟踪设备通信
*/
@Entity
@Table(name = "udp_session")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UdpSession {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 会话ID - 唯一标识一个UDP会话
*/
@Column(name = "session_id", unique = true, nullable = false)
private String sessionId;
/**
* 设备ID - 关联的设备标识
*/
@Column(name = "device_id", nullable = false)
private String deviceId;
/**
* 客户端IP地址
*/
@Column(name = "client_ip", nullable = false)
private String clientIp;
/**
* 客户端端口
*/
@Column(name = "client_port", nullable = false)
private Integer clientPort;
/**
* 服务器端口
*/
@Column(name = "server_port", nullable = false)
private Integer serverPort;
/**
* 会话状态
*/
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false)
private SessionStatus status;
/**
* 协议类型
*/
@Column(name = "protocol_type")
private String protocolType;
/**
* 协议版本
*/
@Column(name = "protocol_version")
private String protocolVersion;
/**
* 会话开始时间
*/
@Column(name = "start_time", nullable = false)
private LocalDateTime startTime;
/**
* 最后活跃时间
*/
@Column(name = "last_active_time")
private LocalDateTime lastActiveTime;
/**
* 会话过期时间
*/
@Column(name = "expire_time")
private LocalDateTime expireTime;
/**
* 会话属性(JSON格式)
*/
@Column(name = "attributes", columnDefinition = "JSON")
private String attributes;
/**
* 发送包数
*/
@Column(name = "packets_sent")
private Long packetsSent = 0L;
/**
* 接收包数
*/
@Column(name = "packets_received")
private Long packetsReceived = 0L;
/**
* 发送字节数
*/
@Column(name = "bytes_sent")
private Long bytesSent = 0L;
/**
* 接收字节数
*/
@Column(name = "bytes_received")
private Long bytesReceived = 0L;
/**
* 丢包数
*/
@Column(name = "packets_lost")
private Long packetsLost = 0L;
/**
* 创建时间
*/
@Column(name = "create_time", nullable = false)
private LocalDateTime createTime;
/**
* 更新时间
*/
@Column(name = "update_time", nullable = false)
private LocalDateTime updateTime;
/**
* 会话状态枚举
*/
public enum SessionStatus {
ACTIVE, // 活跃
INACTIVE, // 非活跃
EXPIRED, // 已过期
CLOSED // 已关闭
}
@PrePersist
protected void onCreate() {
createTime = LocalDateTime.now();
updateTime = LocalDateTime.now();
if (startTime == null) {
startTime = LocalDateTime.now();
}
if (status == null) {
status = SessionStatus.ACTIVE;
}
}
@PreUpdate
protected void onUpdate() {
updateTime = LocalDateTime.now();
}
/**
* 更新活跃时间
*/
public void updateActiveTime() {
this.lastActiveTime = LocalDateTime.now();
}
/**
* 增加发送统计
*/
public void addSentStats(long bytes, long packets) {
this.bytesSent += bytes;
this.packetsSent += packets;
updateActiveTime();
}
/**
* 增加接收统计
*/
public void addReceivedStats(long bytes, long packets) {
this.bytesReceived += bytes;
this.packetsReceived += packets;
updateActiveTime();
}
/**
* 增加丢包统计
*/
public void addLostPackets(long packets) {
this.packetsLost += packets;
}
/**
* 检查会话是否过期
*/
public boolean isExpired() {
if (expireTime == null) {
return false;
}
return LocalDateTime.now().isAfter(expireTime);
}
/**
* 检查会话是否超时
*/
public boolean isTimeout(int timeoutMinutes) {
if (lastActiveTime == null) {
return false;
}
return lastActiveTime.isBefore(LocalDateTime.now().minusMinutes(timeoutMinutes));
}
/**
* 计算丢包率
*/
public double getPacketLossRate() {
long totalPackets = packetsReceived + packetsLost;
if (totalPackets == 0) {
return 0.0;
}
return (double) packetsLost / totalPackets * 100;
}
}
# 4.3 设备消息实体
/**
* 设备消息实体
* 存储通过TCP/UDP接收到的设备消息
*/
@Entity
@Table(name = "device_message")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DeviceMessage {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 消息ID - 唯一标识一条消息
*/
@Column(name = "message_id", unique = true, nullable = false)
private String messageId;
/**
* 设备ID
*/
@Column(name = "device_id", nullable = false)
private String deviceId;
/**
* 连接ID(TCP)或会话ID(UDP)
*/
@Column(name = "connection_id")
private String connectionId;
/**
* 传输协议类型
*/
@Enumerated(EnumType.STRING)
@Column(name = "transport_protocol", nullable = false)
private TransportProtocol transportProtocol;
/**
* 应用协议类型
*/
@Column(name = "protocol_type")
private String protocolType;
/**
* 消息类型
*/
@Enumerated(EnumType.STRING)
@Column(name = "message_type", nullable = false)
private MessageType messageType;
/**
* 消息方向
*/
@Enumerated(EnumType.STRING)
@Column(name = "direction", nullable = false)
private MessageDirection direction;
/**
* 原始消息内容(十六进制)
*/
@Column(name = "raw_content", columnDefinition = "TEXT")
private String rawContent;
/**
* 解析后的消息内容(JSON格式)
*/
@Column(name = "parsed_content", columnDefinition = "JSON")
private String parsedContent;
/**
* 消息长度(字节)
*/
@Column(name = "message_length")
private Integer messageLength;
/**
* 消息序号
*/
@Column(name = "sequence_number")
private Long sequenceNumber;
/**
* 消息时间戳
*/
@Column(name = "timestamp", nullable = false)
private LocalDateTime timestamp;
/**
* 客户端IP
*/
@Column(name = "client_ip")
private String clientIp;
/**
* 客户端端口
*/
@Column(name = "client_port")
private Integer clientPort;
/**
* 服务器端口
*/
@Column(name = "server_port")
private Integer serverPort;
/**
* 处理状态
*/
@Enumerated(EnumType.STRING)
@Column(name = "process_status")
private ProcessStatus processStatus;
/**
* 处理时间
*/
@Column(name = "process_time")
private LocalDateTime processTime;
/**
* 错误信息
*/
@Column(name = "error_message")
private String errorMessage;
/**
* 创建时间
*/
@Column(name = "create_time", nullable = false)
private LocalDateTime createTime;
/**
* 传输协议枚举
*/
public enum TransportProtocol {
TCP, UDP
}
/**
* 消息类型枚举
*/
public enum MessageType {
HEARTBEAT, // 心跳消息
DATA, // 数据消息
COMMAND, // 命令消息
RESPONSE, // 响应消息
EVENT, // 事件消息
AUTHENTICATION, // 认证消息
CONFIGURATION, // 配置消息
FIRMWARE, // 固件消息
OTHER // 其他消息
}
/**
* 消息方向枚举
*/
public enum MessageDirection {
INBOUND, // 入站(设备到服务器)
OUTBOUND // 出站(服务器到设备)
}
/**
* 处理状态枚举
*/
public enum ProcessStatus {
PENDING, // 待处理
PROCESSING, // 处理中
PROCESSED, // 已处理
FAILED, // 处理失败
IGNORED // 已忽略
}
@PrePersist
protected void onCreate() {
createTime = LocalDateTime.now();
if (timestamp == null) {
timestamp = LocalDateTime.now();
}
if (processStatus == null) {
processStatus = ProcessStatus.PENDING;
}
if (messageId == null) {
messageId = UUID.randomUUID().toString();
}
}
/**
* 标记为处理中
*/
public void markAsProcessing() {
this.processStatus = ProcessStatus.PROCESSING;
this.processTime = LocalDateTime.now();
}
/**
* 标记为已处理
*/
public void markAsProcessed() {
this.processStatus = ProcessStatus.PROCESSED;
this.processTime = LocalDateTime.now();
}
/**
* 标记为处理失败
*/
public void markAsFailed(String errorMessage) {
this.processStatus = ProcessStatus.FAILED;
this.processTime = LocalDateTime.now();
this.errorMessage = errorMessage;
}
/**
* 检查是否为心跳消息
*/
public boolean isHeartbeat() {
return MessageType.HEARTBEAT.equals(this.messageType);
}
/**
* 检查是否为数据消息
*/
public boolean isDataMessage() {
return MessageType.DATA.equals(this.messageType);
}
/**
* 检查是否为入站消息
*/
public boolean isInbound() {
return MessageDirection.INBOUND.equals(this.direction);
}
/**
* 检查是否为出站消息
*/
public boolean isOutbound() {
return MessageDirection.OUTBOUND.equals(this.direction);
}
}
# 5. 核心服务实现
# 5.1 TCP服务器实现
/**
* TCP服务器实现
* 基于Netty框架构建高性能TCP服务器
*/
@Component
@Slf4j
public class TcpServer {
@Autowired
private TcpConnectionService connectionService;
@Autowired
private MessageProcessingService messageProcessingService;
@Autowired
private DeviceAuthenticationService authenticationService;
@Value("${iot.tcp.server.port:8888}")
private int serverPort;
@Value("${iot.tcp.server.boss-threads:1}")
private int bossThreads;
@Value("${iot.tcp.server.worker-threads:8}")
private int workerThreads;
@Value("${iot.tcp.server.so-backlog:128}")
private int soBacklog;
@Value("${iot.tcp.server.so-keepalive:true}")
private boolean soKeepAlive;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private Channel serverChannel;
/**
* 启动TCP服务器
*/
@PostConstruct
public void start() {
try {
// 创建事件循环组
bossGroup = new NioEventLoopGroup(bossThreads);
workerGroup = new NioEventLoopGroup(workerThreads);
// 创建服务器启动器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, soBacklog)
.childOption(ChannelOption.SO_KEEPALIVE, soKeepAlive)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new TcpChannelInitializer());
// 绑定端口并启动服务器
ChannelFuture future = bootstrap.bind(serverPort).sync();
serverChannel = future.channel();
log.info("TCP服务器启动成功,监听端口: {}", serverPort);
// 监听服务器关闭
serverChannel.closeFuture().addListener(closeFuture -> {
log.info("TCP服务器已关闭");
});
} catch (Exception e) {
log.error("TCP服务器启动失败", e);
shutdown();
}
}
/**
* 关闭TCP服务器
*/
@PreDestroy
public void shutdown() {
try {
if (serverChannel != null) {
serverChannel.close().sync();
}
} catch (InterruptedException e) {
log.error("关闭服务器通道时发生异常", e);
} finally {
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
log.info("TCP服务器资源已释放");
}
}
/**
* TCP通道初始化器
*/
private class TcpChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加编解码器
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
65536, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
// 添加字节数组编解码器
pipeline.addLast("decoder", new ByteArrayDecoder());
pipeline.addLast("encoder", new ByteArrayEncoder());
// 添加空闲状态处理器(心跳检测)
pipeline.addLast("idleStateHandler", new IdleStateHandler(
300, 0, 0, TimeUnit.SECONDS)); // 5分钟读超时
// 添加业务处理器
pipeline.addLast("tcpHandler", new TcpServerHandler());
}
}
/**
* TCP服务器处理器
*/
@ChannelHandler.Sharable
private class TcpServerHandler extends SimpleChannelInboundHandler<byte[]> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 客户端连接建立
String connectionId = generateConnectionId(ctx.channel());
String clientIp = getClientIp(ctx.channel());
int clientPort = getClientPort(ctx.channel());
log.info("TCP客户端连接建立: {} - {}:{}", connectionId, clientIp, clientPort);
// 创建连接记录
TcpConnection connection = new TcpConnection();
connection.setConnectionId(connectionId);
connection.setClientIp(clientIp);
connection.setClientPort(clientPort);
connection.setServerPort(serverPort);
connection.setStatus(TcpConnection.ConnectionStatus.CONNECTED);
connection.setConnectTime(LocalDateTime.now());
// 保存连接到上下文
ctx.channel().attr(AttributeKey.valueOf("connection")).set(connection);
// 保存连接到服务
connectionService.saveConnection(connection);
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 客户端连接断开
TcpConnection connection = getConnection(ctx.channel());
if (connection != null) {
log.info("TCP客户端连接断开: {}", connection.getConnectionId());
// 更新连接状态
connection.setStatus(TcpConnection.ConnectionStatus.DISCONNECTED);
connectionService.updateConnection(connection);
}
super.channelInactive(ctx);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, byte[] data) throws Exception {
// 接收到客户端数据
TcpConnection connection = getConnection(ctx.channel());
if (connection == null) {
log.warn("未找到连接信息,忽略消息");
return;
}
try {
// 更新连接统计
connection.addReceivedStats(data.length, 1);
connectionService.updateConnection(connection);
// 创建设备消息
DeviceMessage message = new DeviceMessage();
message.setConnectionId(connection.getConnectionId());
message.setTransportProtocol(DeviceMessage.TransportProtocol.TCP);
message.setDirection(DeviceMessage.MessageDirection.INBOUND);
message.setRawContent(bytesToHex(data));
message.setMessageLength(data.length);
message.setClientIp(connection.getClientIp());
message.setClientPort(connection.getClientPort());
message.setServerPort(connection.getServerPort());
// 异步处理消息
messageProcessingService.processMessage(message, ctx.channel());
} catch (Exception e) {
log.error("处理TCP消息时发生异常", e);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
// 读超时,关闭连接
TcpConnection connection = getConnection(ctx.channel());
if (connection != null) {
log.warn("TCP连接读超时,关闭连接: {}", connection.getConnectionId());
}
ctx.close();
}
}
super.userEventTriggered(ctx, evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
TcpConnection connection = getConnection(ctx.channel());
if (connection != null) {
log.error("TCP连接发生异常: {}", connection.getConnectionId(), cause);
// 更新连接状态
connection.setStatus(TcpConnection.ConnectionStatus.ERROR);
connectionService.updateConnection(connection);
} else {
log.error("TCP连接发生异常", cause);
}
ctx.close();
}
/**
* 获取连接信息
*/
private TcpConnection getConnection(Channel channel) {
return (TcpConnection) channel.attr(AttributeKey.valueOf("connection")).get();
}
/**
* 生成连接ID
*/
private String generateConnectionId(Channel channel) {
return "tcp_" + channel.id().asShortText() + "_" + System.currentTimeMillis();
}
/**
* 获取客户端IP
*/
private String getClientIp(Channel channel) {
InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
return remoteAddress.getAddress().getHostAddress();
}
/**
* 获取客户端端口
*/
private int getClientPort(Channel channel) {
InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
return remoteAddress.getPort();
}
/**
* 字节数组转十六进制字符串
*/
private String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02X", b));
}
return sb.toString();
}
}
/**
* 向指定连接发送数据
*/
public void sendData(String connectionId, byte[] data) {
TcpConnection connection = connectionService.getConnection(connectionId);
if (connection == null) {
log.warn("未找到连接: {}", connectionId);
return;
}
Channel channel = connectionService.getChannel(connectionId);
if (channel == null || !channel.isActive()) {
log.warn("连接通道不可用: {}", connectionId);
return;
}
try {
// 发送数据
channel.writeAndFlush(data).addListener(future -> {
if (future.isSuccess()) {
// 更新发送统计
connection.addSentStats(data.length, 1);
connectionService.updateConnection(connection);
log.debug("TCP数据发送成功: {} - {} bytes", connectionId, data.length);
} else {
log.error("TCP数据发送失败: {}", connectionId, future.cause());
}
});
} catch (Exception e) {
log.error("发送TCP数据时发生异常: {}", connectionId, e);
}
}
/**
* 广播数据到所有连接
*/
public void broadcastData(byte[] data) {
List<TcpConnection> connections = connectionService.getActiveConnections();
for (TcpConnection connection : connections) {
sendData(connection.getConnectionId(), data);
}
}
/**
* 获取服务器状态
*/
public Map<String, Object> getServerStatus() {
Map<String, Object> status = new HashMap<>();
status.put("port", serverPort);
status.put("running", serverChannel != null && serverChannel.isActive());
status.put("activeConnections", connectionService.getActiveConnectionCount());
status.put("totalConnections", connectionService.getTotalConnectionCount());
return status;
}
}