小米实战过亿设备Netty网关架构

# 小米实战过亿设备Netty网关架构

为了支撑百万台设备连接且具备高可用、低成本、高性能的特点,可以采用如下 Netty 网关架构方案,结合 Kafka 与 Redis 实现业务解耦与故障恢复:


# 🧩 架构概览图(简述)

            ┌─────────────┐
            │   设备终端   │
            └────┬────────┘
                 │ TCP 连接
         ┌───────▼────────────┐
         │   LVS/Nginx负载均衡  │   ← 基于 IP Hash / Consistent Hash(deviceId)
         └───────┬────────────┘
                 │
       ┌─────────▼────────┐
       │ 多台Netty接入网关 │  ← 每台管理部分 Channel(支持百万连接)
       └────────┬─────────┘
                │内存Channel管理、心跳机制
                │ deviceId → channelId
                │ Redis → deviceId → 网关host
                │
         ┌──────▼────────┐
         │ Kafka 消息队列 │  ← 网关发布消息
         └──────┬────────┘
                │
         ┌──────▼───────────┐
         │ 业务微服务/目标网关 │ ← 消费 Kafka 进行业务处理
         └──────────────────┘

# ☑️ 核心模块设计

# 1. Netty接入网关设计(Spring Boot + Netty)

  • TCP Server + 多Reactor线程池 + 异步事件驱动

  • ChannelManager

    • deviceId → Channel 映射(使用 ConcurrentHashMap 管理,或 Redis 扩展)
    • channelId → deviceId 映射(反向)
  • 心跳机制 + IdleStateHandler

    • 主动检测连接是否超时
    • Redis 定期更新 deviceId 对应 host
  • 设备首次连接流程

    1. 设备上报注册包(如 JT808 消息 0x0100)
    2. 服务端解码 → 校验设备号 → 保存 channel
    3. 返回注册成功应答(0x8100)

# 2. 分布式连接状态管理

  • 使用 Redis 存储:

    device_gateway:{deviceId} → 网关 IP(或 host 名)
    
    • 定期续期(例如心跳中 60s 刷一次 TTL)
    • 若 TTL 过期,视为设备掉线
  • 保证主机挂掉后,其他服务器可以重新接管

# 3. Kafka 消息分发

  • 网关只负责接收并解析协议 → 发布 Kafka topic

  • 不处理业务逻辑(解耦、高并发)

  • 消息结构统一格式:

    {
      "deviceId": "XXXX",
      "type": "JT808_0200",
      "payload": {...},
      "timestamp": 1690000000
    }
    

# 4. 高可用与宕机自动迁移

  • 多台网关机器 + Nginx / LVS 层负载均衡

  • 若某台机器挂掉:

    • Redis 中 key 会过期
    • 设备自动重连(长连接断开) → 进入新网关
    • 新网关注册新 Channel,并更新 Redis 映射

# 💾 一台机器可连接多少设备?

  • 通常可支持:10~15万 TCP 长连接

  • 依赖:

    • ulimit 设置(如 nofile=1,000,000
    • JVM 内存(堆外 Netty DirectBuffer)
    • epoll 模型(Linux 上)
  • 可按需横向扩展(10台服务器支持百万连接)


# ✅ 总结:百万设备接入设计建议

模块 建议
网关 Spring Boot + Netty,解耦业务,解析 JT808 协议
通信协议 自定义协议或 JT808,解码+编码模块隔离
连接状态 内存 Channel + Redis 冗余备份
宕机恢复 心跳过期剔除、设备断线重连接入新的网关
负载均衡 IP Hash / deviceId Hash(保障设备连接一致性)
消息分发 Kafka 作为异步消息总线,保证高吞吐
运维监控 指标采集(如连接数、空闲连接、心跳异常) + 日志追踪 + 告警

  • NettyServer 初始化与解码器
  • ChannelManager 实现
  • JT808Decoder/Encoder
  • Kafka 生产者封装
  • Redis TTL 心跳续期任务
  • 消息结构定义与发布流程

狠活儿来了⚙️——这是一个可跑的 Spring Boot + Netty 接入网关模板,带上 Redis(连接映射 & 心跳续期)Kafka(消息分发)。你拿过去改下协议解码就能用。

# 目录结构

iot-gateway/
├─ pom.xml
├─ src/main/java/com/example/gw/
│  ├─ GatewayApplication.java
│  ├─ config/
│  │  ├─ NettyProperties.java
│  │  ├─ KafkaConfig.java
│  │  └─ RedisConfig.java
│  ├─ netty/
│  │  ├─ NettyServer.java
│  │  ├─ initializer/GatewayInitializer.java
│  │  ├─ codec/FrameDecoder.java
│  │  ├─ codec/MessageDecoder.java
│  │  ├─ codec/MessageEncoder.java
│  │  └─ handler/
│  │     ├─ AuthRegisterHandler.java
│  │     ├─ HeartbeatHandler.java
│  │     └─ BusinessDispatcherHandler.java
│  ├─ service/
│  │  ├─ ChannelManager.java
│  │  ├─ DeviceRouterService.java
│  │  └─ HeartbeatRenewService.java
│  ├─ mq/KafkaProducer.java
│  ├─ model/
│  │  ├─ DeviceMessage.java
│  │  └─ Consts.java
│  └─ util/Bytes.java
└─ src/main/resources/
   ├─ application.yml
   └─ logback-spring.xml

# pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0  http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>iot-gateway</artifactId>
  <version>1.0.0</version>
  <properties>
    <java.version>17</java.version>
    <spring.boot.version>3.3.1</spring.boot.version>
    <netty.version>4.1.110.Final</netty.version>
  </properties>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-dependencies</artifactId>
        <version>${spring.boot.version}</version>
        <type>pom</type><scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <dependencies>
    <!-- Spring Boot -->
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency>
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency>
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency>
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></dependency>

    <!-- Netty -->
    <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>${netty.version}</version></dependency>

    <!-- Kafka -->
    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

    <!-- Redis -->
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

    <!-- utils -->
    <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
    <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin>
      <plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId>
        <configuration><source>${java.version}</source><target>${java.version}</target></configuration>
      </plugin>
    </plugins>
  </build>
</project>

# application.yml

server:
  port: 8080

netty:
  port: 7001
  bossThreads: 1
  workerThreads: 8
  backlog: 1024
  soRcvbuf: 1048576
  soSndbuf: 1048576
  soReuseaddr: true
  soKeepalive: true
  writeBufWaterMarkLow: 33554432    # 32MB
  writeBufWaterMarkHigh: 67108864   # 64MB
  idleReaderSeconds: 90

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      acks: all
      retries: 3
      linger-ms: 5
      batch-size: 32768
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

  data:
    redis:
      host: 127.0.0.1
      port: 6379
      lettuce:
        pool:
          max-active: 64
          max-idle: 32
          min-idle: 8

gateway:
  kafkaTopic: device.upstream
  redisDeviceKeyPrefix: device_gateway:
  host: ${HOSTNAME:gw-1}   # 当前网关标识(或 IP)

logging:
  level:
    root: info
    com.example.gw: info

# GatewayApplication.java

package com.example.gw;

import com.example.gw.netty.NettyServer;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import jakarta.annotation.PostConstruct;

@SpringBootApplication
@RequiredArgsConstructor
public class GatewayApplication {
    private final NettyServer nettyServer;

    public static void main(String[] args) {
        // 重要:提升文件句柄数(Linux: ulimit -n 1000000)
        SpringApplication.run(GatewayApplication.class, args);
    }

    @PostConstruct
    public void startNetty() {
        nettyServer.start();
    }
}

# config/NettyProperties.java

package com.example.gw.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "netty")
public class NettyProperties {
  private int port;
  private int bossThreads;
  private int workerThreads;
  private int backlog;
  private int soRcvbuf;
  private int soSndbuf;
  private boolean soReuseaddr;
  private boolean soKeepalive;
  private int writeBufWaterMarkLow;
  private int writeBufWaterMarkHigh;
  private int idleReaderSeconds;
}

