From e743f616c7f2c98bf23d33c737436bee443920d6 Mon Sep 17 00:00:00 2001 From: JIAN Date: Tue, 27 Aug 2024 00:37:14 +0800 Subject: [PATCH] =?UTF-8?q?feat(foundations):=E6=96=B0=E5=A2=9E=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E6=9C=8D=E5=8A=A1=E7=9B=B8=E5=85=B3=E4=BF=AE=E6=94=B9?= =?UTF-8?q?/=E6=96=B0=E5=A2=9E=E5=88=B0ES=E7=B4=A2=E5=BC=95=E7=9A=84?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- jzo2o-foundations/pom.xml | 17 +++-- .../handler/ServeCanalDataSyncHandler.java | 60 +++++++++++++++++ .../foundations/model/domain/ServeSync.java | 4 +- .../service/impl/ServeServiceImpl.java | 67 +++++++++++++++---- .../src/main/resources/bootstrap.yml | 8 +-- 5 files changed, 128 insertions(+), 28 deletions(-) create mode 100644 jzo2o-foundations/src/main/java/com/jzo2o/foundations/handler/ServeCanalDataSyncHandler.java diff --git a/jzo2o-foundations/pom.xml b/jzo2o-foundations/pom.xml index e870984..f8a35b8 100644 --- a/jzo2o-foundations/pom.xml +++ b/jzo2o-foundations/pom.xml @@ -41,11 +41,6 @@ jzo2o-knife4j-web - - - - - org.springframework.boot @@ -63,10 +58,14 @@ jackson-databind - - - - + + com.jzo2o + jzo2o-es + + + com.jzo2o + jzo2o-canal-sync + com.jzo2o diff --git a/jzo2o-foundations/src/main/java/com/jzo2o/foundations/handler/ServeCanalDataSyncHandler.java b/jzo2o-foundations/src/main/java/com/jzo2o/foundations/handler/ServeCanalDataSyncHandler.java new file mode 100644 index 0000000..bd3d103 --- /dev/null +++ b/jzo2o-foundations/src/main/java/com/jzo2o/foundations/handler/ServeCanalDataSyncHandler.java @@ -0,0 +1,60 @@ +package com.jzo2o.foundations.handler; + +import com.jzo2o.canal.listeners.AbstractCanalRabbitMqMsgListener; +import com.jzo2o.es.core.ElasticSearchTemplate; +import com.jzo2o.foundations.constants.IndexConstants; +import com.jzo2o.foundations.model.domain.ServeSync; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.*; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.List; + +/** + * 接受从Canal发送的MQ消息并解析处理完成ES的索引更新 + * @author JIAN + */ +@Slf4j +@Component +public class ServeCanalDataSyncHandler extends AbstractCanalRabbitMqMsgListener { + /** + * 监听MQ的消息 + */ + @RabbitListener(bindings = @QueueBinding( + exchange = @Exchange(name = "exchange.canal-jzo2o", type = ExchangeTypes.TOPIC), + value = @Queue(name = "canal-mq-jzo2o-foundations", + // 限制单消费者保证顺序性 + arguments = @Argument(name = "x-single-active-consumer", value = "true", type = "java.lang.Boolean")), + key = "canal-mq-jzo2o-foundations"), + // 限制单进程消费保证顺序性 + concurrency = "1") + public void onMessage(Message message) throws Exception { + parseMsg(message); + } + + @Resource + private ElasticSearchTemplate elasticSearchTemplate; + + @Override + public void batchSave(List data) { + Boolean isSuccess = elasticSearchTemplate.opsForDoc().batchInsert(IndexConstants.SERVE, data); + log.warn("[batchSave] status: {} data: {}", isSuccess, data.toString()); + if (!isSuccess) { + // 通过抛出异常自动发送unack消息 + throw new RuntimeException("同步失败"); + } + } + + @Override + public void batchDelete(List ids) { + Boolean isSuccess = elasticSearchTemplate.opsForDoc().batchDelete(IndexConstants.SERVE, ids); + log.warn("[batchDelete] status: {} ids: {}", isSuccess, ids.toString()); + if (!isSuccess) { + // 通过抛出异常自动发送unack消息 + throw new RuntimeException("同步失败"); + } + } +} \ No newline at end of file diff --git a/jzo2o-foundations/src/main/java/com/jzo2o/foundations/model/domain/ServeSync.java b/jzo2o-foundations/src/main/java/com/jzo2o/foundations/model/domain/ServeSync.java index ba3fa77..ea8afa0 100644 --- a/jzo2o-foundations/src/main/java/com/jzo2o/foundations/model/domain/ServeSync.java +++ b/jzo2o-foundations/src/main/java/com/jzo2o/foundations/model/domain/ServeSync.java @@ -3,6 +3,7 @@ package com.jzo2o.foundations.model.domain; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; @@ -19,6 +20,7 @@ import java.math.BigDecimal; * @since 2023-07-10 */ @Data +@Builder @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) @TableName("serve_sync") @@ -112,4 +114,4 @@ public class ServeSync implements Serializable { private String serveItemIcon; -} +} \ No newline at end of file diff --git a/jzo2o-foundations/src/main/java/com/jzo2o/foundations/service/impl/ServeServiceImpl.java b/jzo2o-foundations/src/main/java/com/jzo2o/foundations/service/impl/ServeServiceImpl.java index df51e43..419d41e 100644 --- a/jzo2o-foundations/src/main/java/com/jzo2o/foundations/service/impl/ServeServiceImpl.java +++ b/jzo2o-foundations/src/main/java/com/jzo2o/foundations/service/impl/ServeServiceImpl.java @@ -1,6 +1,7 @@ package com.jzo2o.foundations.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.baomidou.mybatisplus.extension.toolkit.ChainWrappers; import com.jzo2o.common.expcetions.CommonException; import com.jzo2o.common.expcetions.ForbiddenOperationException; import com.jzo2o.common.model.PageResult; @@ -10,19 +11,19 @@ import com.jzo2o.common.utils.ObjectUtils; import com.jzo2o.foundations.constants.RedisConstants; import com.jzo2o.foundations.enums.FoundationStatusEnum; import com.jzo2o.foundations.enums.HotStatusEnum; -import com.jzo2o.foundations.mapper.RegionMapper; import com.jzo2o.foundations.mapper.ServeMapper; -import com.jzo2o.foundations.model.domain.Region; -import com.jzo2o.foundations.model.domain.Serve; -import com.jzo2o.foundations.model.domain.ServeItem; +import com.jzo2o.foundations.mapper.ServeSyncMapper; +import com.jzo2o.foundations.model.domain.*; import com.jzo2o.foundations.model.dto.request.ServePageQueryReqDTO; import com.jzo2o.foundations.model.dto.request.ServeUpsertReqDTO; import com.jzo2o.foundations.model.dto.response.ServeAggregationSimpleResDTO; import com.jzo2o.foundations.model.dto.response.ServeCategoryResDTO; import com.jzo2o.foundations.model.dto.response.ServeResDTO; import com.jzo2o.foundations.model.dto.response.ServeSimpleResDTO; +import com.jzo2o.foundations.service.IRegionService; import com.jzo2o.foundations.service.IServeItemService; import com.jzo2o.foundations.service.IServeService; +import com.jzo2o.foundations.service.IServeTypeService; import com.jzo2o.mysql.utils.PageHelperUtils; import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.CachePut; @@ -80,7 +81,7 @@ public class ServeServiceImpl extends ServiceImpl implements } Serve serve = BeanUtils.toBean(serveItemDTO, Serve.class); - serve.setCityCode(regionMapper.selectById(regionId).getCityCode()); + serve.setCityCode(regionService.getById(regionId).getCityCode()); baseMapper.insert(serve); } } @@ -111,6 +112,13 @@ public class ServeServiceImpl extends ServiceImpl implements throw new CommonException("价格更新失败"); } + // 更新sync数据同步ES + ChainWrappers.lambdaUpdateChain(serveSyncMapper) + .eq(ServeSync::getId, id) + .set(ServeSync::getPrice, price) + .update(); + + // 返回修改后对象同步缓存 serve.setPrice(price); return serve; } @@ -141,7 +149,35 @@ public class ServeServiceImpl extends ServiceImpl implements throw new CommonException("状态更新失败"); } + // 更新服务启用状态 serve.setSaleStatus(enableStatus); + // 服务项信息 + ServeItem serveItem = serveItemService.getById(serve.getServeItemId()); + // 服务类型 + ServeType serveType = serveTypeService.getById(serveItem.getServeTypeId()); + + // 插入sync表添加ES索引 + serveSyncMapper.insert(ServeSync.builder() + // serve_type data + .serveTypeId(serveType.getId()) + .serveTypeName(serveType.getName()) + .serveItemIcon(serveType.getServeTypeIcon()) + .serveTypeImg(serveType.getImg()) + .serveTypeSortNum(serveType.getSortNum()) + // serve_item data + .serveItemId(serveItem.getId()) + .serveItemName(serveItem.getName()) + .serveItemIcon(serveItem.getServeItemIcon()) + .serveItemImg(serveItem.getImg()) + .serveItemSortNum(serveItem.getSortNum()) + .unit(serveItem.getUnit()) + .detailImg(serveItem.getDetailImg()) + // serve data + .id(serve.getId()) + .price(serve.getPrice()) + .cityCode(serve.getCityCode()) + .isHot(serve.getIsHot()).build()); + return serve; } @@ -164,6 +200,9 @@ public class ServeServiceImpl extends ServiceImpl implements .update()) { throw new CommonException("状态更新失败"); } + + // 删除sync表数据删除ES索引 + serveSyncMapper.deleteById(id); } @Override @@ -195,8 +234,6 @@ public class ServeServiceImpl extends ServiceImpl implements .update()) { throw new CommonException("热门状态更新失败"); } - - serve.setIsHot(hotStatus); } @Override @@ -215,8 +252,6 @@ public class ServeServiceImpl extends ServiceImpl implements .update()) { throw new CommonException("热门状态更新失败"); } - - serve.setIsHot(hotStatus); } @Override @@ -228,7 +263,7 @@ public class ServeServiceImpl extends ServiceImpl implements cacheManager = RedisConstants.CacheManager.FOREVER, unless = "#result.size() == 0")}) public List getFirstPageServeList(Long regionId) { - Region region = regionMapper.selectById(regionId); + Region region = regionService.getById(regionId); if (ObjectUtils.isEmpty(region)) { return Collections.emptyList(); } @@ -257,7 +292,7 @@ public class ServeServiceImpl extends ServiceImpl implements cacheManager = RedisConstants.CacheManager.FOREVER, unless = "#result.size() == 0")}) public List getHotServeList(Long regionId) { - Region region = regionMapper.selectById(regionId); + Region region = regionService.getById(regionId); if (ObjectUtils.isEmpty(region)) { return Collections.emptyList(); } @@ -295,9 +330,13 @@ public class ServeServiceImpl extends ServiceImpl implements } @Resource - private IServeItemService serveItemService; - @Resource - private RegionMapper regionMapper; + private IRegionService regionService; @Resource private IServeService serveService; + @Resource + private IServeTypeService serveTypeService; + @Resource + private IServeItemService serveItemService; + @Resource + private ServeSyncMapper serveSyncMapper; } \ No newline at end of file diff --git a/jzo2o-foundations/src/main/resources/bootstrap.yml b/jzo2o-foundations/src/main/resources/bootstrap.yml index e249482..92f3e8b 100644 --- a/jzo2o-foundations/src/main/resources/bootstrap.yml +++ b/jzo2o-foundations/src/main/resources/bootstrap.yml @@ -43,10 +43,10 @@ spring: refresh: false - data-id: shared-xxl-job.yaml # xxl-job配置 refresh: false -# - data-id: shared-rabbitmq.yaml # rabbitmq配置 -# refresh: false -# - data-id: shared-es.yaml # rabbitmq配置 -# refresh: false + - data-id: shared-rabbitmq.yaml # rabbitmq配置 + refresh: false + - data-id: shared-es.yaml # elasticsearch配置 + refresh: false - data-id: shared-mysql.yaml # mysql配置 refresh: false