CoAP设备接入层详细设计与实现

# CoAP设备接入层详细设计与实现

# 故事背景

想象一下,你正在为一家智能农业公司开发物联网系统。农场里有数千个传感器:土壤湿度传感器、温度传感器、光照传感器等。这些传感器大多数时间处于"睡眠"状态以节省电池,只在需要时才"醒来"发送数据。

传统的HTTP协议就像是"电话通话",需要建立连接、维持会话,对于这些"偶尔说话"的设备来说太重了。而CoAP(Constrained Application Protocol)就像是"发短信",轻量、简洁,特别适合资源受限的设备。

让我们一起来构建这个专为"节能设备"设计的通信系统!

# CoAP协议原理

# 什么是CoAP?

CoAP是一种专为物联网设计的应用层协议,基于UDP传输,具有以下特点:

  • 轻量级:协议头只有4字节
  • 低功耗:支持设备休眠和唤醒
  • 可靠性:支持确认和重传机制
  • RESTful:类似HTTP的GET、POST、PUT、DELETE操作

# CoAP vs HTTP对比

特性 CoAP HTTP
传输协议 UDP TCP
协议头大小 4字节 20+字节
连接方式 无连接 面向连接
适用场景 资源受限设备 通用Web应用
功耗

# CoAP消息格式

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|Ver| T |  TKL  |      Code     |          Message ID           |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|   Token (if any, TKL bytes) ...
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|   Options (if any) ...
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|1 1 1 1 1 1 1 1|    Payload (if any) ...
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

# 系统架构设计

# 整体架构

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   设备层        │    │   CoAP接入层    │    │   业务层        │
│                 │    │                 │    │                 │
│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
│ │ 传感器      │ │    │ │ CoAP Server │ │    │ │ 数据处理    │ │
│ │ - 土壤湿度  │ │◄──►│ │ - 消息解析  │ │◄──►│ │ - 业务逻辑  │ │
│ │ - 温度      │ │    │ │ - 资源管理  │ │    │ │ - 数据存储  │ │
│ │ - 光照      │ │    │ │ - 设备管理  │ │    │ │ - 告警处理  │ │
│ └─────────────┘ │    │ └─────────────┘ │    │ └─────────────┘ │
└─────────────────┘    └─────────────────┘    └─────────────────┘

# 核心实体设计

# 1. CoAP设备实体

/**
 * CoAP设备实体
 * 记录每个CoAP设备的基本信息,就像设备的"档案"
 */
@Entity
@Table(name = "coap_device")
public class CoapDevice {
    
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    /**
     * 设备ID - 设备的唯一标识
     */
    @Column(name = "device_id", unique = true, nullable = false)
    private String deviceId;
    
    /**
     * 设备名称
     */
    @Column(name = "device_name")
    private String deviceName;
    
    /**
     * 设备IP地址
     */
    @Column(name = "ip_address")
    private String ipAddress;
    
    /**
     * CoAP端口
     */
    @Column(name = "port")
    private Integer port;
    
    /**
     * 设备类型:SENSOR(传感器)、ACTUATOR(执行器)、GATEWAY(网关)
     */
    @Enumerated(EnumType.STRING)
    @Column(name = "device_type")
    private DeviceType deviceType;
    
    /**
     * 设备状态:ONLINE(在线)、OFFLINE(离线)、SLEEPING(休眠)
     */
    @Enumerated(EnumType.STRING)
    @Column(name = "status")
    private DeviceStatus status;
    
    /**
     * 最后通信时间
     */
    @Column(name = "last_communication")
    private LocalDateTime lastCommunication;
    
    /**
     * 设备资源列表(JSON格式)
     * 记录设备支持的CoAP资源路径
     */
    @Lob
    @Column(name = "resources")
    private String resources;
    
    /**
     * 设备配置信息(JSON格式)
     */
    @Lob
    @Column(name = "config")
    private String config;
    
    /**
     * 创建时间
     */
    @Column(name = "create_time")
    private LocalDateTime createTime;
    
    /**
     * 更新时间
     */
    @Column(name = "update_time")
    private LocalDateTime updateTime;
    
    // 构造函数、getter、setter省略...
}

/**
 * 设备类型枚举
 */
public enum DeviceType {
    SENSOR("传感器"),
    ACTUATOR("执行器"),
    GATEWAY("网关");
    
    private final String description;
    
    DeviceType(String description) {
        this.description = description;
    }
}

/**
 * 设备状态枚举
 */
public enum DeviceStatus {
    ONLINE("在线"),
    OFFLINE("离线"),
    SLEEPING("休眠");
    
    private final String description;
    
    DeviceStatus(String description) {
        this.description = description;
    }
}

# 2. CoAP消息实体

/**
 * CoAP消息实体
 * 记录设备与服务器之间的每次通信,就像通话记录
 */
@Entity
@Table(name = "coap_message")
public class CoapMessage {
    
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    /**
     * 消息ID
     */
    @Column(name = "message_id", unique = true)
    private String messageId;
    
    /**
     * 设备ID
     */
    @Column(name = "device_id", nullable = false)
    private String deviceId;
    