# config/RedisConfig.java

package com.example.gw.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;

@Configuration
public class RedisConfig {
  @Bean
  public StringRedisTemplate stringRedisTemplate(LettuceConnectionFactory factory) {
    return new StringRedisTemplate(factory);
  }
}

# config/KafkaConfig.java

package com.example.gw.config;

import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {
  // Spring Boot autoconfig 已足够;如需拦截器、分区器可在此扩展
}

# netty/NettyServer.java

package com.example.gw.netty;

import com.example.gw.config.NettyProperties;
import com.example.gw.netty.initializer.GatewayInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ResourceLeakDetector;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class NettyServer {

  private final NettyProperties props;
  private final GatewayInitializer initializer;

  public void start() {
    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);

    boolean epoll = Epoll.isAvailable();
    EventLoopGroup boss = epoll ? new EpollEventLoopGroup(props.getBossThreads())
                                : new NioEventLoopGroup(props.getBossThreads());
    EventLoopGroup worker = epoll ? new EpollEventLoopGroup(props.getWorkerThreads())
                                  : new NioEventLoopGroup(props.getWorkerThreads());

    try {
      ServerBootstrap b = new ServerBootstrap()
          .group(boss, worker)
          .channel(epoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, props.getBacklog())
          .option(ChannelOption.SO_REUSEADDR, props.isSoReuseaddr())
          .childOption(ChannelOption.SO_KEEPALIVE, props.isSoKeepalive())
          .childOption(ChannelOption.SO_RCVBUF, props.getSoRcvbuf())
          .childOption(ChannelOption.SO_SNDBUF, props.getSoSndbuf())
          .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
              new WriteBufferWaterMark(props.getWriteBufWaterMarkLow(), props.getWriteBufWaterMarkHigh()))
          .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
          .childHandler(initializer);

      ChannelFuture f = b.bind(props.getPort()).sync();
      log.info("Netty gateway started on port {}", props.getPort());
      f.channel().closeFuture().addListener(cf -> {
        boss.shutdownGracefully();
        worker.shutdownGracefully();
      });
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      log.error("Netty start interrupted", e);
    } catch (Exception e) {
      log.error("Netty start failed", e);
      boss.shutdownGracefully();
      worker.shutdownGracefully();
    }
  }
}

# initializer/GatewayInitializer.java

package com.example.gw.netty.initializer;

import com.example.gw.config.NettyProperties;
import com.example.gw.netty.codec.FrameDecoder;
import com.example.gw.netty.codec.MessageDecoder;
import com.example.gw.netty.codec.MessageEncoder;
import com.example.gw.netty.handler.AuthRegisterHandler;
import com.example.gw.netty.handler.HeartbeatHandler;
import com.example.gw.netty.handler.BusinessDispatcherHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class GatewayInitializer extends ChannelInitializer<SocketChannel> {

  private final NettyProperties props;
  private final AuthRegisterHandler authRegisterHandler;
  private final HeartbeatHandler heartbeatHandler;
  private final BusinessDispatcherHandler dispatcherHandler;

  @Override
  protected void initChannel(SocketChannel ch) {
    ch.pipeline()
      // 拆包:根据协议做帧界定(示例长度前缀/分隔符/固定头尾,自选)
      .addLast("frameDecoder", new FrameDecoder())
      // 协议解码(把ByteBuf→DeviceMessage)
      .addLast("msgDecoder", new MessageDecoder())
      // 协议编码(下行时DeviceMessage→ByteBuf)
      .addLast("msgEncoder", new MessageEncoder())
      // 空闲检测(读空闲触发心跳处理/断开)
      .addLast("idle", new IdleStateHandler(props.getIdleReaderSeconds(), 0, 0))
      // 注册鉴权(首包携带deviceId等)
      .addLast("auth", authRegisterHandler)
      // 心跳
      .addLast("hb", heartbeatHandler)
      // 分发到Kafka
      .addLast("dispatcher", dispatcherHandler);
  }
}

