充电桩设备网关设计与实现

2024/1/1

# 充电桩设备网关设计与实现

# 1. 概述

充电桩设备网关是连接充电桩硬件设备与云端管理平台的核心组件,负责设备接入、协议转换、数据采集、指令下发、状态监控等关键功能。本文档详细介绍充电桩设备网关的架构设计、技术实现和部署方案。

# 1.1 核心功能

  • 设备接入管理:支持多种通信协议的充电桩设备接入
  • 协议转换:将设备私有协议转换为标准化数据格式
  • 数据采集:实时采集充电桩状态、充电数据、故障信息
  • 指令下发:接收云端指令并转发给充电桩设备
  • 状态监控:监控设备在线状态、健康状态
  • 数据缓存:本地数据缓存,保证网络异常时的数据完整性
  • 安全认证:设备身份认证和数据传输加密

# 1.2 应用场景

  • 公共充电站:城市公共充电基础设施管理
  • 企业充电桩:企业内部充电桩统一管理
  • 住宅小区:小区充电桩智能化管理
  • 高速服务区:高速公路充电站运营管理
  • 商业综合体:商场、写字楼充电桩服务

# 2. 系统架构

graph TB
    subgraph "充电桩设备层"
        A1[直流快充桩] --> B[设备网关]
        A2[交流慢充桩] --> B
        A3[超级充电桩] --> B
        A4[移动充电车] --> B
    end
    
    subgraph "网关核心层"
        B --> C[协议适配器]
        C --> D[数据处理引擎]
        D --> E[指令分发器]
        E --> F[状态管理器]
        F --> G[缓存管理器]
    end
    
    subgraph "通信层"
        G --> H[MQTT客户端]
        G --> I[HTTP客户端]
        G --> J[WebSocket客户端]
    end
    
    subgraph "云端平台"
        H --> K[消息队列]
        I --> L[API网关]
        J --> M[实时通信服务]
        K --> N[充电桩管理平台]
        L --> N
        M --> N
    end
    
    subgraph "外部系统"
        N --> O[支付系统]
        N --> P[用户APP]
        N --> Q[运营管理系统]
        N --> R[监控告警系统]
    end

# 2.1 架构分层

# 设备接入层

  • 多协议支持:RS485、CAN、Ethernet、4G/5G
  • 设备发现:自动发现和注册新设备
  • 连接管理:维护设备连接状态
  • 心跳检测:定期检测设备在线状态

# 协议处理层

  • 协议解析:解析各厂商私有协议
  • 数据标准化:转换为统一数据格式
  • 指令封装:将标准指令转换为设备协议
  • 错误处理:协议解析异常处理

# 业务逻辑层

  • 充电流程管理:启动、停止、暂停充电
  • 计费数据处理:电量、时长、费用计算
  • 故障诊断:设备故障检测和上报
  • 预约管理:充电预约和排队管理

# 通信传输层

  • 上行通信:向云端上报数据
  • 下行通信:接收云端指令
  • 数据压缩:减少网络传输开销
  • 断线重连:网络异常自动恢复

# 3. 技术实现

# 3.1 网关核心架构

/**
 * 充电桩设备网关主类
 */
@Component
@Slf4j
public class ChargingStationGateway {
    
    @Autowired
    private DeviceManager deviceManager;
    
    @Autowired
    private ProtocolAdapterManager protocolManager;
    
    @Autowired
    private DataProcessor dataProcessor;
    
    @Autowired
    private CloudCommunicator cloudCommunicator;
    
    @Autowired
    private LocalCacheManager cacheManager;
    
    /**
     * 网关启动
     */
    @PostConstruct
    public void start() {
        log.info("充电桩设备网关启动中...");
        
        // 初始化设备管理器
        deviceManager.initialize();
        
        // 启动协议适配器
        protocolManager.startAll();
        
        // 启动数据处理引擎
        dataProcessor.start();
        
        // 建立云端连接
        cloudCommunicator.connect();
        
        // 启动设备扫描
        startDeviceScanning();
        
        log.info("充电桩设备网关启动完成");
    }
    
    /**
     * 启动设备扫描
     */
    private void startDeviceScanning() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> {
            try {
                deviceManager.scanDevices();
            } catch (Exception e) {
                log.error("设备扫描异常", e);
            }
        }, 0, 30, TimeUnit.SECONDS);
    }
}

# 3.2 设备管理器

/**
 * 设备管理器
 */
@Component
@Slf4j
public class DeviceManager {
    
    private final Map<String, ChargingStation> devices = new ConcurrentHashMap<>();
    private final Map<String, DeviceConnection> connections = new ConcurrentHashMap<>();
    
    @Autowired
    private DeviceRepository deviceRepository;
    
    @Autowired
    private ProtocolAdapterManager protocolManager;
    
    /**
     * 初始化设备管理器
     */
    public void initialize() {
        // 加载已注册设备
        loadRegisteredDevices();
        
        // 启动设备状态监控
        startDeviceMonitoring();
    }
    
    /**
     * 扫描新设备
     */
    public void scanDevices() {
        log.debug("开始扫描充电桩设备...");
        
        // 扫描串口设备
        scanSerialDevices();
        
        // 扫描网络设备
        scanNetworkDevices();
        
        // 扫描CAN总线设备
        scanCanDevices();
    }
    
    /**
     * 注册新设备
     */
    public void registerDevice(ChargingStation device) {
        String deviceId = device.getDeviceId();
        
        if (devices.containsKey(deviceId)) {
            log.warn("设备已存在: {}", deviceId);
            return;
        }
        
        // 建立设备连接
        DeviceConnection connection = createConnection(device);
        if (connection != null) {
            devices.put(deviceId, device);
            connections.put(deviceId, connection);
            
            // 保存到数据库
            deviceRepository.save(device);
            
            // 启动设备通信
            startDeviceCommunication(device, connection);
            
            log.info("设备注册成功: {} - {}", deviceId, device.getDeviceName());
        }
    }
    
