百万台设备Netty网关架构设计

2024/1/15 Netty网关高可用负载均衡Kafka

# 百万台设备Netty网关架构设计

# 目录

# 1. 整体架构概述

# 1.1 架构目标

  • 支持百万级设备并发连接
  • 低成本高性能
  • 高可用性和容错能力
  • 水平扩展能力
  • 消息可靠传输

# 1.2 核心组件

设备层 -> 负载均衡器 -> Netty网关集群 -> Kafka集群 -> 业务服务集群

# 2. Netty网关设计

# 2.1 网关架构

@Component
public class IoTGatewayServer {
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ServerBootstrap bootstrap;
    
    @PostConstruct
    public void start() {
        // 使用Epoll提升性能(Linux环境)
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
        
        bootstrap = new ServerBootstrap()
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childHandler(new IoTChannelInitializer());
    }
}

下面是你提供的 Netty 服务器启动配置的逐行详细注解(中文):

bootstrap = new ServerBootstrap()

创建 ServerBootstrap 实例,是 Netty 服务端启动的核心辅助类,用于配置服务器端的各项参数。

    .group(bossGroup, workerGroup)

设置 主从线程组(EventLoopGroup)

  • bossGroup:负责接收客户端连接请求;
  • workerGroup:负责处理客户端的 I/O 读写操作。
    .channel(NioServerSocketChannel.class)

设置服务器通道类型:

  • 使用基于 NIO 的 NioServerSocketChannel,这是 Netty 为服务器端提供的异步非阻塞通道。
    .option(ChannelOption.SO_BACKLOG, 1024)

设置服务器端套接字的 连接队列大小

  • 对应 Java 原生 ServerSocketChannelbacklog 参数;
  • 表示连接请求的等待队列大小(尚未被 accept 的连接数)最大为 1024。
    .option(ChannelOption.SO_REUSEADDR, true)

允许地址复用:

  • 服务器重启时,允许重新绑定处于 TIME_WAIT 状态的端口;
  • 对于高并发环境可提高端口复用效率。
    .childOption(ChannelOption.TCP_NODELAY, true)

关闭 Nagle 算法,开启小包即时发送:

  • true 表示禁用 Nagle 算法,提升低延迟通信效率;
  • 适用于实时性要求较高的应用场景(如物联网设备)。
    .childOption(ChannelOption.SO_KEEPALIVE, true)

开启 TCP 心跳机制(保活):

  • 用于检测长时间空闲连接是否存活;
  • 防止客户端突然掉线服务端长期不知。
    .childHandler(new IoTChannelInitializer());

设置子通道的初始化器:

  • 每当有新客户端连接时,IoTChannelInitializer 会为这个连接初始化 pipeline(添加编码器、解码器、业务处理器等);
  • 是处理每个 SocketChannel 的关键入口。

# 2.2 连接管理

@Component
public class ConnectionManager {
    // 使用ConcurrentHashMap管理连接
    private final ConcurrentHashMap<String, Channel> deviceChannels = new ConcurrentHashMap<>();
    private final AtomicLong connectionCount = new AtomicLong(0);
    
    public void addConnection(String deviceId, Channel channel) {
        deviceChannels.put(deviceId, channel);
        connectionCount.incrementAndGet();
        
        // 连接心跳检测
        channel.pipeline().addLast(new IdleStateHandler(60, 30, 0));
    }
    
    public void removeConnection(String deviceId) {
        Channel channel = deviceChannels.remove(deviceId);
        if (channel != null) {
            connectionCount.decrementAndGet();
            channel.close();
        }
    }
    
    public long getConnectionCount() {
        return connectionCount.get();
    }
}
@Component
public class ConnectionManager {

声明一个 Spring Bean:

  • 使用 @Component 注解表示该类由 Spring 容器自动管理;
  • 常用于统一管理设备连接。

    private final ConcurrentHashMap<String, Channel> deviceChannels = new ConcurrentHashMap<>();

使用线程安全的 ConcurrentHashMap 存储设备连接:

  • key: 设备唯一标识(如 deviceId);
  • value: 对应的 Netty Channel,表示该设备的连接通道;
  • 便于实现设备连接状态的快速查找、管理与转发。

    private final AtomicLong connectionCount = new AtomicLong(0);

使用 AtomicLong 统计当前连接数量,支持高并发下的线程安全自增/自减操作。


