From 04c9f63f2c5f1f9e508c25b7d0e21e8243be6b91 Mon Sep 17 00:00:00 2001 From: wangcheng Date: Thu, 7 May 2020 14:21:28 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E9=A3=8E=E6=8E=A7=E7=BB=93=E6=9E=9C?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E5=8F=91=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/cache/RedisServiceImpl.java | 234 ------------------ .../core/RiskAnalysisEngineServiceImpl.java | 97 +++----- .../impl/data/RiskResultAsyncService.java | 47 ++++ .../radar/service/data/RiskResultService.java | 5 + 4 files changed, 87 insertions(+), 296 deletions(-) delete mode 100644 radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/cache/RedisServiceImpl.java create mode 100644 radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/data/RiskResultAsyncService.java create mode 100644 radar-service/src/main/java/com/pgmmers/radar/service/data/RiskResultService.java diff --git a/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/cache/RedisServiceImpl.java b/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/cache/RedisServiceImpl.java deleted file mode 100644 index 56588f4..0000000 --- a/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/cache/RedisServiceImpl.java +++ /dev/null @@ -1,234 +0,0 @@ -package com.pgmmers.radar.service.impl.cache; - -import com.alibaba.fastjson.JSON; -import com.pgmmers.radar.service.cache.RedisService; -import com.pgmmers.radar.service.cache.SubscribeHandle; -import com.pgmmers.radar.service.cache.SubscribeHandle2; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.stereotype.Service; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPubSub; -import redis.clients.jedis.ShardedJedis; -import redis.clients.jedis.ShardedJedisPool; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - - -//@Service -@Deprecated -public class RedisServiceImpl implements RedisService { - public static Logger logger = LoggerFactory.getLogger(RedisServiceImpl.class); - - @Autowired - private ShardedJedisPool jedisPool; - - - public boolean setex(String key, String val, int seconds) { - ShardedJedis jedis = null; - try { - jedis = jedisPool.getResource(); - jedis.setex(key, seconds, val); - } catch (Exception e) { - logger.warn("redis error", e); - return false; - } finally { - if (jedis != null) { - jedis.close(); - } - } - return true; - } - - - public String get(String key) { - ShardedJedis jedis = null; - String value = null; - try { - jedis = jedisPool.getResource(); - value = jedis.get(key); - } catch (Exception e) { - return null; - } finally { - if (jedis != null) { - jedis.close(); - } - } - return value; - } - - - public void del(String key) { - ShardedJedis jedis = null; - try { - jedis = jedisPool.getResource(); - jedis.del(key); - } catch (Exception e) { - - } finally { - if (jedis != null) { - jedis.close(); - } - } - } - - - public boolean set(String key, String val) { - ShardedJedis jedis = null; - try { - jedis = jedisPool.getResource(); - jedis.set(key, val); - } catch (Exception e) { - logger.error("redis set val error" + key + ":" + val); - return false; - } finally { - if (jedis != null) { - jedis.close(); - } - } - return true; - } - - - public String hget(String key, String field) { - ShardedJedis jedis = null; - String value = null; - try { - jedis = jedisPool.getResource(); - value = jedis.hget(key, field); - } catch (Exception e) { - return null; - } finally { - if (jedis != null) { - jedis.close(); - } - } - return value; - } - - - public boolean hset(String key, String field, String val) { - ShardedJedis jedis = null; - try { - jedis = jedisPool.getResource(); - jedis.hset(key, field, val); - } catch (Exception e) { - logger.error("redis set val error" + key + ":" + field + ":" + val); - return false; - } finally { - if (jedis != null) { - jedis.close(); - } - } - return true; - } - - - @Override - public boolean set(String key, Serializable val) { - return set(key, JSON.toJSONString(val)); - } - - @Override - public boolean setex(String key, Serializable val, int seconds) { - return false; - } - - - @Override - public T get(String key, Class clazz) { - String val = get(key); - return JSON.parseObject(val, clazz); - } - - - @Override - public boolean hset(String key, String field, Serializable val) { - return hset(key, field, JSON.toJSONString(val)); - } - - - @Override - public List hget(String key, Class clazz) { - List objList = new ArrayList(); - ShardedJedis jedis = null; - List vals = null; - try { - jedis = jedisPool.getResource(); - vals = jedis.hvals(key); - for (String val : vals) { - objList.add(JSON.parseObject(val, clazz)); - } - } catch (Exception e) { - return null; - } finally { - if (jedis != null) { - jedis.close(); - } - } - return objList; - } - - @Override - public List hvals(String key) { - return null; - } - - public void publish(String channel, String message) { - ShardedJedis jedis = null; - try { - jedis = jedisPool.getResource(); - Jedis jedis_ = jedis.getShard(channel); - jedis_.publish(channel, message); - logger.info("publish success! channel={},message={}", channel, message); - } catch (RuntimeException ex) { - logger.error("publish error! channel={},message={}", channel, message); - } finally { - if (jedis != null) { - jedis.close(); - } - } - } - - public void subscribe(String channel, SubscribeHandle handle) { - ShardedJedis jedis = null; - try { - jedis = jedisPool.getResource(); - Jedis jedis_ = jedis.getShard(channel); - //异步操作,jedis.subscribe会阻塞 - new Thread(new Runnable() { - - @Override - public void run() { - JedisPubSub listener = new JedisPubSub() { - public void onMessage(String channel, String message) { - handle.onMessage(channel, message); - }; - }; - jedis_.subscribe(listener, channel);//block - } - }).start(); - logger.info("subscribe success! channel={}", channel); - } catch (RuntimeException ex) { - logger.error("subscribe error! channel={}", channel); - } finally { - /*if (jedis != null) { - jedis.close(); - }*/ - } - } - - @Override - public void subscribe(byte[] channel, SubscribeHandle2 handle) { - - } - - @Override - public boolean contains(String token) { - return jedisPool.getResource().exists(token); - } -} diff --git a/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/core/RiskAnalysisEngineServiceImpl.java b/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/core/RiskAnalysisEngineServiceImpl.java index a505a61..ecce4af 100644 --- a/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/core/RiskAnalysisEngineServiceImpl.java +++ b/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/core/RiskAnalysisEngineServiceImpl.java @@ -6,58 +6,55 @@ import com.pgmmers.radar.enums.StatusType; import com.pgmmers.radar.service.RiskAnalysisEngineService; import com.pgmmers.radar.service.cache.CacheService; import com.pgmmers.radar.service.common.CommonResult; +import com.pgmmers.radar.service.data.RiskResultService; import com.pgmmers.radar.service.engine.AntiFraudService; import com.pgmmers.radar.service.engine.ValidateService; import com.pgmmers.radar.service.model.EntityService; import com.pgmmers.radar.service.model.ModelService; import com.pgmmers.radar.util.DateUtils; import com.pgmmers.radar.vo.model.ModelVO; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.xcontent.XContentType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.http.*; -import org.springframework.stereotype.Service; -import org.springframework.web.client.RestTemplate; - -import java.io.IOException; import java.util.Date; import java.util.HashMap; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; -@ConditionalOnProperty(prefix = "sys.conf", name="app", havingValue = "engine") +@ConditionalOnProperty(prefix = "sys.conf", name = "app", havingValue = "engine") @Service public class RiskAnalysisEngineServiceImpl implements RiskAnalysisEngineService { + private static Logger logger = LoggerFactory.getLogger(RiskAnalysisEngineServiceImpl.class); - @Autowired - private ModelService modelService; + private final ModelService modelService; - @Autowired - private EntityService entityService; + private final EntityService entityService; - @Autowired - private AntiFraudService antiFraudService; + private final AntiFraudService antiFraudService; @Value("${sys.conf.entity-duplicate-insert}") private String isDuplicate; - @Autowired - private CacheService cacheService; + private final CacheService cacheService; + private final RiskResultService riskResultService; - @Autowired - private ValidateService validateService; + private final ValidateService validateService; - - @Autowired - private RestHighLevelClient esClient; + public RiskAnalysisEngineServiceImpl( + ModelService modelService, EntityService entityService, + AntiFraudService antiFraudService, CacheService cacheService, + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") RiskResultService riskResultService, + ValidateService validateService) { + this.modelService = modelService; + this.entityService = entityService; + this.antiFraudService = antiFraudService; + this.cacheService = cacheService; + this.riskResultService = riskResultService; + this.validateService = validateService; + } @Override public CommonResult uploadInfo(String modelGuid, String reqId, String jsonInfo) { @@ -75,12 +72,12 @@ public class RiskAnalysisEngineServiceImpl implements RiskAnalysisEngineService result.setMsg("模型不存在!"); return result; } - + if (model.getStatus() != StatusType.ACTIVE.getKey()) { result.setMsg("模型未激活"); return result; } - + if (model.getFieldValidate()) { Map vldMap = validateService.validate(model.getId(), eventJson); if (vldMap.size() > 0) { @@ -98,16 +95,19 @@ public class RiskAnalysisEngineServiceImpl implements RiskAnalysisEngineService if (isDuplicate != null && isDuplicate.equals("true")) { isAllowDuplicate = true; } - entityService.save(model.getId(), eventJson.toJSONString(), JSON.toJSONString(preItemMap), isAllowDuplicate); + entityService + .save(model.getId(), eventJson.toJSONString(), JSON.toJSONString(preItemMap), + isAllowDuplicate); // 4. 执行分析 context.put("fields", eventJson); context.put("preItems", preItemMap); result = antiFraudService.process(model.getId(), context); - + // 5. for elastic analysis Long eventTimeMillis = (Long) eventJson.get(model.getReferenceDate()); - String timeStr = DateUtils.formatDate(new Date(eventTimeMillis), "yyyy-MM-dd'T'HH:mm:ssZ"); + String timeStr = DateUtils + .formatDate(new Date(eventTimeMillis), "yyyy-MM-dd'T'HH:mm:ssZ"); preItemMap.put("radar_ref_datetime", timeStr); } catch (Exception e) { e.printStackTrace(); @@ -118,18 +118,12 @@ public class RiskAnalysisEngineServiceImpl implements RiskAnalysisEngineService cacheService.saveAntiFraudResult(modelGuid, reqId, result); // 保存事件信息和分析结果用于后续分析 - try { - sendResult(modelGuid, reqId, JSON.toJSONString(context)); - } catch (IOException e) { - logger.error(e.getMessage(), e); - logger.error("向es中保存数据失败!"); - } + riskResultService.sendResult(modelGuid, reqId, JSON.toJSONString(context)); // 返回分析结果 return result; } - @Override public CommonResult getScore(String modelGuid, String reqId) { CommonResult result = cacheService.getAntiFraudResult(modelGuid, reqId); @@ -141,25 +135,4 @@ public class RiskAnalysisEngineServiceImpl implements RiskAnalysisEngineService return result; } - /** - * 通过消息队列缓存事件内容和分析结果,用于实时或者离线分析. - * @param modelGuid - * @param reqId - * @param info event info and analyze result. - */ - private void sendResult(String modelGuid, String reqId, String info) throws IOException { - // 这里可以根据情况进行异步处理。 - send2ES(modelGuid, reqId, info); - } - - private void send2ES(String guid, String reqId, String json) throws IOException { - IndexRequest request = new IndexRequest(guid.toLowerCase()); - request.id(reqId); - request.source(json, XContentType.JSON); - IndexResponse result = this.esClient.index(request, RequestOptions.DEFAULT); -// ResponseEntity result = restTemplate.postForEntity(url, requestEntity, String.class, new Object[]{}); - logger.info("es result:{}", result.toString()); - } - - } diff --git a/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/data/RiskResultAsyncService.java b/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/data/RiskResultAsyncService.java new file mode 100644 index 0000000..3f11f12 --- /dev/null +++ b/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/data/RiskResultAsyncService.java @@ -0,0 +1,47 @@ +package com.pgmmers.radar.service.impl.data; + +import com.pgmmers.radar.service.data.RiskResultService; +import java.util.concurrent.CompletableFuture; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.xcontent.XContentType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +/** + * author: wangcheng Date: 2020/4/23 Time: 上午10:44 Description: 使用 线程池 发送数据 + */ +@ConditionalOnProperty(prefix = "sys.conf", name = "app", havingValue = "engine") +@ConditionalOnExpression("'${sys.conf.riskResult:ThreadPool}'.equals('ThreadPool')") +@Service +public class RiskResultAsyncService implements RiskResultService { + + private static final Logger logger = LoggerFactory.getLogger(RiskResultAsyncService.class); + private final RestHighLevelClient restHighLevelClient; + + public RiskResultAsyncService(RestHighLevelClient restHighLevelClient) { + this.restHighLevelClient = restHighLevelClient; + logger.info("RiskResult SaveModel ThreadPool"); + } + + @Override + public void sendResult(String modelGuid, String reqId, String info) { + CompletableFuture.runAsync(() -> { + try { + IndexRequest request = new IndexRequest(modelGuid.toLowerCase()); + request.id(reqId); + request.source(info, XContentType.JSON); + IndexResponse result = restHighLevelClient.index(request, RequestOptions.DEFAULT); + logger.info("es result:{}", result.toString()); + } catch (Exception e) { + logger.error("es result error"); + } + } + ); + } +} diff --git a/radar-service/src/main/java/com/pgmmers/radar/service/data/RiskResultService.java b/radar-service/src/main/java/com/pgmmers/radar/service/data/RiskResultService.java new file mode 100644 index 0000000..5b54850 --- /dev/null +++ b/radar-service/src/main/java/com/pgmmers/radar/service/data/RiskResultService.java @@ -0,0 +1,5 @@ +package com.pgmmers.radar.service.data; + +public interface RiskResultService { + void sendResult(String modelGuid, String reqId, String info); +}