【同步】jdk21 和 jdk8 的代码
Some checks failed
Java CI with Maven / build (11) (push) Has been cancelled
Java CI with Maven / build (17) (push) Has been cancelled
Java CI with Maven / build (8) (push) Has been cancelled
yudao-ui-admin CI / build (14.x) (push) Has been cancelled
yudao-ui-admin CI / build (16.x) (push) Has been cancelled

This commit is contained in:
YunaiV
2026-01-18 18:35:15 +08:00
parent a39647efc0
commit 7646884008
10 changed files with 32 additions and 292 deletions

View File

@@ -1,85 +0,0 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.data.action.websocket.IotWebSocketClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* WebSocket 的 {@link IotDataRuleAction} 实现类
* <p>
* 负责将设备消息发送到外部 WebSocket 服务器
* 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式
* 使用连接池管理 WebSocket 连接,提高性能和资源利用率
*
* @author HUIHUI
*/
@Component
@Slf4j
public class IotWebSocketDataRuleAction extends
IotDataRuleCacheableAction<IotDataSinkWebSocketConfig, IotWebSocketClient> {
@Override
public Integer getType() {
return IotDataSinkTypeEnum.WEBSOCKET.getType();
}
@Override
protected IotWebSocketClient initProducer(IotDataSinkWebSocketConfig config) throws Exception {
// 1. 参数校验
if (StrUtil.isBlank(config.getServerUrl())) {
throw new IllegalArgumentException("WebSocket 服务器地址不能为空");
}
if (!StrUtil.startWithAny(config.getServerUrl(), "ws://", "wss://")) {
throw new IllegalArgumentException("WebSocket 服务器地址必须以 ws:// 或 wss:// 开头");
}
// 2.1 创建 WebSocket 客户端
IotWebSocketClient webSocketClient = new IotWebSocketClient(
config.getServerUrl(),
config.getConnectTimeoutMs(),
config.getSendTimeoutMs(),
config.getDataFormat()
);
// 2.2 连接服务器
webSocketClient.connect();
log.info("[initProducer][WebSocket 客户端创建并连接成功,服务器: {},数据格式: {}]",
config.getServerUrl(), config.getDataFormat());
return webSocketClient;
}
@Override
protected void closeProducer(IotWebSocketClient producer) throws Exception {
if (producer != null) {
producer.close();
}
}
@Override
protected void execute(IotDeviceMessage message, IotDataSinkWebSocketConfig config) throws Exception {
try {
// 1.1 获取或创建 WebSocket 客户端
// TODO @puhui999需要加锁保证必须连接上
IotWebSocketClient webSocketClient = getProducer(config);
// 1.2 检查连接状态,如果断开则重新连接
if (!webSocketClient.isConnected()) {
log.warn("[execute][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl());
webSocketClient.connect();
}
// 2.1 发送消息
webSocketClient.sendMessage(message);
// 2.2 记录发送成功日志
log.info("[execute][message({}) config({}) 发送成功WebSocket 服务器: {}]",
message, config, config.getServerUrl());
} catch (Exception e) {
log.error("[execute][message({}) config({}) 发送失败WebSocket 服务器: {}]",
message, config, config.getServerUrl(), e);
throw e;
}
}
}

View File

@@ -1,177 +0,0 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action.websocket;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* IoT WebSocket 客户端
* <p>
* 负责与外部 WebSocket 服务器建立连接并发送设备消息
* 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式
* 基于 Java 11+ 内置的 java.net.http.WebSocket 实现
*
* @author HUIHUI
*/
@Slf4j
public class IotWebSocketClient implements WebSocket.Listener {
private final String serverUrl;
private final Integer connectTimeoutMs;
private final Integer sendTimeoutMs;
private final String dataFormat;
private WebSocket webSocket;
private final AtomicBoolean connected = new AtomicBoolean(false);
private final StringBuilder messageBuffer = new StringBuilder();
public IotWebSocketClient(String serverUrl, Integer connectTimeoutMs, Integer sendTimeoutMs, String dataFormat) {
this.serverUrl = serverUrl;
this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : IotDataSinkWebSocketConfig.DEFAULT_CONNECT_TIMEOUT_MS;
this.sendTimeoutMs = sendTimeoutMs != null ? sendTimeoutMs : IotDataSinkWebSocketConfig.DEFAULT_SEND_TIMEOUT_MS;
this.dataFormat = dataFormat != null ? dataFormat : IotDataSinkWebSocketConfig.DEFAULT_DATA_FORMAT;
}
/**
* 连接到 WebSocket 服务器
*/
@SuppressWarnings("resource")
public void connect() throws Exception {
if (connected.get()) {
log.warn("[connect][WebSocket 客户端已经连接,无需重复连接]");
return;
}
try {
HttpClient httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(connectTimeoutMs))
.build();
CompletableFuture<WebSocket> future = httpClient.newWebSocketBuilder()
.connectTimeout(Duration.ofMillis(connectTimeoutMs))
.buildAsync(URI.create(serverUrl), this);
// 等待连接完成
webSocket = future.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
connected.set(true);
log.info("[connect][WebSocket 客户端连接成功,服务器地址: {}]", serverUrl);
} catch (Exception e) {
close();
log.error("[connect][WebSocket 客户端连接失败,服务器地址: {}]", serverUrl, e);
throw e;
}
}
@Override
public void onOpen(WebSocket webSocket) {
log.debug("[onOpen][WebSocket 连接已打开]");
webSocket.request(1);
}
@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
messageBuffer.append(data);
if (last) {
log.debug("[onText][收到 WebSocket 消息: {}]", messageBuffer);
messageBuffer.setLength(0);
}
webSocket.request(1);
return null;
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
connected.set(false);
log.info("[onClose][WebSocket 连接已关闭,状态码: {},原因: {}]", statusCode, reason);
return null;
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
connected.set(false);
log.error("[onError][WebSocket 发生错误]", error);
}
/**
* 发送设备消息
*
* @param message 设备消息
* @throws Exception 发送异常
*/
public void sendMessage(IotDeviceMessage message) throws Exception {
if (!connected.get() || webSocket == null) {
throw new IllegalStateException("WebSocket 客户端未连接");
}
try {
String messageData;
if (IotDataSinkWebSocketConfig.DEFAULT_DATA_FORMAT.equalsIgnoreCase(dataFormat)) {
messageData = JsonUtils.toJsonString(message);
} else {
messageData = message.toString();
}
// 发送消息并等待完成
CompletableFuture<WebSocket> future = webSocket.sendText(messageData, true);
future.get(sendTimeoutMs, TimeUnit.MILLISECONDS);
log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]",
message.getDeviceId(), messageData.length());
} catch (Exception e) {
log.error("[sendMessage][发送消息失败,设备 ID: {}]", message.getDeviceId(), e);
throw e;
}
}
/**
* 关闭连接
*/
public void close() {
if (!connected.get() && webSocket == null) {
return;
}
try {
if (webSocket != null) {
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "客户端主动关闭")
.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(e -> {
log.warn("[close][发送关闭帧失败]", e);
return null;
});
}
connected.set(false);
log.info("[close][WebSocket 客户端连接已关闭,服务器地址: {}]", serverUrl);
} catch (Exception e) {
log.error("[close][关闭 WebSocket 客户端连接异常]", e);
}
}
/**
* 检查连接状态
*
* @return 是否已连接
*/
public boolean isConnected() {
return connected.get() && webSocket != null;
}
@Override
public String toString() {
return "IotWebSocketClient{" +
"serverUrl='" + serverUrl + '\'' +
", dataFormat='" + dataFormat + '\'' +
", connected=" + connected.get() +
'}';
}
}