    /**
     * 创建设备连接
     */
    private DeviceConnection createConnection(ChargingStation device) {
        try {
            switch (device.getConnectionType()) {
                case SERIAL:
                    return new SerialConnection(device.getSerialConfig());
                case TCP:
                    return new TcpConnection(device.getTcpConfig());
                case UDP:
                    return new UdpConnection(device.getUdpConfig());
                case CAN:
                    return new CanConnection(device.getCanConfig());
                default:
                    log.error("不支持的连接类型: {}", device.getConnectionType());
                    return null;
            }
        } catch (Exception e) {
            log.error("创建设备连接失败: {}", device.getDeviceId(), e);
            return null;
        }
    }
    
    /**
     * 启动设备通信
     */
    private void startDeviceCommunication(ChargingStation device, DeviceConnection connection) {
        // 获取协议适配器
        ProtocolAdapter adapter = protocolManager.getAdapter(device.getProtocolType());
        
        if (adapter == null) {
            log.error("未找到协议适配器: {}", device.getProtocolType());
            return;
        }
        
        // 启动数据接收线程
        Thread receiveThread = new Thread(() -> {
            while (connection.isConnected()) {
                try {
                    byte[] data = connection.receive();
                    if (data != null && data.length > 0) {
                        // 协议解析
                        DeviceMessage message = adapter.parse(data);
                        if (message != null) {
                            handleDeviceMessage(device.getDeviceId(), message);
                        }
                    }
                } catch (Exception e) {
                    log.error("设备数据接收异常: {}", device.getDeviceId(), e);
                    break;
                }
            }
        });
        
        receiveThread.setName("Device-Receive-" + device.getDeviceId());
        receiveThread.setDaemon(true);
        receiveThread.start();
    }
    
    /**
     * 处理设备消息
     */
    private void handleDeviceMessage(String deviceId, DeviceMessage message) {
        try {
            // 更新设备状态
            updateDeviceStatus(deviceId, message);
            
            // 处理业务数据
            processBusinessData(deviceId, message);
            
            // 上报到云端
            reportToCloud(deviceId, message);
            
        } catch (Exception e) {
            log.error("处理设备消息异常: {}", deviceId, e);
        }
    }
    
    /**
     * 发送指令到设备
     */
    public boolean sendCommand(String deviceId, DeviceCommand command) {
        ChargingStation device = devices.get(deviceId);
        DeviceConnection connection = connections.get(deviceId);
        
        if (device == null || connection == null || !connection.isConnected()) {
            log.warn("设备不在线或连接异常: {}", deviceId);
            return false;
        }
        
        try {
            // 获取协议适配器
            ProtocolAdapter adapter = protocolManager.getAdapter(device.getProtocolType());
            
            // 封装指令
            byte[] data = adapter.encode(command);
            
            // 发送数据
            return connection.send(data);
            
        } catch (Exception e) {
            log.error("发送设备指令异常: {}", deviceId, e);
            return false;
        }
    }
}

# 3.3 协议适配器

/**
 * 协议适配器接口
 */
public interface ProtocolAdapter {
    
    /**
     * 获取协议类型
     */
    ProtocolType getProtocolType();
    
    /**
     * 解析设备数据
     */
    DeviceMessage parse(byte[] data) throws ProtocolException;
    
    /**
     * 编码设备指令
     */
    byte[] encode(DeviceCommand command) throws ProtocolException;
    
    /**
     * 验证数据完整性
     */
    boolean validate(byte[] data);
}

/**
 * 国标GB/T 27930协议适配器
 */
@Component
@Slf4j
public class GBT27930Adapter implements ProtocolAdapter {
    
    @Override
    public ProtocolType getProtocolType() {
        return ProtocolType.GBT_27930;
    }
    
    @Override
    public DeviceMessage parse(byte[] data) throws ProtocolException {
        if (!validate(data)) {
            throw new ProtocolException("数据校验失败");
        }
        
        try {
            // 解析帧头
            int startFlag = ByteUtils.bytesToInt(data, 0, 2);
            if (startFlag != 0x6868) {
                throw new ProtocolException("帧头错误");
            }
            
            // 解析数据长度
            int dataLength = ByteUtils.bytesToInt(data, 2, 2);
            
            // 解析控制码
            int controlCode = data[4] & 0xFF;
            
            // 解析地址域
            byte[] addressBytes = Arrays.copyOfRange(data, 5, 11);
            String deviceAddress = ByteUtils.bytesToHex(addressBytes);
            
            // 解析数据域
            byte[] dataBytes = Arrays.copyOfRange(data, 11, 11 + dataLength - 8);
            
            // 根据控制码解析具体数据
            return parseDataByControlCode(controlCode, deviceAddress, dataBytes);
            
        } catch (Exception e) {
            throw new ProtocolException("协议解析异常", e);
        }
    }
    
    /**
     * 根据控制码解析数据
     */
    private DeviceMessage parseDataByControlCode(int controlCode, String deviceAddress, byte[] data) {
        switch (controlCode) {
            case 0x01: // 心跳包
                return parseHeartbeat(deviceAddress, data);
            case 0x02: // 状态信息
                return parseStatusInfo(deviceAddress, data);
            case 0x03: // 充电数据
                return parseChargingData(deviceAddress, data);
            case 0x04: // 故障信息
                return parseFaultInfo(deviceAddress, data);
            default:
                log.warn("未知控制码: {}", controlCode);
                return null;
        }
    }
    
