From fa8b7fe7cfdd2852e835ae24728b3bd62990fff2 Mon Sep 17 00:00:00 2001 From: maxman Date: Tue, 10 Mar 2020 17:42:42 +0800 Subject: [PATCH] feat: update es api to high level api --- pom.xml | 26 +-- radar-admin/pom.xml | 12 +- .../com/pgmmers/radar/config/EsConfig.java | 65 +++---- .../radar/controller/EventApiController.java | 2 +- radar-engine/pom.xml | 10 +- .../com/pgmmers/radar/config/EsConfig.java | 66 +++---- radar-service-impl/pom.xml | 5 +- .../core/RiskAnalysisEngineServiceImpl.java | 39 ++-- .../service/impl/logs/EventServiceImpl.java | 13 +- .../service/impl/model/ModelServiceImpl.java | 6 +- .../service/impl/model/RuleServiceImpl.java | 2 +- .../impl/search/SearchEngineServiceImpl.java | 166 ++++++++++-------- radar-service/pom.xml | 13 +- .../radar/service/logs/EventService.java | 3 +- .../radar/service/model/ModelService.java | 3 +- .../service/search/SearchEngineService.java | 19 +- 16 files changed, 251 insertions(+), 199 deletions(-) diff --git a/pom.xml b/pom.xml index c8a4148..e56505e 100644 --- a/pom.xml +++ b/pom.xml @@ -33,6 +33,7 @@ 2.1.7.RELEASE 8.5.37 1.12.0 + 7.6.1 @@ -128,32 +129,21 @@ + org.elasticsearch.client - transport - 6.8.5 + elasticsearch-rest-high-level-client + ${es.version} - org.elasticsearch elasticsearch - 6.8.5 - - - com.google.guava - guava - - - org.hdrhistogram - HdrHistogram - - + ${es.version} - - org.elasticsearch.plugin - transport-netty4-client - 6.8.5 + org.elasticsearch.client + elasticsearch-rest-client + ${es.version} diff --git a/radar-admin/pom.xml b/radar-admin/pom.xml index 1ddfbde..28149ee 100644 --- a/radar-admin/pom.xml +++ b/radar-admin/pom.xml @@ -81,12 +81,18 @@ easyexcel 2.1.3 - org.elasticsearch.client - transport + elasticsearch-rest-high-level-client + + + org.elasticsearch + elasticsearch + + + org.elasticsearch.client + elasticsearch-rest-client - diff --git a/radar-admin/src/main/java/com/pgmmers/radar/config/EsConfig.java b/radar-admin/src/main/java/com/pgmmers/radar/config/EsConfig.java index 6f4e1b8..1ec4457 100644 --- a/radar-admin/src/main/java/com/pgmmers/radar/config/EsConfig.java +++ b/radar-admin/src/main/java/com/pgmmers/radar/config/EsConfig.java @@ -1,47 +1,50 @@ package com.pgmmers.radar.config; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.transport.client.PreBuiltTransportClient; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.net.InetAddress; @Configuration public class EsConfig { private static final Logger logger = LoggerFactory.getLogger(EsConfig.class); - @Value("${elasticsearch.ip}") - private String hostName; - @Value("${elasticsearch.port}") - private String port; - @Value("${elasticsearch.cluster.name}") - private String clusterName; - @Value("${elasticsearch.pool-size}") - private String poolSize; - @Bean(name = "transportClient") - public TransportClient transportClient() { - logger.info("Elasticsearch初始化开始。。。。。"); - TransportClient transportClient = null; - try { - // 配置信息 - Settings esSetting = Settings.builder() - .put("cluster.name", clusterName) //集群名字 - .put("client.transport.sniff", true)//增加嗅探机制,找到ES集群 - .put("thread_pool.search.size", Integer.parseInt(poolSize))//增加线程池个数,暂时设为5 - .build(); - //配置信息Settings自定义 - transportClient = new PreBuiltTransportClient(esSetting); - TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(hostName), Integer.valueOf(port)); - transportClient.addTransportAddresses(transportAddress); - } catch (Exception e) { - logger.error("elasticsearch TransportClient create error!!", e); + @Value("${elasticsearch.username}") + private String username; + @Value("${elasticsearch.password}") + private String password; + @Value("${elasticsearch.url}") + private String url; + + + @Bean + public RestHighLevelClient esClient(){ + if(StringUtils.isEmpty(this.username)){ + RestHighLevelClient client = new RestHighLevelClient( + RestClient.builder(HttpHost.create(this.url))); + return client; } - return transportClient; + + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); //es账号密码(默认用户名为elastic) + RestHighLevelClient client = new RestHighLevelClient( + RestClient.builder(HttpHost.create(this.url)) + .setHttpClientConfigCallback(httpAsyncClientBuilder -> { + httpAsyncClientBuilder.disableAuthCaching(); + return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + }) + ); + return client; } + } \ No newline at end of file diff --git a/radar-admin/src/main/java/com/pgmmers/radar/controller/EventApiController.java b/radar-admin/src/main/java/com/pgmmers/radar/controller/EventApiController.java index ea9b777..d18c02c 100644 --- a/radar-admin/src/main/java/com/pgmmers/radar/controller/EventApiController.java +++ b/radar-admin/src/main/java/com/pgmmers/radar/controller/EventApiController.java @@ -62,7 +62,7 @@ public class EventApiController { @PostMapping("/query") - public CommonResult query(@RequestBody EventQuery query) { + public CommonResult query(@RequestBody EventQuery query) throws IOException { CommonResult result = new CommonResult(); List list = eventService.query(query); result.getData().put("page", list); diff --git a/radar-engine/pom.xml b/radar-engine/pom.xml index 84eb44c..0e1c977 100644 --- a/radar-engine/pom.xml +++ b/radar-engine/pom.xml @@ -70,7 +70,15 @@ org.elasticsearch.client - transport + elasticsearch-rest-high-level-client + + + org.elasticsearch + elasticsearch + + + org.elasticsearch.client + elasticsearch-rest-client diff --git a/radar-engine/src/main/java/com/pgmmers/radar/config/EsConfig.java b/radar-engine/src/main/java/com/pgmmers/radar/config/EsConfig.java index 6f4e1b8..96c6dd6 100644 --- a/radar-engine/src/main/java/com/pgmmers/radar/config/EsConfig.java +++ b/radar-engine/src/main/java/com/pgmmers/radar/config/EsConfig.java @@ -1,47 +1,51 @@ package com.pgmmers.radar.config; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.transport.client.PreBuiltTransportClient; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.net.InetAddress; @Configuration public class EsConfig { private static final Logger logger = LoggerFactory.getLogger(EsConfig.class); - @Value("${elasticsearch.ip}") - private String hostName; - @Value("${elasticsearch.port}") - private String port; - @Value("${elasticsearch.cluster.name}") - private String clusterName; - @Value("${elasticsearch.pool-size}") - private String poolSize; - @Bean(name = "transportClient") - public TransportClient transportClient() { - logger.info("Elasticsearch初始化开始。。。。。"); - TransportClient transportClient = null; - try { - // 配置信息 - Settings esSetting = Settings.builder() - .put("cluster.name", clusterName) //集群名字 - .put("client.transport.sniff", true)//增加嗅探机制,找到ES集群 - .put("thread_pool.search.size", Integer.parseInt(poolSize))//增加线程池个数,暂时设为5 - .build(); - //配置信息Settings自定义 - transportClient = new PreBuiltTransportClient(esSetting); - TransportAddress transportAddress = new TransportAddress(InetAddress.getByName(hostName), Integer.valueOf(port)); - transportClient.addTransportAddresses(transportAddress); - } catch (Exception e) { - logger.error("elasticsearch TransportClient create error!!", e); + + @Value("${elasticsearch.username}") + private String username; + @Value("${elasticsearch.password}") + private String password; + @Value("${elasticsearch.url}") + private String url; + + + @Bean + public RestHighLevelClient esClient(){ + if(StringUtils.isEmpty(this.username)){ + RestHighLevelClient client = new RestHighLevelClient( + RestClient.builder(HttpHost.create(this.url))); + return client; } - return transportClient; + + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); //es账号密码(默认用户名为elastic) + RestHighLevelClient client = new RestHighLevelClient( + RestClient.builder(HttpHost.create(this.url)) + .setHttpClientConfigCallback(httpAsyncClientBuilder -> { + httpAsyncClientBuilder.disableAuthCaching(); + return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + }) + ); + return client; } + } \ No newline at end of file diff --git a/radar-service-impl/pom.xml b/radar-service-impl/pom.xml index f62924d..0ed8215 100644 --- a/radar-service-impl/pom.xml +++ b/radar-service-impl/pom.xml @@ -52,9 +52,6 @@ org.tensorflow tensorflow - - org.elasticsearch - elasticsearch - + \ No newline at end of file diff --git a/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/core/RiskAnalysisEngineServiceImpl.java b/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/core/RiskAnalysisEngineServiceImpl.java index 3ba012e..a505a61 100644 --- a/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/core/RiskAnalysisEngineServiceImpl.java +++ b/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/core/RiskAnalysisEngineServiceImpl.java @@ -12,6 +12,11 @@ import com.pgmmers.radar.service.model.EntityService; import com.pgmmers.radar.service.model.ModelService; import com.pgmmers.radar.util.DateUtils; import com.pgmmers.radar.vo.model.ModelVO; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.xcontent.XContentType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -21,6 +26,7 @@ import org.springframework.http.*; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; +import java.io.IOException; import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -49,11 +55,9 @@ public class RiskAnalysisEngineServiceImpl implements RiskAnalysisEngineService @Autowired private ValidateService validateService; - @Autowired - private RestTemplate restTemplate; - @Value("${elasticsearch.url}") - private String elasticsearchUrl; + @Autowired + private RestHighLevelClient esClient; @Override public CommonResult uploadInfo(String modelGuid, String reqId, String jsonInfo) { @@ -114,8 +118,12 @@ public class RiskAnalysisEngineServiceImpl implements RiskAnalysisEngineService cacheService.saveAntiFraudResult(modelGuid, reqId, result); // 保存事件信息和分析结果用于后续分析 - sendResult(modelGuid, reqId, JSON.toJSONString(context)); - + try { + sendResult(modelGuid, reqId, JSON.toJSONString(context)); + } catch (IOException e) { + logger.error(e.getMessage(), e); + logger.error("向es中保存数据失败!"); + } // 返回分析结果 return result; } @@ -139,19 +147,18 @@ public class RiskAnalysisEngineServiceImpl implements RiskAnalysisEngineService * @param reqId * @param info event info and analyze result. */ - private void sendResult(String modelGuid, String reqId, String info) { + private void sendResult(String modelGuid, String reqId, String info) throws IOException { // 这里可以根据情况进行异步处理。 - send2ES(modelGuid, info); + send2ES(modelGuid, reqId, info); } - private void send2ES(String guid, String json) { - String url = elasticsearchUrl + "/" + guid.toLowerCase() + "/" + "radar"; - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.APPLICATION_JSON); - HttpEntity requestEntity = new HttpEntity<>(json, headers); - - ResponseEntity result = restTemplate.postForEntity(url, requestEntity, String.class, new Object[]{}); - logger.info("es result:{}", result); + private void send2ES(String guid, String reqId, String json) throws IOException { + IndexRequest request = new IndexRequest(guid.toLowerCase()); + request.id(reqId); + request.source(json, XContentType.JSON); + IndexResponse result = this.esClient.index(request, RequestOptions.DEFAULT); +// ResponseEntity result = restTemplate.postForEntity(url, requestEntity, String.class, new Object[]{}); + logger.info("es result:{}", result.toString()); } diff --git a/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/logs/EventServiceImpl.java b/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/logs/EventServiceImpl.java index 0802399..5d1103a 100644 --- a/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/logs/EventServiceImpl.java +++ b/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/logs/EventServiceImpl.java @@ -17,6 +17,7 @@ import org.apache.poi.ss.usermodel.*; import org.apache.poi.xssf.streaming.SXSSFWorkbook; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.slf4j.Logger; @@ -24,6 +25,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.io.IOException; import java.util.*; @Service @@ -38,7 +40,7 @@ public class EventServiceImpl implements EventService { private ModelDal modelDal; @Override - public List query(EventQuery query) { + public List query(EventQuery query) throws IOException { List list = new ArrayList<>(); ModelVO model = modelDal.getModelById(query.getModelId()); String entityName = model.getEntityName(); @@ -65,7 +67,7 @@ public class EventServiceImpl implements EventService { } SearchHits hitsRet = searchService.search( - model.getGuid().toLowerCase(), "radar", queryMap, filterMap, + model.getGuid().toLowerCase(), queryMap, filterMap, (query.getPageNo() - 1) * query.getPageSize(), query.getPageSize()); SearchHit[] hits = hitsRet.getHits(); @@ -99,8 +101,7 @@ public class EventServiceImpl implements EventService { query = QueryBuilders.termQuery(term.getFieldName(), term.getFieldValue()); } - QueryBuilder filter = null; - filter = QueryBuilders.rangeQuery("fields." + dateField) + RangeQueryBuilder filter = QueryBuilders.rangeQuery("fields."+dateField) .from(beginTime.getTimeInMillis()) .to(endTime.getTimeInMillis()); SearchHits hitsRet; @@ -108,7 +109,7 @@ public class EventServiceImpl implements EventService { PageResult pageResult = null; try { hitsRet = searchService.search(model.getGuid().toLowerCase(), - "radar", query, filter, + query, filter, (term.getPageNo() - 1) * term.getPageSize(), term.getPageSize()); @@ -119,7 +120,7 @@ public class EventServiceImpl implements EventService { list.add(JSONObject.parse(info)); } pageResult = new PageResult<>(term.getPageNo(), - term.getPageSize(), (int) hitsRet.getTotalHits(), list); + term.getPageSize(), (int) hitsRet.getTotalHits().value, list); } catch (Exception e) { logger.error("", e); } diff --git a/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/model/ModelServiceImpl.java b/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/model/ModelServiceImpl.java index 6bec1a4..2589a61 100644 --- a/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/model/ModelServiceImpl.java +++ b/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/model/ModelServiceImpl.java @@ -31,6 +31,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -169,7 +170,7 @@ public class ModelServiceImpl implements ModelService, SubscribeHandle { } @Override - public CommonResult build(Long id) { + public CommonResult build(Long id) throws IOException { CommonResult result = new CommonResult(); ModelVO modelVO = modelDal.getModelById(id); List fields = modelDal.listField(id); @@ -215,8 +216,7 @@ public class ModelServiceImpl implements ModelService, SubscribeHandle { JSONObject total = buildEsMappingJson(fields, items); // execute - boolean isCreated; - isCreated = searchService.createIndex(modelVO.getGuid().toLowerCase(), modelVO.getModelName().toLowerCase(), "radar", total.toJSONString()); + boolean isCreated = searchService.createIndex(modelVO.getGuid().toLowerCase(), modelVO.getModelName().toLowerCase(), total.toJSONString()); logger.info("index mapping:{} is create {}", total.toJSONString(), isCreated); if (isCreated) { modelVO.setStatus(StatusType.INACTIVE.getKey()); diff --git a/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/model/RuleServiceImpl.java b/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/model/RuleServiceImpl.java index 5311c8c..a76e8a7 100644 --- a/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/model/RuleServiceImpl.java +++ b/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/model/RuleServiceImpl.java @@ -168,7 +168,7 @@ public class RuleServiceImpl implements RuleService, SubscribeHandle { long qty = 0; try { qty = searchService.count(model.getGuid().toLowerCase(), - "radar", QueryBuilders.termQuery(keyStr,rule.getId() + ""), null); + QueryBuilders.termQuery(keyStr,rule.getId() + ""), null); } catch (Exception e) { logger.error("search error", e); } diff --git a/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/search/SearchEngineServiceImpl.java b/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/search/SearchEngineServiceImpl.java index 623a4c5..329789a 100644 --- a/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/search/SearchEngineServiceImpl.java +++ b/radar-service-impl/src/main/java/com/pgmmers/radar/service/impl/search/SearchEngineServiceImpl.java @@ -1,51 +1,64 @@ package com.pgmmers.radar.service.impl.search; +import com.alibaba.fastjson.JSON; import com.pgmmers.radar.service.search.SearchEngineService; +import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; -import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.client.IndicesAdminClient; -import org.elasticsearch.client.Requests; -import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.CreateIndexResponse; +import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; + +import java.io.IOException; import java.util.Map; @Service public class SearchEngineServiceImpl implements SearchEngineService { public static Logger logger = LoggerFactory .getLogger(SearchEngineServiceImpl.class); - @Autowired - private TransportClient client; + private RestHighLevelClient client; + public RestHighLevelClient getClient() { + return client; + } + + public void setClient(RestHighLevelClient client) { + this.client = client; + } @Override - public SearchHits search(String index, String type, String queryJson, String filterJson, Integer offset, Integer limit) { - SearchResponse response = null; - SearchRequestBuilder builder = client.prepareSearch(index) - .setTypes(type).setSearchType(SearchType.DFS_QUERY_THEN_FETCH) - .setFrom(offset).setSize(limit).setExplain(true); - if (!StringUtils.isEmpty(queryJson)) { - builder.setQuery(QueryBuilders.queryStringQuery(queryJson)); - } - if (!StringUtils.isEmpty(filterJson)) { - builder.setPostFilter(QueryBuilders.queryStringQuery(filterJson)); - } + public SearchHits search(String index, String queryJson, String filterJson, Integer offset, Integer limit) throws IOException { - response = builder.execute().actionGet(); + SearchRequest request = new SearchRequest(index); + request.searchType(SearchType.DFS_QUERY_THEN_FETCH); + + SearchSourceBuilder builder = new SearchSourceBuilder(); + + builder + .query(QueryBuilders.wrapperQuery(queryJson)) + .postFilter(QueryBuilders.wrapperQuery(filterJson)) +// .postFilter(QueryBuilders.matchQuery("type", type)) + .from(offset) + .size(limit); + request.source(builder); + SearchResponse response = client.search(request, RequestOptions.DEFAULT); RestStatus status = response.status(); if (status.equals(RestStatus.OK)) { SearchHits hits = response.getHits(); @@ -56,7 +69,7 @@ public class SearchEngineServiceImpl implements SearchEngineService { } @Override - public SearchHits search(String index, String type, Map queryMap, Map filterMap, Integer offset, Integer limit) { + public SearchHits search(String index, Map queryMap, Map filterMap, Integer offset, Integer limit) throws IOException { QueryBuilder query = null; QueryBuilder filter = null; if (queryMap != null) { @@ -75,23 +88,26 @@ public class SearchEngineServiceImpl implements SearchEngineService { .from(beginTime.longValue()).to(endTime.longValue()); } - return search(index, type, query, filter, offset, limit); + return search(index, query, filter, offset, limit); } @Override - public SearchHits search(String index, String type, QueryBuilder query, QueryBuilder filter, Integer offset, Integer limit) { - SearchResponse response = null; - SearchRequestBuilder builder = client.prepareSearch(index) - .setTypes(type).setSearchType(SearchType.DFS_QUERY_THEN_FETCH) - .setFrom(offset).setSize(limit).setExplain(true); - if (query != null) { - builder.setQuery(query); - } - if (filter != null) { - builder.setPostFilter(filter); - } + public SearchHits search(String index, QueryBuilder query, QueryBuilder filter, Integer offset, Integer limit) throws IOException { + logger.debug("search, index={}, query:{}, filter:{}, offset:{}, limit:{}", index, JSON.toJSONString(query), JSON.toJSONString(filter), offset, limit); + SearchRequest request = new SearchRequest(index); + request.searchType(SearchType.DFS_QUERY_THEN_FETCH); + SearchSourceBuilder builder = new SearchSourceBuilder(); - response = builder.execute().actionGet(); + builder.query(query) + .postFilter(filter) +// .postFilter(QueryBuilders.matchQuery("type", type)) + .from(offset) + .size(limit) + .explain(true) + ; + request.source(builder); + SearchResponse response = client.search(request, RequestOptions.DEFAULT); + logger.debug("search result: {}", JSON.toJSONString(response)); RestStatus status = response.status(); if (status.equals(RestStatus.OK)) { SearchHits hits = response.getHits(); @@ -102,73 +118,85 @@ public class SearchEngineServiceImpl implements SearchEngineService { } @Override - public Long count(String index, String type, String query, String filter) { + public Long count(String index, String query, String filter) { return 0L; } @Override - public Long count(String index, String type, QueryBuilder query, QueryBuilder filter) { - SearchResponse response = null; - SearchRequestBuilder builder = client.prepareSearch(index) - .setTypes(type).setSearchType(SearchType.DFS_QUERY_THEN_FETCH) - .setSize(0).setExplain(true); - if (!StringUtils.isEmpty(query)) { - builder.setQuery(query); - } - if (!StringUtils.isEmpty(filter)) { - builder.setPostFilter(filter); - } - response = builder.execute().actionGet(); + public Long count(String index, QueryBuilder query, QueryBuilder filter) throws IOException { + SearchRequest request = new SearchRequest(index); + request.searchType(SearchType.DFS_QUERY_THEN_FETCH); +// request.types(type); + SearchSourceBuilder builder = new SearchSourceBuilder(); + builder.query(query) + .postFilter(filter) +// .postFilter(QueryBuilders.matchQuery("type", type)) + .size(0) + .explain(true) + ; + request.source(builder); + SearchResponse response = client.search(request, RequestOptions.DEFAULT); RestStatus status = response.status(); if (status.equals(RestStatus.OK)) { SearchHits hits = response.getHits(); - return hits.getTotalHits(); + return hits.getTotalHits().value; } else { return null; } } @Override - public double avg(String index, String type, String field, String filter) { + public double avg(String index,String field, String filter) { return 0; } @Override - public double max(String index, String type, String field, String filter) { + public double max(String index, String field, String filter) { return 0; } @Override - public Terms aggregateByTerms(String index, String type, String field, String filter) { + public Terms aggregateByTerms(String index, String field, String filter) { return null; } + public boolean indexExist(String index) throws IOException { + GetIndexRequest request = new GetIndexRequest(index); + request.local(false); + request.humanReadable(true); + request.includeDefaults(false); + return client.indices().exists(request, RequestOptions.DEFAULT); + } + @Override - public boolean createIndex(String index, String indexAlias, String type, String jsonMapping) { - IndicesAdminClient adminClient = client.admin().indices(); - DeleteIndexRequest request = Requests.deleteIndexRequest(index); - adminClient.delete(request); - adminClient.prepareCreate(index).get(); - AcknowledgedResponse resp; - PutMappingRequestBuilder builder = adminClient.preparePutMapping(index); - if (!StringUtils.isEmpty(jsonMapping)) { - builder.setType(type).setSource(jsonMapping, XContentType.JSON); - } else { - return false; + public boolean createIndex(String index, String indexAlias, String jsonMapping) throws IOException { + boolean exist = this.indexExist(index); + if(exist){ + this.deleteIndex(index); } - resp = builder.get(); + CreateIndexRequest request = new CreateIndexRequest(index); + request.mapping(jsonMapping, XContentType.JSON); + request.alias(new Alias(indexAlias)); + CreateIndexResponse resp = client.indices().create(request, RequestOptions.DEFAULT); + if (resp.isAcknowledged()) { // 创建别名 - AcknowledgedResponse aliasResp = adminClient.prepareAliases().addAlias(index, indexAlias).get(); - if (aliasResp.isAcknowledged()) { - return true; - } else { - return false; - } +// AcknowledgedResponse aliasResp = adminClient.prepareAliases().addAlias(index, indexAlias).get(); +// if (aliasResp.isAcknowledged()) { +// return true; +// } else { +// return false; +// } + return true; } else { return false; } } + public void deleteIndex(String index) throws IOException { + client.indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT); + } + + } diff --git a/radar-service/pom.xml b/radar-service/pom.xml index e3ff087..1dc32a0 100644 --- a/radar-service/pom.xml +++ b/radar-service/pom.xml @@ -29,13 +29,18 @@ com.alibaba fastjson - org.elasticsearch.client - transport + elasticsearch-rest-high-level-client + + + org.elasticsearch + elasticsearch + + + org.elasticsearch.client + elasticsearch-rest-client - - io.springfox springfox-swagger2 diff --git a/radar-service/src/main/java/com/pgmmers/radar/service/logs/EventService.java b/radar-service/src/main/java/com/pgmmers/radar/service/logs/EventService.java index 9250bba..fbf4ce7 100644 --- a/radar-service/src/main/java/com/pgmmers/radar/service/logs/EventService.java +++ b/radar-service/src/main/java/com/pgmmers/radar/service/logs/EventService.java @@ -6,6 +6,7 @@ import com.pgmmers.radar.dal.bean.PageResult; import com.pgmmers.radar.dal.bean.TermQuery; import org.apache.poi.ss.usermodel.Workbook; +import java.io.IOException; import java.util.List; /** @@ -15,7 +16,7 @@ import java.util.List; */ public interface EventService { - List query(EventQuery query); + List query(EventQuery query) throws IOException; PageResult query(TermQuery term); diff --git a/radar-service/src/main/java/com/pgmmers/radar/service/model/ModelService.java b/radar-service/src/main/java/com/pgmmers/radar/service/model/ModelService.java index 1f06a0a..735a020 100644 --- a/radar-service/src/main/java/com/pgmmers/radar/service/model/ModelService.java +++ b/radar-service/src/main/java/com/pgmmers/radar/service/model/ModelService.java @@ -6,6 +6,7 @@ import com.pgmmers.radar.dal.bean.ModelQuery; import com.pgmmers.radar.service.common.CommonResult; import com.pgmmers.radar.vo.model.ModelVO; +import java.io.IOException; import java.util.List; public interface ModelService { @@ -28,7 +29,7 @@ public interface ModelService { CommonResult delete(Long[] id); - CommonResult build(Long id); + CommonResult build(Long id) throws IOException; CommonResult copy(Long id, String merchantCode, String name, String label); diff --git a/radar-service/src/main/java/com/pgmmers/radar/service/search/SearchEngineService.java b/radar-service/src/main/java/com/pgmmers/radar/service/search/SearchEngineService.java index dcbd1fc..1d61e8c 100644 --- a/radar-service/src/main/java/com/pgmmers/radar/service/search/SearchEngineService.java +++ b/radar-service/src/main/java/com/pgmmers/radar/service/search/SearchEngineService.java @@ -4,6 +4,7 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import java.io.IOException; import java.util.Map; @@ -14,24 +15,24 @@ import java.util.Map; */ public interface SearchEngineService { - SearchHits search(String index, String type, String queryJson, String filterJson, Integer offset, Integer limit); + SearchHits search(String index,String queryJson, String filterJson, Integer offset, Integer limit) throws IOException; - SearchHits search(String index, String type, Map queryMap, Map filterMap, Integer offset, Integer limit); + SearchHits search(String index,Map queryMap, Map filterMap, Integer offset, Integer limit) throws IOException; - SearchHits search(String index, String type, QueryBuilder query, QueryBuilder filter, Integer offset, Integer limit); + SearchHits search(String index, QueryBuilder query, QueryBuilder filter, Integer offset, Integer limit) throws IOException; - Long count(String index, String type, String query, String filter); + Long count(String index, String query, String filter); - Long count(String index, String type, QueryBuilder query, QueryBuilder filter); + Long count(String index, QueryBuilder query, QueryBuilder filter) throws IOException; - double avg(String index, String type, String field, String filter); + double avg(String index, String field, String filter); - double max(String index, String type, String field, String filter); + double max(String index, String field, String filter); - Terms aggregateByTerms(String index, String type, String field, String filter); + Terms aggregateByTerms(String index, String field, String filter); - boolean createIndex(String index, String indexAlias, String type, String jsonMapping); + boolean createIndex(String index, String indexAlias, String jsonMapping) throws IOException; }