哈喽电池设备Netty网关架构

# 哈喽电池设备Netty网关架构

  • 读取 gateway.url.firewall 配置(逗号分隔的白名单路径)

  • 注册两个拦截器:

    1. 你的自定义拦截器 getWebInterceptor(),并对白名单 excludePathPatterns(...) 放行
    2. sentinelWebInterceptor(限流用),对所有路径生效

# 存在的常见坑

  1. 继承 WebMvcConfigurationSupport 会禁用 Spring Boot 的自动配置(静态资源、消息转换器等都要你手动配)——通常不建议。
  2. @RefreshScope 放在这个类上,白名单并不会“热更新”excludePathPatterns 是启动时就定死的)。
  3. 手动 replaceAll("\\s","")+split(",") 容易踩空格/空值坑,Spring Boot 自带的类型绑定更稳。

# 推荐实现 A:常规(重启后生效,最简单稳定)

👉 用 WebMvcConfigurer + 配置绑定 @ConfigurationProperties

这版最接近你现有逻辑,但需要重启或重新部署后才会生效。

1) 属性绑定(把白名单直接绑成 List)

// FirewallProperties.java
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;

@Component
@ConfigurationProperties(prefix = "gateway.url")
public class FirewallProperties {
    /**
     * 白名单路径,如:/actuator/**, /public/**, /error
     */
    private List<String> firewall = new ArrayList<>();

    public List<String> getFirewall() { return firewall; }
    public void setFirewall(List<String> firewall) { this.firewall = firewall; }
}

2) 你的业务拦截器(示例骨架)

// WebInterceptor.java
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;

@Component
public class WebInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
            throws Exception {
        // TODO 放你的鉴权、签名校验、日志脱敏、IP 限制等逻辑
        return true; // 返回 false 则拦截
    }
}

3) 注册拦截器(不再继承 WebMvcConfigurationSupport)

// WebMvcConfig.java
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
@RequiredArgsConstructor
public class WebMvcConfig implements WebMvcConfigurer {

    private final WebInterceptor webInterceptor;
    private final FirewallProperties firewallProperties;
    private final SentinelWebInterceptor sentinelWebInterceptor; // 来自 Sentinel 依赖,能自动注入

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // 1) 先执行:你的业务/防火墙拦截器(对白名单放行)
        registry.addInterceptor(webInterceptor)
                .addPathPatterns("/**")
                .excludePathPatterns(firewallProperties.getFirewall())
                .order(-10); // 数字越小越先执行

        // 2) 再执行:限流 Sentinel
        registry.addInterceptor(sentinelWebInterceptor)
                .addPathPatterns("/**")
                .order(0);
    }
}

4) application.yml 示例

