【重构】站内信消息SSE彻底更改为ws方案

This commit is contained in:
俞宝山
2025-12-24 18:08:58 +08:00
parent cc5bf91e89
commit 315eee3794
19 changed files with 382 additions and 916 deletions

View File

@@ -33,5 +33,11 @@
<groupId>vip.xiaonuo</groupId>
<artifactId>snowy-plugin-sys-api</artifactId>
</dependency>
<!-- websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,25 @@
package vip.xiaonuo.dev.core.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* WebSocket配置
*
* @author yubaoshan
* @date 2025/12/24 18:01
*/
@Configuration
@EnableScheduling
public class WebSocketConfig {
/**
* 注入ServerEndpointExporter这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

View File

@@ -25,6 +25,7 @@ import vip.xiaonuo.dev.modular.message.param.DevMessageIdParam;
import vip.xiaonuo.dev.modular.message.param.DevMessageListParam;
import vip.xiaonuo.dev.modular.message.param.DevMessageSendParam;
import vip.xiaonuo.dev.modular.message.service.DevMessageService;
import vip.xiaonuo.dev.modular.message.websocket.DevMessageWebSocket;
import vip.xiaonuo.dev.modular.relation.entity.DevRelation;
import vip.xiaonuo.dev.modular.relation.enums.DevRelationCategoryEnum;
import vip.xiaonuo.dev.modular.relation.service.DevRelationService;
@@ -105,5 +106,7 @@ public class DevMessageProvider implements DevMessageApi {
.eq(DevRelation::getTargetId, StpUtil.getLoginIdAsString())
.eq(DevRelation::getCategory, DevRelationCategoryEnum.MSG_TO_USER.getValue())
.set(DevRelation::getExtJson, myMessageExtJson));
// 发送WebSocket消息
DevMessageWebSocket.sendMessage(StpUtil.getLoginIdAsString());
}
}

View File

@@ -41,6 +41,7 @@ import vip.xiaonuo.dev.modular.message.param.DevMessagePageParam;
import vip.xiaonuo.dev.modular.message.param.DevMessageSendParam;
import vip.xiaonuo.dev.modular.message.result.DevMessageResult;
import vip.xiaonuo.dev.modular.message.service.DevMessageService;
import vip.xiaonuo.dev.modular.message.websocket.DevMessageWebSocket;
import vip.xiaonuo.dev.modular.relation.entity.DevRelation;
import vip.xiaonuo.dev.modular.relation.enums.DevRelationCategoryEnum;
import vip.xiaonuo.dev.modular.relation.service.DevRelationService;
@@ -81,6 +82,8 @@ public class DevMessageServiceImpl extends ServiceImpl<DevMessageMapper, DevMess
.set("read", false))).collect(Collectors.toList());
devRelationService.saveRelationBatchWithAppend(devMessage.getId(), receiverIdList,
DevRelationCategoryEnum.MSG_TO_USER.getValue(), extJsonList);
// 发送WebSocket消息
DevMessageWebSocket.sendMessage(receiverIdList);
}
}
@@ -155,9 +158,10 @@ public class DevMessageServiceImpl extends ServiceImpl<DevMessageMapper, DevMess
@Override
public Long unreadCount(String loginId){
return devRelationService.getRelationListByTargetIdAndCategory(loginId,
DevRelationCategoryEnum.MSG_TO_USER.getValue()).stream().filter(devRelation -> JSONUtil
.parseObj(devRelation.getExtJson()).getBool("read").equals(false)).count();
return devRelationService.count(new LambdaQueryWrapper<DevRelation>()
.eq(DevRelation::getTargetId, loginId)
.eq(DevRelation::getCategory, DevRelationCategoryEnum.MSG_TO_USER.getValue())
.like(DevRelation::getExtJson, "\"read\":false"));
}
@Transactional(rollbackFor = Exception.class)
@@ -188,6 +192,8 @@ public class DevMessageServiceImpl extends ServiceImpl<DevMessageMapper, DevMess
devRelationService.update(new LambdaUpdateWrapper<DevRelation>()
.eq(DevRelation::getObjectId, devMessage.getId()).eq(DevRelation::getTargetId, StpUtil.getLoginIdAsString())
.eq(DevRelation::getCategory, DevRelationCategoryEnum.MSG_TO_USER.getValue()).set(DevRelation::getExtJson, myMessageExtJson));
// 发送WebSocket消息
DevMessageWebSocket.sendMessage(StpUtil.getLoginIdAsString());
}
List<DevMessageResult.DevReceiveInfo> receiveInfoList = devRelationService.getRelationListByObjectIdAndCategory(devMessage.getId(),
DevRelationCategoryEnum.MSG_TO_USER.getValue()).stream().map(devRelation -> {

View File

@@ -0,0 +1,144 @@
package vip.xiaonuo.dev.modular.message.websocket;
import cn.dev33.satoken.stp.StpUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.http.HttpUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONUtil;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import vip.xiaonuo.dev.modular.message.service.DevMessageService;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* 站内信WebSocket
*
* @author yubaoshan
* @date 2025/12/24 18:01
*/
@Component
@Slf4j
@ServerEndpoint("/dev/message/ws")
public class DevMessageWebSocket {
/**
* WebSocket Session池key: sessionId, value: session
*/
private static final ConcurrentHashMap<String, Session> SESSION_POOL = new ConcurrentHashMap<>();
/**
* SessionUser映射key: sessionId, value: userId
*/
private static final ConcurrentHashMap<String, String> SESSION_USER_MAP = new ConcurrentHashMap<>();
@OnOpen
public void onOpen(Session session) {
try {
String queryString = session.getQueryString();
if (ObjectUtil.isEmpty(queryString)) {
session.close();
return;
}
Map<String, String> paramMap = HttpUtil.decodeParamMap(queryString, Charset.defaultCharset());
String token = paramMap.get("token");
if (ObjectUtil.isEmpty(token)) {
session.close();
return;
}
// 验证token
Object loginId = StpUtil.getLoginIdByToken(token);
if (ObjectUtil.isEmpty(loginId)) {
session.close();
return;
}
String userId = loginId.toString();
SESSION_POOL.put(session.getId(), session);
SESSION_USER_MAP.put(session.getId(), userId);
// 发送初始未读数量
sendUnreadCount(session, userId);
} catch (Exception e) {
log.error("WebSocket onOpen error", e);
}
}
@OnClose
public void onClose(Session session) {
SESSION_POOL.remove(session.getId());
SESSION_USER_MAP.remove(session.getId());
}
@OnError
public void onError(Session session, Throwable error) {
log.error("WebSocket onError", error);
onClose(session);
}
@OnMessage
public void onMessage(String message, Session session) {
// 收到消息,暂不处理
}
/**
* 发送未读消息数量给指定用户
*
* @param userId 用户ID
*/
public static void sendMessage(String userId) {
SESSION_USER_MAP.forEach((sessionId, sessionUserId) -> {
if (sessionUserId.equals(userId)) {
Session session = SESSION_POOL.get(sessionId);
if (session != null && session.isOpen()) {
sendUnreadCount(session, userId);
}
}
});
}
/**
* 发送未读消息数量给指定用户列表
*
* @author yubaoshan
* @date 2025/12/24 22:20
* @param userIds 用户ID列表
*/
public static void sendMessage(List<String> userIds) {
Set<String> userIdSet = new HashSet<>(userIds);
SESSION_USER_MAP.forEach((sessionId, sessionUserId) -> {
if (userIdSet.contains(sessionUserId)) {
Session session = SESSION_POOL.get(sessionId);
if (session != null && session.isOpen()) {
sendUnreadCount(session, sessionUserId);
}
}
});
}
/**
* 发送未读消息数量
*/
private static void sendUnreadCount(Session session, String userId) {
try {
if (session.isOpen()) {
DevMessageService devMessageService = SpringUtil.getBean(DevMessageService.class);
Long count = devMessageService.unreadCount(userId);
Map<String, Object> result = Map.of("code", 200, "data", count);
session.getBasicRemote().sendText(JSONUtil.toJsonStr(result));
}
} catch (IOException e) {
log.error("Send message error", e);
}
}
}

View File

@@ -1,98 +0,0 @@
/*
* Copyright [2022] [https://www.xiaonuo.vip]
*
* Snowy采用APACHE LICENSE 2.0开源协议,您在使用过程中,需要注意以下几点:
*
* 1.请不要删除和修改根目录下的LICENSE文件。
* 2.请不要删除和修改Snowy源码头部的版权声明。
* 3.本项目代码可免费商业使用,商业使用请保留源码和相关描述文件的项目出处,作者声明等。
* 4.分发源码时候,请注明软件出处 https://www.xiaonuo.vip
* 5.不可二次分发开源参与同类竞品如有想法可联系团队xiaonuobase@qq.com商议合作。
* 6.若您的项目无法满足以上几点需要更多功能代码获取Snowy商业授权许可请在官网购买授权地址为 https://www.xiaonuo.vip
*/
package vip.xiaonuo.dev.modular.sse.controller;
import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport;
import com.github.xiaoymin.knife4j.annotations.ApiSupport;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import vip.xiaonuo.common.sse.CommonSseParam;
import vip.xiaonuo.dev.modular.sse.service.DevSseEmitterService;
import java.util.function.Consumer;
/**
* SSE通信控制器
*
* @author diantu
* @date 2023/7/3
**/
@Tag(name = "SSE通信控制器")
@ApiSupport(author = "SNOWY_TEAM", order = 10)
@RestController
@Validated
public class DevSseEmitterController {
@Resource
private DevSseEmitterService devSseEmitterService;
/**
* 创建sse连接
*
* @author diantu
* @date 2023/7/3
**/
@ApiOperationSupport(order = 1)
@Operation(summary = "创建sse连接")
@GetMapping("/dev/sse/createConnect")
public SseEmitter createConnect(String clientId,
@RequestParam(required = false)Boolean setHeartBeat,
@RequestParam(required = false)Boolean defaultHeartbeat,
@RequestParam(required = false)Consumer<CommonSseParam> consumer){
return devSseEmitterService.createSseConnect(clientId,setHeartBeat,defaultHeartbeat,consumer);
}
/**
* 关闭sse连接
*
* @author diantu
* @date 2023/7/3
**/
@ApiOperationSupport(order = 2)
@Operation(summary = "关闭sse连接")
@GetMapping("/dev/sse/closeSseConnect")
public void closeSseConnect(String clientId){
devSseEmitterService.closeSseConnect(clientId);
}
/**
* 推送消息到所有客户端
*
* @author diantu
* @date 2023/7/3
**/
@ApiOperationSupport(order = 3)
@Operation(summary = "推送消息到所有客户端")
@PostMapping("/dev/sse/broadcast")
public void sendMessageToAllClient(@RequestBody(required = false) String msg){
devSseEmitterService.sendMessageToAllClient(msg);
}
/**
* 根据clientId发送消息给某一客户端
*
* @author diantu
* @date 2023/7/3
**/
@ApiOperationSupport(order = 4)
@Operation(summary = "根据clientId发送消息给某一客户端")
@PostMapping("/dev/sse/sendMessage")
public void sendMessageToOneClient(String clientId,String msg){
devSseEmitterService.sendMessageToOneClient(clientId,msg);
}
}

View File

@@ -1,47 +0,0 @@
/*
* Copyright [2022] [https://www.xiaonuo.vip]
*
* Snowy采用APACHE LICENSE 2.0开源协议,您在使用过程中,需要注意以下几点:
*
* 1.请不要删除和修改根目录下的LICENSE文件。
* 2.请不要删除和修改Snowy源码头部的版权声明。
* 3.本项目代码可免费商业使用,商业使用请保留源码和相关描述文件的项目出处,作者声明等。
* 4.分发源码时候,请注明软件出处 https://www.xiaonuo.vip
* 5.不可二次分发开源参与同类竞品如有想法可联系团队xiaonuobase@qq.com商议合作。
* 6.若您的项目无法满足以上几点需要更多功能代码获取Snowy商业授权许可请在官网购买授权地址为 https://www.xiaonuo.vip
*/
package vip.xiaonuo.dev.modular.sse.enums;
import lombok.Getter;
/**
* SSE通信参数枚举
*
* @author diantu
* @date 2023/7/17
**/
@Getter
public enum DevSseEmitterParameterEnum {
/**
* 通信
*/
EMITTER("EMITTER"),
/**
* 心跳
*/
FUTURE("FUTURE"),
/**
* 用户ID
*/
LOGINID("LOGINID");
private final String value;
DevSseEmitterParameterEnum(String value) {
this.value = value;
}
}

View File

@@ -1,79 +0,0 @@
/*
* Copyright [2022] [https://www.xiaonuo.vip]
*
* Snowy采用APACHE LICENSE 2.0开源协议,您在使用过程中,需要注意以下几点:
*
* 1.请不要删除和修改根目录下的LICENSE文件。
* 2.请不要删除和修改Snowy源码头部的版权声明。
* 3.本项目代码可免费商业使用,商业使用请保留源码和相关描述文件的项目出处,作者声明等。
* 4.分发源码时候,请注明软件出处 https://www.xiaonuo.vip
* 5.不可二次分发开源参与同类竞品如有想法可联系团队xiaonuobase@qq.com商议合作。
* 6.若您的项目无法满足以上几点需要更多功能代码获取Snowy商业授权许可请在官网购买授权地址为 https://www.xiaonuo.vip
*/
package vip.xiaonuo.dev.modular.sse.provider;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import vip.xiaonuo.common.sse.CommonSseParam;
import vip.xiaonuo.dev.api.DevSseApi;
import vip.xiaonuo.dev.modular.sse.service.DevSseEmitterService;
import java.util.function.Consumer;
/**
* SSE API接口提供者
*
* @author diantu
* @date 2023/7/5
**/
@Service
public class DevSseProvider implements DevSseApi {
@Resource
private DevSseEmitterService devSseEmitterService;
/**
* 创建SSE连接
*
* @author diantu
* @date 2023/7/5
**/
@Override
public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Boolean defaultHeartbeat, Consumer<CommonSseParam> consumer) {
return devSseEmitterService.createSseConnect(clientId,setHeartBeat,defaultHeartbeat,consumer);
}
/**
* 关闭连接
*
* @author diantu
* @date 2023/7/5
**/
@Override
public void closeSseConnect(String clientId) {
devSseEmitterService.closeSseConnect(clientId);
}
/**
* 推送消息到所有客户端
*
* @author diantu
* @date 2023/7/5
**/
@Override
public void sendMessageToAllClient(String msg) {
devSseEmitterService.sendMessageToAllClient(msg);
}
/**
* 根据clientId发送消息给某一客户端
*
* @author diantu
* @date 2023/7/5
**/
@Override
public void sendMessageToOneClient(String clientId, String msg) {
devSseEmitterService.sendMessageToOneClient(clientId,msg);
}
}