    /**
     * CoAP方法:GET、POST、PUT、DELETE
     */
    @Enumerated(EnumType.STRING)
    @Column(name = "method")
    private CoapMethod method;
    
    /**
     * 资源路径,如:/sensors/temperature
     */
    @Column(name = "resource_path")
    private String resourcePath;
    
    /**
     * 消息载荷
     */
    @Lob
    @Column(name = "payload")
    private String payload;
    
    /**
     * 响应码:2.05 Content、4.04 Not Found等
     */
    @Column(name = "response_code")
    private String responseCode;
    
    /**
     * 消息类型:CON(可靠)、NON(不可靠)、ACK(确认)、RST(重置)
     */
    @Enumerated(EnumType.STRING)
    @Column(name = "message_type")
    private MessageType messageType;
    
    /**
     * 消息方向:REQUEST(请求)、RESPONSE(响应)
     */
    @Enumerated(EnumType.STRING)
    @Column(name = "direction")
    private MessageDirection direction;
    
    /**
     * 处理时间(毫秒)
     */
    @Column(name = "process_time")
    private Long processTime;
    
    /**
     * 创建时间
     */
    @Column(name = "create_time")
    private LocalDateTime createTime;
    
    // 构造函数、getter、setter省略...
}

/**
 * CoAP方法枚举
 */
public enum CoapMethod {
    GET, POST, PUT, DELETE
}

/**
 * 消息类型枚举
 */
public enum MessageType {
    CON,  // Confirmable
    NON,  // Non-confirmable
    ACK,  // Acknowledgement
    RST   // Reset
}

/**
 * 消息方向枚举
 */
public enum MessageDirection {
    REQUEST,   // 请求
    RESPONSE   // 响应
}

# 3. CoAP资源实体

/**
 * CoAP资源实体
 * 定义设备提供的资源,就像设备的"功能清单"
 */
@Entity
@Table(name = "coap_resource")
public class CoapResource {
    
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    /**
     * 设备ID
     */
    @Column(name = "device_id", nullable = false)
    private String deviceId;
    
    /**
     * 资源路径
     */
    @Column(name = "resource_path", nullable = false)
    private String resourcePath;
    
    /**
     * 资源名称
     */
    @Column(name = "resource_name")
    private String resourceName;
    
    /**
     * 资源类型:SENSOR_DATA(传感器数据)、CONTROL(控制)、CONFIG(配置)
     */
    @Enumerated(EnumType.STRING)
    @Column(name = "resource_type")
    private ResourceType resourceType;
    
    /**
     * 支持的方法(JSON数组):["GET", "POST"]
     */
    @Column(name = "supported_methods")
    private String supportedMethods;
    
    /**
     * 内容格式:application/json、text/plain等
     */
    @Column(name = "content_format")
    private String contentFormat;
    
    /**
     * 是否可观察(支持订阅)
     */
    @Column(name = "observable")
    private Boolean observable;
    
    /**
     * 资源描述
     */
    @Column(name = "description")
    private String description;
    
    /**
     * 最后更新时间
     */
    @Column(name = "last_update")
    private LocalDateTime lastUpdate;
    
    // 构造函数、getter、setter省略...
}

/**
 * 资源类型枚举
 */
public enum ResourceType {
    SENSOR_DATA("传感器数据"),
    CONTROL("控制"),
    CONFIG("配置"),
    STATUS("状态");
    
    private final String description;
    
    ResourceType(String description) {
        this.description = description;
    }
}

# 核心服务实现

# 1. CoAP服务器实现

/**
 * CoAP服务器实现
 * 这是整个CoAP接入层的"大脑",负责处理所有CoAP通信
 */
@Component
@Slf4j
public class CoapServerImpl extends CoapServer {
    
    @Autowired
    private CoapDeviceService deviceService;
    
    @Autowired
    private CoapMessageService messageService;
    
    @Autowired
    private CoapResourceService resourceService;
    
    @Value("${coap.server.port:5683}")
    private int serverPort;
    
    /**
     * 启动CoAP服务器
     */
    @PostConstruct
    public void startServer() {
        try {
            // 创建UDP连接器
            CoapEndpoint.Builder builder = new CoapEndpoint.Builder();
            builder.setInetSocketAddress(new InetSocketAddress(serverPort));
            builder.setNetworkConfig(createNetworkConfig());
            
            addEndpoint(builder.build());
            
            // 添加资源处理器
            getRoot().add(new DeviceDiscoveryResource());
            getRoot().add(new SensorDataResource());
            getRoot().add(new DeviceControlResource());
            getRoot().add(new DeviceConfigResource());
            
            // 启动服务器
            start();
            
            log.info("CoAP服务器启动成功,端口:{}", serverPort);
            
        } catch (Exception e) {
            log.error("CoAP服务器启动失败", e);
            throw new RuntimeException("CoAP服务器启动失败", e);
        }
    }
    
