feat:【iot】WebSocket 连接线程安全与 JDK 8 兼容

This commit is contained in:
puhui999
2026-01-21 18:12:28 +08:00
parent 45638b35f4
commit 44838510c9
7 changed files with 480 additions and 66 deletions

View File

@@ -76,6 +76,8 @@
<jimubi.version>2.3.0</jimubi.version>
<weixin-java.version>4.7.9-20251224.161447</weixin-java.version>
<alipay-sdk-java.version>4.40.607.ALL</alipay-sdk-java.version>
<!-- OkHttp -->
<okhttp.version>4.12.0</okhttp.version>
</properties>
<dependencyManagement>
@@ -653,6 +655,19 @@
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${mqtt.version}</version>
</dependency>
<!-- OkHttp -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<version>${okhttp.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

View File

@@ -73,6 +73,17 @@
<artifactId>yudao-spring-boot-starter-excel</artifactId>
</dependency>
<!-- OkHttp -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
<!-- 消息队列相关 -->
<!-- TODO @芋艿:临时打开 -->
<dependency>

View File

@@ -84,4 +84,12 @@ public interface RedisKeyConstants {
*/
String SCENE_RULE_LIST = "iot:scene_rule_list";
/**
* WebSocket 连接分布式锁
* <p>
* KEY 格式websocket_connect_lock:${serverUrl}
* 用于保证 WebSocket 重连操作的线程安全
*/
String WEBSOCKET_CONNECT_LOCK = "iot:websocket_connect_lock:%s";
}

View File

@@ -0,0 +1,67 @@
package cn.iocoder.yudao.module.iot.dal.redis.rule;
import jakarta.annotation.Resource;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Repository;
import java.util.concurrent.TimeUnit;
import static cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants.WEBSOCKET_CONNECT_LOCK;
/**
* IoT WebSocket 连接锁 Redis DAO
* <p>
* 用于保证 WebSocket 重连操作的线程安全,避免多线程同时重连导致的资源竞争
*
* @author HUIHUI
*/
@Repository
public class IotWebSocketLockRedisDAO {
/**
* 锁等待超时时间(毫秒)
*/
private static final long LOCK_WAIT_TIME_MS = 5000;
/**
* 锁持有超时时间(毫秒)
*/
private static final long LOCK_LEASE_TIME_MS = 10000;
@Resource
private RedissonClient redissonClient;
/**
* 在分布式锁保护下执行操作
*
* @param serverUrl WebSocket 服务器地址
* @param runnable 需要执行的操作
* @throws Exception 如果获取锁超时或执行操作时发生异常
*/
public void lock(String serverUrl, Runnable runnable) throws Exception {
String lockKey = formatKey(serverUrl);
RLock lock = redissonClient.getLock(lockKey);
try {
// 尝试获取分布式锁
boolean acquired = lock.tryLock(LOCK_WAIT_TIME_MS, LOCK_LEASE_TIME_MS, TimeUnit.MILLISECONDS);
if (!acquired) {
throw new RuntimeException("获取 WebSocket 连接锁超时,服务器: " + serverUrl);
}
// 执行操作
runnable.run();
} finally {
// 释放锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
private static String formatKey(String serverUrl) {
return String.format(WEBSOCKET_CONNECT_LOCK, serverUrl);
}
}

View File

@@ -3,8 +3,10 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig;
import cn.iocoder.yudao.module.iot.dal.redis.rule.IotWebSocketLockRedisDAO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.data.action.websocket.IotWebSocketClient;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -22,6 +24,9 @@ import org.springframework.stereotype.Component;
public class IotWebSocketDataRuleAction extends
IotDataRuleCacheableAction<IotDataSinkWebSocketConfig, IotWebSocketClient> {
@Resource
private IotWebSocketLockRedisDAO webSocketLockRedisDAO;
@Override
public Integer getType() {
return IotDataSinkTypeEnum.WEBSOCKET.getType();
@@ -62,12 +67,11 @@ public class IotWebSocketDataRuleAction extends
protected void execute(IotDeviceMessage message, IotDataSinkWebSocketConfig config) throws Exception {
try {
// 1.1 获取或创建 WebSocket 客户端
// TODO @puhui999需要加锁保证必须连接上
IotWebSocketClient webSocketClient = getProducer(config);
// 1.2 检查连接状态,如果断开则重新连接
// 1.2 检查连接状态,如果断开则使用分布式锁保证重连的线程安全
if (!webSocketClient.isConnected()) {
log.warn("[execute][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl());
webSocketClient.connect();
reconnectWithLock(webSocketClient, config);
}
// 2.1 发送消息
@@ -82,4 +86,24 @@ public class IotWebSocketDataRuleAction extends
}
}
/**
* 使用分布式锁进行重连
*
* @param webSocketClient WebSocket 客户端
* @param config 配置信息
*/
private void reconnectWithLock(IotWebSocketClient webSocketClient, IotDataSinkWebSocketConfig config) throws Exception {
webSocketLockRedisDAO.lock(config.getServerUrl(), () -> {
// 双重检查:获取锁后再次检查连接状态,避免重复连接
if (!webSocketClient.isConnected()) {
log.warn("[reconnectWithLock][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl());
try {
webSocketClient.connect();
} catch (Exception e) {
throw new RuntimeException("WebSocket 重连失败,服务器: " + config.getServerUrl(), e);
}
}
});
}
}

View File

@@ -4,13 +4,9 @@ import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -19,21 +15,23 @@ import java.util.concurrent.atomic.AtomicBoolean;
* <p>
* 负责与外部 WebSocket 服务器建立连接并发送设备消息
* 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式
* 基于 Java 11+ 内置的 java.net.http.WebSocket 实现
* 基于 OkHttp WebSocket 实现,兼容 JDK 8+
* <p>
* 注意该类的线程安全由调用方IotWebSocketDataRuleAction通过分布式锁保证
*
* @author HUIHUI
*/
@Slf4j
public class IotWebSocketClient implements WebSocket.Listener {
public class IotWebSocketClient {
private final String serverUrl;
private final Integer connectTimeoutMs;
private final Integer sendTimeoutMs;
private final String dataFormat;
private WebSocket webSocket;
private OkHttpClient okHttpClient;
private volatile WebSocket webSocket;
private final AtomicBoolean connected = new AtomicBoolean(false);
private final StringBuilder messageBuffer = new StringBuilder();
public IotWebSocketClient(String serverUrl, Integer connectTimeoutMs, Integer sendTimeoutMs, String dataFormat) {
this.serverUrl = serverUrl;
@@ -44,8 +42,9 @@ public class IotWebSocketClient implements WebSocket.Listener {
/**
* 连接到 WebSocket 服务器
* <p>
* 注意:调用方需要通过分布式锁保证并发安全
*/
@SuppressWarnings("resource")
public void connect() throws Exception {
if (connected.get()) {
log.warn("[connect][WebSocket 客户端已经连接,无需重复连接]");
@@ -53,17 +52,32 @@ public class IotWebSocketClient implements WebSocket.Listener {
}
try {
HttpClient httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(connectTimeoutMs))
// 创建 OkHttpClient
okHttpClient = new OkHttpClient.Builder()
.connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS)
.readTimeout(sendTimeoutMs, TimeUnit.MILLISECONDS)
.writeTimeout(sendTimeoutMs, TimeUnit.MILLISECONDS)
.build();
CompletableFuture<WebSocket> future = httpClient.newWebSocketBuilder()
.connectTimeout(Duration.ofMillis(connectTimeoutMs))
.buildAsync(URI.create(serverUrl), this);
// 创建 WebSocket 请求
Request request = new Request.Builder()
.url(serverUrl)
.build();
// 使用 CountDownLatch 等待连接完成
CountDownLatch connectLatch = new CountDownLatch(1);
AtomicBoolean connectSuccess = new AtomicBoolean(false);
// 创建 WebSocket 连接
webSocket = okHttpClient.newWebSocket(request, new IotWebSocketListener(connectLatch, connectSuccess));
// 等待连接完成
webSocket = future.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
connected.set(true);
boolean await = connectLatch.await(connectTimeoutMs, TimeUnit.MILLISECONDS);
if (!await || !connectSuccess.get()) {
close();
throw new Exception("WebSocket 连接超时或失败,服务器地址: " + serverUrl);
}
log.info("[connect][WebSocket 客户端连接成功,服务器地址: {}]", serverUrl);
} catch (Exception e) {
close();
@@ -72,36 +86,6 @@ public class IotWebSocketClient implements WebSocket.Listener {
}
}
@Override
public void onOpen(WebSocket webSocket) {
log.debug("[onOpen][WebSocket 连接已打开]");
webSocket.request(1);
}
@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
messageBuffer.append(data);
if (last) {
log.debug("[onText][收到 WebSocket 消息: {}]", messageBuffer);
messageBuffer.setLength(0);
}
webSocket.request(1);
return null;
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
connected.set(false);
log.info("[onClose][WebSocket 连接已关闭,状态码: {},原因: {}]", statusCode, reason);
return null;
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
connected.set(false);
log.error("[onError][WebSocket 发生错误]", error);
}
/**
* 发送设备消息
*
@@ -109,7 +93,8 @@ public class IotWebSocketClient implements WebSocket.Listener {
* @throws Exception 发送异常
*/
public void sendMessage(IotDeviceMessage message) throws Exception {
if (!connected.get() || webSocket == null) {
WebSocket ws = this.webSocket;
if (!connected.get() || ws == null) {
throw new IllegalStateException("WebSocket 客户端未连接");
}
@@ -121,9 +106,11 @@ public class IotWebSocketClient implements WebSocket.Listener {
messageData = message.toString();
}
// 发送消息并等待完成
CompletableFuture<WebSocket> future = webSocket.sendText(messageData, true);
future.get(sendTimeoutMs, TimeUnit.MILLISECONDS);
// 发送消息
boolean success = ws.send(messageData);
if (!success) {
throw new Exception("WebSocket 发送消息失败,消息队列已满或连接已关闭");
}
log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]",
message.getDeviceId(), messageData.length());
} catch (Exception e) {
@@ -136,18 +123,17 @@ public class IotWebSocketClient implements WebSocket.Listener {
* 关闭连接
*/
public void close() {
if (!connected.get() && webSocket == null) {
return;
}
try {
if (webSocket != null) {
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "客户端主动关闭")
.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(e -> {
log.warn("[close][发送关闭帧失败]", e);
return null;
});
// 发送正常关闭帧,状态码 1000 表示正常关闭
webSocket.close(1000, "客户端主动关闭");
webSocket = null;
}
if (okHttpClient != null) {
// 关闭连接池和调度器
okHttpClient.dispatcher().executorService().shutdown();
okHttpClient.connectionPool().evictAll();
okHttpClient = null;
}
connected.set(false);
log.info("[close][WebSocket 客户端连接已关闭,服务器地址: {}]", serverUrl);
@@ -174,4 +160,50 @@ public class IotWebSocketClient implements WebSocket.Listener {
'}';
}
/**
* OkHttp WebSocket 监听器
*/
private class IotWebSocketListener extends WebSocketListener {
private final CountDownLatch connectLatch;
private final AtomicBoolean connectSuccess;
public IotWebSocketListener(CountDownLatch connectLatch, AtomicBoolean connectSuccess) {
this.connectLatch = connectLatch;
this.connectSuccess = connectSuccess;
}
@Override
public void onOpen(WebSocket webSocket, Response response) {
connected.set(true);
connectSuccess.set(true);
connectLatch.countDown();
log.info("[onOpen][WebSocket 连接已打开,服务器: {}]", serverUrl);
}
@Override
public void onMessage(WebSocket webSocket, String text) {
log.debug("[onMessage][收到消息: {}]", text);
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
connected.set(false);
log.info("[onClosing][WebSocket 正在关闭code: {}, reason: {}]", code, reason);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
connected.set(false);
log.info("[onClosed][WebSocket 已关闭code: {}, reason: {}]", code, reason);
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
connected.set(false);
connectLatch.countDown(); // 确保连接失败时也释放等待
log.error("[onFailure][WebSocket 连接失败]", t);
}
}
}

View File

@@ -0,0 +1,257 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action.websocket;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
/**
* {@link IotWebSocketClient} 的单元测试
*
* @author HUIHUI
*/
class IotWebSocketClientTest {
private MockWebServer mockWebServer;
@BeforeEach
public void setUp() throws Exception {
mockWebServer = new MockWebServer();
mockWebServer.start();
}
@AfterEach
public void tearDown() throws Exception {
if (mockWebServer != null) {
mockWebServer.shutdown();
}
}
/**
* 简单的 WebSocket 监听器,用于测试
*/
private static class TestWebSocketListener extends WebSocketListener {
@Override
public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
// 连接打开
}
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
// 收到消息
}
@Override
public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
webSocket.close(code, reason);
}
@Override
public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) {
// 连接失败
}
}
@Test
public void testConstructor_defaultValues() {
// 准备参数
String serverUrl = "ws://localhost:8080";
// 调用
IotWebSocketClient client = new IotWebSocketClient(serverUrl, null, null, null);
// 断言:验证默认值被正确设置
assertNotNull(client);
assertFalse(client.isConnected());
}
@Test
public void testConstructor_customValues() {
// 准备参数
String serverUrl = "ws://localhost:8080";
Integer connectTimeoutMs = 3000;
Integer sendTimeoutMs = 5000;
String dataFormat = "TEXT";
// 调用
IotWebSocketClient client = new IotWebSocketClient(serverUrl, connectTimeoutMs, sendTimeoutMs, dataFormat);
// 断言
assertNotNull(client);
assertFalse(client.isConnected());
}
@Test
public void testConnect_success() throws Exception {
// 准备参数:使用 MockWebServer 的 WebSocket 端点
String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort();
IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
// mock设置 MockWebServer 响应 WebSocket 升级请求
mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener()));
// 调用
client.connect();
// 断言
assertTrue(client.isConnected());
// 清理
client.close();
}
@Test
public void testConnect_alreadyConnected() throws Exception {
// 准备参数
String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort();
IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
// mock
mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener()));
// 调用:第一次连接
client.connect();
assertTrue(client.isConnected());
// 调用:第二次连接(应该不会重复连接)
client.connect();
assertTrue(client.isConnected());
// 清理
client.close();
}
@Test
public void testSendMessage_success() throws Exception {
// 准备参数
String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort();
IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
IotDeviceMessage message = IotDeviceMessage.builder()
.deviceId(123L)
.method("thing.property.report")
.params("{\"temperature\": 25.5}")
.build();
// mock
mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener()));
// 调用
client.connect();
client.sendMessage(message);
// 断言:消息发送成功不抛异常
assertTrue(client.isConnected());
// 清理
client.close();
}
@Test
public void testSendMessage_notConnected() {
// 准备参数
String serverUrl = "ws://localhost:8080";
IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
IotDeviceMessage message = IotDeviceMessage.builder()
.deviceId(123L)
.method("thing.property.report")
.params("{\"temperature\": 25.5}")
.build();
// 调用 & 断言:未连接时发送消息应抛出异常
assertThrows(IllegalStateException.class, () -> client.sendMessage(message));
}
@Test
public void testClose_success() throws Exception {
// 准备参数
String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort();
IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
// mock
mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener()));
// 调用
client.connect();
assertTrue(client.isConnected());
client.close();
// 断言
assertFalse(client.isConnected());
}
@Test
public void testClose_notConnected() {
// 准备参数
String serverUrl = "ws://localhost:8080";
IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
// 调用:关闭未连接的客户端不应抛异常
assertDoesNotThrow(client::close);
assertFalse(client.isConnected());
}
@Test
public void testIsConnected_initialState() {
// 准备参数
String serverUrl = "ws://localhost:8080";
IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
// 断言:初始状态应为未连接
assertFalse(client.isConnected());
}
@Test
public void testToString() {
// 准备参数
String serverUrl = "ws://localhost:8080";
IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "JSON");
// 调用
String result = client.toString();
// 断言
assertNotNull(result);
assertTrue(result.contains("serverUrl='ws://localhost:8080'"));
assertTrue(result.contains("dataFormat='JSON'"));
assertTrue(result.contains("connected=false"));
}
@Test
public void testSendMessage_textFormat() throws Exception {
// 准备参数
String serverUrl = "ws://" + mockWebServer.getHostName() + ":" + mockWebServer.getPort();
IotWebSocketClient client = new IotWebSocketClient(serverUrl, 5000, 5000, "TEXT");
IotDeviceMessage message = IotDeviceMessage.builder()
.deviceId(123L)
.method("thing.property.report")
.params("{\"temperature\": 25.5}")
.build();
// mock
mockWebServer.enqueue(new MockResponse().withWebSocketUpgrade(new TestWebSocketListener()));
// 调用
client.connect();
client.sendMessage(message);
// 断言:消息发送成功不抛异常
assertTrue(client.isConnected());
// 清理
client.close();
}
}