feat(foundations):新增同步服务相关修改/新增到ES索引的功能

This commit is contained in:
JIAN 2024-08-27 00:37:14 +08:00
parent ba389a4b80
commit e743f616c7
5 changed files with 128 additions and 28 deletions

View File

@ -41,11 +41,6 @@
<artifactId>jzo2o-knife4j-web</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.jzo2o</groupId>-->
<!-- <artifactId>jzo2o-es</artifactId>-->
<!-- </dependency>-->
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
@ -63,10 +58,14 @@
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.jzo2o</groupId>-->
<!-- <artifactId>jzo2o-canal-sync</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>com.jzo2o</groupId>
<artifactId>jzo2o-es</artifactId>
</dependency>
<dependency>
<groupId>com.jzo2o</groupId>
<artifactId>jzo2o-canal-sync</artifactId>
</dependency>
<dependency>
<groupId>com.jzo2o</groupId>

View File

@ -0,0 +1,60 @@
package com.jzo2o.foundations.handler;
import com.jzo2o.canal.listeners.AbstractCanalRabbitMqMsgListener;
import com.jzo2o.es.core.ElasticSearchTemplate;
import com.jzo2o.foundations.constants.IndexConstants;
import com.jzo2o.foundations.model.domain.ServeSync;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* 接受从Canal发送的MQ消息并解析处理完成ES的索引更新
* @author JIAN
*/
@Slf4j
@Component
public class ServeCanalDataSyncHandler extends AbstractCanalRabbitMqMsgListener<ServeSync> {
/**
* 监听MQ的消息
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(name = "exchange.canal-jzo2o", type = ExchangeTypes.TOPIC),
value = @Queue(name = "canal-mq-jzo2o-foundations",
// 限制单消费者保证顺序性
arguments = @Argument(name = "x-single-active-consumer", value = "true", type = "java.lang.Boolean")),
key = "canal-mq-jzo2o-foundations"),
// 限制单进程消费保证顺序性
concurrency = "1")
public void onMessage(Message message) throws Exception {
parseMsg(message);
}
@Resource
private ElasticSearchTemplate elasticSearchTemplate;
@Override
public void batchSave(List<ServeSync> data) {
Boolean isSuccess = elasticSearchTemplate.opsForDoc().batchInsert(IndexConstants.SERVE, data);
log.warn("[batchSave] status: {} data: {}", isSuccess, data.toString());
if (!isSuccess) {
// 通过抛出异常自动发送unack消息
throw new RuntimeException("同步失败");
}
}
@Override
public void batchDelete(List<Long> ids) {
Boolean isSuccess = elasticSearchTemplate.opsForDoc().batchDelete(IndexConstants.SERVE, ids);
log.warn("[batchDelete] status: {} ids: {}", isSuccess, ids.toString());
if (!isSuccess) {
// 通过抛出异常自动发送unack消息
throw new RuntimeException("同步失败");
}
}
}

View File

@ -3,6 +3,7 @@ package com.jzo2o.foundations.model.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
@ -19,6 +20,7 @@ import java.math.BigDecimal;
* @since 2023-07-10
*/
@Data
@Builder
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("serve_sync")
@ -112,4 +114,4 @@ public class ServeSync implements Serializable {
private String serveItemIcon;
}
}

View File