    /**
     * 创建网络配置
     */
    private NetworkConfig createNetworkConfig() {
        NetworkConfig config = NetworkConfig.createStandardWithoutFile();
        
        // 设置ACK超时时间
        config.setInt(NetworkConfig.Keys.ACK_TIMEOUT, 2000);
        
        // 设置最大重传次数
        config.setInt(NetworkConfig.Keys.MAX_RETRANSMIT, 3);
        
        // 设置最大消息大小
        config.setInt(NetworkConfig.Keys.MAX_MESSAGE_SIZE, 1024);
        
        // 设置块传输大小
        config.setInt(NetworkConfig.Keys.PREFERRED_BLOCK_SIZE, 512);
        
        return config;
    }
    
    /**
     * 停止服务器
     */
    @PreDestroy
    public void stopServer() {
        if (isStarted()) {
            stop();
            log.info("CoAP服务器已停止");
        }
    }
}

# 2. 设备发现资源

/**
 * 设备发现资源
 * 处理设备的自动发现和注册,就像"新员工报到"
 */
public class DeviceDiscoveryResource extends CoapResource {
    
    @Autowired
    private CoapDeviceService deviceService;
    
    public DeviceDiscoveryResource() {
        super("discovery");
        getAttributes().setTitle("Device Discovery");
        getAttributes().addContentType(MediaTypeRegistry.APPLICATION_JSON);
    }
    
    /**
     * 处理设备注册请求
     * POST /discovery
     */
    @Override
    public void handlePOST(CoapExchange exchange) {
        try {
            String payload = exchange.getRequestText();
            log.info("收到设备注册请求:{}", payload);
            
            // 解析设备信息
            DeviceRegistrationRequest request = parseRegistrationRequest(payload);
            
            // 验证设备信息
            if (!validateRegistrationRequest(request)) {
                exchange.respond(ResponseCode.BAD_REQUEST, "设备信息验证失败");
                return;
            }
            
            // 获取设备IP地址
            String deviceIp = exchange.getSourceAddress().getHostAddress();
            
            // 注册设备
            CoapDevice device = deviceService.registerDevice(request, deviceIp);
            
            // 返回注册成功响应
            DeviceRegistrationResponse response = new DeviceRegistrationResponse();
            response.setDeviceId(device.getDeviceId());
            response.setStatus("SUCCESS");
            response.setMessage("设备注册成功");
            
            exchange.respond(ResponseCode.CREATED, toJson(response));
            
            log.info("设备注册成功:deviceId={}, ip={}", device.getDeviceId(), deviceIp);
            
        } catch (Exception e) {
            log.error("处理设备注册失败", e);
            exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
        }
    }
    
    /**
     * 处理设备发现请求
     * GET /discovery
     */
    @Override
    public void handleGET(CoapExchange exchange) {
        try {
            // 返回服务器信息
            ServerInfo serverInfo = new ServerInfo();
            serverInfo.setServerName("IoT CoAP Server");
            serverInfo.setVersion("1.0.0");
            serverInfo.setSupportedFormats(Arrays.asList("application/json", "text/plain"));
            
            exchange.respond(ResponseCode.CONTENT, toJson(serverInfo));
            
        } catch (Exception e) {
            log.error("处理设备发现失败", e);
            exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
        }
    }
    
    // 辅助方法
    private DeviceRegistrationRequest parseRegistrationRequest(String payload) {
        // JSON解析逻辑
        return new DeviceRegistrationRequest();
    }
    
    private boolean validateRegistrationRequest(DeviceRegistrationRequest request) {
        return request != null && request.getDeviceId() != null;
    }
    
    private String toJson(Object obj) {
        // JSON序列化逻辑
        return "{}";
    }
}

# 3. 传感器数据资源

/**
 * 传感器数据资源
 * 处理传感器数据的上报和查询,就像"数据收集站"
 */
public class SensorDataResource extends CoapResource {
    
    @Autowired
    private CoapMessageService messageService;
    
    @Autowired
    private SensorDataService sensorDataService;
    
    public SensorDataResource() {
        super("sensors");
        getAttributes().setTitle("Sensor Data");
        getAttributes().addContentType(MediaTypeRegistry.APPLICATION_JSON);
        getAttributes().setObservable(); // 支持观察模式
        
        // 添加子资源
        add(new TemperatureResource());
        add(new HumidityResource());
        add(new LightResource());
    }
    