View File

@@ -23,13 +23,13 @@ import cn.iocoder.yudao.module.iot.service.product.IotProductService;
import cn.iocoder.yudao.module.iot.service.rule.scene.action.IotSceneRuleAction;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcherManager;
import cn.iocoder.yudao.module.iot.service.rule.scene.timer.IotSceneRuleTimerHandler;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;

View File

@@ -6,9 +6,10 @@ import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqttws.router.IotMqttWsDownstreamHandler;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.PostConstruct;
/**
* IoT MQTT WebSocket 下行消息订阅器
* <p>

View File

@@ -9,11 +9,12 @@ import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.ServerWebSocket;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
* IoT 网关 MQTT WebSocket 协议:接收设备上行消息
* <p>

View File

@@ -1,9 +1,9 @@
package cn.iocoder.yudao.module.system.api.mail.dto;
import jakarta.validation.constraints.Email;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import javax.validation.constraints.Email;
import javax.validation.constraints.NotNull;
import java.io.File;
import java.util.List;
import java.util.Map;

View File

@@ -1,9 +1,9 @@
package cn.iocoder.yudao.module.system.mq.message.mail;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.io.File;
import java.util.Collection;

View File

@@ -1,11 +1,11 @@
package cn.iocoder.yudao.module.system.mq.producer.mail;
import cn.iocoder.yudao.module.system.mq.message.mail.MailSendMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.File;
import java.util.Collection;

View File

@@ -3,6 +3,8 @@ package cn.iocoder.yudao.module.system.service.mail;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Validator;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.mail.MailAccount;
import cn.hutool.extra.mail.MailUtil;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.common.enums.UserTypeEnum;
import cn.iocoder.yudao.module.system.dal.dataobject.mail.MailAccountDO;
@@ -13,13 +15,11 @@ import cn.iocoder.yudao.module.system.mq.producer.mail.MailProducer;
import cn.iocoder.yudao.module.system.service.member.MemberService;
import cn.iocoder.yudao.module.system.service.user.AdminUserService;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.dromara.hutool.extra.mail.MailAccount;
import org.dromara.hutool.extra.mail.MailUtil;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import javax.annotation.Resource;
import java.io.File;
import java.util.Collection;
import java.util.LinkedHashSet;
@@ -137,7 +137,7 @@ public class MailSendServiceImpl implements MailSendService {
private MailAccount buildMailAccount(MailAccountDO account, String nickname) {
String from = StrUtil.isNotEmpty(nickname) ? nickname + " <" + account.getMail() + ">" : account.getMail();
return new MailAccount().setFrom(from).setAuth(true)
.setUser(account.getUsername()).setPass(account.getPassword().toCharArray())
.setUser(account.getUsername()).setPass(account.getPassword())
.setHost(account.getHost()).setPort(account.getPort())
.setSslEnable(account.getSslEnable()).setStarttlsEnable(account.getStarttlsEnable());
}

View File

@@ -1,6 +1,8 @@
package cn.iocoder.yudao.module.system.service.mail;
import cn.hutool.core.map.MapUtil;
import cn.hutool.extra.mail.MailAccount;
import cn.hutool.extra.mail.MailUtil;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.common.enums.UserTypeEnum;
import cn.iocoder.yudao.framework.test.core.ut.BaseMockitoUnitTest;
@@ -13,8 +15,6 @@ import cn.iocoder.yudao.module.system.mq.producer.mail.MailProducer;
import cn.iocoder.yudao.module.system.service.member.MemberService;
import cn.iocoder.yudao.module.system.service.user.AdminUserService;
import org.assertj.core.util.Lists;
import org.dromara.hutool.extra.mail.MailAccount;
import org.dromara.hutool.extra.mail.MailUtil;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
@@ -62,7 +62,7 @@ public class MailSendServiceImplTest extends BaseMockitoUnitTest {
// .setFrom("奥特曼 <ydym_test@163.com>")
.setFrom("ydym_test@163.com") // 邮箱地址
.setHost("smtp.163.com").setPort(465).setSslEnable(true) // SMTP 服务器
.setAuth(true).setUser("ydym_test@163.com").setPass("WBZTEINMIFVRYSOE".toCharArray()); // 登录账号密码
.setAuth(true).setUser("ydym_test@163.com").setPass("WBZTEINMIFVRYSOE"); // 登录账号密码
String messageId = MailUtil.send(mailAccount, "7685413@qq.com", "主题", "内容", false);
System.out.println("发送结果:" + messageId);
}
@@ -263,7 +263,7 @@ public class MailSendServiceImplTest extends BaseMockitoUnitTest {
// 调用,并断言异常
assertServiceException(() -> mailSendService.sendSingleMail(toMails, null, null, userId,
UserTypeEnum.ADMIN.getValue(), templateCode, templateParams, (java.io.File[]) null),
UserTypeEnum.ADMIN.getValue(), templateCode, templateParams, (java.io.File[]) null),
MAIL_SEND_MAIL_NOT_EXISTS);
}
@@ -280,17 +280,17 @@ public class MailSendServiceImplTest extends BaseMockitoUnitTest {
// mock 方法(发送邮件)
String messageId = randomString();
mailUtilMock.when(() -> MailUtil.send(
argThat(mailAccount -> {
assertEquals("芋艿 <7685@qq.com>", mailAccount.getFrom());
assertTrue(mailAccount.isAuth());
assertEquals(account.getUsername(), mailAccount.getUser());
assertArrayEquals(account.getPassword().toCharArray(), mailAccount.getPass());
assertEquals(account.getHost(), mailAccount.getHost());
assertEquals(account.getPort(), mailAccount.getPort());
assertEquals(account.getSslEnable(), mailAccount.isSslEnable());
return true;
}), eq(message.getToMails()), eq(message.getCcMails()), eq(message.getBccMails()),
eq(message.getTitle()), eq(message.getContent()), eq(true), eq(message.getAttachments())))
argThat(mailAccount -> {
assertEquals("芋艿 <7685@qq.com>", mailAccount.getFrom());
assertTrue(mailAccount.isAuth());
assertEquals(account.getUsername(), mailAccount.getUser());
assertArrayEquals(account.getPassword().toCharArray(), mailAccount.getPass().toCharArray());
assertEquals(account.getHost(), mailAccount.getHost());
assertEquals(account.getPort(), mailAccount.getPort());
assertEquals(account.getSslEnable(), mailAccount.isSslEnable());
return true;
}), eq(message.getToMails()), eq(message.getCcMails()), eq(message.getBccMails()),
eq(message.getTitle()), eq(message.getContent()), eq(true), any()))
.thenReturn(messageId);
// 调用
@@ -316,13 +316,13 @@ public class MailSendServiceImplTest extends BaseMockitoUnitTest {
assertEquals("芋艿 <7685@qq.com>", mailAccount.getFrom());
assertTrue(mailAccount.isAuth());
assertEquals(account.getUsername(), mailAccount.getUser());
assertArrayEquals(account.getPassword().toCharArray(), mailAccount.getPass());
assertArrayEquals(account.getPassword().toCharArray(), mailAccount.getPass().toCharArray());
assertEquals(account.getHost(), mailAccount.getHost());
assertEquals(account.getPort(), mailAccount.getPort());
assertEquals(account.getSslEnable(), mailAccount.isSslEnable());
return true;
}), eq(message.getToMails()), eq(message.getCcMails()), eq(message.getBccMails()),
eq(message.getTitle()), eq(message.getContent()), eq(true), same(message.getAttachments()))).thenThrow(e);
eq(message.getTitle()), eq(message.getContent()), eq(true), any())).thenThrow(e);
// 调用
mailSendService.doSendMail(message);