feat:【iot】coap 协议接入 50%:初始化整体实现,基于 pure-wishing-muffin.md 规划

This commit is contained in:
YunaiV
2026-01-18 09:05:00 +08:00
parent 3027caa1d2
commit bec8cc6ef8
14 changed files with 877 additions and 0 deletions

View File

@@ -1,6 +1,10 @@
package cn.iocoder.yudao.module.iot.gateway.config;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapAuthHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxAuthEventProtocol;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxDownstreamSubscriber;
import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.IotEmqxUpstreamProtocol;
@@ -194,4 +198,28 @@ public class IotGatewayConfiguration {
}
/**
* IoT 网关 CoAP 协议配置类
*/
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.gateway.protocol.coap", name = "enabled", havingValue = "true")
@Slf4j
public static class CoapProtocolConfiguration {
@Bean
public IotCoapUpstreamProtocol iotCoapUpstreamProtocol(IotGatewayProperties gatewayProperties,
IotCoapAuthHandler authHandler,
IotCoapUpstreamHandler upstreamHandler) {
return new IotCoapUpstreamProtocol(gatewayProperties.getProtocol().getCoap(),
authHandler, upstreamHandler);
}
@Bean
public IotCoapDownstreamSubscriber iotCoapDownstreamSubscriber(IotCoapUpstreamProtocol coapUpstreamProtocol,
IotMessageBus messageBus) {
return new IotCoapDownstreamSubscriber(coapUpstreamProtocol, messageBus);
}
}
}

View File

@@ -93,6 +93,11 @@ public class IotGatewayProperties {
*/
private MqttWsProperties mqttWs;
/**
* CoAP 组件配置
*/
private CoapProperties coap;
}
@Data
@@ -503,4 +508,39 @@ public class IotGatewayProperties {
}
@Data
public static class CoapProperties {
/**
* 是否开启
*/
@NotNull(message = "是否开启不能为空")
private Boolean enabled;
/**
* 服务端口CoAP 默认端口 5683
*/
// TODO @AI默认不为空
private Integer port = 5683;
/**
* 最大消息大小(字节)
*/
// TODO @AI默认不为空
private Integer maxMessageSize = 1024;
/**
* ACK 超时时间(毫秒)
*/
// TODO @AI默认不为空
private Integer ackTimeout = 2000;
/**
* 最大重传次数
*/
// TODO @AI默认不为空
private Integer maxRetransmit = 4;
}
}

View File

@@ -0,0 +1,46 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 网关 CoAP 订阅者:接收下行给设备的消息
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotCoapDownstreamSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
private final IotCoapUpstreamProtocol protocol;
private final IotMessageBus messageBus;
@PostConstruct
public void init() {
messageBus.register(this);
}
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
}
@Override
public String getGroup() {
// 保证点对点消费,需要保证独立的 Group所以使用 Topic 作为 Group
return getTopic();
}
@Override
public void onMessage(IotDeviceMessage message) {
// 如需支持,可通过 CoAP Observe 模式实现(设备订阅资源,服务器推送变更)
log.warn("[onMessage][IoT 网关 CoAP 协议暂不支持下行消息,忽略消息:{}]", message);
}
}

View File