    /**
     * 解析充电数据
     */
    private DeviceMessage parseChargingData(String deviceAddress, byte[] data) {
        ChargingDataMessage message = new ChargingDataMessage();
        message.setDeviceId(deviceAddress);
        message.setTimestamp(System.currentTimeMillis());
        
        int offset = 0;
        
        // 充电枪号
        message.setGunNo(data[offset++] & 0xFF);
        
        // 充电状态
        message.setChargingStatus(ChargingStatus.fromCode(data[offset++] & 0xFF));
        
        // 充电电压 (0.1V)
        message.setVoltage(ByteUtils.bytesToInt(data, offset, 2) * 0.1);
        offset += 2;
        
        // 充电电流 (0.1A)
        message.setCurrent(ByteUtils.bytesToInt(data, offset, 2) * 0.1);
        offset += 2;
        
        // 累计充电电量 (0.01kWh)
        message.setTotalEnergy(ByteUtils.bytesToInt(data, offset, 4) * 0.01);
        offset += 4;
        
        // 充电时长 (分钟)
        message.setChargingDuration(ByteUtils.bytesToInt(data, offset, 2));
        offset += 2;
        
        // SOC (1%)
        message.setSoc(data[offset++] & 0xFF);
        
        // 温度 (1°C, -40偏移)
        message.setTemperature((data[offset++] & 0xFF) - 40);
        
        return message;
    }
    
    @Override
    public byte[] encode(DeviceCommand command) throws ProtocolException {
        try {
            switch (command.getCommandType()) {
                case START_CHARGING:
                    return encodeStartChargingCommand((StartChargingCommand) command);
                case STOP_CHARGING:
                    return encodeStopChargingCommand((StopChargingCommand) command);
                case QUERY_STATUS:
                    return encodeQueryStatusCommand((QueryStatusCommand) command);
                default:
                    throw new ProtocolException("不支持的指令类型: " + command.getCommandType());
            }
        } catch (Exception e) {
            throw new ProtocolException("指令编码异常", e);
        }
    }
    
    /**
     * 编码启动充电指令
     */
    private byte[] encodeStartChargingCommand(StartChargingCommand command) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        
        // 帧头
        baos.write(0x68);
        baos.write(0x68);
        
        // 数据长度 (暂时写0,后面回填)
        baos.write(0x00);
        baos.write(0x00);
        
        // 控制码 - 启动充电
        baos.write(0x11);
        
        // 地址域
        byte[] addressBytes = ByteUtils.hexToBytes(command.getDeviceId());
        baos.write(addressBytes, 0, 6);
        
        // 数据域
        baos.write(command.getGunNo()); // 充电枪号
        
        // 充电模式
        baos.write(command.getChargingMode().getCode());
        
        // 充电参数
        if (command.getChargingMode() == ChargingMode.BY_ENERGY) {
            // 按电量充电 (0.01kWh)
            byte[] energyBytes = ByteUtils.intToBytes((int)(command.getTargetEnergy() * 100), 4);
            baos.write(energyBytes, 0, 4);
        } else if (command.getChargingMode() == ChargingMode.BY_TIME) {
            // 按时间充电 (分钟)
            byte[] timeBytes = ByteUtils.intToBytes(command.getTargetTime(), 2);
            baos.write(timeBytes, 0, 2);
            baos.write(0x00);
            baos.write(0x00);
        } else {
            // 自动充满
            baos.write(0x00);
            baos.write(0x00);
            baos.write(0x00);
            baos.write(0x00);
        }
        
        byte[] data = baos.toByteArray();
        
        // 回填数据长度
        int dataLength = data.length + 2; // 包含校验和
        data[2] = (byte)(dataLength & 0xFF);
        data[3] = (byte)((dataLength >> 8) & 0xFF);
        
        // 计算校验和
        int checksum = 0;
        for (int i = 0; i < data.length; i++) {
            checksum += data[i] & 0xFF;
        }
        
        // 添加校验和
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        result.write(data, 0, data.length);
        result.write(checksum & 0xFF);
        result.write(0x16); // 结束符
        
        return result.toByteArray();
    }
    
    @Override
    public boolean validate(byte[] data) {
        if (data == null || data.length < 12) {
            return false;
        }
        
        // 检查帧头
        if (data[0] != 0x68 || data[1] != 0x68) {
            return false;
        }
        
        // 检查结束符
        if (data[data.length - 1] != 0x16) {
            return false;
        }
        
        // 校验和验证
        int checksum = 0;
        for (int i = 0; i < data.length - 2; i++) {
            checksum += data[i] & 0xFF;
        }
        
        return (checksum & 0xFF) == (data[data.length - 2] & 0xFF);
    }
}

# 3.4 数据处理引擎

/**
 * 数据处理引擎
 */
@Component
@Slf4j
public class DataProcessor {
    
    @Autowired
    private ChargingDataService chargingDataService;
    
    @Autowired
    private DeviceStatusService deviceStatusService;
    
    @Autowired
    private AlarmService alarmService;
    
    @Autowired
    private CloudCommunicator cloudCommunicator;
    
    private final BlockingQueue<ProcessTask> taskQueue = new LinkedBlockingQueue<>();
    private final ExecutorService executorService = Executors.newFixedThreadPool(10);
    
    /**
     * 启动数据处理引擎
     */
    public void start() {
        // 启动任务处理线程
        for (int i = 0; i < 5; i++) {
            executorService.submit(this::processTask);
        }
        
        log.info("数据处理引擎启动完成");
    }
    
    /**
     * 提交处理任务
     */
    public void submitTask(String deviceId, DeviceMessage message) {
        ProcessTask task = new ProcessTask(deviceId, message);
        
        if (!taskQueue.offer(task)) {
            log.warn("任务队列已满,丢弃消息: {}", deviceId);
        }
    }
    