gateway:
  url:
    firewall:
      - /actuator/**
      - /public/**
      - /error
      - /health

# 推荐实现 B:需要热更新白名单(不重启也生效)

👉 把白名单“匹配逻辑”放进拦截器,每次请求都按最新配置判断放行;用 @RefreshScope 只刷新数据,不刷新 Spring MVC 映射。

1) 可刷新的白名单配置

// FirewallProperties.java
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;

@RefreshScope
@Component
@ConfigurationProperties(prefix = "gateway.url")
public class FirewallProperties {
    private List<String> firewall = new ArrayList<>();
    public List<String> getFirewall() { return firewall; }
    public void setFirewall(List<String> firewall) { this.firewall = firewall; }
}

2) 在拦截器里自己判断“白名单即放行”

// WebInterceptor.java
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Component;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.servlet.HandlerInterceptor;

@Component
public class WebInterceptor implements HandlerInterceptor {
    private final FirewallProperties props;
    private final AntPathMatcher matcher = new AntPathMatcher();

    public WebInterceptor(FirewallProperties props) {
        this.props = props;
    }

    @Override
    public boolean preHandle(HttpServletRequest req, HttpServletResponse res, Object handler)
            throws Exception {
        String uri = req.getRequestURI();
        // 命中白名单 => 直接放行(支持 /public/** 这类通配)
        for (String p : props.getFirewall()) {
            if (matcher.match(p, uri)) return true;
        }

        // 非白名单 => 你的校验逻辑
        // TODO 签名、token、IP、频率限制等
        return true;
    }
}

3) 注册时就不要再用 excludePathPatterns

@Configuration
@RequiredArgsConstructor
public class WebMvcConfig implements WebMvcConfigurer {

    private final WebInterceptor webInterceptor;
    private final SentinelWebInterceptor sentinelWebInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(webInterceptor)
                .addPathPatterns("/**")
                .order(-10);

        registry.addInterceptor(sentinelWebInterceptor)
                .addPathPatterns("/**")
                .order(0);
    }
}

这样,Nacos/Config 刷新 gateway.url.firewall 后,下一次请求立刻按新白名单生效


# 必要依赖(示例)

<!-- Spring Web -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- 如果用 Sentinel 的 Web 拦截器 -->
<dependency>
  <groupId>com.alibaba.csp</groupId>
  <artifactId>sentinel-parameter-flow-control</artifactId>
  <version></version>
</dependency>

<!-- 如果要 @RefreshScope(Spring Cloud Config / Nacos)-->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter</artifactId>
</dependency>

# 使用顺序(小白版)

  1. 把上面 3 个类(或 5 个类,取 A 或 B 方案)放进你的工程。

  2. application.yml 写好白名单路径(可以先放 /public/**, /actuator/**)。

  3. 启动项目:

    • 访问白名单里的地址(应直接通过)
    • 访问非白名单地址(应走你的拦截逻辑)
  4. 如果你需要热更新:用“方案 B”,在配置中心改白名单,/actuator/refresh 或自动刷新后,不用重启就生效。


# 小技巧 & 说明

  • 顺序order 数字越小越先执行,通常“白名单/鉴权”优先,再限流。
  • Spring Boot 3 默认使用 PathPattern,AntPathMatcher 也可以在拦截器里用,通配语法基本一致(/a/**/a/*)。
  • 不要再继承 WebMvcConfigurationSupport,除非你明确要完全接管 MVC 配置。
  • 反代之后取真实 IP:在拦截器里用 X-Forwarded-For(记得在网关/NGINX 开启透传)。

# 总体架构(逻辑视图)

┌─────────┐    TCP(4G/物联)      ┌──────────────┐      ┌─────────────┐
│  设备端  │ ───────────────────▶ │ Nginx/L4 LB  │ ───▶ │ Netty 网关xN │
└─────────┘  (IP Hash/一致哈希)   └──────┬───────┘      └──────┬──────┘
                                         │Consistent Hash       │
                                         │路由一致性            │
                                         ▼                      ▼
                                   ┌─────────┐            ┌──────────┐
                                   │ Redis   │◀─租约TTL──▶│ 网关租约  │
                                   │ (Cluster│   device→  │ 管理(Lua) │
                                   └────┬────┘   gateway  └────┬─────┘
                                        │                     上报/续签
                                        │                             
                         指令下发/状态流 │                              事件消息
                                        ▼                              ▼
                                   ┌──────────┐                 ┌──────────┐
                                   │ Kafka    │◀───────────────▶│ 业务集群 │
                                   │  topics  │   (事件/状态)    │ (消费/存储│
                                   └────┬─────┘                 │ 策略/风控)│
                                        │                       └──────────┘
                                 ┌───────▼────────┐
                                 │ 监控&日志链路  │(Prometheus/Grafana/ELK/Jaeger)
                                 └────────────────┘

# 端到端数据流

  1. 设备上线:设备→Nginx→某台网关(按一致哈希)。
  2. 握手&鉴权:网关校验签名/密钥→通过后在 Redis 写 device→gateway 映射并设置 TTL(租约)。
  3. 心跳保活:设备心跳;网关定期续签 Redis TTL。
  4. 上报数据:网关解码→校验→异步写 Kafka(at-least-once),关键字段落地(如设备在线态、最近电压等可走 Redis/时序库)。
  5. 指令下发:业务集群→查 device→gateway →RPC/消息投递到对应网关→Netty 写回设备。
  6. 故障切换:网关故障→租约不再续签到期→Nginx 一致哈希将新连接打到其他网关→新网关接管。

# 关键模块

  • 接入层(Nginx Stream/L4 LB):一致性哈希到网关;健康检查;连接数限流。
  • 连接管理(Netty):epoll+直连堆外缓冲、连接表(ConcurrentHashMap)、空闲检测(IdleStateHandler)、背压(writeBufferWaterMark)。
  • 协议编解码:JT/T808 或自定义帧(LengthFieldBasedFrameDecoder/Protobuf/自定义 CRC 校验)。
  • 鉴权&会话:设备密钥/签名、时间戳;会话对象绑定 Channel,支持幂等上线。
  • 租约与映射(Redis)HSET device→gateway + EXPIRESETEX;Lua 原子续约/踢同端。
  • 消息总线(Kafka):上行事件/状态(topics 分区=一致性Key),下行指令回执。
  • 指令路由:先查 Redis 查到网关IP→RPC/HTTP 到该网关内置“下发接口”→Channel 定位并写。
  • 可观测性:Prometheus 指标(连接数、心跳延迟、解码失败、写队列积压、指令RT/成功率);日志脱敏;Trace(可选)。
  • 安全:黑名单、频率限制;签名+重放防护(nonce+时窗);数据加密(可选)。

# Redis 关键键设计(示例)

# 设备→网关映射与租约(60s 心跳,TTL 180s)
Key: dev:lease:{deviceId}  Value: {gatewayId}|{leaseVersion}|{ts}, TTL=180

# 网关活跃集
Key: gw:alive                 Set(gatewayId), 网关心跳维护

# 设备在线态
Key: dev:online:{deviceId}    0/1 + lastSeenTs

用 Lua 保证:同一设备的“续约/接管/踢同端”原子化,且校验版本号避免并发覆盖。

# Nginx stream 一致性哈希(示例)

stream {
  log_format basic '$remote_addr:$remote_port -> $server_addr:$server_port $status';
  access_log /var/log/nginx/stream_access.log basic;

  upstream gateway_pool {
    hash $remote_addr consistent;    # 可切换为 $ssl_preread_server_name 或 $proxy_protocol_addr
    server 10.0.0.11:9000 max_fails=3 fail_timeout=10s;
    server 10.0.0.12:9000 max_fails=3 fail_timeout=10s;
    server 10.0.0.13:9000 max_fails=3 fail_timeout=10s;
  }

  server {
    listen 7000;                     # 设备 TCP 入口
    proxy_connect_timeout 3s;
    proxy_timeout 3600s;
    proxy_pass gateway_pool;
  }
}

# Netty Pipeline 建议

ch.pipeline()
  .addLast("idle", new IdleStateHandler(90, 0, 0, TimeUnit.SECONDS))  // 读空闲=心跳丢失
  .addLast("frame", new LengthFieldBasedFrameDecoder(64 * 1024, 2, 2, 0, 0))
  .addLast("codec", new DeviceMessageCodec())        // 自定义编解码/CRC/签名
  .addLast("auth", new AuthHandler(redis, config))   // 首包鉴权, 通过后移除
  .addLast("hb", new HeartbeatHandler(redis))        // 续租、更新 lastSeen
  .addLast("biz", new UplinkHandler(kafkaProducer))  // 上行落 Kafka
  .addLast("ack", new DownlinkAckHandler(kafkaProducer)) // 回执
;

Channel 选项

bootstrap.option(ChannelOption.SO_BACKLOG, 4096)
         .childOption(ChannelOption.TCP_NODELAY, true)
         .childOption(ChannelOption.SO_REUSEADDR, true)
         .childOption(ChannelOption.SO_RCVBUF, 256*1024)
         .childOption(ChannelOption.SO_SNDBUF, 256*1024)
         .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
             new WriteBufferWaterMark(8*1024, 32*1024));

# 心跳 & 自愈(核心逻辑)

  • 设备心跳间隔:60s;网关读空闲>90s 视为掉线,关闭 Channel。
  • 网关续租:每 60s 执行 Lua:若 dev:lease:{id}gatewayId 为本机且版本匹配→刷新 TTL;否则不续。
  • 网关宕机:不续租→TTL 到期(~180s)→映射消失→设备重连被 Nginx 分配到新网关→新网关用“接管 Lua”将 lease 版本+1 并写入。

# 指令下发路径

  1. 业务调用:POST /cmd/send {deviceId, cmd, payload, timeout}
  2. 网关内部:根据 deviceId 查 Channel;若在线→写并等待 ACK;不在线→返回离线或入延时队列(可选)。
  3. 幂等:业务侧自带 requestId;ACK 携带同 ID;Kafka 侧对回执做去重。

# Kafka Topic 规划(示例)

  • device.uplink.raw(分区 key=deviceId):原始上报
  • device.status.event:上线/下线/心跳异常
  • device.cmd.ack:指令回执
  • device.metric:电压/温度/故障码等指标

# Lua 核心(伪代码)

-- KEYS[1]=dev:lease:{deviceId}, ARGV={gatewayId, version, nowTs, ttl}
local v = redis.call('GET', KEYS[1])
if not v then
  redis.call('SET', KEYS[1], ARGV[1]..'|'..ARGV[2]..'|'..ARGV[3], 'EX', ARGV[4])
  return 'NEW'
end
local parts = {}
for s in string.gmatch(v, "([^|]+)") do table.insert(parts, s) end
local curGw = parts[1]; local curVer = tonumber(parts[2]) or 0
if curGw == ARGV[1] and tonumber(ARGV[2]) == curVer then
  redis.call('SET', KEYS[1], curGw..'|'..curVer..'|'..ARGV[3], 'EX', ARGV[4])
  return 'RENEW'
end
-- 可加版本比较或踢同端策略
return 'MISMATCH'

# JVM/内核/容量建议(单机 10~15万长连接基线)

  • JVM:JDK17;G1/ ZGC(内存≥32G可选 ZGC);-XX:MaxGCPauseMillis=100;堆 8~16G,直内存与堆外缓冲(PooledByteBufAllocator)≥堆;禁用偏向锁。
  • Netty-Dio.netty.allocator.type=pooled,开启 recycler.maxCapacity 合理回收。
  • 内核ulimit -n 1,000,000net.ipv4.tcp_tw_reuse=1somaxconn=65535tcp_max_syn_backlog=262144
  • 网卡:多队列中断、RPS/RFS;关闭大包分片导致的异常。
  • 压测:虚拟设备+连接增长/抖动/秒级断线重连;关注连接建立速率、写队列水位、99线RT。

# 灰度与降级

  • 解码失败/异常比率阈值→自动降级(限流/丢弃高频非关键事件)。
  • 业务侧拥塞→下行指令限流/队列长度报警。
  • Redis/Kafka 抖动→本地缓存短暂兜底(Caffeine + TTL 5~15s)。

# 扩容路线图(分阶段)

  • 阶段A(≤50万设备):2~3 台网关 + Redis 主从或小集群 + 3 节点 Kafka;Nginx 单层。
  • 阶段B(≤500万):网关水平扩展至 10+;Redis Cluster(69 节点);Kafka 57 节点,跨 AZ;NLB/SLB 健康检查。
  • 阶段C(≥2000万):网关分区域池;跨区域 Redis/Kafka 双活(异步复制);Topic 分域;Trident/流计算侧做实时聚合降噪;自研 L4/五元组路由。

# 最小可运行骨架(关键点)

  • 模块gateway-boot(Spring Boot)/ gateway-core(Netty)/ gateway-protocol(编解码)/ gateway-admin(运维面板)

  • 接口

    • POST /internal/cmd/send(下发)
    • GET /internal/channel/{deviceId}(查询在线/所在网关)
    • /actuator/prometheus(指标)