@@ -0,0 +1,91 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapAuthHandler;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapAuthResource;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamTopicResource;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamHandler;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.CoapServer;
import org.eclipse.californium.core.config.CoapConfig;
import org.eclipse.californium.elements.config.Configuration;
import java.util.concurrent.TimeUnit;
/**
* IoT 网关 CoAP 协议:接收设备上行消息
*
* 基于 Eclipse Californium 实现,支持:
* 1. 认证POST /auth
* 2. 属性上报POST /topic/sys/{productKey}/{deviceName}/thing/property/post
* 3. 事件上报POST /topic/sys/{productKey}/{deviceName}/thing/event/{eventId}/post
*
* @author 芋道源码
*/
@Slf4j
public class IotCoapUpstreamProtocol {
private final IotGatewayProperties.CoapProperties coapProperties;
private final IotCoapAuthHandler authHandler;
private final IotCoapUpstreamHandler upstreamHandler;
private CoapServer coapServer;
@Getter
private final String serverId;
public IotCoapUpstreamProtocol(IotGatewayProperties.CoapProperties coapProperties,
IotCoapAuthHandler authHandler,
IotCoapUpstreamHandler upstreamHandler) {
this.coapProperties = coapProperties;
this.authHandler = authHandler;
this.upstreamHandler = upstreamHandler;
this.serverId = IotDeviceMessageUtils.generateServerId(coapProperties.getPort());
}
@PostConstruct
public void start() {
try {
// 1.1 创建网络配置Californium 3.x API
Configuration config = Configuration.createStandardWithoutFile();
config.set(CoapConfig.COAP_PORT, coapProperties.getPort());
config.set(CoapConfig.MAX_MESSAGE_SIZE, coapProperties.getMaxMessageSize());
config.set(CoapConfig.ACK_TIMEOUT, coapProperties.getAckTimeout(), TimeUnit.MILLISECONDS);
config.set(CoapConfig.MAX_RETRANSMIT, coapProperties.getMaxRetransmit());
// 1.2 创建 CoAP 服务器
coapServer = new CoapServer(config);
// 2.1 添加 /auth 认证资源
IotCoapAuthResource authResource = new IotCoapAuthResource(this, authHandler);
coapServer.add(authResource);
// 2.2 添加 /topic 根资源(用于上行消息)
IotCoapUpstreamTopicResource topicResource = new IotCoapUpstreamTopicResource(this, upstreamHandler);
coapServer.add(topicResource);
// 3. 启动服务器
coapServer.start();
log.info("[start][IoT 网关 CoAP 协议启动成功,端口:{},资源:/auth, /topic]", coapProperties.getPort());
} catch (Exception e) {
log.error("[start][IoT 网关 CoAP 协议启动失败]", e);
throw e;
}
}
@PreDestroy
public void stop() {
if (coapServer != null) {
try {
coapServer.stop();
log.info("[stop][IoT 网关 CoAP 协议已停止]");
} catch (Exception e) {
log.error("[stop][IoT 网关 CoAP 协议停止失败]", e);
}
}
}
}

View File

@@ -0,0 +1,14 @@
/**
* IoT 网关 CoAP 协议
*
* 基于 Eclipse Californium 实现,支持设备通过 CoAP 协议进行:
* 1. 属性上报POST /sys/{productKey}/{deviceName}/thing/property/post
* 2. 事件上报POST /sys/{productKey}/{deviceName}/thing/event/{eventId}/post
*
* 认证方式:通过 URI Query 参数 token 进行认证
* 示例coap://server:5683/sys/pk/dn/thing/property/post?token=xxx
*
* @author 芋道源码
*/
// TODO @AI参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/package-info.java (现在注释应该有点不太对)
package cn.iocoder.yudao.module.iot.gateway.protocol.coap;

View File

