mirror of
https://gitee.com/yudaocode/yudao-boot-mini.git
synced 2026-03-22 05:27:15 +08:00
!1496 修复了一些 Iot 模块 TODO 提到的问题
Merge pull request !1496 from puhui999/feature/iot
This commit is contained in:
@@ -21,6 +21,7 @@ import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@@ -56,6 +57,11 @@ public class IotSceneRuleDO extends TenantBaseDO {
|
||||
*/
|
||||
private Integer status;
|
||||
|
||||
/**
|
||||
* 最后触发时间
|
||||
*/
|
||||
private LocalDateTime lastTriggerTime;
|
||||
|
||||
/**
|
||||
* 场景定义配置
|
||||
*/
|
||||
|
||||
@@ -10,6 +10,35 @@ import lombok.Data;
|
||||
@Data
|
||||
public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig {
|
||||
|
||||
/**
|
||||
* 默认连接超时时间(毫秒)
|
||||
*/
|
||||
public static final int DEFAULT_CONNECT_TIMEOUT_MS = 5000;
|
||||
/**
|
||||
* 默认读取超时时间(毫秒)
|
||||
*/
|
||||
public static final int DEFAULT_READ_TIMEOUT_MS = 10000;
|
||||
/**
|
||||
* 默认是否启用 SSL
|
||||
*/
|
||||
public static final boolean DEFAULT_SSL = false;
|
||||
/**
|
||||
* 默认数据格式
|
||||
*/
|
||||
public static final String DEFAULT_DATA_FORMAT = "JSON";
|
||||
/**
|
||||
* 默认心跳间隔时间(毫秒)
|
||||
*/
|
||||
public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 30000L;
|
||||
/**
|
||||
* 默认重连间隔时间(毫秒)
|
||||
*/
|
||||
public static final long DEFAULT_RECONNECT_INTERVAL_MS = 5000L;
|
||||
/**
|
||||
* 默认最大重连次数
|
||||
*/
|
||||
public static final int DEFAULT_MAX_RECONNECT_ATTEMPTS = 3;
|
||||
|
||||
/**
|
||||
* TCP 服务器地址
|
||||
*/
|
||||
@@ -23,17 +52,17 @@ public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig {
|
||||
/**
|
||||
* 连接超时时间(毫秒)
|
||||
*/
|
||||
private Integer connectTimeoutMs = 5000;
|
||||
private Integer connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MS;
|
||||
|
||||
/**
|
||||
* 读取超时时间(毫秒)
|
||||
*/
|
||||
private Integer readTimeoutMs = 10000;
|
||||
private Integer readTimeoutMs = DEFAULT_READ_TIMEOUT_MS;
|
||||
|
||||
/**
|
||||
* 是否启用 SSL
|
||||
*/
|
||||
private Boolean ssl = false;
|
||||
private Boolean ssl = DEFAULT_SSL;
|
||||
|
||||
/**
|
||||
* SSL 证书路径(当 ssl=true 时需要)
|
||||
@@ -43,21 +72,21 @@ public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig {
|
||||
/**
|
||||
* 数据格式:JSON 或 BINARY
|
||||
*/
|
||||
private String dataFormat = "JSON";
|
||||
private String dataFormat = DEFAULT_DATA_FORMAT;
|
||||
|
||||
/**
|
||||
* 心跳间隔时间(毫秒),0 表示不启用心跳
|
||||
*/
|
||||
private Long heartbeatIntervalMs = 30000L;
|
||||
private Long heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
|
||||
|
||||
/**
|
||||
* 重连间隔时间(毫秒)
|
||||
*/
|
||||
private Long reconnectIntervalMs = 5000L;
|
||||
private Long reconnectIntervalMs = DEFAULT_RECONNECT_INTERVAL_MS;
|
||||
|
||||
/**
|
||||
* 最大重连次数
|
||||
*/
|
||||
private Integer maxReconnectAttempts = 3;
|
||||
private Integer maxReconnectAttempts = DEFAULT_MAX_RECONNECT_ATTEMPTS;
|
||||
|
||||
}
|
||||
@@ -13,6 +13,51 @@ import lombok.Data;
|
||||
@Data
|
||||
public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig {
|
||||
|
||||
/**
|
||||
* 默认连接超时时间(毫秒)
|
||||
*/
|
||||
public static final int DEFAULT_CONNECT_TIMEOUT_MS = 5000;
|
||||
/**
|
||||
* 默认发送超时时间(毫秒)
|
||||
*/
|
||||
public static final int DEFAULT_SEND_TIMEOUT_MS = 10000;
|
||||
/**
|
||||
* 默认心跳间隔时间(毫秒)
|
||||
*/
|
||||
public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 30000L;
|
||||
/**
|
||||
* 默认心跳消息内容
|
||||
*/
|
||||
public static final String DEFAULT_HEARTBEAT_MESSAGE = "{\"type\":\"heartbeat\"}";
|
||||
/**
|
||||
* 默认是否启用 SSL 证书验证
|
||||
*/
|
||||
public static final boolean DEFAULT_VERIFY_SSL_CERT = true;
|
||||
/**
|
||||
* 默认数据格式
|
||||
*/
|
||||
public static final String DEFAULT_DATA_FORMAT = "JSON";
|
||||
/**
|
||||
* 默认重连间隔时间(毫秒)
|
||||
*/
|
||||
public static final long DEFAULT_RECONNECT_INTERVAL_MS = 5000L;
|
||||
/**
|
||||
* 默认最大重连次数
|
||||
*/
|
||||
public static final int DEFAULT_MAX_RECONNECT_ATTEMPTS = 3;
|
||||
/**
|
||||
* 默认是否启用压缩
|
||||
*/
|
||||
public static final boolean DEFAULT_ENABLE_COMPRESSION = false;
|
||||
/**
|
||||
* 默认消息发送重试次数
|
||||
*/
|
||||
public static final int DEFAULT_SEND_RETRY_COUNT = 1;
|
||||
/**
|
||||
* 默认消息发送重试间隔(毫秒)
|
||||
*/
|
||||
public static final long DEFAULT_SEND_RETRY_INTERVAL_MS = 1000L;
|
||||
|
||||
/**
|
||||
* WebSocket 服务器地址
|
||||
* 例如:ws://localhost:8080/ws 或 wss://example.com/ws
|
||||
@@ -22,22 +67,22 @@ public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig {
|
||||
/**
|
||||
* 连接超时时间(毫秒)
|
||||
*/
|
||||
private Integer connectTimeoutMs = 5000;
|
||||
private Integer connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MS;
|
||||
|
||||
/**
|
||||
* 发送超时时间(毫秒)
|
||||
*/
|
||||
private Integer sendTimeoutMs = 10000;
|
||||
private Integer sendTimeoutMs = DEFAULT_SEND_TIMEOUT_MS;
|
||||
|
||||
/**
|
||||
* 心跳间隔时间(毫秒),0 表示不启用心跳
|
||||
*/
|
||||
private Long heartbeatIntervalMs = 30000L;
|
||||
private Long heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
|
||||
|
||||
/**
|
||||
* 心跳消息内容(JSON 格式)
|
||||
*/
|
||||
private String heartbeatMessage = "{\"type\":\"heartbeat\"}";
|
||||
private String heartbeatMessage = DEFAULT_HEARTBEAT_MESSAGE;
|
||||
|
||||
/**
|
||||
* 子协议列表(逗号分隔)
|
||||
@@ -52,36 +97,36 @@ public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig {
|
||||
/**
|
||||
* 是否启用 SSL 证书验证(仅对 wss:// 生效)
|
||||
*/
|
||||
private Boolean verifySslCert = true;
|
||||
private Boolean verifySslCert = DEFAULT_VERIFY_SSL_CERT;
|
||||
|
||||
/**
|
||||
* 数据格式:JSON 或 TEXT
|
||||
*/
|
||||
private String dataFormat = "JSON";
|
||||
private String dataFormat = DEFAULT_DATA_FORMAT;
|
||||
|
||||
/**
|
||||
* 重连间隔时间(毫秒)
|
||||
*/
|
||||
private Long reconnectIntervalMs = 5000L;
|
||||
private Long reconnectIntervalMs = DEFAULT_RECONNECT_INTERVAL_MS;
|
||||
|
||||
/**
|
||||
* 最大重连次数
|
||||
*/
|
||||
private Integer maxReconnectAttempts = 3;
|
||||
private Integer maxReconnectAttempts = DEFAULT_MAX_RECONNECT_ATTEMPTS;
|
||||
|
||||
/**
|
||||
* 是否启用压缩
|
||||
*/
|
||||
private Boolean enableCompression = false;
|
||||
private Boolean enableCompression = DEFAULT_ENABLE_COMPRESSION;
|
||||
|
||||
/**
|
||||
* 消息发送重试次数
|
||||
*/
|
||||
private Integer sendRetryCount = 1;
|
||||
private Integer sendRetryCount = DEFAULT_SEND_RETRY_COUNT;
|
||||
|
||||
/**
|
||||
* 消息发送重试间隔(毫秒)
|
||||
*/
|
||||
private Long sendRetryIntervalMs = 1000L;
|
||||
private Long sendRetryIntervalMs = DEFAULT_SEND_RETRY_INTERVAL_MS;
|
||||
|
||||
}
|
||||
@@ -16,8 +16,8 @@ import java.util.Arrays;
|
||||
public enum IotDataSinkTypeEnum implements ArrayValuable<Integer> {
|
||||
|
||||
HTTP(1, "HTTP"),
|
||||
TCP(2, "TCP"), // TODO @puhui999:待实现;
|
||||
WEBSOCKET(3, "WebSocket"), // TODO @puhui999:待实现;
|
||||
TCP(2, "TCP"),
|
||||
WEBSOCKET(3, "WebSocket"),
|
||||
|
||||
MQTT(10, "MQTT"), // TODO 待实现;
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package cn.iocoder.yudao.module.iot.service.device.property;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.convert.Convert;
|
||||
import cn.hutool.core.date.LocalDateTimeUtil;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
@@ -145,6 +146,12 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
|
||||
IotDataSpecsDataTypeEnum.STRUCT.getDataType(), IotDataSpecsDataTypeEnum.ARRAY.getDataType())) {
|
||||
// 特殊:STRUCT 和 ARRAY 类型,在 TDengine 里,有没对应数据类型,只能通过 JSON 来存储
|
||||
properties.put((String) key, JsonUtils.toJsonString(value));
|
||||
} else if (IotDataSpecsDataTypeEnum.DOUBLE.getDataType().equals(thingModel.getProperty().getDataType())) {
|
||||
properties.put((String) key, Convert.toDouble(value));
|
||||
} else if (IotDataSpecsDataTypeEnum.FLOAT.getDataType().equals(thingModel.getProperty().getDataType())) {
|
||||
properties.put((String) key, Convert.toFloat(value));
|
||||
} else if (IotDataSpecsDataTypeEnum.BOOL.getDataType().equals(thingModel.getProperty().getDataType())) {
|
||||
properties.put((String) key, Convert.toByte(value));
|
||||
} else {
|
||||
properties.put((String) key, value);
|
||||
}
|
||||
|
||||
@@ -7,8 +7,6 @@ import cn.iocoder.yudao.module.iot.service.rule.data.action.tcp.IotTcpClient;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
/**
|
||||
* TCP 的 {@link IotDataRuleAction} 实现类
|
||||
* <p>
|
||||
@@ -23,9 +21,6 @@ import java.time.Duration;
|
||||
public class IotTcpDataRuleAction extends
|
||||
IotDataRuleCacheableAction<IotDataSinkTcpConfig, IotTcpClient> {
|
||||
|
||||
private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(5);
|
||||
private static final Duration SEND_TIMEOUT = Duration.ofSeconds(10);
|
||||
|
||||
@Override
|
||||
public Integer getType() {
|
||||
return IotDataSinkTypeEnum.TCP.getType();
|
||||
|
||||
@@ -0,0 +1,83 @@
|
||||
package cn.iocoder.yudao.module.iot.service.rule.data.action;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig;
|
||||
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.service.rule.data.action.websocket.IotWebSocketClient;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* WebSocket 的 {@link IotDataRuleAction} 实现类
|
||||
* <p>
|
||||
* 负责将设备消息发送到外部 WebSocket 服务器
|
||||
* 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式
|
||||
* 使用连接池管理 WebSocket 连接,提高性能和资源利用率
|
||||
*
|
||||
* @author HUIHUI
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class IotWebSocketDataRuleAction extends
|
||||
IotDataRuleCacheableAction<IotDataSinkWebSocketConfig, IotWebSocketClient> {
|
||||
|
||||
@Override
|
||||
public Integer getType() {
|
||||
return IotDataSinkTypeEnum.WEBSOCKET.getType();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected IotWebSocketClient initProducer(IotDataSinkWebSocketConfig config) throws Exception {
|
||||
// 1.1 参数校验
|
||||
if (config.getServerUrl() == null || config.getServerUrl().trim().isEmpty()) {
|
||||
throw new IllegalArgumentException("WebSocket 服务器地址不能为空");
|
||||
}
|
||||
if (!config.getServerUrl().startsWith("ws://") && !config.getServerUrl().startsWith("wss://")) {
|
||||
throw new IllegalArgumentException("WebSocket 服务器地址必须以 ws:// 或 wss:// 开头");
|
||||
}
|
||||
|
||||
// 2.1 创建 WebSocket 客户端
|
||||
IotWebSocketClient webSocketClient = new IotWebSocketClient(
|
||||
config.getServerUrl(),
|
||||
config.getConnectTimeoutMs(),
|
||||
config.getSendTimeoutMs(),
|
||||
config.getDataFormat()
|
||||
);
|
||||
// 2.2 连接服务器
|
||||
webSocketClient.connect();
|
||||
log.info("[initProducer][WebSocket 客户端创建并连接成功,服务器: {},数据格式: {}]",
|
||||
config.getServerUrl(), config.getDataFormat());
|
||||
return webSocketClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void closeProducer(IotWebSocketClient producer) throws Exception {
|
||||
if (producer != null) {
|
||||
producer.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void execute(IotDeviceMessage message, IotDataSinkWebSocketConfig config) throws Exception {
|
||||
try {
|
||||
// 1.1 获取或创建 WebSocket 客户端
|
||||
IotWebSocketClient webSocketClient = getProducer(config);
|
||||
// 1.2 检查连接状态,如果断开则重新连接
|
||||
if (!webSocketClient.isConnected()) {
|
||||
log.warn("[execute][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl());
|
||||
webSocketClient.connect();
|
||||
}
|
||||
|
||||
// 2.1 发送消息
|
||||
webSocketClient.sendMessage(message);
|
||||
// 2.2 记录发送成功日志
|
||||
log.info("[execute][message({}) config({}) 发送成功,WebSocket 服务器: {}]",
|
||||
message, config, config.getServerUrl());
|
||||
} catch (Exception e) {
|
||||
log.error("[execute][message({}) config({}) 发送失败,WebSocket 服务器: {}]",
|
||||
message, config, config.getServerUrl(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action.tcp;
|
||||
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkTcpConfig;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import javax.net.ssl.SSLSocketFactory;
|
||||
@@ -38,16 +39,15 @@ public class IotTcpClient {
|
||||
private BufferedReader reader;
|
||||
private final AtomicBoolean connected = new AtomicBoolean(false);
|
||||
|
||||
// TODO @puhui999:default 值,IotDataSinkTcpConfig.java 枚举起来哈;
|
||||
public IotTcpClient(String host, Integer port, Integer connectTimeoutMs, Integer readTimeoutMs,
|
||||
Boolean ssl, String sslCertPath, String dataFormat) {
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : 5000;
|
||||
this.readTimeoutMs = readTimeoutMs != null ? readTimeoutMs : 10000;
|
||||
this.ssl = ssl != null ? ssl : false;
|
||||
this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : IotDataSinkTcpConfig.DEFAULT_CONNECT_TIMEOUT_MS;
|
||||
this.readTimeoutMs = readTimeoutMs != null ? readTimeoutMs : IotDataSinkTcpConfig.DEFAULT_READ_TIMEOUT_MS;
|
||||
this.ssl = ssl != null ? ssl : IotDataSinkTcpConfig.DEFAULT_SSL;
|
||||
this.sslCertPath = sslCertPath;
|
||||
this.dataFormat = dataFormat != null ? dataFormat : "JSON";
|
||||
this.dataFormat = dataFormat != null ? dataFormat : IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -99,9 +99,8 @@ public class IotTcpClient {
|
||||
}
|
||||
|
||||
try {
|
||||
// TODO @puhui999:枚举值
|
||||
String messageData;
|
||||
if ("JSON".equalsIgnoreCase(dataFormat)) {
|
||||
if (IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT.equalsIgnoreCase(dataFormat)) {
|
||||
// JSON 格式
|
||||
messageData = JsonUtils.toJsonString(message);
|
||||
} else {
|
||||
|
||||
@@ -0,0 +1,176 @@
|
||||
package cn.iocoder.yudao.module.iot.service.rule.data.action.websocket;
|
||||
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.WebSocket;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* IoT WebSocket 客户端
|
||||
* <p>
|
||||
* 负责与外部 WebSocket 服务器建立连接并发送设备消息
|
||||
* 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式
|
||||
* 基于 Java 11+ 内置的 java.net.http.WebSocket 实现
|
||||
*
|
||||
* @author HUIHUI
|
||||
*/
|
||||
@Slf4j
|
||||
public class IotWebSocketClient implements WebSocket.Listener {
|
||||
|
||||
private final String serverUrl;
|
||||
private final Integer connectTimeoutMs;
|
||||
private final Integer sendTimeoutMs;
|
||||
private final String dataFormat;
|
||||
|
||||
private WebSocket webSocket;
|
||||
private final AtomicBoolean connected = new AtomicBoolean(false);
|
||||
private final StringBuilder messageBuffer = new StringBuilder();
|
||||
|
||||
public IotWebSocketClient(String serverUrl, Integer connectTimeoutMs, Integer sendTimeoutMs, String dataFormat) {
|
||||
this.serverUrl = serverUrl;
|
||||
this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : IotDataSinkWebSocketConfig.DEFAULT_CONNECT_TIMEOUT_MS;
|
||||
this.sendTimeoutMs = sendTimeoutMs != null ? sendTimeoutMs : IotDataSinkWebSocketConfig.DEFAULT_SEND_TIMEOUT_MS;
|
||||
this.dataFormat = dataFormat != null ? dataFormat : IotDataSinkWebSocketConfig.DEFAULT_DATA_FORMAT;
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接到 WebSocket 服务器
|
||||
*/
|
||||
public void connect() throws Exception {
|
||||
if (connected.get()) {
|
||||
log.warn("[connect][WebSocket 客户端已经连接,无需重复连接]");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
HttpClient httpClient = HttpClient.newBuilder()
|
||||
.connectTimeout(Duration.ofMillis(connectTimeoutMs))
|
||||
.build();
|
||||
|
||||
CompletableFuture<WebSocket> future = httpClient.newWebSocketBuilder()
|
||||
.connectTimeout(Duration.ofMillis(connectTimeoutMs))
|
||||
.buildAsync(URI.create(serverUrl), this);
|
||||
|
||||
// 等待连接完成
|
||||
webSocket = future.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
|
||||
connected.set(true);
|
||||
log.info("[connect][WebSocket 客户端连接成功,服务器地址: {}]", serverUrl);
|
||||
} catch (Exception e) {
|
||||
close();
|
||||
log.error("[connect][WebSocket 客户端连接失败,服务器地址: {}]", serverUrl, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen(WebSocket webSocket) {
|
||||
log.debug("[onOpen][WebSocket 连接已打开]");
|
||||
webSocket.request(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
|
||||
messageBuffer.append(data);
|
||||
if (last) {
|
||||
log.debug("[onText][收到 WebSocket 消息: {}]", messageBuffer);
|
||||
messageBuffer.setLength(0);
|
||||
}
|
||||
webSocket.request(1);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
|
||||
connected.set(false);
|
||||
log.info("[onClose][WebSocket 连接已关闭,状态码: {},原因: {}]", statusCode, reason);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(WebSocket webSocket, Throwable error) {
|
||||
connected.set(false);
|
||||
log.error("[onError][WebSocket 发生错误]", error);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送设备消息
|
||||
*
|
||||
* @param message 设备消息
|
||||
* @throws Exception 发送异常
|
||||
*/
|
||||
public void sendMessage(IotDeviceMessage message) throws Exception {
|
||||
if (!connected.get() || webSocket == null) {
|
||||
throw new IllegalStateException("WebSocket 客户端未连接");
|
||||
}
|
||||
|
||||
try {
|
||||
String messageData;
|
||||
if (IotDataSinkWebSocketConfig.DEFAULT_DATA_FORMAT.equalsIgnoreCase(dataFormat)) {
|
||||
messageData = JsonUtils.toJsonString(message);
|
||||
} else {
|
||||
messageData = message.toString();
|
||||
}
|
||||
|
||||
// 发送消息并等待完成
|
||||
CompletableFuture<WebSocket> future = webSocket.sendText(messageData, true);
|
||||
future.get(sendTimeoutMs, TimeUnit.MILLISECONDS);
|
||||
log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]",
|
||||
message.getDeviceId(), messageData.length());
|
||||
} catch (Exception e) {
|
||||
log.error("[sendMessage][发送消息失败,设备 ID: {}]", message.getDeviceId(), e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭连接
|
||||
*/
|
||||
public void close() {
|
||||
if (!connected.get() && webSocket == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (webSocket != null) {
|
||||
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "客户端主动关闭")
|
||||
.orTimeout(5, TimeUnit.SECONDS)
|
||||
.exceptionally(e -> {
|
||||
log.warn("[close][发送关闭帧失败]", e);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
connected.set(false);
|
||||
log.info("[close][WebSocket 客户端连接已关闭,服务器地址: {}]", serverUrl);
|
||||
} catch (Exception e) {
|
||||
log.error("[close][关闭 WebSocket 客户端连接异常]", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查连接状态
|
||||
*
|
||||
* @return 是否已连接
|
||||
*/
|
||||
public boolean isConnected() {
|
||||
return connected.get() && webSocket != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "IotWebSocketClient{" +
|
||||
"serverUrl='" + serverUrl + '\'' +
|
||||
", dataFormat='" + dataFormat + '\'' +
|
||||
", connected=" + connected.get() +
|
||||
'}';
|
||||
}
|
||||
|
||||
}
|
||||
@@ -30,6 +30,7 @@ import org.springframework.cache.annotation.Cacheable;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
@@ -392,9 +393,28 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService {
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// 3. 更新最后触发时间
|
||||
updateLastTriggerTime(sceneRule.getId());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新规则场景的最后触发时间
|
||||
*
|
||||
* @param id 规则场景编号
|
||||
*/
|
||||
private void updateLastTriggerTime(Long id) {
|
||||
try {
|
||||
IotSceneRuleDO updateObj = new IotSceneRuleDO()
|
||||
.setId(id)
|
||||
.setLastTriggerTime(LocalDateTime.now());
|
||||
sceneRuleMapper.updateById(updateObj);
|
||||
} catch (Exception e) {
|
||||
log.error("[updateLastTriggerTime][规则场景编号({}) 更新最后触发时间异常]", id, e);
|
||||
}
|
||||
}
|
||||
|
||||
private IotSceneRuleServiceImpl getSelf() {
|
||||
return SpringUtil.getBean(IotSceneRuleServiceImpl.class);
|
||||
}
|
||||
|
||||
@@ -36,11 +36,11 @@ public class IotDevicePropertyPostTriggerMatcher implements IotSceneRuleTriggerM
|
||||
return false;
|
||||
}
|
||||
|
||||
// 1.3 检查标识符是否匹配
|
||||
String messageIdentifier = IotDeviceMessageUtils.getIdentifier(message);
|
||||
if (!IotSceneRuleMatcherHelper.isIdentifierMatched(trigger.getIdentifier(), messageIdentifier)) {
|
||||
IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "标识符不匹配,期望: " +
|
||||
trigger.getIdentifier() + ", 实际: " + messageIdentifier);
|
||||
// 1.3 检查消息中是否包含触发器指定的属性标识符
|
||||
// 注意:属性上报可能同时上报多个属性,所以需要判断 trigger.getIdentifier() 是否在 message 的 params 中
|
||||
if (!IotDeviceMessageUtils.containsIdentifier(message, trigger.getIdentifier())) {
|
||||
IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中不包含属性: " +
|
||||
trigger.getIdentifier());
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.system.SystemUtil;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
|
||||
@@ -69,6 +70,55 @@ public class IotDeviceMessageUtils {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断消息中是否包含指定的标识符
|
||||
*
|
||||
* 对于不同消息类型的处理:
|
||||
* - EVENT_POST/SERVICE_INVOKE:检查 params.identifier 是否匹配
|
||||
* - STATE_UPDATE:检查 params.state 是否匹配
|
||||
* - PROPERTY_POST:检查 params 中是否包含该属性 key
|
||||
*
|
||||
* @param message 消息
|
||||
* @param identifier 要检查的标识符
|
||||
* @return 是否包含
|
||||
*/
|
||||
public static boolean containsIdentifier(IotDeviceMessage message, String identifier) {
|
||||
if (message.getParams() == null || StrUtil.isBlank(identifier)) {
|
||||
return false;
|
||||
}
|
||||
// EVENT_POST / SERVICE_INVOKE / STATE_UPDATE:使用原有逻辑
|
||||
String messageIdentifier = getIdentifier(message);
|
||||
if (messageIdentifier != null) {
|
||||
return identifier.equals(messageIdentifier);
|
||||
}
|
||||
// PROPERTY_POST:检查 params 中是否包含该属性 key
|
||||
if (StrUtil.equals(message.getMethod(), IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod())) {
|
||||
Map<String, Object> params = parseParamsToMap(message.getParams());
|
||||
return params != null && params.containsKey(identifier);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 将 params 解析为 Map
|
||||
*
|
||||
* @param params 参数(可能是 Map 或 JSON 字符串)
|
||||
* @return Map,解析失败返回 null
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static Map<String, Object> parseParamsToMap(Object params) {
|
||||
if (params instanceof Map) {
|
||||
return (Map<String, Object>) params;
|
||||
}
|
||||
if (params instanceof String) {
|
||||
try {
|
||||
return JsonUtils.parseObject((String) params, Map.class);
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 从设备消息中提取指定标识符的属性值
|
||||
* - 支持多种消息格式和属性值提取策略
|
||||
|
||||
@@ -21,6 +21,7 @@ import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
|
||||
import io.vertx.core.Vertx;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
@@ -59,20 +60,20 @@ public class IotGatewayConfiguration {
|
||||
@Slf4j
|
||||
public static class EmqxProtocolConfiguration {
|
||||
|
||||
@Bean(destroyMethod = "close")
|
||||
@Bean(name = "emqxVertx", destroyMethod = "close")
|
||||
public Vertx emqxVertx() {
|
||||
return Vertx.vertx();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IotEmqxAuthEventProtocol iotEmqxAuthEventProtocol(IotGatewayProperties gatewayProperties,
|
||||
Vertx emqxVertx) {
|
||||
@Qualifier("emqxVertx") Vertx emqxVertx) {
|
||||
return new IotEmqxAuthEventProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IotEmqxUpstreamProtocol iotEmqxUpstreamProtocol(IotGatewayProperties gatewayProperties,
|
||||
Vertx emqxVertx) {
|
||||
@Qualifier("emqxVertx") Vertx emqxVertx) {
|
||||
return new IotEmqxUpstreamProtocol(gatewayProperties.getProtocol().getEmqx(), emqxVertx);
|
||||
}
|
||||
|
||||
@@ -91,7 +92,7 @@ public class IotGatewayConfiguration {
|
||||
@Slf4j
|
||||
public static class TcpProtocolConfiguration {
|
||||
|
||||
@Bean(destroyMethod = "close")
|
||||
@Bean(name = "tcpVertx", destroyMethod = "close")
|
||||
public Vertx tcpVertx() {
|
||||
return Vertx.vertx();
|
||||
}
|
||||
@@ -101,7 +102,7 @@ public class IotGatewayConfiguration {
|
||||
IotDeviceService deviceService,
|
||||
IotDeviceMessageService messageService,
|
||||
IotTcpConnectionManager connectionManager,
|
||||
Vertx tcpVertx) {
|
||||
@Qualifier("tcpVertx") Vertx tcpVertx) {
|
||||
return new IotTcpUpstreamProtocol(gatewayProperties.getProtocol().getTcp(),
|
||||
deviceService, messageService, connectionManager, tcpVertx);
|
||||
}
|
||||
@@ -126,7 +127,7 @@ public class IotGatewayConfiguration {
|
||||
@Slf4j
|
||||
public static class MqttProtocolConfiguration {
|
||||
|
||||
@Bean(destroyMethod = "close")
|
||||
@Bean(name = "mqttVertx", destroyMethod = "close")
|
||||
public Vertx mqttVertx() {
|
||||
return Vertx.vertx();
|
||||
}
|
||||
@@ -135,7 +136,7 @@ public class IotGatewayConfiguration {
|
||||
public IotMqttUpstreamProtocol iotMqttUpstreamProtocol(IotGatewayProperties gatewayProperties,
|
||||
IotDeviceMessageService messageService,
|
||||
IotMqttConnectionManager connectionManager,
|
||||
Vertx mqttVertx) {
|
||||
@Qualifier("mqttVertx") Vertx mqttVertx) {
|
||||
return new IotMqttUpstreamProtocol(gatewayProperties.getProtocol().getMqtt(), messageService,
|
||||
connectionManager, mqttVertx);
|
||||
}
|
||||
@@ -163,7 +164,7 @@ public class IotGatewayConfiguration {
|
||||
@Slf4j
|
||||
public static class MqttWsProtocolConfiguration {
|
||||
|
||||
@Bean(destroyMethod = "close")
|
||||
@Bean(name = "mqttWsVertx", destroyMethod = "close")
|
||||
public Vertx mqttWsVertx() {
|
||||
return Vertx.vertx();
|
||||
}
|
||||
@@ -172,7 +173,7 @@ public class IotGatewayConfiguration {
|
||||
public IotMqttWsUpstreamProtocol iotMqttWsUpstreamProtocol(IotGatewayProperties gatewayProperties,
|
||||
IotDeviceMessageService messageService,
|
||||
IotMqttWsConnectionManager connectionManager,
|
||||
Vertx mqttWsVertx) {
|
||||
@Qualifier("mqttWsVertx") Vertx mqttWsVertx) {
|
||||
return new IotMqttWsUpstreamProtocol(gatewayProperties.getProtocol().getMqttWs(),
|
||||
messageService, connectionManager, mqttWsVertx);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user