    public void addConnection(String deviceId, Channel channel) {
        deviceChannels.put(deviceId, channel);
        connectionCount.incrementAndGet();

添加新连接:

  • 将设备与其连接的 Channel 建立映射;
  • 连接计数加 1;

        channel.pipeline().addLast(new IdleStateHandler(60, 30, 0));

为当前连接添加 心跳检测处理器

  • readerIdleTime = 60 秒:60 秒内未读取数据则触发 READER_IDLE
  • writerIdleTime = 30 秒:30 秒内未写入数据则触发 WRITER_IDLE
  • allIdleTime = 0:关闭 ALL_IDLE
  • 用于检测连接空闲状态,避免死连接或长时间不通信的设备;

    public void removeConnection(String deviceId) {
        Channel channel = deviceChannels.remove(deviceId);

移除连接:

  • deviceChannels 映射中移除该设备对应的 Channel;
  • 如果存在该连接则执行以下操作:

        if (channel != null) {
            connectionCount.decrementAndGet();
            channel.close();
        }
  • 连接计数减 1;
  • 主动关闭对应的 Channel(断开连接);

    public long getConnectionCount() {
        return connectionCount.get();
    }

获取当前连接数(用于监控系统负载、统计连接活跃度等);


# ✅ 总结功能:

  • 支持百万连接高并发安全管理;
  • 提供设备 → Channel 快速映射能力;
  • 支持连接数统计;
  • 支持心跳检测与断开处理;
  • 可作为设备连接网关中间件的核心组件。

# 3. 负载均衡设计

# 3.1 多层负载均衡

# 3.1.1 DNS负载均衡

# 域名解析到多个IP
iot-gateway.example.com
├── 192.168.1.10 (网关集群1)
├── 192.168.1.11 (网关集群2)
└── 192.168.1.12 (网关集群3)

# 3.1.2 硬件负载均衡器配置

# Nginx配置示例
upstream iot_gateway {
    # 一致性哈希,保证设备连接到固定网关
    hash $remote_addr consistent;
    
    server 192.168.1.10:8080 weight=1 max_fails=3 fail_timeout=30s;
    server 192.168.1.11:8080 weight=1 max_fails=3 fail_timeout=30s;
    server 192.168.1.12:8080 weight=1 max_fails=3 fail_timeout=30s;
}

server {
    listen 80;
    location / {
        proxy_pass http://iot_gateway;
        proxy_connect_timeout 5s;
        proxy_timeout 60s;
    }
}

# 3.2 应用层负载均衡

@Component
public class GatewayLoadBalancer {
    private final List<GatewayNode> gatewayNodes;
    private final ConsistentHash<GatewayNode> consistentHash;
    
    public GatewayNode selectGateway(String deviceId) {
        // 使用一致性哈希算法
        return consistentHash.get(deviceId);
    }
    
    public void addGatewayNode(GatewayNode node) {
        gatewayNodes.add(node);
        consistentHash.add(node);
    }
    
    public void removeGatewayNode(GatewayNode node) {
        gatewayNodes.remove(node);
        consistentHash.remove(node);
    }
}

# 1. 域名解析

iot-gateway.example.com
├── 192.168.1.10 (网关集群1)
├── 192.168.1.11 (网关集群2)
└── 192.168.1.12 (网关集群3)
  • 目的:让同一个域名指向多台网关服务器(每台运行 Netty 网关服务)。
  • 好处:方便客户端只用一个域名连接,Nginx 负责流量分发。
  • 这里的 192.168.1.x 是局域网 IP,生产环境可以替换为公网 IP 或云服务器内网 IP。

# 2. Nginx upstream 配置

upstream iot_gateway {
    # 一致性哈希,保证同一客户端IP始终分配到同一台网关
    hash $remote_addr consistent;
    
    server 192.168.1.10:8080 weight=1 max_fails=3 fail_timeout=30s;
    server 192.168.1.11:8080 weight=1 max_fails=3 fail_timeout=30s;
    server 192.168.1.12:8080 weight=1 max_fails=3 fail_timeout=30s;
}

# 关键点:

  • hash $remote_addr consistent; 使用一致性哈希算法,根据 客户端 IP 分配服务器:

    • 保证同一个设备(相同 IP)总是连接到同一台网关;
    • 适合 长连接场景(IoT、WebSocket、Netty TCP);
    • consistent 关键字减少节点变动时的迁移影响。
  • server 192.168.1.x:8080 每台网关服务监听 8080 端口。

  • weight=1 每台服务器权重相等,均衡分配。

  • max_fails=3 fail_timeout=30s

    • 如果某个节点在 30 秒内失败 3 次,则临时剔除 30 秒;
    • 避免设备频繁连接到故障节点。

# 3. Nginx server 块

server {
    listen 80;
    location / {
        proxy_pass http://iot_gateway;
        proxy_connect_timeout 5s;
        proxy_timeout 60s;
    }
}

# 关键点:

  • listen 80; 接收客户端 HTTP 请求(如果是 TCP/UDP 则需 stream 模块)。

  • proxy_pass http://iot_gateway; 将流量转发给上面定义的 upstream 集群。

  • proxy_connect_timeout 5s; 连接上游服务器超时时间,5 秒没连上就失败。

  • proxy_timeout 60s; 与上游服务器保持连接的最大空闲时间。


# 4. ⚠ 注意:Netty TCP 场景下的 Nginx 配置

上面是 HTTP 代理模式,如果你的 IoT 网关是 TCP(非 HTTP),Nginx 要用 stream 块:

stream {
    upstream iot_gateway {
        hash $remote_addr consistent;
        server 192.168.1.10:8080;
        server 192.168.1.11:8080;
        server 192.168.1.12:8080;
    }

    server {
        listen 9000; # 客户端连接的 TCP 端口
        proxy_connect_timeout 5s;
        proxy_timeout 600s; # 长连接要设置大一些
        proxy_pass iot_gateway;
    }
}

这样才能转发原生 TCP 数据(Netty、JT808、MQTT 等)。


# 5. 整体流程图

设备(TCP连接)
     │  iot-gateway.example.com:9000
     ▼
  Nginx (一致性哈希)
     ├── 192.168.1.10:8080 → Netty 网关实例 1
     ├── 192.168.1.11:8080 → Netty 网关实例 2
     └── 192.168.1.12:8080 → Netty 网关实例 3

同一设备 → 固定网关 → 减少重连和会话迁移。

# 4. Kafka集成设计

# 4.1 消息生产者

@Component
public class IoTMessageProducer {
    private final KafkaTemplate<String, Object> kafkaTemplate;
    
    public void sendDeviceMessage(String deviceId, Object message) {
        try {
            // 按设备ID分区,保证消息顺序
            kafkaTemplate.send("iot-device-data", deviceId, message)
                .addCallback(
                    result -> log.info("消息发送成功: {}", deviceId),
                    failure -> log.error("消息发送失败: {}", deviceId, failure)
                );
        } catch (Exception e) {
            // 发送失败时的重试机制
            retryMessageSend(deviceId, message);
        }
    }
    
    private void retryMessageSend(String deviceId, Object message) {
        // 实现指数退避重试
        CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)
            .execute(() -> sendDeviceMessage(deviceId, message));
    }
}

# 4.2 Kafka配置优化

# Kafka生产者配置
spring:
  kafka:
    producer:
      bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: 1  # 平衡性能和可靠性
      retries: 3
      batch-size: 16384
      linger-ms: 5
      buffer-memory: 33554432
      compression-type: lz4

# 4.3 Topic设计

# Topic分区策略
iot-device-data:
  - partitions: 32  # 根据设备数量调整
  - replication-factor: 3
  - retention.ms: 604800000  # 7天

iot-device-status:
  - partitions: 16
  - replication-factor: 3
  - retention.ms: 86400000   # 1天

iot-device-alerts:
  - partitions: 8
  - replication-factor: 3
  - retention.ms: 2592000000 # 30天

# 5. 高可用设计

# 5.1 网关集群部署

# Docker Compose示例
version: '3.8'
services:
  gateway-1:
    image: iot-gateway:latest
    ports:
      - "8080:8080"
    environment:
      - GATEWAY_ID=gateway-1
      - KAFKA_SERVERS=kafka1:9092,kafka2:9092,kafka3:9092
      - REDIS_CLUSTER=redis1:6379,redis2:6379,redis3:6379
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 2G
          cpus: '1.0'

# 一、分布式 IoT 网关架构图

                ┌─────────────────────────┐
                │      IoT 设备 (TCP)      │
                └──────────────┬──────────┘
                               │ 域名: iot-gateway.example.com:9000
                               ▼
                  ┌─────────────────────────┐
                  │      Nginx (stream)      │
                  │ 一致性哈希 hash $remote_addr │
                  └──────────────┬──────────┘
                     ┌───────────┼───────────┐
                     ▼           ▼           ▼
       ┌────────────────┐  ┌────────────────┐  ┌────────────────┐
       │  Netty 网关 #1 │  │  Netty 网关 #2 │  │  Netty 网关 #3 │
       │ 192.168.1.10   │  │ 192.168.1.11   │  │ 192.168.1.12   │
       └───────┬────────┘  └───────┬────────┘  └───────┬────────┘
               │                   │                   │
               ▼                   ▼                   ▼
      ┌────────────────┐   ┌────────────────┐   ┌────────────────┐
      │ Redis 集群      │←→│ deviceId→网关映射│←→│ 宕机重分配逻辑 │
      └────────────────┘   └────────────────┘   └────────────────┘
               │
               ▼
      ┌────────────────┐
      │ Kafka 消息队列 │ ←→ 后端业务处理服务
      └────────────────┘

# 二、Nginx stream 配置(TCP 一致性哈希)

stream {
    upstream iot_gateway {
        hash $remote_addr consistent;
        server 192.168.1.10:9000 max_fails=3 fail_timeout=30s;
        server 192.168.1.11:9000 max_fails=3 fail_timeout=30s;
        server 192.168.1.12:9000 max_fails=3 fail_timeout=30s;
    }

    server {
        listen 9000; # 对外 TCP 接口
        proxy_connect_timeout 5s;
        proxy_timeout 600s;  # 长连接
        proxy_pass iot_gateway;
    }
}
  • hash $remote_addr consistent; 保证相同设备 IP 落到同一网关。
  • stream 模式支持 TCP 转发(Netty、MQTT、JT808)。
  • proxy_timeout 设置大一些,保证长连接不被 Nginx 中断。

# 三、Netty 网关 + Redis + Kafka 代码示例

# 1. ConnectionManager.java

@Component
public class ConnectionManager {
    private final ConcurrentHashMap<String, Channel> deviceChannels = new ConcurrentHashMap<>();
    private final StringRedisTemplate redisTemplate;

    @Autowired
    public ConnectionManager(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void addConnection(String deviceId, Channel channel, String gatewayIp) {
        deviceChannels.put(deviceId, channel);
        // 写入 Redis,TTL 设置为 2 分钟心跳自动续期
        redisTemplate.opsForValue().set("device:gateway:" + deviceId, gatewayIp, 2, TimeUnit.MINUTES);
        // 添加心跳检测
        channel.pipeline().addLast(new IdleStateHandler(60, 30, 0));
    }

    public void removeConnection(String deviceId) {
        Channel ch = deviceChannels.remove(deviceId);
        if (ch != null) {
            redisTemplate.delete("device:gateway:" + deviceId);
            ch.close();
        }
    }

    public Optional<String> getGatewayForDevice(String deviceId) {
        return Optional.ofNullable(redisTemplate.opsForValue().get("device:gateway:" + deviceId));
    }
}

# 2. Netty ChannelInitializer

public class IoTChannelInitializer extends ChannelInitializer<SocketChannel> {
    private final ConnectionManager connectionManager;
    private final String gatewayIp;

    public IoTChannelInitializer(ConnectionManager cm, String gatewayIp) {
        this.connectionManager = cm;
        this.gatewayIp = gatewayIp;
    }

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
        p.addLast(new LengthFieldPrepender(2));
        p.addLast(new StringDecoder(StandardCharsets.UTF_8));
        p.addLast(new StringEncoder(StandardCharsets.UTF_8));
        p.addLast(new SimpleChannelInboundHandler<String>() {
            @Override
            public void channelActive(ChannelHandlerContext ctx) {
                // 假设第一条消息是 deviceId
                ctx.channel().attr(AttributeKey.valueOf("deviceId")).set("UNKNOWN");
            }

            @Override
            protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                // 设备首次上报 ID
                if (ctx.channel().attr(AttributeKey.valueOf("deviceId")).get().equals("UNKNOWN")) {
                    String deviceId = msg.trim();
                    ctx.channel().attr(AttributeKey.valueOf("deviceId")).set(deviceId);
                    connectionManager.addConnection(deviceId, ctx.channel(), gatewayIp);
                    System.out.println("设备上线:" + deviceId + " → " + gatewayIp);
                } else {
                    // 业务数据转 Kafka
                    kafkaTemplate.send("iot.data", msg);
                }
            }

            @Override
            public void channelInactive(ChannelHandlerContext ctx) {
                String deviceId = (String) ctx.channel().attr(AttributeKey.valueOf("deviceId")).get();
                connectionManager.removeConnection(deviceId);
                System.out.println("设备下线:" + deviceId);
            }
        });
    }
}

# 四、Redis 数据示例

key value TTL
device:gateway:ABC123456 192.168.1.10 120 s
device:gateway:XYZ987654 192.168.1.11 120 s
device:gateway:LMN654321 192.168.1.12 120 s
  • 设备上线时记录当前连接的网关 IP;
  • 网关每次收到心跳包时续期 TTL;
  • 网关宕机时,该设备的 Redis Key 不会续期,自动过期后,Nginx 会将连接路由到新的可用网关。

# 五、Kafka 消费示例(后端业务服务)

@KafkaListener(topics = "iot.data", groupId = "iot-processor")
public void processIoTMessage(String message) {
    // 处理设备上报的数据
    System.out.println("收到 IoT 数据: " + message);
}

# 六、宕机平滑迁移流程