@@ -0,0 +1,138 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.MediaTypeRegistry;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.springframework.stereotype.Component;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
/**
* IoT 网关 CoAP 协议的【认证】处理器
*
* 参考 {@link cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpAuthHandler}
*
* @author 芋道源码
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class IotCoapAuthHandler {
private final IotDeviceTokenService deviceTokenService;
private final IotDeviceCommonApi deviceApi;
private final IotDeviceMessageService deviceMessageService;
/**
* 处理认证请求
*
* @param exchange CoAP 交换对象
* @param protocol 协议对象
*/
public void handle(CoapExchange exchange, IotCoapUpstreamProtocol protocol) {
try {
// 1.1 解析请求体
byte[] payload = exchange.getRequestPayload();
if (payload == null || payload.length == 0) {
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空");
return;
}
Map<String, Object> body;
try {
body = JsonUtils.parseObject(new String(payload), Map.class);
} catch (Exception e) {
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体 JSON 格式错误");
return;
}
// TODO @AI通过 hutool maputil 去获取,简化下;
// 1.2 解析参数
String clientId = (String) body.get("clientId");
if (StrUtil.isEmpty(clientId)) {
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "clientId 不能为空");
return;
}
String username = (String) body.get("username");
if (StrUtil.isEmpty(username)) {
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "username 不能为空");
return;
}
String password = (String) body.get("password");
if (StrUtil.isEmpty(password)) {
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "password 不能为空");
return;
}
// 2.1 执行认证
CommonResult<Boolean> result = deviceApi.authDevice(new IotDeviceAuthReqDTO()
.setClientId(clientId).setUsername(username).setPassword(password));
if (result.isError()) {
log.warn("[handle][认证失败clientId: {}, 错误: {}]", clientId, result.getMsg());
respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败:" + result.getMsg());
return;
}
if (!BooleanUtil.isTrue(result.getData())) {
log.warn("[handle][认证失败clientId: {}]", clientId);
respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "认证失败");
return;
}
// 2.2 生成 Token
IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.parseUsername(username);
Assert.notNull(deviceInfo, "设备信息不能为空");
String token = deviceTokenService.createToken(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
Assert.notBlank(token, "生成 token 不能为空");
// 3. 执行上线
IotDeviceMessage message = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(message,
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId());
// 4. 返回成功响应
log.info("[handle][认证成功productKey: {}, deviceName: {}]",
deviceInfo.getProductKey(), deviceInfo.getDeviceName());
respondSuccess(exchange, MapUtil.of("token", token));
} catch (Exception e) {
log.error("[handle][认证处理异常]", e);
respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
}
}
// TODO @AI抽到 coap 的 util 里;
/**
* 返回成功响应
*/
private void respondSuccess(CoapExchange exchange, Object data) {
CommonResult<Object> result = success(data);
String json = JsonUtils.toJsonString(result);
exchange.respond(CoAP.ResponseCode.CONTENT, json, MediaTypeRegistry.APPLICATION_JSON);
}
// TODO @AI抽到 coap 的 util 里;
/**
* 返回错误响应
*/
private void respondError(CoapExchange exchange, CoAP.ResponseCode code, String message) {
CommonResult<Object> result = CommonResult.error(code.value, message);
String json = JsonUtils.toJsonString(result);
exchange.respond(code, json, MediaTypeRegistry.APPLICATION_JSON);
}
}

View File

@@ -0,0 +1,37 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.CoapResource;
import org.eclipse.californium.core.server.resources.CoapExchange;
/**
* IoT 网关 CoAP 协议的认证资源(/auth
*
* 设备通过此资源进行认证,获取 Token
*
* @author 芋道源码
*/
@Slf4j
public class IotCoapAuthResource extends CoapResource {
public static final String PATH = "auth";
private final IotCoapUpstreamProtocol protocol;
private final IotCoapAuthHandler authHandler;
public IotCoapAuthResource(IotCoapUpstreamProtocol protocol,
IotCoapAuthHandler authHandler) {
super(PATH);
this.protocol = protocol;
this.authHandler = authHandler;
log.info("[IotCoapAuthResource][创建 CoAP 认证资源: /{}]", PATH);
}
@Override
public void handlePOST(CoapExchange exchange) {
log.debug("[handlePOST][收到 /auth POST 请求]");
authHandler.handle(exchange, protocol);
}
}

View File