    /**
     * 处理传感器数据上报
     * POST /sensors/{sensorType}
     */
    @Override
    public void handlePOST(CoapExchange exchange) {
        try {
            String deviceIp = exchange.getSourceAddress().getHostAddress();
            String payload = exchange.getRequestText();
            String resourcePath = exchange.getRequestOptions().getUriPathString();
            
            log.info("收到传感器数据:ip={}, path={}, data={}", deviceIp, resourcePath, payload);
            
            // 解析传感器数据
            SensorData sensorData = parseSensorData(payload);
            
            // 验证数据
            if (!validateSensorData(sensorData)) {
                exchange.respond(ResponseCode.BAD_REQUEST, "传感器数据格式错误");
                return;
            }
            
            // 获取设备信息
            CoapDevice device = deviceService.getDeviceByIp(deviceIp);
            if (device == null) {
                exchange.respond(ResponseCode.UNAUTHORIZED, "设备未注册");
                return;
            }
            
            // 处理传感器数据
            sensorDataService.processSensorData(device.getDeviceId(), sensorData);
            
            // 记录消息
            messageService.recordMessage(device.getDeviceId(), CoapMethod.POST, 
                    resourcePath, payload, MessageDirection.REQUEST);
            
            // 更新设备最后通信时间
            deviceService.updateLastCommunication(device.getDeviceId());
            
            // 返回成功响应
            exchange.respond(ResponseCode.CHANGED, "数据接收成功");
            
            // 通知观察者(如果有的话)
            notifyObservers(sensorData);
            
            log.info("传感器数据处理完成:deviceId={}", device.getDeviceId());
            
        } catch (Exception e) {
            log.error("处理传感器数据失败", e);
            exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
        }
    }
    
    /**
     * 处理传感器数据查询
     * GET /sensors/{sensorType}
     */
    @Override
    public void handleGET(CoapExchange exchange) {
        try {
            String deviceIp = exchange.getSourceAddress().getHostAddress();
            String resourcePath = exchange.getRequestOptions().getUriPathString();
            
            // 获取设备信息
            CoapDevice device = deviceService.getDeviceByIp(deviceIp);
            if (device == null) {
                exchange.respond(ResponseCode.UNAUTHORIZED, "设备未注册");
                return;
            }
            
            // 获取最新传感器数据
            SensorData latestData = sensorDataService.getLatestData(device.getDeviceId(), resourcePath);
            
            if (latestData != null) {
                exchange.respond(ResponseCode.CONTENT, toJson(latestData));
            } else {
                exchange.respond(ResponseCode.NOT_FOUND, "未找到数据");
            }
            
            // 记录消息
            messageService.recordMessage(device.getDeviceId(), CoapMethod.GET, 
                    resourcePath, null, MessageDirection.REQUEST);
            
        } catch (Exception e) {
            log.error("查询传感器数据失败", e);
            exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
        }
    }
    
    /**
     * 通知观察者
     */
    private void notifyObservers(SensorData data) {
        // 通知所有观察者新的传感器数据
        changed();
    }
    
    // 辅助方法
    private SensorData parseSensorData(String payload) {
        // JSON解析逻辑
        return new SensorData();
    }
    
    private boolean validateSensorData(SensorData data) {
        return data != null && data.getValue() != null;
    }
    
    private String toJson(Object obj) {
        // JSON序列化逻辑
        return "{}";
    }
}

# 4. 设备控制资源

/**
 * 设备控制资源
 * 处理对设备的控制指令,就像"遥控器"
 */
public class DeviceControlResource extends CoapResource {
    
    @Autowired
    private DeviceControlService controlService;
    
    @Autowired
    private CoapMessageService messageService;
    
    public DeviceControlResource() {
        super("control");
        getAttributes().setTitle("Device Control");
        getAttributes().addContentType(MediaTypeRegistry.APPLICATION_JSON);
    }
    
    /**
     * 处理设备控制指令
     * PUT /control/{deviceId}/{action}
     */
    @Override
    public void handlePUT(CoapExchange exchange) {
        try {
            String payload = exchange.getRequestText();
            String resourcePath = exchange.getRequestOptions().getUriPathString();
            
            log.info("收到设备控制指令:path={}, payload={}", resourcePath, payload);
            
            // 解析控制指令
            ControlCommand command = parseControlCommand(payload);
            
            // 验证指令
            if (!validateControlCommand(command)) {
                exchange.respond(ResponseCode.BAD_REQUEST, "控制指令格式错误");
                return;
            }
            
            // 检查设备是否存在且在线
            CoapDevice device = deviceService.getDevice(command.getDeviceId());
            if (device == null) {
                exchange.respond(ResponseCode.NOT_FOUND, "设备不存在");
                return;
            }
            
            if (device.getStatus() != DeviceStatus.ONLINE) {
                exchange.respond(ResponseCode.SERVICE_UNAVAILABLE, "设备不在线");
                return;
            }
            
            // 执行控制指令
            ControlResult result = controlService.executeCommand(command);
            
            // 记录消息
            messageService.recordMessage(command.getDeviceId(), CoapMethod.PUT, 
                    resourcePath, payload, MessageDirection.REQUEST);
            
            // 返回执行结果
            if (result.isSuccess()) {
                exchange.respond(ResponseCode.CHANGED, toJson(result));
            } else {
                exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR, result.getErrorMessage());
            }
            
            log.info("设备控制指令执行完成:deviceId={}, success={}", 
                    command.getDeviceId(), result.isSuccess());
            
        } catch (Exception e) {
            log.error("处理设备控制指令失败", e);
            exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
        }
    }
    
    /**
     * 查询设备控制状态
     * GET /control/{deviceId}
     */
    @Override
    public void handleGET(CoapExchange exchange) {
        try {
            String resourcePath = exchange.getRequestOptions().getUriPathString();
            String deviceId = extractDeviceIdFromPath(resourcePath);
            
            if (deviceId == null) {
                exchange.respond(ResponseCode.BAD_REQUEST, "设备ID不能为空");
                return;
            }
            
            // 获取设备控制状态
            DeviceControlStatus status = controlService.getControlStatus(deviceId);
            
            if (status != null) {
                exchange.respond(ResponseCode.CONTENT, toJson(status));
            } else {
                exchange.respond(ResponseCode.NOT_FOUND, "设备控制状态未找到");
            }
            
        } catch (Exception e) {
            log.error("查询设备控制状态失败", e);
            exchange.respond(ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
        }
    }
    
    // 辅助方法
    private ControlCommand parseControlCommand(String payload) {
        // JSON解析逻辑
        return new ControlCommand();
    }
    
    private boolean validateControlCommand(ControlCommand command) {
        return command != null && command.getDeviceId() != null && command.getAction() != null;
    }
    
    private String extractDeviceIdFromPath(String path) {
        // 从路径中提取设备ID
        String[] parts = path.split("/");
        return parts.length > 1 ? parts[1] : null;
    }
    
    private String toJson(Object obj) {
        // JSON序列化逻辑
        return "{}";
    }
}

