From bec8cc6ef8a3a604f742664bc2368e3caf4f5d96 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sun, 18 Jan 2026 09:05:00 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat=EF=BC=9A=E3=80=90iot=E3=80=91coap=20?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E6=8E=A5=E5=85=A5=2050%=EF=BC=9A=E5=88=9D?= =?UTF-8?q?=E5=A7=8B=E5=8C=96=E6=95=B4=E4=BD=93=E5=AE=9E=E7=8E=B0=EF=BC=8C?= =?UTF-8?q?=E5=9F=BA=E4=BA=8E=20pure-wishing-muffin.md=20=E8=A7=84?= =?UTF-8?q?=E5=88=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/IotGatewayConfiguration.java | 28 ++ .../gateway/config/IotGatewayProperties.java | 40 +++ .../coap/IotCoapDownstreamSubscriber.java | 46 ++++ .../coap/IotCoapUpstreamProtocol.java | 91 +++++++ .../gateway/protocol/coap/package-info.java | 14 + .../coap/router/IotCoapAuthHandler.java | 138 ++++++++++ .../coap/router/IotCoapAuthResource.java | 37 +++ .../coap/router/IotCoapUpstreamHandler.java | 243 ++++++++++++++++++ .../router/IotCoapUpstreamTopicResource.java | 67 +++++ .../gateway/protocol/emqx/package-info.java | 1 + .../gateway/protocol/http/package-info.java | 2 + .../gateway/protocol/tcp/package-info.java | 2 + .../src/main/resources/application.yaml | 10 + .../coap/IotCoapProtocolIntegrationTest.java | 158 ++++++++++++ 14 files changed, 877 insertions(+) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapDownstreamSubscriber.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapUpstreamProtocol.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/package-info.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthResource.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamTopicResource.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/package-info.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/package-info.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/package-info.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocolIntegrationTest.java diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index 3e573efdde..ef332f1de4 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -1,6 +1,10 @@ package cn.iocoder.yudao.module.iot.gateway.config; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapAuthHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; @@ -194,4 +198,28 @@ public class IotGatewayConfiguration { } + /** + * IoT 网关 CoAP 协议配置类 + */ + @Configuration + @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.coap", name = "enabled", havingValue = "true") + @Slf4j + public static class CoapProtocolConfiguration { + + @Bean + public IotCoapUpstreamProtocol iotCoapUpstreamProtocol(IotGatewayProperties gatewayProperties, + IotCoapAuthHandler authHandler, + IotCoapUpstreamHandler upstreamHandler) { + return new IotCoapUpstreamProtocol(gatewayProperties.getProtocol().getCoap(), + authHandler, upstreamHandler); + } + + @Bean + public IotCoapDownstreamSubscriber iotCoapDownstreamSubscriber(IotCoapUpstreamProtocol coapUpstreamProtocol, + IotMessageBus messageBus) { + return new IotCoapDownstreamSubscriber(coapUpstreamProtocol, messageBus); + } + + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index 7655a3759e..16045387f8 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -93,6 +93,11 @@ public class IotGatewayProperties { */ private MqttWsProperties mqttWs; + /** + * CoAP 组件配置 + */ + private CoapProperties coap; + } @Data @@ -503,4 +508,39 @@ public class IotGatewayProperties { } + @Data + public static class CoapProperties { + + /** + * 是否开启 + */ + @NotNull(message = "是否开启不能为空") + private Boolean enabled; + + /** + * 服务端口(CoAP 默认端口 5683) + */ + // TODO @AI:默认不为空 + private Integer port = 5683; + + /** + * 最大消息大小(字节) + */ + // TODO @AI:默认不为空 + private Integer maxMessageSize = 1024; + + /** + * ACK 超时时间(毫秒) + */ + // TODO @AI:默认不为空 + private Integer ackTimeout = 2000; + + /** + * 最大重传次数 + */ + // TODO @AI:默认不为空 + private Integer maxRetransmit = 4; + + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapDownstreamSubscriber.java new file mode 100644 index 0000000000..d01cdc416c --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapDownstreamSubscriber.java @@ -0,0 +1,46 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap; + +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * IoT 网关 CoAP 订阅者:接收下行给设备的消息 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +@Slf4j +public class IotCoapDownstreamSubscriber implements IotMessageSubscriber { + + private final IotCoapUpstreamProtocol protocol; + + private final IotMessageBus messageBus; + + @PostConstruct + public void init() { + messageBus.register(this); + } + + @Override + public String getTopic() { + return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId()); + } + + @Override + public String getGroup() { + // 保证点对点消费,需要保证独立的 Group,所以使用 Topic 作为 Group + return getTopic(); + } + + @Override + public void onMessage(IotDeviceMessage message) { + // 如需支持,可通过 CoAP Observe 模式实现(设备订阅资源,服务器推送变更) + log.warn("[onMessage][IoT 网关 CoAP 协议暂不支持下行消息,忽略消息:{}]", message); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapUpstreamProtocol.java new file mode 100644 index 0000000000..2e029e3d63 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapUpstreamProtocol.java @@ -0,0 +1,91 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap; + +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapAuthHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapAuthResource; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamTopicResource; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamHandler; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.CoapServer; +import org.eclipse.californium.core.config.CoapConfig; +import org.eclipse.californium.elements.config.Configuration; + +import java.util.concurrent.TimeUnit; + +/** + * IoT 网关 CoAP 协议:接收设备上行消息 + * + * 基于 Eclipse Californium 实现,支持: + * 1. 认证:POST /auth + * 2. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post + * 3. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/{eventId}/post + * + * @author 芋道源码 + */ +@Slf4j +public class IotCoapUpstreamProtocol { + + private final IotGatewayProperties.CoapProperties coapProperties; + + private final IotCoapAuthHandler authHandler; + private final IotCoapUpstreamHandler upstreamHandler; + + private CoapServer coapServer; + + @Getter + private final String serverId; + + public IotCoapUpstreamProtocol(IotGatewayProperties.CoapProperties coapProperties, + IotCoapAuthHandler authHandler, + IotCoapUpstreamHandler upstreamHandler) { + this.coapProperties = coapProperties; + this.authHandler = authHandler; + this.upstreamHandler = upstreamHandler; + this.serverId = IotDeviceMessageUtils.generateServerId(coapProperties.getPort()); + } + + @PostConstruct + public void start() { + try { + // 1.1 创建网络配置(Californium 3.x API) + Configuration config = Configuration.createStandardWithoutFile(); + config.set(CoapConfig.COAP_PORT, coapProperties.getPort()); + config.set(CoapConfig.MAX_MESSAGE_SIZE, coapProperties.getMaxMessageSize()); + config.set(CoapConfig.ACK_TIMEOUT, coapProperties.getAckTimeout(), TimeUnit.MILLISECONDS); + config.set(CoapConfig.MAX_RETRANSMIT, coapProperties.getMaxRetransmit()); + // 1.2 创建 CoAP 服务器 + coapServer = new CoapServer(config); + + // 2.1 添加 /auth 认证资源 + IotCoapAuthResource authResource = new IotCoapAuthResource(this, authHandler); + coapServer.add(authResource); + // 2.2 添加 /topic 根资源(用于上行消息) + IotCoapUpstreamTopicResource topicResource = new IotCoapUpstreamTopicResource(this, upstreamHandler); + coapServer.add(topicResource); + + // 3. 启动服务器 + coapServer.start(); + log.info("[start][IoT 网关 CoAP 协议启动成功,端口:{},资源:/auth, /topic]", coapProperties.getPort()); + } catch (Exception e) { + log.error("[start][IoT 网关 CoAP 协议启动失败]", e); + throw e; + } + } + + @PreDestroy + public void stop() { + if (coapServer != null) { + try { + coapServer.stop(); + log.info("[stop][IoT 网关 CoAP 协议已停止]"); + } catch (Exception e) { + log.error("[stop][IoT 网关 CoAP 协议停止失败]", e); + } + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/package-info.java new file mode 100644 index 0000000000..8da76ac784 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/package-info.java @@ -0,0 +1,14 @@ +/** + * IoT 网关 CoAP 协议 + * + * 基于 Eclipse Californium 实现,支持设备通过 CoAP 协议进行: + * 1. 属性上报:POST /sys/{productKey}/{deviceName}/thing/property/post + * 2. 事件上报:POST /sys/{productKey}/{deviceName}/thing/event/{eventId}/post + * + * 认证方式:通过 URI Query 参数 token 进行认证 + * 示例:coap://server:5683/sys/pk/dn/thing/property/post?token=xxx + * + * @author 芋道源码 + */ +// TODO @AI:参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/package-info.java (现在注释应该有点不太对) +package cn.iocoder.yudao.module.iot.gateway.protocol.coap; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthHandler.java new file mode 100644 index 0000000000..72bd43f3c6 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthHandler.java @@ -0,0 +1,138 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router; + +import cn.hutool.core.lang.Assert; +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.BooleanUtil; +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.MediaTypeRegistry; +import org.eclipse.californium.core.server.resources.CoapExchange; +import org.springframework.stereotype.Component; + +import java.util.Map; + +import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success; + +/** + * IoT 网关 CoAP 协议的【认证】处理器 + * + * 参考 {@link cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAuthHandler} + * + * @author 芋道源码 + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class IotCoapAuthHandler { + + private final IotDeviceTokenService deviceTokenService; + + private final IotDeviceCommonApi deviceApi; + + private final IotDeviceMessageService deviceMessageService; + + /** + * 处理认证请求 + * + * @param exchange CoAP 交换对象 + * @param protocol 协议对象 + */ + public void handle(CoapExchange exchange, IotCoapUpstreamProtocol protocol) { + try { + // 1.1 解析请求体 + byte[] payload = exchange.getRequestPayload(); + if (payload == null || payload.length == 0) { + respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空"); + return; + } + Map body; + try { + body = JsonUtils.parseObject(new String(payload), Map.class); + } catch (Exception e) { + respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体 JSON 格式错误"); + return; + } + // TODO @AI:通过 hutool maputil 去获取,简化下; + // 1.2 解析参数 + String clientId = (String) body.get("clientId"); + if (StrUtil.isEmpty(clientId)) { + respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "clientId 不能为空"); + return; + } + String username = (String) body.get("username"); + if (StrUtil.isEmpty(username)) { + respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "username 不能为空"); + return; + } + String password = (String) body.get("password"); + if (StrUtil.isEmpty(password)) { + respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "password 不能为空"); + return; + } + + // 2.1 执行认证 + CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO() + .setClientId(clientId).setUsername(username).setPassword(password)); + if (result.isError()) { + log.warn("[handle][认证失败,clientId: {}, 错误: {}]", clientId, result.getMsg()); + respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败:" + result.getMsg()); + return; + } + if (!BooleanUtil.isTrue(result.getData())) { + log.warn("[handle][认证失败,clientId: {}]", clientId); + respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败"); + return; + } + // 2.2 生成 Token + IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.parseUsername(username); + Assert.notNull(deviceInfo, "设备信息不能为空"); + String token = deviceTokenService.createToken(deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + Assert.notBlank(token, "生成 token 不能为空"); + + // 3. 执行上线 + IotDeviceMessage message = IotDeviceMessage.buildStateUpdateOnline(); + deviceMessageService.sendDeviceMessage(message, + deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId()); + + // 4. 返回成功响应 + log.info("[handle][认证成功,productKey: {}, deviceName: {}]", + deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + respondSuccess(exchange, MapUtil.of("token", token)); + } catch (Exception e) { + log.error("[handle][认证处理异常]", e); + respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误"); + } + } + + // TODO @AI:抽到 coap 的 util 里; + /** + * 返回成功响应 + */ + private void respondSuccess(CoapExchange exchange, Object data) { + CommonResult result = success(data); + String json = JsonUtils.toJsonString(result); + exchange.respond(CoAP.ResponseCode.CONTENT, json, MediaTypeRegistry.APPLICATION_JSON); + } + + // TODO @AI:抽到 coap 的 util 里; + /** + * 返回错误响应 + */ + private void respondError(CoapExchange exchange, CoAP.ResponseCode code, String message) { + CommonResult result = CommonResult.error(code.value, message); + String json = JsonUtils.toJsonString(result); + exchange.respond(code, json, MediaTypeRegistry.APPLICATION_JSON); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthResource.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthResource.java new file mode 100644 index 0000000000..9d0d90cb3e --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthResource.java @@ -0,0 +1,37 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router; + +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.CoapResource; +import org.eclipse.californium.core.server.resources.CoapExchange; + +/** + * IoT 网关 CoAP 协议的认证资源(/auth) + * + * 设备通过此资源进行认证,获取 Token + * + * @author 芋道源码 + */ +@Slf4j +public class IotCoapAuthResource extends CoapResource { + + public static final String PATH = "auth"; + + private final IotCoapUpstreamProtocol protocol; + private final IotCoapAuthHandler authHandler; + + public IotCoapAuthResource(IotCoapUpstreamProtocol protocol, + IotCoapAuthHandler authHandler) { + super(PATH); + this.protocol = protocol; + this.authHandler = authHandler; + log.info("[IotCoapAuthResource][创建 CoAP 认证资源: /{}]", PATH); + } + + @Override + public void handlePOST(CoapExchange exchange) { + log.debug("[handlePOST][收到 /auth POST 请求]"); + authHandler.handle(exchange, protocol); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamHandler.java new file mode 100644 index 0000000000..a0a68b3be9 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamHandler.java @@ -0,0 +1,243 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.text.StrPool; +import cn.hutool.core.util.ObjUtil; +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.MediaTypeRegistry; +import org.eclipse.californium.core.server.resources.CoapExchange; +import org.springframework.stereotype.Component; + +import java.util.List; + +import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*; + +/** + * IoT 网关 CoAP 协议的【上行】处理器 + * + * 处理设备通过 CoAP 协议发送的上行消息,包括: + * 1. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post + * 2. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/{eventId}/post + * + * Token 通过自定义 CoAP Option 2088 携带 + * + * @author 芋道源码 + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class IotCoapUpstreamHandler { + + /** + * 自定义 CoAP Option 编号,用于携带 Token + * CoAP Option 范围 2048-65535 属于实验/自定义范围 + */ + public static final int OPTION_TOKEN = 2088; + + private final IotDeviceTokenService deviceTokenService; + private final IotDeviceMessageService deviceMessageService; + + /** + * 处理 CoAP 请求 + * + * @param exchange CoAP 交换对象 + * @param httpMethod HTTP 方法 + * @param protocol 协议对象 + */ + public void handle(CoapExchange exchange, String httpMethod, IotCoapUpstreamProtocol protocol) { + try { + // TODO @AI:这种路径的解析,不用了,简化下,类似 IotHttpUpstreamHandler 这种就很简洁; + // 1. 解析 URI 路径:/topic/sys/{productKey}/{deviceName}/thing/... + // 完整路径是 [topic, sys, productKey, deviceName, thing, ...] + List uriPath = exchange.getRequestOptions().getUriPath(); + if (uriPath.size() < 6) { + respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, + "URI 路径格式错误,期望:/topic/sys/{productKey}/{deviceName}/..."); + return; + } + + // 验证路径格式:第一个应该是 "topic",第二个应该是 "sys" + if (!"topic".equals(uriPath.get(0)) || !"sys".equals(uriPath.get(1))) { + respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "URI 路径格式错误,期望以 /topic/sys 开头"); + return; + } + + // 解析 productKey 和 deviceName(索引 2 和 3) + String productKey = uriPath.get(2); + String deviceName = uriPath.get(3); + + // 2. 认证:优先从自定义 Option 获取 token,兼容 Query 参数 + String token = getTokenFromOption(exchange); + if (StrUtil.isEmpty(token)) { + // 兼容 Query 参数方式 + // TODO @AI:不用兼容 query,简化下; + token = getQueryParam(exchange, "token"); + } + if (StrUtil.isEmpty(token)) { + respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "缺少 token(请使用 Option " + OPTION_TOKEN + " 或 Query 参数携带)"); + return; + } + + // 验证 token + // TODO @AI:这里参考 IotHttpAbstractHandler 简化点校验; + IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.verifyToken(token); + if (deviceInfo == null) { + respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "token 无效或已过期"); + return; + } + // 验证设备信息匹配 + if (ObjUtil.notEqual(productKey, deviceInfo.getProductKey()) + || ObjUtil.notEqual(deviceName, deviceInfo.getDeviceName())) { + respondError(exchange, CoAP.ResponseCode.FORBIDDEN, "设备信息与 token 不匹配"); + return; + } + + // 3. 解析 method:将 URI 路径转换为 method 格式 + // /topic/sys/pk/dn/thing/property/post -> thing.property.post + // 路径是 [sys, pk, dn, thing, property, post],从索引 3 开始 + String method = buildMethod(uriPath); + + // 4. 解析并处理消息体 + byte[] payload = exchange.getRequestPayload(); + if (payload == null || payload.length == 0) { + respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空"); + return; + } + + // 5. 解码消息 + IotDeviceMessage message; + try { + message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName); + } catch (Exception e) { + log.error("[handle][消息解码失败]", e); + respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "消息解码失败:" + e.getMessage()); + return; + } + + // 校验 method + // TODO @AI:不用校验 method;以 message 解析出来的为主; + if (!method.equals(message.getMethod())) { + log.warn("[handle][method 不匹配,URI: {}, 消息: {}]", method, message.getMethod()); + } + + // 6. 发送消息到消息总线 + deviceMessageService.sendDeviceMessage(message, productKey, deviceName, protocol.getServerId()); + + // 7. 返回成功响应 + respondSuccess(exchange, message.getId()); + } catch (Exception e) { + log.error("[handle][CoAP 请求处理异常]", e); + respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误"); + } + } + + /** + * 构建 method 字符串 + * + * 将 URI 路径转换为 method 格式,例如: + * [sys, pk, dn, thing, property, post] -> thing.property.post + * + * @param uriPath URI 路径列表 + * @return method 字符串 + */ + private String buildMethod(List uriPath) { + // 跳过 sys, productKey, deviceName,从第4个元素开始 + if (uriPath.size() > 3) { + return String.join(StrPool.DOT, uriPath.subList(3, uriPath.size())); + } + return ""; + } + + // TODO @AI:抽到 coap 的 util 里; + /** + * 从自定义 CoAP Option 中获取 Token + * + * @param exchange CoAP 交换对象 + * @return Token 值,如果不存在则返回 null + */ + private String getTokenFromOption(CoapExchange exchange) { + // 尝试从自定义 Option 2088 获取 Token + byte[] tokenBytes = exchange.getRequestOptions().getOthers().stream() + .filter(option -> option.getNumber() == OPTION_TOKEN) + .findFirst() + .map(option -> option.getValue()) + .orElse(null); + if (tokenBytes != null) { + return new String(tokenBytes); + } + return null; + } + + // TODO @AI:抽到 coap 的 util 里; + /** + * 从 URI Query 参数中获取指定 key 的值 + * + * @param exchange CoAP 交换对象 + * @param key 参数名 + * @return 参数值,如果不存在则返回 null + */ + private String getQueryParam(CoapExchange exchange, String key) { + for (String query : exchange.getRequestOptions().getUriQuery()) { + if (query.startsWith(key + "=")) { + return query.substring((key + "=").length()); + } + } + return null; + } + + // TODO @AI:抽到 coap 的 util 里; + /** + * 返回成功响应 + * + * @param exchange CoAP 交换对象 + * @param messageId 消息 ID + */ + private void respondSuccess(CoapExchange exchange, String messageId) { + CommonResult result = CommonResult.success(MapUtil.of("messageId", messageId)); + String json = JsonUtils.toJsonString(result); + exchange.respond(CoAP.ResponseCode.CONTENT, json, MediaTypeRegistry.APPLICATION_JSON); + } + + // TODO @AI:抽到 coap 的 util 里; + /** + * 返回错误响应 + * + * @param exchange CoAP 交换对象 + * @param code CoAP 响应码 + * @param message 错误消息 + */ + private void respondError(CoapExchange exchange, CoAP.ResponseCode code, String message) { + // 将 CoAP 响应码映射到业务错误码 + int errorCode = mapCoapCodeToErrorCode(code); + CommonResult result = CommonResult.error(errorCode, message); + String json = JsonUtils.toJsonString(result); + exchange.respond(code, json, MediaTypeRegistry.APPLICATION_JSON); + } + + // TODO @AI:兼容 jdk8 的写法; + /** + * 将 CoAP 响应码映射到业务错误码 + * + * @param code CoAP 响应码 + * @return 业务错误码 + */ + private int mapCoapCodeToErrorCode(CoAP.ResponseCode code) { + return switch (code) { + case BAD_REQUEST -> BAD_REQUEST.getCode(); + case UNAUTHORIZED -> UNAUTHORIZED.getCode(); + case FORBIDDEN -> FORBIDDEN.getCode(); + default -> INTERNAL_SERVER_ERROR.getCode(); + }; + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamTopicResource.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamTopicResource.java new file mode 100644 index 0000000000..e8a9743e08 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamTopicResource.java @@ -0,0 +1,67 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router; + +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.CoapResource; +import org.eclipse.californium.core.server.resources.CoapExchange; +import org.eclipse.californium.core.server.resources.Resource; + +/** + * IoT 网关 CoAP 协议的【上行】Topic 资源 + * + * 支持任意深度的路径匹配: + * - /topic/sys/{productKey}/{deviceName}/thing/property/post + * - /topic/sys/{productKey}/{deviceName}/thing/event/{eventId}/post + * + * @author 芋道源码 + */ +@Slf4j +public class IotCoapUpstreamTopicResource extends CoapResource { + + public static final String PATH = "topic"; + + private final IotCoapUpstreamProtocol protocol; + private final IotCoapUpstreamHandler upstreamHandler; + + /** + * 创建根资源(/topic) + */ + public IotCoapUpstreamTopicResource(IotCoapUpstreamProtocol protocol, + IotCoapUpstreamHandler upstreamHandler) { + this(PATH, protocol, upstreamHandler); + log.info("[IotCoapUpstreamTopicResource][创建 CoAP 上行 Topic 资源: /{}]", PATH); + } + + /** + * 创建子资源(动态路径) + */ + private IotCoapUpstreamTopicResource(String name, + IotCoapUpstreamProtocol protocol, + IotCoapUpstreamHandler upstreamHandler) { + super(name); + this.protocol = protocol; + this.upstreamHandler = upstreamHandler; + } + + @Override + public Resource getChild(String name) { + // 递归创建动态子资源,支持任意深度路径 + return new IotCoapUpstreamTopicResource(name, protocol, upstreamHandler); + } + + @Override + public void handleGET(CoapExchange exchange) { + upstreamHandler.handle(exchange, "GET", protocol); + } + + @Override + public void handlePOST(CoapExchange exchange) { + upstreamHandler.handle(exchange, "POST", protocol); + } + + @Override + public void handlePUT(CoapExchange exchange) { + upstreamHandler.handle(exchange, "PUT", protocol); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/package-info.java new file mode 100644 index 0000000000..b64dd122bb --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/package-info.java @@ -0,0 +1 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.emqx; \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/package-info.java new file mode 100644 index 0000000000..5a027da02b --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/package-info.java @@ -0,0 +1,2 @@ +// TODO @AI:参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/package-info.java 完善注释; +package cn.iocoder.yudao.module.iot.gateway.protocol.http; \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/package-info.java new file mode 100644 index 0000000000..e67eb497f4 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/package-info.java @@ -0,0 +1,2 @@ +// TODO @AI:参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/package-info.java +package cn.iocoder.yudao.module.iot.gateway.protocol.tcp; \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index f633f1c60b..ea3c68b037 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -117,6 +117,15 @@ yudao: keep-alive-timeout-seconds: 300 # 保持连接超时时间(秒) ssl-enabled: false # 是否启用 SSL(wss://) sub-protocol: mqtt # WebSocket 子协议 + # ==================================== + # 针对引入的 CoAP 组件的配置 + # ==================================== + coap: + enabled: false # 是否启用 CoAP 协议 + port: 5683 # CoAP 服务端口(默认 5683) + max-message-size: 1024 # 最大消息大小(字节) + ack-timeout: 2000 # ACK 超时时间(毫秒) + max-retransmit: 4 # 最大重传次数 --- #################### 日志相关配置 #################### @@ -137,6 +146,7 @@ logging: cn.iocoder.yudao.module.iot.gateway.protocol.http: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG cn.iocoder.yudao.module.iot.gateway.protocol.mqttws: DEBUG + cn.iocoder.yudao.module.iot.gateway.protocol.coap: DEBUG # 根日志级别 root: INFO diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocolIntegrationTest.java new file mode 100644 index 0000000000..799a48e359 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocolIntegrationTest.java @@ -0,0 +1,158 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap; + +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamHandler; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.CoapClient; +import org.eclipse.californium.core.CoapResponse; +import org.eclipse.californium.core.coap.MediaTypeRegistry; +import org.eclipse.californium.core.coap.Option; +import org.eclipse.californium.core.coap.Request; +import org.junit.jupiter.api.*; + +/** + * IoT 网关 CoAP 协议集成测试(手动测试) + * + * 使用步骤: + * 1. 启动 CoAP 网关服务(端口 5683) + * 2. 运行 testAuth() 获取 token + * 3. 将 token 粘贴到 TOKEN 常量 + * 4. 运行 testPropertyPost() 或 testEventPost() + * + * @author 芋道源码 + */ +@Slf4j +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class IotCoapProtocolIntegrationTest { + + private static final String SERVER_HOST = "127.0.0.1"; + private static final int SERVER_PORT = 5683; + + // 设备信息(根据实际情况修改) + private static final String PRODUCT_KEY = "testProductKey"; + private static final String DEVICE_NAME = "testDeviceName"; + private static final String CLIENT_ID = PRODUCT_KEY + "." + DEVICE_NAME; + private static final String USERNAME = DEVICE_NAME + "&" + PRODUCT_KEY; + private static final String PASSWORD = "testPassword123"; + + // TODO: 运行 testAuth() 后,将返回的 token 粘贴到这里 + private static final String TOKEN = "粘贴你的token到这里"; + + // ========== 1. 认证测试 ========== + + @Test + @Order(1) + @DisplayName("1. 认证 - 获取 Token") + void testAuth() throws Exception { + String uri = String.format("coap://%s:%d/auth", SERVER_HOST, SERVER_PORT); + + String payload = String.format(""" + { + "clientId": "%s", + "username": "%s", + "password": "%s" + } + """, CLIENT_ID, USERNAME, PASSWORD); + + CoapClient client = new CoapClient(uri); + try { + log.info("[testAuth][请求 URI: {}]", uri); + log.info("[testAuth][请求体: {}]", payload); + + CoapResponse response = client.post(payload, MediaTypeRegistry.APPLICATION_JSON); + + log.info("[testAuth][响应码: {}]", response.getCode()); + log.info("[testAuth][响应体: {}]", response.getResponseText()); + log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]"); + } finally { + client.shutdown(); + } + } + + // ========== 2. 属性上报测试 ========== + + @Test + @Order(2) + @DisplayName("2. 属性上报") + void testPropertyPost() throws Exception { + String uri = String.format("coap://%s:%d/topic/sys/%s/%s/thing/property/post", + SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME); + + String payload = """ + { + "id": "123", + "method": "thing.property.post", + "params": { + "temperature": 25.5, + "humidity": 60 + } + } + """; + + CoapClient client = new CoapClient(uri); + try { + // 构造带自定义 Option 的请求 + Request request = Request.newPost(); + request.setURI(uri); + request.setPayload(payload); + request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON); + // 添加自定义 Token Option (2088) + request.getOptions().addOption(new Option(IotCoapUpstreamHandler.OPTION_TOKEN, TOKEN)); + + log.info("[testPropertyPost][请求 URI: {}]", uri); + log.info("[testPropertyPost][Token: {}]", TOKEN); + log.info("[testPropertyPost][请求体: {}]", payload); + + CoapResponse response = client.advanced(request); + + log.info("[testPropertyPost][响应码: {}]", response.getCode()); + log.info("[testPropertyPost][响应体: {}]", response.getResponseText()); + } finally { + client.shutdown(); + } + } + + // ========== 3. 事件上报测试 ========== + + @Test + @Order(3) + @DisplayName("3. 事件上报") + void testEventPost() throws Exception { + String uri = String.format("coap://%s:%d/topic/sys/%s/%s/thing/event/alarm/post", + SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME); + + String payload = """ + { + "id": "456", + "method": "thing.event.alarm.post", + "params": { + "alarmType": "temperature_high", + "level": "warning", + "value": 85.2 + } + } + """; + + CoapClient client = new CoapClient(uri); + try { + // 构造带自定义 Option 的请求 + Request request = Request.newPost(); + request.setURI(uri); + request.setPayload(payload); + request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON); + // 添加自定义 Token Option (2088) + request.getOptions().addOption(new Option(IotCoapUpstreamHandler.OPTION_TOKEN, TOKEN)); + + log.info("[testEventPost][请求 URI: {}]", uri); + log.info("[testEventPost][Token: {}]", TOKEN); + log.info("[testEventPost][请求体: {}]", payload); + + CoapResponse response = client.advanced(request); + + log.info("[testEventPost][响应码: {}]", response.getCode()); + log.info("[testEventPost][响应体: {}]", response.getResponseText()); + } finally { + client.shutdown(); + } + } + +} From b270d82d7529e0a6fd6faa9a82acdb501daefa8f Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sun, 18 Jan 2026 09:55:34 +0800 Subject: [PATCH 2/3] =?UTF-8?q?feat=EF=BC=9A=E3=80=90iot=E3=80=91coap=20?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E6=8E=A5=E5=85=A5=20100%=EF=BC=9A=EF=BC=8C?= =?UTF-8?q?=E5=9F=BA=E4=BA=8E=20rippling-noodling-wombat.d=20=E8=A7=84?= =?UTF-8?q?=E5=88=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yudao-dependencies/pom.xml | 8 + .../yudao-module-iot-gateway/pom.xml | 6 + .../config/IotGatewayConfiguration.java | 9 +- .../coap/IotCoapUpstreamProtocol.java | 13 +- .../gateway/protocol/coap/package-info.java | 21 +- .../coap/router/IotCoapAuthHandler.java | 63 ++--- .../coap/router/IotCoapUpstreamHandler.java | 217 ++++-------------- .../router/IotCoapUpstreamTopicResource.java | 6 +- .../protocol/coap/util/IotCoapUtils.java | 84 +++++++ .../src/main/resources/application.yaml | 4 +- .../coap/IotCoapProtocolIntegrationTest.java | 146 ++++++------ 11 files changed, 258 insertions(+), 319 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/util/IotCoapUtils.java diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index 0257eb3109..7bcb9cd48a 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -68,6 +68,7 @@ 4.2.9.Final 1.2.5 4.5.22 + 3.12.0 2.40.15 1.16.7 @@ -653,6 +654,13 @@ org.eclipse.paho.client.mqttv3 ${mqtt.version} + + + + org.eclipse.californium + californium-core + ${californium.version} + diff --git a/yudao-module-iot/yudao-module-iot-gateway/pom.xml b/yudao-module-iot/yudao-module-iot-gateway/pom.xml index 8fde9dc3ce..38ace822d8 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/pom.xml +++ b/yudao-module-iot/yudao-module-iot-gateway/pom.xml @@ -48,6 +48,12 @@ vertx-mqtt + + + org.eclipse.californium + californium-core + + cn.iocoder.boot diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index ef332f1de4..d47e4f9cf6 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -3,8 +3,6 @@ package cn.iocoder.yudao.module.iot.gateway.config; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapAuthHandler; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamHandler; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; @@ -207,11 +205,8 @@ public class IotGatewayConfiguration { public static class CoapProtocolConfiguration { @Bean - public IotCoapUpstreamProtocol iotCoapUpstreamProtocol(IotGatewayProperties gatewayProperties, - IotCoapAuthHandler authHandler, - IotCoapUpstreamHandler upstreamHandler) { - return new IotCoapUpstreamProtocol(gatewayProperties.getProtocol().getCoap(), - authHandler, upstreamHandler); + public IotCoapUpstreamProtocol iotCoapUpstreamProtocol(IotGatewayProperties gatewayProperties) { + return new IotCoapUpstreamProtocol(gatewayProperties.getProtocol().getCoap()); } @Bean diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapUpstreamProtocol.java index 2e029e3d63..42f4c8edc2 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapUpstreamProtocol.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapUpstreamProtocol.java @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit; * 基于 Eclipse Californium 实现,支持: * 1. 认证:POST /auth * 2. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post - * 3. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/{eventId}/post + * 3. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post * * @author 芋道源码 */ @@ -31,20 +31,13 @@ public class IotCoapUpstreamProtocol { private final IotGatewayProperties.CoapProperties coapProperties; - private final IotCoapAuthHandler authHandler; - private final IotCoapUpstreamHandler upstreamHandler; - private CoapServer coapServer; @Getter private final String serverId; - public IotCoapUpstreamProtocol(IotGatewayProperties.CoapProperties coapProperties, - IotCoapAuthHandler authHandler, - IotCoapUpstreamHandler upstreamHandler) { + public IotCoapUpstreamProtocol(IotGatewayProperties.CoapProperties coapProperties) { this.coapProperties = coapProperties; - this.authHandler = authHandler; - this.upstreamHandler = upstreamHandler; this.serverId = IotDeviceMessageUtils.generateServerId(coapProperties.getPort()); } @@ -61,9 +54,11 @@ public class IotCoapUpstreamProtocol { coapServer = new CoapServer(config); // 2.1 添加 /auth 认证资源 + IotCoapAuthHandler authHandler = new IotCoapAuthHandler(); IotCoapAuthResource authResource = new IotCoapAuthResource(this, authHandler); coapServer.add(authResource); // 2.2 添加 /topic 根资源(用于上行消息) + IotCoapUpstreamHandler upstreamHandler = new IotCoapUpstreamHandler(); IotCoapUpstreamTopicResource topicResource = new IotCoapUpstreamTopicResource(this, upstreamHandler); coapServer.add(topicResource); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/package-info.java index 8da76ac784..94536a6439 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/package-info.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/package-info.java @@ -1,14 +1,13 @@ /** - * IoT 网关 CoAP 协议 - * - * 基于 Eclipse Californium 实现,支持设备通过 CoAP 协议进行: - * 1. 属性上报:POST /sys/{productKey}/{deviceName}/thing/property/post - * 2. 事件上报:POST /sys/{productKey}/{deviceName}/thing/event/{eventId}/post - * - * 认证方式:通过 URI Query 参数 token 进行认证 - * 示例:coap://server:5683/sys/pk/dn/thing/property/post?token=xxx - * - * @author 芋道源码 + * CoAP 协议实现包 + *

