mirror of
https://gitee.com/yudaocode/yudao-boot-mini.git
synced 2026-03-22 05:27:15 +08:00
feat:【iot】coap 协议接入 100%:,基于 rippling-noodling-wombat.d 规划
This commit is contained in:
@@ -68,6 +68,7 @@
|
||||
<netty.version>4.2.9.Final</netty.version>
|
||||
<mqtt.version>1.2.5</mqtt.version>
|
||||
<vertx.version>4.5.22</vertx.version>
|
||||
<californium.version>3.12.0</californium.version>
|
||||
<!-- 三方云服务相关 -->
|
||||
<awssdk.version>2.40.15</awssdk.version>
|
||||
<justauth.version>1.16.7</justauth.version>
|
||||
@@ -653,6 +654,13 @@
|
||||
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
|
||||
<version>${mqtt.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- CoAP - Eclipse Californium -->
|
||||
<dependency>
|
||||
<groupId>org.eclipse.californium</groupId>
|
||||
<artifactId>californium-core</artifactId>
|
||||
<version>${californium.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
|
||||
@@ -48,6 +48,12 @@
|
||||
<artifactId>vertx-mqtt</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- CoAP 相关 - Eclipse Californium -->
|
||||
<dependency>
|
||||
<groupId>org.eclipse.californium</groupId>
|
||||
<artifactId>californium-core</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 测试相关 -->
|
||||
<dependency>
|
||||
<groupId>cn.iocoder.boot</groupId>
|
||||
|
||||
@@ -3,8 +3,6 @@ package cn.iocoder.yudao.module.iot.gateway.config;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapAuthHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamHandler;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol;
|
||||
@@ -207,11 +205,8 @@ public class IotGatewayConfiguration {
|
||||
public static class CoapProtocolConfiguration {
|
||||
|
||||
@Bean
|
||||
public IotCoapUpstreamProtocol iotCoapUpstreamProtocol(IotGatewayProperties gatewayProperties,
|
||||
IotCoapAuthHandler authHandler,
|
||||
IotCoapUpstreamHandler upstreamHandler) {
|
||||
return new IotCoapUpstreamProtocol(gatewayProperties.getProtocol().getCoap(),
|
||||
authHandler, upstreamHandler);
|
||||
public IotCoapUpstreamProtocol iotCoapUpstreamProtocol(IotGatewayProperties gatewayProperties) {
|
||||
return new IotCoapUpstreamProtocol(gatewayProperties.getProtocol().getCoap());
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
|
||||
* 基于 Eclipse Californium 实现,支持:
|
||||
* 1. 认证:POST /auth
|
||||
* 2. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post
|
||||
* 3. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/{eventId}/post
|
||||
* 3. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@@ -31,20 +31,13 @@ public class IotCoapUpstreamProtocol {
|
||||
|
||||
private final IotGatewayProperties.CoapProperties coapProperties;
|
||||
|
||||
private final IotCoapAuthHandler authHandler;
|
||||
private final IotCoapUpstreamHandler upstreamHandler;
|
||||
|
||||
private CoapServer coapServer;
|
||||
|
||||
@Getter
|
||||
private final String serverId;
|
||||
|
||||
public IotCoapUpstreamProtocol(IotGatewayProperties.CoapProperties coapProperties,
|
||||
IotCoapAuthHandler authHandler,
|
||||
IotCoapUpstreamHandler upstreamHandler) {
|
||||
public IotCoapUpstreamProtocol(IotGatewayProperties.CoapProperties coapProperties) {
|
||||
this.coapProperties = coapProperties;
|
||||
this.authHandler = authHandler;
|
||||
this.upstreamHandler = upstreamHandler;
|
||||
this.serverId = IotDeviceMessageUtils.generateServerId(coapProperties.getPort());
|
||||
}
|
||||
|
||||
@@ -61,9 +54,11 @@ public class IotCoapUpstreamProtocol {
|
||||
coapServer = new CoapServer(config);
|
||||
|
||||
// 2.1 添加 /auth 认证资源
|
||||
IotCoapAuthHandler authHandler = new IotCoapAuthHandler();
|
||||
IotCoapAuthResource authResource = new IotCoapAuthResource(this, authHandler);
|
||||
coapServer.add(authResource);
|
||||
// 2.2 添加 /topic 根资源(用于上行消息)
|
||||
IotCoapUpstreamHandler upstreamHandler = new IotCoapUpstreamHandler();
|
||||
IotCoapUpstreamTopicResource topicResource = new IotCoapUpstreamTopicResource(this, upstreamHandler);
|
||||
coapServer.add(topicResource);
|
||||
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
/**
|
||||
* IoT 网关 CoAP 协议
|
||||
*
|
||||
* 基于 Eclipse Californium 实现,支持设备通过 CoAP 协议进行:
|
||||
* 1. 属性上报:POST /sys/{productKey}/{deviceName}/thing/property/post
|
||||
* 2. 事件上报:POST /sys/{productKey}/{deviceName}/thing/event/{eventId}/post
|
||||
*
|
||||
* 认证方式:通过 URI Query 参数 token 进行认证
|
||||
* 示例:coap://server:5683/sys/pk/dn/thing/property/post?token=xxx
|
||||
*
|
||||
* @author 芋道源码
|
||||
* CoAP 协议实现包
|
||||
* <p>
|
||||
* 提供基于 Eclipse Californium 的 IoT 设备连接和消息处理功能
|
||||
* <p>
|
||||
* URI 路径:
|
||||
* - 认证:POST /auth
|
||||
* - 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post
|
||||
* - 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post
|
||||
* <p>
|
||||
* Token 通过 CoAP Option 2088 携带
|
||||
*/
|
||||
// TODO @AI:参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/package-info.java (现在注释应该有点不太对)
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
|
||||
|
||||
@@ -4,6 +4,7 @@ import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.BooleanUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
@@ -11,19 +12,15 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.util.IotCoapUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.coap.CoAP;
|
||||
import org.eclipse.californium.core.coap.MediaTypeRegistry;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
|
||||
|
||||
/**
|
||||
* IoT 网关 CoAP 协议的【认证】处理器
|
||||
*
|
||||
@@ -31,53 +28,55 @@ import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class IotCoapAuthHandler {
|
||||
|
||||
private final IotDeviceTokenService deviceTokenService;
|
||||
|
||||
private final IotDeviceCommonApi deviceApi;
|
||||
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
public IotCoapAuthHandler() {
|
||||
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
|
||||
this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class);
|
||||
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理认证请求
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @param protocol 协议对象
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public void handle(CoapExchange exchange, IotCoapUpstreamProtocol protocol) {
|
||||
try {
|
||||
// 1.1 解析请求体
|
||||
byte[] payload = exchange.getRequestPayload();
|
||||
if (payload == null || payload.length == 0) {
|
||||
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空");
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空");
|
||||
return;
|
||||
}
|
||||
Map<String, Object> body;
|
||||
try {
|
||||
body = JsonUtils.parseObject(new String(payload), Map.class);
|
||||
} catch (Exception e) {
|
||||
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体 JSON 格式错误");
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体 JSON 格式错误");
|
||||
return;
|
||||
}
|
||||
// TODO @AI:通过 hutool maputil 去获取,简化下;
|
||||
// 1.2 解析参数
|
||||
String clientId = (String) body.get("clientId");
|
||||
String clientId = MapUtil.getStr(body, "clientId");
|
||||
if (StrUtil.isEmpty(clientId)) {
|
||||
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "clientId 不能为空");
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "clientId 不能为空");
|
||||
return;
|
||||
}
|
||||
String username = (String) body.get("username");
|
||||
String username = MapUtil.getStr(body, "username");
|
||||
if (StrUtil.isEmpty(username)) {
|
||||
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "username 不能为空");
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "username 不能为空");
|
||||
return;
|
||||
}
|
||||
String password = (String) body.get("password");
|
||||
String password = MapUtil.getStr(body, "password");
|
||||
if (StrUtil.isEmpty(password)) {
|
||||
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "password 不能为空");
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "password 不能为空");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -86,12 +85,12 @@ public class IotCoapAuthHandler {
|
||||
.setClientId(clientId).setUsername(username).setPassword(password));
|
||||
if (result.isError()) {
|
||||
log.warn("[handle][认证失败,clientId: {}, 错误: {}]", clientId, result.getMsg());
|
||||
respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败:" + result.getMsg());
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败:" + result.getMsg());
|
||||
return;
|
||||
}
|
||||
if (!BooleanUtil.isTrue(result.getData())) {
|
||||
log.warn("[handle][认证失败,clientId: {}]", clientId);
|
||||
respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败");
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败");
|
||||
return;
|
||||
}
|
||||
// 2.2 生成 Token
|
||||
@@ -108,31 +107,11 @@ public class IotCoapAuthHandler {
|
||||
// 4. 返回成功响应
|
||||
log.info("[handle][认证成功,productKey: {}, deviceName: {}]",
|
||||
deviceInfo.getProductKey(), deviceInfo.getDeviceName());
|
||||
respondSuccess(exchange, MapUtil.of("token", token));
|
||||
IotCoapUtils.respondSuccess(exchange, MapUtil.of("token", token));
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][认证处理异常]", e);
|
||||
respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
|
||||
}
|
||||
}
|
||||
|
||||
// TODO @AI:抽到 coap 的 util 里;
|
||||
/**
|
||||
* 返回成功响应
|
||||
*/
|
||||
private void respondSuccess(CoapExchange exchange, Object data) {
|
||||
CommonResult<Object> result = success(data);
|
||||
String json = JsonUtils.toJsonString(result);
|
||||
exchange.respond(CoAP.ResponseCode.CONTENT, json, MediaTypeRegistry.APPLICATION_JSON);
|
||||
}
|
||||
|
||||
// TODO @AI:抽到 coap 的 util 里;
|
||||
/**
|
||||
* 返回错误响应
|
||||
*/
|
||||
private void respondError(CoapExchange exchange, CoAP.ResponseCode code, String message) {
|
||||
CommonResult<Object> result = CommonResult.error(code.value, message);
|
||||
String json = JsonUtils.toJsonString(result);
|
||||
exchange.respond(code, json, MediaTypeRegistry.APPLICATION_JSON);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,243 +1,108 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.text.StrPool;
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
import cn.hutool.core.util.ObjUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.util.IotCoapUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.coap.CoAP;
|
||||
import org.eclipse.californium.core.coap.MediaTypeRegistry;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*;
|
||||
|
||||
/**
|
||||
* IoT 网关 CoAP 协议的【上行】处理器
|
||||
*
|
||||
* 处理设备通过 CoAP 协议发送的上行消息,包括:
|
||||
* 1. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post
|
||||
* 2. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/{eventId}/post
|
||||
* 2. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post
|
||||
*
|
||||
* Token 通过自定义 CoAP Option 2088 携带
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class IotCoapUpstreamHandler {
|
||||
|
||||
/**
|
||||
* 自定义 CoAP Option 编号,用于携带 Token
|
||||
* CoAP Option 范围 2048-65535 属于实验/自定义范围
|
||||
*/
|
||||
public static final int OPTION_TOKEN = 2088;
|
||||
|
||||
private final IotDeviceTokenService deviceTokenService;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
public IotCoapUpstreamHandler() {
|
||||
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
|
||||
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 CoAP 请求
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @param httpMethod HTTP 方法
|
||||
* @param protocol 协议对象
|
||||
*/
|
||||
public void handle(CoapExchange exchange, String httpMethod, IotCoapUpstreamProtocol protocol) {
|
||||
public void handle(CoapExchange exchange, IotCoapUpstreamProtocol protocol) {
|
||||
try {
|
||||
// TODO @AI:这种路径的解析,不用了,简化下,类似 IotHttpUpstreamHandler 这种就很简洁;
|
||||
// 1. 解析 URI 路径:/topic/sys/{productKey}/{deviceName}/thing/...
|
||||
// 完整路径是 [topic, sys, productKey, deviceName, thing, ...]
|
||||
// 1. 解析通用参数
|
||||
List<String> uriPath = exchange.getRequestOptions().getUriPath();
|
||||
if (uriPath.size() < 6) {
|
||||
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST,
|
||||
"URI 路径格式错误,期望:/topic/sys/{productKey}/{deviceName}/...");
|
||||
String productKey = CollUtil.get(uriPath, 2);
|
||||
String deviceName = CollUtil.get(uriPath, 3);
|
||||
byte[] payload = exchange.getRequestPayload();
|
||||
if (StrUtil.isEmpty(productKey)) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "productKey 不能为空");
|
||||
return;
|
||||
}
|
||||
if (StrUtil.isEmpty(deviceName)) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "deviceName 不能为空");
|
||||
return;
|
||||
}
|
||||
if (ArrayUtil.isEmpty(payload)) {
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空");
|
||||
return;
|
||||
}
|
||||
|
||||
// 验证路径格式:第一个应该是 "topic",第二个应该是 "sys"
|
||||
if (!"topic".equals(uriPath.get(0)) || !"sys".equals(uriPath.get(1))) {
|
||||
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "URI 路径格式错误,期望以 /topic/sys 开头");
|
||||
return;
|
||||
}
|
||||
|
||||
// 解析 productKey 和 deviceName(索引 2 和 3)
|
||||
String productKey = uriPath.get(2);
|
||||
String deviceName = uriPath.get(3);
|
||||
|
||||
// 2. 认证:优先从自定义 Option 获取 token,兼容 Query 参数
|
||||
String token = getTokenFromOption(exchange);
|
||||
// 2. 认证:从自定义 Option 获取 token
|
||||
String token = IotCoapUtils.getTokenFromOption(exchange, IotCoapUtils.OPTION_TOKEN);
|
||||
if (StrUtil.isEmpty(token)) {
|
||||
// 兼容 Query 参数方式
|
||||
// TODO @AI:不用兼容 query,简化下;
|
||||
token = getQueryParam(exchange, "token");
|
||||
}
|
||||
if (StrUtil.isEmpty(token)) {
|
||||
respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "缺少 token(请使用 Option " + OPTION_TOKEN + " 或 Query 参数携带)");
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "token 不能为空");
|
||||
return;
|
||||
}
|
||||
|
||||
// 验证 token
|
||||
// TODO @AI:这里参考 IotHttpAbstractHandler 简化点校验;
|
||||
IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.verifyToken(token);
|
||||
if (deviceInfo == null) {
|
||||
respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "token 无效或已过期");
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "token 无效或已过期");
|
||||
return;
|
||||
}
|
||||
// 验证设备信息匹配
|
||||
if (ObjUtil.notEqual(productKey, deviceInfo.getProductKey())
|
||||
|| ObjUtil.notEqual(deviceName, deviceInfo.getDeviceName())) {
|
||||
respondError(exchange, CoAP.ResponseCode.FORBIDDEN, "设备信息与 token 不匹配");
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.FORBIDDEN, "设备信息与 token 不匹配");
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 解析 method:将 URI 路径转换为 method 格式
|
||||
// /topic/sys/pk/dn/thing/property/post -> thing.property.post
|
||||
// 路径是 [sys, pk, dn, thing, property, post],从索引 3 开始
|
||||
String method = buildMethod(uriPath);
|
||||
// 2.1 解析 method:deviceName 后面的路径,用 . 拼接
|
||||
// 路径格式:[topic, sys, productKey, deviceName, thing, property, post]
|
||||
String method = String.join(StrPool.DOT, uriPath.subList(4, uriPath.size()));
|
||||
|
||||
// 4. 解析并处理消息体
|
||||
byte[] payload = exchange.getRequestPayload();
|
||||
if (payload == null || payload.length == 0) {
|
||||
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空");
|
||||
return;
|
||||
}
|
||||
|
||||
// 5. 解码消息
|
||||
IotDeviceMessage message;
|
||||
try {
|
||||
message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][消息解码失败]", e);
|
||||
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "消息解码失败:" + e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
// 校验 method
|
||||
// TODO @AI:不用校验 method;以 message 解析出来的为主;
|
||||
if (!method.equals(message.getMethod())) {
|
||||
log.warn("[handle][method 不匹配,URI: {}, 消息: {}]", method, message.getMethod());
|
||||
}
|
||||
|
||||
// 6. 发送消息到消息总线
|
||||
// 2.2 解码消息
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
|
||||
Assert.equals(method, message.getMethod(), "method 不匹配");
|
||||
// 2.3 发送消息到消息总线
|
||||
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, protocol.getServerId());
|
||||
|
||||
// 7. 返回成功响应
|
||||
respondSuccess(exchange, message.getId());
|
||||
// 3. 返回成功响应
|
||||
IotCoapUtils.respondSuccess(exchange, MapUtil.of("messageId", message.getId()));
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][CoAP 请求处理异常]", e);
|
||||
respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
|
||||
IotCoapUtils.respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建 method 字符串
|
||||
*
|
||||
* 将 URI 路径转换为 method 格式,例如:
|
||||
* [sys, pk, dn, thing, property, post] -> thing.property.post
|
||||
*
|
||||
* @param uriPath URI 路径列表
|
||||
* @return method 字符串
|
||||
*/
|
||||
private String buildMethod(List<String> uriPath) {
|
||||
// 跳过 sys, productKey, deviceName,从第4个元素开始
|
||||
if (uriPath.size() > 3) {
|
||||
return String.join(StrPool.DOT, uriPath.subList(3, uriPath.size()));
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
// TODO @AI:抽到 coap 的 util 里;
|
||||
/**
|
||||
* 从自定义 CoAP Option 中获取 Token
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @return Token 值,如果不存在则返回 null
|
||||
*/
|
||||
private String getTokenFromOption(CoapExchange exchange) {
|
||||
// 尝试从自定义 Option 2088 获取 Token
|
||||
byte[] tokenBytes = exchange.getRequestOptions().getOthers().stream()
|
||||
.filter(option -> option.getNumber() == OPTION_TOKEN)
|
||||
.findFirst()
|
||||
.map(option -> option.getValue())
|
||||
.orElse(null);
|
||||
if (tokenBytes != null) {
|
||||
return new String(tokenBytes);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// TODO @AI:抽到 coap 的 util 里;
|
||||
/**
|
||||
* 从 URI Query 参数中获取指定 key 的值
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @param key 参数名
|
||||
* @return 参数值,如果不存在则返回 null
|
||||
*/
|
||||
private String getQueryParam(CoapExchange exchange, String key) {
|
||||
for (String query : exchange.getRequestOptions().getUriQuery()) {
|
||||
if (query.startsWith(key + "=")) {
|
||||
return query.substring((key + "=").length());
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// TODO @AI:抽到 coap 的 util 里;
|
||||
/**
|
||||
* 返回成功响应
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @param messageId 消息 ID
|
||||
*/
|
||||
private void respondSuccess(CoapExchange exchange, String messageId) {
|
||||
CommonResult<Object> result = CommonResult.success(MapUtil.of("messageId", messageId));
|
||||
String json = JsonUtils.toJsonString(result);
|
||||
exchange.respond(CoAP.ResponseCode.CONTENT, json, MediaTypeRegistry.APPLICATION_JSON);
|
||||
}
|
||||
|
||||
// TODO @AI:抽到 coap 的 util 里;
|
||||
/**
|
||||
* 返回错误响应
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @param code CoAP 响应码
|
||||
* @param message 错误消息
|
||||
*/
|
||||
private void respondError(CoapExchange exchange, CoAP.ResponseCode code, String message) {
|
||||
// 将 CoAP 响应码映射到业务错误码
|
||||
int errorCode = mapCoapCodeToErrorCode(code);
|
||||
CommonResult<Object> result = CommonResult.error(errorCode, message);
|
||||
String json = JsonUtils.toJsonString(result);
|
||||
exchange.respond(code, json, MediaTypeRegistry.APPLICATION_JSON);
|
||||
}
|
||||
|
||||
// TODO @AI:兼容 jdk8 的写法;
|
||||
/**
|
||||
* 将 CoAP 响应码映射到业务错误码
|
||||
*
|
||||
* @param code CoAP 响应码
|
||||
* @return 业务错误码
|
||||
*/
|
||||
private int mapCoapCodeToErrorCode(CoAP.ResponseCode code) {
|
||||
return switch (code) {
|
||||
case BAD_REQUEST -> BAD_REQUEST.getCode();
|
||||
case UNAUTHORIZED -> UNAUTHORIZED.getCode();
|
||||
case FORBIDDEN -> FORBIDDEN.getCode();
|
||||
default -> INTERNAL_SERVER_ERROR.getCode();
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -51,17 +51,17 @@ public class IotCoapUpstreamTopicResource extends CoapResource {
|
||||
|
||||
@Override
|
||||
public void handleGET(CoapExchange exchange) {
|
||||
upstreamHandler.handle(exchange, "GET", protocol);
|
||||
upstreamHandler.handle(exchange, protocol);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlePOST(CoapExchange exchange) {
|
||||
upstreamHandler.handle(exchange, "POST", protocol);
|
||||
upstreamHandler.handle(exchange, protocol);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlePUT(CoapExchange exchange) {
|
||||
upstreamHandler.handle(exchange, "PUT", protocol);
|
||||
upstreamHandler.handle(exchange, protocol);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,84 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.util;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import org.eclipse.californium.core.coap.CoAP;
|
||||
import org.eclipse.californium.core.coap.MediaTypeRegistry;
|
||||
import org.eclipse.californium.core.coap.Option;
|
||||
import org.eclipse.californium.core.server.resources.CoapExchange;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*;
|
||||
|
||||
/**
|
||||
* IoT CoAP 协议工具类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public class IotCoapUtils {
|
||||
|
||||
/**
|
||||
* 自定义 CoAP Option 编号,用于携带 Token
|
||||
* <p>
|
||||
* CoAP Option 范围 2048-65535 属于实验/自定义范围
|
||||
*/
|
||||
public static final int OPTION_TOKEN = 2088;
|
||||
|
||||
/**
|
||||
* 返回成功响应
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @param data 响应数据
|
||||
*/
|
||||
public static void respondSuccess(CoapExchange exchange, Object data) {
|
||||
CommonResult<Object> result = CommonResult.success(data);
|
||||
String json = JsonUtils.toJsonString(result);
|
||||
exchange.respond(CoAP.ResponseCode.CONTENT, json, MediaTypeRegistry.APPLICATION_JSON);
|
||||
}
|
||||
|
||||
/**
|
||||
* 返回错误响应
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @param code CoAP 响应码
|
||||
* @param message 错误消息
|
||||
*/
|
||||
public static void respondError(CoapExchange exchange, CoAP.ResponseCode code, String message) {
|
||||
int errorCode = mapCoapCodeToErrorCode(code);
|
||||
CommonResult<Object> result = CommonResult.error(errorCode, message);
|
||||
String json = JsonUtils.toJsonString(result);
|
||||
exchange.respond(code, json, MediaTypeRegistry.APPLICATION_JSON);
|
||||
}
|
||||
|
||||
/**
|
||||
* 从自定义 CoAP Option 中获取 Token
|
||||
*
|
||||
* @param exchange CoAP 交换对象
|
||||
* @param optionNumber Option 编号
|
||||
* @return Token 值,如果不存在则返回 null
|
||||
*/
|
||||
public static String getTokenFromOption(CoapExchange exchange, int optionNumber) {
|
||||
Option option = CollUtil.findOne(exchange.getRequestOptions().getOthers(),
|
||||
o -> o.getNumber() == optionNumber);
|
||||
return option != null ? new String(option.getValue()) : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 将 CoAP 响应码映射到业务错误码
|
||||
*
|
||||
* @param code CoAP 响应码
|
||||
* @return 业务错误码
|
||||
*/
|
||||
public static int mapCoapCodeToErrorCode(CoAP.ResponseCode code) {
|
||||
if (code == CoAP.ResponseCode.BAD_REQUEST) {
|
||||
return BAD_REQUEST.getCode();
|
||||
} else if (code == CoAP.ResponseCode.UNAUTHORIZED) {
|
||||
return UNAUTHORIZED.getCode();
|
||||
} else if (code == CoAP.ResponseCode.FORBIDDEN) {
|
||||
return FORBIDDEN.getCode();
|
||||
} else {
|
||||
return INTERNAL_SERVER_ERROR.getCode();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -108,7 +108,7 @@ yudao:
|
||||
# 针对引入的 MQTT WebSocket 组件的配置
|
||||
# ====================================
|
||||
mqtt-ws:
|
||||
enabled: true # 是否启用 MQTT WebSocket
|
||||
enabled: false # 是否启用 MQTT WebSocket
|
||||
port: 8083 # WebSocket 服务端口
|
||||
path: /mqtt # WebSocket 路径
|
||||
max-message-size: 8192 # 最大消息大小(字节)
|
||||
@@ -121,7 +121,7 @@ yudao:
|
||||
# 针对引入的 CoAP 组件的配置
|
||||
# ====================================
|
||||
coap:
|
||||
enabled: false # 是否启用 CoAP 协议
|
||||
enabled: true # 是否启用 CoAP 协议
|
||||
port: 5683 # CoAP 服务端口(默认 5683)
|
||||
max-message-size: 1024 # 最大消息大小(字节)
|
||||
ack-timeout: 2000 # ACK 超时时间(毫秒)
|
||||
|
||||
@@ -1,57 +1,74 @@
|
||||
package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamHandler;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.util.IotCoapUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.CoapClient;
|
||||
import org.eclipse.californium.core.CoapResponse;
|
||||
import org.eclipse.californium.core.coap.MediaTypeRegistry;
|
||||
import org.eclipse.californium.core.coap.Option;
|
||||
import org.eclipse.californium.core.coap.Request;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.eclipse.californium.core.config.CoapConfig;
|
||||
import org.eclipse.californium.elements.config.Configuration;
|
||||
import org.eclipse.californium.elements.config.UdpConfig;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
/**
|
||||
* IoT 网关 CoAP 协议集成测试(手动测试)
|
||||
*
|
||||
* 使用步骤:
|
||||
* 1. 启动 CoAP 网关服务(端口 5683)
|
||||
* 2. 运行 testAuth() 获取 token
|
||||
* 3. 将 token 粘贴到 TOKEN 常量
|
||||
* 4. 运行 testPropertyPost() 或 testEventPost()
|
||||
* <p>使用步骤:
|
||||
* <ol>
|
||||
* <li>启动 yudao-module-iot-gateway 服务(CoAP 端口 5683)</li>
|
||||
* <li>运行 {@link #testAuth()} 获取 token,将返回的 token 粘贴到 {@link #TOKEN} 常量</li>
|
||||
* <li>运行 {@link #testPropertyPost()} 测试属性上报,或运行 {@link #testEventPost()} 测试事件上报</li>
|
||||
* </ol>
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||
class IotCoapProtocolIntegrationTest {
|
||||
public class IotCoapProtocolIntegrationTest {
|
||||
|
||||
private static final String SERVER_HOST = "127.0.0.1";
|
||||
private static final int SERVER_PORT = 5683;
|
||||
|
||||
// 设备信息(根据实际情况修改)
|
||||
private static final String PRODUCT_KEY = "testProductKey";
|
||||
private static final String DEVICE_NAME = "testDeviceName";
|
||||
// 设备信息(根据实际情况修改 PRODUCT_KEY、DEVICE_NAME、PASSWORD)
|
||||
private static final String PRODUCT_KEY = "4aymZgOTOOCrDKRT";
|
||||
private static final String DEVICE_NAME = "small";
|
||||
private static final String PASSWORD = "509e2b08f7598eb139d276388c600435913ba4c94cd0d50aebc5c0d1855bcb75";
|
||||
|
||||
private static final String CLIENT_ID = PRODUCT_KEY + "." + DEVICE_NAME;
|
||||
private static final String USERNAME = DEVICE_NAME + "&" + PRODUCT_KEY;
|
||||
private static final String PASSWORD = "testPassword123";
|
||||
|
||||
// TODO: 运行 testAuth() 后,将返回的 token 粘贴到这里
|
||||
private static final String TOKEN = "粘贴你的token到这里";
|
||||
/**
|
||||
* 设备 Token:从 {@link #testAuth()} 方法获取后,粘贴到这里
|
||||
*/
|
||||
private static final String TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwcm9kdWN0S2V5IjoiNGF5bVpnT1RPT0NyREtSVCIsImV4cCI6MTc2OTMwNTA1NSwiZGV2aWNlTmFtZSI6InNtYWxsIn0.mf3MEATCn5bp6cXgULunZjs8d00RGUxj96JEz0hMS7k";
|
||||
|
||||
// ========== 1. 认证测试 ==========
|
||||
@BeforeAll
|
||||
public static void initCaliforniumConfig() {
|
||||
// 注册 Californium 配置定义
|
||||
CoapConfig.register();
|
||||
UdpConfig.register();
|
||||
// 创建默认配置
|
||||
Configuration.setStandard(Configuration.createStandardWithoutFile());
|
||||
}
|
||||
|
||||
/**
|
||||
* 认证测试:获取设备 Token
|
||||
*/
|
||||
@Test
|
||||
@Order(1)
|
||||
@DisplayName("1. 认证 - 获取 Token")
|
||||
void testAuth() throws Exception {
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testAuth() throws Exception {
|
||||
String uri = String.format("coap://%s:%d/auth", SERVER_HOST, SERVER_PORT);
|
||||
|
||||
String payload = String.format("""
|
||||
{
|
||||
"clientId": "%s",
|
||||
"username": "%s",
|
||||
"password": "%s"
|
||||
}
|
||||
""", CLIENT_ID, USERNAME, PASSWORD);
|
||||
String payload = JsonUtils.toJsonString(MapUtil.builder()
|
||||
.put("clientId", CLIENT_ID)
|
||||
.put("username", USERNAME)
|
||||
.put("password", PASSWORD)
|
||||
.build());
|
||||
|
||||
CoapClient client = new CoapClient(uri);
|
||||
try {
|
||||
@@ -68,38 +85,33 @@ class IotCoapProtocolIntegrationTest {
|
||||
}
|
||||
}
|
||||
|
||||
// ========== 2. 属性上报测试 ==========
|
||||
|
||||
/**
|
||||
* 属性上报测试
|
||||
*/
|
||||
@Test
|
||||
@Order(2)
|
||||
@DisplayName("2. 属性上报")
|
||||
void testPropertyPost() throws Exception {
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testPropertyPost() throws Exception {
|
||||
String uri = String.format("coap://%s:%d/topic/sys/%s/%s/thing/property/post",
|
||||
SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME);
|
||||
|
||||
String payload = """
|
||||
{
|
||||
"id": "123",
|
||||
"method": "thing.property.post",
|
||||
"params": {
|
||||
"temperature": 25.5,
|
||||
"humidity": 60
|
||||
}
|
||||
}
|
||||
""";
|
||||
String payload = JsonUtils.toJsonString(MapUtil.builder()
|
||||
.put("id", IdUtil.fastSimpleUUID())
|
||||
.put("method", IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())
|
||||
.put("version", "1.0")
|
||||
.put("params", MapUtil.builder()
|
||||
.put("width", 1)
|
||||
.put("height", "2")
|
||||
.build())
|
||||
.build());
|
||||
|
||||
CoapClient client = new CoapClient(uri);
|
||||
try {
|
||||
// 构造带自定义 Option 的请求
|
||||
Request request = Request.newPost();
|
||||
request.setURI(uri);
|
||||
request.setPayload(payload);
|
||||
request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON);
|
||||
// 添加自定义 Token Option (2088)
|
||||
request.getOptions().addOption(new Option(IotCoapUpstreamHandler.OPTION_TOKEN, TOKEN));
|
||||
request.getOptions().addOption(new Option(IotCoapUtils.OPTION_TOKEN, TOKEN));
|
||||
|
||||
log.info("[testPropertyPost][请求 URI: {}]", uri);
|
||||
log.info("[testPropertyPost][Token: {}]", TOKEN);
|
||||
log.info("[testPropertyPost][请求体: {}]", payload);
|
||||
|
||||
CoapResponse response = client.advanced(request);
|
||||
@@ -111,39 +123,35 @@ class IotCoapProtocolIntegrationTest {
|
||||
}
|
||||
}
|
||||
|
||||
// ========== 3. 事件上报测试 ==========
|
||||
|
||||
/**
|
||||
* 事件上报测试
|
||||
*/
|
||||
@Test
|
||||
@Order(3)
|
||||
@DisplayName("3. 事件上报")
|
||||
void testEventPost() throws Exception {
|
||||
String uri = String.format("coap://%s:%d/topic/sys/%s/%s/thing/event/alarm/post",
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testEventPost() throws Exception {
|
||||
String uri = String.format("coap://%s:%d/topic/sys/%s/%s/thing/event/post",
|
||||
SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME);
|
||||
|
||||
String payload = """
|
||||
{
|
||||
"id": "456",
|
||||
"method": "thing.event.alarm.post",
|
||||
"params": {
|
||||
"alarmType": "temperature_high",
|
||||
"level": "warning",
|
||||
"value": 85.2
|
||||
}
|
||||
}
|
||||
""";
|
||||
String payload = JsonUtils.toJsonString(MapUtil.builder()
|
||||
.put("id", IdUtil.fastSimpleUUID())
|
||||
.put("method", IotDeviceMessageMethodEnum.EVENT_POST.getMethod())
|
||||
.put("version", "1.0")
|
||||
.put("identifier", "eat")
|
||||
.put("params", MapUtil.builder()
|
||||
.put("width", 1)
|
||||
.put("height", "2")
|
||||
.put("oneThree", "3")
|
||||
.build())
|
||||
.build());
|
||||
|
||||
CoapClient client = new CoapClient(uri);
|
||||
try {
|
||||
// 构造带自定义 Option 的请求
|
||||
Request request = Request.newPost();
|
||||
request.setURI(uri);
|
||||
request.setPayload(payload);
|
||||
request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON);
|
||||
// 添加自定义 Token Option (2088)
|
||||
request.getOptions().addOption(new Option(IotCoapUpstreamHandler.OPTION_TOKEN, TOKEN));
|
||||
request.getOptions().addOption(new Option(IotCoapUtils.OPTION_TOKEN, TOKEN));
|
||||
|
||||
log.info("[testEventPost][请求 URI: {}]", uri);
|
||||
log.info("[testEventPost][Token: {}]", TOKEN);
|
||||
log.info("[testEventPost][请求体: {}]", payload);
|
||||
|
||||
CoapResponse response = client.advanced(request);
|
||||
|
||||
Reference in New Issue
Block a user