# codec/FrameDecoder.java(示例:简单定界或长度前缀,自己换成 JT808 也行)

package com.example.gw.netty.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/** 示例:长度前缀(2字节) + body */
public class FrameDecoder extends ByteToMessageDecoder {
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 2) return;
    in.markReaderIndex();
    int len = in.readUnsignedShort();
    if (in.readableBytes() < len) {
      in.resetReaderIndex();
      return;
    }
    ByteBuf frame = in.readRetainedSlice(len);
    out.add(frame);
  }
}

# codec/MessageDecoder.java(把帧→业务对象)

package com.example.gw.netty.codec;

import com.example.gw.model.DeviceMessage;
import com.example.gw.util.Bytes;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.util.List;

/** Demo 协议:| devLen(1) | devId(N) | msgType(1) | payload(剩余) | */
public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
    int devLen = buf.readUnsignedByte();
    byte[] devBytes = new byte[devLen];
    buf.readBytes(devBytes);
    String deviceId = new String(devBytes);

    int msgType = buf.readUnsignedByte();
    byte[] payload = new byte[buf.readableBytes()];
    buf.readBytes(payload);

    DeviceMessage msg = new DeviceMessage();
    msg.setDeviceId(deviceId);
    msg.setMsgType(msgType);
    msg.setPayload(payload);
    msg.setTs(System.currentTimeMillis());

    out.add(msg);
  }
}

# codec/MessageEncoder.java(下行需要时用)

package com.example.gw.netty.codec;

import com.example.gw.model.DeviceMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MessageEncoder extends MessageToByteEncoder<DeviceMessage> {
  @Override
  protected void encode(ChannelHandlerContext ctx, DeviceMessage msg, ByteBuf out) {
    byte[] dev = msg.getDeviceId().getBytes();
    out.writeShort(1 + dev.length + 1 + (msg.getPayload() == null ? 0 : msg.getPayload().length)); // length
    out.writeByte(dev.length);
    out.writeBytes(dev);
    out.writeByte(msg.getMsgType());
    if (msg.getPayload() != null) out.writeBytes(msg.getPayload());
  }
}

# model/DeviceMessage.java

package com.example.gw.model;

import lombok.Data;

@Data
public class DeviceMessage {
  private String deviceId;
  private int msgType;         // 自定义或协议中的消息类型
  private byte[] payload;      // 原始负载
  private long ts;             // 网关接收时间
}

# model/Consts.java

package com.example.gw.model;

public interface Consts {
  int TYPE_REGISTER = 0x01;
  int TYPE_HEARTBEAT = 0x02;
  int TYPE_GPS = 0x10;
}

# service/ChannelManager.java(内存 + Redis 映射)

package com.example.gw.service;

import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

@Service
@RequiredArgsConstructor
public class ChannelManager {
  private final StringRedisTemplate redis;

  @Value("${gateway.redisDeviceKeyPrefix}")
  private String keyPrefix;

  @Value("${gateway.host}")
  private String host;

  // 本机内存:deviceId -> Channel
  private final Map<String, Channel> deviceChannel = new ConcurrentHashMap<>();
  // 反查:channelId -> deviceId
  private final Map<ChannelId, String> channelDevice = new ConcurrentHashMap<>();

  public void bind(String deviceId, Channel ch) {
    deviceChannel.put(deviceId, ch);
    channelDevice.put(ch.id(), deviceId);
    // 写入 Redis(60s 续期)
    redis.opsForValue().set(keyPrefix + deviceId, host, 60, TimeUnit.SECONDS);
  }

  public void renew(String deviceId) {
    redis.expire(keyPrefix + deviceId, 60, TimeUnit.SECONDS);
  }

  public void unbind(Channel ch) {
    String deviceId = channelDevice.remove(ch.id());
    if (deviceId != null) {
      deviceChannel.remove(deviceId);
      // 不立即删 Redis,交给 TTL 过期;也可显式删除:
      // redis.delete(keyPrefix + deviceId);
    }
  }

  public Channel getChannel(String deviceId) {
    return deviceChannel.get(deviceId);
  }

  public String getDeviceId(Channel ch) {
    return channelDevice.get(ch.id());
  }
}

# service/DeviceRouterService.java(查 Redis 看设备在哪台网关)

package com.example.gw.service;

import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class DeviceRouterService {
  private final StringRedisTemplate redis;
  @Value("${gateway.redisDeviceKeyPrefix}")
  private String keyPrefix;

  public String getGatewayHost(String deviceId) {
    return redis.opsForValue().get(keyPrefix + deviceId);
  }
}

# service/HeartbeatRenewService.java(定时续期:可选)

package com.example.gw.service;

import lombok.RequiredArgsConstructor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

/** 如果设备心跳很频繁,可在收到心跳时直接续期,不需要此任务。此类仅演示。 */
@Service
@RequiredArgsConstructor
public class HeartbeatRenewService {
  private final ChannelManager channelManager;

  @Scheduled(fixedDelay = 30000)
  public void renewAll() {
    // 可遍历已知 deviceId 做 redis TTL 续期(不建议大规模遍历,这里仅示例)
  }
}

# netty/handler/AuthRegisterHandler.java(设备首包注册)

package com.example.gw.netty.handler;