View File

@@ -1,59 +0,0 @@
/*
* Copyright [2022] [https://www.xiaonuo.vip]
*
* Snowy采用APACHE LICENSE 2.0开源协议,您在使用过程中,需要注意以下几点:
*
* 1.请不要删除和修改根目录下的LICENSE文件。
* 2.请不要删除和修改Snowy源码头部的版权声明。
* 3.本项目代码可免费商业使用,商业使用请保留源码和相关描述文件的项目出处,作者声明等。
* 4.分发源码时候,请注明软件出处 https://www.xiaonuo.vip
* 5.不可二次分发开源参与同类竞品如有想法可联系团队xiaonuobase@qq.com商议合作。
* 6.若您的项目无法满足以上几点需要更多功能代码获取Snowy商业授权许可请在官网购买授权地址为 https://www.xiaonuo.vip
*/
package vip.xiaonuo.dev.modular.sse.service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import vip.xiaonuo.common.sse.CommonSseParam;
import java.util.function.Consumer;
/**
* SSE通信Service接口
*
* @author diantu
* @date 2023/7/3
**/
public interface DevSseEmitterService {
/**
* 创建连接
*
* @author diantu
* @date 2023/7/3
**/
public SseEmitter createSseConnect(String clientId,Boolean setHeartBeat,Boolean defaultHeartbeat,Consumer<CommonSseParam> consumer);
/**
* 关闭连接
*
* @author diantu
* @date 2023/7/3
**/
public void closeSseConnect(String clientId);
/**
* 推送消息到所有客户端
*
* @author diantu
* @date 2023/7/3
**/
public void sendMessageToAllClient(String msg);
/**
* 根据clientId发送消息给某一客户端
*
* @author diantu
* @date 2023/7/3
**/
public void sendMessageToOneClient(String clientId, String msg);
}