@@ -0,0 +1,243 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.MediaTypeRegistry;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.springframework.stereotype.Component;
import java.util.List;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*;
/**
* IoT 网关 CoAP 协议的【上行】处理器
*
* 处理设备通过 CoAP 协议发送的上行消息,包括:
* 1. 属性上报POST /topic/sys/{productKey}/{deviceName}/thing/property/post
* 2. 事件上报POST /topic/sys/{productKey}/{deviceName}/thing/event/{eventId}/post
*
* Token 通过自定义 CoAP Option 2088 携带
*
* @author 芋道源码
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class IotCoapUpstreamHandler {
/**
* 自定义 CoAP Option 编号,用于携带 Token
* CoAP Option 范围 2048-65535 属于实验/自定义范围
*/
public static final int OPTION_TOKEN = 2088;
private final IotDeviceTokenService deviceTokenService;
private final IotDeviceMessageService deviceMessageService;
/**
* 处理 CoAP 请求
*
* @param exchange CoAP 交换对象
* @param httpMethod HTTP 方法
* @param protocol 协议对象
*/
public void handle(CoapExchange exchange, String httpMethod, IotCoapUpstreamProtocol protocol) {
try {
// TODO @AI这种路径的解析不用了简化下类似 IotHttpUpstreamHandler 这种就很简洁;
// 1. 解析 URI 路径:/topic/sys/{productKey}/{deviceName}/thing/...
// 完整路径是 [topic, sys, productKey, deviceName, thing, ...]
List<String> uriPath = exchange.getRequestOptions().getUriPath();
if (uriPath.size() < 6) {
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST,
"URI 路径格式错误,期望:/topic/sys/{productKey}/{deviceName}/...");
return;
}
// 验证路径格式:第一个应该是 "topic",第二个应该是 "sys"
if (!"topic".equals(uriPath.get(0)) || !"sys".equals(uriPath.get(1))) {
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "URI 路径格式错误,期望以 /topic/sys 开头");
return;
}
// 解析 productKey 和 deviceName索引 2 和 3
String productKey = uriPath.get(2);
String deviceName = uriPath.get(3);
// 2. 认证:优先从自定义 Option 获取 token兼容 Query 参数
String token = getTokenFromOption(exchange);
if (StrUtil.isEmpty(token)) {
// 兼容 Query 参数方式
// TODO @AI不用兼容 query简化下
token = getQueryParam(exchange, "token");
}
if (StrUtil.isEmpty(token)) {
respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "缺少 token请使用 Option " + OPTION_TOKEN + " 或 Query 参数携带)");
return;
}
// 验证 token
// TODO @AI这里参考 IotHttpAbstractHandler 简化点校验;
IotDeviceAuthUtils.DeviceInfo deviceInfo = deviceTokenService.verifyToken(token);
if (deviceInfo == null) {
respondError(exchange, CoAP.ResponseCode.UNAUTHORIZED, "token 无效或已过期");
return;
}
// 验证设备信息匹配
if (ObjUtil.notEqual(productKey, deviceInfo.getProductKey())
|| ObjUtil.notEqual(deviceName, deviceInfo.getDeviceName())) {
respondError(exchange, CoAP.ResponseCode.FORBIDDEN, "设备信息与 token 不匹配");
return;
}
// 3. 解析 method将 URI 路径转换为 method 格式
// /topic/sys/pk/dn/thing/property/post -> thing.property.post
// 路径是 [sys, pk, dn, thing, property, post],从索引 3 开始
String method = buildMethod(uriPath);
// 4. 解析并处理消息体
byte[] payload = exchange.getRequestPayload();
if (payload == null || payload.length == 0) {
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "请求体不能为空");
return;
}
// 5. 解码消息
IotDeviceMessage message;
try {
message = deviceMessageService.decodeDeviceMessage(payload, productKey, deviceName);
} catch (Exception e) {
log.error("[handle][消息解码失败]", e);
respondError(exchange, CoAP.ResponseCode.BAD_REQUEST, "消息解码失败:" + e.getMessage());
return;
}
// 校验 method
// TODO @AI不用校验 method以 message 解析出来的为主;
if (!method.equals(message.getMethod())) {
log.warn("[handle][method 不匹配URI: {}, 消息: {}]", method, message.getMethod());
}
// 6. 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, protocol.getServerId());
// 7. 返回成功响应
respondSuccess(exchange, message.getId());
} catch (Exception e) {
log.error("[handle][CoAP 请求处理异常]", e);
respondError(exchange, CoAP.ResponseCode.INTERNAL_SERVER_ERROR, "服务器内部错误");
}
}
/**
* 构建 method 字符串
*
* 将 URI 路径转换为 method 格式,例如:
* [sys, pk, dn, thing, property, post] -> thing.property.post
*
* @param uriPath URI 路径列表
* @return method 字符串
*/
private String buildMethod(List<String> uriPath) {
// 跳过 sys, productKey, deviceName从第4个元素开始
if (uriPath.size() > 3) {
return String.join(StrPool.DOT, uriPath.subList(3, uriPath.size()));
}
return "";
}
// TODO @AI抽到 coap 的 util 里;
/**
* 从自定义 CoAP Option 中获取 Token
*
* @param exchange CoAP 交换对象
* @return Token 值,如果不存在则返回 null
*/
private String getTokenFromOption(CoapExchange exchange) {
// 尝试从自定义 Option 2088 获取 Token
byte[] tokenBytes = exchange.getRequestOptions().getOthers().stream()
.filter(option -> option.getNumber() == OPTION_TOKEN)
.findFirst()
.map(option -> option.getValue())
.orElse(null);
if (tokenBytes != null) {
return new String(tokenBytes);
}
return null;
}
// TODO @AI抽到 coap 的 util 里;
/**
* 从 URI Query 参数中获取指定 key 的值
*
* @param exchange CoAP 交换对象
* @param key 参数名
* @return 参数值,如果不存在则返回 null
*/
private String getQueryParam(CoapExchange exchange, String key) {
for (String query : exchange.getRequestOptions().getUriQuery()) {
if (query.startsWith(key + "=")) {
return query.substring((key + "=").length());
}
}
return null;
}
// TODO @AI抽到 coap 的 util 里;
/**
* 返回成功响应
*
* @param exchange CoAP 交换对象
* @param messageId 消息 ID
*/
private void respondSuccess(CoapExchange exchange, String messageId) {
CommonResult<Object> result = CommonResult.success(MapUtil.of("messageId", messageId));
String json = JsonUtils.toJsonString(result);
exchange.respond(CoAP.ResponseCode.CONTENT, json, MediaTypeRegistry.APPLICATION_JSON);
}
// TODO @AI抽到 coap 的 util 里;
/**
* 返回错误响应
*
* @param exchange CoAP 交换对象
* @param code CoAP 响应码
* @param message 错误消息
*/
private void respondError(CoapExchange exchange, CoAP.ResponseCode code, String message) {
// 将 CoAP 响应码映射到业务错误码
int errorCode = mapCoapCodeToErrorCode(code);
CommonResult<Object> result = CommonResult.error(errorCode, message);
String json = JsonUtils.toJsonString(result);
exchange.respond(code, json, MediaTypeRegistry.APPLICATION_JSON);
}
// TODO @AI兼容 jdk8 的写法;
/**
* 将 CoAP 响应码映射到业务错误码
*
* @param code CoAP 响应码
* @return 业务错误码
*/
private int mapCoapCodeToErrorCode(CoAP.ResponseCode code) {
return switch (code) {
case BAD_REQUEST -> BAD_REQUEST.getCode();
case UNAUTHORIZED -> UNAUTHORIZED.getCode();
case FORBIDDEN -> FORBIDDEN.getCode();
default -> INTERNAL_SERVER_ERROR.getCode();
};
}
}

