Merge pull request #42 from CatCccC/develop

风控结果异步发送
This commit is contained in:
feihu.wang
2020-05-07 16:21:01 +08:00
committed by GitHub
4 changed files with 87 additions and 296 deletions

View File

@@ -1,234 +0,0 @@
package com.pgmmers.radar.service.impl.cache;
import com.alibaba.fastjson.JSON;
import com.pgmmers.radar.service.cache.RedisService;
import com.pgmmers.radar.service.cache.SubscribeHandle;
import com.pgmmers.radar.service.cache.SubscribeHandle2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPool;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
//@Service
@Deprecated
public class RedisServiceImpl implements RedisService {
public static Logger logger = LoggerFactory.getLogger(RedisServiceImpl.class);
@Autowired
private ShardedJedisPool jedisPool;
public boolean setex(String key, String val, int seconds) {
ShardedJedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.setex(key, seconds, val);
} catch (Exception e) {
logger.warn("redis error", e);
return false;
} finally {
if (jedis != null) {
jedis.close();
}
}
return true;
}
public String get(String key) {
ShardedJedis jedis = null;
String value = null;
try {
jedis = jedisPool.getResource();
value = jedis.get(key);
} catch (Exception e) {
return null;
} finally {
if (jedis != null) {
jedis.close();
}
}
return value;
}
public void del(String key) {
ShardedJedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.del(key);
} catch (Exception e) {
} finally {
if (jedis != null) {
jedis.close();
}
}
}
public boolean set(String key, String val) {
ShardedJedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.set(key, val);
} catch (Exception e) {
logger.error("redis set val error" + key + ":" + val);
return false;
} finally {
if (jedis != null) {
jedis.close();
}
}
return true;
}
public String hget(String key, String field) {
ShardedJedis jedis = null;
String value = null;
try {
jedis = jedisPool.getResource();
value = jedis.hget(key, field);
} catch (Exception e) {
return null;
} finally {
if (jedis != null) {
jedis.close();
}
}
return value;
}
public boolean hset(String key, String field, String val) {
ShardedJedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.hset(key, field, val);
} catch (Exception e) {
logger.error("redis set val error" + key + ":" + field + ":" + val);
return false;
} finally {
if (jedis != null) {
jedis.close();
}
}
return true;
}
@Override
public boolean set(String key, Serializable val) {
return set(key, JSON.toJSONString(val));
}
@Override
public boolean setex(String key, Serializable val, int seconds) {
return false;
}
@Override
public <T> T get(String key, Class<T> clazz) {
String val = get(key);
return JSON.parseObject(val, clazz);
}
@Override
public boolean hset(String key, String field, Serializable val) {
return hset(key, field, JSON.toJSONString(val));
}
@Override
public <T> List<T> hget(String key, Class<T> clazz) {
List<T> objList = new ArrayList<T>();
ShardedJedis jedis = null;
List<String> vals = null;
try {
jedis = jedisPool.getResource();
vals = jedis.hvals(key);
for (String val : vals) {
objList.add(JSON.parseObject(val, clazz));
}
} catch (Exception e) {
return null;
} finally {
if (jedis != null) {
jedis.close();
}
}
return objList;
}
@Override
public List hvals(String key) {
return null;
}
public void publish(String channel, String message) {
ShardedJedis jedis = null;
try {
jedis = jedisPool.getResource();
Jedis jedis_ = jedis.getShard(channel);
jedis_.publish(channel, message);
logger.info("publish success! channel={},message={}", channel, message);
} catch (RuntimeException ex) {
logger.error("publish error! channel={},message={}", channel, message);
} finally {
if (jedis != null) {
jedis.close();
}
}
}
public void subscribe(String channel, SubscribeHandle handle) {
ShardedJedis jedis = null;
try {
jedis = jedisPool.getResource();
Jedis jedis_ = jedis.getShard(channel);
//异步操作jedis.subscribe会阻塞
new Thread(new Runnable() {
@Override
public void run() {
JedisPubSub listener = new JedisPubSub() {
public void onMessage(String channel, String message) {
handle.onMessage(channel, message);
};
};
jedis_.subscribe(listener, channel);//block
}
}).start();
logger.info("subscribe success! channel={}", channel);
} catch (RuntimeException ex) {
logger.error("subscribe error! channel={}", channel);
} finally {
/*if (jedis != null) {
jedis.close();
}*/
}
}
@Override
public void subscribe(byte[] channel, SubscribeHandle2 handle) {
}
@Override
public boolean contains(String token) {
return jedisPool.getResource().exists(token);
}
}

View File