+ * 提供基于 Eclipse Californium 的 IoT 设备连接和消息处理功能 + *

+ * URI 路径: + * - 认证:POST /auth + * - 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post + * - 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post + *

+ * Token 通过 CoAP Option 2088 携带 */ -// TODO @AI:参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/package-info.java (现在注释应该有点不太对) package cn.iocoder.yudao.module.iot.gateway.protocol.coap; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthHandler.java index 72bd43f3c6..2348cf990b 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthHandler.java @@ -4,6 +4,7 @@ import cn.hutool.core.lang.Assert; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.StrUtil; +import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; @@ -11,19 +12,15 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.util.IotCoapUtils; import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.coap.CoAP; -import org.eclipse.californium.core.coap.MediaTypeRegistry; import org.eclipse.californium.core.server.resources.CoapExchange; -import org.springframework.stereotype.Component; import java.util.Map; -import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success; - /** * IoT 网关 CoAP 协议的【认证】处理器 * @@ -31,53 +28,55 @@ import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success; * * @author 芋道源码 */ -@Component -@RequiredArgsConstructor @Slf4j public class IotCoapAuthHandler { private final IotDeviceTokenService deviceTokenService; - private final IotDeviceCommonApi deviceApi; - private final IotDeviceMessageService deviceMessageService; + public IotCoapAuthHandler() { + this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class); + this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); + } + /** * 处理认证请求 * * @param exchange CoAP 交换对象 * @param protocol 协议对象 */ + @SuppressWarnings("unchecked") public void handle(CoapExchange exchange, IotCoapUpstreamProtocol protocol) { try { // 1.1 解析请求体 byte[] payload = exchange.getRequestPayload(); if (payload == null || payload.length == 0) { - respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空"); + IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空"); return; } Map body; try { body = JsonUtils.parseObject(new String(payload), Map.class); } catch (Exception e) { - respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体 JSON 格式错误"); + IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体 JSON 格式错误"); return; } - // TODO @AI:通过 hutool maputil 去获取,简化下; // 1.2 解析参数 - String clientId = (String) body.get("clientId"); + String clientId = MapUtil.getStr(body, "clientId"); if (StrUtil.isEmpty(clientId)) { - respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "clientId 不能为空"); + IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "clientId 不能为空"); return; } - String username = (String) body.get("username"); + String username = MapUtil.getStr(body, "username"); if (StrUtil.isEmpty(username)) { - respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "username 不能为空"); + IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "username 不能为空"); return; } - String password = (String) body.get("password"); + String password = MapUtil.getStr(body, "password"); if (StrUtil.isEmpty(password)) { - respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "password 不能为空"); + IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "password 不能为空"); return; } @@ -86,12 +85,12 @@ public class IotCoapAuthHandler { .setClientId(clientId).setUsername(username).setPassword(password)); if (result.isError()) { log.warn("[handle][认证失败,clientId: {}, 错误: {}]", clientId, result.getMsg()); - respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败:" + result.getMsg()); + IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败:" + result.getMsg()); return; } if (!BooleanUtil.isTrue(result.getData())) { log.warn("[handle][认证失败,clientId: {}]", clientId); - respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败"); + IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败"); return; } // 2.2 生成 Token @@ -108,31 +107,11 @@ public class IotCoapAuthHandler { // 4. 返回成功响应 log.info("[handle][认证成功,productKey: {}, deviceName: {}]", deviceInfo.getProductKey(), deviceInfo.getDeviceName()); - respondSuccess(exchange, MapUtil.of("token", token)); + IotCoapUtils.respondSuccess(exchange, MapUtil.of("token", token)); } catch (Exception e) { log.error("[handle][认证处理异常]", e); - respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误"); + IotCoapUtils.respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误"); } } - // TODO @AI:抽到 coap 的 util 里; - /** - * 返回成功响应 - */ - private void respondSuccess(CoapExchange exchange, Object data) { - CommonResult result = success(data); - String json = JsonUtils.toJsonString(result); - exchange.respond(CoAP.ResponseCode.CONTENT, json, MediaTypeRegistry.APPLICATION_JSON); - } - - // TODO @AI:抽到 coap 的 util 里; - /** - * 返回错误响应 - */ - private void respondError(CoapExchange exchange, CoAP.ResponseCode code, String message) { - CommonResult result = CommonResult.error(code.value, message); - String json = JsonUtils.toJsonString(result); - exchange.respond(code, json, MediaTypeRegistry.APPLICATION_JSON); - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamHandler.java index a0a68b3be9..d51215fd6c 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamHandler.java @@ -1,243 +1,108 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; import cn.hutool.core.map.MapUtil; import cn.hutool.core.text.StrPool; +import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.ObjUtil; import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils; import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.util.IotCoapUtils; import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.coap.CoAP; -import org.eclipse.californium.core.coap.MediaTypeRegistry; import org.eclipse.californium.core.server.resources.CoapExchange; -import org.springframework.stereotype.Component; import java.util.List; -import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*; - /** * IoT 网关 CoAP 协议的【上行】处理器 * * 处理设备通过 CoAP 协议发送的上行消息,包括: * 1. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post - * 2. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/{eventId}/post + * 2. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post * * Token 通过自定义 CoAP Option 2088 携带 * * @author 芋道源码 */ -@Component -@RequiredArgsConstructor @Slf4j public class IotCoapUpstreamHandler { - /** - * 自定义 CoAP Option 编号,用于携带 Token - * CoAP Option 范围 2048-65535 属于实验/自定义范围 - */ - public static final int OPTION_TOKEN = 2088; - private final IotDeviceTokenService deviceTokenService; private final IotDeviceMessageService deviceMessageService; + public IotCoapUpstreamHandler() { + this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class); + this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); + } + /** * 处理 CoAP 请求 * * @param exchange CoAP 交换对象 - * @param httpMethod HTTP 方法 * @param protocol 协议对象 */ - public void handle(CoapExchange exchange, String httpMethod, IotCoapUpstreamProtocol protocol) { + public void handle(CoapExchange exchange, IotCoapUpstreamProtocol protocol) { try { - // TODO @AI:这种路径的解析,不用了,简化下,类似 IotHttpUpstreamHandler 这种就很简洁; - // 1. 解析 URI 路径:/topic/sys/{productKey}/{deviceName}/thing/... - // 完整路径是 [topic, sys, productKey, deviceName, thing, ...] + // 1. 解析通用参数 List uriPath = exchange.getRequestOptions().getUriPath(); - if (uriPath.size() < 6) { - respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, - "URI 路径格式错误,期望:/topic/sys/{productKey}/{deviceName}/..."); + String productKey = CollUtil.get(uriPath, 2); + String deviceName = CollUtil.get(uriPath, 3); + byte[] payload = exchange.getRequestPayload(); + if (StrUtil.isEmpty(productKey)) { + IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "productKey 不能为空"); + return; + } + if (StrUtil.isEmpty(deviceName)) { + IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "deviceName 不能为空"); + return; + } + if (ArrayUtil.isEmpty(payload)) { + IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空"); return; } - // 验证路径格式:第一个应该是 "topic",第二个应该是 "sys" - if (!"topic".equals(uriPath.get(0)) || !"sys".equals(uriPath.get(1))) { - respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "URI 路径格式错误,期望以 /topic/sys 开头"); - return; - } - - // 解析 productKey 和 deviceName(索引 2 和 3) - String productKey = uriPath.get(2); - String deviceName = uriPath.get(3); - - // 2. 认证:优先从自定义 Option 获取 token,兼容 Query 参数 - String token = getTokenFromOption(exchange); + // 2. 认证:从自定义 Option 获取 token + String token = IotCoapUtils.getTokenFromOption(exchange, IotCoapUtils.OPTION_TOKEN); if (StrUtil.isEmpty(token)) { - // 兼容 Query 参数方式 - // TODO @AI:不用兼容 query,简化下; - token = getQueryParam(exchange, "token"); - } - if (StrUtil.isEmpty(token)) { - respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "缺少 token(请使用 Option " + OPTION_TOKEN + " 或 Query 参数携带)"); + IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "token 不能为空"); return; } - // 验证 token - // TODO @AI:这里参考 IotHttpAbstractHandler 简化点校验; IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.verifyToken(token); if (deviceInfo == null) { - respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "token 无效或已过期"); + IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "token 无效或已过期"); return; } // 验证设备信息匹配 if (ObjUtil.notEqual(productKey, deviceInfo.getProductKey()) || ObjUtil.notEqual(deviceName, deviceInfo.getDeviceName())) { - respondError(exchange, CoAP.ResponseCode.FORBIDDEN, "设备信息与 token 不匹配"); + IotCoapUtils.respondError(exchange, CoAP.ResponseCode.FORBIDDEN, "设备信息与 token 不匹配"); return; } - // 3. 解析 method:将 URI 路径转换为 method 格式 - // /topic/sys/pk/dn/thing/property/post -> thing.property.post - // 路径是 [sys, pk, dn, thing, property, post],从索引 3 开始 - String method = buildMethod(uriPath); + // 2.1 解析 method:deviceName 后面的路径,用 . 拼接 + // 路径格式:[topic, sys, productKey, deviceName, thing, property, post] + String method = String.join(StrPool.DOT, uriPath.subList(4, uriPath.size())); - // 4. 解析并处理消息体 - byte[] payload = exchange.getRequestPayload(); - if (payload == null || payload.length == 0) { - respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空"); - return; - } - - // 5. 解码消息 - IotDeviceMessage message; - try { - message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName); - } catch (Exception e) { - log.error("[handle][消息解码失败]", e); - respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "消息解码失败:" + e.getMessage()); - return; - } - - // 校验 method - // TODO @AI:不用校验 method;以 message 解析出来的为主; - if (!method.equals(message.getMethod())) { - log.warn("[handle][method 不匹配,URI: {}, 消息: {}]", method, message.getMethod()); - } - - // 6. 发送消息到消息总线 + // 2.2 解码消息 + IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName); + Assert.equals(method, message.getMethod(), "method 不匹配"); + // 2.3 发送消息到消息总线 deviceMessageService.sendDeviceMessage(message, productKey, deviceName, protocol.getServerId()); - // 7. 返回成功响应 - respondSuccess(exchange, message.getId()); + // 3. 返回成功响应 + IotCoapUtils.respondSuccess(exchange, MapUtil.of("messageId", message.getId())); } catch (Exception e) { log.error("[handle][CoAP 请求处理异常]", e); - respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误"); + IotCoapUtils.respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误"); } } - /** - * 构建 method 字符串 - * - * 将 URI 路径转换为 method 格式,例如: - * [sys, pk, dn, thing, property, post] -> thing.property.post - * - * @param uriPath URI 路径列表 - * @return method 字符串 - */ - private String buildMethod(List uriPath) { - // 跳过 sys, productKey, deviceName,从第4个元素开始 - if (uriPath.size() > 3) { - return String.join(StrPool.DOT, uriPath.subList(3, uriPath.size())); - } - return ""; - } - - // TODO @AI:抽到 coap 的 util 里; - /** - * 从自定义 CoAP Option 中获取 Token - * - * @param exchange CoAP 交换对象 - * @return Token 值,如果不存在则返回 null - */ - private String getTokenFromOption(CoapExchange exchange) { - // 尝试从自定义 Option 2088 获取 Token - byte[] tokenBytes = exchange.getRequestOptions().getOthers().stream() - .filter(option -> option.getNumber() == OPTION_TOKEN) - .findFirst() - .map(option -> option.getValue()) - .orElse(null); - if (tokenBytes != null) { - return new String(tokenBytes); - } - return null; - } - - // TODO @AI:抽到 coap 的 util 里; - /** - * 从 URI Query 参数中获取指定 key 的值 - * - * @param exchange CoAP 交换对象 - * @param key 参数名 - * @return 参数值,如果不存在则返回 null - */ - private String getQueryParam(CoapExchange exchange, String key) { - for (String query : exchange.getRequestOptions().getUriQuery()) { - if (query.startsWith(key + "=")) { - return query.substring((key + "=").length()); - } - } - return null; - } - - // TODO @AI:抽到 coap 的 util 里; - /** - * 返回成功响应 - * - * @param exchange CoAP 交换对象 - * @param messageId 消息 ID - */ - private void respondSuccess(CoapExchange exchange, String messageId) { - CommonResult result = CommonResult.success(MapUtil.of("messageId", messageId)); - String json = JsonUtils.toJsonString(result); - exchange.respond(CoAP.ResponseCode.CONTENT, json, MediaTypeRegistry.APPLICATION_JSON); - } - - // TODO @AI:抽到 coap 的 util 里; - /** - * 返回错误响应 - * - * @param exchange CoAP 交换对象 - * @param code CoAP 响应码 - * @param message 错误消息 - */ - private void respondError(CoapExchange exchange, CoAP.ResponseCode code, String message) { - // 将 CoAP 响应码映射到业务错误码 - int errorCode = mapCoapCodeToErrorCode(code); - CommonResult result = CommonResult.error(errorCode, message); - String json = JsonUtils.toJsonString(result); - exchange.respond(code, json, MediaTypeRegistry.APPLICATION_JSON); - } - - // TODO @AI:兼容 jdk8 的写法; - /** - * 将 CoAP 响应码映射到业务错误码 - * - * @param code CoAP 响应码 - * @return 业务错误码 - */ - private int mapCoapCodeToErrorCode(CoAP.ResponseCode code) { - return switch (code) { - case BAD_REQUEST -> BAD_REQUEST.getCode(); - case UNAUTHORIZED -> UNAUTHORIZED.getCode(); - case FORBIDDEN -> FORBIDDEN.getCode(); - default -> INTERNAL_SERVER_ERROR.getCode(); - }; - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamTopicResource.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamTopicResource.java index e8a9743e08..1c694483fa 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamTopicResource.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamTopicResource.java @@ -51,17 +51,17 @@ public class IotCoapUpstreamTopicResource extends CoapResource { @Override public void handleGET(CoapExchange exchange) { - upstreamHandler.handle(exchange, "GET", protocol); + upstreamHandler.handle(exchange, protocol); } @Override public void handlePOST(CoapExchange exchange) { - upstreamHandler.handle(exchange, "POST", protocol); + upstreamHandler.handle(exchange, protocol); } @Override public void handlePUT(CoapExchange exchange) { - upstreamHandler.handle(exchange, "PUT", protocol); + upstreamHandler.handle(exchange, protocol); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/util/IotCoapUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/util/IotCoapUtils.java new file mode 100644 index 0000000000..9d5cdf3ffb --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/util/IotCoapUtils.java @@ -0,0 +1,84 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap.util; + +import cn.hutool.core.collection.CollUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.MediaTypeRegistry; +import org.eclipse.californium.core.coap.Option; +import org.eclipse.californium.core.server.resources.CoapExchange; + +import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*; + +/** + * IoT CoAP 协议工具类 + * + * @author 芋道源码 + */ +public class IotCoapUtils { + + /** + * 自定义 CoAP Option 编号,用于携带 Token + *

