百万台设备Netty网关架构设计
# 百万台设备Netty网关架构设计
# 目录
- 百万台设备Netty网关架构设计
- 目录
- 1. 整体架构概述
- 2. Netty网关设计
- 3. 负载均衡设计
- 1. 域名解析
- 2. Nginx upstream 配置
- 3. Nginx server 块
- 4. ⚠ 注意:Netty TCP 场景下的 Nginx 配置
- 5. 整体流程图
- 4. Kafka集成设计
- 5. 高可用设计
- 一、分布式 IoT 网关架构图
- 二、Nginx stream 配置(TCP 一致性哈希)
- 三、Netty 网关 + Redis + Kafka 代码示例
- 四、Redis 数据示例
- 五、Kafka 消费示例(后端业务服务)
- 六、宕机平滑迁移流程
- 1. 正常情况下(网关没挂)
- 2. 网关宕机后会发生什么
- 3. Nginx 的一致性哈希重新分配
- 4. 新网关接管设备
- 5. 过程总结(小白版)
- 6. 宕机迁移策略
- 7. 性能优化
- 8. 监控告警
- 9. 成本优化策略
- 10. 部署架构图
- 总结
# 1. 整体架构概述
# 1.1 架构目标
- 支持百万级设备并发连接
- 低成本高性能
- 高可用性和容错能力
- 水平扩展能力
- 消息可靠传输
# 1.2 核心组件
设备层 -> 负载均衡器 -> Netty网关集群 -> Kafka集群 -> 业务服务集群
# 2. Netty网关设计
# 2.1 网关架构
@Component
public class IoTGatewayServer {
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ServerBootstrap bootstrap;
@PostConstruct
public void start() {
// 使用Epoll提升性能(Linux环境)
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new IoTChannelInitializer());
}
}
下面是你提供的 Netty 服务器启动配置的逐行详细注解(中文):
bootstrap = new ServerBootstrap()
创建
ServerBootstrap
实例,是 Netty 服务端启动的核心辅助类,用于配置服务器端的各项参数。
.group(bossGroup, workerGroup)
设置 主从线程组(EventLoopGroup):
bossGroup
:负责接收客户端连接请求;workerGroup
:负责处理客户端的 I/O 读写操作。
.channel(NioServerSocketChannel.class)
设置服务器通道类型:
- 使用基于 NIO 的
NioServerSocketChannel
,这是 Netty 为服务器端提供的异步非阻塞通道。
.option(ChannelOption.SO_BACKLOG, 1024)
设置服务器端套接字的 连接队列大小:
- 对应 Java 原生
ServerSocketChannel
的backlog
参数;- 表示连接请求的等待队列大小(尚未被
accept
的连接数)最大为 1024。
.option(ChannelOption.SO_REUSEADDR, true)
允许地址复用:
- 服务器重启时,允许重新绑定处于
TIME_WAIT
状态的端口;- 对于高并发环境可提高端口复用效率。
.childOption(ChannelOption.TCP_NODELAY, true)
关闭 Nagle 算法,开启小包即时发送:
true
表示禁用 Nagle 算法,提升低延迟通信效率;- 适用于实时性要求较高的应用场景(如物联网设备)。
.childOption(ChannelOption.SO_KEEPALIVE, true)
开启 TCP 心跳机制(保活):
- 用于检测长时间空闲连接是否存活;
- 防止客户端突然掉线服务端长期不知。
.childHandler(new IoTChannelInitializer());
设置子通道的初始化器:
- 每当有新客户端连接时,
IoTChannelInitializer
会为这个连接初始化 pipeline(添加编码器、解码器、业务处理器等);- 是处理每个
SocketChannel
的关键入口。
# 2.2 连接管理
@Component
public class ConnectionManager {
// 使用ConcurrentHashMap管理连接
private final ConcurrentHashMap<String, Channel> deviceChannels = new ConcurrentHashMap<>();
private final AtomicLong connectionCount = new AtomicLong(0);
public void addConnection(String deviceId, Channel channel) {
deviceChannels.put(deviceId, channel);
connectionCount.incrementAndGet();
// 连接心跳检测
channel.pipeline().addLast(new IdleStateHandler(60, 30, 0));
}
public void removeConnection(String deviceId) {
Channel channel = deviceChannels.remove(deviceId);
if (channel != null) {
connectionCount.decrementAndGet();
channel.close();
}
}
public long getConnectionCount() {
return connectionCount.get();
}
}
@Component
public class ConnectionManager {
声明一个 Spring Bean:
- 使用
@Component
注解表示该类由 Spring 容器自动管理;- 常用于统一管理设备连接。
private final ConcurrentHashMap<String, Channel> deviceChannels = new ConcurrentHashMap<>();
使用线程安全的
ConcurrentHashMap
存储设备连接:
key
: 设备唯一标识(如 deviceId);value
: 对应的 NettyChannel
,表示该设备的连接通道;- 便于实现设备连接状态的快速查找、管理与转发。
private final AtomicLong connectionCount = new AtomicLong(0);
使用
AtomicLong
统计当前连接数量,支持高并发下的线程安全自增/自减操作。
public void addConnection(String deviceId, Channel channel) {
deviceChannels.put(deviceId, channel);
connectionCount.incrementAndGet();
添加新连接:
- 将设备与其连接的 Channel 建立映射;
- 连接计数加 1;
channel.pipeline().addLast(new IdleStateHandler(60, 30, 0));
为当前连接添加 心跳检测处理器:
readerIdleTime = 60
秒:60 秒内未读取数据则触发READER_IDLE
;writerIdleTime = 30
秒:30 秒内未写入数据则触发WRITER_IDLE
;allIdleTime = 0
:关闭ALL_IDLE
;- 用于检测连接空闲状态,避免死连接或长时间不通信的设备;
public void removeConnection(String deviceId) {
Channel channel = deviceChannels.remove(deviceId);
移除连接:
- 从
deviceChannels
映射中移除该设备对应的 Channel;- 如果存在该连接则执行以下操作:
if (channel != null) {
connectionCount.decrementAndGet();
channel.close();
}
- 连接计数减 1;
- 主动关闭对应的 Channel(断开连接);
public long getConnectionCount() {
return connectionCount.get();
}
获取当前连接数(用于监控系统负载、统计连接活跃度等);
# ✅ 总结功能:
- 支持百万连接高并发安全管理;
- 提供设备 → Channel 快速映射能力;
- 支持连接数统计;
- 支持心跳检测与断开处理;
- 可作为设备连接网关中间件的核心组件。
# 3. 负载均衡设计
# 3.1 多层负载均衡
# 3.1.1 DNS负载均衡
# 域名解析到多个IP
iot-gateway.example.com
├── 192.168.1.10 (网关集群1)
├── 192.168.1.11 (网关集群2)
└── 192.168.1.12 (网关集群3)
# 3.1.2 硬件负载均衡器配置
# Nginx配置示例
upstream iot_gateway {
# 一致性哈希,保证设备连接到固定网关
hash $remote_addr consistent;
server 192.168.1.10:8080 weight=1 max_fails=3 fail_timeout=30s;
server 192.168.1.11:8080 weight=1 max_fails=3 fail_timeout=30s;
server 192.168.1.12:8080 weight=1 max_fails=3 fail_timeout=30s;
}
server {
listen 80;
location / {
proxy_pass http://iot_gateway;
proxy_connect_timeout 5s;
proxy_timeout 60s;
}
}
# 3.2 应用层负载均衡
@Component
public class GatewayLoadBalancer {
private final List<GatewayNode> gatewayNodes;
private final ConsistentHash<GatewayNode> consistentHash;
public GatewayNode selectGateway(String deviceId) {
// 使用一致性哈希算法
return consistentHash.get(deviceId);
}
public void addGatewayNode(GatewayNode node) {
gatewayNodes.add(node);
consistentHash.add(node);
}
public void removeGatewayNode(GatewayNode node) {
gatewayNodes.remove(node);
consistentHash.remove(node);
}
}
# 1. 域名解析
iot-gateway.example.com
├── 192.168.1.10 (网关集群1)
├── 192.168.1.11 (网关集群2)
└── 192.168.1.12 (网关集群3)
- 目的:让同一个域名指向多台网关服务器(每台运行 Netty 网关服务)。
- 好处:方便客户端只用一个域名连接,Nginx 负责流量分发。
- 这里的 192.168.1.x 是局域网 IP,生产环境可以替换为公网 IP 或云服务器内网 IP。
# 2. Nginx upstream 配置
upstream iot_gateway {
# 一致性哈希,保证同一客户端IP始终分配到同一台网关
hash $remote_addr consistent;
server 192.168.1.10:8080 weight=1 max_fails=3 fail_timeout=30s;
server 192.168.1.11:8080 weight=1 max_fails=3 fail_timeout=30s;
server 192.168.1.12:8080 weight=1 max_fails=3 fail_timeout=30s;
}
# 关键点:
hash $remote_addr consistent;
使用一致性哈希算法,根据 客户端 IP 分配服务器:- 保证同一个设备(相同 IP)总是连接到同一台网关;
- 适合 长连接场景(IoT、WebSocket、Netty TCP);
consistent
关键字减少节点变动时的迁移影响。
server 192.168.1.x:8080
每台网关服务监听 8080 端口。weight=1
每台服务器权重相等,均衡分配。max_fails=3 fail_timeout=30s
- 如果某个节点在 30 秒内失败 3 次,则临时剔除 30 秒;
- 避免设备频繁连接到故障节点。
# 3. Nginx server 块
server {
listen 80;
location / {
proxy_pass http://iot_gateway;
proxy_connect_timeout 5s;
proxy_timeout 60s;
}
}
# 关键点:
listen 80;
接收客户端 HTTP 请求(如果是 TCP/UDP 则需stream
模块)。proxy_pass http://iot_gateway;
将流量转发给上面定义的upstream
集群。proxy_connect_timeout 5s;
连接上游服务器超时时间,5 秒没连上就失败。proxy_timeout 60s;
与上游服务器保持连接的最大空闲时间。
# 4. ⚠ 注意:Netty TCP 场景下的 Nginx 配置
上面是 HTTP 代理模式,如果你的 IoT 网关是 TCP(非 HTTP),Nginx 要用 stream
块:
stream {
upstream iot_gateway {
hash $remote_addr consistent;
server 192.168.1.10:8080;
server 192.168.1.11:8080;
server 192.168.1.12:8080;
}
server {
listen 9000; # 客户端连接的 TCP 端口
proxy_connect_timeout 5s;
proxy_timeout 600s; # 长连接要设置大一些
proxy_pass iot_gateway;
}
}
这样才能转发原生 TCP 数据(Netty、JT808、MQTT 等)。
# 5. 整体流程图
设备(TCP连接)
│ iot-gateway.example.com:9000
▼
Nginx (一致性哈希)
├── 192.168.1.10:8080 → Netty 网关实例 1
├── 192.168.1.11:8080 → Netty 网关实例 2
└── 192.168.1.12:8080 → Netty 网关实例 3
同一设备 → 固定网关 → 减少重连和会话迁移。
# 4. Kafka集成设计
# 4.1 消息生产者
@Component
public class IoTMessageProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void sendDeviceMessage(String deviceId, Object message) {
try {
// 按设备ID分区,保证消息顺序
kafkaTemplate.send("iot-device-data", deviceId, message)
.addCallback(
result -> log.info("消息发送成功: {}", deviceId),
failure -> log.error("消息发送失败: {}", deviceId, failure)
);
} catch (Exception e) {
// 发送失败时的重试机制
retryMessageSend(deviceId, message);
}
}
private void retryMessageSend(String deviceId, Object message) {
// 实现指数退避重试
CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)
.execute(() -> sendDeviceMessage(deviceId, message));
}
}
# 4.2 Kafka配置优化
# Kafka生产者配置
spring:
kafka:
producer:
bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: 1 # 平衡性能和可靠性
retries: 3
batch-size: 16384
linger-ms: 5
buffer-memory: 33554432
compression-type: lz4
# 4.3 Topic设计
# Topic分区策略
iot-device-data:
- partitions: 32 # 根据设备数量调整
- replication-factor: 3
- retention.ms: 604800000 # 7天
iot-device-status:
- partitions: 16
- replication-factor: 3
- retention.ms: 86400000 # 1天
iot-device-alerts:
- partitions: 8
- replication-factor: 3
- retention.ms: 2592000000 # 30天
# 5. 高可用设计
# 5.1 网关集群部署
# Docker Compose示例
version: '3.8'
services:
gateway-1:
image: iot-gateway:latest
ports:
- "8080:8080"
environment:
- GATEWAY_ID=gateway-1
- KAFKA_SERVERS=kafka1:9092,kafka2:9092,kafka3:9092
- REDIS_CLUSTER=redis1:6379,redis2:6379,redis3:6379
deploy:
replicas: 3
resources:
limits:
memory: 2G
cpus: '1.0'
# 一、分布式 IoT 网关架构图
┌─────────────────────────┐
│ IoT 设备 (TCP) │
└──────────────┬──────────┘
│ 域名: iot-gateway.example.com:9000
▼
┌─────────────────────────┐
│ Nginx (stream) │
│ 一致性哈希 hash $remote_addr │
└──────────────┬──────────┘
┌───────────┼───────────┐
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ Netty 网关 #1 │ │ Netty 网关 #2 │ │ Netty 网关 #3 │
│ 192.168.1.10 │ │ 192.168.1.11 │ │ 192.168.1.12 │
└───────┬────────┘ └───────┬────────┘ └───────┬────────┘
│ │ │
▼ ▼ ▼
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ Redis 集群 │←→│ deviceId→网关映射│←→│ 宕机重分配逻辑 │
└────────────────┘ └────────────────┘ └────────────────┘
│
▼
┌────────────────┐
│ Kafka 消息队列 │ ←→ 后端业务处理服务
└────────────────┘
# 二、Nginx stream 配置(TCP 一致性哈希)
stream {
upstream iot_gateway {
hash $remote_addr consistent;
server 192.168.1.10:9000 max_fails=3 fail_timeout=30s;
server 192.168.1.11:9000 max_fails=3 fail_timeout=30s;
server 192.168.1.12:9000 max_fails=3 fail_timeout=30s;
}
server {
listen 9000; # 对外 TCP 接口
proxy_connect_timeout 5s;
proxy_timeout 600s; # 长连接
proxy_pass iot_gateway;
}
}
hash $remote_addr consistent;
保证相同设备 IP 落到同一网关。stream
模式支持 TCP 转发(Netty、MQTT、JT808)。proxy_timeout
设置大一些,保证长连接不被 Nginx 中断。
# 三、Netty 网关 + Redis + Kafka 代码示例
# 1. ConnectionManager.java
@Component
public class ConnectionManager {
private final ConcurrentHashMap<String, Channel> deviceChannels = new ConcurrentHashMap<>();
private final StringRedisTemplate redisTemplate;
@Autowired
public ConnectionManager(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
public void addConnection(String deviceId, Channel channel, String gatewayIp) {
deviceChannels.put(deviceId, channel);
// 写入 Redis,TTL 设置为 2 分钟心跳自动续期
redisTemplate.opsForValue().set("device:gateway:" + deviceId, gatewayIp, 2, TimeUnit.MINUTES);
// 添加心跳检测
channel.pipeline().addLast(new IdleStateHandler(60, 30, 0));
}
public void removeConnection(String deviceId) {
Channel ch = deviceChannels.remove(deviceId);
if (ch != null) {
redisTemplate.delete("device:gateway:" + deviceId);
ch.close();
}
}
public Optional<String> getGatewayForDevice(String deviceId) {
return Optional.ofNullable(redisTemplate.opsForValue().get("device:gateway:" + deviceId));
}
}
# 2. Netty ChannelInitializer
public class IoTChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ConnectionManager connectionManager;
private final String gatewayIp;
public IoTChannelInitializer(ConnectionManager cm, String gatewayIp) {
this.connectionManager = cm;
this.gatewayIp = gatewayIp;
}
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
p.addLast(new LengthFieldPrepender(2));
p.addLast(new StringDecoder(StandardCharsets.UTF_8));
p.addLast(new StringEncoder(StandardCharsets.UTF_8));
p.addLast(new SimpleChannelInboundHandler<String>() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 假设第一条消息是 deviceId
ctx.channel().attr(AttributeKey.valueOf("deviceId")).set("UNKNOWN");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
// 设备首次上报 ID
if (ctx.channel().attr(AttributeKey.valueOf("deviceId")).get().equals("UNKNOWN")) {
String deviceId = msg.trim();
ctx.channel().attr(AttributeKey.valueOf("deviceId")).set(deviceId);
connectionManager.addConnection(deviceId, ctx.channel(), gatewayIp);
System.out.println("设备上线:" + deviceId + " → " + gatewayIp);
} else {
// 业务数据转 Kafka
kafkaTemplate.send("iot.data", msg);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
String deviceId = (String) ctx.channel().attr(AttributeKey.valueOf("deviceId")).get();
connectionManager.removeConnection(deviceId);
System.out.println("设备下线:" + deviceId);
}
});
}
}
# 四、Redis 数据示例
key | value | TTL |
---|---|---|
device:gateway:ABC123456 | 192.168.1.10 | 120 s |
device:gateway:XYZ987654 | 192.168.1.11 | 120 s |
device:gateway:LMN654321 | 192.168.1.12 | 120 s |
- 设备上线时记录当前连接的网关 IP;
- 网关每次收到心跳包时续期 TTL;
- 网关宕机时,该设备的 Redis Key 不会续期,自动过期后,Nginx 会将连接路由到新的可用网关。
# 五、Kafka 消费示例(后端业务服务)
@KafkaListener(topics = "iot.data", groupId = "iot-processor")
public void processIoTMessage(String message) {
// 处理设备上报的数据
System.out.println("收到 IoT 数据: " + message);
}
# 六、宕机平滑迁移流程
- 设备与网关保持心跳(TTL 续期)。
- 网关宕机 → 心跳停止 → Redis key 过期。
- Nginx 检测到连接断开 → 一致性哈希重新路由到新网关。
- 新网关接管设备连接 → 写入新的
deviceId → gatewayIP
映射。 - Kafka 消息处理不受影响(数据消费是分布式的)。
# 1. 正常情况下(网关没挂)
假设有一台设备,ID 是 ABC123
- 它连接到 网关1 (192.168.1.10)
- 网关1 会在 Redis 里写一条记录:
key = device:gateway:ABC123
value = 192.168.1.10
TTL = 120秒
- 网关1 每收到设备的心跳,就 把这条记录的 TTL 续回 120 秒(就像闹钟一直推迟)。
- Nginx 用 一致性哈希,保证这个设备始终连到 网关1。
# 2. 网关宕机后会发生什么
假设 网关1 突然断电 / 崩溃:
设备的 TCP 连接断了
- 因为网关死了,设备发的数据没人收,连接会超时或直接被操作系统断开。
Redis 里的 key 不再续期
- 原来网关1 每次收到心跳就延长 TTL
- 现在它挂了,所以 TTL 会一直往下掉,直到 120 秒后过期。
设备尝试重新连接
- 设备断开后会按程序设置重连,重新通过域名
iot-gateway.example.com:9000
去连 Nginx。
- 设备断开后会按程序设置重连,重新通过域名
# 3. Nginx 的一致性哈希重新分配
- Nginx 收到设备的重新连接请求时,会用设备的 IP 去算一个哈希值,然后决定转发到哪台网关。
- 由于 网关1 已经宕机,Nginx 会自动把流量转到 网关2 或 网关3。
- 这样,设备就会直接连到新的网关。
# 4. 新网关接管设备
- 新网关(比如网关2)收到设备连接时,会重新在 Redis 写入映射:
key = device:gateway:ABC123
value = 192.168.1.11
TTL = 120秒
- 后端业务(通过 Kafka)就会继续收到来自这个设备的数据,整个过程对业务几乎是无感的。
# 5. 过程总结(小白版)
可以想象成:
- Redis 像一个登记簿,记录“哪个设备在哪个网关”;
- 网关像前台接待,会不断去登记簿上续签(心跳);
- 网关死了 → 不再续签 → 登记过期 → 设备被 Nginx 重新安排到另一个前台(新网关);
- 新网关继续接待,并重新登记。
# 5.2 健康检查
@RestController
public class HealthController {
@Autowired
private ConnectionManager connectionManager;
@GetMapping("/health")
public ResponseEntity<Map<String, Object>> health() {
Map<String, Object> status = new HashMap<>();
status.put("status", "UP");
status.put("connections", connectionManager.getConnectionCount());
status.put("memory", getMemoryUsage());
status.put("timestamp", System.currentTimeMillis());
return ResponseEntity.ok(status);
}
private Map<String, Long> getMemoryUsage() {
Runtime runtime = Runtime.getRuntime();
Map<String, Long> memory = new HashMap<>();
memory.put("total", runtime.totalMemory());
memory.put("free", runtime.freeMemory());
memory.put("used", runtime.totalMemory() - runtime.freeMemory());
return memory;
}
}
# 6. 宕机迁移策略
# 6.1 连接迁移
@Component
public class ConnectionMigrationManager {
@EventListener
public void handleGatewayDown(GatewayDownEvent event) {
String failedGatewayId = event.getGatewayId();
List<String> affectedDevices = getDevicesByGateway(failedGatewayId);
// 重新分配设备连接
for (String deviceId : affectedDevices) {
GatewayNode newGateway = loadBalancer.selectGateway(deviceId);
notifyDeviceReconnect(deviceId, newGateway);
}
}
private void notifyDeviceReconnect(String deviceId, GatewayNode gateway) {
// 通过消息队列通知设备重连
DeviceReconnectMessage message = new DeviceReconnectMessage(
deviceId, gateway.getHost(), gateway.getPort()
);
messageProducer.sendReconnectNotification(message);
}
}
# 6.2 状态恢复
@Component
public class StateRecoveryService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public void saveDeviceState(String deviceId, DeviceState state) {
// 保存设备状态到Redis集群
redisTemplate.opsForValue().set(
"device:state:" + deviceId,
state,
Duration.ofHours(24)
);
}
public DeviceState recoverDeviceState(String deviceId) {
return (DeviceState) redisTemplate.opsForValue()
.get("device:state:" + deviceId);
}
}
# 7. 性能优化
# 7.1 JVM调优
# JVM启动参数
java -server \
-Xms4g -Xmx4g \
-XX:+UseG1GC \
-XX:MaxGCPauseMillis=200 \
-XX:+UnlockExperimentalVMOptions \
-XX:+UseStringDeduplication \
-XX:+PrintGCDetails \
-XX:+PrintGCTimeStamps \
-Dnetty.leakDetection.level=simple \
-jar iot-gateway.jar
# 7.2 连接池优化
@Configuration
public class NettyConfig {
@Bean
public EventLoopGroup bossGroup() {
return new NioEventLoopGroup(1, new DefaultThreadFactory("boss"));
}
@Bean
public EventLoopGroup workerGroup() {
int threads = Runtime.getRuntime().availableProcessors() * 2;
return new NioEventLoopGroup(threads, new DefaultThreadFactory("worker"));
}
}
# 8. 监控告警
# 8.1 指标收集
@Component
public class MetricsCollector {
private final MeterRegistry meterRegistry;
private final Counter connectionCounter;
private final Gauge connectionGauge;
public MetricsCollector(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.connectionCounter = Counter.builder("iot.connections.total")
.description("Total device connections")
.register(meterRegistry);
this.connectionGauge = Gauge.builder("iot.connections.active")
.description("Active device connections")
.register(meterRegistry, this, MetricsCollector::getActiveConnections);
}
public void incrementConnection() {
connectionCounter.increment();
}
public double getActiveConnections() {
return connectionManager.getConnectionCount();
}
}
# 8.2 告警规则
# Prometheus告警规则
groups:
- name: iot-gateway
rules:
- alert: HighConnectionCount
expr: iot_connections_active > 800000
for: 5m
labels:
severity: warning
annotations:
summary: "IoT网关连接数过高"
- alert: GatewayDown
expr: up{job="iot-gateway"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "IoT网关服务宕机"
# 9. 成本优化策略
# 9.1 资源配置
- CPU: 每个网关实例2-4核心
- 内存: 4-8GB,主要用于连接缓存
- 网络: 千兆网卡,考虑带宽成本
- 存储: SSD用于日志,普通磁盘用于数据
# 9.2 弹性伸缩
# Kubernetes HPA配置
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: iot-gateway-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: iot-gateway
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Pods
pods:
metric:
name: active_connections
target:
type: AverageValue
averageValue: "50000"
# 10. 部署架构图
[负载均衡器]
|
+----------------+----------------+
| | |
[网关集群1] [网关集群2] [网关集群3]
(50万连接) (50万连接) (预留扩展)
| | |
+----------------+----------------+
|
[Kafka集群]
/ | \
[业务服务1] [业务服务2] [业务服务3]
| | |
[数据库] [缓存] [存储]
# 总结
这个架构设计能够支持百万级设备连接,具备以下特点:
- 高性能: 基于Netty的异步非阻塞架构
- 高可用: 多层负载均衡和故障转移
- 可扩展: 水平扩展能力和弹性伸缩
- 低成本: 资源优化和成本控制策略
- 可靠性: 消息持久化和状态恢复机制
通过合理的架构设计和优化策略,可以在控制成本的同时实现高性能和高可用的IoT网关系统。