import com.example.gw.model.Consts;
import com.example.gw.model.DeviceMessage;
import com.example.gw.service.ChannelManager;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class AuthRegisterHandler extends SimpleChannelInboundHandler<DeviceMessage> {

  private final ChannelManager channelManager;

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, DeviceMessage msg) {
    if (msg.getMsgType() == Consts.TYPE_REGISTER) {
      String deviceId = msg.getDeviceId();
      channelManager.bind(deviceId, ctx.channel());
      log.info("Device registered: {} via {}", deviceId, ctx.channel().remoteAddress());
      // TODO: 回应注册应答(按协议编码)
      // ctx.writeAndFlush(ackMessage);
      return; // 注册消息不再下发,直接吃掉
    }
    ctx.fireChannelRead(msg); // 交给下一个
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) {
    channelManager.unbind(ctx.channel());
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    log.warn("Auth handler error", cause);
    ctx.close();
  }
}

# netty/handler/HeartbeatHandler.java

package com.example.gw.netty.handler;

import com.example.gw.model.Consts;
import com.example.gw.model.DeviceMessage;
import com.example.gw.service.ChannelManager;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class HeartbeatHandler extends SimpleChannelInboundHandler<DeviceMessage> {
  private final ChannelManager channelManager;

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, DeviceMessage msg) {
    if (msg.getMsgType() == Consts.TYPE_HEARTBEAT) {
      String deviceId = channelManager.getDeviceId(ctx.channel());
      if (deviceId != null) channelManager.renew(deviceId);
      // TODO: 心跳应答(可回包)
      return;
    }
    ctx.fireChannelRead(msg);
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
    if (evt instanceof IdleStateEvent e) {
      log.info("Idle close: {}", ctx.channel().remoteAddress());
      ctx.close();
    } else {
      ctx.fireUserEventTriggered(evt);
    }
  }
}

# netty/handler/BusinessDispatcherHandler.java(推送到 Kafka)

package com.example.gw.netty.handler;

import com.example.gw.model.DeviceMessage;
import com.example.gw.mq.KafkaProducer;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import io.netty.channel.*;

@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class BusinessDispatcherHandler extends SimpleChannelInboundHandler<DeviceMessage> {
  private final KafkaProducer producer;
  private final ObjectMapper mapper = new ObjectMapper();

  @Value("${gateway.kafkaTopic}")
  private String topic;

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, DeviceMessage msg) throws Exception {
    // 这里可以做协议转 JSON 的映射(示例直接吐原字段)
    String json = mapper.writeValueAsString(msg);
    producer.send(topic, msg.getDeviceId(), json);
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    log.warn("Dispatcher error", cause);
    ctx.close();
  }
}

# mq/KafkaProducer.java

package com.example.gw.mq;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaProducer {

  private final KafkaTemplate<String, String> kafkaTemplate;

  public void send(String topic, String key, String value) {
    kafkaTemplate.send(topic, key, value)
        .whenComplete((res, ex) -> {
          if (ex != null) {
            log.warn("Kafka send fail: {}", ex.getMessage());
          } else {
            RecordMetadata md = res.getRecordMetadata();
            log.debug("Kafka ok topic={} partition={} offset={}", md.topic(), md.partition(), md.offset());
          }
        });
  }
}

# util/Bytes.java(占位:你可以放 CRC、BCE、JT808 转义等)

package com.example.gw.util;

public class Bytes {
  public static int u8(byte b) { return b & 0xFF; }
}

# logback-spring.xml(可选)

<configuration>
  <springProperty scope="context" name="APP_NAME" source="spring.application.name" defaultValue="iot-gateway"/>
  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
    <encoder><pattern>%d{HH:mm:ss.SSS} %-5level [%thread] %logger{36} - %msg%n</pattern></encoder>
  </appender>
  <root level="INFO"><appender-ref ref="CONSOLE"/></root>
</configuration>

# 如何跑起来

  1. 起 Redis、Kafka
  2. application.yml 按你机器改好端口/IP
  3. mvn spring-boot:run
  4. (可选)前置 Nginx/LVS 用 一致性 hash(deviceId) 转发到多台网关,确保“同一设备优先落到同一台机”。

Nginx 简单示例(stream 层 TCP 转发):

stream {
  upstream gw_hash {
    hash $remote_addr consistent;  # 生产建议自行接入支持按deviceId哈希的四层LB
    server 10.0.0.11:7001;
    server 10.0.0.12:7001;
    server 10.0.0.13:7001;
  }
  server {
    listen 7001;
    proxy_pass gw_hash;
  }
}

# 伸缩&高可用要点(别跳过)

  • 一台机承载连接:合理配置 ulimit -n、开启 epoll、直连内存+堆外缓冲可到 10~15 万长连接/台(看内存/网卡/内核参数)。
  • 多机扩:前置 LB(IP Hash / Consistent Hash)+ Redis 的 deviceId→host 映射 + 心跳 TTL 续期。
  • 宕机迁移:网关挂了 TCP 断开 → 设备自动重连 → 打到其它网关 → 注册时覆盖 Redis host 即完成接管。
  • 业务解耦:Netty 只做“接入+协议解析+转 Kafka”,下游微服务消费 Kafka 做业务。
  • 反向下发:业务若要下发,先查 Redis 看设备在哪台网关,再通过网关本地 ChannelManager 找 Channel 写回(或走 RPC/网关间转发)。

如果你要换成 JT808

  • FrameDecoder / MessageDecoder / MessageEncoder 换成 808 的转义/校验/拆包实现;
  • AuthRegisterHandler 处理 0x0100 注册应答、0x0002 心跳等;
  • BusinessDispatcherHandler 按 808 消息 ID 把 GPS(0x0200)/报警等转 Kafka。