  1. 设备与网关保持心跳(TTL 续期)。
  2. 网关宕机 → 心跳停止 → Redis key 过期。
  3. Nginx 检测到连接断开 → 一致性哈希重新路由到新网关。
  4. 新网关接管设备连接 → 写入新的 deviceId → gatewayIP 映射。
  5. Kafka 消息处理不受影响(数据消费是分布式的)。

# 1. 正常情况下(网关没挂)

假设有一台设备,ID 是 ABC123

  • 它连接到 网关1 (192.168.1.10)
  • 网关1 会在 Redis 里写一条记录:
key   = device:gateway:ABC123
value = 192.168.1.10
TTL   = 120秒
  • 网关1 每收到设备的心跳,就 把这条记录的 TTL 续回 120 秒(就像闹钟一直推迟)。
  • Nginx 用 一致性哈希,保证这个设备始终连到 网关1

# 2. 网关宕机后会发生什么

假设 网关1 突然断电 / 崩溃

  1. 设备的 TCP 连接断了

    • 因为网关死了,设备发的数据没人收,连接会超时或直接被操作系统断开。
  2. Redis 里的 key 不再续期

    • 原来网关1 每次收到心跳就延长 TTL
    • 现在它挂了,所以 TTL 会一直往下掉,直到 120 秒后过期
  3. 设备尝试重新连接