    /**
     * 处理任务
     */
    private void processTask() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                ProcessTask task = taskQueue.take();
                handleMessage(task.getDeviceId(), task.getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                log.error("处理任务异常", e);
            }
        }
    }
    
    /**
     * 处理设备消息
     */
    private void handleMessage(String deviceId, DeviceMessage message) {
        try {
            switch (message.getMessageType()) {
                case HEARTBEAT:
                    handleHeartbeat(deviceId, (HeartbeatMessage) message);
                    break;
                case STATUS_INFO:
                    handleStatusInfo(deviceId, (StatusInfoMessage) message);
                    break;
                case CHARGING_DATA:
                    handleChargingData(deviceId, (ChargingDataMessage) message);
                    break;
                case FAULT_INFO:
                    handleFaultInfo(deviceId, (FaultInfoMessage) message);
                    break;
                default:
                    log.warn("未知消息类型: {}", message.getMessageType());
            }
        } catch (Exception e) {
            log.error("处理设备消息异常: {}", deviceId, e);
        }
    }
    
    /**
     * 处理充电数据
     */
    private void handleChargingData(String deviceId, ChargingDataMessage message) {
        // 保存充电数据
        ChargingRecord record = convertToChargingRecord(deviceId, message);
        chargingDataService.saveChargingRecord(record);
        
        // 更新设备状态
        deviceStatusService.updateChargingStatus(deviceId, message.getGunNo(), message.getChargingStatus());
        
        // 检查异常情况
        checkChargingAbnormal(deviceId, message);
        
        // 上报到云端
        cloudCommunicator.reportChargingData(deviceId, message);
        
        log.debug("处理充电数据: {} - 枪号:{}, 状态:{}, 电量:{}kWh", 
                 deviceId, message.getGunNo(), message.getChargingStatus(), message.getTotalEnergy());
    }
    
    /**
     * 检查充电异常
     */
    private void checkChargingAbnormal(String deviceId, ChargingDataMessage message) {
        // 检查电压异常
        if (message.getVoltage() > 500 || message.getVoltage() < 200) {
            alarmService.createAlarm(deviceId, AlarmType.VOLTAGE_ABNORMAL, 
                                   "充电电压异常: " + message.getVoltage() + "V");
        }
        
        // 检查电流异常
        if (message.getCurrent() > 250) {
            alarmService.createAlarm(deviceId, AlarmType.CURRENT_ABNORMAL, 
                                   "充电电流过大: " + message.getCurrent() + "A");
        }
        
        // 检查温度异常
        if (message.getTemperature() > 60) {
            alarmService.createAlarm(deviceId, AlarmType.TEMPERATURE_HIGH, 
                                   "设备温度过高: " + message.getTemperature() + "°C");
        }
        
        // 检查充电时长异常
        if (message.getChargingDuration() > 720) { // 超过12小时
            alarmService.createAlarm(deviceId, AlarmType.CHARGING_TIMEOUT, 
                                   "充电时长异常: " + message.getChargingDuration() + "分钟");
        }
    }
    
    /**
     * 转换为充电记录
     */
    private ChargingRecord convertToChargingRecord(String deviceId, ChargingDataMessage message) {
        ChargingRecord record = new ChargingRecord();
        record.setDeviceId(deviceId);
        record.setGunNo(message.getGunNo());
        record.setChargingStatus(message.getChargingStatus());
        record.setVoltage(message.getVoltage());
        record.setCurrent(message.getCurrent());
        record.setTotalEnergy(message.getTotalEnergy());
        record.setChargingDuration(message.getChargingDuration());
        record.setSoc(message.getSoc());
        record.setTemperature(message.getTemperature());
        record.setRecordTime(new Date(message.getTimestamp()));
        return record;
    }
}

# 3.5 云端通信组件

/**
 * 云端通信组件
 */
@Component
@Slf4j
public class CloudCommunicator {
    
    @Value("${gateway.cloud.mqtt.broker}")
    private String mqttBroker;
    
    @Value("${gateway.cloud.mqtt.username}")
    private String mqttUsername;
    
    @Value("${gateway.cloud.mqtt.password}")
    private String mqttPassword;
    
    @Value("${gateway.cloud.api.baseUrl}")
    private String apiBaseUrl;
    
    private MqttClient mqttClient;
    private RestTemplate restTemplate;
    
    @Autowired
    private LocalCacheManager cacheManager;
    
    /**
     * 建立云端连接
     */
    public void connect() {
        try {
            // 初始化MQTT客户端
            initMqttClient();
            
            // 初始化HTTP客户端
            initHttpClient();
            
            // 订阅下行指令主题
            subscribeCommandTopics();
            
            log.info("云端连接建立成功");
            
        } catch (Exception e) {
            log.error("建立云端连接失败", e);
            throw new RuntimeException("云端连接失败", e);
        }
    }
    