View File

@@ -0,0 +1,67 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.coap.router;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapUpstreamProtocol;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.CoapResource;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.californium.core.server.resources.Resource;
/**
* IoT 网关 CoAP 协议的【上行】Topic 资源
*
* 支持任意深度的路径匹配:
* - /topic/sys/{productKey}/{deviceName}/thing/property/post
* - /topic/sys/{productKey}/{deviceName}/thing/event/{eventId}/post
*
* @author 芋道源码
*/
@Slf4j
public class IotCoapUpstreamTopicResource extends CoapResource {
public static final String PATH = "topic";
private final IotCoapUpstreamProtocol protocol;
private final IotCoapUpstreamHandler upstreamHandler;
/**
* 创建根资源(/topic
*/
public IotCoapUpstreamTopicResource(IotCoapUpstreamProtocol protocol,
IotCoapUpstreamHandler upstreamHandler) {
this(PATH, protocol, upstreamHandler);
log.info("[IotCoapUpstreamTopicResource][创建 CoAP 上行 Topic 资源: /{}]", PATH);
}
/**
* 创建子资源(动态路径)
*/
private IotCoapUpstreamTopicResource(String name,
IotCoapUpstreamProtocol protocol,
IotCoapUpstreamHandler upstreamHandler) {
super(name);
this.protocol = protocol;
this.upstreamHandler = upstreamHandler;
}
@Override
public Resource getChild(String name) {
// 递归创建动态子资源,支持任意深度路径
return new IotCoapUpstreamTopicResource(name, protocol, upstreamHandler);
}
@Override
public void handleGET(CoapExchange exchange) {
upstreamHandler.handle(exchange, "GET", protocol);
}
@Override
public void handlePOST(CoapExchange exchange) {
upstreamHandler.handle(exchange, "POST", protocol);
}
@Override
public void handlePUT(CoapExchange exchange) {
upstreamHandler.handle(exchange, "PUT", protocol);
}
}