# CoAP客户端实现

# 1. CoAP客户端服务

/**
 * CoAP客户端服务
 * 用于主动向设备发送请求,就像"主动拨打电话"
 */
@Service
@Slf4j
public class CoapClientService {
    
    private final CoapClient coapClient;
    
    @Autowired
    private CoapDeviceService deviceService;
    
    @Autowired
    private CoapMessageService messageService;
    
    public CoapClientService() {
        this.coapClient = new CoapClient();
        
        // 配置客户端
        NetworkConfig config = NetworkConfig.createStandardWithoutFile();
        config.setInt(NetworkConfig.Keys.ACK_TIMEOUT, 2000);
        config.setInt(NetworkConfig.Keys.MAX_RETRANSMIT, 3);
        coapClient.setEndpoint(new CoapEndpoint.Builder()
                .setNetworkConfig(config)
                .build());
    }
    
    /**
     * 向设备发送GET请求
     */
    public CoapResponse sendGetRequest(String deviceId, String resourcePath) {
        try {
            // 获取设备信息
            CoapDevice device = deviceService.getDevice(deviceId);
            if (device == null) {
                throw new IllegalArgumentException("设备不存在:" + deviceId);
            }
            
            // 构建请求URI
            String uri = String.format("coap://%s:%d%s", 
                    device.getIpAddress(), device.getPort(), resourcePath);
            
            log.info("发送GET请求:uri={}", uri);
            
            // 发送请求
            coapClient.setURI(uri);
            CoapResponse response = coapClient.get();
            
            // 记录消息
            messageService.recordMessage(deviceId, CoapMethod.GET, resourcePath, 
                    null, MessageDirection.REQUEST);
            
            if (response != null) {
                messageService.recordMessage(deviceId, CoapMethod.GET, resourcePath, 
                        response.getResponseText(), MessageDirection.RESPONSE);
                
                log.info("GET请求成功:deviceId={}, code={}", deviceId, response.getCode());
            } else {
                log.warn("GET请求超时:deviceId={}", deviceId);
            }
            
            return response;
            
        } catch (Exception e) {
            log.error("发送GET请求失败:deviceId={}, path={}", deviceId, resourcePath, e);
            throw new RuntimeException("GET请求失败", e);
        }
    }
    
    /**
     * 向设备发送POST请求
     */
    public CoapResponse sendPostRequest(String deviceId, String resourcePath, String payload) {
        try {
            CoapDevice device = deviceService.getDevice(deviceId);
            if (device == null) {
                throw new IllegalArgumentException("设备不存在:" + deviceId);
            }
            
            String uri = String.format("coap://%s:%d%s", 
                    device.getIpAddress(), device.getPort(), resourcePath);
            
            log.info("发送POST请求:uri={}, payload={}", uri, payload);
            
            coapClient.setURI(uri);
            CoapResponse response = coapClient.post(payload, MediaTypeRegistry.APPLICATION_JSON);
            
            // 记录消息
            messageService.recordMessage(deviceId, CoapMethod.POST, resourcePath, 
                    payload, MessageDirection.REQUEST);
            
            if (response != null) {
                messageService.recordMessage(deviceId, CoapMethod.POST, resourcePath, 
                        response.getResponseText(), MessageDirection.RESPONSE);
                
                log.info("POST请求成功:deviceId={}, code={}", deviceId, response.getCode());
            } else {
                log.warn("POST请求超时:deviceId={}", deviceId);
            }
            
            return response;
            
        } catch (Exception e) {
            log.error("发送POST请求失败:deviceId={}, path={}", deviceId, resourcePath, e);
            throw new RuntimeException("POST请求失败", e);
        }
    }
    