    /**
     * 初始化MQTT客户端
     */
    private void initMqttClient() throws MqttException {
        String clientId = "gateway-" + System.currentTimeMillis();
        mqttClient = new MqttClient(mqttBroker, clientId);
        
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(mqttUsername);
        options.setPassword(mqttPassword.toCharArray());
        options.setCleanSession(false);
        options.setKeepAliveInterval(60);
        options.setAutomaticReconnect(true);
        
        // 设置连接回调
        mqttClient.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                log.warn("MQTT连接丢失", cause);
            }
            
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                handleCloudCommand(topic, new String(message.getPayload()));
            }
            
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // 消息发送完成
            }
        });
        
        mqttClient.connect(options);
    }
    
    /**
     * 订阅指令主题
     */
    private void subscribeCommandTopics() throws MqttException {
        // 订阅设备控制指令
        mqttClient.subscribe("gateway/command/+/control", 1);
        
        // 订阅配置更新指令
        mqttClient.subscribe("gateway/command/config", 1);
        
        // 订阅固件升级指令
        mqttClient.subscribe("gateway/command/upgrade", 1);
    }
    
    /**
     * 处理云端指令
     */
    private void handleCloudCommand(String topic, String payload) {
        try {
            log.info("收到云端指令: {} - {}", topic, payload);
            
            if (topic.contains("/control")) {
                // 设备控制指令
                handleDeviceControlCommand(topic, payload);
            } else if (topic.contains("/config")) {
                // 配置更新指令
                handleConfigUpdateCommand(payload);
            } else if (topic.contains("/upgrade")) {
                // 固件升级指令
                handleUpgradeCommand(payload);
            }
            
        } catch (Exception e) {
            log.error("处理云端指令异常: {}", topic, e);
        }
    }
    
    /**
     * 处理设备控制指令
     */
    private void handleDeviceControlCommand(String topic, String payload) {
        // 解析设备ID
        String[] topicParts = topic.split("/");
        String deviceId = topicParts[2];
        
        // 解析指令
        DeviceControlCommand command = JSON.parseObject(payload, DeviceControlCommand.class);
        
        // 转发给设备管理器
        DeviceManager deviceManager = SpringContextUtils.getBean(DeviceManager.class);
        boolean success = deviceManager.sendCommand(deviceId, command);
        
        // 回复执行结果
        CommandResponse response = new CommandResponse();
        response.setCommandId(command.getCommandId());
        response.setDeviceId(deviceId);
        response.setSuccess(success);
        response.setTimestamp(System.currentTimeMillis());
        
        reportCommandResponse(response);
    }
    
    /**
     * 上报充电数据
     */
    public void reportChargingData(String deviceId, ChargingDataMessage message) {
        try {
            String topic = "gateway/data/" + deviceId + "/charging";
            String payload = JSON.toJSONString(message);
            
            MqttMessage mqttMessage = new MqttMessage(payload.getBytes());
            mqttMessage.setQos(1);
            
            mqttClient.publish(topic, mqttMessage);
            
        } catch (Exception e) {
            log.error("上报充电数据失败: {}", deviceId, e);
            
            // 保存到本地缓存,等待重试
            cacheManager.cacheFailedMessage(deviceId, message);
        }
    }
    
    /**
     * 上报设备状态
     */
    public void reportDeviceStatus(String deviceId, DeviceStatus status) {
        try {
            String topic = "gateway/status/" + deviceId;
            String payload = JSON.toJSONString(status);
            
            MqttMessage mqttMessage = new MqttMessage(payload.getBytes());
            mqttMessage.setQos(1);
            
            mqttClient.publish(topic, mqttMessage);
            
        } catch (Exception e) {
            log.error("上报设备状态失败: {}", deviceId, e);
        }
    }
    
    /**
     * 上报告警信息
     */
    public void reportAlarm(String deviceId, AlarmInfo alarm) {
        try {
            String topic = "gateway/alarm/" + deviceId;
            String payload = JSON.toJSONString(alarm);
            
            MqttMessage mqttMessage = new MqttMessage(payload.getBytes());
            mqttMessage.setQos(2); // 告警信息使用最高QoS
            
            mqttClient.publish(topic, mqttMessage);
            
        } catch (Exception e) {
            log.error("上报告警信息失败: {}", deviceId, e);
        }
    }
}

# 4. 数据模型设计

# 4.1 设备实体

@Entity
@Table(name = "charging_station")
public class ChargingStation {
    @Id
    private String deviceId;            // 设备ID
    
    private String deviceName;          // 设备名称
    private String deviceType;          // 设备类型:DC_FAST, AC_SLOW, SUPER
    private String manufacturer;        // 制造商
    private String model;              // 设备型号
    private String firmwareVersion;    // 固件版本
    
    @Enumerated(EnumType.STRING)
    private ConnectionType connectionType;  // 连接类型
    
    @Enumerated(EnumType.STRING)
    private ProtocolType protocolType;      // 协议类型
    
    private String location;           // 安装位置
    private Double latitude;           // 纬度
    private Double longitude;          // 经度
    
    private Integer gunCount;          // 充电枪数量
    private Double maxPower;           // 最大功率(kW)
    private Double maxVoltage;         // 最大电压(V)
    private Double maxCurrent;         // 最大电流(A)
    
    @Enumerated(EnumType.STRING)
    private DeviceStatus status;       // 设备状态
    
    private LocalDateTime installTime; // 安装时间
    private LocalDateTime lastOnlineTime; // 最后在线时间
    
    // 连接配置
    @Column(columnDefinition = "TEXT")
    private String serialConfig;       // 串口配置(JSON)
    
    @Column(columnDefinition = "TEXT")
    private String tcpConfig;          // TCP配置(JSON)
    
    @Column(columnDefinition = "TEXT")
    private String canConfig;          // CAN配置(JSON)
    
    // getter/setter省略
}

# 4.2 充电记录实体

@Entity
@Table(name = "charging_record")
public class ChargingRecord {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    private String deviceId;           // 设备ID
    private Integer gunNo;             // 充电枪号
    private String orderId;            // 订单ID
    private String userId;             // 用户ID
    
    @Enumerated(EnumType.STRING)
    private ChargingStatus chargingStatus; // 充电状态
    
    private Double voltage;            // 电压(V)
    private Double current;            // 电流(A)
    private Double power;              // 功率(kW)
    private Double totalEnergy;        // 累计电量(kWh)
    private Integer chargingDuration;  // 充电时长(分钟)
    private Integer soc;               // SOC(%)
    private Integer temperature;       // 温度(°C)
    
    private LocalDateTime startTime;   // 开始时间
    private LocalDateTime endTime;     // 结束时间
    private LocalDateTime recordTime;  // 记录时间
    
    private BigDecimal totalFee;       // 总费用
    private BigDecimal energyFee;      // 电费
    private BigDecimal serviceFee;     // 服务费
    
    // getter/setter省略
}

# 5. 配置管理

# 5.1 应用配置

# application.yml
server:
  port: 8080

spring:
  application:
    name: charging-station-gateway
  
  datasource:
    url: jdbc:mysql://localhost:3306/charging_gateway
    username: root
    password: password
    driver-class-name: com.mysql.cj.jdbc.Driver
  
  jpa:
    hibernate:
      ddl-auto: update
    show-sql: false
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQL8Dialect
  
  redis:
    host: localhost
    port: 6379
    password: 
    database: 0
    timeout: 3000ms
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        min-idle: 0