@@ -6,58 +6,55 @@ import com.pgmmers.radar.enums.StatusType;
import com.pgmmers.radar.service.RiskAnalysisEngineService;
import com.pgmmers.radar.service.cache.CacheService;
import com.pgmmers.radar.service.common.CommonResult;
import com.pgmmers.radar.service.data.RiskResultService;
import com.pgmmers.radar.service.engine.AntiFraudService;
import com.pgmmers.radar.service.engine.ValidateService;
import com.pgmmers.radar.service.model.EntityService;
import com.pgmmers.radar.service.model.ModelService;
import com.pgmmers.radar.util.DateUtils;
import com.pgmmers.radar.vo.model.ModelVO;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.*;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
@ConditionalOnProperty(prefix = "sys.conf", name="app", havingValue = "engine")
@ConditionalOnProperty(prefix = "sys.conf", name = "app", havingValue = "engine")
@Service
public class RiskAnalysisEngineServiceImpl implements RiskAnalysisEngineService {
private static Logger logger = LoggerFactory.getLogger(RiskAnalysisEngineServiceImpl.class);
@Autowired
private ModelService modelService;
private final ModelService modelService;
@Autowired
private EntityService entityService;
private final EntityService entityService;
@Autowired
private AntiFraudService antiFraudService;
private final AntiFraudService antiFraudService;
@Value("${sys.conf.entity-duplicate-insert}")
private String isDuplicate;
@Autowired
private CacheService cacheService;
private final CacheService cacheService;
private final RiskResultService riskResultService;
@Autowired
private ValidateService validateService;
private final ValidateService validateService;
@Autowired
private RestHighLevelClient esClient;
public RiskAnalysisEngineServiceImpl(
ModelService modelService, EntityService entityService,
AntiFraudService antiFraudService, CacheService cacheService,
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") RiskResultService riskResultService,
ValidateService validateService) {
this.modelService = modelService;
this.entityService = entityService;
this.antiFraudService = antiFraudService;
this.cacheService = cacheService;
this.riskResultService = riskResultService;
this.validateService = validateService;
}
@Override
public CommonResult uploadInfo(String modelGuid, String reqId, String jsonInfo) {
@@ -75,12 +72,12 @@ public class RiskAnalysisEngineServiceImpl implements RiskAnalysisEngineService
result.setMsg("模型不存在!");
return result;
}
if (model.getStatus() != StatusType.ACTIVE.getKey()) {
result.setMsg("模型未激活");
return result;
}
if (model.getFieldValidate()) {
Map<String, Object> vldMap = validateService.validate(model.getId(), eventJson);
if (vldMap.size() > 0) {
@@ -98,16 +95,19 @@ public class RiskAnalysisEngineServiceImpl implements RiskAnalysisEngineService
if (isDuplicate != null && isDuplicate.equals("true")) {
isAllowDuplicate = true;
}
entityService.save(model.getId(), eventJson.toJSONString(), JSON.toJSONString(preItemMap), isAllowDuplicate);
entityService
.save(model.getId(), eventJson.toJSONString(), JSON.toJSONString(preItemMap),
isAllowDuplicate);
// 4. 执行分析
context.put("fields", eventJson);
context.put("preItems", preItemMap);
result = antiFraudService.process(model.getId(), context);
// 5. for elastic analysis
Long eventTimeMillis = (Long) eventJson.get(model.getReferenceDate());
String timeStr = DateUtils.formatDate(new Date(eventTimeMillis), "yyyy-MM-dd'T'HH:mm:ssZ");
String timeStr = DateUtils
.formatDate(new Date(eventTimeMillis), "yyyy-MM-dd'T'HH:mm:ssZ");
preItemMap.put("radar_ref_datetime", timeStr);
} catch (Exception e) {
e.printStackTrace();
@@ -118,18 +118,12 @@ public class RiskAnalysisEngineServiceImpl implements RiskAnalysisEngineService
cacheService.saveAntiFraudResult(modelGuid, reqId, result);
// 保存事件信息和分析结果用于后续分析
try {
sendResult(modelGuid, reqId, JSON.toJSONString(context));
} catch (IOException e) {
logger.error(e.getMessage(), e);
logger.error("向es中保存数据失败");
}
riskResultService.sendResult(modelGuid, reqId, JSON.toJSONString(context));
// 返回分析结果
return result;
}
@Override
public CommonResult getScore(String modelGuid, String reqId) {
CommonResult result = cacheService.getAntiFraudResult(modelGuid, reqId);
@@ -141,25 +135,4 @@ public class RiskAnalysisEngineServiceImpl implements RiskAnalysisEngineService
return result;
}
/**
* 通过消息队列缓存事件内容和分析结果,用于实时或者离线分析.
* @param modelGuid
* @param reqId
* @param info event info and analyze result.
*/
private void sendResult(String modelGuid, String reqId, String info) throws IOException {
// 这里可以根据情况进行异步处理。
send2ES(modelGuid, reqId, info);
}
private void send2ES(String guid, String reqId, String json) throws IOException {
IndexRequest request = new IndexRequest(guid.toLowerCase());
request.id(reqId);
request.source(json, XContentType.JSON);
IndexResponse result = this.esClient.index(request, RequestOptions.DEFAULT);
// ResponseEntity<String> result = restTemplate.postForEntity(url, requestEntity, String.class, new Object[]{});
logger.info("es result:{}", result.toString());
}
}

View File

@@ -0,0 +1,47 @@
package com.pgmmers.radar.service.impl.data;
import com.pgmmers.radar.service.data.RiskResultService;
import java.util.concurrent.CompletableFuture;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* author: wangcheng Date: 2020/4/23 Time: 上午10:44 Description: 使用 线程池 发送数据
*/
@ConditionalOnProperty(prefix = "sys.conf", name = "app", havingValue = "engine")
@ConditionalOnExpression("'${sys.conf.riskResult:ThreadPool}'.equals('ThreadPool')")
@Service
public class RiskResultAsyncService implements RiskResultService {
private static final Logger logger = LoggerFactory.getLogger(RiskResultAsyncService.class);
private final RestHighLevelClient restHighLevelClient;
public RiskResultAsyncService(RestHighLevelClient restHighLevelClient) {
this.restHighLevelClient = restHighLevelClient;
logger.info("RiskResult SaveModel ThreadPool");
}
@Override
public void sendResult(String modelGuid, String reqId, String info) {
CompletableFuture.runAsync(() -> {
try {
IndexRequest request = new IndexRequest(modelGuid.toLowerCase());
request.id(reqId);
request.source(info, XContentType.JSON);
IndexResponse result = restHighLevelClient.index(request, RequestOptions.DEFAULT);
logger.info("es result:{}", result.toString());
} catch (Exception e) {
logger.error("es result error");
}
}
);
}
}

View File

@@ -0,0 +1,5 @@
package com.pgmmers.radar.service.data;
public interface RiskResultService {
void sendResult(String modelGuid, String reqId, String info);
}