View File

@@ -1,135 +0,0 @@
/*
* Copyright [2022] [https://www.xiaonuo.vip]
*
* Snowy采用APACHE LICENSE 2.0开源协议,您在使用过程中,需要注意以下几点:
*
* 1.请不要删除和修改根目录下的LICENSE文件。
* 2.请不要删除和修改Snowy源码头部的版权声明。
* 3.本项目代码可免费商业使用,商业使用请保留源码和相关描述文件的项目出处,作者声明等。
* 4.分发源码时候,请注明软件出处 https://www.xiaonuo.vip
* 5.不可二次分发开源参与同类竞品如有想法可联系团队xiaonuobase@qq.com商议合作。
* 6.若您的项目无法满足以上几点需要更多功能代码获取Snowy商业授权许可请在官网购买授权地址为 https://www.xiaonuo.vip
*/
package vip.xiaonuo.dev.modular.sse.service.impl;
import cn.dev33.satoken.stp.StpUtil;
import cn.hutool.core.util.IdUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import vip.xiaonuo.common.pojo.CommonResult;
import vip.xiaonuo.common.sse.CommonSseParam;
import vip.xiaonuo.dev.modular.sse.service.DevSseEmitterService;
import vip.xiaonuo.dev.modular.sse.util.DevSseCacheUtil;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* SSE通信Service接口实现类
*
* @author diantu
* @date 2023/7/3
**/
@Slf4j
@Service
public class DevSseEmitterServiceImpl implements DevSseEmitterService {
/**
* 心跳线程池
*/
private static final ScheduledExecutorService heartbeatExecutors = Executors.newScheduledThreadPool(10);
/**
* 创建连接
*
* @author diantu
* @date 2023/7/3
**/
@Override
public SseEmitter createSseConnect(String clientId, Boolean setHeartBeat, Boolean defaultHeartbeat, Consumer<CommonSseParam> consumer) {
// 设置超时时间0表示不过期。默认30秒超过时间未完成会抛出异常AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
String loginId = StpUtil.getLoginIdAsString();
// 判断连接是否有效
if (DevSseCacheUtil.connectionValidity(clientId,loginId)) {
return DevSseCacheUtil.getSseEmitterByClientId(clientId);
}else{
DevSseCacheUtil.removeConnection(clientId);
}
clientId = IdUtil.simpleUUID();
String finalClientId = clientId;
// 增加心跳
final ScheduledFuture<?> future;
// 是否自定义心跳任务
if (setHeartBeat!=null&&setHeartBeat) {
//是否使用默认心跳任务
if(defaultHeartbeat!=null&&defaultHeartbeat){
//默认心跳任务
future = heartbeatExecutors.scheduleAtFixedRate(() ->
DevSseCacheUtil.sendMessageToOneClient(finalClientId,finalClientId+"-"+loginId),
2, 10, TimeUnit.SECONDS);
}else{
//自定义心跳任务
CommonSseParam commonSseParam = new CommonSseParam();
commonSseParam.setClientId(clientId);
commonSseParam.setLoginId(loginId);
future = heartbeatExecutors.scheduleAtFixedRate(() -> consumer.accept(commonSseParam),
2, 10, TimeUnit.SECONDS);
}
// 增加连接
DevSseCacheUtil.addConnection(clientId, loginId, sseEmitter, future);
} else {
// 增加连接
DevSseCacheUtil.addConnection(clientId, loginId, sseEmitter, null);
}
// 长链接完成后回调(即关闭连接时调用)
sseEmitter.onCompletion(DevSseCacheUtil.completionCallBack(clientId));
// 连接超时回调
sseEmitter.onTimeout(DevSseCacheUtil.timeoutCallBack(clientId));
// 推送消息异常回调
sseEmitter.onError(DevSseCacheUtil.errorCallBack(clientId));
// 初次建立连接,推送客户端id
CommonResult<String> message = new CommonResult<>(0,"",clientId);
DevSseCacheUtil.sendMessageToClientByClientId(clientId,message);
return sseEmitter;
}
/**
* 关闭连接
*
* @author diantu
* @date 2023/7/3
**/
@Override
public void closeSseConnect(String clientId){
DevSseCacheUtil.removeConnection(clientId);
}
/**
* 推送消息到所有客户端
*
* @author diantu
* @date 2023/7/3
**/
@Override
public void sendMessageToAllClient(String msg) {
DevSseCacheUtil.sendMessageToAllClient(msg);
}
/**
* 根据clientId发送消息给某一客户端
*
* @author diantu
* @date 2023/7/3
**/
@Override
public void sendMessageToOneClient(String clientId, String msg) {
DevSseCacheUtil.sendMessageToOneClient(clientId,msg);
}
}