# 网关配置
gateway:
  # 云端连接配置
  cloud:
    mqtt:
      broker: tcp://mqtt.example.com:1883
      username: gateway_user
      password: gateway_pass
      client-id-prefix: charging-gateway
    
    api:
      base-url: https://api.example.com
      timeout: 30000
  
  # 设备扫描配置
  device:
    scan:
      interval: 30000  # 扫描间隔(ms)
      timeout: 5000    # 扫描超时(ms)
    
    heartbeat:
      interval: 60000  # 心跳间隔(ms)
      timeout: 10000   # 心跳超时(ms)
  
  # 数据处理配置
  data:
    processor:
      thread-pool-size: 10
      queue-capacity: 1000
    
    cache:
      max-size: 10000
      expire-time: 3600000  # 1小时
  
  # 串口配置
  serial:
    ports:
      - COM1
      - COM2
      - COM3
    baud-rate: 9600
    data-bits: 8
    stop-bits: 1
    parity: NONE
  
  # 网络配置
  network:
    tcp:
      port: 8888
      timeout: 5000
    
    udp:
      port: 8889
      buffer-size: 1024

# 日志配置
logging:
  level:
    com.example.gateway: DEBUG
    org.springframework.web: INFO
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
  file:
    name: logs/gateway.log
    max-size: 100MB
    max-history: 30

# 5.2 设备配置模板

{
  "deviceTemplates": [
    {
      "templateId": "dc_fast_charger_v1",
      "templateName": "直流快充桩模板V1",
      "deviceType": "DC_FAST",
      "protocolType": "GBT_27930",
      "connectionType": "TCP",
      "defaultConfig": {
        "gunCount": 2,
        "maxPower": 120,
        "maxVoltage": 750,
        "maxCurrent": 250,
        "tcpConfig": {
          "port": 8888,
          "timeout": 5000,
          "keepAlive": true
        }
      },
      "dataPoints": [
        {
          "name": "voltage",
          "type": "DOUBLE",
          "unit": "V",
          "range": [0, 1000]
        },
        {
          "name": "current",
          "type": "DOUBLE",
          "unit": "A",
          "range": [0, 300]
        },
        {
          "name": "power",
          "type": "DOUBLE",
          "unit": "kW",
          "range": [0, 150]
        }
      ]
    }
  ]
}

# 6. 监控与运维

# 6.1 健康检查

@RestController
@RequestMapping("/actuator")
public class HealthController {
    
    @Autowired
    private DeviceManager deviceManager;
    
    @Autowired
    private CloudCommunicator cloudCommunicator;
    
    /**
     * 网关健康检查
     */
    @GetMapping("/health")
    public ResponseEntity<Map<String, Object>> health() {
        Map<String, Object> health = new HashMap<>();
        
        // 检查设备连接状态
        int totalDevices = deviceManager.getTotalDeviceCount();
        int onlineDevices = deviceManager.getOnlineDeviceCount();
        
        health.put("totalDevices", totalDevices);
        health.put("onlineDevices", onlineDevices);
        health.put("deviceOnlineRate", totalDevices > 0 ? (double)onlineDevices / totalDevices : 0);
        
        // 检查云端连接状态
        health.put("cloudConnected", cloudCommunicator.isConnected());
        
        // 检查系统资源
        Runtime runtime = Runtime.getRuntime();
        health.put("memoryUsage", {
            "total": runtime.totalMemory(),
            "free": runtime.freeMemory(),
            "used": runtime.totalMemory() - runtime.freeMemory()
        });
        
        // 检查磁盘空间
        File disk = new File("/");
        health.put("diskUsage", {
            "total": disk.getTotalSpace(),
            "free": disk.getFreeSpace(),
            "used": disk.getTotalSpace() - disk.getFreeSpace()
        });
        
        return ResponseEntity.ok(health);
    }
    
    /**
     * 设备状态统计
     */
    @GetMapping("/metrics/devices")
    public ResponseEntity<Map<String, Object>> deviceMetrics() {
        Map<String, Object> metrics = deviceManager.getDeviceMetrics();
        return ResponseEntity.ok(metrics);
    }
}

# 6.2 日志管理

@Component
@Slf4j
public class LogManager {
    
    private static final String LOG_DIR = "logs";
    private static final String DEVICE_LOG_PREFIX = "device_";
    private static final String PROTOCOL_LOG_PREFIX = "protocol_";
    
    /**
     * 记录设备通信日志
     */
    public void logDeviceCommunication(String deviceId, String direction, byte[] data) {
        try {
            String logFile = LOG_DIR + "/" + DEVICE_LOG_PREFIX + deviceId + ".log";
            String logContent = String.format("%s [%s] %s: %s%n", 
                                             LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME),
                                             direction,
                                             deviceId,
                                             ByteUtils.bytesToHex(data));
            
            Files.write(Paths.get(logFile), logContent.getBytes(), 
                       StandardOpenOption.CREATE, StandardOpenOption.APPEND);
                       
        } catch (Exception e) {
            log.error("记录设备通信日志失败: {}", deviceId, e);
        }
    }
    
    /**
     * 记录协议解析日志
     */
    public void logProtocolParsing(String deviceId, String protocol, String result) {
        try {
            String logFile = LOG_DIR + "/" + PROTOCOL_LOG_PREFIX + protocol + ".log";
            String logContent = String.format("%s [%s] %s: %s%n", 
                                             LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME),
                                             deviceId,
                                             protocol,
                                             result);
            
            Files.write(Paths.get(logFile), logContent.getBytes(), 
                       StandardOpenOption.CREATE, StandardOpenOption.APPEND);
                       
        } catch (Exception e) {
            log.error("记录协议解析日志失败: {}", deviceId, e);
        }
    }
}

# 7. 部署方案

# 7.1 Docker部署

# Dockerfile
FROM openjdk:11-jre-slim