    • 设备断开后会按程序设置重连,重新通过域名 iot-gateway.example.com:9000 去连 Nginx。

# 3. Nginx 的一致性哈希重新分配

  • Nginx 收到设备的重新连接请求时,会用设备的 IP 去算一个哈希值,然后决定转发到哪台网关。
  • 由于 网关1 已经宕机,Nginx 会自动把流量转到 网关2 或 网关3
  • 这样,设备就会直接连到新的网关。

# 4. 新网关接管设备

  • 新网关(比如网关2)收到设备连接时,会重新在 Redis 写入映射
key   = device:gateway:ABC123
value = 192.168.1.11
TTL   = 120秒
  • 后端业务(通过 Kafka)就会继续收到来自这个设备的数据,整个过程对业务几乎是无感的。

# 5. 过程总结(小白版)

可以想象成:

  1. Redis 像一个登记簿,记录“哪个设备在哪个网关”;
  2. 网关像前台接待,会不断去登记簿上续签(心跳);
  3. 网关死了 → 不再续签 → 登记过期 → 设备被 Nginx 重新安排到另一个前台(新网关);
  4. 新网关继续接待,并重新登记。

# 5.2 健康检查

@RestController
public class HealthController {
    
    @Autowired
    private ConnectionManager connectionManager;
    