View File

@@ -1,281 +0,0 @@
/*
* Copyright [2022] [https://www.xiaonuo.vip]
*
* Snowy采用APACHE LICENSE 2.0开源协议,您在使用过程中,需要注意以下几点:
*
* 1.请不要删除和修改根目录下的LICENSE文件。
* 2.请不要删除和修改Snowy源码头部的版权声明。
* 3.本项目代码可免费商业使用,商业使用请保留源码和相关描述文件的项目出处,作者声明等。
* 4.分发源码时候,请注明软件出处 https://www.xiaonuo.vip
* 5.不可二次分发开源参与同类竞品如有想法可联系团队xiaonuobase@qq.com商议合作。
* 6.若您的项目无法满足以上几点需要更多功能代码获取Snowy商业授权许可请在官网购买授权地址为 https://www.xiaonuo.vip
*/
package vip.xiaonuo.dev.modular.sse.util;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import vip.xiaonuo.common.exception.CommonException;
import vip.xiaonuo.common.pojo.CommonResult;
import vip.xiaonuo.dev.modular.sse.enums.DevSseEmitterParameterEnum;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
/**
* SseEmitter工具类
*
* @author diantu
* @date 2023/7/3
**/
@Slf4j
public class DevSseCacheUtil {
/**
* 创建一个容器来存储所有的 SseEmitter(使用ConcurrentHashMap是因为它是线程安全的)。
*/
public static Map<String, Map<String, Object>> sseCache = new ConcurrentHashMap<>();
/**
* 根据客户端id获取连接对象
*
* @author diantu
* @date 2023/7/3
**/
public static SseEmitter getSseEmitterByClientId(String clientId) {
Map<String, Object> map = sseCache.get(clientId);
if (map == null || map.isEmpty()) {
return null;
}
return (SseEmitter) map.get(DevSseEmitterParameterEnum.EMITTER.getValue());
}
/**
* 根据客户端id获取心跳
*
* @author diantu
* @date 2023/7/18
**/
public static ScheduledFuture<?> getSseFutureByClientId(String clientId) {
Map<String, Object> map = sseCache.get(clientId);
if (map == null || map.isEmpty()) {
return null;
}
return (ScheduledFuture<?>) map.get(DevSseEmitterParameterEnum.FUTURE.getValue());
}
/**
* 根据客户端id获取用户id
*
* @author diantu
* @date 2023/7/18
**/
public static ScheduledFuture<?> getLoginIdByClientId(String clientId) {
Map<String, Object> map = sseCache.get(clientId);
if (map == null || map.isEmpty()) {
return null;
}
return (ScheduledFuture<?>) map.get(DevSseEmitterParameterEnum.LOGINID.getValue());
}
/**
* 根据用户id获取客户端id
*
* @author diantu
* @date 2023/7/18
**/
public static String getClientIdByLoginId(String loginId) {
if (existSseCache()) {
for (Map.Entry<String, Map<String, Object>> entry : sseCache.entrySet()) {
Map<String, Object> map = sseCache.get(entry.getKey());
String lId = (String) map.get(DevSseEmitterParameterEnum.LOGINID.getValue());
if (loginId.equals(lId)) {
return entry.getKey();
}
}
}
return null;
}
/**
* 判断容器是否存在连接
*
* @author diantu
* @date 2023/7/3
**/
public static boolean existSseCache() {
return !sseCache.isEmpty();
}
/**
* 判断连接是否有效
*
* @author diantu
* @date 2023/7/3
**/
public static boolean connectionValidity(String clientId, String loginId) {
if (sseCache.get(clientId) == null) {
return false;
}
return Objects.equals(loginId, sseCache.get(clientId).get(DevSseEmitterParameterEnum.LOGINID.getValue()));
}
/**
* 增加连接
*
* @author diantu
* @date 2023/7/3
**/
public static void addConnection(String clientId, String loginId, SseEmitter emitter, ScheduledFuture<?> future) {
final SseEmitter oldEmitter = getSseEmitterByClientId(clientId);
if (oldEmitter != null) {
throw new CommonException("连接已存在:{}", clientId);
}
Map<String, Object> map = new ConcurrentHashMap<>();
map.put(DevSseEmitterParameterEnum.EMITTER.getValue(), emitter);
if (future != null) {
map.put(DevSseEmitterParameterEnum.FUTURE.getValue(), future);
}
map.put(DevSseEmitterParameterEnum.LOGINID.getValue(), loginId);
sseCache.put(clientId, map);
}
/**
* 移除连接
*
* @author diantu
* @date 2023/7/3
**/
public static void removeConnection(String clientId) {
SseEmitter emitter = getSseEmitterByClientId(clientId);
if (emitter != null) {
cancelScheduledFuture(clientId);
}
sseCache.remove(clientId);
log.info("移除连接:{}", clientId);
}
/**
* 中断心跳发送任务
*
* @author diantu
* @date 2023/7/3
*/
public static void cancelScheduledFuture(String clientId) {
ScheduledFuture<?> future = getSseFutureByClientId(clientId);
if (future != null) {
future.cancel(true);
}
}
/**
* 长链接完成后回调
*
* @author diantu
* @date 2023/7/3
**/
public static Runnable completionCallBack(String clientId) {
return () -> {
log.info("结束连接:{}", clientId);
removeConnection(clientId);
cancelScheduledFuture(clientId);
};
}
/**
* 连接超时回调
*
* @author diantu
* @date 2023/7/3
**/
public static Runnable timeoutCallBack(String clientId) {
return () -> {
log.info("连接超时:{}", clientId);
removeConnection(clientId);
cancelScheduledFuture(clientId);
};
}
/**
* 推送消息异常时回调
*
* @author diantu
* @date 2023/7/3
**/
public static Consumer<Throwable> errorCallBack(String clientId) {
return throwable -> {
log.info("推送消息异常:{}", clientId);
removeConnection(clientId);
cancelScheduledFuture(clientId);
};
}
/**
* 推送消息到所有客户端
*
* @author diantu
* @date 2023/7/3
**/
public static void sendMessageToAllClient(String msg) {
if (!existSseCache()) {
return;
}
// 判断发送的消息是否为空
if (StrUtil.isEmpty(msg)) {
log.info("群发消息为空");
return;
}
CommonResult<String> message = new CommonResult<>(CommonResult.CODE_SUCCESS, "", msg);
for (Map.Entry<String, Map<String, Object>> entry : sseCache.entrySet()) {
sendMessageToClientByClientId(entry.getKey(), message);
}
}
/**
* 根据clientId发送消息给某一客户端
*
* @author diantu
* @date 2023/7/3
**/
public static void sendMessageToOneClient(String clientId, String msg) {
if (StrUtil.isEmpty(clientId)) {
log.info("客户端ID为空");
return;
}
if (StrUtil.isEmpty(msg)) {
log.info("向客户端{}推送消息为空", clientId);
return;
}
CommonResult<String> message = new CommonResult<>(CommonResult.CODE_SUCCESS, "", msg);
sendMessageToClientByClientId(clientId, message);
}
/**
* 推送消息到客户端
*
* @author diantu
* @date 2023/7/3
**/
public static void sendMessageToClientByClientId(String clientId, CommonResult<String> message) {
Map<String, Object> map = sseCache.get(clientId);
if (map == null || map.isEmpty()) {
log.error("推送消息失败:客户端{}未创建长链接,失败消息:{}", clientId, message.toString());
return;
}
SseEmitter.SseEventBuilder sendData = SseEmitter.event().data(message, MediaType.APPLICATION_JSON);
SseEmitter sseEmitter = getSseEmitterByClientId(clientId);
try {
Objects.requireNonNull(sseEmitter).send(sendData);
} catch (Exception e) {
log.error("推送消息失败,报错异常:", e);
removeConnection(clientId);
}
}
}

