From a28a15295cdb60f340f7157a5bbd18a76fd4ccbb Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sun, 1 Feb 2026 17:05:02 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=88iot=EF=BC=89=EF=BC=9A=E3=80=90?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E6=94=B9=E9=80=A0=E3=80=91coap=20=E5=88=9D?= =?UTF-8?q?=E6=AD=A5=E6=94=B9=E9=80=A0=EF=BC=88100%=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/IotGatewayConfiguration.java | 66 ------- .../gateway/config/IotGatewayProperties.java | 111 ++--------- .../gateway/protocol/IotProtocolManager.java | 26 +++ .../gateway/protocol/coap/IotCoapConfig.java | 36 ++++ .../protocol/coap/IotCoapProtocol.java | 173 ++++++++++++++++ .../coap/IotCoapUpstreamProtocol.java | 120 ----------- .../IotCoapDownstreamSubscriber.java | 5 +- .../upstrem/IotCoapAbstractHandler.java | 186 ++++++++++++++++++ .../handler/upstrem/IotCoapAuthHandler.java | 72 +++++++ .../upstrem}/IotCoapAuthResource.java | 10 +- .../upstrem/IotCoapRegisterHandler.java | 46 +++++ .../upstrem}/IotCoapRegisterResource.java | 2 +- .../upstrem/IotCoapRegisterSubHandler.java | 84 ++++++++ .../upstrem/IotCoapRegisterSubResource.java | 52 +++++ .../upstrem/IotCoapUpstreamHandler.java | 76 +++++++ .../IotCoapUpstreamTopicResource.java | 21 +- .../gateway/protocol/coap/package-info.java | 7 - .../coap/router/IotCoapAuthHandler.java | 115 ----------- .../coap/router/IotCoapRegisterHandler.java | 97 --------- .../coap/router/IotCoapUpstreamHandler.java | 110 ----------- .../protocol/coap/util/IotCoapUtils.java | 69 +------ .../upstream}/IotHttpAbstractHandler.java | 28 ++- .../handler/upstream/IotHttpAuthHandler.java | 19 +- .../upstream/IotHttpRegisterHandler.java | 17 +- .../upstream/IotHttpRegisterSubHandler.java | 9 +- .../upstream/IotHttpUpstreamHandler.java | 10 +- .../upstream/IotTcpUpstreamHandler.java | 25 +-- .../upstream/IotUdpUpstreamHandler.java | 111 +++++------ .../udp/manager/IotUdpSessionManager.java | 164 +++++---------- .../src/main/resources/application.yaml | 48 ++--- 30 files changed, 955 insertions(+), 960 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapConfig.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocol.java delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapUpstreamProtocol.java rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/{ => handler/downstream}/IotCoapDownstreamSubscriber.java (76%) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapAbstractHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapAuthHandler.java rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/{router => handler/upstrem}/IotCoapAuthResource.java (64%) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapRegisterHandler.java rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/{router => handler/upstrem}/IotCoapRegisterResource.java (92%) create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapRegisterSubHandler.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapRegisterSubResource.java create mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapUpstreamHandler.java rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/{router => handler/upstrem}/IotCoapUpstreamTopicResource.java (70%) delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthHandler.java delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapRegisterHandler.java delete mode 100644 yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamHandler.java rename yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/{router => handler/upstream}/IotHttpAbstractHandler.java (84%) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java index c2bf96df32..e9800c34e4 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayConfiguration.java @@ -2,8 +2,6 @@ package cn.iocoder.yudao.module.iot.gateway.config; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber; import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol; @@ -11,12 +9,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscr import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager; import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler; -import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketDownstreamSubscriber; -import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager; -import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router.IotWebSocketDownstreamHandler; import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager; -import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Vertx; import lombok.extern.slf4j.Slf4j; @@ -111,63 +104,4 @@ public class IotGatewayConfiguration { } - /** - * IoT 网关 CoAP 协议配置类 - */ - @Configuration - @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.coap", name = "enabled", havingValue = "true") - @Slf4j - public static class CoapProtocolConfiguration { - - @Bean - public IotCoapUpstreamProtocol iotCoapUpstreamProtocol(IotGatewayProperties gatewayProperties) { - return new IotCoapUpstreamProtocol(gatewayProperties.getProtocol().getCoap()); - } - - @Bean - public IotCoapDownstreamSubscriber iotCoapDownstreamSubscriber(IotCoapUpstreamProtocol coapUpstreamProtocol, - IotMessageBus messageBus) { - return new IotCoapDownstreamSubscriber(coapUpstreamProtocol, messageBus); - } - - } - - /** - * IoT 网关 WebSocket 协议配置类 - */ - @Configuration - @ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.websocket", name = "enabled", havingValue = "true") - @Slf4j - public static class WebSocketProtocolConfiguration { - - @Bean(name = "websocketVertx", destroyMethod = "close") - public Vertx websocketVertx() { - return Vertx.vertx(); - } - - @Bean - public IotWebSocketUpstreamProtocol iotWebSocketUpstreamProtocol(IotGatewayProperties gatewayProperties, - IotDeviceService deviceService, - IotDeviceMessageService messageService, - IotWebSocketConnectionManager connectionManager, - @Qualifier("websocketVertx") Vertx websocketVertx) { - return new IotWebSocketUpstreamProtocol(gatewayProperties.getProtocol().getWebsocket(), - deviceService, messageService, connectionManager, websocketVertx); - } - - @Bean - public IotWebSocketDownstreamHandler iotWebSocketDownstreamHandler(IotDeviceMessageService messageService, - IotWebSocketConnectionManager connectionManager) { - return new IotWebSocketDownstreamHandler(messageService, connectionManager); - } - - @Bean - public IotWebSocketDownstreamSubscriber iotWebSocketDownstreamSubscriber(IotWebSocketUpstreamProtocol protocolHandler, - IotWebSocketDownstreamHandler downstreamHandler, - IotMessageBus messageBus) { - return new IotWebSocketDownstreamSubscriber(protocolHandler, downstreamHandler, messageBus); - } - - } - } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java index 1f0dccdaf2..dc5d545373 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/config/IotGatewayProperties.java @@ -1,9 +1,11 @@ package cn.iocoder.yudao.module.iot.gateway.config; import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapConfig; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpConfig; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpConfig; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketConfig; import io.vertx.core.net.KeyCertOptions; import io.vertx.core.net.TrustOptions; import jakarta.validation.Valid; @@ -90,16 +92,6 @@ public class IotGatewayProperties { */ private MqttProperties mqtt; - /** - * CoAP 组件配置 - */ - private CoapProperties coap; - - /** - * WebSocket 组件配置 - */ - private WebSocketProperties websocket; - } @Data @@ -344,93 +336,6 @@ public class IotGatewayProperties { } - @Data - public static class CoapProperties { - - /** - * 是否开启 - */ - @NotNull(message = "是否开启不能为空") - private Boolean enabled; - - /** - * 服务端口(CoAP 默认端口 5683) - */ - @NotNull(message = "服务端口不能为空") - private Integer port = 5683; - - /** - * 最大消息大小(字节) - */ - @NotNull(message = "最大消息大小不能为空") - private Integer maxMessageSize = 1024; - - /** - * ACK 超时时间(毫秒) - */ - @NotNull(message = "ACK 超时时间不能为空") - private Integer ackTimeout = 2000; - - /** - * 最大重传次数 - */ - @NotNull(message = "最大重传次数不能为空") - private Integer maxRetransmit = 4; - - } - - @Data - public static class WebSocketProperties { - - /** - * 是否开启 - */ - @NotNull(message = "是否开启不能为空") - private Boolean enabled; - - /** - * 服务器端口(默认:8094) - */ - private Integer port = 8094; - - /** - * WebSocket 路径(默认:/ws) - */ - @NotEmpty(message = "WebSocket 路径不能为空") - private String path = "/ws"; - - /** - * 最大消息大小(字节,默认 64KB) - */ - private Integer maxMessageSize = 65536; - - /** - * 最大帧大小(字节,默认 64KB) - */ - private Integer maxFrameSize = 65536; - - /** - * 空闲超时时间(秒,默认 60) - */ - private Integer idleTimeoutSeconds = 60; - - /** - * 是否启用 SSL(wss://) - */ - private Boolean sslEnabled = false; - - /** - * SSL 证书路径 - */ - private String sslCertPath; - - /** - * SSL 私钥路径 - */ - private String sslKeyPath; - - } - // TODO @AI:【暂时忽略】改成 ProtocolProperties /** * 协议实例配置 @@ -489,6 +394,18 @@ public class IotGatewayProperties { @Valid private IotUdpConfig udp; + /** + * CoAP 协议配置 + */ + @Valid + private IotCoapConfig coap; + + /** + * WebSocket 协议配置 + */ + @Valid + private IotWebSocketConfig websocket; + } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java index c13b44ee5f..ed60897e55 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/IotProtocolManager.java @@ -4,9 +4,11 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.BooleanUtil; import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.context.SmartLifecycle; @@ -100,6 +102,10 @@ public class IotProtocolManager implements SmartLifecycle { return createTcpProtocol(config); case UDP: return createUdpProtocol(config); + case COAP: + return createCoapProtocol(config); + case WEBSOCKET: + return createWebSocketProtocol(config); default: throw new IllegalArgumentException(String.format( "[createProtocol][协议实例 %s 的协议类型 %s 暂不支持]", config.getId(), protocolType)); @@ -136,4 +142,24 @@ public class IotProtocolManager implements SmartLifecycle { return new IotUdpProtocol(config); } + /** + * 创建 CoAP 协议实例 + * + * @param config 协议实例配置 + * @return CoAP 协议实例 + */ + private IotCoapProtocol createCoapProtocol(IotGatewayProperties.ProtocolInstanceProperties config) { + return new IotCoapProtocol(config); + } + + /** + * 创建 WebSocket 协议实例 + * + * @param config 协议实例配置 + * @return WebSocket 协议实例 + */ + private IotWebSocketUpstreamProtocol createWebSocketProtocol(IotGatewayProperties.ProtocolInstanceProperties config) { + return new IotWebSocketUpstreamProtocol(config); + } + } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapConfig.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapConfig.java new file mode 100644 index 0000000000..45fe3007e5 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapConfig.java @@ -0,0 +1,36 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap; + +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +/** + * IoT CoAP 协议配置 + * + * @author 芋道源码 + */ +@Data +public class IotCoapConfig { + + /** + * 最大消息大小(字节) + */ + @NotNull(message = "最大消息大小不能为空") + @Min(value = 64, message = "最大消息大小必须大于 64 字节") + private Integer maxMessageSize = 1024; + + /** + * ACK 超时时间(毫秒) + */ + @NotNull(message = "ACK 超时时间不能为空") + @Min(value = 100, message = "ACK 超时时间必须大于 100 毫秒") + private Integer ackTimeoutMs = 2000; + + /** + * 最大重传次数 + */ + @NotNull(message = "最大重传次数不能为空") + @Min(value = 0, message = "最大重传次数必须大于等于 0") + private Integer maxRetransmit = 4; + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocol.java new file mode 100644 index 0000000000..2749bd232c --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocol.java @@ -0,0 +1,173 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap; + +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties; +import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.downstream.IotCoapDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapAuthHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapAuthResource; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapRegisterHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapRegisterResource; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapRegisterSubHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapRegisterSubResource; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapUpstreamHandler; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapUpstreamTopicResource; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.CoapResource; +import org.eclipse.californium.core.CoapServer; +import org.eclipse.californium.core.config.CoapConfig; +import org.eclipse.californium.elements.config.Configuration; +import org.springframework.util.Assert; + +import java.util.concurrent.TimeUnit; + +/** + * IoT CoAP 协议实现 + *