    /**
     * 向设备发送PUT请求(控制指令)
     */
    public CoapResponse sendPutRequest(String deviceId, String resourcePath, String payload) {
        try {
            CoapDevice device = deviceService.getDevice(deviceId);
            if (device == null) {
                throw new IllegalArgumentException("设备不存在:" + deviceId);
            }
            
            String uri = String.format("coap://%s:%d%s", 
                    device.getIpAddress(), device.getPort(), resourcePath);
            
            log.info("发送PUT请求:uri={}, payload={}", uri, payload);
            
            coapClient.setURI(uri);
            CoapResponse response = coapClient.put(payload, MediaTypeRegistry.APPLICATION_JSON);
            
            // 记录消息
            messageService.recordMessage(deviceId, CoapMethod.PUT, resourcePath, 
                    payload, MessageDirection.REQUEST);
            
            if (response != null) {
                messageService.recordMessage(deviceId, CoapMethod.PUT, resourcePath, 
                        response.getResponseText(), MessageDirection.RESPONSE);
                
                log.info("PUT请求成功:deviceId={}, code={}", deviceId, response.getCode());
            } else {
                log.warn("PUT请求超时:deviceId={}", deviceId);
            }
            
            return response;
            
        } catch (Exception e) {
            log.error("发送PUT请求失败:deviceId={}, path={}", deviceId, resourcePath, e);
            throw new RuntimeException("PUT请求失败", e);
        }
    }
    
    /**
     * 观察设备资源
     */
    public void observeResource(String deviceId, String resourcePath, CoapObserveRelation.CoapObserver observer) {
        try {
            CoapDevice device = deviceService.getDevice(deviceId);
            if (device == null) {
                throw new IllegalArgumentException("设备不存在:" + deviceId);
            }
            
            String uri = String.format("coap://%s:%d%s", 
                    device.getIpAddress(), device.getPort(), resourcePath);
            
            log.info("开始观察资源:uri={}", uri);
            
            coapClient.setURI(uri);
            CoapObserveRelation relation = coapClient.observe(observer);
            
            log.info("资源观察已建立:deviceId={}, path={}", deviceId, resourcePath);
            
        } catch (Exception e) {
            log.error("建立资源观察失败:deviceId={}, path={}", deviceId, resourcePath, e);
            throw new RuntimeException("资源观察失败", e);
        }
    }
}

# 数据库设计

# 1. CoAP设备表

-- CoAP设备信息表
CREATE TABLE coap_device (
    id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
    device_id VARCHAR(100) NOT NULL UNIQUE COMMENT '设备ID',
    device_name VARCHAR(200) COMMENT '设备名称',
    ip_address VARCHAR(50) COMMENT 'IP地址',
    port INT DEFAULT 5683 COMMENT 'CoAP端口',
    device_type VARCHAR(20) COMMENT '设备类型',
    status VARCHAR(20) COMMENT '设备状态',
    last_communication DATETIME COMMENT '最后通信时间',
    resources TEXT COMMENT '设备资源列表(JSON)',
    config TEXT COMMENT '设备配置(JSON)',
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    
    INDEX idx_device_id (device_id),
    INDEX idx_ip_address (ip_address),
    INDEX idx_device_type (device_type),
    INDEX idx_status (status),
    INDEX idx_last_communication (last_communication)
) COMMENT='CoAP设备信息表';

# 2. CoAP消息表

-- CoAP消息记录表
CREATE TABLE coap_message (
    id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
    message_id VARCHAR(50) NOT NULL UNIQUE COMMENT '消息ID',
    device_id VARCHAR(100) NOT NULL COMMENT '设备ID',
    method VARCHAR(10) COMMENT 'CoAP方法',
    resource_path VARCHAR(200) COMMENT '资源路径',
    payload TEXT COMMENT '消息载荷',
    response_code VARCHAR(10) COMMENT '响应码',
    message_type VARCHAR(10) COMMENT '消息类型',
    direction VARCHAR(10) NOT NULL COMMENT '消息方向',
    process_time BIGINT COMMENT '处理时间(毫秒)',
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    
    INDEX idx_message_id (message_id),
    INDEX idx_device_id (device_id),
    INDEX idx_method (method),
    INDEX idx_resource_path (resource_path),
    INDEX idx_direction (direction),
    INDEX idx_create_time (create_time)
) COMMENT='CoAP消息记录表';

# 3. CoAP资源表

-- CoAP资源定义表
CREATE TABLE coap_resource (
    id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键ID',
    device_id VARCHAR(100) NOT NULL COMMENT '设备ID',
    resource_path VARCHAR(200) NOT NULL COMMENT '资源路径',
    resource_name VARCHAR(100) COMMENT '资源名称',
    resource_type VARCHAR(20) COMMENT '资源类型',
    supported_methods VARCHAR(100) COMMENT '支持的方法(JSON)',
    content_format VARCHAR(50) COMMENT '内容格式',
    observable BOOLEAN DEFAULT FALSE COMMENT '是否可观察',
    description TEXT COMMENT '资源描述',
    last_update DATETIME COMMENT '最后更新时间',
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    
    UNIQUE KEY uk_device_resource (device_id, resource_path),
    INDEX idx_device_id (device_id),
    INDEX idx_resource_path (resource_path),
    INDEX idx_resource_type (resource_type),
    INDEX idx_observable (observable)
) COMMENT='CoAP资源定义表';