View File

@@ -158,19 +158,6 @@ public class SysIndexController {
return CommonResult.data(sysIndexService.opLogList());
}
/**
* 创建sse连接
*
* @author diantu
* @date 2023/7/10
**/
@ApiOperationSupport(order = 9)
@Operation(summary = "创建sse连接")
@GetMapping("/dev/message/createSseConnect")
public SseEmitter createSseConnect(String clientId){
return sysIndexService.createSseConnect(clientId);
}
/**
* 获取基础系统业务数据
*

View File

@@ -90,14 +90,6 @@ public interface SysIndexService {
*/
List<SysIndexOpLogListResult> opLogList();
/**
* 创建连接
*
* @author diantu
* @date 2023/7/10
**/
SseEmitter createSseConnect(String clientId);
/**
* 获取基础系统业务数据
*

View File

@@ -21,15 +21,12 @@ import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import vip.xiaonuo.auth.api.AuthApi;
import vip.xiaonuo.auth.core.pojo.SaBaseLoginUser;
import vip.xiaonuo.auth.core.util.StpLoginUserUtil;
import vip.xiaonuo.common.sse.CommonSseParam;
import vip.xiaonuo.dev.api.DevApi;
import vip.xiaonuo.dev.api.DevLogApi;
import vip.xiaonuo.dev.api.DevMessageApi;
import vip.xiaonuo.dev.api.DevSseApi;
import vip.xiaonuo.sys.modular.index.param.*;
import vip.xiaonuo.sys.modular.index.result.*;
import vip.xiaonuo.sys.modular.index.service.SysIndexService;
@@ -63,9 +60,6 @@ public class SysIndexServiceImpl implements SysIndexService {
@Resource
private DevLogApi devLogApi;
@Resource
private DevSseApi devSseApi;
@Resource
private SysUserService sysUserService;
@@ -143,17 +137,6 @@ public class SysIndexServiceImpl implements SysIndexService {
.map(jsonObject -> JSONUtil.toBean(jsonObject, SysIndexOpLogListResult.class)).collect(Collectors.toList());
}
@Override
public SseEmitter createSseConnect(String clientId){
Consumer<CommonSseParam> consumer = m -> {
//获取用户未读消息
long unreadMessageNum = devMessageApi.unreadCount(m.getLoginId());
//发送消息
devSseApi.sendMessageToOneClient(m.getClientId(), String.valueOf(unreadMessageNum));
};
return devSseApi.createSseConnect(clientId,true,false,consumer);
}
@Override
public SysBizDataCountResult getBizDataCount() {
SysBizDataCountResult result = new SysBizDataCountResult();