充电桩设备网关设计与实现
哪吒 2024/1/1
# 充电桩设备网关设计与实现
# 1. 概述
充电桩设备网关是连接充电桩硬件设备与云端管理平台的核心组件,负责设备接入、协议转换、数据采集、指令下发、状态监控等关键功能。本文档详细介绍充电桩设备网关的架构设计、技术实现和部署方案。
# 1.1 核心功能
- 设备接入管理:支持多种通信协议的充电桩设备接入
- 协议转换:将设备私有协议转换为标准化数据格式
- 数据采集:实时采集充电桩状态、充电数据、故障信息
- 指令下发:接收云端指令并转发给充电桩设备
- 状态监控:监控设备在线状态、健康状态
- 数据缓存:本地数据缓存,保证网络异常时的数据完整性
- 安全认证:设备身份认证和数据传输加密
# 1.2 应用场景
- 公共充电站:城市公共充电基础设施管理
- 企业充电桩:企业内部充电桩统一管理
- 住宅小区:小区充电桩智能化管理
- 高速服务区:高速公路充电站运营管理
- 商业综合体:商场、写字楼充电桩服务
# 2. 系统架构
graph TB
subgraph "充电桩设备层"
A1[直流快充桩] --> B[设备网关]
A2[交流慢充桩] --> B
A3[超级充电桩] --> B
A4[移动充电车] --> B
end
subgraph "网关核心层"
B --> C[协议适配器]
C --> D[数据处理引擎]
D --> E[指令分发器]
E --> F[状态管理器]
F --> G[缓存管理器]
end
subgraph "通信层"
G --> H[MQTT客户端]
G --> I[HTTP客户端]
G --> J[WebSocket客户端]
end
subgraph "云端平台"
H --> K[消息队列]
I --> L[API网关]
J --> M[实时通信服务]
K --> N[充电桩管理平台]
L --> N
M --> N
end
subgraph "外部系统"
N --> O[支付系统]
N --> P[用户APP]
N --> Q[运营管理系统]
N --> R[监控告警系统]
end
# 2.1 架构分层
# 设备接入层
- 多协议支持:RS485、CAN、Ethernet、4G/5G
- 设备发现:自动发现和注册新设备
- 连接管理:维护设备连接状态
- 心跳检测:定期检测设备在线状态
# 协议处理层
- 协议解析:解析各厂商私有协议
- 数据标准化:转换为统一数据格式
- 指令封装:将标准指令转换为设备协议
- 错误处理:协议解析异常处理
# 业务逻辑层
- 充电流程管理:启动、停止、暂停充电
- 计费数据处理:电量、时长、费用计算
- 故障诊断:设备故障检测和上报
- 预约管理:充电预约和排队管理
# 通信传输层
- 上行通信:向云端上报数据
- 下行通信:接收云端指令
- 数据压缩:减少网络传输开销
- 断线重连:网络异常自动恢复
# 3. 技术实现
# 3.1 网关核心架构
/**
* 充电桩设备网关主类
*/
@Component
@Slf4j
public class ChargingStationGateway {
@Autowired
private DeviceManager deviceManager;
@Autowired
private ProtocolAdapterManager protocolManager;
@Autowired
private DataProcessor dataProcessor;
@Autowired
private CloudCommunicator cloudCommunicator;
@Autowired
private LocalCacheManager cacheManager;
/**
* 网关启动
*/
@PostConstruct
public void start() {
log.info("充电桩设备网关启动中...");
// 初始化设备管理器
deviceManager.initialize();
// 启动协议适配器
protocolManager.startAll();
// 启动数据处理引擎
dataProcessor.start();
// 建立云端连接
cloudCommunicator.connect();
// 启动设备扫描
startDeviceScanning();
log.info("充电桩设备网关启动完成");
}
/**
* 启动设备扫描
*/
private void startDeviceScanning() {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
try {
deviceManager.scanDevices();
} catch (Exception e) {
log.error("设备扫描异常", e);
}
}, 0, 30, TimeUnit.SECONDS);
}
}
# 3.2 设备管理器
/**
* 设备管理器
*/
@Component
@Slf4j
public class DeviceManager {
private final Map<String, ChargingStation> devices = new ConcurrentHashMap<>();
private final Map<String, DeviceConnection> connections = new ConcurrentHashMap<>();
@Autowired
private DeviceRepository deviceRepository;
@Autowired
private ProtocolAdapterManager protocolManager;
/**
* 初始化设备管理器
*/
public void initialize() {
// 加载已注册设备
loadRegisteredDevices();
// 启动设备状态监控
startDeviceMonitoring();
}
/**
* 扫描新设备
*/
public void scanDevices() {
log.debug("开始扫描充电桩设备...");
// 扫描串口设备
scanSerialDevices();
// 扫描网络设备
scanNetworkDevices();
// 扫描CAN总线设备
scanCanDevices();
}
/**
* 注册新设备
*/
public void registerDevice(ChargingStation device) {
String deviceId = device.getDeviceId();
if (devices.containsKey(deviceId)) {
log.warn("设备已存在: {}", deviceId);
return;
}
// 建立设备连接
DeviceConnection connection = createConnection(device);
if (connection != null) {
devices.put(deviceId, device);
connections.put(deviceId, connection);
// 保存到数据库
deviceRepository.save(device);
// 启动设备通信
startDeviceCommunication(device, connection);
log.info("设备注册成功: {} - {}", deviceId, device.getDeviceName());
}
}
/**
* 创建设备连接
*/
private DeviceConnection createConnection(ChargingStation device) {
try {
switch (device.getConnectionType()) {
case SERIAL:
return new SerialConnection(device.getSerialConfig());
case TCP:
return new TcpConnection(device.getTcpConfig());
case UDP:
return new UdpConnection(device.getUdpConfig());
case CAN:
return new CanConnection(device.getCanConfig());
default:
log.error("不支持的连接类型: {}", device.getConnectionType());
return null;
}
} catch (Exception e) {
log.error("创建设备连接失败: {}", device.getDeviceId(), e);
return null;
}
}
/**
* 启动设备通信
*/
private void startDeviceCommunication(ChargingStation device, DeviceConnection connection) {
// 获取协议适配器
ProtocolAdapter adapter = protocolManager.getAdapter(device.getProtocolType());
if (adapter == null) {
log.error("未找到协议适配器: {}", device.getProtocolType());
return;
}
// 启动数据接收线程
Thread receiveThread = new Thread(() -> {
while (connection.isConnected()) {
try {
byte[] data = connection.receive();
if (data != null && data.length > 0) {
// 协议解析
DeviceMessage message = adapter.parse(data);
if (message != null) {
handleDeviceMessage(device.getDeviceId(), message);
}
}
} catch (Exception e) {
log.error("设备数据接收异常: {}", device.getDeviceId(), e);
break;
}
}
});
receiveThread.setName("Device-Receive-" + device.getDeviceId());
receiveThread.setDaemon(true);
receiveThread.start();
}
/**
* 处理设备消息
*/
private void handleDeviceMessage(String deviceId, DeviceMessage message) {
try {
// 更新设备状态
updateDeviceStatus(deviceId, message);
// 处理业务数据
processBusinessData(deviceId, message);
// 上报到云端
reportToCloud(deviceId, message);
} catch (Exception e) {
log.error("处理设备消息异常: {}", deviceId, e);
}
}
/**
* 发送指令到设备
*/
public boolean sendCommand(String deviceId, DeviceCommand command) {
ChargingStation device = devices.get(deviceId);
DeviceConnection connection = connections.get(deviceId);
if (device == null || connection == null || !connection.isConnected()) {
log.warn("设备不在线或连接异常: {}", deviceId);
return false;
}
try {
// 获取协议适配器
ProtocolAdapter adapter = protocolManager.getAdapter(device.getProtocolType());
// 封装指令
byte[] data = adapter.encode(command);
// 发送数据
return connection.send(data);
} catch (Exception e) {
log.error("发送设备指令异常: {}", deviceId, e);
return false;
}
}
}
# 3.3 协议适配器
/**
* 协议适配器接口
*/
public interface ProtocolAdapter {
/**
* 获取协议类型
*/
ProtocolType getProtocolType();
/**
* 解析设备数据
*/
DeviceMessage parse(byte[] data) throws ProtocolException;
/**
* 编码设备指令
*/
byte[] encode(DeviceCommand command) throws ProtocolException;
/**
* 验证数据完整性
*/
boolean validate(byte[] data);
}
/**
* 国标GB/T 27930协议适配器
*/
@Component
@Slf4j
public class GBT27930Adapter implements ProtocolAdapter {
@Override
public ProtocolType getProtocolType() {
return ProtocolType.GBT_27930;
}
@Override
public DeviceMessage parse(byte[] data) throws ProtocolException {
if (!validate(data)) {
throw new ProtocolException("数据校验失败");
}
try {
// 解析帧头
int startFlag = ByteUtils.bytesToInt(data, 0, 2);
if (startFlag != 0x6868) {
throw new ProtocolException("帧头错误");
}
// 解析数据长度
int dataLength = ByteUtils.bytesToInt(data, 2, 2);
// 解析控制码
int controlCode = data[4] & 0xFF;
// 解析地址域
byte[] addressBytes = Arrays.copyOfRange(data, 5, 11);
String deviceAddress = ByteUtils.bytesToHex(addressBytes);
// 解析数据域
byte[] dataBytes = Arrays.copyOfRange(data, 11, 11 + dataLength - 8);
// 根据控制码解析具体数据
return parseDataByControlCode(controlCode, deviceAddress, dataBytes);
} catch (Exception e) {
throw new ProtocolException("协议解析异常", e);
}
}
/**
* 根据控制码解析数据
*/
private DeviceMessage parseDataByControlCode(int controlCode, String deviceAddress, byte[] data) {
switch (controlCode) {
case 0x01: // 心跳包
return parseHeartbeat(deviceAddress, data);
case 0x02: // 状态信息
return parseStatusInfo(deviceAddress, data);
case 0x03: // 充电数据
return parseChargingData(deviceAddress, data);
case 0x04: // 故障信息
return parseFaultInfo(deviceAddress, data);
default:
log.warn("未知控制码: {}", controlCode);
return null;
}
}
/**
* 解析充电数据
*/
private DeviceMessage parseChargingData(String deviceAddress, byte[] data) {
ChargingDataMessage message = new ChargingDataMessage();
message.setDeviceId(deviceAddress);
message.setTimestamp(System.currentTimeMillis());
int offset = 0;
// 充电枪号
message.setGunNo(data[offset++] & 0xFF);
// 充电状态
message.setChargingStatus(ChargingStatus.fromCode(data[offset++] & 0xFF));
// 充电电压 (0.1V)
message.setVoltage(ByteUtils.bytesToInt(data, offset, 2) * 0.1);
offset += 2;
// 充电电流 (0.1A)
message.setCurrent(ByteUtils.bytesToInt(data, offset, 2) * 0.1);
offset += 2;
// 累计充电电量 (0.01kWh)
message.setTotalEnergy(ByteUtils.bytesToInt(data, offset, 4) * 0.01);
offset += 4;
// 充电时长 (分钟)
message.setChargingDuration(ByteUtils.bytesToInt(data, offset, 2));
offset += 2;
// SOC (1%)
message.setSoc(data[offset++] & 0xFF);
// 温度 (1°C, -40偏移)
message.setTemperature((data[offset++] & 0xFF) - 40);
return message;
}
@Override
public byte[] encode(DeviceCommand command) throws ProtocolException {
try {
switch (command.getCommandType()) {
case START_CHARGING:
return encodeStartChargingCommand((StartChargingCommand) command);
case STOP_CHARGING:
return encodeStopChargingCommand((StopChargingCommand) command);
case QUERY_STATUS:
return encodeQueryStatusCommand((QueryStatusCommand) command);
default:
throw new ProtocolException("不支持的指令类型: " + command.getCommandType());
}
} catch (Exception e) {
throw new ProtocolException("指令编码异常", e);
}
}
/**
* 编码启动充电指令
*/
private byte[] encodeStartChargingCommand(StartChargingCommand command) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// 帧头
baos.write(0x68);
baos.write(0x68);
// 数据长度 (暂时写0,后面回填)
baos.write(0x00);
baos.write(0x00);
// 控制码 - 启动充电
baos.write(0x11);
// 地址域
byte[] addressBytes = ByteUtils.hexToBytes(command.getDeviceId());
baos.write(addressBytes, 0, 6);
// 数据域
baos.write(command.getGunNo()); // 充电枪号
// 充电模式
baos.write(command.getChargingMode().getCode());
// 充电参数
if (command.getChargingMode() == ChargingMode.BY_ENERGY) {
// 按电量充电 (0.01kWh)
byte[] energyBytes = ByteUtils.intToBytes((int)(command.getTargetEnergy() * 100), 4);
baos.write(energyBytes, 0, 4);
} else if (command.getChargingMode() == ChargingMode.BY_TIME) {
// 按时间充电 (分钟)
byte[] timeBytes = ByteUtils.intToBytes(command.getTargetTime(), 2);
baos.write(timeBytes, 0, 2);
baos.write(0x00);
baos.write(0x00);
} else {
// 自动充满
baos.write(0x00);
baos.write(0x00);
baos.write(0x00);
baos.write(0x00);
}
byte[] data = baos.toByteArray();
// 回填数据长度
int dataLength = data.length + 2; // 包含校验和
data[2] = (byte)(dataLength & 0xFF);
data[3] = (byte)((dataLength >> 8) & 0xFF);
// 计算校验和
int checksum = 0;
for (int i = 0; i < data.length; i++) {
checksum += data[i] & 0xFF;
}
// 添加校验和
ByteArrayOutputStream result = new ByteArrayOutputStream();
result.write(data, 0, data.length);
result.write(checksum & 0xFF);
result.write(0x16); // 结束符
return result.toByteArray();
}
@Override
public boolean validate(byte[] data) {
if (data == null || data.length < 12) {
return false;
}
// 检查帧头
if (data[0] != 0x68 || data[1] != 0x68) {
return false;
}
// 检查结束符
if (data[data.length - 1] != 0x16) {
return false;
}
// 校验和验证
int checksum = 0;
for (int i = 0; i < data.length - 2; i++) {
checksum += data[i] & 0xFF;
}
return (checksum & 0xFF) == (data[data.length - 2] & 0xFF);
}
}
# 3.4 数据处理引擎
/**
* 数据处理引擎
*/
@Component
@Slf4j
public class DataProcessor {
@Autowired
private ChargingDataService chargingDataService;
@Autowired
private DeviceStatusService deviceStatusService;
@Autowired
private AlarmService alarmService;
@Autowired
private CloudCommunicator cloudCommunicator;
private final BlockingQueue<ProcessTask> taskQueue = new LinkedBlockingQueue<>();
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
/**
* 启动数据处理引擎
*/
public void start() {
// 启动任务处理线程
for (int i = 0; i < 5; i++) {
executorService.submit(this::processTask);
}
log.info("数据处理引擎启动完成");
}
/**
* 提交处理任务
*/
public void submitTask(String deviceId, DeviceMessage message) {
ProcessTask task = new ProcessTask(deviceId, message);
if (!taskQueue.offer(task)) {
log.warn("任务队列已满,丢弃消息: {}", deviceId);
}
}
/**
* 处理任务
*/
private void processTask() {
while (!Thread.currentThread().isInterrupted()) {
try {
ProcessTask task = taskQueue.take();
handleMessage(task.getDeviceId(), task.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("处理任务异常", e);
}
}
}
/**
* 处理设备消息
*/
private void handleMessage(String deviceId, DeviceMessage message) {
try {
switch (message.getMessageType()) {
case HEARTBEAT:
handleHeartbeat(deviceId, (HeartbeatMessage) message);
break;
case STATUS_INFO:
handleStatusInfo(deviceId, (StatusInfoMessage) message);
break;
case CHARGING_DATA:
handleChargingData(deviceId, (ChargingDataMessage) message);
break;
case FAULT_INFO:
handleFaultInfo(deviceId, (FaultInfoMessage) message);
break;
default:
log.warn("未知消息类型: {}", message.getMessageType());
}
} catch (Exception e) {
log.error("处理设备消息异常: {}", deviceId, e);
}
}
/**
* 处理充电数据
*/
private void handleChargingData(String deviceId, ChargingDataMessage message) {
// 保存充电数据
ChargingRecord record = convertToChargingRecord(deviceId, message);
chargingDataService.saveChargingRecord(record);
// 更新设备状态
deviceStatusService.updateChargingStatus(deviceId, message.getGunNo(), message.getChargingStatus());
// 检查异常情况
checkChargingAbnormal(deviceId, message);
// 上报到云端
cloudCommunicator.reportChargingData(deviceId, message);
log.debug("处理充电数据: {} - 枪号:{}, 状态:{}, 电量:{}kWh",
deviceId, message.getGunNo(), message.getChargingStatus(), message.getTotalEnergy());
}
/**
* 检查充电异常
*/
private void checkChargingAbnormal(String deviceId, ChargingDataMessage message) {
// 检查电压异常
if (message.getVoltage() > 500 || message.getVoltage() < 200) {
alarmService.createAlarm(deviceId, AlarmType.VOLTAGE_ABNORMAL,
"充电电压异常: " + message.getVoltage() + "V");
}
// 检查电流异常
if (message.getCurrent() > 250) {
alarmService.createAlarm(deviceId, AlarmType.CURRENT_ABNORMAL,
"充电电流过大: " + message.getCurrent() + "A");
}
// 检查温度异常
if (message.getTemperature() > 60) {
alarmService.createAlarm(deviceId, AlarmType.TEMPERATURE_HIGH,
"设备温度过高: " + message.getTemperature() + "°C");
}
// 检查充电时长异常
if (message.getChargingDuration() > 720) { // 超过12小时
alarmService.createAlarm(deviceId, AlarmType.CHARGING_TIMEOUT,
"充电时长异常: " + message.getChargingDuration() + "分钟");
}
}
/**
* 转换为充电记录
*/
private ChargingRecord convertToChargingRecord(String deviceId, ChargingDataMessage message) {
ChargingRecord record = new ChargingRecord();
record.setDeviceId(deviceId);
record.setGunNo(message.getGunNo());
record.setChargingStatus(message.getChargingStatus());
record.setVoltage(message.getVoltage());
record.setCurrent(message.getCurrent());
record.setTotalEnergy(message.getTotalEnergy());
record.setChargingDuration(message.getChargingDuration());
record.setSoc(message.getSoc());
record.setTemperature(message.getTemperature());
record.setRecordTime(new Date(message.getTimestamp()));
return record;
}
}
# 3.5 云端通信组件
/**
* 云端通信组件
*/
@Component
@Slf4j
public class CloudCommunicator {
@Value("${gateway.cloud.mqtt.broker}")
private String mqttBroker;
@Value("${gateway.cloud.mqtt.username}")
private String mqttUsername;
@Value("${gateway.cloud.mqtt.password}")
private String mqttPassword;
@Value("${gateway.cloud.api.baseUrl}")
private String apiBaseUrl;
private MqttClient mqttClient;
private RestTemplate restTemplate;
@Autowired
private LocalCacheManager cacheManager;
/**
* 建立云端连接
*/
public void connect() {
try {
// 初始化MQTT客户端
initMqttClient();
// 初始化HTTP客户端
initHttpClient();
// 订阅下行指令主题
subscribeCommandTopics();
log.info("云端连接建立成功");
} catch (Exception e) {
log.error("建立云端连接失败", e);
throw new RuntimeException("云端连接失败", e);
}
}
/**
* 初始化MQTT客户端
*/
private void initMqttClient() throws MqttException {
String clientId = "gateway-" + System.currentTimeMillis();
mqttClient = new MqttClient(mqttBroker, clientId);
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttUsername);
options.setPassword(mqttPassword.toCharArray());
options.setCleanSession(false);
options.setKeepAliveInterval(60);
options.setAutomaticReconnect(true);
// 设置连接回调
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
log.warn("MQTT连接丢失", cause);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
handleCloudCommand(topic, new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 消息发送完成
}
});
mqttClient.connect(options);
}
/**
* 订阅指令主题
*/
private void subscribeCommandTopics() throws MqttException {
// 订阅设备控制指令
mqttClient.subscribe("gateway/command/+/control", 1);
// 订阅配置更新指令
mqttClient.subscribe("gateway/command/config", 1);
// 订阅固件升级指令
mqttClient.subscribe("gateway/command/upgrade", 1);
}
/**
* 处理云端指令
*/
private void handleCloudCommand(String topic, String payload) {
try {
log.info("收到云端指令: {} - {}", topic, payload);
if (topic.contains("/control")) {
// 设备控制指令
handleDeviceControlCommand(topic, payload);
} else if (topic.contains("/config")) {
// 配置更新指令
handleConfigUpdateCommand(payload);
} else if (topic.contains("/upgrade")) {
// 固件升级指令
handleUpgradeCommand(payload);
}
} catch (Exception e) {
log.error("处理云端指令异常: {}", topic, e);
}
}
/**
* 处理设备控制指令
*/
private void handleDeviceControlCommand(String topic, String payload) {
// 解析设备ID
String[] topicParts = topic.split("/");
String deviceId = topicParts[2];
// 解析指令
DeviceControlCommand command = JSON.parseObject(payload, DeviceControlCommand.class);
// 转发给设备管理器
DeviceManager deviceManager = SpringContextUtils.getBean(DeviceManager.class);
boolean success = deviceManager.sendCommand(deviceId, command);
// 回复执行结果
CommandResponse response = new CommandResponse();
response.setCommandId(command.getCommandId());
response.setDeviceId(deviceId);
response.setSuccess(success);
response.setTimestamp(System.currentTimeMillis());
reportCommandResponse(response);
}
/**
* 上报充电数据
*/
public void reportChargingData(String deviceId, ChargingDataMessage message) {
try {
String topic = "gateway/data/" + deviceId + "/charging";
String payload = JSON.toJSONString(message);
MqttMessage mqttMessage = new MqttMessage(payload.getBytes());
mqttMessage.setQos(1);
mqttClient.publish(topic, mqttMessage);
} catch (Exception e) {
log.error("上报充电数据失败: {}", deviceId, e);
// 保存到本地缓存,等待重试
cacheManager.cacheFailedMessage(deviceId, message);
}
}
/**
* 上报设备状态
*/
public void reportDeviceStatus(String deviceId, DeviceStatus status) {
try {
String topic = "gateway/status/" + deviceId;
String payload = JSON.toJSONString(status);
MqttMessage mqttMessage = new MqttMessage(payload.getBytes());
mqttMessage.setQos(1);
mqttClient.publish(topic, mqttMessage);
} catch (Exception e) {
log.error("上报设备状态失败: {}", deviceId, e);
}
}
/**
* 上报告警信息
*/
public void reportAlarm(String deviceId, AlarmInfo alarm) {
try {
String topic = "gateway/alarm/" + deviceId;
String payload = JSON.toJSONString(alarm);
MqttMessage mqttMessage = new MqttMessage(payload.getBytes());
mqttMessage.setQos(2); // 告警信息使用最高QoS
mqttClient.publish(topic, mqttMessage);
} catch (Exception e) {
log.error("上报告警信息失败: {}", deviceId, e);
}
}
}
# 4. 数据模型设计
# 4.1 设备实体
@Entity
@Table(name = "charging_station")
public class ChargingStation {
@Id
private String deviceId; // 设备ID
private String deviceName; // 设备名称
private String deviceType; // 设备类型:DC_FAST, AC_SLOW, SUPER
private String manufacturer; // 制造商
private String model; // 设备型号
private String firmwareVersion; // 固件版本
@Enumerated(EnumType.STRING)
private ConnectionType connectionType; // 连接类型
@Enumerated(EnumType.STRING)
private ProtocolType protocolType; // 协议类型
private String location; // 安装位置
private Double latitude; // 纬度
private Double longitude; // 经度
private Integer gunCount; // 充电枪数量
private Double maxPower; // 最大功率(kW)
private Double maxVoltage; // 最大电压(V)
private Double maxCurrent; // 最大电流(A)
@Enumerated(EnumType.STRING)
private DeviceStatus status; // 设备状态
private LocalDateTime installTime; // 安装时间
private LocalDateTime lastOnlineTime; // 最后在线时间
// 连接配置
@Column(columnDefinition = "TEXT")
private String serialConfig; // 串口配置(JSON)
@Column(columnDefinition = "TEXT")
private String tcpConfig; // TCP配置(JSON)
@Column(columnDefinition = "TEXT")
private String canConfig; // CAN配置(JSON)
// getter/setter省略
}
# 4.2 充电记录实体
@Entity
@Table(name = "charging_record")
public class ChargingRecord {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String deviceId; // 设备ID
private Integer gunNo; // 充电枪号
private String orderId; // 订单ID
private String userId; // 用户ID
@Enumerated(EnumType.STRING)
private ChargingStatus chargingStatus; // 充电状态
private Double voltage; // 电压(V)
private Double current; // 电流(A)
private Double power; // 功率(kW)
private Double totalEnergy; // 累计电量(kWh)
private Integer chargingDuration; // 充电时长(分钟)
private Integer soc; // SOC(%)
private Integer temperature; // 温度(°C)
private LocalDateTime startTime; // 开始时间
private LocalDateTime endTime; // 结束时间
private LocalDateTime recordTime; // 记录时间
private BigDecimal totalFee; // 总费用
private BigDecimal energyFee; // 电费
private BigDecimal serviceFee; // 服务费
// getter/setter省略
}
# 5. 配置管理
# 5.1 应用配置
# application.yml
server:
port: 8080
spring:
application:
name: charging-station-gateway
datasource:
url: jdbc:mysql://localhost:3306/charging_gateway
username: root
password: password
driver-class-name: com.mysql.cj.jdbc.Driver
jpa:
hibernate:
ddl-auto: update
show-sql: false
properties:
hibernate:
dialect: org.hibernate.dialect.MySQL8Dialect
redis:
host: localhost
port: 6379
password:
database: 0
timeout: 3000ms
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0
# 网关配置
gateway:
# 云端连接配置
cloud:
mqtt:
broker: tcp://mqtt.example.com:1883
username: gateway_user
password: gateway_pass
client-id-prefix: charging-gateway
api:
base-url: https://api.example.com
timeout: 30000
# 设备扫描配置
device:
scan:
interval: 30000 # 扫描间隔(ms)
timeout: 5000 # 扫描超时(ms)
heartbeat:
interval: 60000 # 心跳间隔(ms)
timeout: 10000 # 心跳超时(ms)
# 数据处理配置
data:
processor:
thread-pool-size: 10
queue-capacity: 1000
cache:
max-size: 10000
expire-time: 3600000 # 1小时
# 串口配置
serial:
ports:
- COM1
- COM2
- COM3
baud-rate: 9600
data-bits: 8
stop-bits: 1
parity: NONE
# 网络配置
network:
tcp:
port: 8888
timeout: 5000
udp:
port: 8889
buffer-size: 1024
# 日志配置
logging:
level:
com.example.gateway: DEBUG
org.springframework.web: INFO
pattern:
console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
file:
name: logs/gateway.log
max-size: 100MB
max-history: 30
# 5.2 设备配置模板
{
"deviceTemplates": [
{
"templateId": "dc_fast_charger_v1",
"templateName": "直流快充桩模板V1",
"deviceType": "DC_FAST",
"protocolType": "GBT_27930",
"connectionType": "TCP",
"defaultConfig": {
"gunCount": 2,
"maxPower": 120,
"maxVoltage": 750,
"maxCurrent": 250,
"tcpConfig": {
"port": 8888,
"timeout": 5000,
"keepAlive": true
}
},
"dataPoints": [
{
"name": "voltage",
"type": "DOUBLE",
"unit": "V",
"range": [0, 1000]
},
{
"name": "current",
"type": "DOUBLE",
"unit": "A",
"range": [0, 300]
},
{
"name": "power",
"type": "DOUBLE",
"unit": "kW",
"range": [0, 150]
}
]
}
]
}
# 6. 监控与运维
# 6.1 健康检查
@RestController
@RequestMapping("/actuator")
public class HealthController {
@Autowired
private DeviceManager deviceManager;
@Autowired
private CloudCommunicator cloudCommunicator;
/**
* 网关健康检查
*/
@GetMapping("/health")
public ResponseEntity<Map<String, Object>> health() {
Map<String, Object> health = new HashMap<>();
// 检查设备连接状态
int totalDevices = deviceManager.getTotalDeviceCount();
int onlineDevices = deviceManager.getOnlineDeviceCount();
health.put("totalDevices", totalDevices);
health.put("onlineDevices", onlineDevices);
health.put("deviceOnlineRate", totalDevices > 0 ? (double)onlineDevices / totalDevices : 0);
// 检查云端连接状态
health.put("cloudConnected", cloudCommunicator.isConnected());
// 检查系统资源
Runtime runtime = Runtime.getRuntime();
health.put("memoryUsage", {
"total": runtime.totalMemory(),
"free": runtime.freeMemory(),
"used": runtime.totalMemory() - runtime.freeMemory()
});
// 检查磁盘空间
File disk = new File("/");
health.put("diskUsage", {
"total": disk.getTotalSpace(),
"free": disk.getFreeSpace(),
"used": disk.getTotalSpace() - disk.getFreeSpace()
});
return ResponseEntity.ok(health);
}
/**
* 设备状态统计
*/
@GetMapping("/metrics/devices")
public ResponseEntity<Map<String, Object>> deviceMetrics() {
Map<String, Object> metrics = deviceManager.getDeviceMetrics();
return ResponseEntity.ok(metrics);
}
}
# 6.2 日志管理
@Component
@Slf4j
public class LogManager {
private static final String LOG_DIR = "logs";
private static final String DEVICE_LOG_PREFIX = "device_";
private static final String PROTOCOL_LOG_PREFIX = "protocol_";
/**
* 记录设备通信日志
*/
public void logDeviceCommunication(String deviceId, String direction, byte[] data) {
try {
String logFile = LOG_DIR + "/" + DEVICE_LOG_PREFIX + deviceId + ".log";
String logContent = String.format("%s [%s] %s: %s%n",
LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME),
direction,
deviceId,
ByteUtils.bytesToHex(data));
Files.write(Paths.get(logFile), logContent.getBytes(),
StandardOpenOption.CREATE, StandardOpenOption.APPEND);
} catch (Exception e) {
log.error("记录设备通信日志失败: {}", deviceId, e);
}
}
/**
* 记录协议解析日志
*/
public void logProtocolParsing(String deviceId, String protocol, String result) {
try {
String logFile = LOG_DIR + "/" + PROTOCOL_LOG_PREFIX + protocol + ".log";
String logContent = String.format("%s [%s] %s: %s%n",
LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME),
deviceId,
protocol,
result);
Files.write(Paths.get(logFile), logContent.getBytes(),
StandardOpenOption.CREATE, StandardOpenOption.APPEND);
} catch (Exception e) {
log.error("记录协议解析日志失败: {}", deviceId, e);
}
}
}
# 7. 部署方案
# 7.1 Docker部署
# Dockerfile
FROM openjdk:11-jre-slim
# 安装必要的系统依赖
RUN apt-get update && apt-get install -y \
librxtx-java \
&& rm -rf /var/lib/apt/lists/*
# 设置工作目录
WORKDIR /app
# 复制应用文件
COPY target/charging-station-gateway.jar app.jar
COPY config/ config/
COPY scripts/ scripts/
# 创建日志目录
RUN mkdir -p logs
# 设置环境变量
ENV JAVA_OPTS="-Xms512m -Xmx1024m -Djava.library.path=/usr/lib/jni"
# 暴露端口
EXPOSE 8080 8888 8889
# 启动脚本
CMD ["sh", "-c", "java $JAVA_OPTS -jar app.jar"]
# docker-compose.yml
version: '3.8'
services:
charging-gateway:
build: .
container_name: charging-station-gateway
ports:
- "8080:8080" # HTTP API
- "8888:8888" # TCP设备接入
- "8889:8889" # UDP设备接入
environment:
- SPRING_PROFILES_ACTIVE=docker
- SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/charging_gateway
- SPRING_REDIS_HOST=redis
- GATEWAY_CLOUD_MQTT_BROKER=tcp://mqtt:1883
volumes:
- ./logs:/app/logs
- ./config:/app/config
- /dev:/dev # 串口设备映射
devices:
- "/dev/ttyUSB0:/dev/ttyUSB0" # 串口设备
privileged: true # 访问串口需要特权模式
depends_on:
- mysql
- redis
- mqtt
restart: unless-stopped
networks:
- gateway-network
mysql:
image: mysql:8.0
container_name: gateway-mysql
environment:
- MYSQL_ROOT_PASSWORD=root123
- MYSQL_DATABASE=charging_gateway
- MYSQL_USER=gateway
- MYSQL_PASSWORD=gateway123
volumes:
- mysql-data:/var/lib/mysql
- ./sql:/docker-entrypoint-initdb.d
ports:
- "3306:3306"
networks:
- gateway-network
redis:
image: redis:6.2-alpine
container_name: gateway-redis
command: redis-server --appendonly yes
volumes:
- redis-data:/data
ports:
- "6379:6379"
networks:
- gateway-network
mqtt:
image: eclipse-mosquitto:2.0
container_name: gateway-mqtt
volumes:
- ./mosquitto/config:/mosquitto/config
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
ports:
- "1883:1883"
- "9001:9001"
networks:
- gateway-network
volumes:
mysql-data:
redis-data:
networks:
gateway-network:
driver: bridge
# 7.2 Kubernetes部署
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: charging-station-gateway
labels:
app: charging-station-gateway
spec:
replicas: 2
selector:
matchLabels:
app: charging-station-gateway
template:
metadata:
labels:
app: charging-station-gateway
spec:
containers:
- name: gateway
image: charging-station-gateway:latest
ports:
- containerPort: 8080
- containerPort: 8888
- containerPort: 8889
env:
- name: SPRING_PROFILES_ACTIVE
value: "k8s"
- name: SPRING_DATASOURCE_URL
value: "jdbc:mysql://mysql-service:3306/charging_gateway"
- name: SPRING_REDIS_HOST
value: "redis-service"
- name: GATEWAY_CLOUD_MQTT_BROKER
value: "tcp://mqtt-service:1883"
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"
volumeMounts:
- name: logs-volume
mountPath: /app/logs
- name: config-volume
mountPath: /app/config
livenessProbe:
httpGet:
path: /actuator/health
port: 8080
initialDelaySeconds: 60
periodSeconds: 30
readinessProbe:
httpGet:
path: /actuator/health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
volumes:
- name: logs-volume
persistentVolumeClaim:
claimName: gateway-logs-pvc
- name: config-volume
configMap:
name: gateway-config
---
apiVersion: v1
kind: Service
metadata:
name: charging-station-gateway-service
spec:
selector:
app: charging-station-gateway
ports:
- name: http
protocol: TCP
port: 80
targetPort: 8080
- name: tcp-device
protocol: TCP
port: 8888
targetPort: 8888
- name: udp-device
protocol: UDP
port: 8889
targetPort: 8889
type: LoadBalancer
# 8. 安全设计
# 8.1 设备认证
@Component
public class DeviceAuthenticator {
@Autowired
private DeviceRepository deviceRepository;
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 设备认证
*/
public boolean authenticate(String deviceId, String authCode) {
try {
// 检查设备是否存在
ChargingStation device = deviceRepository.findByDeviceId(deviceId);
if (device == null) {
log.warn("设备不存在: {}", deviceId);
return false;
}
// 验证认证码
String expectedAuthCode = generateAuthCode(deviceId, device.getSecretKey());
if (!authCode.equals(expectedAuthCode)) {
log.warn("设备认证失败: {}", deviceId);
return false;
}
// 检查设备状态
if (device.getStatus() != DeviceStatus.ACTIVE) {
log.warn("设备状态异常: {} - {}", deviceId, device.getStatus());
return false;
}
// 缓存认证结果
cacheAuthResult(deviceId, true);
return true;
} catch (Exception e) {
log.error("设备认证异常: {}", deviceId, e);
return false;
}
}
/**
* 生成认证码
*/
private String generateAuthCode(String deviceId, String secretKey) {
long timestamp = System.currentTimeMillis() / 60000; // 分钟级时间戳
String data = deviceId + secretKey + timestamp;
return DigestUtils.md5DigestAsHex(data.getBytes()).substring(0, 8);
}
}
# 8.2 数据加密
@Component
public class DataEncryption {
private static final String ALGORITHM = "AES/CBC/PKCS5Padding";
private static final String KEY_ALGORITHM = "AES";
@Value("${gateway.security.encryption.key}")
private String encryptionKey;
/**
* 加密数据
*/
public byte[] encrypt(byte[] data) throws Exception {
SecretKeySpec keySpec = new SecretKeySpec(encryptionKey.getBytes(), KEY_ALGORITHM);
Cipher cipher = Cipher.getInstance(ALGORITHM);
// 生成随机IV
byte[] iv = new byte[16];
SecureRandom.getInstanceStrong().nextBytes(iv);
IvParameterSpec ivSpec = new IvParameterSpec(iv);
cipher.init(Cipher.ENCRYPT_MODE, keySpec, ivSpec);
byte[] encrypted = cipher.doFinal(data);
// 将IV和加密数据合并
byte[] result = new byte[iv.length + encrypted.length];
System.arraycopy(iv, 0, result, 0, iv.length);
System.arraycopy(encrypted, 0, result, iv.length, encrypted.length);
return result;
}
/**
* 解密数据
*/
public byte[] decrypt(byte[] encryptedData) throws Exception {
SecretKeySpec keySpec = new SecretKeySpec(encryptionKey.getBytes(), KEY_ALGORITHM);
Cipher cipher = Cipher.getInstance(ALGORITHM);
// 提取IV
byte[] iv = new byte[16];
System.arraycopy(encryptedData, 0, iv, 0, 16);
IvParameterSpec ivSpec = new IvParameterSpec(iv);
// 提取加密数据
byte[] encrypted = new byte[encryptedData.length - 16];
System.arraycopy(encryptedData, 16, encrypted, 0, encrypted.length);
cipher.init(Cipher.DECRYPT_MODE, keySpec, ivSpec);
return cipher.doFinal(encrypted);
}
}
# 9. 性能优化
# 9.1 连接池管理
@Component
public class ConnectionPoolManager {
private final Map<String, ObjectPool<DeviceConnection>> connectionPools = new ConcurrentHashMap<>();
/**
* 获取连接池
*/
public ObjectPool<DeviceConnection> getConnectionPool(String deviceType) {
return connectionPools.computeIfAbsent(deviceType, this::createConnectionPool);
}
/**
* 创建连接池
*/
private ObjectPool<DeviceConnection> createConnectionPool(String deviceType) {
GenericObjectPoolConfig<DeviceConnection> config = new GenericObjectPoolConfig<>();
config.setMaxTotal(20);
config.setMaxIdle(10);
config.setMinIdle(2);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
return new GenericObjectPool<>(new DeviceConnectionFactory(deviceType), config);
}
}
# 9.2 数据缓存策略
@Component
public class DataCacheManager {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private final Cache<String, Object> localCache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(Duration.ofMinutes(30))
.build();
/**
* 缓存设备状态
*/
public void cacheDeviceStatus(String deviceId, DeviceStatus status) {
String key = "device:status:" + deviceId;
// 本地缓存
localCache.put(key, status);
// Redis缓存
redisTemplate.opsForValue().set(key, status, Duration.ofHours(1));
}
/**
* 获取设备状态
*/
public DeviceStatus getDeviceStatus(String deviceId) {
String key = "device:status:" + deviceId;
// 先查本地缓存
DeviceStatus status = (DeviceStatus) localCache.getIfPresent(key);
if (status != null) {
return status;
}
// 再查Redis缓存
status = (DeviceStatus) redisTemplate.opsForValue().get(key);
if (status != null) {
localCache.put(key, status);
}
return status;
}
}
# 10. 故障处理
# 10.1 异常处理机制
@Component
public class FaultHandler {
@Autowired
private AlarmService alarmService;
@Autowired
private NotificationService notificationService;
/**
* 处理设备故障
*/
public void handleDeviceFault(String deviceId, FaultInfo fault) {
try {
// 记录故障信息
logFault(deviceId, fault);
// 创建告警
AlarmInfo alarm = createAlarm(deviceId, fault);
alarmService.createAlarm(alarm);
// 执行故障处理策略
executeFaultStrategy(deviceId, fault);
// 发送通知
sendFaultNotification(deviceId, fault);
} catch (Exception e) {
log.error("处理设备故障异常: {}", deviceId, e);
}
}
/**
* 执行故障处理策略
*/
private void executeFaultStrategy(String deviceId, FaultInfo fault) {
switch (fault.getFaultLevel()) {
case CRITICAL:
// 严重故障:立即停止充电,断开设备
stopChargingImmediately(deviceId);
disconnectDevice(deviceId);
break;
case HIGH:
// 高级故障:停止新的充电请求
blockNewChargingRequests(deviceId);
break;
case MEDIUM:
// 中级故障:降低充电功率
reducePower(deviceId, 0.5);
break;
case LOW:
// 低级故障:仅记录和监控
increaseMonitoringFrequency(deviceId);
break;
}
}
}
# 11. 总结
充电桩设备网关作为充电基础设施的核心组件,承担着设备接入、协议转换、数据处理、状态监控等关键职责。本文档详细介绍了网关的架构设计、技术实现、部署方案和运维管理,为充电桩设备的智能化管理提供了完整的技术解决方案。
# 11.1 核心特性
- 多协议支持:兼容国标GB/T 27930、OCPP等主流协议
- 高可靠性:支持断线重连、数据缓存、故障自愈
- 高性能:采用异步处理、连接池、缓存优化
- 安全保障:设备认证、数据加密、访问控制
- 易于扩展:模块化设计、插件化架构
- 运维友好:完善的监控、日志、告警机制
# 11.2 应用价值
- 提升运营效率:自动化设备管理,减少人工干预
- 保障服务质量:实时监控设备状态,快速响应故障
- 降低运维成本:统一管理平台,简化运维流程
- 增强用户体验:稳定可靠的充电服务
- 支持业务扩展:灵活的架构设计,支持快速业务迭代
通过本网关系统的实施,可以有效提升充电桩设备的智能化水平,为新能源汽车充电服务提供强有力的技术支撑。