# 性能优化策略

# 1. UDP连接池管理

/**
 * UDP连接池管理器
 * 管理CoAP的UDP连接,提高并发处理能力
 */
@Component
@Slf4j
public class UdpConnectionPoolManager {
    
    private final Map<String, DatagramSocket> socketPool = new ConcurrentHashMap<>();
    private final Object poolLock = new Object();
    
    @Value("${coap.pool.max-size:50}")
    private int maxPoolSize;
    
    @Value("${coap.pool.timeout:30000}")
    private int socketTimeout;
    
    /**
     * 获取UDP套接字
     */
    public DatagramSocket getSocket(String key) {
        synchronized (poolLock) {
            DatagramSocket socket = socketPool.get(key);
            
            if (socket == null || socket.isClosed()) {
                try {
                    socket = new DatagramSocket();
                    socket.setSoTimeout(socketTimeout);
                    socketPool.put(key, socket);
                    
                    log.debug("创建新的UDP套接字:key={}", key);
                    
                } catch (SocketException e) {
                    log.error("创建UDP套接字失败:key={}", key, e);
                    throw new RuntimeException("UDP套接字创建失败", e);
                }
            }
            
            return socket;
        }
    }
    
    /**
     * 释放UDP套接字
     */
    public void releaseSocket(String key) {
        synchronized (poolLock) {
            DatagramSocket socket = socketPool.remove(key);
            if (socket != null && !socket.isClosed()) {
                socket.close();
                log.debug("释放UDP套接字:key={}", key);
            }
        }
    }
    
    /**
     * 清理过期连接
     */
    @Scheduled(fixedRate = 60000) // 每分钟清理一次
    public void cleanupExpiredSockets() {
        synchronized (poolLock) {
            Iterator<Map.Entry<String, DatagramSocket>> iterator = socketPool.entrySet().iterator();
            
            while (iterator.hasNext()) {
                Map.Entry<String, DatagramSocket> entry = iterator.next();
                DatagramSocket socket = entry.getValue();
                
                if (socket.isClosed()) {
                    iterator.remove();
                    log.debug("清理已关闭的套接字:key={}", entry.getKey());
                }
            }
            
            log.info("UDP连接池清理完成,当前连接数:{}", socketPool.size());
        }
    }
    
    /**
     * 获取连接池状态
     */
    public Map<String, Object> getPoolStatus() {
        Map<String, Object> status = new HashMap<>();
        status.put("activeConnections", socketPool.size());
        status.put("maxPoolSize", maxPoolSize);
        status.put("socketTimeout", socketTimeout);
        
        return status;
    }
}

# 2. 消息缓存策略

/**
 * CoAP消息缓存管理器
 * 缓存最近的消息,提高查询性能
 */
@Component
@Slf4j
public class CoapMessageCacheManager {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String CACHE_PREFIX = "coap:message:";
    private static final String DEVICE_CACHE_PREFIX = "coap:device:messages:";
    
    /**
     * 缓存消息
     */
    public void cacheMessage(CoapMessage message) {
        try {
            // 缓存单个消息
            String messageKey = CACHE_PREFIX + message.getMessageId();
            redisTemplate.opsForValue().set(messageKey, message, Duration.ofHours(24));
            
            // 缓存到设备消息列表
            String deviceKey = DEVICE_CACHE_PREFIX + message.getDeviceId();
            redisTemplate.opsForList().leftPush(deviceKey, message);
            redisTemplate.opsForList().trim(deviceKey, 0, 99); // 只保留最近100条
            redisTemplate.expire(deviceKey, Duration.ofDays(7));
            
            log.debug("消息已缓存:messageId={}", message.getMessageId());
            
        } catch (Exception e) {
            log.error("缓存消息失败:messageId={}", message.getMessageId(), e);
        }
    }
    
    /**
     * 获取缓存的消息
     */
    public CoapMessage getCachedMessage(String messageId) {
        try {
            String key = CACHE_PREFIX + messageId;
            return (CoapMessage) redisTemplate.opsForValue().get(key);
            
        } catch (Exception e) {
            log.error("获取缓存消息失败:messageId={}", messageId, e);
            return null;
        }
    }
    
    /**
     * 获取设备最近的消息
     */
    public List<CoapMessage> getDeviceRecentMessages(String deviceId, int limit) {
        try {
            String key = DEVICE_CACHE_PREFIX + deviceId;
            List<Object> messages = redisTemplate.opsForList().range(key, 0, limit - 1);
            
            return messages.stream()
                    .map(obj -> (CoapMessage) obj)
                    .collect(Collectors.toList());
            
        } catch (Exception e) {
            log.error("获取设备最近消息失败:deviceId={}", deviceId, e);
            return Collections.emptyList();
        }
    }
    
    /**
     * 清理过期缓存
     */
    @Scheduled(fixedRate = 3600000) // 每小时清理一次
    public void cleanupExpiredCache() {
        try {
            // 这里可以实现更复杂的缓存清理逻辑
            log.info("CoAP消息缓存清理完成");
            
        } catch (Exception e) {
            log.error("清理缓存失败", e);
        }
    }
}