# 安装必要的系统依赖
RUN apt-get update && apt-get install -y \
    librxtx-java \
    && rm -rf /var/lib/apt/lists/*

# 设置工作目录
WORKDIR /app

# 复制应用文件
COPY target/charging-station-gateway.jar app.jar
COPY config/ config/
COPY scripts/ scripts/

# 创建日志目录
RUN mkdir -p logs

# 设置环境变量
ENV JAVA_OPTS="-Xms512m -Xmx1024m -Djava.library.path=/usr/lib/jni"

# 暴露端口
EXPOSE 8080 8888 8889

# 启动脚本
CMD ["sh", "-c", "java $JAVA_OPTS -jar app.jar"]
# docker-compose.yml
version: '3.8'

services:
  charging-gateway:
    build: .
    container_name: charging-station-gateway
    ports:
      - "8080:8080"    # HTTP API
      - "8888:8888"    # TCP设备接入
      - "8889:8889"    # UDP设备接入
    environment:
      - SPRING_PROFILES_ACTIVE=docker
      - SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/charging_gateway
      - SPRING_REDIS_HOST=redis
      - GATEWAY_CLOUD_MQTT_BROKER=tcp://mqtt:1883
    volumes:
      - ./logs:/app/logs
      - ./config:/app/config
      - /dev:/dev    # 串口设备映射
    devices:
      - "/dev/ttyUSB0:/dev/ttyUSB0"  # 串口设备
    privileged: true  # 访问串口需要特权模式
    depends_on:
      - mysql
      - redis
      - mqtt
    restart: unless-stopped
    networks:
      - gateway-network
  
  mysql:
    image: mysql:8.0
    container_name: gateway-mysql
    environment:
      - MYSQL_ROOT_PASSWORD=root123
      - MYSQL_DATABASE=charging_gateway
      - MYSQL_USER=gateway
      - MYSQL_PASSWORD=gateway123
    volumes:
      - mysql-data:/var/lib/mysql
      - ./sql:/docker-entrypoint-initdb.d
    ports:
      - "3306:3306"
    networks:
      - gateway-network
  
  redis:
    image: redis:6.2-alpine
    container_name: gateway-redis
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data
    ports:
      - "6379:6379"
    networks:
      - gateway-network
  
  mqtt:
    image: eclipse-mosquitto:2.0
    container_name: gateway-mqtt
    volumes:
      - ./mosquitto/config:/mosquitto/config
      - ./mosquitto/data:/mosquitto/data
      - ./mosquitto/log:/mosquitto/log
    ports:
      - "1883:1883"
      - "9001:9001"
    networks:
      - gateway-network

volumes:
  mysql-data:
  redis-data:

networks:
  gateway-network:
    driver: bridge

# 7.2 Kubernetes部署

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: charging-station-gateway
  labels:
    app: charging-station-gateway
spec:
  replicas: 2
  selector:
    matchLabels:
      app: charging-station-gateway
  template:
    metadata:
      labels:
        app: charging-station-gateway
    spec:
      containers:
      - name: gateway
        image: charging-station-gateway:latest
        ports:
        - containerPort: 8080
        - containerPort: 8888
        - containerPort: 8889
        env:
        - name: SPRING_PROFILES_ACTIVE
          value: "k8s"
        - name: SPRING_DATASOURCE_URL
          value: "jdbc:mysql://mysql-service:3306/charging_gateway"
        - name: SPRING_REDIS_HOST
          value: "redis-service"
        - name: GATEWAY_CLOUD_MQTT_BROKER
          value: "tcp://mqtt-service:1883"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        volumeMounts:
        - name: logs-volume
          mountPath: /app/logs
        - name: config-volume
          mountPath: /app/config
        livenessProbe:
          httpGet:
            path: /actuator/health
            port: 8080
          initialDelaySeconds: 60
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /actuator/health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
      volumes:
      - name: logs-volume
        persistentVolumeClaim:
          claimName: gateway-logs-pvc
      - name: config-volume
        configMap:
          name: gateway-config
---
apiVersion: v1
kind: Service
metadata:
  name: charging-station-gateway-service
spec:
  selector:
    app: charging-station-gateway
  ports:
  - name: http
    protocol: TCP
    port: 80
    targetPort: 8080
  - name: tcp-device
    protocol: TCP
    port: 8888
    targetPort: 8888
  - name: udp-device
    protocol: UDP
    port: 8889
    targetPort: 8889
  type: LoadBalancer

# 8. 安全设计

# 8.1 设备认证

@Component
public class DeviceAuthenticator {
    
    @Autowired
    private DeviceRepository deviceRepository;
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    /**
     * 设备认证
     */
    public boolean authenticate(String deviceId, String authCode) {
        try {
            // 检查设备是否存在
            ChargingStation device = deviceRepository.findByDeviceId(deviceId);
            if (device == null) {
                log.warn("设备不存在: {}", deviceId);
                return false;
            }
            
            // 验证认证码
            String expectedAuthCode = generateAuthCode(deviceId, device.getSecretKey());
            if (!authCode.equals(expectedAuthCode)) {
                log.warn("设备认证失败: {}", deviceId);
                return false;
            }
            
            // 检查设备状态
            if (device.getStatus() != DeviceStatus.ACTIVE) {
                log.warn("设备状态异常: {} - {}", deviceId, device.getStatus());
                return false;
            }
            
            // 缓存认证结果
            cacheAuthResult(deviceId, true);
            
            return true;
            
        } catch (Exception e) {
            log.error("设备认证异常: {}", deviceId, e);
            return false;
        }
    }
    
    /**
     * 生成认证码
     */
    private String generateAuthCode(String deviceId, String secretKey) {
        long timestamp = System.currentTimeMillis() / 60000; // 分钟级时间戳
        String data = deviceId + secretKey + timestamp;
        return DigestUtils.md5DigestAsHex(data.getBytes()).substring(0, 8);
    }
}

# 8.2 数据加密

@Component
public class DataEncryption {
    
    private static final String ALGORITHM = "AES/CBC/PKCS5Padding";
    private static final String KEY_ALGORITHM = "AES";
    
    @Value("${gateway.security.encryption.key}")
    private String encryptionKey;
    
    /**
     * 加密数据
     */
    public byte[] encrypt(byte[] data) throws Exception {
        SecretKeySpec keySpec = new SecretKeySpec(encryptionKey.getBytes(), KEY_ALGORITHM);
        
        Cipher cipher = Cipher.getInstance(ALGORITHM);
        
        // 生成随机IV
        byte[] iv = new byte[16];
        SecureRandom.getInstanceStrong().nextBytes(iv);
        IvParameterSpec ivSpec = new IvParameterSpec(iv);
        
        cipher.init(Cipher.ENCRYPT_MODE, keySpec, ivSpec);
        byte[] encrypted = cipher.doFinal(data);
        
        // 将IV和加密数据合并
        byte[] result = new byte[iv.length + encrypted.length];
        System.arraycopy(iv, 0, result, 0, iv.length);
        System.arraycopy(encrypted, 0, result, iv.length, encrypted.length);
        
        return result;
    }
    