+ * CoAP Option 范围 2048-65535 属于实验/自定义范围 + */ + public static final int OPTION_TOKEN = 2088; + + /** + * 返回成功响应 + * + * @param exchange CoAP 交换对象 + * @param data 响应数据 + */ + public static void respondSuccess(CoapExchange exchange, Object data) { + CommonResult result = CommonResult.success(data); + String json = JsonUtils.toJsonString(result); + exchange.respond(CoAP.ResponseCode.CONTENT, json, MediaTypeRegistry.APPLICATION_JSON); + } + + /** + * 返回错误响应 + * + * @param exchange CoAP 交换对象 + * @param code CoAP 响应码 + * @param message 错误消息 + */ + public static void respondError(CoapExchange exchange, CoAP.ResponseCode code, String message) { + int errorCode = mapCoapCodeToErrorCode(code); + CommonResult result = CommonResult.error(errorCode, message); + String json = JsonUtils.toJsonString(result); + exchange.respond(code, json, MediaTypeRegistry.APPLICATION_JSON); + } + + /** + * 从自定义 CoAP Option 中获取 Token + * + * @param exchange CoAP 交换对象 + * @param optionNumber Option 编号 + * @return Token 值,如果不存在则返回 null + */ + public static String getTokenFromOption(CoapExchange exchange, int optionNumber) { + Option option = CollUtil.findOne(exchange.getRequestOptions().getOthers(), + o -> o.getNumber() == optionNumber); + return option != null ? new String(option.getValue()) : null; + } + + /** + * 将 CoAP 响应码映射到业务错误码 + * + * @param code CoAP 响应码 + * @return 业务错误码 + */ + public static int mapCoapCodeToErrorCode(CoAP.ResponseCode code) { + if (code == CoAP.ResponseCode.BAD_REQUEST) { + return BAD_REQUEST.getCode(); + } else if (code == CoAP.ResponseCode.UNAUTHORIZED) { + return UNAUTHORIZED.getCode(); + } else if (code == CoAP.ResponseCode.FORBIDDEN) { + return FORBIDDEN.getCode(); + } else { + return INTERNAL_SERVER_ERROR.getCode(); + } + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index ea3c68b037..1187d430ad 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -108,7 +108,7 @@ yudao: # 针对引入的 MQTT WebSocket 组件的配置 # ==================================== mqtt-ws: - enabled: true # 是否启用 MQTT WebSocket + enabled: false # 是否启用 MQTT WebSocket port: 8083 # WebSocket 服务端口 path: /mqtt # WebSocket 路径 max-message-size: 8192 # 最大消息大小(字节) @@ -121,7 +121,7 @@ yudao: # 针对引入的 CoAP 组件的配置 # ==================================== coap: - enabled: false # 是否启用 CoAP 协议 + enabled: true # 是否启用 CoAP 协议 port: 5683 # CoAP 服务端口(默认 5683) max-message-size: 1024 # 最大消息大小(字节) ack-timeout: 2000 # ACK 超时时间(毫秒) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocolIntegrationTest.java index 799a48e359..e5e214cde1 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocolIntegrationTest.java @@ -1,57 +1,74 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.coap; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamHandler; +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.IdUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.util.IotCoapUtils; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.CoapClient; import org.eclipse.californium.core.CoapResponse; import org.eclipse.californium.core.coap.MediaTypeRegistry; import org.eclipse.californium.core.coap.Option; import org.eclipse.californium.core.coap.Request; -import org.junit.jupiter.api.*; +import org.eclipse.californium.core.config.CoapConfig; +import org.eclipse.californium.elements.config.Configuration; +import org.eclipse.californium.elements.config.UdpConfig; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; /** * IoT 网关 CoAP 协议集成测试(手动测试) * - * 使用步骤: - * 1. 启动 CoAP 网关服务(端口 5683) - * 2. 运行 testAuth() 获取 token - * 3. 将 token 粘贴到 TOKEN 常量 - * 4. 运行 testPropertyPost() 或 testEventPost() + *

使用步骤: + *

    + *
  1. 启动 yudao-module-iot-gateway 服务(CoAP 端口 5683)
  2. + *
  3. 运行 {@link #testAuth()} 获取 token,将返回的 token 粘贴到 {@link #TOKEN} 常量
  4. + *
  5. 运行 {@link #testPropertyPost()} 测试属性上报,或运行 {@link #testEventPost()} 测试事件上报
  6. + *
* * @author 芋道源码 */ @Slf4j -@TestMethodOrder(MethodOrderer.OrderAnnotation.class) -class IotCoapProtocolIntegrationTest { +public class IotCoapProtocolIntegrationTest { private static final String SERVER_HOST = "127.0.0.1"; private static final int SERVER_PORT = 5683; - // 设备信息(根据实际情况修改) - private static final String PRODUCT_KEY = "testProductKey"; - private static final String DEVICE_NAME = "testDeviceName"; + // 设备信息(根据实际情况修改 PRODUCT_KEY、DEVICE_NAME、PASSWORD) + private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT"; + private static final String DEVICE_NAME = "small"; + private static final String PASSWORD = "509e2b08f7598eb139d276388c600435913ba4c94cd0d50aebc5c0d1855bcb75"; + private static final String CLIENT_ID = PRODUCT_KEY + "." + DEVICE_NAME; private static final String USERNAME = DEVICE_NAME + "&" + PRODUCT_KEY; - private static final String PASSWORD = "testPassword123"; - // TODO: 运行 testAuth() 后,将返回的 token 粘贴到这里 - private static final String TOKEN = "粘贴你的token到这里"; + /** + * 设备 Token:从 {@link #testAuth()} 方法获取后,粘贴到这里 + */ + private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiNGF5bVpnT1RPT0NyREtSVCIsImV4cCI6MTc2OTMwNTA1NSwiZGV2aWNlTmFtZSI6InNtYWxsIn0.mf3MEATCn5bp6cXgULunZjs8d00RGUxj96JEz0hMS7k"; - // ========== 1. 认证测试 ========== + @BeforeAll + public static void initCaliforniumConfig() { + // 注册 Californium 配置定义 + CoapConfig.register(); + UdpConfig.register(); + // 创建默认配置 + Configuration.setStandard(Configuration.createStandardWithoutFile()); + } + /** + * 认证测试:获取设备 Token + */ @Test - @Order(1) - @DisplayName("1. 认证 - 获取 Token") - void testAuth() throws Exception { + @SuppressWarnings("deprecation") + public void testAuth() throws Exception { String uri = String.format("coap://%s:%d/auth", SERVER_HOST, SERVER_PORT); - - String payload = String.format(""" - { - "clientId": "%s", - "username": "%s", - "password": "%s" - } - """, CLIENT_ID, USERNAME, PASSWORD); + String payload = JsonUtils.toJsonString(MapUtil.builder() + .put("clientId", CLIENT_ID) + .put("username", USERNAME) + .put("password", PASSWORD) + .build()); CoapClient client = new CoapClient(uri); try { @@ -68,38 +85,33 @@ class IotCoapProtocolIntegrationTest { } } - // ========== 2. 属性上报测试 ========== - + /** + * 属性上报测试 + */ @Test - @Order(2) - @DisplayName("2. 属性上报") - void testPropertyPost() throws Exception { + @SuppressWarnings("deprecation") + public void testPropertyPost() throws Exception { String uri = String.format("coap://%s:%d/topic/sys/%s/%s/thing/property/post", SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME); - - String payload = """ - { - "id": "123", - "method": "thing.property.post", - "params": { - "temperature": 25.5, - "humidity": 60 - } - } - """; + String payload = JsonUtils.toJsonString(MapUtil.builder() + .put("id", IdUtil.fastSimpleUUID()) + .put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod()) + .put("version", "1.0") + .put("params", MapUtil.builder() + .put("width", 1) + .put("height", "2") + .build()) + .build()); CoapClient client = new CoapClient(uri); try { - // 构造带自定义 Option 的请求 Request request = Request.newPost(); request.setURI(uri); request.setPayload(payload); request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON); - // 添加自定义 Token Option (2088) - request.getOptions().addOption(new Option(IotCoapUpstreamHandler.OPTION_TOKEN, TOKEN)); + request.getOptions().addOption(new Option(IotCoapUtils.OPTION_TOKEN, TOKEN)); log.info("[testPropertyPost][请求 URI: {}]", uri); - log.info("[testPropertyPost][Token: {}]", TOKEN); log.info("[testPropertyPost][请求体: {}]", payload); CoapResponse response = client.advanced(request); @@ -111,39 +123,35 @@ class IotCoapProtocolIntegrationTest { } } - // ========== 3. 事件上报测试 ========== - + /** + * 事件上报测试 + */ @Test - @Order(3) - @DisplayName("3. 事件上报") - void testEventPost() throws Exception { - String uri = String.format("coap://%s:%d/topic/sys/%s/%s/thing/event/alarm/post", + @SuppressWarnings("deprecation") + public void testEventPost() throws Exception { + String uri = String.format("coap://%s:%d/topic/sys/%s/%s/thing/event/post", SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME); - - String payload = """ - { - "id": "456", - "method": "thing.event.alarm.post", - "params": { - "alarmType": "temperature_high", - "level": "warning", - "value": 85.2 - } - } - """; + String payload = JsonUtils.toJsonString(MapUtil.builder() + .put("id", IdUtil.fastSimpleUUID()) + .put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod()) + .put("version", "1.0") + .put("identifier", "eat") + .put("params", MapUtil.builder() + .put("width", 1) + .put("height", "2") + .put("oneThree", "3") + .build()) + .build()); CoapClient client = new CoapClient(uri); try { - // 构造带自定义 Option 的请求 Request request = Request.newPost(); request.setURI(uri); request.setPayload(payload); request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON); - // 添加自定义 Token Option (2088) - request.getOptions().addOption(new Option(IotCoapUpstreamHandler.OPTION_TOKEN, TOKEN)); + request.getOptions().addOption(new Option(IotCoapUtils.OPTION_TOKEN, TOKEN)); log.info("[testEventPost][请求 URI: {}]", uri); - log.info("[testEventPost][Token: {}]", TOKEN); log.info("[testEventPost][请求体: {}]", payload); CoapResponse response = client.advanced(request); From 6991a2dea492e2beed9cfb306255629af9d1f102 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sun, 18 Jan 2026 11:23:23 +0800 Subject: [PATCH 3/3] =?UTF-8?q?fix=EF=BC=9A=E3=80=90iot=E3=80=91coap=20?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E6=8E=A5=E5=85=A5=EF=BC=9AtestEventPost=20?= =?UTF-8?q?=E5=8D=95=E6=B5=8B=E7=9A=84=E6=95=B0=E6=8D=AE=E6=A0=BC=E5=BC=8F?= =?UTF-8?q?=E4=B8=8D=E6=AD=A3=E7=A1=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../protocol/coap/IotCoapProtocolIntegrationTest.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocolIntegrationTest.java b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocolIntegrationTest.java index e5e214cde1..5856ced429 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocolIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/test/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocolIntegrationTest.java @@ -135,11 +135,14 @@ public class IotCoapProtocolIntegrationTest { .put("id", IdUtil.fastSimpleUUID()) .put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod()) .put("version", "1.0") - .put("identifier", "eat") .put("params", MapUtil.builder() - .put("width", 1) - .put("height", "2") - .put("oneThree", "3") + .put("identifier", "eat") + .put("value", MapUtil.builder() + .put("width", 1) + .put("height", "2") + .put("oneThree", "3") + .build()) + .put("time", System.currentTimeMillis()) .build()) .build());