CoAP设备接入层详细设计与实现
# CoAP设备接入层详细设计与实现
# 故事背景
想象一下,你正在为一家智能农业公司开发物联网系统。农场里有数千个传感器:土壤湿度传感器、温度传感器、光照传感器等。这些传感器大多数时间处于"睡眠"状态以节省电池,只在需要时才"醒来"发送数据。
传统的HTTP协议就像是"电话通话",需要建立连接、维持会话,对于这些"偶尔说话"的设备来说太重了。而CoAP(Constrained Application Protocol)就像是"发短信",轻量、简洁,特别适合资源受限的设备。
让我们一起来构建这个专为"节能设备"设计的通信系统!
# CoAP协议原理
# 什么是CoAP?
CoAP是一种专为物联网设计的应用层协议,基于UDP传输,具有以下特点:
- 轻量级:协议头只有4字节
- 低功耗:支持设备休眠和唤醒
- 可靠性:支持确认和重传机制
- RESTful:类似HTTP的GET、POST、PUT、DELETE操作
# CoAP vs HTTP对比
特性 | CoAP | HTTP |
---|---|---|
传输协议 | UDP | TCP |
协议头大小 | 4字节 | 20+字节 |
连接方式 | 无连接 | 面向连接 |
适用场景 | 资源受限设备 | 通用Web应用 |
功耗 | 低 | 高 |
# CoAP消息格式
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|Ver| T | TKL | Code | Message ID |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Token (if any, TKL bytes) ...
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Options (if any) ...
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|1 1 1 1 1 1 1 1| Payload (if any) ...
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
# 系统架构设计
# 整体架构
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 设备层 │ │ CoAP接入层 │ │ 业务层 │
│ │ │ │ │ │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │ 传感器 │ │ │ │ CoAP Server │ │ │ │ 数据处理 │ │
│ │ - 土壤湿度 │ │◄──►│ │ - 消息解析 │ │◄──►│ │ - 业务逻辑 │ │
│ │ - 温度 │ │ │ │ - 资源管理 │ │ │ │ - 数据存储 │ │
│ │ - 光照 │ │ │ │ - 设备管理 │ │ │ │ - 告警处理 │ │
│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
# 核心实体设计
# 1. CoAP设备实体
/**
* CoAP设备实体
* 记录每个CoAP设备的基本信息,就像设备的"档案"
*/
@Entity
@Table(name = "coap_device")
public class CoapDevice {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 设备ID - 设备的唯一标识
*/
@Column(name = "device_id", unique = true, nullable = false)
private String deviceId;
/**
* 设备名称
*/
@Column(name = "device_name")
private String deviceName;
/**
* 设备IP地址
*/
@Column(name = "ip_address")
private String ipAddress;
/**
* CoAP端口
*/
@Column(name = "port")
private Integer port;
/**
* 设备类型:SENSOR(传感器)、ACTUATOR(执行器)、GATEWAY(网关)
*/
@Enumerated(EnumType.STRING)
@Column(name = "device_type")
private DeviceType deviceType;
/**
* 设备状态:ONLINE(在线)、OFFLINE(离线)、SLEEPING(休眠)
*/
@Enumerated(EnumType.STRING)
@Column(name = "status")
private DeviceStatus status;
/**
* 最后通信时间
*/
@Column(name = "last_communication")
private LocalDateTime lastCommunication;
/**
* 设备资源列表(JSON格式)
* 记录设备支持的CoAP资源路径
*/
@Lob
@Column(name = "resources")
private String resources;
/**
* 设备配置信息(JSON格式)
*/
@Lob
@Column(name = "config")
private String config;
/**
* 创建时间
*/
@Column(name = "create_time")
private LocalDateTime createTime;
/**
* 更新时间
*/
@Column(name = "update_time")
private LocalDateTime updateTime;
// 构造函数、getter、setter省略...
}
/**
* 设备类型枚举
*/
public enum DeviceType {
SENSOR("传感器"),
ACTUATOR("执行器"),
GATEWAY("网关");
private final String description;
DeviceType(String description) {
this.description = description;
}
}
/**
* 设备状态枚举
*/
public enum DeviceStatus {
ONLINE("在线"),
OFFLINE("离线"),
SLEEPING("休眠");
private final String description;
DeviceStatus(String description) {
this.description = description;
}
}
# 2. CoAP消息实体
/**
* CoAP消息实体
* 记录设备与服务器之间的每次通信,就像通话记录
*/
@Entity
@Table(name = "coap_message")
public class CoapMessage {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 消息ID
*/
@Column(name = "message_id", unique = true)
private String messageId;
/**
* 设备ID
*/
@Column(name = "device_id", nullable = false)
private String deviceId;
/**
* CoAP方法:GET、POST、PUT、DELETE
*/
@Enumerated(EnumType.STRING)
@Column(name = "method")
private CoapMethod method;
/**
* 资源路径,如:/sensors/temperature
*/
@Column(name = "resource_path")
private String resourcePath;
/**
* 消息载荷
*/
@Lob
@Column(name = "payload")
private String payload;
/**
* 响应码:2.05 Content、4.04 Not Found等
*/
@Column(name = "response_code")
private String responseCode;
/**
* 消息类型:CON(可靠)、NON(不可靠)、ACK(确认)、RST(重置)
*/
@Enumerated(EnumType.STRING)
@Column(name = "message_type")
private MessageType messageType;
/**
* 消息方向:REQUEST(请求)、RESPONSE(响应)
*/
@Enumerated(EnumType.STRING)
@Column(name = "direction")
private MessageDirection direction;
/**
* 处理时间(毫秒)
*/
@Column(name = "process_time")
private Long processTime;
/**
* 创建时间
*/
@Column(name = "create_time")
private LocalDateTime createTime;
// 构造函数、getter、setter省略...
}
/**
* CoAP方法枚举
*/
public enum CoapMethod {
GET, POST, PUT, DELETE
}
/**
* 消息类型枚举
*/
public enum MessageType {
CON, // Confirmable
NON, // Non-confirmable
ACK, // Acknowledgement
RST // Reset
}
/**
* 消息方向枚举
*/
public enum MessageDirection {
REQUEST, // 请求
RESPONSE // 响应
}
# 3. CoAP资源实体
/**
* CoAP资源实体
* 定义设备提供的资源,就像设备的"功能清单"
*/
@Entity
@Table(name = "coap_resource")
public class CoapResource {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 设备ID
*/
@Column(name = "device_id", nullable = false)
private String deviceId;
/**
* 资源路径
*/
@Column(name = "resource_path", nullable = false)
private String resourcePath;
/**
* 资源名称
*/
@Column(name = "resource_name")
private String resourceName;
/**
* 资源类型:SENSOR_DATA(传感器数据)、CONTROL(控制)、CONFIG(配置)
*/
@Enumerated(EnumType.STRING)
@Column(name = "resource_type")
private ResourceType resourceType;
/**
* 支持的方法(JSON数组):["GET", "POST"]
*/
@Column(name = "supported_methods")
private String supportedMethods;
/**
* 内容格式:application/json、text/plain等
*/
@Column(name = "content_format")
private String contentFormat;
/**
* 是否可观察(支持订阅)
*/
@Column(name = "observable")
private Boolean observable;
/**
* 资源描述
*/
@Column(name = "description")
private String description;
/**
* 最后更新时间
*/
@Column(name = "last_update")
private LocalDateTime lastUpdate;
// 构造函数、getter、setter省略...
}
/**
* 资源类型枚举
*/
public enum ResourceType {
SENSOR_DATA("传感器数据"),
CONTROL("控制"),
CONFIG("配置"),
STATUS("状态");
private final String description;
ResourceType(String description) {
this.description = description;
}
}
# 核心服务实现
# 1. CoAP服务器实现
/**
* CoAP服务器实现
* 这是整个CoAP接入层的"大脑",负责处理所有CoAP通信
*/
@Component
@Slf4j
public class CoapServerImpl extends CoapServer {
@Autowired
private CoapDeviceService deviceService;
@Autowired
private CoapMessageService messageService;
@Autowired
private CoapResourceService resourceService;
@Value("${coap.server.port:5683}")
private int serverPort;
/**
* 启动CoAP服务器
*/
@PostConstruct
public void startServer() {
try {
// 创建UDP连接器
CoapEndpoint.Builder builder = new CoapEndpoint.Builder();
builder.setInetSocketAddress(new InetSocketAddress(serverPort));
builder.setNetworkConfig(createNetworkConfig());
addEndpoint(builder.build());
// 添加资源处理器
getRoot().add(new DeviceDiscoveryResource());
getRoot().add(new SensorDataResource());
getRoot().add(new DeviceControlResource());
getRoot().add(new DeviceConfigResource());
// 启动服务器
start();
log.info("CoAP服务器启动成功,端口:{}", serverPort);
} catch (Exception e) {
log.error("CoAP服务器启动失败", e);
throw new RuntimeException("CoAP服务器启动失败", e);
}
}
/**
* 创建网络配置
*/
private NetworkConfig createNetworkConfig() {
NetworkConfig config = NetworkConfig.createStandardWithoutFile();
// 设置ACK超时时间
config.setInt(NetworkConfig.Keys.ACK_TIMEOUT, 2000);
// 设置最大重传次数
config.setInt(NetworkConfig.Keys.MAX_RETRANSMIT, 3);
// 设置最大消息大小
config.setInt(NetworkConfig.Keys.MAX_MESSAGE_SIZE, 1024);
// 设置块传输大小
config.setInt(NetworkConfig.Keys.PREFERRED_BLOCK_SIZE, 512);
return config;
}
/**
* 停止服务器
*/
@PreDestroy
public void stopServer() {
if (isStarted()) {
stop();
log.info("CoAP服务器已停止");
}
}
}
# 2. 设备发现资源
/**
* 设备发现资源
* 处理设备的自动发现和注册,就像"新员工报到"
*/
public class DeviceDiscoveryResource extends CoapResource {
@Autowired
private CoapDeviceService deviceService;
public DeviceDiscoveryResource() {
super("discovery");
getAttributes().setTitle("Device Discovery");
getAttributes().addContentType(MediaTypeRegistry.APPLICATION_JSON);
}
/**
* 处理设备注册请求
* POST /discovery
*/
@Override
public void handlePOST(CoapExchange exchange) {
try {
String payload = exchange.getRequestText();
log.info("收到设备注册请求:{}", payload);
// 解析设备信息
DeviceRegistrationRequest request = parseRegistrationRequest(payload);
// 验证设备信息
if (!validateRegistrationRequest(request)) {
exchange.respond(ResponseCode.BAD_REQUEST, "设备信息验证失败");
return;
}
// 获取设备IP地址
String deviceIp = exchange.getSourceAddress().getHostAddress();
// 注册设备
CoapDevice device = deviceService.registerDevice(request, deviceIp);
// 返回注册成功响应
DeviceRegistrationResponse response = new DeviceRegistrationResponse();
response.setDeviceId(device.getDeviceId());
response.setStatus("SUCCESS");
response.setMessage("设备注册成功");
exchange.respond(ResponseCode.CREATED, toJson(response));
log.info("设备注册成功:deviceId={}, ip={}", device.getDeviceId(), deviceIp);
} catch (Exception e) {
log.error("处理设备注册失败", e);
exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
}
}
/**
* 处理设备发现请求
* GET /discovery
*/
@Override
public void handleGET(CoapExchange exchange) {
try {
// 返回服务器信息
ServerInfo serverInfo = new ServerInfo();
serverInfo.setServerName("IoT CoAP Server");
serverInfo.setVersion("1.0.0");
serverInfo.setSupportedFormats(Arrays.asList("application/json", "text/plain"));
exchange.respond(ResponseCode.CONTENT, toJson(serverInfo));
} catch (Exception e) {
log.error("处理设备发现失败", e);
exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
}
}
// 辅助方法
private DeviceRegistrationRequest parseRegistrationRequest(String payload) {
// JSON解析逻辑
return new DeviceRegistrationRequest();
}
private boolean validateRegistrationRequest(DeviceRegistrationRequest request) {
return request != null && request.getDeviceId() != null;
}
private String toJson(Object obj) {
// JSON序列化逻辑
return "{}";
}
}
# 3. 传感器数据资源
/**
* 传感器数据资源
* 处理传感器数据的上报和查询,就像"数据收集站"
*/
public class SensorDataResource extends CoapResource {
@Autowired
private CoapMessageService messageService;
@Autowired
private SensorDataService sensorDataService;
public SensorDataResource() {
super("sensors");
getAttributes().setTitle("Sensor Data");
getAttributes().addContentType(MediaTypeRegistry.APPLICATION_JSON);
getAttributes().setObservable(); // 支持观察模式
// 添加子资源
add(new TemperatureResource());
add(new HumidityResource());
add(new LightResource());
}
/**
* 处理传感器数据上报
* POST /sensors/{sensorType}
*/
@Override
public void handlePOST(CoapExchange exchange) {
try {
String deviceIp = exchange.getSourceAddress().getHostAddress();
String payload = exchange.getRequestText();
String resourcePath = exchange.getRequestOptions().getUriPathString();
log.info("收到传感器数据:ip={}, path={}, data={}", deviceIp, resourcePath, payload);
// 解析传感器数据
SensorData sensorData = parseSensorData(payload);
// 验证数据
if (!validateSensorData(sensorData)) {
exchange.respond(ResponseCode.BAD_REQUEST, "传感器数据格式错误");
return;
}
// 获取设备信息
CoapDevice device = deviceService.getDeviceByIp(deviceIp);
if (device == null) {
exchange.respond(ResponseCode.UNAUTHORIZED, "设备未注册");
return;
}
// 处理传感器数据
sensorDataService.processSensorData(device.getDeviceId(), sensorData);
// 记录消息
messageService.recordMessage(device.getDeviceId(), CoapMethod.POST,
resourcePath, payload, MessageDirection.REQUEST);
// 更新设备最后通信时间
deviceService.updateLastCommunication(device.getDeviceId());
// 返回成功响应
exchange.respond(ResponseCode.CHANGED, "数据接收成功");
// 通知观察者(如果有的话)
notifyObservers(sensorData);
log.info("传感器数据处理完成:deviceId={}", device.getDeviceId());
} catch (Exception e) {
log.error("处理传感器数据失败", e);
exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
}
}
/**
* 处理传感器数据查询
* GET /sensors/{sensorType}
*/
@Override
public void handleGET(CoapExchange exchange) {
try {
String deviceIp = exchange.getSourceAddress().getHostAddress();
String resourcePath = exchange.getRequestOptions().getUriPathString();
// 获取设备信息
CoapDevice device = deviceService.getDeviceByIp(deviceIp);
if (device == null) {
exchange.respond(ResponseCode.UNAUTHORIZED, "设备未注册");
return;
}
// 获取最新传感器数据
SensorData latestData = sensorDataService.getLatestData(device.getDeviceId(), resourcePath);
if (latestData != null) {
exchange.respond(ResponseCode.CONTENT, toJson(latestData));
} else {
exchange.respond(ResponseCode.NOT_FOUND, "未找到数据");
}
// 记录消息
messageService.recordMessage(device.getDeviceId(), CoapMethod.GET,
resourcePath, null, MessageDirection.REQUEST);
} catch (Exception e) {
log.error("查询传感器数据失败", e);
exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
}
}
/**
* 通知观察者
*/
private void notifyObservers(SensorData data) {
// 通知所有观察者新的传感器数据
changed();
}
// 辅助方法
private SensorData parseSensorData(String payload) {
// JSON解析逻辑
return new SensorData();
}
private boolean validateSensorData(SensorData data) {
return data != null && data.getValue() != null;
}
private String toJson(Object obj) {
// JSON序列化逻辑
return "{}";
}
}
# 4. 设备控制资源
/**
* 设备控制资源
* 处理对设备的控制指令,就像"遥控器"
*/
public class DeviceControlResource extends CoapResource {
@Autowired
private DeviceControlService controlService;
@Autowired
private CoapMessageService messageService;
public DeviceControlResource() {
super("control");
getAttributes().setTitle("Device Control");
getAttributes().addContentType(MediaTypeRegistry.APPLICATION_JSON);
}
/**
* 处理设备控制指令
* PUT /control/{deviceId}/{action}
*/
@Override
public void handlePUT(CoapExchange exchange) {
try {
String payload = exchange.getRequestText();
String resourcePath = exchange.getRequestOptions().getUriPathString();
log.info("收到设备控制指令:path={}, payload={}", resourcePath, payload);
// 解析控制指令
ControlCommand command = parseControlCommand(payload);
// 验证指令
if (!validateControlCommand(command)) {
exchange.respond(ResponseCode.BAD_REQUEST, "控制指令格式错误");
return;
}
// 检查设备是否存在且在线
CoapDevice device = deviceService.getDevice(command.getDeviceId());
if (device == null) {
exchange.respond(ResponseCode.NOT_FOUND, "设备不存在");
return;
}
if (device.getStatus() != DeviceStatus.ONLINE) {
exchange.respond(ResponseCode.SERVICE_UNAVAILABLE, "设备不在线");
return;
}
// 执行控制指令
ControlResult result = controlService.executeCommand(command);
// 记录消息
messageService.recordMessage(command.getDeviceId(), CoapMethod.PUT,
resourcePath, payload, MessageDirection.REQUEST);
// 返回执行结果
if (result.isSuccess()) {
exchange.respond(ResponseCode.CHANGED, toJson(result));
} else {
exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR, result.getErrorMessage());
}
log.info("设备控制指令执行完成:deviceId={}, success={}",
command.getDeviceId(), result.isSuccess());
} catch (Exception e) {
log.error("处理设备控制指令失败", e);
exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
}
}
/**
* 查询设备控制状态
* GET /control/{deviceId}
*/
@Override
public void handleGET(CoapExchange exchange) {
try {
String resourcePath = exchange.getRequestOptions().getUriPathString();
String deviceId = extractDeviceIdFromPath(resourcePath);
if (deviceId == null) {
exchange.respond(ResponseCode.BAD_REQUEST, "设备ID不能为空");
return;
}
// 获取设备控制状态
DeviceControlStatus status = controlService.getControlStatus(deviceId);
if (status != null) {
exchange.respond(ResponseCode.CONTENT, toJson(status));
} else {
exchange.respond(ResponseCode.NOT_FOUND, "设备控制状态未找到");
}
} catch (Exception e) {
log.error("查询设备控制状态失败", e);
exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
}
}
// 辅助方法
private ControlCommand parseControlCommand(String payload) {
// JSON解析逻辑
return new ControlCommand();
}
private boolean validateControlCommand(ControlCommand command) {
return command != null && command.getDeviceId() != null && command.getAction() != null;
}
private String extractDeviceIdFromPath(String path) {
// 从路径中提取设备ID
String[] parts = path.split("/");
return parts.length > 1 ? parts[1] : null;
}
private String toJson(Object obj) {
// JSON序列化逻辑
return "{}";
}
}
# CoAP客户端实现
# 1. CoAP客户端服务
/**
* CoAP客户端服务
* 用于主动向设备发送请求,就像"主动拨打电话"
*/
@Service
@Slf4j
public class CoapClientService {
private final CoapClient coapClient;
@Autowired
private CoapDeviceService deviceService;
@Autowired
private CoapMessageService messageService;
public CoapClientService() {
this.coapClient = new CoapClient();
// 配置客户端
NetworkConfig config = NetworkConfig.createStandardWithoutFile();
config.setInt(NetworkConfig.Keys.ACK_TIMEOUT, 2000);
config.setInt(NetworkConfig.Keys.MAX_RETRANSMIT, 3);
coapClient.setEndpoint(new CoapEndpoint.Builder()
.setNetworkConfig(config)
.build());
}
/**
* 向设备发送GET请求
*/
public CoapResponse sendGetRequest(String deviceId, String resourcePath) {
try {
// 获取设备信息
CoapDevice device = deviceService.getDevice(deviceId);
if (device == null) {
throw new IllegalArgumentException("设备不存在:" + deviceId);
}
// 构建请求URI
String uri = String.format("coap://%s:%d%s",
device.getIpAddress(), device.getPort(), resourcePath);
log.info("发送GET请求:uri={}", uri);
// 发送请求
coapClient.setURI(uri);
CoapResponse response = coapClient.get();
// 记录消息
messageService.recordMessage(deviceId, CoapMethod.GET, resourcePath,
null, MessageDirection.REQUEST);
if (response != null) {
messageService.recordMessage(deviceId, CoapMethod.GET, resourcePath,
response.getResponseText(), MessageDirection.RESPONSE);
log.info("GET请求成功:deviceId={}, code={}", deviceId, response.getCode());
} else {
log.warn("GET请求超时:deviceId={}", deviceId);
}
return response;
} catch (Exception e) {
log.error("发送GET请求失败:deviceId={}, path={}", deviceId, resourcePath, e);
throw new RuntimeException("GET请求失败", e);
}
}
/**
* 向设备发送POST请求
*/
public CoapResponse sendPostRequest(String deviceId, String resourcePath, String payload) {
try {
CoapDevice device = deviceService.getDevice(deviceId);
if (device == null) {
throw new IllegalArgumentException("设备不存在:" + deviceId);
}
String uri = String.format("coap://%s:%d%s",
device.getIpAddress(), device.getPort(), resourcePath);
log.info("发送POST请求:uri={}, payload={}", uri, payload);
coapClient.setURI(uri);
CoapResponse response = coapClient.post(payload, MediaTypeRegistry.APPLICATION_JSON);
// 记录消息
messageService.recordMessage(deviceId, CoapMethod.POST, resourcePath,
payload, MessageDirection.REQUEST);
if (response != null) {
messageService.recordMessage(deviceId, CoapMethod.POST, resourcePath,
response.getResponseText(), MessageDirection.RESPONSE);
log.info("POST请求成功:deviceId={}, code={}", deviceId, response.getCode());
} else {
log.warn("POST请求超时:deviceId={}", deviceId);
}
return response;
} catch (Exception e) {
log.error("发送POST请求失败:deviceId={}, path={}", deviceId, resourcePath, e);
throw new RuntimeException("POST请求失败", e);
}
}
/**
* 向设备发送PUT请求(控制指令)
*/
public CoapResponse sendPutRequest(String deviceId, String resourcePath, String payload) {
try {
CoapDevice device = deviceService.getDevice(deviceId);
if (device == null) {
throw new IllegalArgumentException("设备不存在:" + deviceId);
}
String uri = String.format("coap://%s:%d%s",
device.getIpAddress(), device.getPort(), resourcePath);
log.info("发送PUT请求:uri={}, payload={}", uri, payload);
coapClient.setURI(uri);
CoapResponse response = coapClient.put(payload, MediaTypeRegistry.APPLICATION_JSON);
// 记录消息
messageService.recordMessage(deviceId, CoapMethod.PUT, resourcePath,
payload, MessageDirection.REQUEST);
if (response != null) {
messageService.recordMessage(deviceId, CoapMethod.PUT, resourcePath,
response.getResponseText(), MessageDirection.RESPONSE);
log.info("PUT请求成功:deviceId={}, code={}", deviceId, response.getCode());
} else {
log.warn("PUT请求超时:deviceId={}", deviceId);
}
return response;
} catch (Exception e) {
log.error("发送PUT请求失败:deviceId={}, path={}", deviceId, resourcePath, e);
throw new RuntimeException("PUT请求失败", e);
}
}
/**
* 观察设备资源
*/
public void observeResource(String deviceId, String resourcePath, CoapObserveRelation.CoapObserver observer) {
try {
CoapDevice device = deviceService.getDevice(deviceId);
if (device == null) {
throw new IllegalArgumentException("设备不存在:" + deviceId);
}
String uri = String.format("coap://%s:%d%s",
device.getIpAddress(), device.getPort(), resourcePath);
log.info("开始观察资源:uri={}", uri);
coapClient.setURI(uri);
CoapObserveRelation relation = coapClient.observe(observer);
log.info("资源观察已建立:deviceId={}, path={}", deviceId, resourcePath);
} catch (Exception e) {
log.error("建立资源观察失败:deviceId={}, path={}", deviceId, resourcePath, e);
throw new RuntimeException("资源观察失败", e);
}
}
}
# 数据库设计
# 1. CoAP设备表
-- CoAP设备信息表
CREATE TABLE coap_device (
id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
device_id VARCHAR(100) NOT NULL UNIQUE COMMENT '设备ID',
device_name VARCHAR(200) COMMENT '设备名称',
ip_address VARCHAR(50) COMMENT 'IP地址',
port INT DEFAULT 5683 COMMENT 'CoAP端口',
device_type VARCHAR(20) COMMENT '设备类型',
status VARCHAR(20) COMMENT '设备状态',
last_communication DATETIME COMMENT '最后通信时间',
resources TEXT COMMENT '设备资源列表(JSON)',
config TEXT COMMENT '设备配置(JSON)',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
INDEX idx_device_id (device_id),
INDEX idx_ip_address (ip_address),
INDEX idx_device_type (device_type),
INDEX idx_status (status),
INDEX idx_last_communication (last_communication)
) COMMENT='CoAP设备信息表';
# 2. CoAP消息表
-- CoAP消息记录表
CREATE TABLE coap_message (
id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
message_id VARCHAR(50) NOT NULL UNIQUE COMMENT '消息ID',
device_id VARCHAR(100) NOT NULL COMMENT '设备ID',
method VARCHAR(10) COMMENT 'CoAP方法',
resource_path VARCHAR(200) COMMENT '资源路径',
payload TEXT COMMENT '消息载荷',
response_code VARCHAR(10) COMMENT '响应码',
message_type VARCHAR(10) COMMENT '消息类型',
direction VARCHAR(10) NOT NULL COMMENT '消息方向',
process_time BIGINT COMMENT '处理时间(毫秒)',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
INDEX idx_message_id (message_id),
INDEX idx_device_id (device_id),
INDEX idx_method (method),
INDEX idx_resource_path (resource_path),
INDEX idx_direction (direction),
INDEX idx_create_time (create_time)
) COMMENT='CoAP消息记录表';
# 3. CoAP资源表
-- CoAP资源定义表
CREATE TABLE coap_resource (
id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
device_id VARCHAR(100) NOT NULL COMMENT '设备ID',
resource_path VARCHAR(200) NOT NULL COMMENT '资源路径',
resource_name VARCHAR(100) COMMENT '资源名称',
resource_type VARCHAR(20) COMMENT '资源类型',
supported_methods VARCHAR(100) COMMENT '支持的方法(JSON)',
content_format VARCHAR(50) COMMENT '内容格式',
observable BOOLEAN DEFAULT FALSE COMMENT '是否可观察',
description TEXT COMMENT '资源描述',
last_update DATETIME COMMENT '最后更新时间',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
UNIQUE KEY uk_device_resource (device_id, resource_path),
INDEX idx_device_id (device_id),
INDEX idx_resource_path (resource_path),
INDEX idx_resource_type (resource_type),
INDEX idx_observable (observable)
) COMMENT='CoAP资源定义表';
# 性能优化策略
# 1. UDP连接池管理
/**
* UDP连接池管理器
* 管理CoAP的UDP连接,提高并发处理能力
*/
@Component
@Slf4j
public class UdpConnectionPoolManager {
private final Map<String, DatagramSocket> socketPool = new ConcurrentHashMap<>();
private final Object poolLock = new Object();
@Value("${coap.pool.max-size:50}")
private int maxPoolSize;
@Value("${coap.pool.timeout:30000}")
private int socketTimeout;
/**
* 获取UDP套接字
*/
public DatagramSocket getSocket(String key) {
synchronized (poolLock) {
DatagramSocket socket = socketPool.get(key);
if (socket == null || socket.isClosed()) {
try {
socket = new DatagramSocket();
socket.setSoTimeout(socketTimeout);
socketPool.put(key, socket);
log.debug("创建新的UDP套接字:key={}", key);
} catch (SocketException e) {
log.error("创建UDP套接字失败:key={}", key, e);
throw new RuntimeException("UDP套接字创建失败", e);
}
}
return socket;
}
}
/**
* 释放UDP套接字
*/
public void releaseSocket(String key) {
synchronized (poolLock) {
DatagramSocket socket = socketPool.remove(key);
if (socket != null && !socket.isClosed()) {
socket.close();
log.debug("释放UDP套接字:key={}", key);
}
}
}
/**
* 清理过期连接
*/
@Scheduled(fixedRate = 60000) // 每分钟清理一次
public void cleanupExpiredSockets() {
synchronized (poolLock) {
Iterator<Map.Entry<String, DatagramSocket>> iterator = socketPool.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, DatagramSocket> entry = iterator.next();
DatagramSocket socket = entry.getValue();
if (socket.isClosed()) {
iterator.remove();
log.debug("清理已关闭的套接字:key={}", entry.getKey());
}
}
log.info("UDP连接池清理完成,当前连接数:{}", socketPool.size());
}
}
/**
* 获取连接池状态
*/
public Map<String, Object> getPoolStatus() {
Map<String, Object> status = new HashMap<>();
status.put("activeConnections", socketPool.size());
status.put("maxPoolSize", maxPoolSize);
status.put("socketTimeout", socketTimeout);
return status;
}
}
# 2. 消息缓存策略
/**
* CoAP消息缓存管理器
* 缓存最近的消息,提高查询性能
*/
@Component
@Slf4j
public class CoapMessageCacheManager {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String CACHE_PREFIX = "coap:message:";
private static final String DEVICE_CACHE_PREFIX = "coap:device:messages:";
/**
* 缓存消息
*/
public void cacheMessage(CoapMessage message) {
try {
// 缓存单个消息
String messageKey = CACHE_PREFIX + message.getMessageId();
redisTemplate.opsForValue().set(messageKey, message, Duration.ofHours(24));
// 缓存到设备消息列表
String deviceKey = DEVICE_CACHE_PREFIX + message.getDeviceId();
redisTemplate.opsForList().leftPush(deviceKey, message);
redisTemplate.opsForList().trim(deviceKey, 0, 99); // 只保留最近100条
redisTemplate.expire(deviceKey, Duration.ofDays(7));
log.debug("消息已缓存:messageId={}", message.getMessageId());
} catch (Exception e) {
log.error("缓存消息失败:messageId={}", message.getMessageId(), e);
}
}
/**
* 获取缓存的消息
*/
public CoapMessage getCachedMessage(String messageId) {
try {
String key = CACHE_PREFIX + messageId;
return (CoapMessage) redisTemplate.opsForValue().get(key);
} catch (Exception e) {
log.error("获取缓存消息失败:messageId={}", messageId, e);
return null;
}
}
/**
* 获取设备最近的消息
*/
public List<CoapMessage> getDeviceRecentMessages(String deviceId, int limit) {
try {
String key = DEVICE_CACHE_PREFIX + deviceId;
List<Object> messages = redisTemplate.opsForList().range(key, 0, limit - 1);
return messages.stream()
.map(obj -> (CoapMessage) obj)
.collect(Collectors.toList());
} catch (Exception e) {
log.error("获取设备最近消息失败:deviceId={}", deviceId, e);
return Collections.emptyList();
}
}
/**
* 清理过期缓存
*/
@Scheduled(fixedRate = 3600000) // 每小时清理一次
public void cleanupExpiredCache() {
try {
// 这里可以实现更复杂的缓存清理逻辑
log.info("CoAP消息缓存清理完成");
} catch (Exception e) {
log.error("清理缓存失败", e);
}
}
}
# 监控和告警
# 1. CoAP性能监控
/**
* CoAP性能监控服务
* 监控CoAP服务器的性能指标
*/
@Service
@Slf4j
public class CoapPerformanceMonitor {
private final AtomicLong requestCount = new AtomicLong(0);
private final AtomicLong responseCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
private final AtomicLong totalProcessTime = new AtomicLong(0);
@Autowired
private AlertService alertService;
/**
* 记录请求
*/
public void recordRequest() {
requestCount.incrementAndGet();
}
/**
* 记录响应
*/
public void recordResponse(long processTime) {
responseCount.incrementAndGet();
totalProcessTime.addAndGet(processTime);
}
/**
* 记录错误
*/
public void recordError() {
errorCount.incrementAndGet();
}
/**
* 定时报告性能指标
*/
@Scheduled(fixedRate = 60000) // 每分钟报告一次
public void reportMetrics() {
long requests = requestCount.get();
long responses = responseCount.get();
long errors = errorCount.get();
long totalTime = totalProcessTime.get();
// 计算平均响应时间
double avgResponseTime = responses > 0 ? (double) totalTime / responses : 0;
// 计算错误率
double errorRate = requests > 0 ? (double) errors / requests * 100 : 0;
log.info("CoAP性能指标 - 请求数:{}, 响应数:{}, 错误数:{}, 平均响应时间:{}ms, 错误率:{}%",
requests, responses, errors, String.format("%.2f", avgResponseTime),
String.format("%.2f", errorRate));
// 检查告警条件
checkAlertConditions(errorRate, avgResponseTime);
// 重置计数器(可选)
// resetCounters();
}
/**
* 检查告警条件
*/
private void checkAlertConditions(double errorRate, double avgResponseTime) {
// 错误率过高告警
if (errorRate > 10.0) {
alertService.sendAlert("CoAP错误率过高",
String.format("当前错误率:%.2f%%,超过阈值10%%", errorRate));
}
// 响应时间过长告警
if (avgResponseTime > 5000) {
alertService.sendAlert("CoAP响应时间过长",
String.format("当前平均响应时间:%.2fms,超过阈值5000ms", avgResponseTime));
}
}
/**
* 重置计数器
*/
private void resetCounters() {
requestCount.set(0);
responseCount.set(0);
errorCount.set(0);
totalProcessTime.set(0);
}
/**
* 获取当前性能指标
*/
public Map<String, Object> getCurrentMetrics() {
Map<String, Object> metrics = new HashMap<>();
long requests = requestCount.get();
long responses = responseCount.get();
long errors = errorCount.get();
long totalTime = totalProcessTime.get();
metrics.put("requestCount", requests);
metrics.put("responseCount", responses);
metrics.put("errorCount", errors);
metrics.put("avgResponseTime", responses > 0 ? (double) totalTime / responses : 0);
metrics.put("errorRate", requests > 0 ? (double) errors / requests * 100 : 0);
return metrics;
}
}
# 配置文件
# application.yml
# CoAP服务器配置
coap:
server:
port: 5683
max-message-size: 1024
ack-timeout: 2000
max-retransmit: 3
preferred-block-size: 512
# 连接池配置
pool:
max-size: 50
timeout: 30000
# 客户端配置
client:
timeout: 5000
max-retries: 3
# 数据库配置
spring:
datasource:
url: jdbc:mysql://localhost:3306/iot_platform?useUnicode=true&characterEncoding=utf8&useSSL=false
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
# JPA配置
jpa:
hibernate:
ddl-auto: update
show-sql: false
properties:
hibernate:
dialect: org.hibernate.dialect.MySQL8Dialect
# Redis配置
redis:
host: localhost
port: 6379
password:
database: 1
timeout: 3000
lettuce:
pool:
max-active: 50
max-idle: 10
min-idle: 2
max-wait: 3000
# 日志配置
logging:
level:
com.iot.coap: DEBUG
org.eclipse.californium: INFO
# 总结
通过这个CoAP设备接入层的实现,我们构建了一个专为资源受限设备设计的通信系统:
- 轻量级通信:基于UDP的CoAP协议,减少网络开销
- RESTful接口:类似HTTP的操作方式,易于理解和使用
- 设备管理:完整的设备注册、发现和状态管理
- 资源管理:灵活的资源定义和访问控制
- 观察模式:支持实时数据订阅和推送
- 性能优化:连接池、消息缓存等优化策略
- 监控告警:全面的性能监控和异常告警
这就像是为"节能设备"建立了一个高效的"短信系统",让它们可以在最小的能耗下与服务器保持有效通信!