# 监控和告警

# 1. CoAP性能监控

/**
 * CoAP性能监控服务
 * 监控CoAP服务器的性能指标
 */
@Service
@Slf4j
public class CoapPerformanceMonitor {
    
    private final AtomicLong requestCount = new AtomicLong(0);
    private final AtomicLong responseCount = new AtomicLong(0);
    private final AtomicLong errorCount = new AtomicLong(0);
    private final AtomicLong totalProcessTime = new AtomicLong(0);
    
    @Autowired
    private AlertService alertService;
    
    /**
     * 记录请求
     */
    public void recordRequest() {
        requestCount.incrementAndGet();
    }
    
    /**
     * 记录响应
     */
    public void recordResponse(long processTime) {
        responseCount.incrementAndGet();
        totalProcessTime.addAndGet(processTime);
    }
    
    /**
     * 记录错误
     */
    public void recordError() {
        errorCount.incrementAndGet();
    }
    
    /**
     * 定时报告性能指标
     */
    @Scheduled(fixedRate = 60000) // 每分钟报告一次
    public void reportMetrics() {
        long requests = requestCount.get();
        long responses = responseCount.get();
        long errors = errorCount.get();
        long totalTime = totalProcessTime.get();
        
        // 计算平均响应时间
        double avgResponseTime = responses > 0 ? (double) totalTime / responses : 0;
        
        // 计算错误率
        double errorRate = requests > 0 ? (double) errors / requests * 100 : 0;
        
        log.info("CoAP性能指标 - 请求数:{}, 响应数:{}, 错误数:{}, 平均响应时间:{}ms, 错误率:{}%", 
                requests, responses, errors, String.format("%.2f", avgResponseTime), 
                String.format("%.2f", errorRate));
        
        // 检查告警条件
        checkAlertConditions(errorRate, avgResponseTime);
        
        // 重置计数器(可选)
        // resetCounters();
    }
    
    /**
     * 检查告警条件
     */
    private void checkAlertConditions(double errorRate, double avgResponseTime) {
        // 错误率过高告警
        if (errorRate > 10.0) {
            alertService.sendAlert("CoAP错误率过高", 
                    String.format("当前错误率:%.2f%%,超过阈值10%%", errorRate));
        }
        
        // 响应时间过长告警
        if (avgResponseTime > 5000) {
            alertService.sendAlert("CoAP响应时间过长", 
                    String.format("当前平均响应时间:%.2fms,超过阈值5000ms", avgResponseTime));
        }
    }
    
    /**
     * 重置计数器
     */
    private void resetCounters() {
        requestCount.set(0);
        responseCount.set(0);
        errorCount.set(0);
        totalProcessTime.set(0);
    }
    
    /**
     * 获取当前性能指标
     */
    public Map<String, Object> getCurrentMetrics() {
        Map<String, Object> metrics = new HashMap<>();
        
        long requests = requestCount.get();
        long responses = responseCount.get();
        long errors = errorCount.get();
        long totalTime = totalProcessTime.get();
        
        metrics.put("requestCount", requests);
        metrics.put("responseCount", responses);
        metrics.put("errorCount", errors);
        metrics.put("avgResponseTime", responses > 0 ? (double) totalTime / responses : 0);
        metrics.put("errorRate", requests > 0 ? (double) errors / requests * 100 : 0);
        
        return metrics;
    }
}

# 配置文件

# application.yml

# CoAP服务器配置
coap:
  server:
    port: 5683
    max-message-size: 1024
    ack-timeout: 2000
    max-retransmit: 3
    preferred-block-size: 512
  
  # 连接池配置
  pool:
    max-size: 50
    timeout: 30000
  
  # 客户端配置
  client:
    timeout: 5000
    max-retries: 3

# 数据库配置
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/iot_platform?useUnicode=true&characterEncoding=utf8&useSSL=false
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
  
  # JPA配置
  jpa:
    hibernate:
      ddl-auto: update
    show-sql: false
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQL8Dialect
  
  # Redis配置
  redis:
    host: localhost
    port: 6379
    password: 
    database: 1
    timeout: 3000
    lettuce:
      pool:
        max-active: 50
        max-idle: 10
        min-idle: 2
        max-wait: 3000

# 日志配置
logging:
  level:
    com.iot.coap: DEBUG
    org.eclipse.californium: INFO

# 总结

通过这个CoAP设备接入层的实现,我们构建了一个专为资源受限设备设计的通信系统:

  1. 轻量级通信:基于UDP的CoAP协议,减少网络开销
  2. RESTful接口:类似HTTP的操作方式,易于理解和使用
  3. 设备管理:完整的设备注册、发现和状态管理
  4. 资源管理:灵活的资源定义和访问控制
  5. 观察模式:支持实时数据订阅和推送
  6. 性能优化:连接池、消息缓存等优化策略
  7. 监控告警:全面的性能监控和异常告警

这就像是为"节能设备"建立了一个高效的"短信系统",让它们可以在最小的能耗下与服务器保持有效通信!