View File

@@ -0,0 +1 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.emqx;

View File

@@ -0,0 +1,2 @@
// TODO @AI参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/package-info.java 完善注释;
package cn.iocoder.yudao.module.iot.gateway.protocol.http;

View File

@@ -0,0 +1,2 @@
// TODO @AI参考 /Users/yunai/Java/ruoyi-vue-pro-jdk25/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/package-info.java
package cn.iocoder.yudao.module.iot.gateway.protocol.tcp;

View File

@@ -117,6 +117,15 @@ yudao:
keep-alive-timeout-seconds: 300 # 保持连接超时时间(秒)
ssl-enabled: false # 是否启用 SSLwss://
sub-protocol: mqtt # WebSocket 子协议
# ====================================
# 针对引入的 CoAP 组件的配置
# ====================================
coap:
enabled: false # 是否启用 CoAP 协议
port: 5683 # CoAP 服务端口(默认 5683
max-message-size: 1024 # 最大消息大小(字节)
ack-timeout: 2000 # ACK 超时时间(毫秒)
max-retransmit: 4 # 最大重传次数
--- #################### 日志相关配置 ####################
@@ -137,6 +146,7 @@ logging:
cn.iocoder.yudao.module.iot.gateway.protocol.http: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.mqtt: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.mqttws: DEBUG
cn.iocoder.yudao.module.iot.gateway.protocol.coap: DEBUG
# 根日志级别
root: INFO

View File