+ * 基于 Eclipse Californium 实现,支持: + * 1. 认证:POST /auth + * 2. 设备动态注册:POST /auth/register/device + * 3. 子设备动态注册:POST /auth/register/sub-device/{productKey}/{deviceName} + * 4. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post + * 5. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post + * + * @author 芋道源码 + */ +@Slf4j +public class IotCoapProtocol implements IotProtocol { + + /** + * 协议配置 + */ + private final ProtocolInstanceProperties properties; + /** + * 服务器 ID(用于消息追踪,全局唯一) + */ + @Getter + private final String serverId; + + /** + * 运行状态 + */ + @Getter + private volatile boolean running = false; + + /** + * CoAP 服务器 + */ + private CoapServer coapServer; + + /** + * 下行消息订阅者 + */ + private final IotCoapDownstreamSubscriber downstreamSubscriber; + + public IotCoapProtocol(ProtocolInstanceProperties properties) { + IotCoapConfig coapConfig = properties.getCoap(); + Assert.notNull(coapConfig, "CoAP 协议配置(coap)不能为空"); + this.properties = properties; + this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort()); + + // 初始化下行消息订阅者 + IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class); + this.downstreamSubscriber = new IotCoapDownstreamSubscriber(this, messageBus); + } + + @Override + public String getId() { + return properties.getId(); + } + + @Override + public IotProtocolTypeEnum getType() { + return IotProtocolTypeEnum.COAP; + } + + @Override + public void start() { + if (running) { + log.warn("[start][IoT CoAP 协议 {} 已经在运行中]", getId()); + return; + } + + IotCoapConfig coapConfig = properties.getCoap(); + try { + // 1.1 创建 CoAP 配置 + Configuration config = Configuration.createStandardWithoutFile(); + config.set(CoapConfig.COAP_PORT, properties.getPort()); + config.set(CoapConfig.MAX_MESSAGE_SIZE, coapConfig.getMaxMessageSize()); + config.set(CoapConfig.ACK_TIMEOUT, coapConfig.getAckTimeoutMs(), TimeUnit.MILLISECONDS); + config.set(CoapConfig.MAX_RETRANSMIT, coapConfig.getMaxRetransmit()); + // 1.2 创建 CoAP 服务器 + coapServer = new CoapServer(config); + + // 2.1 添加 /auth 认证资源 + IotCoapAuthHandler authHandler = new IotCoapAuthHandler(serverId); + IotCoapAuthResource authResource = new IotCoapAuthResource(authHandler); + coapServer.add(authResource); + // 2.2 添加 /auth/register/device 设备动态注册资源(一型一密) + IotCoapRegisterHandler registerHandler = new IotCoapRegisterHandler(); + IotCoapRegisterResource registerResource = new IotCoapRegisterResource(registerHandler); + // 2.3 添加 /auth/register/sub-device/{productKey}/{deviceName} 子设备动态注册资源 + IotCoapRegisterSubHandler registerSubHandler = new IotCoapRegisterSubHandler(); + IotCoapRegisterSubResource registerSubResource = new IotCoapRegisterSubResource(registerSubHandler); + authResource.add(new CoapResource("register") {{ + add(registerResource); + add(registerSubResource); + }}); + // 2.4 添加 /topic 根资源(用于上行消息) + IotCoapUpstreamHandler upstreamHandler = new IotCoapUpstreamHandler(serverId); + IotCoapUpstreamTopicResource topicResource = new IotCoapUpstreamTopicResource(serverId, upstreamHandler); + coapServer.add(topicResource); + + // 3. 启动服务器 + coapServer.start(); + running = true; + log.info("[start][IoT CoAP 协议 {} 启动成功,端口:{},serverId:{}]", + getId(), properties.getPort(), serverId); + + // 4. 启动下行消息订阅者 + this.downstreamSubscriber.start(); + } catch (Exception e) { + log.error("[start][IoT CoAP 协议 {} 启动失败]", getId(), e); + if (coapServer != null) { + coapServer.destroy(); + coapServer = null; + } + throw e; + } + } + + @Override + public void stop() { + if (!running) { + return; + } + // 1. 停止下行消息订阅者 + try { + downstreamSubscriber.stop(); + log.info("[stop][IoT CoAP 协议 {} 下行消息订阅者已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT CoAP 协议 {} 下行消息订阅者停止失败]", getId(), e); + } + + // 2. 关闭 CoAP 服务器 + if (coapServer != null) { + try { + coapServer.stop(); + coapServer.destroy(); + coapServer = null; + log.info("[stop][IoT CoAP 协议 {} 服务器已停止]", getId()); + } catch (Exception e) { + log.error("[stop][IoT CoAP 协议 {} 服务器停止失败]", getId(), e); + } + } + running = false; + log.info("[stop][IoT CoAP 协议 {} 已停止]", getId()); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapUpstreamProtocol.java deleted file mode 100644 index 771a33a955..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapUpstreamProtocol.java +++ /dev/null @@ -1,120 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.coap; - -import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum; -import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; -import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties; -import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapAuthHandler; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapAuthResource; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapRegisterHandler; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapRegisterResource; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamTopicResource; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamHandler; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.californium.core.CoapResource; -import org.eclipse.californium.core.CoapServer; -import org.eclipse.californium.core.config.CoapConfig; -import org.eclipse.californium.elements.config.Configuration; - -import java.util.concurrent.TimeUnit; - -/** - * IoT 网关 CoAP 协议:接收设备上行消息 - * - * 基于 Eclipse Californium 实现,支持: - * 1. 认证:POST /auth - * 2. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post - * 3. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post - * - * @author 芋道源码 - */ -@Slf4j -public class IotCoapUpstreamProtocol implements IotProtocol { - - private static final String ID = "coap"; - - private final IotGatewayProperties.CoapProperties coapProperties; - - private CoapServer coapServer; - - @Getter - private final String serverId; - - private volatile boolean running = false; - - public IotCoapUpstreamProtocol(IotGatewayProperties.CoapProperties coapProperties) { - this.coapProperties = coapProperties; - this.serverId = IotDeviceMessageUtils.generateServerId(coapProperties.getPort()); - } - - @Override - public String getId() { - return ID; - } - - @Override - public IotProtocolTypeEnum getType() { - return IotProtocolTypeEnum.COAP; - } - - @Override - @PostConstruct - public void start() { - try { - // 1.1 创建网络配置(Californium 3.x API) - Configuration config = Configuration.createStandardWithoutFile(); - config.set(CoapConfig.COAP_PORT, coapProperties.getPort()); - config.set(CoapConfig.MAX_MESSAGE_SIZE, coapProperties.getMaxMessageSize()); - config.set(CoapConfig.ACK_TIMEOUT, coapProperties.getAckTimeout(), TimeUnit.MILLISECONDS); - config.set(CoapConfig.MAX_RETRANSMIT, coapProperties.getMaxRetransmit()); - // 1.2 创建 CoAP 服务器 - coapServer = new CoapServer(config); - - // 2.1 添加 /auth 认证资源 - IotCoapAuthHandler authHandler = new IotCoapAuthHandler(); - IotCoapAuthResource authResource = new IotCoapAuthResource(this, authHandler); - coapServer.add(authResource); - // 2.2 添加 /auth/register/device 设备动态注册资源(一型一密) - IotCoapRegisterHandler registerHandler = new IotCoapRegisterHandler(); - IotCoapRegisterResource registerResource = new IotCoapRegisterResource(registerHandler); - authResource.add(new CoapResource("register") {{ - add(registerResource); - }}); - // 2.3 添加 /topic 根资源(用于上行消息) - IotCoapUpstreamHandler upstreamHandler = new IotCoapUpstreamHandler(); - IotCoapUpstreamTopicResource topicResource = new IotCoapUpstreamTopicResource(this, upstreamHandler); - coapServer.add(topicResource); - - // 3. 启动服务器 - coapServer.start(); - running = true; - log.info("[start][IoT 网关 CoAP 协议启动成功,端口:{},资源:/auth, /auth/register/device, /topic]", coapProperties.getPort()); - } catch (Exception e) { - log.error("[start][IoT 网关 CoAP 协议启动失败]", e); - throw e; - } - } - - @Override - @PreDestroy - public void stop() { - if (coapServer != null) { - try { - coapServer.stop(); - running = false; - log.info("[stop][IoT 网关 CoAP 协议已停止]"); - } catch (Exception e) { - log.error("[stop][IoT 网关 CoAP 协议停止失败]", e); - } - } - } - - @Override - public boolean isRunning() { - return running; - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/downstream/IotCoapDownstreamSubscriber.java similarity index 76% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapDownstreamSubscriber.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/downstream/IotCoapDownstreamSubscriber.java index 8003602d86..188d2e6428 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapDownstreamSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/downstream/IotCoapDownstreamSubscriber.java @@ -1,8 +1,9 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.coap; +package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.downstream; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapProtocol; import lombok.extern.slf4j.Slf4j; /** @@ -13,7 +14,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class IotCoapDownstreamSubscriber extends IotProtocolDownstreamSubscriber { - public IotCoapDownstreamSubscriber(IotCoapUpstreamProtocol protocol, IotMessageBus messageBus) { + public IotCoapDownstreamSubscriber(IotCoapProtocol protocol, IotMessageBus messageBus) { super(protocol, messageBus); } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapAbstractHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapAbstractHandler.java new file mode 100644 index 0000000000..cda8466dc2 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapAbstractHandler.java @@ -0,0 +1,186 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.ObjUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.framework.common.exception.ServiceException; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; +import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.coap.CoAP; +import org.eclipse.californium.core.coap.MediaTypeRegistry; +import org.eclipse.californium.core.coap.Option; +import org.eclipse.californium.core.server.resources.CoapExchange; + +import java.util.List; + +import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*; +import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; + +/** + * IoT 网关 CoAP 协议的处理器抽象基类:提供通用的前置处理(认证)、请求解析、响应处理、全局的异常捕获等 + * + * @author 芋道源码 + */ +@Slf4j +public abstract class IotCoapAbstractHandler { + + /** + * 自定义 CoAP Option 编号,用于携带 Token + *

+ * CoAP Option 范围 2048-65535 属于实验/自定义范围 + */ + public static final int OPTION_TOKEN = 2088; + + private final IotDeviceTokenService deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class); + + /** + * 处理 CoAP 请求(模板方法) + * + * @param exchange CoAP 交换对象 + */ + public final void handle(CoapExchange exchange) { + try { + // 1. 前置处理 + beforeHandle(exchange); + + // 2. 执行业务逻辑 + CommonResult result = handle0(exchange); + writeResponse(exchange, result); + } catch (ServiceException e) { + // 业务异常,返回对应的错误码和消息 + writeResponse(exchange, CommonResult.error(e.getCode(), e.getMessage())); + } catch (IllegalArgumentException e) { + // 参数校验异常(hutool Assert 抛出),返回 BAD_REQUEST + writeResponse(exchange, CommonResult.error(BAD_REQUEST.getCode(), e.getMessage())); + } catch (Exception e) { + // 其他未知异常,返回 INTERNAL_SERVER_ERROR + log.error("[handle][CoAP 请求处理异常]", e); + writeResponse(exchange, CommonResult.error(INTERNAL_SERVER_ERROR)); + } + } + + /** + * 处理 CoAP 请求(子类实现) + * + * @param exchange CoAP 交换对象 + * @return 处理结果 + */ + protected abstract CommonResult handle0(CoapExchange exchange); + + /** + * 前置处理:认证等 + * + * @param exchange CoAP 交换对象 + */ + private void beforeHandle(CoapExchange exchange) { + // 1.1 如果不需要认证,则不走前置处理 + if (!requiresAuthentication()) { + return; + } + // 1.2 从自定义 Option 获取 token + String token = getTokenFromOption(exchange); + if (StrUtil.isEmpty(token)) { + throw exception(UNAUTHORIZED); + } + // 1.3 校验 token + IotDeviceIdentity deviceInfo = deviceTokenService.verifyToken(token); + if (deviceInfo == null) { + throw exception(UNAUTHORIZED); + } + + // 2.1 解析 productKey 和 deviceName + List uriPath = exchange.getRequestOptions().getUriPath(); + String productKey = getProductKey(uriPath); + String deviceName = getDeviceName(uriPath); + if (StrUtil.isEmpty(productKey) || StrUtil.isEmpty(deviceName)) { + throw exception(BAD_REQUEST); + } + // 2.2 校验设备信息是否匹配 + if (ObjUtil.notEqual(productKey, deviceInfo.getProductKey()) + || ObjUtil.notEqual(deviceName, deviceInfo.getDeviceName())) { + throw exception(FORBIDDEN); + } + } + + // ========== Token 相关方法 ========== + + /** + * 是否需要认证(子类可覆盖) + *

+ * 默认不需要认证 + * + * @return 是否需要认证 + */ + protected boolean requiresAuthentication() { + return false; + } + + /** + * 从 URI 路径中获取 productKey(子类实现) + *

+ * 默认抛出异常,需要认证的子类必须实现此方法 + * + * @param uriPath URI 路径 + * @return productKey + */ + protected String getProductKey(List uriPath) { + throw new UnsupportedOperationException("子类需要实现 getProductKey 方法"); + } + + /** + * 从 URI 路径中获取 deviceName(子类实现) + *

+ * 默认抛出异常,需要认证的子类必须实现此方法 + * + * @param uriPath URI 路径 + * @return deviceName + */ + protected String getDeviceName(List uriPath) { + throw new UnsupportedOperationException("子类需要实现 getDeviceName 方法"); + } + + /** + * 从自定义 CoAP Option 中获取 Token + * + * @param exchange CoAP 交换对象 + * @return Token 值,如果不存在则返回 null + */ + protected String getTokenFromOption(CoapExchange exchange) { + Option option = CollUtil.findOne(exchange.getRequestOptions().getOthers(), + o -> o.getNumber() == OPTION_TOKEN); + return option != null ? new String(option.getValue()) : null; + } + + // ========== 序列化相关方法 ========== + + /** + * 解析请求体为指定类型 + * + * @param exchange CoAP 交换对象 + * @param clazz 目标类型 + * @param 目标类型泛型 + * @return 解析后的对象,解析失败返回 null + */ + protected T deserializeRequest(CoapExchange exchange, Class clazz) { + byte[] payload = exchange.getRequestPayload(); + if (ArrayUtil.isEmpty(payload)) { + return null; + } + return JsonUtils.parseObject(payload, clazz); + } + + private static String serializeResponse(Object data) { + return JsonUtils.toJsonString(data); + } + + protected void writeResponse(CoapExchange exchange, CommonResult data) { + String json = serializeResponse(data); + exchange.respond(CoAP.ResponseCode.CONTENT, json, MediaTypeRegistry.APPLICATION_JSON); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapAuthHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapAuthHandler.java new file mode 100644 index 0000000000..059d878ade --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapAuthHandler.java @@ -0,0 +1,72 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem; + +import cn.hutool.core.lang.Assert; +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.BooleanUtil; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; +import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.server.resources.CoapExchange; + +import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; +import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_AUTH_FAIL; + +/** + * IoT 网关 CoAP 协议的【认证】处理器 + * + * @author 芋道源码 + */ +@Slf4j +public class IotCoapAuthHandler extends IotCoapAbstractHandler { + + private final String serverId; + + private final IotDeviceTokenService deviceTokenService; + private final IotDeviceCommonApi deviceApi; + private final IotDeviceMessageService deviceMessageService; + + public IotCoapAuthHandler(String serverId) { + this.serverId = serverId; + this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class); + this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); + } + + @Override + @SuppressWarnings("DuplicatedCode") + protected CommonResult handle0(CoapExchange exchange) { + // 1. 解析参数 + IotDeviceAuthReqDTO request = deserializeRequest(exchange, IotDeviceAuthReqDTO.class); + Assert.notNull(request, "请求体不能为空"); + Assert.notBlank(request.getClientId(), "clientId 不能为空"); + Assert.notBlank(request.getUsername(), "username 不能为空"); + Assert.notBlank(request.getPassword(), "password 不能为空"); + + // 2.1 执行认证 + CommonResult result = deviceApi.authDevice(request); + result.checkError(); + if (BooleanUtil.isFalse(result.getData())) { + throw exception(DEVICE_AUTH_FAIL); + } + // 2.2 生成 Token + IotDeviceIdentity deviceInfo = deviceTokenService.parseUsername(request.getUsername()); + Assert.notNull(deviceInfo, "设备信息不能为空"); + String token = deviceTokenService.createToken(deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + Assert.notBlank(token, "生成 token 不能为空"); + + // 3. 执行上线 + IotDeviceMessage message = IotDeviceMessage.buildStateUpdateOnline(); + deviceMessageService.sendDeviceMessage(message, + deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId); + + // 4. 构建响应数据 + return CommonResult.success(MapUtil.of("token", token)); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthResource.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapAuthResource.java similarity index 64% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthResource.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapAuthResource.java index 9d0d90cb3e..0b7a7e6d06 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthResource.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapAuthResource.java @@ -1,6 +1,5 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.CoapResource; import org.eclipse.californium.core.server.resources.CoapExchange; @@ -17,13 +16,10 @@ public class IotCoapAuthResource extends CoapResource { public static final String PATH = "auth"; - private final IotCoapUpstreamProtocol protocol; private final IotCoapAuthHandler authHandler; - public IotCoapAuthResource(IotCoapUpstreamProtocol protocol, - IotCoapAuthHandler authHandler) { + public IotCoapAuthResource(IotCoapAuthHandler authHandler) { super(PATH); - this.protocol = protocol; this.authHandler = authHandler; log.info("[IotCoapAuthResource][创建 CoAP 认证资源: /{}]", PATH); } @@ -31,7 +27,7 @@ public class IotCoapAuthResource extends CoapResource { @Override public void handlePOST(CoapExchange exchange) { log.debug("[handlePOST][收到 /auth POST 请求]"); - authHandler.handle(exchange, protocol); + authHandler.handle(exchange); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapRegisterHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapRegisterHandler.java new file mode 100644 index 0000000000..3dfb6f0df5 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapRegisterHandler.java @@ -0,0 +1,46 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem; + +import cn.hutool.core.lang.Assert; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.server.resources.CoapExchange; + +/** + * IoT 网关 CoAP 协议的【设备动态注册】处理器 + *

+ * 用于直连设备/网关的一型一密动态注册,不需要认证 + * + * @author 芋道源码 + * @see 阿里云 - 一型一密 + */ +@Slf4j +public class IotCoapRegisterHandler extends IotCoapAbstractHandler { + + private final IotDeviceCommonApi deviceApi; + + public IotCoapRegisterHandler() { + this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + } + + @Override + protected CommonResult handle0(CoapExchange exchange) { + // 1. 解析参数 + IotDeviceRegisterReqDTO request = deserializeRequest(exchange, IotDeviceRegisterReqDTO.class); + Assert.notNull(request, "请求体不能为空"); + Assert.notBlank(request.getProductKey(), "productKey 不能为空"); + Assert.notBlank(request.getDeviceName(), "deviceName 不能为空"); + Assert.notBlank(request.getProductSecret(), "productSecret 不能为空"); + + // 2. 调用动态注册 + CommonResult result = deviceApi.registerDevice(request); + result.checkError(); + + // 3. 构建响应数据 + return CommonResult.success(result.getData()); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapRegisterResource.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapRegisterResource.java similarity index 92% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapRegisterResource.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapRegisterResource.java index 05fd1ec89d..3a9b5df692 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapRegisterResource.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapRegisterResource.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.CoapResource; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapRegisterSubHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapRegisterSubHandler.java new file mode 100644 index 0000000000..f0f007094e --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapRegisterSubHandler.java @@ -0,0 +1,84 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; +import cn.iocoder.yudao.module.iot.core.biz.dto.IotSubDeviceRegisterFullReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO; +import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterRespDTO; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.server.resources.CoapExchange; + +import java.util.List; + +import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success; + +/** + * IoT 网关 CoAP 协议的【子设备动态注册】处理器 + *

+ * 用于子设备的动态注册,需要网关认证 + * + * @author 芋道源码 + * @see 阿里云 - 动态注册子设备 + */ +@Slf4j +public class IotCoapRegisterSubHandler extends IotCoapAbstractHandler { + + private final IotDeviceCommonApi deviceApi; + + public IotCoapRegisterSubHandler() { + this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + } + + @Override + @SuppressWarnings("DuplicatedCode") + protected CommonResult handle0(CoapExchange exchange) { + // 1.1 解析通用参数(从 URI 路径获取网关设备信息) + List uriPath = exchange.getRequestOptions().getUriPath(); + String productKey = getProductKey(uriPath); + String deviceName = getDeviceName(uriPath); + // 1.2 解析子设备列表 + SubDeviceRegisterRequest request = deserializeRequest(exchange, SubDeviceRegisterRequest.class); + Assert.notNull(request, "请求参数不能为空"); + Assert.notEmpty(request.getParams(), "params 不能为空"); + + // 2. 调用子设备动态注册 + IotSubDeviceRegisterFullReqDTO reqDTO = new IotSubDeviceRegisterFullReqDTO() + .setGatewayProductKey(productKey) + .setGatewayDeviceName(deviceName) + .setSubDevices(request.getParams()); + CommonResult> result = deviceApi.registerSubDevices(reqDTO); + result.checkError(); + + // 3. 返回结果 + return success(result.getData()); + } + + @Override + protected boolean requiresAuthentication() { + return true; + } + + @Override + protected String getProductKey(List uriPath) { + // 路径格式:/auth/register/sub-device/{productKey}/{deviceName} + return CollUtil.get(uriPath, 3); + } + + @Override + protected String getDeviceName(List uriPath) { + // 路径格式:/auth/register/sub-device/{productKey}/{deviceName} + return CollUtil.get(uriPath, 4); + } + + @Data + public static class SubDeviceRegisterRequest { + + private List params; + + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapRegisterSubResource.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapRegisterSubResource.java new file mode 100644 index 0000000000..1108505360 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapRegisterSubResource.java @@ -0,0 +1,52 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.CoapResource; +import org.eclipse.californium.core.server.resources.CoapExchange; +import org.eclipse.californium.core.server.resources.Resource; + +/** + * IoT 网关 CoAP 协议的子设备动态注册资源(/auth/register/sub-device/{productKey}/{deviceName}) + *

+ * 用于子设备的动态注册,需要网关认证 + *

+ * 支持动态路径匹配:productKey 和 deviceName 是网关设备的标识 + * + * @author 芋道源码 + */ +@Slf4j +public class IotCoapRegisterSubResource extends CoapResource { + + public static final String PATH = "sub-device"; + + private final IotCoapRegisterSubHandler registerSubHandler; + + /** + * 创建根资源(/auth/register/sub-device) + */ + public IotCoapRegisterSubResource(IotCoapRegisterSubHandler registerSubHandler) { + this(PATH, registerSubHandler); + log.info("[IotCoapRegisterSubResource][创建 CoAP 子设备动态注册资源: /auth/register/{}]", PATH); + } + + /** + * 创建子资源(动态路径) + */ + private IotCoapRegisterSubResource(String name, IotCoapRegisterSubHandler registerSubHandler) { + super(name); + this.registerSubHandler = registerSubHandler; + } + + @Override + public Resource getChild(String name) { + // 递归创建动态子资源,支持 /sub-device/{productKey}/{deviceName} 路径 + return new IotCoapRegisterSubResource(name, registerSubHandler); + } + + @Override + public void handlePOST(CoapExchange exchange) { + log.debug("[handlePOST][收到子设备动态注册请求]"); + registerSubHandler.handle(exchange); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapUpstreamHandler.java new file mode 100644 index 0000000000..2d2ee4c6c7 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapUpstreamHandler.java @@ -0,0 +1,76 @@ +package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.text.StrPool; +import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.californium.core.server.resources.CoapExchange; + +import java.util.List; + +/** + * IoT 网关 CoAP 协议的【上行】处理器 + * + * 处理设备通过 CoAP 协议发送的上行消息,包括: + * 1. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post + * 2. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post + * + * Token 通过自定义 CoAP Option 2088 携带 + * + * @author 芋道源码 + */ +@Slf4j +public class IotCoapUpstreamHandler extends IotCoapAbstractHandler { + + private final String serverId; + + private final IotDeviceMessageService deviceMessageService; + + public IotCoapUpstreamHandler(String serverId) { + this.serverId = serverId; + this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); + } + + @Override + @SuppressWarnings("DuplicatedCode") + protected CommonResult handle0(CoapExchange exchange) { + // 1.1 解析通用参数 + List uriPath = exchange.getRequestOptions().getUriPath(); + String productKey = getProductKey(uriPath); + String deviceName = getDeviceName(uriPath); + String method = String.join(StrPool.DOT, uriPath.subList(4, uriPath.size())); + // 1.2 解析消息 + IotDeviceMessage message = deserializeRequest(exchange, IotDeviceMessage.class); + Assert.notNull(message, "请求参数不能为空"); + Assert.equals(method, message.getMethod(), "method 不匹配"); + + // 2. 发送消息 + deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); + + // 3. 返回结果 + return CommonResult.success(MapUtil.of("messageId", message.getId())); + } + + @Override + protected boolean requiresAuthentication() { + return true; + } + + @Override + protected String getProductKey(List uriPath) { + // 路径格式:/topic/sys/{productKey}/{deviceName}/... + return CollUtil.get(uriPath, 2); + } + + @Override + protected String getDeviceName(List uriPath) { + // 路径格式:/topic/sys/{productKey}/{deviceName}/... + return CollUtil.get(uriPath, 3); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamTopicResource.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapUpstreamTopicResource.java similarity index 70% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamTopicResource.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapUpstreamTopicResource.java index 1c694483fa..52d44d3cc6 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamTopicResource.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstrem/IotCoapUpstreamTopicResource.java @@ -1,6 +1,5 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol; import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.CoapResource; import org.eclipse.californium.core.server.resources.CoapExchange; @@ -20,15 +19,15 @@ public class IotCoapUpstreamTopicResource extends CoapResource { public static final String PATH = "topic"; - private final IotCoapUpstreamProtocol protocol; + private final String serverId; private final IotCoapUpstreamHandler upstreamHandler; /** * 创建根资源(/topic) */ - public IotCoapUpstreamTopicResource(IotCoapUpstreamProtocol protocol, + public IotCoapUpstreamTopicResource(String serverId, IotCoapUpstreamHandler upstreamHandler) { - this(PATH, protocol, upstreamHandler); + this(PATH, serverId, upstreamHandler); log.info("[IotCoapUpstreamTopicResource][创建 CoAP 上行 Topic 资源: /{}]", PATH); } @@ -36,32 +35,32 @@ public class IotCoapUpstreamTopicResource extends CoapResource { * 创建子资源(动态路径) */ private IotCoapUpstreamTopicResource(String name, - IotCoapUpstreamProtocol protocol, + String serverId, IotCoapUpstreamHandler upstreamHandler) { super(name); - this.protocol = protocol; + this.serverId = serverId; this.upstreamHandler = upstreamHandler; } @Override public Resource getChild(String name) { // 递归创建动态子资源,支持任意深度路径 - return new IotCoapUpstreamTopicResource(name, protocol, upstreamHandler); + return new IotCoapUpstreamTopicResource(name, serverId, upstreamHandler); } @Override public void handleGET(CoapExchange exchange) { - upstreamHandler.handle(exchange, protocol); + upstreamHandler.handle(exchange); } @Override public void handlePOST(CoapExchange exchange) { - upstreamHandler.handle(exchange, protocol); + upstreamHandler.handle(exchange); } @Override public void handlePUT(CoapExchange exchange) { - upstreamHandler.handle(exchange, protocol); + upstreamHandler.handle(exchange); } } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/package-info.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/package-info.java index 94536a6439..3de662a5ca 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/package-info.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/package-info.java @@ -2,12 +2,5 @@ * CoAP 协议实现包 *

* 提供基于 Eclipse Californium 的 IoT 设备连接和消息处理功能 - *

- * URI 路径: - * - 认证:POST /auth - * - 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post - * - 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post - *

- * Token 通过 CoAP Option 2088 携带 */ package cn.iocoder.yudao.module.iot.gateway.protocol.coap; diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthHandler.java deleted file mode 100644 index fa2cc04fdd..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapAuthHandler.java +++ /dev/null @@ -1,115 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router; - -import cn.hutool.core.lang.Assert; -import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.BooleanUtil; -import cn.hutool.core.util.StrUtil; -import cn.hutool.extra.spring.SpringUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; -import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.util.IotCoapUtils; -import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.californium.core.coap.CoAP; -import org.eclipse.californium.core.server.resources.CoapExchange; - -import java.util.Map; - -/** - * IoT 网关 CoAP 协议的【认证】处理器 - * - * @author 芋道源码 - */ -@Slf4j -public class IotCoapAuthHandler { - - private final IotDeviceTokenService deviceTokenService; - private final IotDeviceCommonApi deviceApi; - private final IotDeviceMessageService deviceMessageService; - - public IotCoapAuthHandler() { - this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class); - this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); - this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); - } - - /** - * 处理认证请求 - * - * @param exchange CoAP 交换对象 - * @param protocol 协议对象 - */ - @SuppressWarnings("unchecked") - public void handle(CoapExchange exchange, IotCoapUpstreamProtocol protocol) { - try { - // 1.1 解析请求体 - byte[] payload = exchange.getRequestPayload(); - if (payload == null || payload.length == 0) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空"); - return; - } - Map body; - try { - body = JsonUtils.parseObject(new String(payload), Map.class); - } catch (Exception e) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体 JSON 格式错误"); - return; - } - // 1.2 解析参数 - String clientId = MapUtil.getStr(body, "clientId"); - if (StrUtil.isEmpty(clientId)) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "clientId 不能为空"); - return; - } - String username = MapUtil.getStr(body, "username"); - if (StrUtil.isEmpty(username)) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "username 不能为空"); - return; - } - String password = MapUtil.getStr(body, "password"); - if (StrUtil.isEmpty(password)) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "password 不能为空"); - return; - } - - // 2.1 执行认证 - CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO() - .setClientId(clientId).setUsername(username).setPassword(password)); - if (result.isError()) { - log.warn("[handle][认证失败,clientId: {}, 错误: {}]", clientId, result.getMsg()); - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败:" + result.getMsg()); - return; - } - if (!BooleanUtil.isTrue(result.getData())) { - log.warn("[handle][认证失败,clientId: {}]", clientId); - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败"); - return; - } - // 2.2 生成 Token - IotDeviceIdentity deviceInfo = deviceTokenService.parseUsername(username); - Assert.notNull(deviceInfo, "设备信息不能为空"); - String token = deviceTokenService.createToken(deviceInfo.getProductKey(), deviceInfo.getDeviceName()); - Assert.notBlank(token, "生成 token 不能为空"); - - // 3. 执行上线 - IotDeviceMessage message = IotDeviceMessage.buildStateUpdateOnline(); - deviceMessageService.sendDeviceMessage(message, - deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId()); - - // 4. 返回成功响应 - log.info("[handle][认证成功,productKey: {}, deviceName: {}]", - deviceInfo.getProductKey(), deviceInfo.getDeviceName()); - IotCoapUtils.respondSuccess(exchange, MapUtil.of("token", token)); - } catch (Exception e) { - log.error("[handle][认证处理异常]", e); - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误"); - } - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapRegisterHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapRegisterHandler.java deleted file mode 100644 index 08f6cca7a1..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapRegisterHandler.java +++ /dev/null @@ -1,97 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router; - -import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.StrUtil; -import cn.hutool.extra.spring.SpringUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; -import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; -import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.util.IotCoapUtils; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.californium.core.coap.CoAP; -import org.eclipse.californium.core.server.resources.CoapExchange; - -import java.util.Map; - -/** - * IoT 网关 CoAP 协议的【设备动态注册】处理器 - *

- * 用于直连设备/网关的一型一密动态注册,不需要认证 - * - * @author 芋道源码 - * @see 阿里云 - 一型一密 - */ -@Slf4j -public class IotCoapRegisterHandler { - - private final IotDeviceCommonApi deviceApi; - - public IotCoapRegisterHandler() { - this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); - } - - /** - * 处理设备动态注册请求 - * - * @param exchange CoAP 交换对象 - */ - @SuppressWarnings("unchecked") - public void handle(CoapExchange exchange) { - try { - // 1.1 解析请求体 - byte[] payload = exchange.getRequestPayload(); - if (payload == null || payload.length == 0) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空"); - return; - } - Map body; - try { - body = JsonUtils.parseObject(new String(payload), Map.class); - } catch (Exception e) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体 JSON 格式错误"); - return; - } - - // 1.2 解析参数 - String productKey = MapUtil.getStr(body, "productKey"); - if (StrUtil.isEmpty(productKey)) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "productKey 不能为空"); - return; - } - String deviceName = MapUtil.getStr(body, "deviceName"); - if (StrUtil.isEmpty(deviceName)) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "deviceName 不能为空"); - return; - } - String productSecret = MapUtil.getStr(body, "productSecret"); - if (StrUtil.isEmpty(productSecret)) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "productSecret 不能为空"); - return; - } - - // 2. 调用动态注册 - IotDeviceRegisterReqDTO reqDTO = new IotDeviceRegisterReqDTO() - .setProductKey(productKey) - .setDeviceName(deviceName) - .setProductSecret(productSecret); - CommonResult result = deviceApi.registerDevice(reqDTO); - if (result.isError()) { - log.warn("[handle][设备动态注册失败,productKey: {}, deviceName: {}, 错误: {}]", - productKey, deviceName, result.getMsg()); - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, - "设备动态注册失败:" + result.getMsg()); - return; - } - - // 3. 返回成功响应 - log.info("[handle][设备动态注册成功,productKey: {}, deviceName: {}]", productKey, deviceName); - IotCoapUtils.respondSuccess(exchange, result.getData()); - } catch (Exception e) { - log.error("[handle][设备动态注册处理异常]", e); - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误"); - } - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamHandler.java deleted file mode 100644 index d33eb464bb..0000000000 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/router/IotCoapUpstreamHandler.java +++ /dev/null @@ -1,110 +0,0 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router; - -import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.map.MapUtil; -import cn.hutool.core.text.StrPool; -import cn.hutool.core.util.ArrayUtil; -import cn.hutool.core.util.ObjUtil; -import cn.hutool.core.util.StrUtil; -import cn.hutool.extra.spring.SpringUtil; -import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.coap.util.IotCoapUtils; -import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService; -import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.californium.core.coap.CoAP; -import org.eclipse.californium.core.server.resources.CoapExchange; - -import java.util.List; - -/** - * IoT 网关 CoAP 协议的【上行】处理器 - * - * 处理设备通过 CoAP 协议发送的上行消息,包括: - * 1. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post - * 2. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post - * - * Token 通过自定义 CoAP Option 2088 携带 - * - * @author 芋道源码 - */ -@Slf4j -public class IotCoapUpstreamHandler { - - private final IotDeviceTokenService deviceTokenService; - private final IotDeviceMessageService deviceMessageService; - - public IotCoapUpstreamHandler() { - this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class); - this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); - } - - /** - * 处理 CoAP 请求 - * - * @param exchange CoAP 交换对象 - * @param protocol 协议对象 - */ - public void handle(CoapExchange exchange, IotCoapUpstreamProtocol protocol) { - try { - // 1. 解析通用参数 - List uriPath = exchange.getRequestOptions().getUriPath(); - String productKey = CollUtil.get(uriPath, 2); - String deviceName = CollUtil.get(uriPath, 3); - byte[] payload = exchange.getRequestPayload(); - if (StrUtil.isEmpty(productKey)) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "productKey 不能为空"); - return; - } - if (StrUtil.isEmpty(deviceName)) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "deviceName 不能为空"); - return; - } - if (ArrayUtil.isEmpty(payload)) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空"); - return; - } - - // 2. 认证:从自定义 Option 获取 token - String token = IotCoapUtils.getTokenFromOption(exchange, IotCoapUtils.OPTION_TOKEN); - if (StrUtil.isEmpty(token)) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "token 不能为空"); - return; - } - // 验证 token - IotDeviceIdentity deviceInfo = deviceTokenService.verifyToken(token); - if (deviceInfo == null) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "token 无效或已过期"); - return; - } - // 验证设备信息匹配 - if (ObjUtil.notEqual(productKey, deviceInfo.getProductKey()) - || ObjUtil.notEqual(deviceName, deviceInfo.getDeviceName())) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.FORBIDDEN, "设备信息与 token 不匹配"); - return; - } - - // 2.1 解析 method:deviceName 后面的路径,用 . 拼接 - // 路径格式:[topic, sys, productKey, deviceName, thing, property, post] - String method = String.join(StrPool.DOT, uriPath.subList(4, uriPath.size())); - - // 2.2 解码消息 - IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName); - if (ObjUtil.notEqual(method, message.getMethod())) { - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "method 不匹配"); - return; - } - // 2.3 发送消息到消息总线 - deviceMessageService.sendDeviceMessage(message, productKey, deviceName, protocol.getServerId()); - - // 3. 返回成功响应 - IotCoapUtils.respondSuccess(exchange, MapUtil.of("messageId", message.getId())); - } catch (Exception e) { - log.error("[handle][CoAP 请求处理异常]", e); - IotCoapUtils.respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误"); - } - } - -} diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/util/IotCoapUtils.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/util/IotCoapUtils.java index 9d5cdf3ffb..58e34a2fa8 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/util/IotCoapUtils.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/util/IotCoapUtils.java @@ -1,14 +1,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.coap.util; -import cn.hutool.core.collection.CollUtil; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import org.eclipse.californium.core.coap.CoAP; -import org.eclipse.californium.core.coap.MediaTypeRegistry; -import org.eclipse.californium.core.coap.Option; -import org.eclipse.californium.core.server.resources.CoapExchange; - -import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*; +import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapAbstractHandler; /** * IoT CoAP 协议工具类 @@ -22,63 +14,6 @@ public class IotCoapUtils { *

* CoAP Option 范围 2048-65535 属于实验/自定义范围 */ - public static final int OPTION_TOKEN = 2088; - - /** - * 返回成功响应 - * - * @param exchange CoAP 交换对象 - * @param data 响应数据 - */ - public static void respondSuccess(CoapExchange exchange, Object data) { - CommonResult result = CommonResult.success(data); - String json = JsonUtils.toJsonString(result); - exchange.respond(CoAP.ResponseCode.CONTENT, json, MediaTypeRegistry.APPLICATION_JSON); - } - - /** - * 返回错误响应 - * - * @param exchange CoAP 交换对象 - * @param code CoAP 响应码 - * @param message 错误消息 - */ - public static void respondError(CoapExchange exchange, CoAP.ResponseCode code, String message) { - int errorCode = mapCoapCodeToErrorCode(code); - CommonResult result = CommonResult.error(errorCode, message); - String json = JsonUtils.toJsonString(result); - exchange.respond(code, json, MediaTypeRegistry.APPLICATION_JSON); - } - - /** - * 从自定义 CoAP Option 中获取 Token - * - * @param exchange CoAP 交换对象 - * @param optionNumber Option 编号 - * @return Token 值,如果不存在则返回 null - */ - public static String getTokenFromOption(CoapExchange exchange, int optionNumber) { - Option option = CollUtil.findOne(exchange.getRequestOptions().getOthers(), - o -> o.getNumber() == optionNumber); - return option != null ? new String(option.getValue()) : null; - } - - /** - * 将 CoAP 响应码映射到业务错误码 - * - * @param code CoAP 响应码 - * @return 业务错误码 - */ - public static int mapCoapCodeToErrorCode(CoAP.ResponseCode code) { - if (code == CoAP.ResponseCode.BAD_REQUEST) { - return BAD_REQUEST.getCode(); - } else if (code == CoAP.ResponseCode.UNAUTHORIZED) { - return UNAUTHORIZED.getCode(); - } else if (code == CoAP.ResponseCode.FORBIDDEN) { - return FORBIDDEN.getCode(); - } else { - return INTERNAL_SERVER_ERROR.getCode(); - } - } + public static final int OPTION_TOKEN = IotCoapAbstractHandler.OPTION_TOKEN; } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/router/IotHttpAbstractHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpAbstractHandler.java similarity index 84% rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/router/IotHttpAbstractHandler.java rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpAbstractHandler.java index dbc93a927e..c403ee973f 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/router/IotHttpAbstractHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpAbstractHandler.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.module.iot.gateway.protocol.http.router; +package cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream; import cn.hutool.core.lang.Assert; import cn.hutool.core.util.ArrayUtil; @@ -10,9 +10,6 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.framework.common.util.object.ObjectUtils; import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; -import cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream.IotHttpAuthHandler; -import cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream.IotHttpRegisterHandler; -import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager; import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService; import io.vertx.core.Handler; import io.vertx.core.http.HttpHeaders; @@ -20,8 +17,7 @@ import io.vertx.ext.web.RoutingContext; import lombok.extern.slf4j.Slf4j; import org.springframework.http.MediaType; -import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.FORBIDDEN; -import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR; +import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*; import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.invalidParamException; @@ -35,8 +31,6 @@ public abstract class IotHttpAbstractHandler implements Handler private final IotDeviceTokenService deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class); - private final IotMessageSerializerManager serializerManager = SpringUtil.getBean(IotMessageSerializerManager.class); - @Override public final void handle(RoutingContext context) { try { @@ -47,15 +41,31 @@ public abstract class IotHttpAbstractHandler implements Handler CommonResult result = handle0(context); writeResponse(context, result); } catch (ServiceException e) { + // 已知异常,返回对应的错误码和错误信息 writeResponse(context, CommonResult.error(e.getCode(), e.getMessage())); + } catch (IllegalArgumentException e) { + // 参数校验异常,返回 400 错误 + writeResponse(context, CommonResult.error(BAD_REQUEST.getCode(), e.getMessage())); } catch (Exception e) { + // 其他未知异常,返回 500 错误 log.error("[handle][path({}) 处理异常]", context.request().path(), e); writeResponse(context, CommonResult.error(INTERNAL_SERVER_ERROR)); } } + /** + * 处理 HTTP 请求(子类实现) + * + * @param context RoutingContext 对象 + * @return 处理结果 + */ protected abstract CommonResult handle0(RoutingContext context); + /** + * 前置处理:认证等 + * + * @param context RoutingContext 对象 + */ private void beforeHandle(RoutingContext context) { // 如果不需要认证,则不走前置处理 String path = context.request().path(); @@ -102,7 +112,7 @@ public abstract class IotHttpAbstractHandler implements Handler } @SuppressWarnings("deprecation") - public static void writeResponse(RoutingContext context, Object data) { + public static void writeResponse(RoutingContext context, CommonResult data) { context.response() .setStatusCode(200) .putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE) diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpAuthHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpAuthHandler.java index 0cc8e35554..21aa5a8fb4 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpAuthHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpAuthHandler.java @@ -3,7 +3,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream; import cn.hutool.core.lang.Assert; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.BooleanUtil; -import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; @@ -11,13 +10,11 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAbstractHandler; import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.ext.web.RoutingContext; import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; -import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.invalidParamException; import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success; import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_AUTH_FAIL; @@ -48,23 +45,19 @@ public class IotHttpAuthHandler extends IotHttpAbstractHandler { } @Override + @SuppressWarnings("DuplicatedCode") public CommonResult handle0(RoutingContext context) { // 1. 解析参数 IotDeviceAuthReqDTO request = deserializeRequest(context, IotDeviceAuthReqDTO.class); - if (StrUtil.isEmpty(request.getClientId())) { - throw invalidParamException("clientId 不能为空"); - } - if (StrUtil.isEmpty(request.getUsername())) { - throw invalidParamException("username 不能为空"); - } - if (StrUtil.isEmpty(request.getPassword())) { - throw invalidParamException("password 不能为空"); - } + Assert.notNull(request, "请求参数不能为空"); + Assert.notBlank(request.getClientId(), "clientId 不能为空"); + Assert.notBlank(request.getUsername(), "username 不能为空"); + Assert.notBlank(request.getPassword(), "password 不能为空"); // 2.1 执行认证 CommonResult result = deviceApi.authDevice(request); result.checkError(); - if (!BooleanUtil.isTrue(result.getData())) { + if (BooleanUtil.isFalse(result.getData())) { throw exception(DEVICE_AUTH_FAIL); } // 2.2 生成 Token diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpRegisterHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpRegisterHandler.java index ec3bd54b4e..08c60f3c9d 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpRegisterHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpRegisterHandler.java @@ -1,15 +1,13 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream; -import cn.hutool.core.util.StrUtil; +import cn.hutool.core.lang.Assert; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO; import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO; -import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAbstractHandler; import io.vertx.ext.web.RoutingContext; -import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.invalidParamException; import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success; /** @@ -34,15 +32,10 @@ public class IotHttpRegisterHandler extends IotHttpAbstractHandler { public CommonResult handle0(RoutingContext context) { // 1. 解析参数 IotDeviceRegisterReqDTO request = deserializeRequest(context, IotDeviceRegisterReqDTO.class); - if (StrUtil.isEmpty(request.getProductKey())) { - throw invalidParamException("productKey 不能为空"); - } - if (StrUtil.isEmpty(request.getDeviceName())) { - throw invalidParamException("deviceName 不能为空"); - } - if (StrUtil.isEmpty(request.getProductSecret())) { - throw invalidParamException("productSecret 不能为空"); - } + Assert.notNull(request, "请求参数不能为空"); + Assert.notBlank(request.getProductKey(), "productKey 不能为空"); + Assert.notBlank(request.getDeviceName(), "deviceName 不能为空"); + Assert.notBlank(request.getProductSecret(), "productSecret 不能为空"); // 2. 调用动态注册 CommonResult result = deviceApi.registerDevice(request); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpRegisterSubHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpRegisterSubHandler.java index 914de1b795..46932204db 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpRegisterSubHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpRegisterSubHandler.java @@ -1,19 +1,17 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream; -import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.lang.Assert; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; import cn.iocoder.yudao.module.iot.core.biz.dto.IotSubDeviceRegisterFullReqDTO; import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO; import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterRespDTO; -import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAbstractHandler; import io.vertx.ext.web.RoutingContext; import lombok.Data; import java.util.List; -import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.invalidParamException; import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success; /** @@ -46,9 +44,8 @@ public class IotHttpRegisterSubHandler extends IotHttpAbstractHandler { String deviceName = context.pathParam("deviceName"); // 1.2 解析子设备列表 SubDeviceRegisterRequest request = deserializeRequest(context, SubDeviceRegisterRequest.class); - if (CollUtil.isEmpty(request.getParams())) { - throw invalidParamException("params 不能为空"); - } + Assert.notNull(request, "请求参数不能为空"); + Assert.notEmpty(request.getParams(), "params 不能为空"); // 2. 调用子设备动态注册 IotSubDeviceRegisterFullReqDTO reqDTO = new IotSubDeviceRegisterFullReqDTO() diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpUpstreamHandler.java index d7f307bb8d..aa408dc79b 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/http/handler/upstream/IotHttpUpstreamHandler.java @@ -7,7 +7,6 @@ import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol; -import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAbstractHandler; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.ext.web.RoutingContext; import lombok.extern.slf4j.Slf4j; @@ -33,15 +32,16 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler { @Override protected CommonResult handle0(RoutingContext context) { - // 1. 解析通用参数 + // 1.1 解析通用参数 String productKey = context.pathParam("productKey"); String deviceName = context.pathParam("deviceName"); String method = context.pathParam("*").replaceAll(StrPool.SLASH, StrPool.DOT); - - // 2.1 根据 Content-Type 反序列化消息 + // 1.2 根据 Content-Type 反序列化消息 IotDeviceMessage message = deserializeRequest(context, IotDeviceMessage.class); + Assert.notNull(message, "请求参数不能为空"); Assert.equals(method, message.getMethod(), "method 不匹配"); - // 2.2 发送消息 + + // 2. 发送消息 deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/upstream/IotTcpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/upstream/IotTcpUpstreamHandler.java index fe3a38fe50..45cc3e3ffa 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/upstream/IotTcpUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/tcp/handler/upstream/IotTcpUpstreamHandler.java @@ -1,7 +1,9 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.upstream; +import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.framework.common.exception.ServiceException; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; @@ -26,7 +28,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.util.Assert; import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*; -import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.UNAUTHORIZED; +import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; +import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_AUTH_FAIL; /** * TCP 上行消息处理器 @@ -132,6 +135,12 @@ public class IotTcpUpstreamHandler implements Handler { // 业务消息 handleBusinessRequest(clientId, message, socket); } + } catch (ServiceException e) { + // 业务异常,返回对应的错误码和错误信息 + log.warn("[processMessage][业务异常,客户端 ID: {},错误: {}]", clientId, e.getMessage()); + String requestId = message != null ? message.getRequestId() : null; + String method = message != null ? message.getMethod() : null; + sendErrorResponse(socket, requestId, method, e.getCode(), e.getMessage()); } catch (IllegalArgumentException e) { // 参数校验失败,返回 400 log.warn("[processMessage][参数校验失败,客户端 ID: {},错误: {}]", clientId, e.getMessage()); @@ -166,10 +175,9 @@ public class IotTcpUpstreamHandler implements Handler { // 2.1 执行认证 CommonResult authResult = deviceApi.authDevice(authParams); - if (authResult.isError()) { - log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {},username: {}]", clientId, authParams.getUsername()); - sendErrorResponse(socket, message.getRequestId(), AUTH_METHOD, authResult.getCode(), authResult.getMsg()); - return; + authResult.checkError(); + if (BooleanUtil.isFalse(authResult.getData())) { + throw exception(DEVICE_AUTH_FAIL); } // 2.2 解析设备信息 IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername()); @@ -205,12 +213,7 @@ public class IotTcpUpstreamHandler implements Handler { // 2. 调用动态注册 CommonResult result = deviceApi.registerDevice(params); - if (result.isError()) { - log.warn("[handleRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg()); - sendErrorResponse(socket, message.getRequestId(), IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), - result.getCode(), result.getMsg()); - return; - } + result.checkError(); // 3. 发送成功响应 sendSuccessResponse(socket, message.getRequestId(), diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/handler/upstream/IotUdpUpstreamHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/handler/upstream/IotUdpUpstreamHandler.java index 3e1a4fe143..dd41a52527 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/handler/upstream/IotUdpUpstreamHandler.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/handler/upstream/IotUdpUpstreamHandler.java @@ -1,9 +1,12 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp.handler.upstream; +import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; +import cn.iocoder.yudao.framework.common.exception.ServiceException; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi; @@ -27,10 +30,11 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.util.Assert; import java.net.InetSocketAddress; -import java.time.LocalDateTime; import java.util.Map; import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*; +import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; +import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_AUTH_FAIL; /** * UDP 上行消息处理器 @@ -87,7 +91,7 @@ public class IotUdpUpstreamHandler { this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class); } - // TODO @AI:vertx 有 udp 的实现么? + // TODO done @AI:vertx 有 udp 的实现么?当前已使用 Vert.x DatagramSocket 实现 /** * 处理 UDP 数据包 * @@ -99,18 +103,7 @@ public class IotUdpUpstreamHandler { Buffer data = packet.data(); String addressKey = sessionManager.buildAddressKey(senderAddress); log.debug("[handle][收到 UDP 数据包,来源: {},数据长度: {} 字节]", addressKey, data.length()); - try { - processMessage(data, senderAddress, socket); - } catch (IllegalArgumentException e) { - // 参数校验失败,返回 400 - log.warn("[handle][参数校验失败,来源: {},错误: {}]", addressKey, e.getMessage()); - sendErrorResponse(socket, senderAddress, null, null, BAD_REQUEST.getCode(), e.getMessage()); - } catch (Exception e) { - // 其他异常,返回 500 - log.error("[handle][处理消息失败,来源: {}]", addressKey, e); - sendErrorResponse(socket, senderAddress, null, null, - INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg()); - } + processMessage(data, senderAddress, socket); } /** @@ -121,6 +114,7 @@ public class IotUdpUpstreamHandler { * @param socket UDP Socket */ private void processMessage(Buffer buffer, InetSocketAddress senderAddress, DatagramSocket socket) { + String addressKey = sessionManager.buildAddressKey(senderAddress); // 1.1 基础检查 if (ArrayUtil.isEmpty(buffer)) { return; @@ -133,15 +127,35 @@ public class IotUdpUpstreamHandler { } // 2. 根据消息类型路由处理 - if (AUTH_METHOD.equals(message.getMethod())) { - // 认证请求 - handleAuthenticationRequest(message, senderAddress, socket); - } else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(message.getMethod())) { - // 设备动态注册请求 - handleRegisterRequest(message, senderAddress, socket); - } else { - // 业务消息 - handleBusinessRequest(message, senderAddress, socket); + try { + if (AUTH_METHOD.equals(message.getMethod())) { + // 认证请求 + handleAuthenticationRequest(message, senderAddress, socket); + } else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(message.getMethod())) { + // 设备动态注册请求 + handleRegisterRequest(message, senderAddress, socket); + } else { + // 业务消息 + handleBusinessRequest(message, senderAddress, socket); + } + } catch (ServiceException e) { + // 业务异常,返回对应的错误码和错误信息 + log.warn("[processMessage][业务异常,来源: {},requestId: {},method: {},错误: {}]", + addressKey, message.getRequestId(), message.getMethod(), e.getMessage()); + sendErrorResponse(socket, senderAddress, message.getRequestId(), message.getMethod(), + e.getCode(), e.getMessage()); + } catch (IllegalArgumentException e) { + // 参数校验失败,返回 400 + log.warn("[processMessage][参数校验失败,来源: {},requestId: {},method: {},错误: {}]", + addressKey, message.getRequestId(), message.getMethod(), e.getMessage()); + sendErrorResponse(socket, senderAddress, message.getRequestId(), message.getMethod(), + BAD_REQUEST.getCode(), e.getMessage()); + } catch (Exception e) { + // 其他异常,返回 500 + log.error("[processMessage][处理消息失败,来源: {},requestId: {},method: {}]", + addressKey, message.getRequestId(), message.getMethod(), e); + sendErrorResponse(socket, senderAddress, message.getRequestId(), message.getMethod(), + INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg()); } } @@ -164,12 +178,9 @@ public class IotUdpUpstreamHandler { // 2.1 执行认证 CommonResult authResult = deviceApi.authDevice(authParams); - if (authResult.isError()) { - log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {},username: {}]", - clientId, authParams.getUsername()); - sendErrorResponse(socket, senderAddress, message.getRequestId(), AUTH_METHOD, - authResult.getCode(), authResult.getMsg()); - return; + authResult.checkError(); + if (!BooleanUtil.isTrue(authResult.getData())) { + throw exception(DEVICE_AUTH_FAIL); } // 2.2 解析设备信息 IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername()); @@ -187,7 +198,8 @@ public class IotUdpUpstreamHandler { // 4.2 发送上线消息 sendOnlineMessage(device); // 4.3 发送成功响应(包含 token) - sendAuthSuccessResponse(socket, senderAddress, message.getRequestId(), token); + sendSuccessResponse(socket, senderAddress, message.getRequestId(), AUTH_METHOD, + MapUtil.of("token", token)); log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {},来源: {}]", device.getId(), device.getDeviceName(), sessionManager.buildAddressKey(senderAddress)); } @@ -211,13 +223,7 @@ public class IotUdpUpstreamHandler { // 2. 调用动态注册 CommonResult result = deviceApi.registerDevice(params); - if (result.isError()) { - log.warn("[handleRegisterRequest][注册失败,来源: {},错误: {}]", - sessionManager.buildAddressKey(senderAddress), result.getMsg()); - sendErrorResponse(socket, senderAddress, message.getRequestId(), - IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), result.getCode(), result.getMsg()); - return; - } + result.checkError(); // 3. 发送成功响应 sendSuccessResponse(socket, senderAddress, message.getRequestId(), @@ -274,17 +280,8 @@ public class IotUdpUpstreamHandler { return; } - // 2. 更新会话活跃时间和地址 - // TODO @AI:是不是合并到 sessionManager 里面更好? - IotUdpSessionManager.SessionInfo sessionInfo = sessionManager.getSessionInfo(device.getId()); - if (sessionInfo != null) { - // 检查地址是否变化,变化则更新 - if (!senderAddress.equals(sessionInfo.getAddress())) { - sessionManager.updateSessionAddress(device.getId(), senderAddress); - } else { - sessionManager.updateSessionActivity(device.getId()); - } - } + // 2. 更新会话地址(如有变化) + sessionManager.updateSessionAddress(device.getId(), senderAddress); // 3. 将 body 设置为实际的 params,发送消息到消息总线 message.setParams(body); @@ -306,8 +303,7 @@ public class IotUdpUpstreamHandler { .setDeviceId(device.getId()) .setProductKey(device.getProductKey()) .setDeviceName(device.getDeviceName()) - .setAddress(address) - .setLastActiveTime(LocalDateTime.now()); + .setAddress(address); sessionManager.registerSession(device.getId(), sessionInfo); } @@ -324,21 +320,6 @@ public class IotUdpUpstreamHandler { // ===================== 发送响应消息 ===================== - /** - * 发送认证成功响应(包含 token) - * - * @param socket UDP Socket - * @param address 目标地址 - * @param requestId 请求 ID - * @param token JWT Token - */ - private void sendAuthSuccessResponse(DatagramSocket socket, InetSocketAddress address, - String requestId, String token) { - IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, token, - SUCCESS.getCode(), null); - writeResponse(socket, address, responseMessage); - } - /** * 发送成功响应 * diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java index 5280250331..8195c99961 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/udp/manager/IotUdpSessionManager.java @@ -1,25 +1,23 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager; +import cn.hutool.core.util.ObjUtil; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import io.vertx.core.buffer.Buffer; import io.vertx.core.datagram.DatagramSocket; import lombok.Data; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; /** * IoT 网关 UDP 会话管理器 *

- * 统一管理 UDP 会话的认证状态、设备会话和消息发送功能: - * 1. 管理 UDP 会话的认证状态 - * 2. 管理设备会话和在线状态 - * 3. 管理消息发送到设备 + * 基于 Guava Cache 实现会话的自动过期清理: + * 1. 管理设备会话信息(设备 ID -> 地址映射) + * 2. 自动清理超时会话(expireAfterAccess) + * 3. 限制最大会话数(maximumSize) * * @author 芋道源码 */ @@ -27,109 +25,76 @@ import java.util.concurrent.ConcurrentHashMap; public class IotUdpSessionManager { /** - * 最大会话数 + * 设备会话缓存:设备 ID -> 会话信息 + *

+ * 使用 Guava Cache 自动管理过期:expireAfterAccess:每次访问(get/put)自动刷新过期时间 */ + private final Cache deviceSessionCache; + private final int maxSessions; - /** - * 设备 ID -> 会话信息 - */ - private final Map deviceSessionMap = new ConcurrentHashMap<>(); - - /** - * 设备地址 Key -> 设备 ID(反向映射,用于清理时同步) - */ - // TODO @AI:1)这个变量是否必须?2)unregisterSession 这个方法是否必须? - private final Map addressDeviceMap = new ConcurrentHashMap<>(); - - public IotUdpSessionManager(int maxSessions) { + public IotUdpSessionManager(int maxSessions, long sessionTimeoutMs) { this.maxSessions = maxSessions; + this.deviceSessionCache = CacheBuilder.newBuilder() + .maximumSize(maxSessions) + .expireAfterAccess(sessionTimeoutMs, TimeUnit.MILLISECONDS) + .build(); } /** - * 注册设备会话(包含认证信息) + * 注册设备会话 * * @param deviceId 设备 ID * @param sessionInfo 会话信息 */ public void registerSession(Long deviceId, SessionInfo sessionInfo) { - // 检查会话数是否已达上限 - if (deviceSessionMap.size() >= maxSessions) { + // 检查是否为新设备,且会话数已达上限 + if (deviceSessionCache.getIfPresent(deviceId) == null + && deviceSessionCache.size() >= maxSessions) { throw new IllegalStateException("会话数已达上限: " + maxSessions); } - // 如果设备已有其他会话,先清理旧会话 - SessionInfo oldSessionInfo = deviceSessionMap.get(deviceId); - if (oldSessionInfo != null) { - String oldAddressKey = buildAddressKey(oldSessionInfo.getAddress()); - addressDeviceMap.remove(oldAddressKey, deviceId); - log.info("[registerSession][设备已有其他会话,清理旧会话,设备 ID: {},旧地址: {}]", - deviceId, oldAddressKey); - } - - // 注册新会话 - String addressKey = buildAddressKey(sessionInfo.getAddress()); - deviceSessionMap.put(deviceId, sessionInfo); - addressDeviceMap.put(addressKey, deviceId); - log.info("[registerSession][注册设备会话,设备 ID: {},地址: {},product key: {},device name: {}]", - deviceId, addressKey, sessionInfo.getProductKey(), sessionInfo.getDeviceName()); + // 注册会话 + deviceSessionCache.put(deviceId, sessionInfo); + log.info("[registerSession][注册设备会话,设备 ID: {},地址: {},productKey: {},deviceName: {}]", + deviceId, buildAddressKey(sessionInfo.getAddress()), + sessionInfo.getProductKey(), sessionInfo.getDeviceName()); } /** - * 注销设备会话 + * 获取会话信息 + *

+ * 注意:调用此方法会自动刷新会话的过期时间 * * @param deviceId 设备 ID + * @return 会话信息,不存在则返回 null */ - public void unregisterSession(Long deviceId) { - SessionInfo sessionInfo = deviceSessionMap.remove(deviceId); - if (sessionInfo == null) { - return; - } - String addressKey = buildAddressKey(sessionInfo.getAddress()); - // 仅当 addressDeviceMap 中的 deviceId 是当前 deviceId 时才移除,避免误删新会话 - addressDeviceMap.remove(addressKey, deviceId); - log.info("[unregisterSession][注销设备会话,设备 ID: {},地址: {}]", deviceId, addressKey); - } - - /** - * 更新会话活跃时间(每次收到上行消息时调用) - * - * @param deviceId 设备 ID - */ - public void updateSessionActivity(Long deviceId) { - SessionInfo sessionInfo = deviceSessionMap.get(deviceId); - if (sessionInfo != null) { - sessionInfo.setLastActiveTime(LocalDateTime.now()); - } + public SessionInfo getSession(Long deviceId) { + return deviceSessionCache.getIfPresent(deviceId); } /** * 更新设备会话地址(设备地址变更时调用) + *

+ * 注意:getIfPresent 已自动刷新过期时间,无需重新 put * * @param deviceId 设备 ID * @param newAddress 新地址 */ public void updateSessionAddress(Long deviceId, InetSocketAddress newAddress) { - SessionInfo sessionInfo = deviceSessionMap.get(deviceId); + // 地址未变化,无需更新 + SessionInfo sessionInfo = deviceSessionCache.getIfPresent(deviceId); if (sessionInfo == null) { return; } - // 清理旧地址映射 + if (ObjUtil.equals(newAddress, sessionInfo.getAddress())) { + return; + } + + // 更新地址 String oldAddressKey = buildAddressKey(sessionInfo.getAddress()); - addressDeviceMap.remove(oldAddressKey, deviceId); - - // 更新新地址 - String newAddressKey = buildAddressKey(newAddress); sessionInfo.setAddress(newAddress); - sessionInfo.setLastActiveTime(LocalDateTime.now()); - addressDeviceMap.put(newAddressKey, deviceId); - log.debug("[updateSessionAddress][更新设备地址,设备 ID: {},新地址: {}]", deviceId, newAddressKey); - } - - /** - * 获取会话信息 - */ - public SessionInfo getSessionInfo(Long deviceId) { - return deviceSessionMap.get(deviceId); + log.debug("[updateSessionAddress][更新设备地址,设备 ID: {},旧地址: {},新地址: {}]", + deviceId, oldAddressKey, buildAddressKey(newAddress)); } /** @@ -141,7 +106,7 @@ public class IotUdpSessionManager { * @return 是否发送成功 */ public boolean sendToDevice(Long deviceId, byte[] data, DatagramSocket socket) { - SessionInfo sessionInfo = deviceSessionMap.get(deviceId); + SessionInfo sessionInfo = deviceSessionCache.getIfPresent(deviceId); if (sessionInfo == null || sessionInfo.getAddress() == null) { log.warn("[sendToDevice][设备会话不存在,设备 ID: {}]", deviceId); return false; @@ -165,38 +130,7 @@ public class IotUdpSessionManager { } /** - * 定期清理不活跃的设备会话 - * - * @param timeoutMs 超时时间(毫秒) - * @return 清理的设备 ID 列表(用于发送离线消息) - */ - public List cleanExpiredSessions(long timeoutMs) { - List offlineDeviceIds = new ArrayList<>(); - LocalDateTime now = LocalDateTime.now(); - LocalDateTime expireTime = now.minusNanos(timeoutMs * 1_000_000); - Iterator> iterator = deviceSessionMap.entrySet().iterator(); - // TODO @AI:改成 for each 会不会更好? - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - SessionInfo sessionInfo = entry.getValue(); - // 未过期,跳过 - if (sessionInfo.getLastActiveTime().isAfter(expireTime)) { - continue; - } - // 过期处理:记录离线设备 ID - Long deviceId = entry.getKey(); - String addressKey = buildAddressKey(sessionInfo.getAddress()); - addressDeviceMap.remove(addressKey, deviceId); - offlineDeviceIds.add(deviceId); - log.debug("[cleanExpiredSessions][清理超时设备,设备 ID: {},地址: {},最后活跃时间: {}]", - deviceId, addressKey, sessionInfo.getLastActiveTime()); - iterator.remove(); - } - return offlineDeviceIds; - } - - /** - * 构建地址 Key + * 构建地址 Key(用于日志输出) * * @param address 地址 * @return 地址 Key @@ -206,7 +140,7 @@ public class IotUdpSessionManager { } /** - * 会话信息(包含认证信息) + * 会话信息 */ @Data public static class SessionInfo { @@ -228,10 +162,6 @@ public class IotUdpSessionManager { * 设备地址 */ private InetSocketAddress address; - /** - * 最后活跃时间 - */ - private LocalDateTime lastActiveTime; } diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml index 7691fc9b4a..ddc353f399 100644 --- a/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml +++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/resources/application.yaml @@ -81,13 +81,38 @@ yudao: - id: udp-json type: udp port: 8093 - enabled: true + enabled: false serialize: json udp: max-sessions: 1000 # 最大会话数 session-timeout-ms: 60000 # 会话超时时间(毫秒),基于 Guava Cache 自动过期 receive-buffer-size: 65536 # 接收缓冲区大小(字节) send-buffer-size: 65536 # 发送缓冲区大小(字节) + # ==================================== + # 针对引入的 WebSocket 组件的配置 + # ==================================== + - id: websocket-json + type: websocket + port: 8094 + enabled: false + serialize: json + websocket: + path: /ws + max-message-size: 65536 # 最大消息大小(字节,默认 64KB) + max-frame-size: 65536 # 最大帧大小(字节,默认 64KB) + idle-timeout-seconds: 60 # 空闲超时时间(秒,默认 60) + ssl-enabled: false # 是否启用 SSL(wss://) + # ==================================== + # 针对引入的 CoAP 组件的配置 + # ==================================== + - id: coap-json + type: coap + port: 5683 + enabled: true + coap: + max-message-size: 1024 # 最大消息大小(字节) + ack-timeout-ms: 2000 # ACK 超时时间(毫秒) + max-retransmit: 4 # 最大重传次数 # 协议配置(旧版,保持兼容) protocol: @@ -134,27 +159,6 @@ yudao: max-message-size: 8192 connect-timeout-seconds: 60 ssl-enabled: false - # ==================================== - # 针对引入的 CoAP 组件的配置 - # ==================================== - coap: - enabled: false # 是否启用 CoAP 协议 - port: 5683 # CoAP 服务端口(默认 5683) - max-message-size: 1024 # 最大消息大小(字节) - ack-timeout: 2000 # ACK 超时时间(毫秒) - max-retransmit: 4 # 最大重传次数 - # ==================================== - # 针对引入的 WebSocket 组件的配置 - # ==================================== - websocket: - enabled: false # 是否启用 WebSocket 协议 - port: 8094 # WebSocket 服务端口(默认 8094) - path: /ws # WebSocket 路径(默认 /ws) - max-message-size: 65536 # 最大消息大小(字节,默认 64KB) - max-frame-size: 65536 # 最大帧大小(字节,默认 64KB) - idle-timeout-seconds: 60 # 空闲超时时间(秒,默认 60) - ssl-enabled: false # 是否启用 SSL(wss://) - --- #################### 日志相关配置 #################### # 基础日志配置