    /**
     * 解密数据
     */
    public byte[] decrypt(byte[] encryptedData) throws Exception {
        SecretKeySpec keySpec = new SecretKeySpec(encryptionKey.getBytes(), KEY_ALGORITHM);
        
        Cipher cipher = Cipher.getInstance(ALGORITHM);
        
        // 提取IV
        byte[] iv = new byte[16];
        System.arraycopy(encryptedData, 0, iv, 0, 16);
        IvParameterSpec ivSpec = new IvParameterSpec(iv);
        
        // 提取加密数据
        byte[] encrypted = new byte[encryptedData.length - 16];
        System.arraycopy(encryptedData, 16, encrypted, 0, encrypted.length);
        
        cipher.init(Cipher.DECRYPT_MODE, keySpec, ivSpec);
        return cipher.doFinal(encrypted);
    }
}

# 9. 性能优化

# 9.1 连接池管理

@Component
public class ConnectionPoolManager {
    
    private final Map<String, ObjectPool<DeviceConnection>> connectionPools = new ConcurrentHashMap<>();
    
    /**
     * 获取连接池
     */
    public ObjectPool<DeviceConnection> getConnectionPool(String deviceType) {
        return connectionPools.computeIfAbsent(deviceType, this::createConnectionPool);
    }
    
    /**
     * 创建连接池
     */
    private ObjectPool<DeviceConnection> createConnectionPool(String deviceType) {
        GenericObjectPoolConfig<DeviceConnection> config = new GenericObjectPoolConfig<>();
        config.setMaxTotal(20);
        config.setMaxIdle(10);
        config.setMinIdle(2);
        config.setTestOnBorrow(true);
        config.setTestOnReturn(true);
        
        return new GenericObjectPool<>(new DeviceConnectionFactory(deviceType), config);
    }
}

# 9.2 数据缓存策略

@Component
public class DataCacheManager {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private final Cache<String, Object> localCache = Caffeine.newBuilder()
            .maximumSize(10000)
            .expireAfterWrite(Duration.ofMinutes(30))
            .build();
    
    /**
     * 缓存设备状态
     */
    public void cacheDeviceStatus(String deviceId, DeviceStatus status) {
        String key = "device:status:" + deviceId;
        
        // 本地缓存
        localCache.put(key, status);
        
        // Redis缓存
        redisTemplate.opsForValue().set(key, status, Duration.ofHours(1));
    }
    
    /**
     * 获取设备状态
     */
    public DeviceStatus getDeviceStatus(String deviceId) {
        String key = "device:status:" + deviceId;
        
        // 先查本地缓存
        DeviceStatus status = (DeviceStatus) localCache.getIfPresent(key);
        if (status != null) {
            return status;
        }
        
        // 再查Redis缓存
        status = (DeviceStatus) redisTemplate.opsForValue().get(key);
        if (status != null) {
            localCache.put(key, status);
        }
        
        return status;
    }
}

# 10. 故障处理

# 10.1 异常处理机制

@Component
public class FaultHandler {
    
    @Autowired
    private AlarmService alarmService;
    
    @Autowired
    private NotificationService notificationService;
    
    /**
     * 处理设备故障
     */
    public void handleDeviceFault(String deviceId, FaultInfo fault) {
        try {
            // 记录故障信息
            logFault(deviceId, fault);
            
            // 创建告警
            AlarmInfo alarm = createAlarm(deviceId, fault);
            alarmService.createAlarm(alarm);
            
            // 执行故障处理策略
            executeFaultStrategy(deviceId, fault);
            
            // 发送通知
            sendFaultNotification(deviceId, fault);
            
        } catch (Exception e) {
            log.error("处理设备故障异常: {}", deviceId, e);
        }
    }
    
    /**
     * 执行故障处理策略
     */
    private void executeFaultStrategy(String deviceId, FaultInfo fault) {
        switch (fault.getFaultLevel()) {
            case CRITICAL:
                // 严重故障:立即停止充电,断开设备
                stopChargingImmediately(deviceId);
                disconnectDevice(deviceId);
                break;
            case HIGH:
                // 高级故障:停止新的充电请求
                blockNewChargingRequests(deviceId);
                break;
            case MEDIUM:
                // 中级故障:降低充电功率
                reducePower(deviceId, 0.5);
                break;
            case LOW:
                // 低级故障:仅记录和监控
                increaseMonitoringFrequency(deviceId);
                break;
        }
    }
}

# 11. 总结

充电桩设备网关作为充电基础设施的核心组件,承担着设备接入、协议转换、数据处理、状态监控等关键职责。本文档详细介绍了网关的架构设计、技术实现、部署方案和运维管理,为充电桩设备的智能化管理提供了完整的技术解决方案。

# 11.1 核心特性

  • 多协议支持:兼容国标GB/T 27930、OCPP等主流协议
  • 高可靠性:支持断线重连、数据缓存、故障自愈
  • 高性能:采用异步处理、连接池、缓存优化
  • 安全保障:设备认证、数据加密、访问控制
  • 易于扩展:模块化设计、插件化架构
  • 运维友好:完善的监控、日志、告警机制

# 11.2 应用价值

  • 提升运营效率:自动化设备管理,减少人工干预
  • 保障服务质量:实时监控设备状态,快速响应故障
  • 降低运维成本:统一管理平台,简化运维流程
  • 增强用户体验:稳定可靠的充电服务
  • 支持业务扩展:灵活的架构设计,支持快速业务迭代

通过本网关系统的实施,可以有效提升充电桩设备的智能化水平,为新能源汽车充电服务提供强有力的技术支撑。