行,给你一份能落地的方案包:

  • 两套 Nginx:A. 纯 L4 一致性哈希(最稳)B. 按 deviceId 做一致性哈希(OpenResty/Lua,适合强粘性)
  • 扩容与过亿设备分层架构建议
  • 宕机/容灾做法
  • 阿里云部署实操路线(GA/NLB/ACK/ECS)

# 架构总览(推荐分层)

(全球/全国接入)
[Alibaba Cloud GA]  ——TCP——>  [NLB(多AZ, TCP, ProxyProtocol v2)]
                                   │
                              TCP 四层透传
                                   │
                        [Nginx Stream 层 × N(可横向扩)]
                                   │
                L4 源IP一致性哈希 或 L7 解析后按 deviceId 一致性哈希
                                   │
                         [Netty Gateway 池 × M(多AZ)]
                                   │
                           [Redis Cluster 租约/路由]
                                   │
                               [Kafka/Pulsar]
  • NLB负责公网/跨 AZ/自动 HC;Nginx 层一致性哈希与连接扇出Netty承接长连(10–15 万/台);RedisdeviceId→host 租约 + 心跳 TTL;Kafka/Pulsar落后端。
  • 过亿设备量级:假设 12 万/台,需 ~830 台 Netty;Nginx 与 NLB均可横向扩(连接是端到端驻留,Nginx只做内网转发)。

# A) 纯 L4 源 IP 一致性哈希(开箱即用,最稳)

# nginx.conf(含注解)

# ===== 全局:进程与FD =====
worker_processes auto;                 # 跟随CPU核数
worker_rlimit_nofile 2000000;          # 打开文件上限,配合系统limits

events {
    worker_connections  200000;        # 每worker最大并发连接
    use epoll;
    multi_accept on;
}

stream {
    # ===== 上游:网关池(多AZ IP)=====
    upstream gw_pool {
        # 一致性哈希:最小迁移,节点增删重映射少
        hash $binary_remote_addr consistent;     # 按“源IP”粘性
        # 网关实例池(示例)
        server 10.0.1.11:9000 max_fails=3 fail_timeout=5s;
        server 10.0.2.12:9000 max_fails=3 fail_timeout=5s;
        server 10.0.3.13:9000 max_fails=3 fail_timeout=5s;
        # 持续横向扩:直接追加 server 行,reload 即生效
    }

    # ===== 接入Server:入口7000(给NLB指向)=====
    server {
        # reuseport把监听负载分散到多个内核队列(Linux 3.9+)
        listen 7000 reuseport proxy_protocol;   # 打开PROXY协议保留真实源IP(NLB需启PPv2)
        proxy_protocol on;

        proxy_connect_timeout 3s;               # 首次连接握手超时
        proxy_timeout 86400s;                   # 长连接保活
        proxy_pass gw_pool;                     # 交由一致性哈希上游

        # TCP层健康检查:开源版 stream 无“主动HC”
        # 依靠NLB对本server端口做健康检查;本处用被动: max_fails/fail_timeout
        # 若必须主动HC,可编译第三方 stream_upstream_check_module 或用Nginx Plus。
        # 也可部署两个Nginx层+互相做旁路HC切换(keepalived)。
    }
}

# 说明

  • 优点:配置简单,极稳当;一致性哈希对扩缩容迁移最小
  • 缺点:NAT 下“同出口IP一大群设备”会粘在同一网关,分布可能偏斜。如果你国家级分布,通常已足够均衡;若强制均衡,见 B 方案。

# B) 按 deviceId 做一致性哈希(OpenResty/Lua,强粘性,精细均衡)

原理:在 stream 子系统里用 preread_by_lua* 读取首帧(长度+JSON),解析出 deviceId,把它放入变量 $hash_key,然后 hash $hash_key consistent;

# nginx.conf(OpenResty 版,含注解)

worker_processes auto;
worker_rlimit_nofile 2000000;

events { worker_connections 200000; use epoll; multi_accept on; }

stream {
    lua_socket_log_errors off;
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";

    # 解析首包:协议固定为 [4-byte BE length][JSON]
    lua_shared_dict tmp 10m;

    # 上游:对 $hash_key 做一致性哈希
    upstream gw_pool_by_dev {
        hash $hash_key consistent;
        server 10.0.1.11:9000 max_fails=3 fail_timeout=5s;
        server 10.0.2.12:9000 max_fails=3 fail_timeout=5s;
        server 10.0.3.13:9000 max_fails=3 fail_timeout=5s;
    }

    server {
        listen 7000 reuseport proxy_protocol;
        proxy_protocol on;
        proxy_connect_timeout 3s;
        proxy_timeout 86400s;

        # 在代理前读取首包,取出 deviceId,设置 $hash_key
        preread_by_lua_block {
            local sock = ngx.req.socket(true)     -- downstream (client) socket
            sock:settimeouts(1000, 0, 0)          -- 1s 读超时
            -- 读取4字节长度
            local len_buf, err = sock:peek(4)
            if not len_buf or #len_buf < 4 then
                return ngx.exit(ngx.ERROR)
            end
            local b1,b2,b3,b4 = string.byte(len_buf,1,4)
            local len = b1*16777216 + b2*65536 + b3*256 + b4
            if len <= 0 or len > 1024*64 then
                return ngx.exit(ngx.ERROR)
            end
            -- 再探测读取 payload(不消耗缓冲,保持给后端)
            local frame, err2 = sock:peek(4 + len)
            if not frame or #frame < (4+len) then
                return ngx.exit(ngx.ERROR)
            end
            local json = string.sub(frame, 5)     -- 去掉4字节长度
            local cjson = require "cjson.safe"
            local obj = cjson.decode(json)
            if not obj or obj.type ~= "auth" or not obj.deviceId then
                return ngx.exit(ngx.ERROR)
            end
            -- 设置哈希键(变量默认空串,必须赋值)
            ngx.var.hash_key = obj.deviceId
        }

        # 交给上游(按deviceId一致性哈希)
        proxy_pass gw_pool_by_dev;
    }
}