    @GetMapping("/health")
    public ResponseEntity<Map<String, Object>> health() {
        Map<String, Object> status = new HashMap<>();
        status.put("status", "UP");
        status.put("connections", connectionManager.getConnectionCount());
        status.put("memory", getMemoryUsage());
        status.put("timestamp", System.currentTimeMillis());
        
        return ResponseEntity.ok(status);
    }
    
    private Map<String, Long> getMemoryUsage() {
        Runtime runtime = Runtime.getRuntime();
        Map<String, Long> memory = new HashMap<>();
        memory.put("total", runtime.totalMemory());
        memory.put("free", runtime.freeMemory());
        memory.put("used", runtime.totalMemory() - runtime.freeMemory());
        return memory;
    }
}

# 6. 宕机迁移策略

# 6.1 连接迁移

@Component
public class ConnectionMigrationManager {
    
    @EventListener
    public void handleGatewayDown(GatewayDownEvent event) {
        String failedGatewayId = event.getGatewayId();
        List<String> affectedDevices = getDevicesByGateway(failedGatewayId);
        
        // 重新分配设备连接
        for (String deviceId : affectedDevices) {
            GatewayNode newGateway = loadBalancer.selectGateway(deviceId);
            notifyDeviceReconnect(deviceId, newGateway);
        }
    }
    