@@ -0,0 +1,158 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.IotCoapUpstreamHandler;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.CoapClient;
import org.eclipse.californium.core.CoapResponse;
import org.eclipse.californium.core.coap.MediaTypeRegistry;
import org.eclipse.californium.core.coap.Option;
import org.eclipse.californium.core.coap.Request;
import org.junit.jupiter.api.*;
/**
* IoT 网关 CoAP 协议集成测试(手动测试)
*
* 使用步骤:
* 1. 启动 CoAP 网关服务(端口 5683
* 2. 运行 testAuth() 获取 token
* 3. 将 token 粘贴到 TOKEN 常量
* 4. 运行 testPropertyPost() 或 testEventPost()
*
* @author 芋道源码
*/
@Slf4j
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class IotCoapProtocolIntegrationTest {
private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 5683;
// 设备信息(根据实际情况修改)
private static final String PRODUCT_KEY = "testProductKey";
private static final String DEVICE_NAME = "testDeviceName";
private static final String CLIENT_ID = PRODUCT_KEY + "." + DEVICE_NAME;
private static final String USERNAME = DEVICE_NAME + "&" + PRODUCT_KEY;
private static final String PASSWORD = "testPassword123";
// TODO: 运行 testAuth() 后,将返回的 token 粘贴到这里
private static final String TOKEN = "粘贴你的token到这里";
// ========== 1. 认证测试 ==========
@Test
@Order(1)
@DisplayName("1. 认证 - 获取 Token")
void testAuth() throws Exception {
String uri = String.format("coap://%s:%d/auth", SERVER_HOST, SERVER_PORT);
String payload = String.format("""
{
"clientId": "%s",
"username": "%s",
"password": "%s"
}
""", CLIENT_ID, USERNAME, PASSWORD);
CoapClient client = new CoapClient(uri);
try {
log.info("[testAuth][请求 URI: {}]", uri);
log.info("[testAuth][请求体: {}]", payload);
CoapResponse response = client.post(payload, MediaTypeRegistry.APPLICATION_JSON);
log.info("[testAuth][响应码: {}]", response.getCode());
log.info("[testAuth][响应体: {}]", response.getResponseText());
log.info("[testAuth][请将返回的 token 复制到 TOKEN 常量中]");
} finally {
client.shutdown();
}
}
// ========== 2. 属性上报测试 ==========
@Test
@Order(2)
@DisplayName("2. 属性上报")
void testPropertyPost() throws Exception {
String uri = String.format("coap://%s:%d/topic/sys/%s/%s/thing/property/post",
SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME);
String payload = """
{
"id": "123",
"method": "thing.property.post",
"params": {
"temperature": 25.5,
"humidity": 60
}
}
""";
CoapClient client = new CoapClient(uri);
try {
// 构造带自定义 Option 的请求
Request request = Request.newPost();
request.setURI(uri);
request.setPayload(payload);
request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON);
// 添加自定义 Token Option (2088)
request.getOptions().addOption(new Option(IotCoapUpstreamHandler.OPTION_TOKEN, TOKEN));
log.info("[testPropertyPost][请求 URI: {}]", uri);
log.info("[testPropertyPost][Token: {}]", TOKEN);
log.info("[testPropertyPost][请求体: {}]", payload);
CoapResponse response = client.advanced(request);
log.info("[testPropertyPost][响应码: {}]", response.getCode());
log.info("[testPropertyPost][响应体: {}]", response.getResponseText());
} finally {
client.shutdown();
}
}
// ========== 3. 事件上报测试 ==========
@Test
@Order(3)
@DisplayName("3. 事件上报")
void testEventPost() throws Exception {
String uri = String.format("coap://%s:%d/topic/sys/%s/%s/thing/event/alarm/post",
SERVER_HOST, SERVER_PORT, PRODUCT_KEY, DEVICE_NAME);
String payload = """
{
"id": "456",
"method": "thing.event.alarm.post",
"params": {
"alarmType": "temperature_high",
"level": "warning",
"value": 85.2
}
}
""";
CoapClient client = new CoapClient(uri);
try {
// 构造带自定义 Option 的请求
Request request = Request.newPost();
request.setURI(uri);
request.setPayload(payload);
request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON);
// 添加自定义 Token Option (2088)
request.getOptions().addOption(new Option(IotCoapUpstreamHandler.OPTION_TOKEN, TOKEN));
log.info("[testEventPost][请求 URI: {}]", uri);
log.info("[testEventPost][Token: {}]", TOKEN);
log.info("[testEventPost][请求体: {}]", payload);
CoapResponse response = client.advanced(request);
log.info("[testEventPost][响应码: {}]", response.getCode());
log.info("[testEventPost][响应体: {}]", response.getResponseText());
} finally {
client.shutdown();
}
}
}