# 说明

  • 优势:设备层面强粘性 + 更均衡(避免大NAT出口倾斜)。
  • 注意:必须确保首帧就是鉴权JSON(你的 Netty 项目已使用此约定)。
  • 性能:OpenResty 的 peek + 常量 JSON 解析开销很小(µs 级),可水平扩多个 Nginx 实例分担。

# 扩容与“不断加设备”的方法论

# 水平扩展步骤(无中断)

  1. 先扩 Netty 池:新增 ECS/ACK 实例,加入 gw_pool(A 方案按 IP、B 方案按 deviceId)。
  2. 上游 reloadnginx -t && nginx -s reload(连接不丢,端到端保留)。
  3. 观察迁移量:一致性哈希仅搬迁小部分连接(理论约 = 新权重 / 新总权重)。
  4. 再扩 Nginx 层(若 Nginx 层的连接数或带宽/中断队列逼近上限):加 ECS,挂到 NLB;NLB 健康后开始分流。
  5. 跨地域:加一个 Region 的整套 NLB+Nginx+Netty,前面套 GA 做最优路由。

# 容量估算(经验值,非极限)

  • Netty:10–15 万/台(你已有参数);
  • Nginx(OpenResty):单台稳定 50–100 万 TCP 透传连接(足够,且可横扩);
  • NLB:云侧承载千万级连接没压力,按地域和账限扩容;
  • Redis Cluster:只保存活跃设备 Key(带 TTL),内存按 1–2KB/Key 粗估,分片水平扩

# 宕机&容灾(节点挂了怎么办)

  • 网关(Netty)挂

    • 不再续租 → dev:{id} TTL 到期;设备心跳/重连 → 由 NLB/Nginx 一致性哈希到其它健康网关
    • 新网关 REG(Lua 原子)→ 成为新宿主,并通过 PUB/SUB 向旧宿主发 kick(如果旧还存活)。
  • Nginx 节点挂

    • NLB 健康检查失败 → 流量自动摘除;剩余 Nginx 继续转发;
    • 连接驻留在端到端 TCP:穿过 Nginx 的连接会断(该 Nginx 为中间跳点)。多台 Nginx 并行可把影响面摊薄。
  • Redis 主挂

    • Redis Sentinel/Cluster 自动选主;网关端幂等续租/注册;Lua 保证路由原子性。
  • 单 AZ 故障

    • NLB/GA 跨 AZ/Region;上游池中跨 AZ/MIX的网关节点;
    • 业务端“快速重连 + 指数退避**”容错。

# 阿里云部署实操(建议路线)

# 1) 网络与计算

  • VPC + 多可用区子网;每个 AZ 放 NLB 实例ECS/ACK(Nginx 层、Netty 层)。
  • ECS 机型:选 c7/g7 等新代实例,25Gbps/40Gbps 网卡、多队列(RSS);开启 Enhanced Network
  • 安全组:放行 7000(NLB→Nginx)、9000(Nginx→Netty)、Redis/Kafka 内网端口;限制来源。

# 2) 入口层

  • Global Accelerator (GA)(可选):跨地域就近接入 + 智能路由;监听 TCP 7000。

  • NLB(Network Load Balancer)

    • 监听 TCP 7000,后端指向 Nginx ECS
    • 开启 Proxy Protocol v2(保留源IP,后端 Nginx 配置 proxy_protocol on)。
    • 健康检查:TCP/应用端口。

# 3) 转发层(Nginx/OpenResty)

  • 部署为 ECS AutoScaling 组(ESS),镜像内置 Nginx 配置模板与守护。
  • A 方案:部署开源 Nginx;B 方案:部署 OpenResty。
  • 系统参数nofile=2,000,000somaxconn=65535、合理内核缓冲;
  • 日志:仅记 error/access 的摘要,落 SLS(日志服务)

# 4) 网关层(Netty)

  • 可用 ACK(容器服务K8s版) 部署 StatefulSet,也可 ECS 裸机 Systemd。
  • 开启 EPC/NUMA、直连内存、epoll;你的 Netty 项目已经准备好。
  • HPA/ESS:按连接数/CPU/队列水位扩缩容;灰度放量。

# 5) Redis/Kafka

  • Redis:选 云数据库Redis版(主从/集群),AOF 开;避免自建坑。
  • Kafka:选 消息队列 Kafka 版,跨 AZ 部署;上报落库/消费分析。

