diff --git a/jzo2o-foundations/pom.xml b/jzo2o-foundations/pom.xml
index e870984..f8a35b8 100644
--- a/jzo2o-foundations/pom.xml
+++ b/jzo2o-foundations/pom.xml
@@ -41,11 +41,6 @@
jzo2o-knife4j-web
-
-
-
-
-
org.springframework.boot
@@ -63,10 +58,14 @@
jackson-databind
-
-
-
-
+
+ com.jzo2o
+ jzo2o-es
+
+
+ com.jzo2o
+ jzo2o-canal-sync
+
com.jzo2o
diff --git a/jzo2o-foundations/src/main/java/com/jzo2o/foundations/handler/ServeCanalDataSyncHandler.java b/jzo2o-foundations/src/main/java/com/jzo2o/foundations/handler/ServeCanalDataSyncHandler.java
new file mode 100644
index 0000000..bd3d103
--- /dev/null
+++ b/jzo2o-foundations/src/main/java/com/jzo2o/foundations/handler/ServeCanalDataSyncHandler.java
@@ -0,0 +1,60 @@
+package com.jzo2o.foundations.handler;
+
+import com.jzo2o.canal.listeners.AbstractCanalRabbitMqMsgListener;
+import com.jzo2o.es.core.ElasticSearchTemplate;
+import com.jzo2o.foundations.constants.IndexConstants;
+import com.jzo2o.foundations.model.domain.ServeSync;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.ExchangeTypes;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.*;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.List;
+
+/**
+ * 接受从Canal发送的MQ消息并解析处理完成ES的索引更新
+ * @author JIAN
+ */
+@Slf4j
+@Component
+public class ServeCanalDataSyncHandler extends AbstractCanalRabbitMqMsgListener {
+ /**
+ * 监听MQ的消息
+ */
+ @RabbitListener(bindings = @QueueBinding(
+ exchange = @Exchange(name = "exchange.canal-jzo2o", type = ExchangeTypes.TOPIC),
+ value = @Queue(name = "canal-mq-jzo2o-foundations",
+ // 限制单消费者保证顺序性
+ arguments = @Argument(name = "x-single-active-consumer", value = "true", type = "java.lang.Boolean")),
+ key = "canal-mq-jzo2o-foundations"),
+ // 限制单进程消费保证顺序性
+ concurrency = "1")
+ public void onMessage(Message message) throws Exception {
+ parseMsg(message);
+ }
+
+ @Resource
+ private ElasticSearchTemplate elasticSearchTemplate;
+
+ @Override
+ public void batchSave(List data) {
+ Boolean isSuccess = elasticSearchTemplate.opsForDoc().batchInsert(IndexConstants.SERVE, data);
+ log.warn("[batchSave] status: {} data: {}", isSuccess, data.toString());
+ if (!isSuccess) {
+ // 通过抛出异常自动发送unack消息
+ throw new RuntimeException("同步失败");
+ }
+ }
+
+ @Override
+ public void batchDelete(List ids) {
+ Boolean isSuccess = elasticSearchTemplate.opsForDoc().batchDelete(IndexConstants.SERVE, ids);
+ log.warn("[batchDelete] status: {} ids: {}", isSuccess, ids.toString());
+ if (!isSuccess) {
+ // 通过抛出异常自动发送unack消息
+ throw new RuntimeException("同步失败");
+ }
+ }
+}
\ No newline at end of file
diff --git a/jzo2o-foundations/src/main/java/com/jzo2o/foundations/model/domain/ServeSync.java b/jzo2o-foundations/src/main/java/com/jzo2o/foundations/model/domain/ServeSync.java
index ba3fa77..ea8afa0 100644
--- a/jzo2o-foundations/src/main/java/com/jzo2o/foundations/model/domain/ServeSync.java
+++ b/jzo2o-foundations/src/main/java/com/jzo2o/foundations/model/domain/ServeSync.java
@@ -3,6 +3,7 @@ package com.jzo2o.foundations.model.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
@@ -19,6 +20,7 @@ import java.math.BigDecimal;
* @since 2023-07-10
*/
@Data
+@Builder
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("serve_sync")
@@ -112,4 +114,4 @@ public class ServeSync implements Serializable {
private String serveItemIcon;
-}
+}
\ No newline at end of file
diff --git a/jzo2o-foundations/src/main/java/com/jzo2o/foundations/service/impl/ServeServiceImpl.java b/jzo2o-foundations/src/main/java/com/jzo2o/foundations/service/impl/ServeServiceImpl.java
index df51e43..419d41e 100644
--- a/jzo2o-foundations/src/main/java/com/jzo2o/foundations/service/impl/ServeServiceImpl.java
+++ b/jzo2o-foundations/src/main/java/com/jzo2o/foundations/service/impl/ServeServiceImpl.java
@@ -1,6 +1,7 @@
package com.jzo2o.foundations.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.baomidou.mybatisplus.extension.toolkit.ChainWrappers;
import com.jzo2o.common.expcetions.CommonException;
import com.jzo2o.common.expcetions.ForbiddenOperationException;
import com.jzo2o.common.model.PageResult;
@@ -10,19 +11,19 @@ import com.jzo2o.common.utils.ObjectUtils;
import com.jzo2o.foundations.constants.RedisConstants;
import com.jzo2o.foundations.enums.FoundationStatusEnum;
import com.jzo2o.foundations.enums.HotStatusEnum;
-import com.jzo2o.foundations.mapper.RegionMapper;
import com.jzo2o.foundations.mapper.ServeMapper;
-import com.jzo2o.foundations.model.domain.Region;
-import com.jzo2o.foundations.model.domain.Serve;
-import com.jzo2o.foundations.model.domain.ServeItem;
+import com.jzo2o.foundations.mapper.ServeSyncMapper;
+import com.jzo2o.foundations.model.domain.*;
import com.jzo2o.foundations.model.dto.request.ServePageQueryReqDTO;
import com.jzo2o.foundations.model.dto.request.ServeUpsertReqDTO;
import com.jzo2o.foundations.model.dto.response.ServeAggregationSimpleResDTO;
import com.jzo2o.foundations.model.dto.response.ServeCategoryResDTO;
import com.jzo2o.foundations.model.dto.response.ServeResDTO;
import com.jzo2o.foundations.model.dto.response.ServeSimpleResDTO;
+import com.jzo2o.foundations.service.IRegionService;
import com.jzo2o.foundations.service.IServeItemService;
import com.jzo2o.foundations.service.IServeService;
+import com.jzo2o.foundations.service.IServeTypeService;
import com.jzo2o.mysql.utils.PageHelperUtils;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
@@ -80,7 +81,7 @@ public class ServeServiceImpl extends ServiceImpl implements
}
Serve serve = BeanUtils.toBean(serveItemDTO, Serve.class);
- serve.setCityCode(regionMapper.selectById(regionId).getCityCode());
+ serve.setCityCode(regionService.getById(regionId).getCityCode());
baseMapper.insert(serve);
}
}
@@ -111,6 +112,13 @@ public class ServeServiceImpl extends ServiceImpl implements
throw new CommonException("价格更新失败");
}
+ // 更新sync数据同步ES
+ ChainWrappers.lambdaUpdateChain(serveSyncMapper)
+ .eq(ServeSync::getId, id)
+ .set(ServeSync::getPrice, price)
+ .update();
+
+ // 返回修改后对象同步缓存
serve.setPrice(price);
return serve;
}
@@ -141,7 +149,35 @@ public class ServeServiceImpl extends ServiceImpl implements
throw new CommonException("状态更新失败");
}
+ // 更新服务启用状态
serve.setSaleStatus(enableStatus);
+ // 服务项信息
+ ServeItem serveItem = serveItemService.getById(serve.getServeItemId());
+ // 服务类型
+ ServeType serveType = serveTypeService.getById(serveItem.getServeTypeId());
+
+ // 插入sync表添加ES索引
+ serveSyncMapper.insert(ServeSync.builder()
+ // serve_type data
+ .serveTypeId(serveType.getId())
+ .serveTypeName(serveType.getName())
+ .serveItemIcon(serveType.getServeTypeIcon())
+ .serveTypeImg(serveType.getImg())
+ .serveTypeSortNum(serveType.getSortNum())
+ // serve_item data
+ .serveItemId(serveItem.getId())
+ .serveItemName(serveItem.getName())
+ .serveItemIcon(serveItem.getServeItemIcon())
+ .serveItemImg(serveItem.getImg())
+ .serveItemSortNum(serveItem.getSortNum())
+ .unit(serveItem.getUnit())
+ .detailImg(serveItem.getDetailImg())
+ // serve data
+ .id(serve.getId())
+ .price(serve.getPrice())
+ .cityCode(serve.getCityCode())
+ .isHot(serve.getIsHot()).build());
+
return serve;
}
@@ -164,6 +200,9 @@ public class ServeServiceImpl extends ServiceImpl implements
.update()) {
throw new CommonException("状态更新失败");
}
+
+ // 删除sync表数据删除ES索引
+ serveSyncMapper.deleteById(id);
}
@Override
@@ -195,8 +234,6 @@ public class ServeServiceImpl extends ServiceImpl implements
.update()) {
throw new CommonException("热门状态更新失败");
}
-
- serve.setIsHot(hotStatus);
}
@Override
@@ -215,8 +252,6 @@ public class ServeServiceImpl extends ServiceImpl implements
.update()) {
throw new CommonException("热门状态更新失败");
}
-
- serve.setIsHot(hotStatus);
}
@Override
@@ -228,7 +263,7 @@ public class ServeServiceImpl extends ServiceImpl implements
cacheManager = RedisConstants.CacheManager.FOREVER,
unless = "#result.size() == 0")})
public List getFirstPageServeList(Long regionId) {
- Region region = regionMapper.selectById(regionId);
+ Region region = regionService.getById(regionId);
if (ObjectUtils.isEmpty(region)) {
return Collections.emptyList();
}
@@ -257,7 +292,7 @@ public class ServeServiceImpl extends ServiceImpl implements
cacheManager = RedisConstants.CacheManager.FOREVER,
unless = "#result.size() == 0")})
public List getHotServeList(Long regionId) {
- Region region = regionMapper.selectById(regionId);
+ Region region = regionService.getById(regionId);
if (ObjectUtils.isEmpty(region)) {
return Collections.emptyList();
}
@@ -295,9 +330,13 @@ public class ServeServiceImpl extends ServiceImpl implements
}
@Resource
- private IServeItemService serveItemService;
- @Resource
- private RegionMapper regionMapper;
+ private IRegionService regionService;
@Resource
private IServeService serveService;
+ @Resource
+ private IServeTypeService serveTypeService;
+ @Resource
+ private IServeItemService serveItemService;
+ @Resource
+ private ServeSyncMapper serveSyncMapper;
}
\ No newline at end of file
diff --git a/jzo2o-foundations/src/main/resources/bootstrap.yml b/jzo2o-foundations/src/main/resources/bootstrap.yml
index e249482..92f3e8b 100644
--- a/jzo2o-foundations/src/main/resources/bootstrap.yml
+++ b/jzo2o-foundations/src/main/resources/bootstrap.yml
@@ -43,10 +43,10 @@ spring:
refresh: false
- data-id: shared-xxl-job.yaml # xxl-job配置
refresh: false
-# - data-id: shared-rabbitmq.yaml # rabbitmq配置
-# refresh: false
-# - data-id: shared-es.yaml # rabbitmq配置
-# refresh: false
+ - data-id: shared-rabbitmq.yaml # rabbitmq配置
+ refresh: false
+ - data-id: shared-es.yaml # elasticsearch配置
+ refresh: false
- data-id: shared-mysql.yaml # mysql配置
refresh: false