@ -1,6 +1,7 @@
package com.jzo2o.foundations.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.baomidou.mybatisplus.extension.toolkit.ChainWrappers;
import com.jzo2o.common.expcetions.CommonException;
import com.jzo2o.common.expcetions.ForbiddenOperationException;
import com.jzo2o.common.model.PageResult;
@ -10,19 +11,19 @@ import com.jzo2o.common.utils.ObjectUtils;
import com.jzo2o.foundations.constants.RedisConstants;
import com.jzo2o.foundations.enums.FoundationStatusEnum;
import com.jzo2o.foundations.enums.HotStatusEnum;
import com.jzo2o.foundations.mapper.RegionMapper;
import com.jzo2o.foundations.mapper.ServeMapper;
import com.jzo2o.foundations.model.domain.Region;
import com.jzo2o.foundations.model.domain.Serve;
import com.jzo2o.foundations.model.domain.ServeItem;
import com.jzo2o.foundations.mapper.ServeSyncMapper;
import com.jzo2o.foundations.model.domain.*;
import com.jzo2o.foundations.model.dto.request.ServePageQueryReqDTO;
import com.jzo2o.foundations.model.dto.request.ServeUpsertReqDTO;
import com.jzo2o.foundations.model.dto.response.ServeAggregationSimpleResDTO;
import com.jzo2o.foundations.model.dto.response.ServeCategoryResDTO;
import com.jzo2o.foundations.model.dto.response.ServeResDTO;
import com.jzo2o.foundations.model.dto.response.ServeSimpleResDTO;
import com.jzo2o.foundations.service.IRegionService;
import com.jzo2o.foundations.service.IServeItemService;
import com.jzo2o.foundations.service.IServeService;
import com.jzo2o.foundations.service.IServeTypeService;
import com.jzo2o.mysql.utils.PageHelperUtils;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
@ -80,7 +81,7 @@ public class ServeServiceImpl extends ServiceImpl<ServeMapper, Serve> implements
}
Serve serve = BeanUtils.toBean(serveItemDTO, Serve.class);
serve.setCityCode(regionMapper.selectById(regionId).getCityCode());
serve.setCityCode(regionService.getById(regionId).getCityCode());
baseMapper.insert(serve);
}
}
@ -111,6 +112,13 @@ public class ServeServiceImpl extends ServiceImpl<ServeMapper, Serve> implements
throw new CommonException("价格更新失败");
}
// 更新sync数据同步ES
ChainWrappers.lambdaUpdateChain(serveSyncMapper)
.eq(ServeSync::getId, id)
.set(ServeSync::getPrice, price)
.update();
// 返回修改后对象同步缓存
serve.setPrice(price);
return serve;
}
@ -141,7 +149,35 @@ public class ServeServiceImpl extends ServiceImpl<ServeMapper, Serve> implements
throw new CommonException("状态更新失败");
}
// 更新服务启用状态
serve.setSaleStatus(enableStatus);
// 服务项信息
ServeItem serveItem = serveItemService.getById(serve.getServeItemId());
// 服务类型
ServeType serveType = serveTypeService.getById(serveItem.getServeTypeId());
// 插入sync表添加ES索引
serveSyncMapper.insert(ServeSync.builder()
// serve_type data
.serveTypeId(serveType.getId())
.serveTypeName(serveType.getName())
.serveItemIcon(serveType.getServeTypeIcon())
.serveTypeImg(serveType.getImg())
.serveTypeSortNum(serveType.getSortNum())
// serve_item data
.serveItemId(serveItem.getId())
.serveItemName(serveItem.getName())
.serveItemIcon(serveItem.getServeItemIcon())
.serveItemImg(serveItem.getImg())
.serveItemSortNum(serveItem.getSortNum())
.unit(serveItem.getUnit())
.detailImg(serveItem.getDetailImg())
// serve data
.id(serve.getId())
.price(serve.getPrice())
.cityCode(serve.getCityCode())
.isHot(serve.getIsHot()).build());
return serve;
}
@ -164,6 +200,9 @@ public class ServeServiceImpl extends ServiceImpl<ServeMapper, Serve> implements
.update()) {
throw new CommonException("状态更新失败");
}
// 删除sync表数据删除ES索引
serveSyncMapper.deleteById(id);
}
@Override
@ -195,8 +234,6 @@ public class ServeServiceImpl extends ServiceImpl<ServeMapper, Serve> implements
.update()) {
throw new CommonException("热门状态更新失败");
}
serve.setIsHot(hotStatus);
}
@Override
@ -215,8 +252,6 @@ public class ServeServiceImpl extends ServiceImpl<ServeMapper, Serve> implements
.update()) {
throw new CommonException("热门状态更新失败");
}
serve.setIsHot(hotStatus);
}
@Override
@ -228,7 +263,7 @@ public class ServeServiceImpl extends ServiceImpl<ServeMapper, Serve> implements
cacheManager = RedisConstants.CacheManager.FOREVER,
unless = "#result.size() == 0")})
public List<ServeCategoryResDTO> getFirstPageServeList(Long regionId) {
Region region = regionMapper.selectById(regionId);
Region region = regionService.getById(regionId);
if (ObjectUtils.isEmpty(region)) {
return Collections.emptyList();
}
@ -257,7 +292,7 @@ public class ServeServiceImpl extends ServiceImpl<ServeMapper, Serve> implements
cacheManager = RedisConstants.CacheManager.FOREVER,
unless = "#result.size() == 0")})
public List<ServeAggregationSimpleResDTO> getHotServeList(Long regionId) {
Region region = regionMapper.selectById(regionId);
Region region = regionService.getById(regionId);
if (ObjectUtils.isEmpty(region)) {
return Collections.emptyList();
}
@ -295,9 +330,13 @@ public class ServeServiceImpl extends ServiceImpl<ServeMapper, Serve> implements
}
@Resource
private IServeItemService serveItemService;
@Resource
private RegionMapper regionMapper;
private IRegionService regionService;
@Resource
private IServeService serveService;
@Resource
private IServeTypeService serveTypeService;
@Resource
private IServeItemService serveItemService;
@Resource
private ServeSyncMapper serveSyncMapper;
}

View File

@ -43,10 +43,10 @@ spring:
refresh: false
- data-id: shared-xxl-job.yaml # xxl-job配置
refresh: false
# - data-id: shared-rabbitmq.yaml # rabbitmq配置
# refresh: false
# - data-id: shared-es.yaml # rabbitmq配置
# refresh: false
- data-id: shared-rabbitmq.yaml # rabbitmq配置
refresh: false
- data-id: shared-es.yaml # elasticsearch配置
refresh: false
- data-id: shared-mysql.yaml # mysql配置
refresh: false