# 运维要点与自动化

  • 动态上游

    • 开源 Nginx 不支持原生 API 动态 upstream,建议:

      1. upstream gw_pool*.conf 拆分到 conf.d/upstreams/*.conf
      2. 扩缩容由脚本/Operator 写入/删除 server 行,
      3. nginx -t && nginx -s reload 零停机生效。
    • 或使用 NGINX Plus/Envoy(原生API/EDS)做动态后端。

  • 容量告警

    • Nginx:active connectionsaccepts/handled/dropworker_conns 接近上限报警;
    • Netty:FD使用率、写缓冲水位、事件循环队列长度、心跳失败率;
    • Redis:keyspace hits/misses、过期速率、eval QPS、延迟;
    • 链路:NLB 5xx/丢包、GA健康端口状态。
  • 内核/NIC

    • ethtool -G rx/tx 4096、启用 gro(小包密集可评估关闭 lro)、txqueuelen 20000
    • 中断打散:irqbalance 或手工 smp_affinity

# 你接下来怎么做(最小可行路线)

  1. 先上 A 方案(L4 源IP一致性哈希):足够扛量,配置最稳。
  2. 如果看到 NAT 倾斜(某些网关连接偏多),再切 B 方案(OpenResty 按 deviceId 哈希),把倾斜打散。
  3. 前面加 NLB(或 GA+NLB)做公网入口与健康剔除;Nginx、Netty 两层都可水平扩
  4. 持续接入设备:只要加机器 + reload,一致性哈希只搬小部分流量,无感扩容
  5. 宕机:交给 NLB 健康检查+Redis 租约自愈,无需人工干预。

LettuceJedis 都是 Java 里用来连接 Redis 的客户端库,作用是让你的 Java 程序能去读写 Redis 数据。 但它们的设计和使用方式不太一样,主要区别在:


# 1️⃣ 基本定位

特性 Jedis Lettuce
实现原理 基于直连 Socket 基于 Netty(异步事件驱动)
线程安全 非线程安全(一个 Jedis 对象只能一个线程用) 线程安全(一个 Lettuce 连接可多线程共享)
连接模式 每次需要时获取连接,用完关闭/归还到连接池 推荐长连接,可一个连接支持并发命令
支持模式 单机、主从、哨兵、集群 单机、主从、哨兵、集群,全支持
同步/异步 只支持同步 支持 同步 + 异步 + Reactive(响应式)

# 2️⃣ 使用风格示例

Jedis 同步示例

import redis.clients.jedis.Jedis;

public class JedisDemo {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            jedis.set("name", "leo");
            String value = jedis.get("name");
            System.out.println(value);
        }
    }
}

缺点:Jedis 对象不能多线程共享,要靠连接池管理。


Lettuce 同步/异步示例

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.StatefulRedisConnection;

public class LettuceDemo {
    public static void main(String[] args) {
        RedisClient client = RedisClient.create("redis://localhost:6379");
        StatefulRedisConnection<String, String> connection = client.connect();

        // 同步
        RedisCommands<String, String> sync = connection.sync();
        sync.set("name", "leo");
        System.out.println(sync.get("name"));

        // 异步
        RedisAsyncCommands<String, String> async = connection.async();
        async.get("name").thenAccept(System.out::println);

        connection.close();
        client.shutdown();
    }
}

优点:一个连接就能支持多线程,且异步性能好,特别适合高并发和 Spring WebFlux 这种响应式架构。


# 3️⃣ 总结选型建议

  • Jedis:简单、老牌、容易上手,但线程安全差,需要连接池管理;适合老项目或低并发场景。
  • Lettuce:新项目首选,线程安全,支持异步/响应式,Spring Boot 2.x 之后默认使用 Lettuce 代替 Jedis。

如果你是在 Spring Boot + Redis,其实 spring-boot-starter-data-redis 默认就是 Lettuce,不用自己额外引 Jedis,除非你特别想换。


# 一、最小可用(单机→双机)

# 1) 拿一台服务器先跑通(PoC)

  • 一台机Nginx (stream) + Netty 网关 + Redis 单机
  • 用途:先把“设备能连、心跳续期、映射写Redis、Nginx转发”跑起来

# Nginx(TCP 转发)

# /etc/nginx/nginx.conf
stream {
  upstream iot_gateway {
    # 先单机,后面加节点
    server 127.0.0.1:9000;
  }
  server {
    listen 9000;                   # 设备连这个端口
    proxy_connect_timeout 5s;
    proxy_timeout 600s;
    proxy_pass iot_gateway;
  }
}

# Netty(关键参数)

new ServerBootstrap()
  .group(new NioEventLoopGroup(1), new NioEventLoopGroup())   // boss=1, worker=CPU核数
  .channel(NioServerSocketChannel.class)
  .option(ChannelOption.SO_BACKLOG, 4096)
  .childOption(ChannelOption.TCP_NODELAY, true)
  .childOption(ChannelOption.SO_KEEPALIVE, true)
  .childHandler(new IoTChannelInitializer(connectionManager, gatewayIp));

# Redis(键设计)

  • device:gateway:{deviceId} = {gatewayIp},TTL=120s
  • 每次收到心跳/数据EXPIRE key 120
  • 网关宕机=不再续期 → TTL 到 → 键过期 → 设备重连被分配到存活节点

# 2) 升级成两台网关 + Nginx

  • 两台机网关A网关B
  • 一台机Nginx(对外),内部指向A/B
  • Redis:先仍单机(下一步再高可用)

# Nginx(加一致性哈希)

stream {
  upstream iot_gateway {
    hash $remote_addr consistent;   # 按来源IP粘性(足够简单)
    server 10.0.0.10:9000 max_fails=3 fail_timeout=30s; # 网关A
    server 10.0.0.11:9000 max_fails=3 fail_timeout=30s; # 网关B
  }
  server {
    listen 9000;
    proxy_connect_timeout 5s;
    proxy_timeout 600s;
    proxy_pass iot_gateway;
  }
}

说明:如果你的设备大量在同一运营商NAT后面,$remote_addr 可能“很多设备同一IP”。到百万级时建议升级为“按设备ID粘性”(可用 Nginx njs preread 提前读首包提取ID做hash,或让前置接入层读首包后按设备ID转发——这是进阶方案,先不急)。


# 二、Redis 高可用(两种选一个)

# 方案A:Sentinel(主从 + 自动切主)

  • 适合:容量要求一般、想简单点
  • Spring Boot 配置(节选):
spring:
  redis:
    sentinel:
      master: mymaster
      nodes: 10.0.0.21:26379,10.0.0.22:26379,10.0.0.23:26379
    password: ${REDIS_AUTH:}

# 方案B:Cluster(分片 + 多副本)

  • 适合:设备多、QPS高、容量大
spring:
  redis:
    cluster:
      nodes: 10.0.0.31:6379,10.0.0.32:6379,10.0.0.33:6379
      max-redirects: 3
    password: ${REDIS_AUTH:}

建议:Cluster 更长远。你只需要把应用的 application.yml 切到 cluster profile 即可。


# 三、网关健康上报 + 映射原子写 + 踢掉旧连接(必要的三件套)

# 1) 健康上报(每10s)

@Scheduled(fixedDelay = 10_000)
public void reportAlive() {
  redis.opsForValue().set("gateway:alive:" + gatewayIp,
                          String.valueOf(System.currentTimeMillis()),
                          Duration.ofSeconds(30));  // TTL 30s
}
  • 运维看 gateway:alive:* 就知道谁在线

# 2) 原子写映射(Lua)

  • 目的:把 device:gateway:{id} 原子更新为当前网关IP,并返回旧IP(如果旧IP不同)
  • 用来跨网关“踢掉”旧连接,避免同一设备多地同时在线

Lua(简化版,够用):

-- KEYS[1]=deviceKey, ARGV[1]=newIp, ARGV[2]=ttlSec
local old=redis.call('GET', KEYS[1])
if(old==ARGV[1]) then
  redis.call('EXPIRE', KEYS[1], tonumber(ARGV[2])); return {0}
end
redis.call('SET', KEYS[1], ARGV[1], 'EX', tonumber(ARGV[2]))
if(not old or old=='') then return {0} end
return {1, old}  -- 需要踢掉 old 网关的同设备连接

Java 调用:设备首包时执行;若返回{1, oldIp}→发消息给 oldIp 踢人(用 Redis Pub/Sub 或 Kafka)。

# 3) 踢掉旧连接(轻量版:Redis Pub/Sub)

  • 发布:{"type":"EVICT","target":"10.0.0.10","deviceId":"ABC123"}
  • 订阅端在目标网关拿到消息后 channel.close() 清理本机旧连接

# 四、操作系统 & JVM & Netty 调优(保姆版)

# Linux 内核(/etc/sysctl.conf)

net.core.somaxconn = 65535
net.core.netdev_max_backlog = 250000
net.ipv4.ip_local_port_range = 10000 65000
net.ipv4.tcp_max_syn_backlog = 262144
net.ipv4.tcp_fin_timeout = 15
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_keepalive_time = 60
net.ipv4.tcp_keepalive_intvl = 10
net.ipv4.tcp_keepalive_probes = 3

应用:

sysctl -p
ulimit -n 1048576   # 打开文件句柄,决定最大连接数上限

# Netty 线程

  • bossGroup = 1
  • workerGroup = CPU核数 ~ 2×CPU核数(压测看情况)
  • Epoll(Linux):使用 EpollEventLoopGroup 更省CPU

# JVM

  • 连接多时堆外内存DirectBuffer占比高,Xms=Xmx 固定堆
  • GC:G1/ ZGC(二选一),尽量减少 stop-the-world
  • 监控:Prometheus + Grafana(曲线看连接数、堆、GC、CPU)

# 五、容量估算 & 扩容阈值

# 1) 经验值(保守)

  • 纯TCP长连接 + 轻协议:每连接 8–16KB RAM(含内核socket缓存 + 应用少量状态)
  • 1 台 16GB 机器:80k–120k 长连接(还要留出 JVM/系统/监控余量)
  • CPU 不是瓶颈时,内存和FD是主要限制

# 2) 你可以这样定阈值(示例)

  • 每台网关目标连接100k

  • 触发扩容:任一网关 10 分钟均值 > 80k 或 JVM老年代>70% 或 CPU>60%

  • 扩容操作:

    1. 新增一台网关(同样注册到 Nginx upstream)
    2. Nginx reload(无损)
    3. 设备会在后续重连时被一致性哈希分摊到新节点
  • 如果增长很快,一次性多上两台,少走回头路


# 六、你就照这个顺序做(最重要)

  1. 单机跑通(Nginx + 网关 + Redis 单机)

  2. 加一台网关 → Nginx upstream + consistent hash

  3. 把 Redis 换成 哨兵或集群

  4. 健康上报Lua 原子写跨网关踢人(三件套)

  5. 监控/告警(连接数、QPS、GC、负载)

  6. 设备量上来后:

    • 网关横向扩容(加机器)
    • Redis 走 Cluster
    • Nginx 多实例 + 前面再加 L4 SLB/Anycast(多机房可选)
    • 如果 $remote_addr 粘性不够,升级为按设备ID粘性(njs preread 或前置接入层)

# 七、你会遇到的坑(先知道=少走弯路)

  • NAT 同IP:很多设备同一个公网IP → $remote_addr 粘性会让单个网关压力偏高。可先调大权重或加节点,后续再上按设备ID粘性。
  • TTL 误判:心跳周期×3 做TTL,避免偶发丢包导致过期误切换。
  • 双连脏连接:一定做Lua原子写 + 踢旧连接
  • FD 不够ulimit -n 不到位会很快耗尽。
  • 监控缺失:没有连接数/GC/CPU/Redis延迟监控,很难定位抖动。

# 八、要配置清单(你可以拷走用)

  • Nginx(stream)模板:上面给了
  • Redis:ClusterSentinelapplication.yml 模板:上面给了
  • 网关:ServerBootstrapChannelInitializerConnectionManager(含续期)、Lua原子写Pub/Sub踢人:上文都有
  • 系统:sysctl.conf + ulimit 详单

nginx + redis-cluster + gatewayA/B + 压测客户端; 你本地跑起来立刻能看到:宕机迁移、TTL过期、踢旧连接 全流程。

没问题!给你一份小白可落地的“Redis + Nginx + 网关实例(Netty)”部署与渐进式扩容方案。你可以照着一步步做,设备多了就按阈值扩。