    private void notifyDeviceReconnect(String deviceId, GatewayNode gateway) {
        // 通过消息队列通知设备重连
        DeviceReconnectMessage message = new DeviceReconnectMessage(
            deviceId, gateway.getHost(), gateway.getPort()
        );
        messageProducer.sendReconnectNotification(message);
    }
}

# 6.2 状态恢复

@Component
public class StateRecoveryService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    public void saveDeviceState(String deviceId, DeviceState state) {
        // 保存设备状态到Redis集群
        redisTemplate.opsForValue().set(
            "device:state:" + deviceId, 
            state, 
            Duration.ofHours(24)
        );
    }
    
    public DeviceState recoverDeviceState(String deviceId) {
        return (DeviceState) redisTemplate.opsForValue()
            .get("device:state:" + deviceId);
    }
}

# 7. 性能优化

# 7.1 JVM调优

# JVM启动参数
java -server \
  -Xms4g -Xmx4g \
  -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=200 \
  -XX:+UnlockExperimentalVMOptions \
  -XX:+UseStringDeduplication \
  -XX:+PrintGCDetails \
  -XX:+PrintGCTimeStamps \
  -Dnetty.leakDetection.level=simple \
  -jar iot-gateway.jar

# 7.2 连接池优化

@Configuration
public class NettyConfig {
    
    @Bean
    public EventLoopGroup bossGroup() {
        return new NioEventLoopGroup(1, new DefaultThreadFactory("boss"));
    }
    
    @Bean
    public EventLoopGroup workerGroup() {
        int threads = Runtime.getRuntime().availableProcessors() * 2;
        return new NioEventLoopGroup(threads, new DefaultThreadFactory("worker"));
    }
}

# 8. 监控告警

# 8.1 指标收集

@Component
public class MetricsCollector {
    
    private final MeterRegistry meterRegistry;
    private final Counter connectionCounter;
    private final Gauge connectionGauge;
    
    public MetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.connectionCounter = Counter.builder("iot.connections.total")
            .description("Total device connections")
            .register(meterRegistry);
        this.connectionGauge = Gauge.builder("iot.connections.active")
            .description("Active device connections")
            .register(meterRegistry, this, MetricsCollector::getActiveConnections);
    }
    
    public void incrementConnection() {
        connectionCounter.increment();
    }
    
    public double getActiveConnections() {
        return connectionManager.getConnectionCount();
    }
}

# 8.2 告警规则

# Prometheus告警规则
groups:
- name: iot-gateway
  rules:
  - alert: HighConnectionCount
    expr: iot_connections_active > 800000
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "IoT网关连接数过高"
      
  - alert: GatewayDown
    expr: up{job="iot-gateway"} == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "IoT网关服务宕机"

# 9. 成本优化策略

# 9.1 资源配置

  • CPU: 每个网关实例2-4核心
  • 内存: 4-8GB,主要用于连接缓存
  • 网络: 千兆网卡,考虑带宽成本
  • 存储: SSD用于日志,普通磁盘用于数据

# 9.2 弹性伸缩

# Kubernetes HPA配置
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: iot-gateway-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: iot-gateway
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Pods
    pods:
      metric:
        name: active_connections
      target:
        type: AverageValue
        averageValue: "50000"

# 10. 部署架构图

                    [负载均衡器]
                         |
        +----------------+----------------+
        |                |                |
   [网关集群1]      [网关集群2]      [网关集群3]
   (50万连接)       (50万连接)       (预留扩展)
        |                |                |
        +----------------+----------------+
                         |
                   [Kafka集群]
                  /      |      \
            [业务服务1] [业务服务2] [业务服务3]
                 |        |        |
               [数据库] [缓存]   [存储]

# 总结

这个架构设计能够支持百万级设备连接,具备以下特点:

  1. 高性能: 基于Netty的异步非阻塞架构
  2. 高可用: 多层负载均衡和故障转移
  3. 可扩展: 水平扩展能力和弹性伸缩
  4. 低成本: 资源优化和成本控制策略
  5. 可靠性: 消息持久化和状态恢复机制

通过合理的架构设计和优化策略,可以在控制成本的同时实现高性能和高可用的IoT网关系统。