充电桩业务实现
# 充电桩业务实现
# 概述
充电桩业务系统是新能源汽车充电基础设施的核心,涉及设备管理、用户服务、支付结算、运营监控等多个业务领域。本文详细介绍充电桩业务系统的架构设计、核心功能实现、技术方案和实际应用。
# 业务架构
# 整体架构图
```text
┌─────────────────────────────────────────────────────────────────┐
│                         充电桩业务系统                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │   用户端    │ │   运营端    │ │   监控端    │ │   管理端    │ │
│ │    (APP)    │ │    (Web)    │ │ (Dashboard) │ │   (Admin)   │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
│                                                                 │
├─────────────────────────────────────────────────────────────────┤
│                            API网关层                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │  用户服务   │ │  设备服务   │ │  订单服务   │ │  支付服务   │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
│                                                                 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │  运营服务   │ │  监控服务   │ │  消息服务   │ │  数据服务   │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
│                                                                 │
├─────────────────────────────────────────────────────────────────┤
│                             数据层                              │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │    MySQL    │ │    Redis    │ │   MongoDB   │ │  InfluxDB   │ │
│ │ (业务数据)  │ │   (缓存)    │ │ (日志数据)  │ │ (时序数据)  │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
│                                                                 │
├─────────────────────────────────────────────────────────────────┤
│                             设备层                              │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │   充电桩    │ │   充电桩    │ │   充电桩    │ │     ...     │ │
│ │   (站点A)   │ │   (站点B)   │ │   (站点C)   │ │             │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
# 核心业务模块
- 用户管理:注册登录、实名认证、会员体系
- 设备管理:充电桩注册、状态监控、远程控制
- 订单管理:充电订单、预约订单、订单结算
- 支付管理:多种支付方式、预付费、后付费
- 运营管理:站点管理、价格策略、营收统计
- 监控告警:设备监控、故障告警、性能分析
# 数据模型设计
# 核心实体关系
-- Users table: one row per registered account, identified by a unique phone number.
CREATE TABLE users (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    -- The UNIQUE constraint already creates an index on phone, so no separate
    -- secondary index on the same column is declared (it would only add write cost).
    phone VARCHAR(11) UNIQUE NOT NULL COMMENT '手机号',
    nickname VARCHAR(50) COMMENT '昵称',
    avatar VARCHAR(255) COMMENT '头像',
    real_name VARCHAR(20) COMMENT '真实姓名',
    id_card VARCHAR(18) COMMENT '身份证号',
    status TINYINT DEFAULT 1 COMMENT '状态:1-正常,0-禁用',
    balance DECIMAL(10,2) DEFAULT 0 COMMENT '账户余额',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status (status)
) ENGINE=InnoDB COMMENT='用户表';
-- Charging stations table: one row per physical site that hosts charging piles.
CREATE TABLE charging_stations (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    -- UNIQUE already indexes station_code; a second index on it would be redundant.
    station_code VARCHAR(32) UNIQUE NOT NULL COMMENT '站点编码',
    station_name VARCHAR(100) NOT NULL COMMENT '站点名称',
    province VARCHAR(20) NOT NULL COMMENT '省份',
    city VARCHAR(20) NOT NULL COMMENT '城市',
    district VARCHAR(20) NOT NULL COMMENT '区县',
    address VARCHAR(255) NOT NULL COMMENT '详细地址',
    longitude DECIMAL(10,7) COMMENT '经度',
    latitude DECIMAL(10,7) COMMENT '纬度',
    operator_id BIGINT COMMENT '运营商ID',
    status TINYINT DEFAULT 1 COMMENT '状态:1-正常,0-停用',
    pile_count INT DEFAULT 0 COMMENT '充电桩数量',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    -- Composite index for province -> city -> district drill-down lookups.
    INDEX idx_location (province, city, district),
    INDEX idx_operator (operator_id)
) ENGINE=InnoDB COMMENT='充电站表';
-- Charging piles table: one row per charging device, owned by a station.
CREATE TABLE charging_piles (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    -- UNIQUE already indexes pile_code; a second index on it would be redundant.
    pile_code VARCHAR(32) UNIQUE NOT NULL COMMENT '充电桩编码',
    pile_name VARCHAR(100) NOT NULL COMMENT '充电桩名称',
    station_id BIGINT NOT NULL COMMENT '所属站点ID',
    pile_type TINYINT NOT NULL COMMENT '桩类型:1-直流,2-交流',
    connector_type VARCHAR(20) COMMENT '接口类型',
    max_power DECIMAL(8,2) COMMENT '最大功率(kW)',
    rated_voltage DECIMAL(8,2) COMMENT '额定电压(V)',
    rated_current DECIMAL(8,2) COMMENT '额定电流(A)',
    status TINYINT DEFAULT 1 COMMENT '状态:1-空闲,2-充电中,3-故障,4-离线',
    online_status TINYINT DEFAULT 1 COMMENT '在线状态:1-在线,0-离线',
    -- Explicit NULL: with explicit_defaults_for_timestamp disabled, a bare TIMESTAMP
    -- column is implicitly NOT NULL and gets automatic defaults.
    last_heartbeat TIMESTAMP NULL DEFAULT NULL COMMENT '最后心跳时间',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_station (station_id),
    INDEX idx_status (status),
    INDEX idx_online_status (online_status),
    FOREIGN KEY (station_id) REFERENCES charging_stations(id)
) ENGINE=InnoDB COMMENT='充电桩表';
-- Charging orders table: one row per charging session requested by a user on a pile.
CREATE TABLE charging_orders (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    -- UNIQUE already indexes order_no; a second index on it would be redundant.
    order_no VARCHAR(32) UNIQUE NOT NULL COMMENT '订单号',
    user_id BIGINT NOT NULL COMMENT '用户ID',
    pile_id BIGINT NOT NULL COMMENT '充电桩ID',
    connector_id TINYINT NOT NULL COMMENT '充电接口ID',
    -- Explicit NULL on both timestamps: with explicit_defaults_for_timestamp disabled,
    -- bare TIMESTAMP columns are implicitly NOT NULL and the first one is auto-updated,
    -- which would silently overwrite the charging start time on every row update.
    start_time TIMESTAMP NULL DEFAULT NULL COMMENT '开始充电时间',
    end_time TIMESTAMP NULL DEFAULT NULL COMMENT '结束充电时间',
    start_soc DECIMAL(5,2) COMMENT '开始SOC(%)',
    end_soc DECIMAL(5,2) COMMENT '结束SOC(%)',
    total_power DECIMAL(10,3) COMMENT '总充电量(kWh)',
    total_time INT COMMENT '总充电时长(秒)',
    unit_price DECIMAL(8,4) COMMENT '电价(元/kWh)',
    service_fee DECIMAL(8,4) COMMENT '服务费(元/kWh)',
    total_amount DECIMAL(10,2) COMMENT '总金额',
    actual_amount DECIMAL(10,2) COMMENT '实际支付金额',
    status TINYINT DEFAULT 1 COMMENT '状态:1-待支付,2-充电中,3-已完成,4-已取消',
    payment_status TINYINT DEFAULT 0 COMMENT '支付状态:0-未支付,1-已支付,2-退款中,3-已退款',
    stop_reason TINYINT COMMENT '停止原因:1-用户停止,2-充满停止,3-故障停止',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_user (user_id),
    INDEX idx_pile (pile_id),
    INDEX idx_status (status),
    INDEX idx_payment_status (payment_status),
    INDEX idx_created_at (created_at),
    FOREIGN KEY (user_id) REFERENCES users(id),
    FOREIGN KEY (pile_id) REFERENCES charging_piles(id)
) ENGINE=InnoDB COMMENT='充电订单表';
-- Payment records table: one row per payment attempt made against a charging order.
CREATE TABLE payment_records (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    -- UNIQUE already indexes payment_no; a second index on it would be redundant.
    payment_no VARCHAR(32) UNIQUE NOT NULL COMMENT '支付单号',
    order_id BIGINT NOT NULL COMMENT '订单ID',
    user_id BIGINT NOT NULL COMMENT '用户ID',
    payment_method TINYINT NOT NULL COMMENT '支付方式:1-微信,2-支付宝,3-银联,4-余额',
    amount DECIMAL(10,2) NOT NULL COMMENT '支付金额',
    status TINYINT DEFAULT 0 COMMENT '状态:0-待支付,1-支付成功,2-支付失败,3-已退款',
    third_party_no VARCHAR(64) COMMENT '第三方支付单号',
    -- Explicit NULL: a payment that has not been paid yet has no paid_at, and a bare
    -- TIMESTAMP is implicitly NOT NULL when explicit_defaults_for_timestamp is disabled.
    paid_at TIMESTAMP NULL DEFAULT NULL COMMENT '支付时间',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_order (order_id),
    INDEX idx_user (user_id),
    INDEX idx_status (status),
    FOREIGN KEY (order_id) REFERENCES charging_orders(id),
    FOREIGN KEY (user_id) REFERENCES users(id)
) ENGINE=InnoDB COMMENT='支付记录表';
# 核心业务实现
# 1. 用户服务实现
/**
 * Account operations: SMS-code registration, SMS-code login and real-name verification.
 *
 * <p>NOTE(review): {@code getUserById} is called by {@link #realNameAuth} but is not
 * defined in this excerpt — confirm it exists in the full class.
 */
@Service
@Transactional
public class UserService {
    @Autowired
    private UserMapper userMapper;
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private SmsService smsService;
    // Used by realNameAuth; the excerpt referenced this collaborator without declaring it.
    // NOTE(review): the type name is inferred from the field name — confirm.
    @Autowired
    private RealNameAuthService realNameAuthService;

    /**
     * Registers a new user identified by phone number.
     *
     * @param request carries the phone number and the SMS code the user typed in
     * @return the newly created user as a DTO
     * @throws BusinessException if the phone format is invalid, the SMS code does not
     *         match the cached one (or none is cached), or the phone is already registered
     */
    public UserDTO register(RegisterRequest request) {
        // 1. Validate the phone number format.
        if (!PhoneUtil.isValidPhone(request.getPhone())) {
            throw new BusinessException("手机号格式不正确");
        }
        // 2. Validate the SMS verification code.
        String cacheKey = "sms:register:" + request.getPhone();
        verifySmsCode(cacheKey, request.getSmsCode());
        // 3. Reject phone numbers that are already registered.
        if (userMapper.existsByPhone(request.getPhone())) {
            throw new BusinessException("手机号已注册");
        }
        // 4. Create the user; the default nickname is built from the characters of the
        //    phone number starting at index 7.
        User user = new User();
        user.setPhone(request.getPhone());
        user.setNickname("用户" + request.getPhone().substring(7));
        user.setStatus(UserStatus.NORMAL.getValue());
        user.setBalance(BigDecimal.ZERO);
        userMapper.insert(user);
        // 5. The code is single-use: drop it once registration succeeded.
        redisTemplate.delete(cacheKey);
        return UserConverter.toDTO(user);
    }

    /**
     * Logs a user in with phone number + SMS code and issues a JWT.
     *
     * @param request carries the phone number and the SMS code
     * @return the token plus the user DTO
     * @throws BusinessException if the user does not exist, is not in NORMAL status,
     *         or the SMS code does not match the cached one
     */
    public LoginResponse login(LoginRequest request) {
        // 1. Look the user up and check the account status.
        User user = userMapper.findByPhone(request.getPhone());
        if (user == null) {
            throw new BusinessException("用户不存在");
        }
        if (user.getStatus() != UserStatus.NORMAL.getValue()) {
            throw new BusinessException("用户已被禁用");
        }
        // 2. Validate the SMS verification code.
        String cacheKey = "sms:login:" + request.getPhone();
        verifySmsCode(cacheKey, request.getSmsCode());
        // 3. Issue the JWT.
        String token = JwtUtil.generateToken(user.getId(), user.getPhone());
        // 4. Cache the user entity for 7 days.
        String userCacheKey = "user:" + user.getId();
        redisTemplate.opsForValue().set(userCacheKey, user, 7, TimeUnit.DAYS);
        // 5. The code is single-use: drop it once login succeeded.
        redisTemplate.delete(cacheKey);
        LoginResponse response = new LoginResponse();
        response.setToken(token);
        response.setUser(UserConverter.toDTO(user));
        return response;
    }

    /**
     * Verifies the user's real name / ID card through the external verifier and, on
     * success, stores both on the user and refreshes the cached user entry.
     *
     * @param userId  id of the user being verified
     * @param request carries the real name and ID card number
     * @throws BusinessException if the external verification reports a mismatch
     */
    public void realNameAuth(Long userId, RealNameAuthRequest request) {
        User user = getUserById(userId);
        // 1. Call the external real-name verification service.
        boolean authResult = realNameAuthService.verify(
            request.getRealName(),
            request.getIdCard()
        );
        if (!authResult) {
            throw new BusinessException("实名认证失败,请检查姓名和身份证号");
        }
        // 2. Persist the verified identity.
        user.setRealName(request.getRealName());
        user.setIdCard(request.getIdCard());
        user.setAuthStatus(AuthStatus.AUTHENTICATED.getValue());
        userMapper.updateById(user);
        // 3. Refresh the cached user entry (same 7-day TTL as login).
        String userCacheKey = "user:" + userId;
        redisTemplate.opsForValue().set(userCacheKey, user, 7, TimeUnit.DAYS);
    }

    /**
     * Compares the submitted SMS code with the one cached under {@code cacheKey}.
     * The comparison starts from the cached value so that a missing code on either
     * side yields the business error instead of a NullPointerException.
     */
    private void verifySmsCode(String cacheKey, String submittedCode) {
        String cachedCode = (String) redisTemplate.opsForValue().get(cacheKey);
        if (cachedCode == null || !cachedCode.equals(submittedCode)) {
            throw new BusinessException("验证码错误或已过期");
        }
    }
}
# 2. 设备服务实现
/**
 * Device-side operations for charging piles: nearby lookup, heartbeat ingestion and
 * remote start/stop commands sent over MQTT.
 *
 * <p>NOTE(review): {@code logOperationRecord} is called below but not defined in this
 * excerpt — confirm it exists in the full class.
 */
@Service
public class ChargingPileService {
    @Autowired
    private ChargingPileMapper pileMapper;
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private MqttTemplate mqttTemplate;
    @Autowired
    private InfluxDBTemplate influxDBTemplate;
    // checkAlarmConditions lives on MonitoringService; the excerpt called it as if it
    // were a method of this class.
    @Autowired
    private MonitoringService monitoringService;

    /**
     * Returns the idle, online piles around a coordinate, each enriched with the
     * real-time status cached in Redis when one is present.
     *
     * @param request longitude, latitude and search radius
     * @return DTOs of the usable piles found by the mapper query
     */
    public List<ChargingPileDTO> getNearbyPiles(NearbyRequest request) {
        // 1. Geo query.
        List<ChargingPile> piles = pileMapper.findNearbyPiles(
            request.getLongitude(),
            request.getLatitude(),
            request.getRadius()
        );
        // 2. Keep only usable piles, 3. attach the cached real-time status.
        return piles.stream()
            .filter(pile -> pile.getStatus() == PileStatus.IDLE.getValue())
            .filter(pile -> pile.getOnlineStatus() == OnlineStatus.ONLINE.getValue())
            .map(this::toDtoWithRealTimeStatus)
            .collect(Collectors.toList());
    }

    /** Converts a pile to its DTO and attaches the real-time status cached in Redis, if any. */
    private ChargingPileDTO toDtoWithRealTimeStatus(ChargingPile pile) {
        ChargingPileDTO dto = PileConverter.toDTO(pile);
        // The heartbeat handler stores the status under the pile CODE, so the lookup
        // must use the code as well (keying by database id never finds anything).
        String statusKey = "pile:status:" + pile.getPileCode();
        PileRealTimeStatus status = (PileRealTimeStatus)
            redisTemplate.opsForValue().get(statusKey);
        if (status != null) {
            dto.setRealTimeStatus(status);
        }
        return dto;
    }

    /**
     * Handles one heartbeat event: updates the heartbeat time in the database, caches
     * the latest metrics in Redis for 5 minutes, appends a point to the
     * {@code pile_metrics} measurement and runs the alarm checks.
     *
     * @param event heartbeat event carrying the pile code and the reported metrics
     */
    @EventListener
    public void handleHeartbeat(PileHeartbeatEvent event) {
        String pileCode = event.getPileCode();
        PileHeartbeatData data = event.getData();
        // 1. Persist the heartbeat time.
        pileMapper.updateHeartbeat(pileCode, new Date());
        // 2. Refresh the real-time status in Redis.
        String statusKey = "pile:status:" + pileCode;
        PileRealTimeStatus status = new PileRealTimeStatus();
        status.setPileCode(pileCode);
        status.setStatus(data.getStatus());
        status.setVoltage(data.getVoltage());
        status.setCurrent(data.getCurrent());
        status.setPower(data.getPower());
        status.setTemperature(data.getTemperature());
        status.setUpdateTime(new Date());
        redisTemplate.opsForValue().set(statusKey, status, 5, TimeUnit.MINUTES);
        // 3. Store the time-series sample.
        Point point = Point.measurement("pile_metrics")
            .tag("pile_code", pileCode)
            .addField("voltage", data.getVoltage())
            .addField("current", data.getCurrent())
            .addField("power", data.getPower())
            .addField("temperature", data.getTemperature())
            .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
            .build();
        influxDBTemplate.write(point);
        // 4. Evaluate alarm conditions.
        monitoringService.checkAlarmConditions(pileCode, data);
    }

    /**
     * Sends a start-charging command to a pile and marks it as charging.
     *
     * @param pileCode code of the target pile
     * @param request  connector, order number and power/time limits for the session
     * @throws BusinessException if the pile does not exist or is not idle
     */
    public void startCharging(String pileCode, StartChargingRequest request) {
        // 1. Validate the pile state.
        ChargingPile pile = pileMapper.findByPileCode(pileCode);
        if (pile == null) {
            throw new BusinessException("充电桩不存在");
        }
        if (pile.getStatus() != PileStatus.IDLE.getValue()) {
            throw new BusinessException("充电桩不可用");
        }
        // 2. Build the MQTT command.
        StartChargingCommand command = new StartChargingCommand();
        command.setPileCode(pileCode);
        command.setConnectorId(request.getConnectorId());
        command.setOrderNo(request.getOrderNo());
        command.setMaxPower(request.getMaxPower());
        command.setMaxTime(request.getMaxTime());
        // 3. Publish it on the pile's command topic.
        String topic = "pile/" + pileCode + "/command";
        mqttTemplate.convertAndSend(topic, command);
        // 4. Mark the pile as charging.
        pile.setStatus(PileStatus.CHARGING.getValue());
        pileMapper.updateById(pile);
        // 5. Record the operation.
        logOperationRecord(pileCode, "START_CHARGING", request.getOrderNo());
    }

    /**
     * Sends a user-initiated stop command to a pile and records the operation.
     *
     * @param pileCode code of the target pile
     * @param orderNo  order whose charging session should stop
     */
    public void stopCharging(String pileCode, String orderNo) {
        // 1. Build the stop command.
        StopChargingCommand command = new StopChargingCommand();
        command.setPileCode(pileCode);
        command.setOrderNo(orderNo);
        command.setStopReason(StopReason.USER_STOP.getValue());
        // 2. Publish it on the pile's command topic.
        String topic = "pile/" + pileCode + "/command";
        mqttTemplate.convertAndSend(topic, command);
        // 3. Record the operation.
        logOperationRecord(pileCode, "STOP_CHARGING", orderNo);
    }
}
# 3. 订单服务实现
/**
 * Charging-order lifecycle: create an order, start the session, and settle it when
 * charging finishes.
 *
 * <p>NOTE(review): this class calls {@code paymentService.createPayment(Long, BigDecimal)},
 * {@code getByOrderId} and {@code createAdditionalPayment}, and
 * {@code pileService.getPileById} / {@code releasePile}; none of these signatures is
 * visible on the collaborating services in this document — confirm they exist.
 */
@Service
@Transactional
public class ChargingOrderService {
    /** How long a freshly created order keeps its pile reserved. */
    private static final long PILE_LOCK_MINUTES = 15;

    @Autowired
    private ChargingOrderMapper orderMapper;
    @Autowired
    private ChargingPileService pileService;
    @Autowired
    private PaymentService paymentService;
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    // The three collaborators below were used by the excerpt without being declared.
    @Autowired
    private UserService userService;
    // NOTE(review): type name inferred from the field name — confirm.
    @Autowired
    private PriceService priceService;
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    /**
     * Creates an unpaid order for an idle pile and reserves the pile for 15 minutes.
     *
     * @param request user id, pile id and connector id
     * @return the created order as a DTO
     * @throws BusinessException if the pile is not idle or is already reserved by
     *         another order
     */
    public ChargingOrderDTO createOrder(CreateOrderRequest request) {
        // 1. Validate user and pile. The user lookup is kept only for its side effect.
        //    NOTE(review): presumably throws when the user does not exist — confirm.
        userService.getUserById(request.getUserId());
        ChargingPile pile = pileService.getPileById(request.getPileId());
        if (pile.getStatus() != PileStatus.IDLE.getValue()) {
            throw new BusinessException("充电桩不可用");
        }
        // 2. Resolve the price that applies right now at the pile's station.
        PriceStrategy priceStrategy = priceService.getCurrentPrice(
            pile.getStationId(),
            new Date()
        );
        // 3. Reserve the pile atomically BEFORE writing the order: a plain SET would let
        //    two concurrent requests both "lock" the same pile.
        String orderNo = OrderNoGenerator.generate();
        String lockKey = pileLockKey(request.getPileId());
        Boolean locked = redisTemplate.opsForValue()
            .setIfAbsent(lockKey, orderNo, PILE_LOCK_MINUTES, TimeUnit.MINUTES);
        if (!Boolean.TRUE.equals(locked)) {
            throw new BusinessException("充电桩不可用");
        }
        try {
            // 4. Persist the order.
            ChargingOrder order = new ChargingOrder();
            order.setOrderNo(orderNo);
            order.setUserId(request.getUserId());
            order.setPileId(request.getPileId());
            order.setConnectorId(request.getConnectorId());
            order.setUnitPrice(priceStrategy.getUnitPrice());
            order.setServiceFee(priceStrategy.getServiceFee());
            order.setStatus(OrderStatus.PENDING_PAYMENT.getValue());
            order.setPaymentStatus(PaymentStatus.UNPAID.getValue());
            orderMapper.insert(order);
            return OrderConverter.toDTO(order);
        } catch (RuntimeException e) {
            // Do not leave the pile reserved for an order that was never stored.
            redisTemplate.delete(lockKey);
            throw e;
        }
    }

    /**
     * Starts charging for a paid order: sends the start command to the pile, marks the
     * order as charging and publishes a {@code ChargingStartedEvent}.
     *
     * @param orderNo order number
     * @throws BusinessException if the order does not exist or is not paid
     */
    public void startCharging(String orderNo) {
        // 1. Load and validate the order.
        ChargingOrder order = orderMapper.findByOrderNo(orderNo);
        if (order == null) {
            throw new BusinessException("订单不存在");
        }
        if (order.getPaymentStatus() != PaymentStatus.PAID.getValue()) {
            throw new BusinessException("订单未支付");
        }
        // 2. Ask the pile to start.
        StartChargingRequest request = new StartChargingRequest();
        request.setOrderNo(orderNo);
        request.setConnectorId(order.getConnectorId());
        request.setMaxPower(100); // default power limit
        request.setMaxTime(7200); // default time limit: 2 hours, in seconds
        pileService.startCharging(order.getPile().getPileCode(), request);
        // 3. Update the order.
        order.setStatus(OrderStatus.CHARGING.getValue());
        order.setStartTime(new Date());
        orderMapper.updateById(order);
        // 4. Notify listeners.
        ChargingStartedEvent event = new ChargingStartedEvent();
        event.setOrderNo(orderNo);
        event.setUserId(order.getUserId());
        event.setPileCode(order.getPile().getPileCode());
        eventPublisher.publishEvent(event);
    }

    /**
     * Settles an order when the pile reports that charging finished: computes the fee,
     * stores the session figures, creates or reconciles the payment, releases the pile
     * and publishes a {@code ChargingCompletedEvent}.
     *
     * <p>A repeated call for an order that is already completed is ignored, so a
     * re-delivered "finished" message cannot bill or refund twice.
     *
     * @param request final session figures reported by the pile
     * @throws BusinessException if the order does not exist
     */
    public void finishCharging(FinishChargingRequest request) {
        // 1. Load the order.
        ChargingOrder order = orderMapper.findByOrderNo(request.getOrderNo());
        if (order == null) {
            throw new BusinessException("订单不存在");
        }
        if (order.getStatus() == OrderStatus.COMPLETED.getValue()) {
            return;
        }
        // 2. Compute the fee: energy * (unit price + service fee), rounded to the
        //    2-decimal money scale so that the stored amount, the event and the
        //    paid-vs-actual comparison below all see the same value.
        BigDecimal totalPower = request.getTotalPower();
        BigDecimal electricityFee = totalPower.multiply(order.getUnitPrice());
        BigDecimal serviceFee = totalPower.multiply(order.getServiceFee());
        BigDecimal totalAmount = electricityFee.add(serviceFee)
            .setScale(2, java.math.RoundingMode.HALF_UP);
        // 3. Store the session figures.
        order.setEndTime(new Date());
        order.setTotalPower(totalPower);
        order.setTotalTime(request.getTotalTime());
        order.setStartSoc(request.getStartSoc());
        order.setEndSoc(request.getEndSoc());
        order.setTotalAmount(totalAmount);
        order.setActualAmount(totalAmount);
        order.setStatus(OrderStatus.COMPLETED.getValue());
        order.setStopReason(request.getStopReason());
        orderMapper.updateById(order);
        // 4. Post-paid orders get a payment created now; pre-paid orders are reconciled.
        if (order.getPaymentStatus() == PaymentStatus.UNPAID.getValue()) {
            paymentService.createPayment(order.getId(), totalAmount);
        } else {
            handlePrePaymentSettlement(order);
        }
        // 5. Release the pile and the reservation taken by createOrder.
        pileService.releasePile(order.getPileId());
        releasePileLock(order);
        // 6. Notify listeners.
        ChargingCompletedEvent event = new ChargingCompletedEvent();
        event.setOrderNo(request.getOrderNo());
        event.setUserId(order.getUserId());
        event.setTotalAmount(totalAmount);
        eventPublisher.publishEvent(event);
    }

    /**
     * Reconciles a pre-paid order: refunds the surplus when more was paid than the
     * actual amount, or requests an additional payment when less was paid.
     */
    private void handlePrePaymentSettlement(ChargingOrder order) {
        PaymentRecord payment = paymentService.getByOrderId(order.getId());
        BigDecimal paidAmount = payment.getAmount();
        BigDecimal actualAmount = order.getActualAmount();
        int comparison = paidAmount.compareTo(actualAmount);
        if (comparison > 0) {
            // Refund the surplus.
            paymentService.refund(payment.getId(), paidAmount.subtract(actualAmount));
        } else if (comparison < 0) {
            // Charge the shortfall.
            paymentService.createAdditionalPayment(order.getId(), actualAmount.subtract(paidAmount));
        }
    }

    /** Redis key under which a pile reservation is stored. */
    private String pileLockKey(Long pileId) {
        return "pile:lock:" + pileId;
    }

    /** Drops the pile reservation, but only if it still belongs to this order. */
    private void releasePileLock(ChargingOrder order) {
        String lockKey = pileLockKey(order.getPileId());
        Object holder = redisTemplate.opsForValue().get(lockKey);
        if (order.getOrderNo() != null && order.getOrderNo().equals(holder)) {
            redisTemplate.delete(lockKey);
        }
    }
}
# 4. 支付服务实现
/**
 * Payment handling: creating payments through WeChat Pay, Alipay or account balance,
 * processing gateway callbacks, and refunds.
 *
 * <p>NOTE(review): {@code verifySignature} is called below but not defined in this
 * excerpt — confirm it exists in the full class.
 */
@Service
public class PaymentService {
    @Autowired
    private PaymentRecordMapper paymentMapper;
    @Autowired
    private WechatPayService wechatPayService;
    @Autowired
    private AlipayService alipayService;
    @Autowired
    private UserService userService;
    // Used for every publishEvent call below; the excerpt never declared it.
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    /**
     * Stores a pending payment record and hands it to the selected payment channel.
     *
     * @param request order id, user id, payment method and amount
     * @return the payment DTO; for WeChat/Alipay it also carries the pay URL, for
     *         balance payments the payment is completed before returning
     * @throws BusinessException if the payment method is not supported, or (balance
     *         payments) the balance is insufficient
     */
    public PaymentDTO createPayment(CreatePaymentRequest request) {
        // 1. Store the payment record.
        PaymentRecord payment = new PaymentRecord();
        payment.setPaymentNo(PaymentNoGenerator.generate());
        payment.setOrderId(request.getOrderId());
        payment.setUserId(request.getUserId());
        payment.setPaymentMethod(request.getPaymentMethod());
        payment.setAmount(request.getAmount());
        payment.setStatus(PaymentStatus.PENDING.getValue());
        paymentMapper.insert(payment);
        // 2. Dispatch to the payment channel.
        String payUrl = null;
        switch (PaymentMethod.valueOf(request.getPaymentMethod())) {
            case WECHAT:
                payUrl = wechatPayService.createOrder(payment);
                break;
            case ALIPAY:
                payUrl = alipayService.createOrder(payment);
                break;
            case BALANCE:
                return processBalancePayment(payment);
            default:
                throw new BusinessException("不支持的支付方式");
        }
        PaymentDTO dto = PaymentConverter.toDTO(payment);
        dto.setPayUrl(payUrl);
        return dto;
    }

    /**
     * Pays from the user's account balance and completes the payment immediately.
     *
     * @throws BusinessException if the balance is lower than the payment amount
     */
    private PaymentDTO processBalancePayment(PaymentRecord payment) {
        User user = userService.getUserById(payment.getUserId());
        // 1. Check the balance.
        if (user.getBalance().compareTo(payment.getAmount()) < 0) {
            throw new BusinessException("余额不足");
        }
        // 2. Deduct the balance.
        userService.deductBalance(user.getId(), payment.getAmount());
        // 3. Mark the payment as successful.
        payment.setStatus(PaymentStatus.SUCCESS.getValue());
        payment.setPaidAt(new Date());
        paymentMapper.updateById(payment);
        // 4. Notify listeners.
        publishPaymentSuccess(payment);
        return PaymentConverter.toDTO(payment);
    }

    /**
     * Processes a payment callback: verifies its signature, marks the payment as
     * successful and publishes a {@code PaymentSuccessEvent}. A callback for a payment
     * that is already successful is ignored.
     *
     * @param request callback payload carrying the payment number and the third-party number
     * @throws BusinessException if the signature is invalid or the payment is unknown
     */
    public void handlePaymentCallback(PaymentCallbackRequest request) {
        // 1. Verify the signature.
        if (!verifySignature(request)) {
            throw new BusinessException("签名验证失败");
        }
        // 2. Load the payment record.
        PaymentRecord payment = paymentMapper.findByPaymentNo(request.getPaymentNo());
        if (payment == null) {
            throw new BusinessException("支付记录不存在");
        }
        // 3. Ignore duplicate callbacks.
        if (payment.getStatus() == PaymentStatus.SUCCESS.getValue()) {
            return;
        }
        // 4. Mark the payment as successful.
        payment.setStatus(PaymentStatus.SUCCESS.getValue());
        payment.setThirdPartyNo(request.getThirdPartyNo());
        payment.setPaidAt(new Date());
        paymentMapper.updateById(payment);
        // 5. Notify listeners.
        publishPaymentSuccess(payment);
    }

    /**
     * Refunds part or all of a successful payment through the channel it was paid with.
     * When the channel reports success the payment is marked as refunded and a
     * {@code RefundSuccessEvent} is published; when it reports failure nothing is changed.
     *
     * @param paymentId    id of the payment record
     * @param refundAmount amount to return; must be positive and not exceed the paid amount
     * @throws BusinessException if the payment is unknown, is not in SUCCESS status
     *         (which also blocks a second refund), the amount is invalid, or the
     *         payment method is not supported
     */
    public void refund(Long paymentId, BigDecimal refundAmount) {
        PaymentRecord payment = paymentMapper.selectById(paymentId);
        if (payment == null) {
            throw new BusinessException("支付记录不存在");
        }
        // Only a successful payment can be refunded; this also rejects repeat refunds
        // because a refunded payment no longer has SUCCESS status.
        if (payment.getStatus() != PaymentStatus.SUCCESS.getValue()) {
            throw new BusinessException("支付记录状态不允许退款");
        }
        if (refundAmount == null
                || refundAmount.signum() <= 0
                || refundAmount.compareTo(payment.getAmount()) > 0) {
            throw new BusinessException("退款金额不合法");
        }
        // 1. Refund through the original channel.
        boolean refundResult;
        switch (PaymentMethod.valueOf(payment.getPaymentMethod())) {
            case WECHAT:
                refundResult = wechatPayService.refund(payment, refundAmount);
                break;
            case ALIPAY:
                refundResult = alipayService.refund(payment, refundAmount);
                break;
            case BALANCE:
                // Balance refunds are credited straight back to the user's balance.
                userService.addBalance(payment.getUserId(), refundAmount);
                refundResult = true;
                break;
            default:
                // Same rule as createPayment: unknown channels are rejected, not skipped.
                throw new BusinessException("不支持的支付方式");
        }
        if (refundResult) {
            // 2. Mark the payment as refunded.
            payment.setStatus(PaymentStatus.REFUNDED.getValue());
            paymentMapper.updateById(payment);
            // 3. Notify listeners.
            RefundSuccessEvent event = new RefundSuccessEvent();
            event.setPaymentId(paymentId);
            event.setRefundAmount(refundAmount);
            eventPublisher.publishEvent(event);
        }
    }

    /** Publishes a PaymentSuccessEvent carrying the payment id, order id and amount. */
    private void publishPaymentSuccess(PaymentRecord payment) {
        PaymentSuccessEvent event = new PaymentSuccessEvent();
        event.setPaymentId(payment.getId());
        event.setOrderId(payment.getOrderId());
        event.setAmount(payment.getAmount());
        eventPublisher.publishEvent(event);
    }
}
# 设备通信协议
# MQTT通信架构
/**
 * Entry point for MQTT messages sent by charging piles. Each listener parses the JSON
 * payload and forwards it to the application; failures are logged and not rethrown.
 */
@Slf4j
@Component
public class MqttMessageHandler {
    @Autowired
    private ChargingPileService pileService;
    @Autowired
    private ChargingOrderService orderService;
    // Used by handleHeartbeat; the excerpt never declared it.
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    /**
     * Handles heartbeat messages: parses the payload and publishes a
     * {@code PileHeartbeatEvent} tagged with the pile code taken from the topic.
     *
     * @param message JSON heartbeat payload
     * @param topic   topic the message arrived on, matching {@code pile/+/heartbeat}
     */
    @MqttMessageListener(topic = "pile/+/heartbeat")
    public void handleHeartbeat(@Payload String message, @Header String topic) {
        try {
            // The pile code is the second segment of the topic.
            String pileCode = extractPileCodeFromTopic(topic);
            PileHeartbeatData data = JSON.parseObject(message, PileHeartbeatData.class);
            PileHeartbeatEvent event = new PileHeartbeatEvent();
            event.setPileCode(pileCode);
            event.setData(data);
            eventPublisher.publishEvent(event);
        } catch (Exception e) {
            log.error("处理心跳消息失败: {}", message, e);
        }
    }

    /**
     * Handles in-progress charging status reports by forwarding them to the order service.
     *
     * @param message JSON status payload (carries the order number)
     * @param topic   topic the message arrived on, matching {@code pile/+/charging/status}
     */
    @MqttMessageListener(topic = "pile/+/charging/status")
    public void handleChargingStatus(@Payload String message, @Header String topic) {
        try {
            ChargingStatusData data = JSON.parseObject(message, ChargingStatusData.class);
            orderService.updateChargingStatus(data.getOrderNo(), data);
        } catch (Exception e) {
            log.error("处理充电状态失败: {}", message, e);
        }
    }

    /**
     * Handles "charging finished" reports: copies the reported figures into a
     * {@code FinishChargingRequest} and asks the order service to settle the order.
     *
     * @param message JSON payload with the final session figures
     * @param topic   topic the message arrived on, matching {@code pile/+/charging/finished}
     */
    @MqttMessageListener(topic = "pile/+/charging/finished")
    public void handleChargingFinished(@Payload String message, @Header String topic) {
        try {
            ChargingFinishedData data = JSON.parseObject(message, ChargingFinishedData.class);
            FinishChargingRequest request = new FinishChargingRequest();
            request.setOrderNo(data.getOrderNo());
            request.setTotalPower(data.getTotalPower());
            request.setTotalTime(data.getTotalTime());
            request.setStartSoc(data.getStartSoc());
            request.setEndSoc(data.getEndSoc());
            request.setStopReason(data.getStopReason());
            orderService.finishCharging(request);
        } catch (Exception e) {
            log.error("处理充电完成消息失败: {}", message, e);
        }
    }

    /**
     * Returns the pile code from a topic of the form {@code pile/<pileCode>/...}.
     *
     * @throws IllegalArgumentException if the topic does not have that form
     */
    private String extractPileCodeFromTopic(String topic) {
        String[] segments = topic == null ? new String[0] : topic.split("/");
        if (segments.length < 3 || !"pile".equals(segments[0]) || segments[1].isEmpty()) {
            throw new IllegalArgumentException("Unexpected MQTT topic: " + topic);
        }
        return segments[1];
    }
}
# 协议数据结构
/**
 * Heartbeat payload reported by a charging pile.
 */
@Data
public class PileHeartbeatData {
    /** Pile code. */
    private String pileCode;

    /** Status. */
    private Integer status;

    /** Voltage. */
    private BigDecimal voltage;

    /** Current. */
    private BigDecimal current;

    /** Power. */
    private BigDecimal power;

    /** Temperature. */
    private BigDecimal temperature;

    /** Timestamp. */
    private Date timestamp;
}
/**
 * Charging status payload reported while a session is in progress.
 */
@Data
public class ChargingStatusData {
    /** Order number. */
    private String orderNo;

    /** Current power. */
    private BigDecimal currentPower;

    /** Accumulated energy. */
    private BigDecimal totalPower;

    /** Accumulated time. */
    private Integer totalTime;

    /** Current SOC. */
    private BigDecimal soc;

    /** Timestamp. */
    private Date timestamp;
}
/**
 * Payload reported when a charging session has finished.
 */
@Data
public class ChargingFinishedData {
    /** Order number. */
    private String orderNo;

    /** Total energy. */
    private BigDecimal totalPower;

    /** Total time. */
    private Integer totalTime;

    /** SOC at start. */
    private BigDecimal startSoc;

    /** SOC at end. */
    private BigDecimal endSoc;

    /** Stop reason. */
    private Integer stopReason;

    /** Timestamp. */
    private Date timestamp;
}
/**
 * Control command that asks a pile to start charging.
 */
@Data
public class StartChargingCommand {
    /** Pile code. */
    private String pileCode;

    /** Connector id. */
    private Integer connectorId;

    /** Order number. */
    private String orderNo;

    /** Maximum power. */
    private BigDecimal maxPower;

    /** Maximum time. */
    private Integer maxTime;
}
/**
 * Control command that asks a pile to stop charging.
 */
@Data
public class StopChargingCommand {
    /** Pile code. */
    private String pileCode;

    /** Order number. */
    private String orderNo;

    /** Stop reason. */
    private Integer stopReason;
}
# 监控与告警
# 实时监控实现
/**
 * Real-time monitoring for charging piles: latest status plus a one-hour trend,
 * threshold alarms on heartbeat metrics, and a periodic offline check.
 *
 * <p>NOTE(review): {@code parseTrendData} is called below but not defined in this
 * excerpt — confirm it exists in the full class.
 */
@Service
public class MonitoringService {
    /** Temperature above which a high-temperature alarm is raised (°C). */
    private static final BigDecimal MAX_TEMPERATURE = new BigDecimal("80");
    /** Voltage band outside of which a voltage alarm is raised (V). */
    private static final BigDecimal MIN_VOLTAGE = new BigDecimal("200");
    private static final BigDecimal MAX_VOLTAGE = new BigDecimal("250");
    /** A pile without a heartbeat for longer than this is considered offline (5 minutes). */
    private static final long OFFLINE_THRESHOLD_MILLIS = 300000;

    @Autowired
    private InfluxDBTemplate influxDBTemplate;
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private AlarmService alarmService;
    // Used by checkOfflineDevices; the excerpt never declared it.
    @Autowired
    private ChargingPileMapper pileMapper;

    /**
     * Returns the latest cached status of a pile together with its metrics of the
     * last hour, averaged in 5-minute buckets.
     *
     * @param pileCode code of the pile
     * @return current status plus trend data
     * @throws BusinessException if no status is cached for the pile
     */
    public PileRealTimeData getRealTimeData(String pileCode) {
        // 1. Latest status from Redis.
        String statusKey = "pile:status:" + pileCode;
        PileRealTimeStatus status = (PileRealTimeStatus)
            redisTemplate.opsForValue().get(statusKey);
        if (status == null) {
            throw new BusinessException("充电桩离线或无数据");
        }
        // 2. One-hour trend from InfluxDB. The pile code ends up inside a single-quoted
        //    InfluxQL string literal, so backslashes and quotes are escaped first to
        //    keep a crafted code from breaking out of the literal.
        String escapedPileCode = pileCode.replace("\\", "\\\\").replace("'", "\\'");
        String query = String.format(
            "SELECT mean(voltage), mean(current), mean(power), mean(temperature) " +
            "FROM pile_metrics WHERE pile_code='%s' AND time > now() - 1h " +
            "GROUP BY time(5m)",
            escapedPileCode
        );
        QueryResult result = influxDBTemplate.query(query);
        List<TrendData> trendData = parseTrendData(result);
        PileRealTimeData data = new PileRealTimeData();
        data.setCurrentStatus(status);
        data.setTrendData(trendData);
        return data;
    }

    /**
     * Records that a heartbeat was received and raises alarms for metrics that are
     * out of range. A metric that is missing from the heartbeat is skipped.
     *
     * @param pileCode code of the reporting pile
     * @param data     metrics carried by the heartbeat
     */
    public void checkAlarmConditions(String pileCode, PileHeartbeatData data) {
        // Store the heartbeat time first so that a failure in the alarm checks can
        // never make a live pile look offline to checkOfflineDevices.
        String lastHeartbeatKey = "pile:last_heartbeat:" + pileCode;
        redisTemplate.opsForValue().set(lastHeartbeatKey, System.currentTimeMillis());
        // 1. Temperature alarm.
        BigDecimal temperature = data.getTemperature();
        if (temperature != null && temperature.compareTo(MAX_TEMPERATURE) > 0) {
            raiseAlarm(pileCode, AlarmType.HIGH_TEMPERATURE, AlarmLevel.HIGH,
                "充电桩温度过高: " + temperature + "°C");
        }
        // 2. Voltage alarm.
        BigDecimal voltage = data.getVoltage();
        if (voltage != null
                && (voltage.compareTo(MIN_VOLTAGE) < 0 || voltage.compareTo(MAX_VOLTAGE) > 0)) {
            raiseAlarm(pileCode, AlarmType.VOLTAGE_ABNORMAL, AlarmLevel.MEDIUM,
                "充电桩电压异常: " + voltage + "V");
        }
    }

    /**
     * Runs every minute: marks every online pile whose last heartbeat is missing or
     * older than 5 minutes as offline and raises an offline alarm for it.
     */
    @Scheduled(fixedRate = 60000)
    public void checkOfflineDevices() {
        List<ChargingPile> onlinePiles = pileMapper.findOnlinePiles();
        long now = System.currentTimeMillis();
        for (ChargingPile pile : onlinePiles) {
            String lastHeartbeatKey = "pile:last_heartbeat:" + pile.getPileCode();
            // Read as Number: depending on the Redis serializer the stored value may
            // come back as Integer or Long, and a hard (Long) cast would fail.
            Object cached = redisTemplate.opsForValue().get(lastHeartbeatKey);
            Long lastHeartbeat = cached instanceof Number ? ((Number) cached).longValue() : null;
            if (lastHeartbeat == null || now - lastHeartbeat > OFFLINE_THRESHOLD_MILLIS) {
                // Mark the pile offline.
                pile.setOnlineStatus(OnlineStatus.OFFLINE.getValue());
                pileMapper.updateById(pile);
                // Raise the offline alarm.
                raiseAlarm(pile.getPileCode(), AlarmType.DEVICE_OFFLINE, AlarmLevel.HIGH,
                    "充电桩离线");
            }
        }
    }

    /** Builds an AlarmEvent from the given values and hands it to the alarm service. */
    private void raiseAlarm(String pileCode, AlarmType type, AlarmLevel level, String message) {
        AlarmEvent alarm = new AlarmEvent();
        alarm.setPileCode(pileCode);
        alarm.setAlarmType(type);
        alarm.setAlarmLevel(level);
        alarm.setMessage(message);
        alarmService.triggerAlarm(alarm);
    }
}
# 数据统计分析
/**
 * Aggregates operational statistics (orders, revenue, device utilization, users)
 * for a date range.
 */
@Service
public class StatisticsService {
    @Autowired
    private ChargingOrderMapper orderMapper;
    @Autowired
    private InfluxDBTemplate influxDBTemplate;

    /**
     * Collects the four statistics groups for the requested period and bundles them
     * into one result object.
     *
     * @param request carries the start and end date of the period
     * @return combined order, revenue, device and user statistics
     */
    public OperationStatistics getOperationStatistics(StatisticsRequest request) {
        final Date from = request.getStartDate();
        final Date to = request.getEndDate();

        // Gather each group in turn: orders, revenue, device utilization, users.
        OrderStatistics orders = orderMapper.getOrderStatistics(from, to);
        RevenueStatistics revenue = orderMapper.getRevenueStatistics(from, to);
        DeviceUtilizationStatistics devices = calculateDeviceUtilization(from, to);
        UserStatistics users = orderMapper.getUserStatistics(from, to);

        OperationStatistics combined = new OperationStatistics();
        combined.setOrderStatistics(orders);
        combined.setRevenueStatistics(revenue);
        combined.setDeviceStatistics(devices);
        combined.setUserStatistics(users);
        return combined;
    }

    /**
     * Queries per-pile charging time from InfluxDB for the period and wraps the
     * per-pile utilization map together with its average.
     * The result parsing is left as a placeholder in this excerpt, so the map is
     * returned empty.
     */
    private DeviceUtilizationStatistics calculateDeviceUtilization(Date startDate, Date endDate) {
        // Sum of charging time per pile within [startDate, endDate].
        String influxQl = String.format(
            "SELECT sum(charging_time) as total_time, pile_code "
                + "FROM charging_sessions "
                + "WHERE time >= '%s' AND time <= '%s' "
                + "GROUP BY pile_code",
            startDate.toInstant(),
            endDate.toInstant()
        );
        QueryResult queryResult = influxDBTemplate.query(influxQl);

        // Length of the period in milliseconds, the denominator for utilization.
        long periodMillis = endDate.getTime() - startDate.getTime();
        Map<String, Double> utilizationByPile = new HashMap<>();
        // Parse the query result and compute the utilization per pile.
        // ...

        DeviceUtilizationStatistics utilization = new DeviceUtilizationStatistics();
        utilization.setUtilizationMap(utilizationByPile);
        utilization.setAverageUtilization(calculateAverageUtilization(utilizationByPile));
        return utilization;
    }
}
# 部署架构
# Docker容器化部署
# Dockerfile
# Runtime image for the charging-pile service.
# eclipse-temurin is used because the official "openjdk" image is deprecated.
FROM eclipse-temurin:11-jre
# Create an unprivileged account so the JVM does not run as root.
RUN groupadd --system app && useradd --system --gid app --no-create-home app
VOLUME /tmp
COPY --chown=app:app target/charging-pile-service.jar /app.jar
USER app
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "/app.jar"]
# docker-compose.yml
version: '3.8'

# Credentials are read from the environment (or an .env file); the values after ":-"
# are the defaults used when a variable is not set.
services:
  # Application service
  charging-pile-service:
    build: .
    ports:
      - "8080:8080"
    environment:
      - SPRING_PROFILES_ACTIVE=prod
      - MYSQL_HOST=mysql
      - REDIS_HOST=redis
      - MQTT_BROKER=mqtt
    depends_on:
      - mysql
      - redis
      - mqtt
      - influxdb
    networks:
      - charging-network

  # MySQL database
  mysql:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD:-root123}
      MYSQL_DATABASE: charging_pile
    volumes:
      - mysql_data:/var/lib/mysql
      # SQL files in ./sql are executed when the data directory is first initialized.
      - ./sql:/docker-entrypoint-initdb.d
    ports:
      - "3306:3306"
    networks:
      - charging-network

  # Redis cache
  redis:
    image: redis:6.2
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    networks:
      - charging-network

  # MQTT message broker
  mqtt:
    image: eclipse-mosquitto:2.0
    ports:
      - "1883:1883"
      - "9001:9001"
    volumes:
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf
    networks:
      - charging-network

  # InfluxDB time-series database
  influxdb:
    image: influxdb:1.8
    environment:
      INFLUXDB_DB: charging_metrics
      INFLUXDB_ADMIN_USER: admin
      INFLUXDB_ADMIN_PASSWORD: ${INFLUXDB_ADMIN_PASSWORD:-admin123}
    ports:
      - "8086:8086"
    volumes:
      - influxdb_data:/var/lib/influxdb
    networks:
      - charging-network

  # Grafana dashboards
  grafana:
    image: grafana/grafana:8.0.0
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: ${GF_SECURITY_ADMIN_PASSWORD:-admin123}
    volumes:
      - grafana_data:/var/lib/grafana
    networks:
      - charging-network

volumes:
  mysql_data:
  redis_data:
  influxdb_data:
  grafana_data:

networks:
  charging-network:
    driver: bridge
# Kubernetes部署
# k8s-deployment.yaml
# Deployment: three replicas of the charging-pile service.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: charging-pile-service
  labels:
    app: charging-pile-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: charging-pile-service
  template:
    metadata:
      labels:
        app: charging-pile-service
    spec:
      containers:
        - name: charging-pile-service
          image: charging-pile-service:latest
          ports:
            - containerPort: 8080
          env:
            - name: SPRING_PROFILES_ACTIVE
              value: "k8s"
            - name: MYSQL_HOST
              value: "mysql-service"
            - name: REDIS_HOST
              value: "redis-service"
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          # Restart the container when the health endpoint stops answering.
          livenessProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            initialDelaySeconds: 60
            periodSeconds: 30
          # Route traffic to the pod only once the health endpoint answers.
          readinessProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
---
# Service: exposes the pods on port 80 through a load balancer, forwarding to
# container port 8080.
apiVersion: v1
kind: Service
metadata:
  name: charging-pile-service
spec:
  selector:
    app: charging-pile-service
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8080
  type: LoadBalancer
# 总结
充电桩业务系统是一个复杂的物联网应用,涉及多个技术领域:
- 业务架构:微服务架构、领域驱动设计
- 数据存储:MySQL、Redis、MongoDB、InfluxDB
- 消息通信:MQTT、消息队列、事件驱动
- 支付集成:多种支付方式、预付费/后付费
- 设备管理:实时监控、远程控制、故障告警
- 数据分析:运营统计、设备利用率、用户行为
- 部署运维:容器化、微服务、监控告警
关键技术要点:
- 高并发处理:Redis缓存、数据库优化、负载均衡
- 实时性要求:MQTT通信、WebSocket推送、时序数据库
- 数据一致性:分布式事务、最终一致性、补偿机制
- 系统可靠性:熔断降级、重试机制、故障转移
- 安全性:数据加密、访问控制、支付安全
充电桩业务系统的成功实施需要综合考虑业务需求、技术架构、运营模式等多个方面,是物联网、移动支付、大数据等技术的综合应用。