mirror of
https://gitee.com/zhijiantianya/ruoyi-vue-pro.git
synced 2026-03-22 05:07:17 +08:00
feat(iot):【协议改造】coap 初步改造(100%)
This commit is contained in:
@@ -2,8 +2,6 @@ package cn.iocoder.yudao.module.iot.gateway.config;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol;
|
||||
@@ -11,12 +9,7 @@ import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttDownstreamSubscr
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.manager.IotMqttConnectionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.manager.IotWebSocketConnectionManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.router.IotWebSocketDownstreamHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.core.Vertx;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -111,63 +104,4 @@ public class IotGatewayConfiguration {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* IoT 网关 CoAP 协议配置类
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.coap", name = "enabled", havingValue = "true")
|
||||
@Slf4j
|
||||
public static class CoapProtocolConfiguration {
|
||||
|
||||
@Bean
|
||||
public IotCoapUpstreamProtocol iotCoapUpstreamProtocol(IotGatewayProperties gatewayProperties) {
|
||||
return new IotCoapUpstreamProtocol(gatewayProperties.getProtocol().getCoap());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IotCoapDownstreamSubscriber iotCoapDownstreamSubscriber(IotCoapUpstreamProtocol coapUpstreamProtocol,
|
||||
IotMessageBus messageBus) {
|
||||
return new IotCoapDownstreamSubscriber(coapUpstreamProtocol, messageBus);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* IoT 网关 WebSocket 协议配置类
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.websocket", name = "enabled", havingValue = "true")
|
||||
@Slf4j
|
||||
public static class WebSocketProtocolConfiguration {
|
||||
|
||||
@Bean(name = "websocketVertx", destroyMethod = "close")
|
||||
public Vertx websocketVertx() {
|
||||
return Vertx.vertx();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IotWebSocketUpstreamProtocol iotWebSocketUpstreamProtocol(IotGatewayProperties gatewayProperties,
|
||||
IotDeviceService deviceService,
|
||||
IotDeviceMessageService messageService,
|
||||
IotWebSocketConnectionManager connectionManager,
|
||||
@Qualifier("websocketVertx") Vertx websocketVertx) {
|
||||
return new IotWebSocketUpstreamProtocol(gatewayProperties.getProtocol().getWebsocket(),
|
||||
deviceService, messageService, connectionManager, websocketVertx);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IotWebSocketDownstreamHandler iotWebSocketDownstreamHandler(IotDeviceMessageService messageService,
|
||||
IotWebSocketConnectionManager connectionManager) {
|
||||
return new IotWebSocketDownstreamHandler(messageService, connectionManager);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IotWebSocketDownstreamSubscriber iotWebSocketDownstreamSubscriber(IotWebSocketUpstreamProtocol protocolHandler,
|
||||
IotWebSocketDownstreamHandler downstreamHandler,
|
||||
IotMessageBus messageBus) {
|
||||
return new IotWebSocketDownstreamSubscriber(protocolHandler, downstreamHandler, messageBus);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.config;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapConfig;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpConfig;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpConfig;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpConfig;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketConfig;
|
||||
import io.vertx.core.net.KeyCertOptions;
|
||||
import io.vertx.core.net.TrustOptions;
|
||||
import jakarta.validation.Valid;
|
||||
@@ -90,16 +92,6 @@ public class IotGatewayProperties {
|
||||
*/
|
||||
private MqttProperties mqtt;
|
||||
|
||||
/**
|
||||
* CoAP 组件配置
|
||||
*/
|
||||
private CoapProperties coap;
|
||||
|
||||
/**
|
||||
* WebSocket 组件配置
|
||||
*/
|
||||
private WebSocketProperties websocket;
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
@@ -344,93 +336,6 @@ public class IotGatewayProperties {
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class CoapProperties {
|
||||
|
||||
/**
|
||||
* 是否开启
|
||||
*/
|
||||
@NotNull(message = "是否开启不能为空")
|
||||
private Boolean enabled;
|
||||
|
||||
/**
|
||||
* 服务端口(CoAP 默认端口 5683)
|
||||
*/
|
||||
@NotNull(message = "服务端口不能为空")
|
||||
private Integer port = 5683;
|
||||
|
||||
/**
|
||||
* 最大消息大小(字节)
|
||||
*/
|
||||
@NotNull(message = "最大消息大小不能为空")
|
||||
private Integer maxMessageSize = 1024;
|
||||
|
||||
/**
|
||||
* ACK 超时时间(毫秒)
|
||||
*/
|
||||
@NotNull(message = "ACK 超时时间不能为空")
|
||||
private Integer ackTimeout = 2000;
|
||||
|
||||
/**
|
||||
* 最大重传次数
|
||||
*/
|
||||
@NotNull(message = "最大重传次数不能为空")
|
||||
private Integer maxRetransmit = 4;
|
||||
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class WebSocketProperties {
|
||||
|
||||
/**
|
||||
* 是否开启
|
||||
*/
|
||||
@NotNull(message = "是否开启不能为空")
|
||||
private Boolean enabled;
|
||||
|
||||
/**
|
||||
* 服务器端口(默认:8094)
|
||||
*/
|
||||
private Integer port = 8094;
|
||||
|
||||
/**
|
||||
* WebSocket 路径(默认:/ws)
|
||||
*/
|
||||
@NotEmpty(message = "WebSocket 路径不能为空")
|
||||
private String path = "/ws";
|
||||
|
||||
/**
|
||||
* 最大消息大小(字节,默认 64KB)
|
||||
*/
|
||||
private Integer maxMessageSize = 65536;
|
||||
|
||||
/**
|
||||
* 最大帧大小(字节,默认 64KB)
|
||||
*/
|
||||
private Integer maxFrameSize = 65536;
|
||||
|
||||
/**
|
||||
* 空闲超时时间(秒,默认 60)
|
||||
*/
|
||||
private Integer idleTimeoutSeconds = 60;
|
||||
|
||||
/**
|
||||
* 是否启用 SSL(wss://)
|
||||
*/
|
||||
private Boolean sslEnabled = false;
|
||||
|
||||
/**
|
||||
* SSL 证书路径
|
||||
*/
|
||||
private String sslCertPath;
|
||||
|
||||
/**
|
||||
* SSL 私钥路径
|
||||
*/
|
||||
private String sslKeyPath;
|
||||
|
||||
}
|
||||
|
||||
// TODO @AI:【暂时忽略】改成 ProtocolProperties
|
||||
/**
|
||||
* 协议实例配置
|
||||
@@ -489,6 +394,18 @@ public class IotGatewayProperties {
|
||||
@Valid
|
||||
private IotUdpConfig udp;
|
||||
|
||||
/**
|
||||
* CoAP 协议配置
|
||||
*/
|
||||
@Valid
|
||||
private IotCoapConfig coap;
|
||||
|
||||
/**
|
||||
* WebSocket 协议配置
|
||||
*/
|
||||
@Valid
|
||||
private IotWebSocketConfig websocket;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -4,9 +4,11 @@ import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.BooleanUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.tcp.IotTcpProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.udp.IotUdpProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.websocket.IotWebSocketUpstreamProtocol;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
@@ -100,6 +102,10 @@ public class IotProtocolManager implements SmartLifecycle {
|
||||
return createTcpProtocol(config);
|
||||
case UDP:
|
||||
return createUdpProtocol(config);
|
||||
case COAP:
|
||||
return createCoapProtocol(config);
|
||||
case WEBSOCKET:
|
||||
return createWebSocketProtocol(config);
|
||||
default:
|
||||
throw new IllegalArgumentException(String.format(
|
||||
"[createProtocol][协议实例 %s 的协议类型 %s 暂不支持]", config.getId(), protocolType));
|
||||
@@ -136,4 +142,24 @@ public class IotProtocolManager implements SmartLifecycle {
|
||||
return new IotUdpProtocol(config);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 CoAP 协议实例
|
||||
*
|
||||
* @param config 协议实例配置
|
||||
* @return CoAP 协议实例
|
||||
*/
|
||||
private IotCoapProtocol createCoapProtocol(IotGatewayProperties.ProtocolInstanceProperties config) {
|
||||
return new IotCoapProtocol(config);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 WebSocket 协议实例
|
||||
*
|
||||
* @param config 协议实例配置
|
||||
* @return WebSocket 协议实例
|
||||
*/
|
||||
private IotWebSocketUpstreamProtocol createWebSocketProtocol(IotGatewayProperties.ProtocolInstanceProperties config) {
|
||||
return new IotWebSocketUpstreamProtocol(config);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
|
||||
|
||||
import jakarta.validation.constraints.Min;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* IoT CoAP 协议配置
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Data
|
||||
public class IotCoapConfig {
|
||||
|
||||
/**
|
||||
* 最大消息大小(字节)
|
||||
*/
|
||||
@NotNull(message = "最大消息大小不能为空")
|
||||
@Min(value = 64, message = "最大消息大小必须大于 64 字节")
|
||||
private Integer maxMessageSize = 1024;
|
||||
|
||||
/**
|
||||
* ACK 超时时间(毫秒)
|
||||
*/
|
||||
@NotNull(message = "ACK 超时时间不能为空")
|
||||
@Min(value = 100, message = "ACK 超时时间必须大于 100 毫秒")
|
||||
private Integer ackTimeoutMs = 2000;
|
||||
|
||||
/**
|
||||
* 最大重传次数
|
||||
*/
|
||||
@NotNull(message = "最大重传次数不能为空")
|
||||
@Min(value = 0, message = "最大重传次数必须大于等于 0")
|
||||
private Integer maxRetransmit = 4;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,173 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
|
||||
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolInstanceProperties;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.downstream.IotCoapDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapAuthHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapAuthResource;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapRegisterHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapRegisterResource;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapRegisterSubHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapRegisterSubResource;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapUpstreamHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapUpstreamTopicResource;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.CoapResource;
|
||||
import org.eclipse.californium.core.CoapServer;
|
||||
import org.eclipse.californium.core.config.CoapConfig;
|
||||
import org.eclipse.californium.elements.config.Configuration;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* IoT CoAP 协议实现
|
||||
* <p>
|
||||
* 基于 Eclipse Californium 实现,支持:
|
||||
* 1. 认证:POST /auth
|
||||
* 2. 设备动态注册:POST /auth/register/device
|
||||
* 3. 子设备动态注册:POST /auth/register/sub-device/{productKey}/{deviceName}
|
||||
* 4. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post
|
||||
* 5. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotCoapProtocol implements IotProtocol {
|
||||
|
||||
/**
|
||||
* 协议配置
|
||||
*/
|
||||
private final ProtocolInstanceProperties properties;
|
||||
/**
|
||||
* 服务器 ID(用于消息追踪,全局唯一)
|
||||
*/
|
||||
@Getter
|
||||
private final String serverId;
|
||||
|
||||
/**
|
||||
* 运行状态
|
||||
*/
|
||||
@Getter
|
||||
private volatile boolean running = false;
|
||||
|
||||
/**
|
||||
* CoAP 服务器
|
||||
*/
|
||||
private CoapServer coapServer;
|
||||
|
||||
/**
|
||||
* 下行消息订阅者
|
||||
*/
|
||||
private final IotCoapDownstreamSubscriber downstreamSubscriber;
|
||||
|
||||
public IotCoapProtocol(ProtocolInstanceProperties properties) {
|
||||
IotCoapConfig coapConfig = properties.getCoap();
|
||||
Assert.notNull(coapConfig, "CoAP 协议配置(coap)不能为空");
|
||||
this.properties = properties;
|
||||
this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
|
||||
|
||||
// 初始化下行消息订阅者
|
||||
IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
|
||||
this.downstreamSubscriber = new IotCoapDownstreamSubscriber(this, messageBus);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return properties.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public IotProtocolTypeEnum getType() {
|
||||
return IotProtocolTypeEnum.COAP;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
if (running) {
|
||||
log.warn("[start][IoT CoAP 协议 {} 已经在运行中]", getId());
|
||||
return;
|
||||
}
|
||||
|
||||
IotCoapConfig coapConfig = properties.getCoap();
|
||||
try {
|
||||
// 1.1 创建 CoAP 配置
|
||||
Configuration config = Configuration.createStandardWithoutFile();
|
||||
config.set(CoapConfig.COAP_PORT, properties.getPort());
|
||||
config.set(CoapConfig.MAX_MESSAGE_SIZE, coapConfig.getMaxMessageSize());
|
||||
config.set(CoapConfig.ACK_TIMEOUT, coapConfig.getAckTimeoutMs(), TimeUnit.MILLISECONDS);
|
||||
config.set(CoapConfig.MAX_RETRANSMIT, coapConfig.getMaxRetransmit());
|
||||
// 1.2 创建 CoAP 服务器
|
||||
coapServer = new CoapServer(config);
|
||||
|
||||
// 2.1 添加 /auth 认证资源
|
||||
IotCoapAuthHandler authHandler = new IotCoapAuthHandler(serverId);
|
||||
IotCoapAuthResource authResource = new IotCoapAuthResource(authHandler);
|
||||
coapServer.add(authResource);
|
||||
// 2.2 添加 /auth/register/device 设备动态注册资源(一型一密)
|
||||
IotCoapRegisterHandler registerHandler = new IotCoapRegisterHandler();
|
||||
IotCoapRegisterResource registerResource = new IotCoapRegisterResource(registerHandler);
|
||||
// 2.3 添加 /auth/register/sub-device/{productKey}/{deviceName} 子设备动态注册资源
|
||||
IotCoapRegisterSubHandler registerSubHandler = new IotCoapRegisterSubHandler();
|
||||
IotCoapRegisterSubResource registerSubResource = new IotCoapRegisterSubResource(registerSubHandler);
|
||||
authResource.add(new CoapResource("register") {{
|
||||
add(registerResource);
|
||||
add(registerSubResource);
|
||||
}});
|
||||
// 2.4 添加 /topic 根资源(用于上行消息)
|
||||
IotCoapUpstreamHandler upstreamHandler = new IotCoapUpstreamHandler(serverId);
|
||||
IotCoapUpstreamTopicResource topicResource = new IotCoapUpstreamTopicResource(serverId, upstreamHandler);
|
||||
coapServer.add(topicResource);
|
||||
|
||||
// 3. 启动服务器
|
||||
coapServer.start();
|
||||
running = true;
|
||||
log.info("[start][IoT CoAP 协议 {} 启动成功,端口:{},serverId:{}]",
|
||||
getId(), properties.getPort(), serverId);
|
||||
|
||||
// 4. 启动下行消息订阅者
|
||||
this.downstreamSubscriber.start();
|
||||
} catch (Exception e) {
|
||||
log.error("[start][IoT CoAP 协议 {} 启动失败]", getId(), e);
|
||||
if (coapServer != null) {
|
||||
coapServer.destroy();
|
||||
coapServer = null;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (!running) {
|
||||
return;
|
||||
}
|
||||
// 1. 停止下行消息订阅者
|
||||
try {
|
||||
downstreamSubscriber.stop();
|
||||
log.info("[stop][IoT CoAP 协议 {} 下行消息订阅者已停止]", getId());
|
||||
} catch (Exception e) {
|
||||
log.error("[stop][IoT CoAP 协议 {} 下行消息订阅者停止失败]", getId(), e);
|
||||
}
|
||||
|
||||
// 2. 关闭 CoAP 服务器
|
||||
if (coapServer != null) {
|
||||
try {
|
||||
coapServer.stop();
|
||||
coapServer.destroy();
|
||||
coapServer = null;
|
||||
log.info("[stop][IoT CoAP 协议 {} 服务器已停止]", getId());
|
||||
} catch (Exception e) {
|
||||
log.error("[stop][IoT CoAP 协议 {} 服务器停止失败]", getId(), e);
|
||||
}
|
||||
}
|
||||
running = false;
|
||||
log.info("[stop][IoT CoAP 协议 {} 已停止]", getId());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,120 +0,0 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapAuthHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapAuthResource;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapRegisterHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapRegisterResource;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamTopicResource;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamHandler;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.CoapResource;
|
||||
import org.eclipse.californium.core.CoapServer;
|
||||
import org.eclipse.californium.core.config.CoapConfig;
|
||||
import org.eclipse.californium.elements.config.Configuration;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* IoT 网关 CoAP 协议:接收设备上行消息
|
||||
*
|
||||
* 基于 Eclipse Californium 实现,支持:
|
||||
* 1. 认证:POST /auth
|
||||
* 2. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post
|
||||
* 3. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotCoapUpstreamProtocol implements IotProtocol {
|
||||
|
||||
private static final String ID = "coap";
|
||||
|
||||
private final IotGatewayProperties.CoapProperties coapProperties;
|
||||
|
||||
private CoapServer coapServer;
|
||||
|
||||
@Getter
|
||||
private final String serverId;
|
||||
|
||||
private volatile boolean running = false;
|
||||
|
||||
public IotCoapUpstreamProtocol(IotGatewayProperties.CoapProperties coapProperties) {
|
||||
this.coapProperties = coapProperties;
|
||||
this.serverId = IotDeviceMessageUtils.generateServerId(coapProperties.getPort());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return ID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IotProtocolTypeEnum getType() {
|
||||
return IotProtocolTypeEnum.COAP;
|
||||
}
|
||||
|
||||
@Override
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
try {
|
||||
// 1.1 创建网络配置(Californium 3.x API)
|
||||
Configuration config = Configuration.createStandardWithoutFile();
|
||||
config.set(CoapConfig.COAP_PORT, coapProperties.getPort());
|
||||
config.set(CoapConfig.MAX_MESSAGE_SIZE, coapProperties.getMaxMessageSize());
|
||||
config.set(CoapConfig.ACK_TIMEOUT, coapProperties.getAckTimeout(), TimeUnit.MILLISECONDS);
|
||||
config.set(CoapConfig.MAX_RETRANSMIT, coapProperties.getMaxRetransmit());
|
||||
// 1.2 创建 CoAP 服务器
|
||||
coapServer = new CoapServer(config);
|
||||
|
||||
// 2.1 添加 /auth 认证资源
|
||||
IotCoapAuthHandler authHandler = new IotCoapAuthHandler();
|
||||
IotCoapAuthResource authResource = new IotCoapAuthResource(this, authHandler);
|
||||
coapServer.add(authResource);
|
||||
// 2.2 添加 /auth/register/device 设备动态注册资源(一型一密)
|
||||
IotCoapRegisterHandler registerHandler = new IotCoapRegisterHandler();
|
||||
IotCoapRegisterResource registerResource = new IotCoapRegisterResource(registerHandler);
|
||||
authResource.add(new CoapResource("register") {{
|
||||
add(registerResource);
|
||||
}});
|
||||
// 2.3 添加 /topic 根资源(用于上行消息)
|
||||
IotCoapUpstreamHandler upstreamHandler = new IotCoapUpstreamHandler();
|
||||
IotCoapUpstreamTopicResource topicResource = new IotCoapUpstreamTopicResource(this, upstreamHandler);
|
||||
coapServer.add(topicResource);
|
||||
|
||||
// 3. 启动服务器
|
||||
coapServer.start();
|
||||
running = true;
|
||||
log.info("[start][IoT 网关 CoAP 协议启动成功,端口:{},资源:/auth, /auth/register/device, /topic]", coapProperties.getPort());
|
||||
} catch (Exception e) {
|
||||
log.error("[start][IoT 网关 CoAP 协议启动失败]", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@PreDestroy
|
||||
public void stop() {
|
||||
if (coapServer != null) {
|
||||
try {
|
||||
coapServer.stop();
|
||||
running = false;
|
||||
log.info("[stop][IoT 网关 CoAP 协议已停止]");
|
||||
} catch (Exception e) {
|
||||
log.error("[stop][IoT 网关 CoAP 协议停止失败]", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,8 +1,9 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.downstream;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocolDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapProtocol;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
@@ -13,7 +14,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
@Slf4j
|
||||
public class IotCoapDownstreamSubscriber extends IotProtocolDownstreamSubscriber {
|
||||
|
||||
public IotCoapDownstreamSubscriber(IotCoapUpstreamProtocol protocol, IotMessageBus messageBus) {
|
||||
public IotCoapDownstreamSubscriber(IotCoapProtocol protocol, IotMessageBus messageBus) {
|
||||
super(protocol, messageBus);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,186 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
import cn.hutool.core.util.ObjUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.exception.ServiceException;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.coap.CoAP;
|
||||
import org.eclipse.californium.core.coap.MediaTypeRegistry;
|
||||
import org.eclipse.californium.core.coap.Option;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*;
|
||||
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
|
||||
|
||||
/**
|
||||
* IoT 网关 CoAP 协议的处理器抽象基类:提供通用的前置处理(认证)、请求解析、响应处理、全局的异常捕获等
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class IotCoapAbstractHandler {
|
||||
|
||||
/**
|
||||
* 自定义 CoAP Option 编号,用于携带 Token
|
||||
* <p>
|
||||
* CoAP Option 范围 2048-65535 属于实验/自定义范围
|
||||
*/
|
||||
public static final int OPTION_TOKEN = 2088;
|
||||
|
||||
private final IotDeviceTokenService deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
|
||||
|
||||
/**
|
||||
* 处理 CoAP 请求(模板方法)
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
*/
|
||||
public final void handle(CoapExchange exchange) {
|
||||
try {
|
||||
// 1. 前置处理
|
||||
beforeHandle(exchange);
|
||||
|
||||
// 2. 执行业务逻辑
|
||||
CommonResult<Object> result = handle0(exchange);
|
||||
writeResponse(exchange, result);
|
||||
} catch (ServiceException e) {
|
||||
// 业务异常,返回对应的错误码和消息
|
||||
writeResponse(exchange, CommonResult.error(e.getCode(), e.getMessage()));
|
||||
} catch (IllegalArgumentException e) {
|
||||
// 参数校验异常(hutool Assert 抛出),返回 BAD_REQUEST
|
||||
writeResponse(exchange, CommonResult.error(BAD_REQUEST.getCode(), e.getMessage()));
|
||||
} catch (Exception e) {
|
||||
// 其他未知异常,返回 INTERNAL_SERVER_ERROR
|
||||
log.error("[handle][CoAP 请求处理异常]", e);
|
||||
writeResponse(exchange, CommonResult.error(INTERNAL_SERVER_ERROR));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 CoAP 请求(子类实现)
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @return 处理结果
|
||||
*/
|
||||
protected abstract CommonResult<Object> handle0(CoapExchange exchange);
|
||||
|
||||
/**
|
||||
* 前置处理:认证等
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
*/
|
||||
private void beforeHandle(CoapExchange exchange) {
|
||||
// 1.1 如果不需要认证,则不走前置处理
|
||||
if (!requiresAuthentication()) {
|
||||
return;
|
||||
}
|
||||
// 1.2 从自定义 Option 获取 token
|
||||
String token = getTokenFromOption(exchange);
|
||||
if (StrUtil.isEmpty(token)) {
|
||||
throw exception(UNAUTHORIZED);
|
||||
}
|
||||
// 1.3 校验 token
|
||||
IotDeviceIdentity deviceInfo = deviceTokenService.verifyToken(token);
|
||||
if (deviceInfo == null) {
|
||||
throw exception(UNAUTHORIZED);
|
||||
}
|
||||
|
||||
// 2.1 解析 productKey 和 deviceName
|
||||
List<String> uriPath = exchange.getRequestOptions().getUriPath();
|
||||
String productKey = getProductKey(uriPath);
|
||||
String deviceName = getDeviceName(uriPath);
|
||||
if (StrUtil.isEmpty(productKey) || StrUtil.isEmpty(deviceName)) {
|
||||
throw exception(BAD_REQUEST);
|
||||
}
|
||||
// 2.2 校验设备信息是否匹配
|
||||
if (ObjUtil.notEqual(productKey, deviceInfo.getProductKey())
|
||||
|| ObjUtil.notEqual(deviceName, deviceInfo.getDeviceName())) {
|
||||
throw exception(FORBIDDEN);
|
||||
}
|
||||
}
|
||||
|
||||
// ========== Token 相关方法 ==========
|
||||
|
||||
/**
|
||||
* 是否需要认证(子类可覆盖)
|
||||
* <p>
|
||||
* 默认不需要认证
|
||||
*
|
||||
* @return 是否需要认证
|
||||
*/
|
||||
protected boolean requiresAuthentication() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 从 URI 路径中获取 productKey(子类实现)
|
||||
* <p>
|
||||
* 默认抛出异常,需要认证的子类必须实现此方法
|
||||
*
|
||||
* @param uriPath URI 路径
|
||||
* @return productKey
|
||||
*/
|
||||
protected String getProductKey(List<String> uriPath) {
|
||||
throw new UnsupportedOperationException("子类需要实现 getProductKey 方法");
|
||||
}
|
||||
|
||||
/**
|
||||
* 从 URI 路径中获取 deviceName(子类实现)
|
||||
* <p>
|
||||
* 默认抛出异常,需要认证的子类必须实现此方法
|
||||
*
|
||||
* @param uriPath URI 路径
|
||||
* @return deviceName
|
||||
*/
|
||||
protected String getDeviceName(List<String> uriPath) {
|
||||
throw new UnsupportedOperationException("子类需要实现 getDeviceName 方法");
|
||||
}
|
||||
|
||||
/**
|
||||
* 从自定义 CoAP Option 中获取 Token
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @return Token 值,如果不存在则返回 null
|
||||
*/
|
||||
protected String getTokenFromOption(CoapExchange exchange) {
|
||||
Option option = CollUtil.findOne(exchange.getRequestOptions().getOthers(),
|
||||
o -> o.getNumber() == OPTION_TOKEN);
|
||||
return option != null ? new String(option.getValue()) : null;
|
||||
}
|
||||
|
||||
// ========== 序列化相关方法 ==========
|
||||
|
||||
/**
|
||||
* 解析请求体为指定类型
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @param clazz 目标类型
|
||||
* @param <T> 目标类型泛型
|
||||
* @return 解析后的对象,解析失败返回 null
|
||||
*/
|
||||
protected <T> T deserializeRequest(CoapExchange exchange, Class<T> clazz) {
|
||||
byte[] payload = exchange.getRequestPayload();
|
||||
if (ArrayUtil.isEmpty(payload)) {
|
||||
return null;
|
||||
}
|
||||
return JsonUtils.parseObject(payload, clazz);
|
||||
}
|
||||
|
||||
private static String serializeResponse(Object data) {
|
||||
return JsonUtils.toJsonString(data);
|
||||
}
|
||||
|
||||
protected void writeResponse(CoapExchange exchange, CommonResult<?> data) {
|
||||
String json = serializeResponse(data);
|
||||
exchange.respond(CoAP.ResponseCode.CONTENT, json, MediaTypeRegistry.APPLICATION_JSON);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,72 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.BooleanUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
|
||||
import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_AUTH_FAIL;
|
||||
|
||||
/**
|
||||
* IoT 网关 CoAP 协议的【认证】处理器
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotCoapAuthHandler extends IotCoapAbstractHandler {
|
||||
|
||||
private final String serverId;
|
||||
|
||||
private final IotDeviceTokenService deviceTokenService;
|
||||
private final IotDeviceCommonApi deviceApi;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
public IotCoapAuthHandler(String serverId) {
|
||||
this.serverId = serverId;
|
||||
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
|
||||
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
|
||||
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("DuplicatedCode")
|
||||
protected CommonResult<Object> handle0(CoapExchange exchange) {
|
||||
// 1. 解析参数
|
||||
IotDeviceAuthReqDTO request = deserializeRequest(exchange, IotDeviceAuthReqDTO.class);
|
||||
Assert.notNull(request, "请求体不能为空");
|
||||
Assert.notBlank(request.getClientId(), "clientId 不能为空");
|
||||
Assert.notBlank(request.getUsername(), "username 不能为空");
|
||||
Assert.notBlank(request.getPassword(), "password 不能为空");
|
||||
|
||||
// 2.1 执行认证
|
||||
CommonResult<Boolean> result = deviceApi.authDevice(request);
|
||||
result.checkError();
|
||||
if (BooleanUtil.isFalse(result.getData())) {
|
||||
throw exception(DEVICE_AUTH_FAIL);
|
||||
}
|
||||
// 2.2 生成 Token
|
||||
IotDeviceIdentity deviceInfo = deviceTokenService.parseUsername(request.getUsername());
|
||||
Assert.notNull(deviceInfo, "设备信息不能为空");
|
||||
String token = deviceTokenService.createToken(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
|
||||
Assert.notBlank(token, "生成 token 不能为空");
|
||||
|
||||
// 3. 执行上线
|
||||
IotDeviceMessage message = IotDeviceMessage.buildStateUpdateOnline();
|
||||
deviceMessageService.sendDeviceMessage(message,
|
||||
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), serverId);
|
||||
|
||||
// 4. 构建响应数据
|
||||
return CommonResult.success(MapUtil.of("token", token));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router;
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.CoapResource;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
@@ -17,13 +16,10 @@ public class IotCoapAuthResource extends CoapResource {
|
||||
|
||||
public static final String PATH = "auth";
|
||||
|
||||
private final IotCoapUpstreamProtocol protocol;
|
||||
private final IotCoapAuthHandler authHandler;
|
||||
|
||||
public IotCoapAuthResource(IotCoapUpstreamProtocol protocol,
|
||||
IotCoapAuthHandler authHandler) {
|
||||
public IotCoapAuthResource(IotCoapAuthHandler authHandler) {
|
||||
super(PATH);
|
||||
this.protocol = protocol;
|
||||
this.authHandler = authHandler;
|
||||
log.info("[IotCoapAuthResource][创建 CoAP 认证资源: /{}]", PATH);
|
||||
}
|
||||
@@ -31,7 +27,7 @@ public class IotCoapAuthResource extends CoapResource {
|
||||
@Override
|
||||
public void handlePOST(CoapExchange exchange) {
|
||||
log.debug("[handlePOST][收到 /auth POST 请求]");
|
||||
authHandler.handle(exchange, protocol);
|
||||
authHandler.handle(exchange);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
|
||||
/**
|
||||
* IoT 网关 CoAP 协议的【设备动态注册】处理器
|
||||
* <p>
|
||||
* 用于直连设备/网关的一型一密动态注册,不需要认证
|
||||
*
|
||||
* @author 芋道源码
|
||||
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotCoapRegisterHandler extends IotCoapAbstractHandler {
|
||||
|
||||
private final IotDeviceCommonApi deviceApi;
|
||||
|
||||
public IotCoapRegisterHandler() {
|
||||
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CommonResult<Object> handle0(CoapExchange exchange) {
|
||||
// 1. 解析参数
|
||||
IotDeviceRegisterReqDTO request = deserializeRequest(exchange, IotDeviceRegisterReqDTO.class);
|
||||
Assert.notNull(request, "请求体不能为空");
|
||||
Assert.notBlank(request.getProductKey(), "productKey 不能为空");
|
||||
Assert.notBlank(request.getDeviceName(), "deviceName 不能为空");
|
||||
Assert.notBlank(request.getProductSecret(), "productSecret 不能为空");
|
||||
|
||||
// 2. 调用动态注册
|
||||
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(request);
|
||||
result.checkError();
|
||||
|
||||
// 3. 构建响应数据
|
||||
return CommonResult.success(result.getData());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router;
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.CoapResource;
|
||||
@@ -0,0 +1,84 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotSubDeviceRegisterFullReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterRespDTO;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
|
||||
|
||||
/**
|
||||
* IoT 网关 CoAP 协议的【子设备动态注册】处理器
|
||||
* <p>
|
||||
* 用于子设备的动态注册,需要网关认证
|
||||
*
|
||||
* @author 芋道源码
|
||||
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/register-devices">阿里云 - 动态注册子设备</a>
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotCoapRegisterSubHandler extends IotCoapAbstractHandler {
|
||||
|
||||
private final IotDeviceCommonApi deviceApi;
|
||||
|
||||
public IotCoapRegisterSubHandler() {
|
||||
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("DuplicatedCode")
|
||||
protected CommonResult<Object> handle0(CoapExchange exchange) {
|
||||
// 1.1 解析通用参数(从 URI 路径获取网关设备信息)
|
||||
List<String> uriPath = exchange.getRequestOptions().getUriPath();
|
||||
String productKey = getProductKey(uriPath);
|
||||
String deviceName = getDeviceName(uriPath);
|
||||
// 1.2 解析子设备列表
|
||||
SubDeviceRegisterRequest request = deserializeRequest(exchange, SubDeviceRegisterRequest.class);
|
||||
Assert.notNull(request, "请求参数不能为空");
|
||||
Assert.notEmpty(request.getParams(), "params 不能为空");
|
||||
|
||||
// 2. 调用子设备动态注册
|
||||
IotSubDeviceRegisterFullReqDTO reqDTO = new IotSubDeviceRegisterFullReqDTO()
|
||||
.setGatewayProductKey(productKey)
|
||||
.setGatewayDeviceName(deviceName)
|
||||
.setSubDevices(request.getParams());
|
||||
CommonResult<List<IotSubDeviceRegisterRespDTO>> result = deviceApi.registerSubDevices(reqDTO);
|
||||
result.checkError();
|
||||
|
||||
// 3. 返回结果
|
||||
return success(result.getData());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean requiresAuthentication() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getProductKey(List<String> uriPath) {
|
||||
// 路径格式:/auth/register/sub-device/{productKey}/{deviceName}
|
||||
return CollUtil.get(uriPath, 3);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getDeviceName(List<String> uriPath) {
|
||||
// 路径格式:/auth/register/sub-device/{productKey}/{deviceName}
|
||||
return CollUtil.get(uriPath, 4);
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class SubDeviceRegisterRequest {
|
||||
|
||||
private List<IotSubDeviceRegisterReqDTO> params;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.CoapResource;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
import org.eclipse.californium.core.server.resources.Resource;
|
||||
|
||||
/**
|
||||
* IoT 网关 CoAP 协议的子设备动态注册资源(/auth/register/sub-device/{productKey}/{deviceName})
|
||||
* <p>
|
||||
* 用于子设备的动态注册,需要网关认证
|
||||
* <p>
|
||||
* 支持动态路径匹配:productKey 和 deviceName 是网关设备的标识
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotCoapRegisterSubResource extends CoapResource {
|
||||
|
||||
public static final String PATH = "sub-device";
|
||||
|
||||
private final IotCoapRegisterSubHandler registerSubHandler;
|
||||
|
||||
/**
|
||||
* 创建根资源(/auth/register/sub-device)
|
||||
*/
|
||||
public IotCoapRegisterSubResource(IotCoapRegisterSubHandler registerSubHandler) {
|
||||
this(PATH, registerSubHandler);
|
||||
log.info("[IotCoapRegisterSubResource][创建 CoAP 子设备动态注册资源: /auth/register/{}]", PATH);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建子资源(动态路径)
|
||||
*/
|
||||
private IotCoapRegisterSubResource(String name, IotCoapRegisterSubHandler registerSubHandler) {
|
||||
super(name);
|
||||
this.registerSubHandler = registerSubHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getChild(String name) {
|
||||
// 递归创建动态子资源,支持 /sub-device/{productKey}/{deviceName} 路径
|
||||
return new IotCoapRegisterSubResource(name, registerSubHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlePOST(CoapExchange exchange) {
|
||||
log.debug("[handlePOST][收到子设备动态注册请求]");
|
||||
registerSubHandler.handle(exchange);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,76 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.text.StrPool;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* IoT 网关 CoAP 协议的【上行】处理器
|
||||
*
|
||||
* 处理设备通过 CoAP 协议发送的上行消息,包括:
|
||||
* 1. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post
|
||||
* 2. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post
|
||||
*
|
||||
* Token 通过自定义 CoAP Option 2088 携带
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotCoapUpstreamHandler extends IotCoapAbstractHandler {
|
||||
|
||||
private final String serverId;
|
||||
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
public IotCoapUpstreamHandler(String serverId) {
|
||||
this.serverId = serverId;
|
||||
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("DuplicatedCode")
|
||||
protected CommonResult<Object> handle0(CoapExchange exchange) {
|
||||
// 1.1 解析通用参数
|
||||
List<String> uriPath = exchange.getRequestOptions().getUriPath();
|
||||
String productKey = getProductKey(uriPath);
|
||||
String deviceName = getDeviceName(uriPath);
|
||||
String method = String.join(StrPool.DOT, uriPath.subList(4, uriPath.size()));
|
||||
// 1.2 解析消息
|
||||
IotDeviceMessage message = deserializeRequest(exchange, IotDeviceMessage.class);
|
||||
Assert.notNull(message, "请求参数不能为空");
|
||||
Assert.equals(method, message.getMethod(), "method 不匹配");
|
||||
|
||||
// 2. 发送消息
|
||||
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
|
||||
|
||||
// 3. 返回结果
|
||||
return CommonResult.success(MapUtil.of("messageId", message.getId()));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean requiresAuthentication() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getProductKey(List<String> uriPath) {
|
||||
// 路径格式:/topic/sys/{productKey}/{deviceName}/...
|
||||
return CollUtil.get(uriPath, 2);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getDeviceName(List<String> uriPath) {
|
||||
// 路径格式:/topic/sys/{productKey}/{deviceName}/...
|
||||
return CollUtil.get(uriPath, 3);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router;
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.CoapResource;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
@@ -20,15 +19,15 @@ public class IotCoapUpstreamTopicResource extends CoapResource {
|
||||
|
||||
public static final String PATH = "topic";
|
||||
|
||||
private final IotCoapUpstreamProtocol protocol;
|
||||
private final String serverId;
|
||||
private final IotCoapUpstreamHandler upstreamHandler;
|
||||
|
||||
/**
|
||||
* 创建根资源(/topic)
|
||||
*/
|
||||
public IotCoapUpstreamTopicResource(IotCoapUpstreamProtocol protocol,
|
||||
public IotCoapUpstreamTopicResource(String serverId,
|
||||
IotCoapUpstreamHandler upstreamHandler) {
|
||||
this(PATH, protocol, upstreamHandler);
|
||||
this(PATH, serverId, upstreamHandler);
|
||||
log.info("[IotCoapUpstreamTopicResource][创建 CoAP 上行 Topic 资源: /{}]", PATH);
|
||||
}
|
||||
|
||||
@@ -36,32 +35,32 @@ public class IotCoapUpstreamTopicResource extends CoapResource {
|
||||
* 创建子资源(动态路径)
|
||||
*/
|
||||
private IotCoapUpstreamTopicResource(String name,
|
||||
IotCoapUpstreamProtocol protocol,
|
||||
String serverId,
|
||||
IotCoapUpstreamHandler upstreamHandler) {
|
||||
super(name);
|
||||
this.protocol = protocol;
|
||||
this.serverId = serverId;
|
||||
this.upstreamHandler = upstreamHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getChild(String name) {
|
||||
// 递归创建动态子资源,支持任意深度路径
|
||||
return new IotCoapUpstreamTopicResource(name, protocol, upstreamHandler);
|
||||
return new IotCoapUpstreamTopicResource(name, serverId, upstreamHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleGET(CoapExchange exchange) {
|
||||
upstreamHandler.handle(exchange, protocol);
|
||||
upstreamHandler.handle(exchange);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlePOST(CoapExchange exchange) {
|
||||
upstreamHandler.handle(exchange, protocol);
|
||||
upstreamHandler.handle(exchange);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlePUT(CoapExchange exchange) {
|
||||
upstreamHandler.handle(exchange, protocol);
|
||||
upstreamHandler.handle(exchange);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -2,12 +2,5 @@
|
||||
* CoAP 协议实现包
|
||||
* <p>
|
||||
* 提供基于 Eclipse Californium 的 IoT 设备连接和消息处理功能
|
||||
* <p>
|
||||
* URI 路径:
|
||||
* - 认证:POST /auth
|
||||
* - 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post
|
||||
* - 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post
|
||||
* <p>
|
||||
* Token 通过 CoAP Option 2088 携带
|
||||
*/
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
|
||||
|
||||
@@ -1,115 +0,0 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.BooleanUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.util.IotCoapUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.coap.CoAP;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* IoT 网关 CoAP 协议的【认证】处理器
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotCoapAuthHandler {
|
||||
|
||||
private final IotDeviceTokenService deviceTokenService;
|
||||
private final IotDeviceCommonApi deviceApi;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
public IotCoapAuthHandler() {
|
||||
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
|
||||
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
|
||||
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理认证请求
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @param protocol 协议对象
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public void handle(CoapExchange exchange, IotCoapUpstreamProtocol protocol) {
|
||||
try {
|
||||
// 1.1 解析请求体
|
||||
byte[] payload = exchange.getRequestPayload();
|
||||
if (payload == null || payload.length == 0) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空");
|
||||
return;
|
||||
}
|
||||
Map<String, Object> body;
|
||||
try {
|
||||
body = JsonUtils.parseObject(new String(payload), Map.class);
|
||||
} catch (Exception e) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体 JSON 格式错误");
|
||||
return;
|
||||
}
|
||||
// 1.2 解析参数
|
||||
String clientId = MapUtil.getStr(body, "clientId");
|
||||
if (StrUtil.isEmpty(clientId)) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "clientId 不能为空");
|
||||
return;
|
||||
}
|
||||
String username = MapUtil.getStr(body, "username");
|
||||
if (StrUtil.isEmpty(username)) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "username 不能为空");
|
||||
return;
|
||||
}
|
||||
String password = MapUtil.getStr(body, "password");
|
||||
if (StrUtil.isEmpty(password)) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "password 不能为空");
|
||||
return;
|
||||
}
|
||||
|
||||
// 2.1 执行认证
|
||||
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
|
||||
.setClientId(clientId).setUsername(username).setPassword(password));
|
||||
if (result.isError()) {
|
||||
log.warn("[handle][认证失败,clientId: {}, 错误: {}]", clientId, result.getMsg());
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败:" + result.getMsg());
|
||||
return;
|
||||
}
|
||||
if (!BooleanUtil.isTrue(result.getData())) {
|
||||
log.warn("[handle][认证失败,clientId: {}]", clientId);
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败");
|
||||
return;
|
||||
}
|
||||
// 2.2 生成 Token
|
||||
IotDeviceIdentity deviceInfo = deviceTokenService.parseUsername(username);
|
||||
Assert.notNull(deviceInfo, "设备信息不能为空");
|
||||
String token = deviceTokenService.createToken(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
|
||||
Assert.notBlank(token, "生成 token 不能为空");
|
||||
|
||||
// 3. 执行上线
|
||||
IotDeviceMessage message = IotDeviceMessage.buildStateUpdateOnline();
|
||||
deviceMessageService.sendDeviceMessage(message,
|
||||
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId());
|
||||
|
||||
// 4. 返回成功响应
|
||||
log.info("[handle][认证成功,productKey: {}, deviceName: {}]",
|
||||
deviceInfo.getProductKey(), deviceInfo.getDeviceName());
|
||||
IotCoapUtils.respondSuccess(exchange, MapUtil.of("token", token));
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][认证处理异常]", e);
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,97 +0,0 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.util.IotCoapUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.coap.CoAP;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* IoT 网关 CoAP 协议的【设备动态注册】处理器
|
||||
* <p>
|
||||
* 用于直连设备/网关的一型一密动态注册,不需要认证
|
||||
*
|
||||
* @author 芋道源码
|
||||
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/unique-certificate-per-product-verification">阿里云 - 一型一密</a>
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotCoapRegisterHandler {
|
||||
|
||||
private final IotDeviceCommonApi deviceApi;
|
||||
|
||||
public IotCoapRegisterHandler() {
|
||||
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理设备动态注册请求
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public void handle(CoapExchange exchange) {
|
||||
try {
|
||||
// 1.1 解析请求体
|
||||
byte[] payload = exchange.getRequestPayload();
|
||||
if (payload == null || payload.length == 0) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空");
|
||||
return;
|
||||
}
|
||||
Map<String, Object> body;
|
||||
try {
|
||||
body = JsonUtils.parseObject(new String(payload), Map.class);
|
||||
} catch (Exception e) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体 JSON 格式错误");
|
||||
return;
|
||||
}
|
||||
|
||||
// 1.2 解析参数
|
||||
String productKey = MapUtil.getStr(body, "productKey");
|
||||
if (StrUtil.isEmpty(productKey)) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "productKey 不能为空");
|
||||
return;
|
||||
}
|
||||
String deviceName = MapUtil.getStr(body, "deviceName");
|
||||
if (StrUtil.isEmpty(deviceName)) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "deviceName 不能为空");
|
||||
return;
|
||||
}
|
||||
String productSecret = MapUtil.getStr(body, "productSecret");
|
||||
if (StrUtil.isEmpty(productSecret)) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "productSecret 不能为空");
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 调用动态注册
|
||||
IotDeviceRegisterReqDTO reqDTO = new IotDeviceRegisterReqDTO()
|
||||
.setProductKey(productKey)
|
||||
.setDeviceName(deviceName)
|
||||
.setProductSecret(productSecret);
|
||||
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(reqDTO);
|
||||
if (result.isError()) {
|
||||
log.warn("[handle][设备动态注册失败,productKey: {}, deviceName: {}, 错误: {}]",
|
||||
productKey, deviceName, result.getMsg());
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST,
|
||||
"设备动态注册失败:" + result.getMsg());
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 返回成功响应
|
||||
log.info("[handle][设备动态注册成功,productKey: {}, deviceName: {}]", productKey, deviceName);
|
||||
IotCoapUtils.respondSuccess(exchange, result.getData());
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][设备动态注册处理异常]", e);
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,110 +0,0 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.text.StrPool;
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
import cn.hutool.core.util.ObjUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.util.IotCoapUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.coap.CoAP;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* IoT 网关 CoAP 协议的【上行】处理器
|
||||
*
|
||||
* 处理设备通过 CoAP 协议发送的上行消息,包括:
|
||||
* 1. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post
|
||||
* 2. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post
|
||||
*
|
||||
* Token 通过自定义 CoAP Option 2088 携带
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotCoapUpstreamHandler {
|
||||
|
||||
private final IotDeviceTokenService deviceTokenService;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
public IotCoapUpstreamHandler() {
|
||||
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
|
||||
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 CoAP 请求
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @param protocol 协议对象
|
||||
*/
|
||||
public void handle(CoapExchange exchange, IotCoapUpstreamProtocol protocol) {
|
||||
try {
|
||||
// 1. 解析通用参数
|
||||
List<String> uriPath = exchange.getRequestOptions().getUriPath();
|
||||
String productKey = CollUtil.get(uriPath, 2);
|
||||
String deviceName = CollUtil.get(uriPath, 3);
|
||||
byte[] payload = exchange.getRequestPayload();
|
||||
if (StrUtil.isEmpty(productKey)) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "productKey 不能为空");
|
||||
return;
|
||||
}
|
||||
if (StrUtil.isEmpty(deviceName)) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "deviceName 不能为空");
|
||||
return;
|
||||
}
|
||||
if (ArrayUtil.isEmpty(payload)) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空");
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 认证:从自定义 Option 获取 token
|
||||
String token = IotCoapUtils.getTokenFromOption(exchange, IotCoapUtils.OPTION_TOKEN);
|
||||
if (StrUtil.isEmpty(token)) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "token 不能为空");
|
||||
return;
|
||||
}
|
||||
// 验证 token
|
||||
IotDeviceIdentity deviceInfo = deviceTokenService.verifyToken(token);
|
||||
if (deviceInfo == null) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "token 无效或已过期");
|
||||
return;
|
||||
}
|
||||
// 验证设备信息匹配
|
||||
if (ObjUtil.notEqual(productKey, deviceInfo.getProductKey())
|
||||
|| ObjUtil.notEqual(deviceName, deviceInfo.getDeviceName())) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.FORBIDDEN, "设备信息与 token 不匹配");
|
||||
return;
|
||||
}
|
||||
|
||||
// 2.1 解析 method:deviceName 后面的路径,用 . 拼接
|
||||
// 路径格式:[topic, sys, productKey, deviceName, thing, property, post]
|
||||
String method = String.join(StrPool.DOT, uriPath.subList(4, uriPath.size()));
|
||||
|
||||
// 2.2 解码消息
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
|
||||
if (ObjUtil.notEqual(method, message.getMethod())) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "method 不匹配");
|
||||
return;
|
||||
}
|
||||
// 2.3 发送消息到消息总线
|
||||
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, protocol.getServerId());
|
||||
|
||||
// 3. 返回成功响应
|
||||
IotCoapUtils.respondSuccess(exchange, MapUtil.of("messageId", message.getId()));
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][CoAP 请求处理异常]", e);
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,14 +1,6 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.util;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import org.eclipse.californium.core.coap.CoAP;
|
||||
import org.eclipse.californium.core.coap.MediaTypeRegistry;
|
||||
import org.eclipse.californium.core.coap.Option;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstrem.IotCoapAbstractHandler;
|
||||
|
||||
/**
|
||||
* IoT CoAP 协议工具类
|
||||
@@ -22,63 +14,6 @@ public class IotCoapUtils {
|
||||
* <p>
|
||||
* CoAP Option 范围 2048-65535 属于实验/自定义范围
|
||||
*/
|
||||
public static final int OPTION_TOKEN = 2088;
|
||||
|
||||
/**
|
||||
* 返回成功响应
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @param data 响应数据
|
||||
*/
|
||||
public static void respondSuccess(CoapExchange exchange, Object data) {
|
||||
CommonResult<Object> result = CommonResult.success(data);
|
||||
String json = JsonUtils.toJsonString(result);
|
||||
exchange.respond(CoAP.ResponseCode.CONTENT, json, MediaTypeRegistry.APPLICATION_JSON);
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回错误响应
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @param code CoAP 响应码
|
||||
* @param message 错误消息
|
||||
*/
|
||||
public static void respondError(CoapExchange exchange, CoAP.ResponseCode code, String message) {
|
||||
int errorCode = mapCoapCodeToErrorCode(code);
|
||||
CommonResult<Object> result = CommonResult.error(errorCode, message);
|
||||
String json = JsonUtils.toJsonString(result);
|
||||
exchange.respond(code, json, MediaTypeRegistry.APPLICATION_JSON);
|
||||
}
|
||||
|
||||
/**
|
||||
* 从自定义 CoAP Option 中获取 Token
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @param optionNumber Option 编号
|
||||
* @return Token 值,如果不存在则返回 null
|
||||
*/
|
||||
public static String getTokenFromOption(CoapExchange exchange, int optionNumber) {
|
||||
Option option = CollUtil.findOne(exchange.getRequestOptions().getOthers(),
|
||||
o -> o.getNumber() == optionNumber);
|
||||
return option != null ? new String(option.getValue()) : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 将 CoAP 响应码映射到业务错误码
|
||||
*
|
||||
* @param code CoAP 响应码
|
||||
* @return 业务错误码
|
||||
*/
|
||||
public static int mapCoapCodeToErrorCode(CoAP.ResponseCode code) {
|
||||
if (code == CoAP.ResponseCode.BAD_REQUEST) {
|
||||
return BAD_REQUEST.getCode();
|
||||
} else if (code == CoAP.ResponseCode.UNAUTHORIZED) {
|
||||
return UNAUTHORIZED.getCode();
|
||||
} else if (code == CoAP.ResponseCode.FORBIDDEN) {
|
||||
return FORBIDDEN.getCode();
|
||||
} else {
|
||||
return INTERNAL_SERVER_ERROR.getCode();
|
||||
}
|
||||
}
|
||||
public static final int OPTION_TOKEN = IotCoapAbstractHandler.OPTION_TOKEN;
|
||||
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.http.router;
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
@@ -10,9 +10,6 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.framework.common.util.object.ObjectUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream.IotHttpAuthHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream.IotHttpRegisterHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.serialize.IotMessageSerializerManager;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.http.HttpHeaders;
|
||||
@@ -20,8 +17,7 @@ import io.vertx.ext.web.RoutingContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.MediaType;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.FORBIDDEN;
|
||||
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
|
||||
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*;
|
||||
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
|
||||
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.invalidParamException;
|
||||
|
||||
@@ -35,8 +31,6 @@ public abstract class IotHttpAbstractHandler implements Handler<RoutingContext>
|
||||
|
||||
private final IotDeviceTokenService deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
|
||||
|
||||
private final IotMessageSerializerManager serializerManager = SpringUtil.getBean(IotMessageSerializerManager.class);
|
||||
|
||||
@Override
|
||||
public final void handle(RoutingContext context) {
|
||||
try {
|
||||
@@ -47,15 +41,31 @@ public abstract class IotHttpAbstractHandler implements Handler<RoutingContext>
|
||||
CommonResult<Object> result = handle0(context);
|
||||
writeResponse(context, result);
|
||||
} catch (ServiceException e) {
|
||||
// 已知异常,返回对应的错误码和错误信息
|
||||
writeResponse(context, CommonResult.error(e.getCode(), e.getMessage()));
|
||||
} catch (IllegalArgumentException e) {
|
||||
// 参数校验异常,返回 400 错误
|
||||
writeResponse(context, CommonResult.error(BAD_REQUEST.getCode(), e.getMessage()));
|
||||
} catch (Exception e) {
|
||||
// 其他未知异常,返回 500 错误
|
||||
log.error("[handle][path({}) 处理异常]", context.request().path(), e);
|
||||
writeResponse(context, CommonResult.error(INTERNAL_SERVER_ERROR));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 HTTP 请求(子类实现)
|
||||
*
|
||||
* @param context RoutingContext 对象
|
||||
* @return 处理结果
|
||||
*/
|
||||
protected abstract CommonResult<Object> handle0(RoutingContext context);
|
||||
|
||||
/**
|
||||
* 前置处理:认证等
|
||||
*
|
||||
* @param context RoutingContext 对象
|
||||
*/
|
||||
private void beforeHandle(RoutingContext context) {
|
||||
// 如果不需要认证,则不走前置处理
|
||||
String path = context.request().path();
|
||||
@@ -102,7 +112,7 @@ public abstract class IotHttpAbstractHandler implements Handler<RoutingContext>
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public static void writeResponse(RoutingContext context, Object data) {
|
||||
public static void writeResponse(RoutingContext context, CommonResult<?> data) {
|
||||
context.response()
|
||||
.setStatusCode(200)
|
||||
.putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE)
|
||||
@@ -3,7 +3,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.BooleanUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
@@ -11,13 +10,11 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAbstractHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
|
||||
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.invalidParamException;
|
||||
import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
|
||||
import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_AUTH_FAIL;
|
||||
|
||||
@@ -48,23 +45,19 @@ public class IotHttpAuthHandler extends IotHttpAbstractHandler {
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("DuplicatedCode")
|
||||
public CommonResult<Object> handle0(RoutingContext context) {
|
||||
// 1. 解析参数
|
||||
IotDeviceAuthReqDTO request = deserializeRequest(context, IotDeviceAuthReqDTO.class);
|
||||
if (StrUtil.isEmpty(request.getClientId())) {
|
||||
throw invalidParamException("clientId 不能为空");
|
||||
}
|
||||
if (StrUtil.isEmpty(request.getUsername())) {
|
||||
throw invalidParamException("username 不能为空");
|
||||
}
|
||||
if (StrUtil.isEmpty(request.getPassword())) {
|
||||
throw invalidParamException("password 不能为空");
|
||||
}
|
||||
Assert.notNull(request, "请求参数不能为空");
|
||||
Assert.notBlank(request.getClientId(), "clientId 不能为空");
|
||||
Assert.notBlank(request.getUsername(), "username 不能为空");
|
||||
Assert.notBlank(request.getPassword(), "password 不能为空");
|
||||
|
||||
// 2.1 执行认证
|
||||
CommonResult<Boolean> result = deviceApi.authDevice(request);
|
||||
result.checkError();
|
||||
if (!BooleanUtil.isTrue(result.getData())) {
|
||||
if (BooleanUtil.isFalse(result.getData())) {
|
||||
throw exception(DEVICE_AUTH_FAIL);
|
||||
}
|
||||
// 2.2 生成 Token
|
||||
|
||||
@@ -1,15 +1,13 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.auth.IotDeviceRegisterRespDTO;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAbstractHandler;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.invalidParamException;
|
||||
import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
|
||||
|
||||
/**
|
||||
@@ -34,15 +32,10 @@ public class IotHttpRegisterHandler extends IotHttpAbstractHandler {
|
||||
public CommonResult<Object> handle0(RoutingContext context) {
|
||||
// 1. 解析参数
|
||||
IotDeviceRegisterReqDTO request = deserializeRequest(context, IotDeviceRegisterReqDTO.class);
|
||||
if (StrUtil.isEmpty(request.getProductKey())) {
|
||||
throw invalidParamException("productKey 不能为空");
|
||||
}
|
||||
if (StrUtil.isEmpty(request.getDeviceName())) {
|
||||
throw invalidParamException("deviceName 不能为空");
|
||||
}
|
||||
if (StrUtil.isEmpty(request.getProductSecret())) {
|
||||
throw invalidParamException("productSecret 不能为空");
|
||||
}
|
||||
Assert.notNull(request, "请求参数不能为空");
|
||||
Assert.notBlank(request.getProductKey(), "productKey 不能为空");
|
||||
Assert.notBlank(request.getDeviceName(), "deviceName 不能为空");
|
||||
Assert.notBlank(request.getProductSecret(), "productSecret 不能为空");
|
||||
|
||||
// 2. 调用动态注册
|
||||
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(request);
|
||||
|
||||
@@ -1,19 +1,17 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.http.handler.upstream;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotSubDeviceRegisterFullReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.topic.auth.IotSubDeviceRegisterRespDTO;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAbstractHandler;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.invalidParamException;
|
||||
import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
|
||||
|
||||
/**
|
||||
@@ -46,9 +44,8 @@ public class IotHttpRegisterSubHandler extends IotHttpAbstractHandler {
|
||||
String deviceName = context.pathParam("deviceName");
|
||||
// 1.2 解析子设备列表
|
||||
SubDeviceRegisterRequest request = deserializeRequest(context, SubDeviceRegisterRequest.class);
|
||||
if (CollUtil.isEmpty(request.getParams())) {
|
||||
throw invalidParamException("params 不能为空");
|
||||
}
|
||||
Assert.notNull(request, "请求参数不能为空");
|
||||
Assert.notEmpty(request.getParams(), "params 不能为空");
|
||||
|
||||
// 2. 调用子设备动态注册
|
||||
IotSubDeviceRegisterFullReqDTO reqDTO = new IotSubDeviceRegisterFullReqDTO()
|
||||
|
||||
@@ -7,7 +7,6 @@ import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAbstractHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.ext.web.RoutingContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -33,15 +32,16 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
|
||||
|
||||
@Override
|
||||
protected CommonResult<Object> handle0(RoutingContext context) {
|
||||
// 1. 解析通用参数
|
||||
// 1.1 解析通用参数
|
||||
String productKey = context.pathParam("productKey");
|
||||
String deviceName = context.pathParam("deviceName");
|
||||
String method = context.pathParam("*").replaceAll(StrPool.SLASH, StrPool.DOT);
|
||||
|
||||
// 2.1 根据 Content-Type 反序列化消息
|
||||
// 1.2 根据 Content-Type 反序列化消息
|
||||
IotDeviceMessage message = deserializeRequest(context, IotDeviceMessage.class);
|
||||
Assert.notNull(message, "请求参数不能为空");
|
||||
Assert.equals(method, message.getMethod(), "method 不匹配");
|
||||
// 2.2 发送消息
|
||||
|
||||
// 2. 发送消息
|
||||
deviceMessageService.sendDeviceMessage(message,
|
||||
productKey, deviceName, serverId);
|
||||
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp.handler.upstream;
|
||||
|
||||
import cn.hutool.core.util.BooleanUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.exception.ServiceException;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
@@ -26,7 +28,8 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*;
|
||||
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.UNAUTHORIZED;
|
||||
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
|
||||
import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_AUTH_FAIL;
|
||||
|
||||
/**
|
||||
* TCP 上行消息处理器
|
||||
@@ -132,6 +135,12 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
// 业务消息
|
||||
handleBusinessRequest(clientId, message, socket);
|
||||
}
|
||||
} catch (ServiceException e) {
|
||||
// 业务异常,返回对应的错误码和错误信息
|
||||
log.warn("[processMessage][业务异常,客户端 ID: {},错误: {}]", clientId, e.getMessage());
|
||||
String requestId = message != null ? message.getRequestId() : null;
|
||||
String method = message != null ? message.getMethod() : null;
|
||||
sendErrorResponse(socket, requestId, method, e.getCode(), e.getMessage());
|
||||
} catch (IllegalArgumentException e) {
|
||||
// 参数校验失败,返回 400
|
||||
log.warn("[processMessage][参数校验失败,客户端 ID: {},错误: {}]", clientId, e.getMessage());
|
||||
@@ -166,10 +175,9 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
|
||||
// 2.1 执行认证
|
||||
CommonResult<Boolean> authResult = deviceApi.authDevice(authParams);
|
||||
if (authResult.isError()) {
|
||||
log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {},username: {}]", clientId, authParams.getUsername());
|
||||
sendErrorResponse(socket, message.getRequestId(), AUTH_METHOD, authResult.getCode(), authResult.getMsg());
|
||||
return;
|
||||
authResult.checkError();
|
||||
if (BooleanUtil.isFalse(authResult.getData())) {
|
||||
throw exception(DEVICE_AUTH_FAIL);
|
||||
}
|
||||
// 2.2 解析设备信息
|
||||
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
|
||||
@@ -205,12 +213,7 @@ public class IotTcpUpstreamHandler implements Handler<NetSocket> {
|
||||
|
||||
// 2. 调用动态注册
|
||||
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(params);
|
||||
if (result.isError()) {
|
||||
log.warn("[handleRegisterRequest][注册失败,客户端 ID: {},错误: {}]", clientId, result.getMsg());
|
||||
sendErrorResponse(socket, message.getRequestId(), IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(),
|
||||
result.getCode(), result.getMsg());
|
||||
return;
|
||||
}
|
||||
result.checkError();
|
||||
|
||||
// 3. 发送成功响应
|
||||
sendSuccessResponse(socket, message.getRequestId(),
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.udp.handler.upstream;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
import cn.hutool.core.util.BooleanUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.exception.ServiceException;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
@@ -27,10 +30,11 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Map;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*;
|
||||
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
|
||||
import static cn.iocoder.yudao.module.iot.gateway.enums.ErrorCodeConstants.DEVICE_AUTH_FAIL;
|
||||
|
||||
/**
|
||||
* UDP 上行消息处理器
|
||||
@@ -87,7 +91,7 @@ public class IotUdpUpstreamHandler {
|
||||
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
|
||||
}
|
||||
|
||||
// TODO @AI:vertx 有 udp 的实现么?
|
||||
// TODO done @AI:vertx 有 udp 的实现么?当前已使用 Vert.x DatagramSocket 实现
|
||||
/**
|
||||
* 处理 UDP 数据包
|
||||
*
|
||||
@@ -99,18 +103,7 @@ public class IotUdpUpstreamHandler {
|
||||
Buffer data = packet.data();
|
||||
String addressKey = sessionManager.buildAddressKey(senderAddress);
|
||||
log.debug("[handle][收到 UDP 数据包,来源: {},数据长度: {} 字节]", addressKey, data.length());
|
||||
try {
|
||||
processMessage(data, senderAddress, socket);
|
||||
} catch (IllegalArgumentException e) {
|
||||
// 参数校验失败,返回 400
|
||||
log.warn("[handle][参数校验失败,来源: {},错误: {}]", addressKey, e.getMessage());
|
||||
sendErrorResponse(socket, senderAddress, null, null, BAD_REQUEST.getCode(), e.getMessage());
|
||||
} catch (Exception e) {
|
||||
// 其他异常,返回 500
|
||||
log.error("[handle][处理消息失败,来源: {}]", addressKey, e);
|
||||
sendErrorResponse(socket, senderAddress, null, null,
|
||||
INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
|
||||
}
|
||||
processMessage(data, senderAddress, socket);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -121,6 +114,7 @@ public class IotUdpUpstreamHandler {
|
||||
* @param socket UDP Socket
|
||||
*/
|
||||
private void processMessage(Buffer buffer, InetSocketAddress senderAddress, DatagramSocket socket) {
|
||||
String addressKey = sessionManager.buildAddressKey(senderAddress);
|
||||
// 1.1 基础检查
|
||||
if (ArrayUtil.isEmpty(buffer)) {
|
||||
return;
|
||||
@@ -133,15 +127,35 @@ public class IotUdpUpstreamHandler {
|
||||
}
|
||||
|
||||
// 2. 根据消息类型路由处理
|
||||
if (AUTH_METHOD.equals(message.getMethod())) {
|
||||
// 认证请求
|
||||
handleAuthenticationRequest(message, senderAddress, socket);
|
||||
} else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(message.getMethod())) {
|
||||
// 设备动态注册请求
|
||||
handleRegisterRequest(message, senderAddress, socket);
|
||||
} else {
|
||||
// 业务消息
|
||||
handleBusinessRequest(message, senderAddress, socket);
|
||||
try {
|
||||
if (AUTH_METHOD.equals(message.getMethod())) {
|
||||
// 认证请求
|
||||
handleAuthenticationRequest(message, senderAddress, socket);
|
||||
} else if (IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod().equals(message.getMethod())) {
|
||||
// 设备动态注册请求
|
||||
handleRegisterRequest(message, senderAddress, socket);
|
||||
} else {
|
||||
// 业务消息
|
||||
handleBusinessRequest(message, senderAddress, socket);
|
||||
}
|
||||
} catch (ServiceException e) {
|
||||
// 业务异常,返回对应的错误码和错误信息
|
||||
log.warn("[processMessage][业务异常,来源: {},requestId: {},method: {},错误: {}]",
|
||||
addressKey, message.getRequestId(), message.getMethod(), e.getMessage());
|
||||
sendErrorResponse(socket, senderAddress, message.getRequestId(), message.getMethod(),
|
||||
e.getCode(), e.getMessage());
|
||||
} catch (IllegalArgumentException e) {
|
||||
// 参数校验失败,返回 400
|
||||
log.warn("[processMessage][参数校验失败,来源: {},requestId: {},method: {},错误: {}]",
|
||||
addressKey, message.getRequestId(), message.getMethod(), e.getMessage());
|
||||
sendErrorResponse(socket, senderAddress, message.getRequestId(), message.getMethod(),
|
||||
BAD_REQUEST.getCode(), e.getMessage());
|
||||
} catch (Exception e) {
|
||||
// 其他异常,返回 500
|
||||
log.error("[processMessage][处理消息失败,来源: {},requestId: {},method: {}]",
|
||||
addressKey, message.getRequestId(), message.getMethod(), e);
|
||||
sendErrorResponse(socket, senderAddress, message.getRequestId(), message.getMethod(),
|
||||
INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,12 +178,9 @@ public class IotUdpUpstreamHandler {
|
||||
|
||||
// 2.1 执行认证
|
||||
CommonResult<Boolean> authResult = deviceApi.authDevice(authParams);
|
||||
if (authResult.isError()) {
|
||||
log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {},username: {}]",
|
||||
clientId, authParams.getUsername());
|
||||
sendErrorResponse(socket, senderAddress, message.getRequestId(), AUTH_METHOD,
|
||||
authResult.getCode(), authResult.getMsg());
|
||||
return;
|
||||
authResult.checkError();
|
||||
if (!BooleanUtil.isTrue(authResult.getData())) {
|
||||
throw exception(DEVICE_AUTH_FAIL);
|
||||
}
|
||||
// 2.2 解析设备信息
|
||||
IotDeviceIdentity deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername());
|
||||
@@ -187,7 +198,8 @@ public class IotUdpUpstreamHandler {
|
||||
// 4.2 发送上线消息
|
||||
sendOnlineMessage(device);
|
||||
// 4.3 发送成功响应(包含 token)
|
||||
sendAuthSuccessResponse(socket, senderAddress, message.getRequestId(), token);
|
||||
sendSuccessResponse(socket, senderAddress, message.getRequestId(), AUTH_METHOD,
|
||||
MapUtil.of("token", token));
|
||||
log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {},来源: {}]",
|
||||
device.getId(), device.getDeviceName(), sessionManager.buildAddressKey(senderAddress));
|
||||
}
|
||||
@@ -211,13 +223,7 @@ public class IotUdpUpstreamHandler {
|
||||
|
||||
// 2. 调用动态注册
|
||||
CommonResult<IotDeviceRegisterRespDTO> result = deviceApi.registerDevice(params);
|
||||
if (result.isError()) {
|
||||
log.warn("[handleRegisterRequest][注册失败,来源: {},错误: {}]",
|
||||
sessionManager.buildAddressKey(senderAddress), result.getMsg());
|
||||
sendErrorResponse(socket, senderAddress, message.getRequestId(),
|
||||
IotDeviceMessageMethodEnum.DEVICE_REGISTER.getMethod(), result.getCode(), result.getMsg());
|
||||
return;
|
||||
}
|
||||
result.checkError();
|
||||
|
||||
// 3. 发送成功响应
|
||||
sendSuccessResponse(socket, senderAddress, message.getRequestId(),
|
||||
@@ -274,17 +280,8 @@ public class IotUdpUpstreamHandler {
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 更新会话活跃时间和地址
|
||||
// TODO @AI:是不是合并到 sessionManager 里面更好?
|
||||
IotUdpSessionManager.SessionInfo sessionInfo = sessionManager.getSessionInfo(device.getId());
|
||||
if (sessionInfo != null) {
|
||||
// 检查地址是否变化,变化则更新
|
||||
if (!senderAddress.equals(sessionInfo.getAddress())) {
|
||||
sessionManager.updateSessionAddress(device.getId(), senderAddress);
|
||||
} else {
|
||||
sessionManager.updateSessionActivity(device.getId());
|
||||
}
|
||||
}
|
||||
// 2. 更新会话地址(如有变化)
|
||||
sessionManager.updateSessionAddress(device.getId(), senderAddress);
|
||||
|
||||
// 3. 将 body 设置为实际的 params,发送消息到消息总线
|
||||
message.setParams(body);
|
||||
@@ -306,8 +303,7 @@ public class IotUdpUpstreamHandler {
|
||||
.setDeviceId(device.getId())
|
||||
.setProductKey(device.getProductKey())
|
||||
.setDeviceName(device.getDeviceName())
|
||||
.setAddress(address)
|
||||
.setLastActiveTime(LocalDateTime.now());
|
||||
.setAddress(address);
|
||||
sessionManager.registerSession(device.getId(), sessionInfo);
|
||||
}
|
||||
|
||||
@@ -324,21 +320,6 @@ public class IotUdpUpstreamHandler {
|
||||
|
||||
// ===================== 发送响应消息 =====================
|
||||
|
||||
/**
|
||||
* 发送认证成功响应(包含 token)
|
||||
*
|
||||
* @param socket UDP Socket
|
||||
* @param address 目标地址
|
||||
* @param requestId 请求 ID
|
||||
* @param token JWT Token
|
||||
*/
|
||||
private void sendAuthSuccessResponse(DatagramSocket socket, InetSocketAddress address,
|
||||
String requestId, String token) {
|
||||
IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, token,
|
||||
SUCCESS.getCode(), null);
|
||||
writeResponse(socket, address, responseMessage);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送成功响应
|
||||
*
|
||||
|
||||
@@ -1,25 +1,23 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.udp.manager;
|
||||
|
||||
import cn.hutool.core.util.ObjUtil;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.datagram.DatagramSocket;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* IoT 网关 UDP 会话管理器
|
||||
* <p>
|
||||
* 统一管理 UDP 会话的认证状态、设备会话和消息发送功能:
|
||||
* 1. 管理 UDP 会话的认证状态
|
||||
* 2. 管理设备会话和在线状态
|
||||
* 3. 管理消息发送到设备
|
||||
* 基于 Guava Cache 实现会话的自动过期清理:
|
||||
* 1. 管理设备会话信息(设备 ID -> 地址映射)
|
||||
* 2. 自动清理超时会话(expireAfterAccess)
|
||||
* 3. 限制最大会话数(maximumSize)
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@@ -27,109 +25,76 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
public class IotUdpSessionManager {
|
||||
|
||||
/**
|
||||
* 最大会话数
|
||||
* 设备会话缓存:设备 ID -> 会话信息
|
||||
* <p>
|
||||
* 使用 Guava Cache 自动管理过期:expireAfterAccess:每次访问(get/put)自动刷新过期时间
|
||||
*/
|
||||
private final Cache<Long, SessionInfo> deviceSessionCache;
|
||||
|
||||
private final int maxSessions;
|
||||
|
||||
/**
|
||||
* 设备 ID -> 会话信息
|
||||
*/
|
||||
private final Map<Long, SessionInfo> deviceSessionMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 设备地址 Key -> 设备 ID(反向映射,用于清理时同步)
|
||||
*/
|
||||
// TODO @AI:1)这个变量是否必须?2)unregisterSession 这个方法是否必须?
|
||||
private final Map<String, Long> addressDeviceMap = new ConcurrentHashMap<>();
|
||||
|
||||
public IotUdpSessionManager(int maxSessions) {
|
||||
public IotUdpSessionManager(int maxSessions, long sessionTimeoutMs) {
|
||||
this.maxSessions = maxSessions;
|
||||
this.deviceSessionCache = CacheBuilder.newBuilder()
|
||||
.maximumSize(maxSessions)
|
||||
.expireAfterAccess(sessionTimeoutMs, TimeUnit.MILLISECONDS)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册设备会话(包含认证信息)
|
||||
* 注册设备会话
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
* @param sessionInfo 会话信息
|
||||
*/
|
||||
public void registerSession(Long deviceId, SessionInfo sessionInfo) {
|
||||
// 检查会话数是否已达上限
|
||||
if (deviceSessionMap.size() >= maxSessions) {
|
||||
// 检查是否为新设备,且会话数已达上限
|
||||
if (deviceSessionCache.getIfPresent(deviceId) == null
|
||||
&& deviceSessionCache.size() >= maxSessions) {
|
||||
throw new IllegalStateException("会话数已达上限: " + maxSessions);
|
||||
}
|
||||
// 如果设备已有其他会话,先清理旧会话
|
||||
SessionInfo oldSessionInfo = deviceSessionMap.get(deviceId);
|
||||
if (oldSessionInfo != null) {
|
||||
String oldAddressKey = buildAddressKey(oldSessionInfo.getAddress());
|
||||
addressDeviceMap.remove(oldAddressKey, deviceId);
|
||||
log.info("[registerSession][设备已有其他会话,清理旧会话,设备 ID: {},旧地址: {}]",
|
||||
deviceId, oldAddressKey);
|
||||
}
|
||||
|
||||
// 注册新会话
|
||||
String addressKey = buildAddressKey(sessionInfo.getAddress());
|
||||
deviceSessionMap.put(deviceId, sessionInfo);
|
||||
addressDeviceMap.put(addressKey, deviceId);
|
||||
log.info("[registerSession][注册设备会话,设备 ID: {},地址: {},product key: {},device name: {}]",
|
||||
deviceId, addressKey, sessionInfo.getProductKey(), sessionInfo.getDeviceName());
|
||||
// 注册会话
|
||||
deviceSessionCache.put(deviceId, sessionInfo);
|
||||
log.info("[registerSession][注册设备会话,设备 ID: {},地址: {},productKey: {},deviceName: {}]",
|
||||
deviceId, buildAddressKey(sessionInfo.getAddress()),
|
||||
sessionInfo.getProductKey(), sessionInfo.getDeviceName());
|
||||
}
|
||||
|
||||
/**
|
||||
* 注销设备会话
|
||||
* 获取会话信息
|
||||
* <p>
|
||||
* 注意:调用此方法会自动刷新会话的过期时间
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
* @return 会话信息,不存在则返回 null
|
||||
*/
|
||||
public void unregisterSession(Long deviceId) {
|
||||
SessionInfo sessionInfo = deviceSessionMap.remove(deviceId);
|
||||
if (sessionInfo == null) {
|
||||
return;
|
||||
}
|
||||
String addressKey = buildAddressKey(sessionInfo.getAddress());
|
||||
// 仅当 addressDeviceMap 中的 deviceId 是当前 deviceId 时才移除,避免误删新会话
|
||||
addressDeviceMap.remove(addressKey, deviceId);
|
||||
log.info("[unregisterSession][注销设备会话,设备 ID: {},地址: {}]", deviceId, addressKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新会话活跃时间(每次收到上行消息时调用)
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
*/
|
||||
public void updateSessionActivity(Long deviceId) {
|
||||
SessionInfo sessionInfo = deviceSessionMap.get(deviceId);
|
||||
if (sessionInfo != null) {
|
||||
sessionInfo.setLastActiveTime(LocalDateTime.now());
|
||||
}
|
||||
public SessionInfo getSession(Long deviceId) {
|
||||
return deviceSessionCache.getIfPresent(deviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新设备会话地址(设备地址变更时调用)
|
||||
* <p>
|
||||
* 注意:getIfPresent 已自动刷新过期时间,无需重新 put
|
||||
*
|
||||
* @param deviceId 设备 ID
|
||||
* @param newAddress 新地址
|
||||
*/
|
||||
public void updateSessionAddress(Long deviceId, InetSocketAddress newAddress) {
|
||||
SessionInfo sessionInfo = deviceSessionMap.get(deviceId);
|
||||
// 地址未变化,无需更新
|
||||
SessionInfo sessionInfo = deviceSessionCache.getIfPresent(deviceId);
|
||||
if (sessionInfo == null) {
|
||||
return;
|
||||
}
|
||||
// 清理旧地址映射
|
||||
if (ObjUtil.equals(newAddress, sessionInfo.getAddress())) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 更新地址
|
||||
String oldAddressKey = buildAddressKey(sessionInfo.getAddress());
|
||||
addressDeviceMap.remove(oldAddressKey, deviceId);
|
||||
|
||||
// 更新新地址
|
||||
String newAddressKey = buildAddressKey(newAddress);
|
||||
sessionInfo.setAddress(newAddress);
|
||||
sessionInfo.setLastActiveTime(LocalDateTime.now());
|
||||
addressDeviceMap.put(newAddressKey, deviceId);
|
||||
log.debug("[updateSessionAddress][更新设备地址,设备 ID: {},新地址: {}]", deviceId, newAddressKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取会话信息
|
||||
*/
|
||||
public SessionInfo getSessionInfo(Long deviceId) {
|
||||
return deviceSessionMap.get(deviceId);
|
||||
log.debug("[updateSessionAddress][更新设备地址,设备 ID: {},旧地址: {},新地址: {}]",
|
||||
deviceId, oldAddressKey, buildAddressKey(newAddress));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -141,7 +106,7 @@ public class IotUdpSessionManager {
|
||||
* @return 是否发送成功
|
||||
*/
|
||||
public boolean sendToDevice(Long deviceId, byte[] data, DatagramSocket socket) {
|
||||
SessionInfo sessionInfo = deviceSessionMap.get(deviceId);
|
||||
SessionInfo sessionInfo = deviceSessionCache.getIfPresent(deviceId);
|
||||
if (sessionInfo == null || sessionInfo.getAddress() == null) {
|
||||
log.warn("[sendToDevice][设备会话不存在,设备 ID: {}]", deviceId);
|
||||
return false;
|
||||
@@ -165,38 +130,7 @@ public class IotUdpSessionManager {
|
||||
}
|
||||
|
||||
/**
|
||||
* 定期清理不活跃的设备会话
|
||||
*
|
||||
* @param timeoutMs 超时时间(毫秒)
|
||||
* @return 清理的设备 ID 列表(用于发送离线消息)
|
||||
*/
|
||||
public List<Long> cleanExpiredSessions(long timeoutMs) {
|
||||
List<Long> offlineDeviceIds = new ArrayList<>();
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
LocalDateTime expireTime = now.minusNanos(timeoutMs * 1_000_000);
|
||||
Iterator<Map.Entry<Long, SessionInfo>> iterator = deviceSessionMap.entrySet().iterator();
|
||||
// TODO @AI:改成 for each 会不会更好?
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<Long, SessionInfo> entry = iterator.next();
|
||||
SessionInfo sessionInfo = entry.getValue();
|
||||
// 未过期,跳过
|
||||
if (sessionInfo.getLastActiveTime().isAfter(expireTime)) {
|
||||
continue;
|
||||
}
|
||||
// 过期处理:记录离线设备 ID
|
||||
Long deviceId = entry.getKey();
|
||||
String addressKey = buildAddressKey(sessionInfo.getAddress());
|
||||
addressDeviceMap.remove(addressKey, deviceId);
|
||||
offlineDeviceIds.add(deviceId);
|
||||
log.debug("[cleanExpiredSessions][清理超时设备,设备 ID: {},地址: {},最后活跃时间: {}]",
|
||||
deviceId, addressKey, sessionInfo.getLastActiveTime());
|
||||
iterator.remove();
|
||||
}
|
||||
return offlineDeviceIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建地址 Key
|
||||
* 构建地址 Key(用于日志输出)
|
||||
*
|
||||
* @param address 地址
|
||||
* @return 地址 Key
|
||||
@@ -206,7 +140,7 @@ public class IotUdpSessionManager {
|
||||
}
|
||||
|
||||
/**
|
||||
* 会话信息(包含认证信息)
|
||||
* 会话信息
|
||||
*/
|
||||
@Data
|
||||
public static class SessionInfo {
|
||||
@@ -228,10 +162,6 @@ public class IotUdpSessionManager {
|
||||
* 设备地址
|
||||
*/
|
||||
private InetSocketAddress address;
|
||||
/**
|
||||
* 最后活跃时间
|
||||
*/
|
||||
private LocalDateTime lastActiveTime;
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -81,13 +81,38 @@ yudao:
|
||||
- id: udp-json
|
||||
type: udp
|
||||
port: 8093
|
||||
enabled: true
|
||||
enabled: false
|
||||
serialize: json
|
||||
udp:
|
||||
max-sessions: 1000 # 最大会话数
|
||||
session-timeout-ms: 60000 # 会话超时时间(毫秒),基于 Guava Cache 自动过期
|
||||
receive-buffer-size: 65536 # 接收缓冲区大小(字节)
|
||||
send-buffer-size: 65536 # 发送缓冲区大小(字节)
|
||||
# ====================================
|
||||
# 针对引入的 WebSocket 组件的配置
|
||||
# ====================================
|
||||
- id: websocket-json
|
||||
type: websocket
|
||||
port: 8094
|
||||
enabled: false
|
||||
serialize: json
|
||||
websocket:
|
||||
path: /ws
|
||||
max-message-size: 65536 # 最大消息大小(字节,默认 64KB)
|
||||
max-frame-size: 65536 # 最大帧大小(字节,默认 64KB)
|
||||
idle-timeout-seconds: 60 # 空闲超时时间(秒,默认 60)
|
||||
ssl-enabled: false # 是否启用 SSL(wss://)
|
||||
# ====================================
|
||||
# 针对引入的 CoAP 组件的配置
|
||||
# ====================================
|
||||
- id: coap-json
|
||||
type: coap
|
||||
port: 5683
|
||||
enabled: true
|
||||
coap:
|
||||
max-message-size: 1024 # 最大消息大小(字节)
|
||||
ack-timeout-ms: 2000 # ACK 超时时间(毫秒)
|
||||
max-retransmit: 4 # 最大重传次数
|
||||
|
||||
# 协议配置(旧版,保持兼容)
|
||||
protocol:
|
||||
@@ -134,27 +159,6 @@ yudao:
|
||||
max-message-size: 8192
|
||||
connect-timeout-seconds: 60
|
||||
ssl-enabled: false
|
||||
# ====================================
|
||||
# 针对引入的 CoAP 组件的配置
|
||||
# ====================================
|
||||
coap:
|
||||
enabled: false # 是否启用 CoAP 协议
|
||||
port: 5683 # CoAP 服务端口(默认 5683)
|
||||
max-message-size: 1024 # 最大消息大小(字节)
|
||||
ack-timeout: 2000 # ACK 超时时间(毫秒)
|
||||
max-retransmit: 4 # 最大重传次数
|
||||
# ====================================
|
||||
# 针对引入的 WebSocket 组件的配置
|
||||
# ====================================
|
||||
websocket:
|
||||
enabled: false # 是否启用 WebSocket 协议
|
||||
port: 8094 # WebSocket 服务端口(默认 8094)
|
||||
path: /ws # WebSocket 路径(默认 /ws)
|
||||
max-message-size: 65536 # 最大消息大小(字节,默认 64KB)
|
||||
max-frame-size: 65536 # 最大帧大小(字节,默认 64KB)
|
||||
idle-timeout-seconds: 60 # 空闲超时时间(秒,默认 60)
|
||||
ssl-enabled: false # 是否启用 SSL(wss://)
|
||||
|
||||
--- #################### 日志相关配置 ####################
|
||||
|
||||
# 基础日志配置
|
||||
|
||||
Reference in New Issue
Block a user