From a4660c8c10a5fd860d9ea7829454692f18d8c4d6 Mon Sep 17 00:00:00 2001 From: xuwujing <1060589146@qq.com> Date: Sat, 30 Mar 2019 19:01:51 +0800 Subject: [PATCH] =?UTF-8?q?es=20=E6=B5=8B=E8=AF=95=E7=B1=BB=E6=8F=90?= =?UTF-8?q?=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../EsHighLevelRestSearchTest.java | 31 +- .../easticsearch/EsHighLevelRestTest1.java | 449 ++++++++++-------- .../com/pancm/sql/easticsearch/JestTest.java | 47 +- 3 files changed, 294 insertions(+), 233 deletions(-) diff --git a/src/main/java/com/pancm/sql/easticsearch/EsHighLevelRestSearchTest.java b/src/main/java/com/pancm/sql/easticsearch/EsHighLevelRestSearchTest.java index 9342051..6f215bf 100644 --- a/src/main/java/com/pancm/sql/easticsearch/EsHighLevelRestSearchTest.java +++ b/src/main/java/com/pancm/sql/easticsearch/EsHighLevelRestSearchTest.java @@ -1,9 +1,4 @@ -package com.pancm.sql.easticsearch; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; +package com.pancm.test.esTest; import org.apache.http.HttpHost; import org.elasticsearch.action.search.SearchRequest; @@ -12,6 +7,7 @@ import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.Fuzziness; import org.elasticsearch.common.unit.TimeValue; @@ -35,6 +31,11 @@ import org.elasticsearch.search.suggest.term.TermSuggestion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + /** * @Title: EsHighLevelRestSearchTest @@ -49,7 +50,6 @@ public class EsHighLevelRestSearchTest { private static String elasticIp = "192.169.0.23"; private static int elasticPort = 9200; - private static Logger logger = LoggerFactory.getLogger(EsHighLevelRestSearchTest.class); private static RestHighLevelClient client = null; @@ -76,7 +76,8 @@ public class EsHighLevelRestSearchTest { * 初始化服务 */ private static void init() { - client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticIp, elasticPort, "http"))); + RestClientBuilder restClientBuilder =RestClient.builder(new HttpHost(elasticIp, elasticPort)); + client = new RestHighLevelClient(restClientBuilder); } @@ -89,6 +90,8 @@ public class EsHighLevelRestSearchTest { client.close(); } catch (IOException e) { e.printStackTrace(); + }finally{ + client=null; } } } @@ -331,14 +334,10 @@ public class EsHighLevelRestSearchTest { } } - - - - - - - - } + + + + } diff --git a/src/main/java/com/pancm/sql/easticsearch/EsHighLevelRestTest1.java b/src/main/java/com/pancm/sql/easticsearch/EsHighLevelRestTest1.java index 74bf214..98f4f97 100644 --- a/src/main/java/com/pancm/sql/easticsearch/EsHighLevelRestTest1.java +++ b/src/main/java/com/pancm/sql/easticsearch/EsHighLevelRestTest1.java @@ -1,4 +1,4 @@ -package com.pancm.sql.easticsearch; +package com.pancm.test.esTest; import java.io.IOException; import java.util.HashMap; @@ -11,6 +11,10 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; @@ -41,8 +45,8 @@ import org.slf4j.LoggerFactory; /** * @Title: EsHighLevelRestTest1 - * @Description: Java High Level REST Client Es高级客户端使用教程一 (基本CRUD使用) - 官方文档地址: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html + * @Description: Java High Level REST Client Es高级客户端使用教程一 (基本CRUD使用) 官方文档地址: + * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html * @since jdk 1.8 * @Version:1.0.0 * @author pancm @@ -54,7 +58,7 @@ public class EsHighLevelRestTest1 { private static int elasticPort = 9200; private static Logger logger = LoggerFactory.getLogger(EsHighLevelRestTest1.class); - + private static RestHighLevelClient client = null; public static void main(String[] args) { @@ -77,7 +81,8 @@ public class EsHighLevelRestTest1 { * 初始化服务 */ private static void init() { - client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticIp, elasticPort, "http"))); + + client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticIp, elasticPort))); } @@ -111,7 +116,7 @@ public class EsHighLevelRestTest1 { * 第一种方式,通过jsonString进行创建 */ // json - String jsonString = "{" + "\"user\":\"pancm\"," + "\"postDate\":\"2019-03-08\","+ "\"age\":\"18\"," + String jsonString = "{" + "\"user\":\"pancm\"," + "\"postDate\":\"2019-03-08\"," + "\"age\":\"18\"," + "\"message\":\"study Elasticsearch\"" + "}"; request.source(jsonString, XContentType.JSON); @@ -144,11 +149,9 @@ public class EsHighLevelRestTest1 { request.source(builder); IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT); - - - - //对响应结果进行处理 - + + // 对响应结果进行处理 + String index1 = indexResponse.getIndex(); String type1 = indexResponse.getType(); String id1 = indexResponse.getId(); @@ -169,7 +172,82 @@ public class EsHighLevelRestTest1 { } } - System.out.println("创建成功!"); + System.out.println("创建索引库成功!"); + + // 类型 + String type2 = "student"; + String index2 = "student"; + + // setting 的值 + Map setmapping = new HashMap<>(); + + // 分区数、路由分片数、副本数、缓存刷新时间 + setmapping.put("number_of_shards", 12); + setmapping.put("number_of_routing_shards", 24); + setmapping.put("number_of_replicas", 1); + setmapping.put("refresh_interval", "5s"); + + + + // mapping 的值 +// Map mapping = new HashMap<>(); +// +// mapping.put("id", "long"); +// mapping.put("name", "keyword"); + + + Map jsonMap2 = new HashMap<>(); + Map message = new HashMap<>(); + //设置类型 + message.put("type", "text"); + Map properties = new HashMap<>(); + //设置字段message信息 + properties.put("message", message); + Map mapping = new HashMap<>(); + mapping.put("properties", properties); + jsonMap2.put(type2, mapping); + + + GetIndexRequest getRequest2 = new GetIndexRequest(); + getRequest2.indices(index); + getRequest2.local(false); + getRequest2.humanReadable(true); + boolean exists2 = client.indices().exists(getRequest2, RequestOptions.DEFAULT); + //如果存在就不创建了 + if(exists2) { + System.out.println(type2+"索引库已经存在!"); + return; + } + + // 开始创建库 + CreateIndexRequest request2 = new CreateIndexRequest(index2); + try { + // 加载数据类型 + request2.settings(setmapping); + //第二种方式 +// request2.settings(Settings.builder() +// .put("index.number_of_shards", 3) +// .put("index.number_of_replicas", 1)); + + + //设置mapping参数 + request2.mapping(type2, jsonMap2); + + //设置别名 + request2.alias(new Alias("user_alias")); + + + CreateIndexResponse createIndexResponse = client.indices().create(request2, RequestOptions.DEFAULT); + boolean falg = createIndexResponse.isAcknowledged(); + logger.info("创建索引库" + index + ",状态为:" + falg); + } catch (IOException e) { + logger.error("创建INDEX报错", e); + } catch (NullPointerException e) { + logger.error("模板文件中的mappings或settings不能为空", e); + } + + + } /** @@ -194,7 +272,7 @@ public class EsHighLevelRestTest1 { } catch (ElasticsearchException e) { // 如果是索引不存在 if (e.status() == RestStatus.NOT_FOUND) { - System.out.println("该索引库不存在!"+index); + System.out.println("该索引库不存在!" + index); } } @@ -204,17 +282,17 @@ public class EsHighLevelRestTest1 { String sourceAsString = getResponse.getSourceAsString(); Map sourceAsMap = getResponse.getSourceAsMap(); byte[] sourceAsBytes = getResponse.getSourceAsBytes(); - System.out.println("查询返回结果String:"+sourceAsString); - System.out.println("查询返回结果Map:"+sourceAsMap); + System.out.println("查询返回结果String:" + sourceAsString); + System.out.println("查询返回结果Map:" + sourceAsMap); } else { System.out.println("没有找到该数据!"); } } - - + /** - * 是否存在 - * @throws IOException + * 是否存在 + * + * @throws IOException */ private static void exists() throws IOException { String index = "user"; @@ -223,30 +301,30 @@ public class EsHighLevelRestTest1 { String id = "1"; // 创建查询请求 GetRequest getRequest = new GetRequest(index, type, id); + boolean exists = client.exists(getRequest, RequestOptions.DEFAULT); - ActionListener listener = new ActionListener() { - @Override - public void onResponse(Boolean exists) { - System.out.println("=="+exists); - } + @Override + public void onResponse(Boolean exists) { + System.out.println("==" + exists); + } - @Override - public void onFailure(Exception e) { - System.out.println("失败的原因:"+e.getMessage()); - } + @Override + public void onFailure(Exception e) { + System.out.println("失败的原因:" + e.getMessage()); + } }; - //进行异步监听 + // 进行异步监听 // client.existsAsync(getRequest, RequestOptions.DEFAULT, listener); - - System.out.println("是否存在:"+exists); + + System.out.println("数据是否存在:" + exists); } - - + /** - * 更新操作 + * 更新操作 + * * @throws IOException */ private static void update() throws IOException { @@ -254,31 +332,32 @@ public class EsHighLevelRestTest1 { String type = "userindex"; // 唯一编号 String id = "1"; - UpdateRequest upateRequest=new UpdateRequest(); + UpdateRequest upateRequest = new UpdateRequest(); upateRequest.id(id); upateRequest.index(index); upateRequest.type(type); - - //依旧可以使用Map这种集合作为更新条件 + + // 依旧可以使用Map这种集合作为更新条件 Map jsonMap = new HashMap<>(); jsonMap.put("user", "xuwujing"); jsonMap.put("postDate", "2019-03-11"); - + upateRequest.doc(jsonMap); - + // upateRequest.docAsUpsert(true); // upsert 方法表示如果数据不存在,那么就新增一条 upateRequest.upsert(jsonMap); - + client.update(upateRequest, RequestOptions.DEFAULT); System.out.println("更新成功!"); - + } - + /** * 删除 - * @throws IOException + * + * @throws IOException * */ private static void delete() throws IOException { @@ -287,189 +366,169 @@ public class EsHighLevelRestTest1 { String type = "userindex"; // 唯一编号 String id = "1"; - DeleteRequest deleteRequest=new DeleteRequest(); + DeleteRequest deleteRequest = new DeleteRequest(); deleteRequest.id(id); deleteRequest.index(index); deleteRequest.type(type); - - //设置超时时间 - deleteRequest.timeout(TimeValue.timeValueMinutes(2)); - //设置刷新策略"wait_for" - //保持此请求打开,直到刷新使此请求的内容可以搜索为止。此刷新策略与高索引和搜索吞吐量兼容,但它会导致请求等待响应,直到发生刷新 - deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); - - //同步删除 - DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT); - - /* - * 异步删除操作 - */ - - //进行监听 - ActionListener listener = new ActionListener() { - @Override - public void onResponse(DeleteResponse deleteResponse) { - System.out.println("响应:"+deleteResponse); - } - @Override - public void onFailure(Exception e) { - System.out.println("删除监听异常:"+e.getMessage()); - } + // 设置超时时间 + deleteRequest.timeout(TimeValue.timeValueMinutes(2)); + // 设置刷新策略"wait_for" + // 保持此请求打开,直到刷新使此请求的内容可以搜索为止。此刷新策略与高索引和搜索吞吐量兼容,但它会导致请求等待响应,直到发生刷新 + deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + + // 同步删除 + DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT); + + /* + * 异步删除操作 + */ + + // 进行监听 + ActionListener listener = new ActionListener() { + @Override + public void onResponse(DeleteResponse deleteResponse) { + System.out.println("响应:" + deleteResponse); + } + + @Override + public void onFailure(Exception e) { + System.out.println("删除监听异常:" + e.getMessage()); + } }; - - //异步删除 + + // 异步删除 // client.deleteAsync(deleteRequest, RequestOptions.DEFAULT, listener); - + ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo(); - //如果处理成功碎片的数量少于总碎片的情况,说明还在处理或者处理发生异常 + // 如果处理成功碎片的数量少于总碎片的情况,说明还在处理或者处理发生异常 if (shardInfo.getTotal() != shardInfo.getSuccessful()) { - System.out.println("需要处理的碎片总量:"+shardInfo.getTotal()); - System.out.println("处理成功的碎片总量:"+shardInfo.getSuccessful()); + System.out.println("需要处理的碎片总量:" + shardInfo.getTotal()); + System.out.println("处理成功的碎片总量:" + shardInfo.getSuccessful()); } - + if (shardInfo.getFailed() > 0) { - for (ReplicationResponse.ShardInfo.Failure failure : - shardInfo.getFailures()) { - String reason = failure.reason(); - } + for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { + String reason = failure.reason(); + } } System.out.println("删除成功!"); } - - + /** - * 批量操作示例 - * @throws InterruptedException + * 批量操作示例 + * + * @throws InterruptedException */ - private static void bulk() throws IOException, InterruptedException { + private static void bulk() throws IOException, InterruptedException { String index = "estest"; String type = "estest"; - - BulkRequest request = new BulkRequest(); - //批量新增 - request.add(new IndexRequest(index, type, "1") - .source(XContentType.JSON,"field", "foo")); - request.add(new IndexRequest(index, type, "2") - .source(XContentType.JSON,"field", "bar")); - request.add(new IndexRequest(index, type, "3") - .source(XContentType.JSON,"field", "baz")); - - //可以进行修改/删除/新增 操作 - request.add(new UpdateRequest(index, type, "2") - .doc(XContentType.JSON,"field", "test")); - request.add(new DeleteRequest(index, type, "3")); - request.add(new IndexRequest(index, type, "4") - .source(XContentType.JSON,"field", "baz")); - - + + BulkRequest request = new BulkRequest(); + // 批量新增 + request.add(new IndexRequest(index, type, "1").source(XContentType.JSON, "field", "foo")); + request.add(new IndexRequest(index, type, "2").source(XContentType.JSON, "field", "bar")); + request.add(new IndexRequest(index, type, "3").source(XContentType.JSON, "field", "baz")); + + // 可以进行修改/删除/新增 操作 + request.add(new UpdateRequest(index, type, "2").doc(XContentType.JSON, "field", "test")); + request.add(new DeleteRequest(index, type, "3")); + request.add(new IndexRequest(index, type, "4").source(XContentType.JSON, "field", "baz")); + BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); - - //可以快速检查一个或多个操作是否失败 true是有至少一个失败! - if (bulkResponse.hasFailures()) { + + // 可以快速检查一个或多个操作是否失败 true是有至少一个失败! + if (bulkResponse.hasFailures()) { System.out.println("有一个操作失败!"); } - - //对处理结果进行遍历操作并根据不同的操作进行处理 - for (BulkItemResponse bulkItemResponse : bulkResponse) { - DocWriteResponse itemResponse = bulkItemResponse.getResponse(); - - //操作失败的进行处理 - if (bulkItemResponse.isFailed()) { - BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); - } - - if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX - || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { - IndexResponse indexResponse = (IndexResponse) itemResponse; + // 对处理结果进行遍历操作并根据不同的操作进行处理 + for (BulkItemResponse bulkItemResponse : bulkResponse) { + DocWriteResponse itemResponse = bulkItemResponse.getResponse(); - } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { - UpdateResponse updateResponse = (UpdateResponse) itemResponse; + // 操作失败的进行处理 + if (bulkItemResponse.isFailed()) { + BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); + + } + + if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX + || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { + IndexResponse indexResponse = (IndexResponse) itemResponse; + + } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { + UpdateResponse updateResponse = (UpdateResponse) itemResponse; + + } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { + DeleteResponse deleteResponse = (DeleteResponse) itemResponse; + } + } - } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { - DeleteResponse deleteResponse = (DeleteResponse) itemResponse; - } - } - System.out.println("批量执行成功!"); - - - + /* - * 批量执行处理器相关示例代码 + * 批量执行处理器相关示例代码 */ - - - //批量处理器的监听器设置 - + + // 批量处理器的监听器设置 + BulkProcessor.Listener listener = new BulkProcessor.Listener() { - - //在执行BulkRequest的每次执行之前调用,这个方法允许知道将要在BulkRequest中执行的操作的数量 - @Override - public void beforeBulk(long executionId, BulkRequest request) { - int numberOfActions = request.numberOfActions(); - logger.debug("Executing bulk [{}] with {} requests", - executionId, numberOfActions); - } - - //在每次执行BulkRequest之后调用,这个方法允许知道BulkResponse是否包含错误 - @Override - public void afterBulk(long executionId, BulkRequest request, - BulkResponse response) { - if (response.hasFailures()) { - logger.warn("Bulk [{}] executed with failures", executionId); - } else { - logger.debug("Bulk [{}] completed in {} milliseconds", - executionId, response.getTook().getMillis()); - } - } - - //如果BulkRequest失败,则调用该方法,该方法允许知道失败 - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - logger.error("Failed to execute bulk", failure); - } + + // 在执行BulkRequest的每次执行之前调用,这个方法允许知道将要在BulkRequest中执行的操作的数量 + @Override + public void beforeBulk(long executionId, BulkRequest request) { + int numberOfActions = request.numberOfActions(); + logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions); + } + + // 在每次执行BulkRequest之后调用,这个方法允许知道BulkResponse是否包含错误 + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + if (response.hasFailures()) { + logger.warn("Bulk [{}] executed with failures", executionId); + } else { + logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis()); + } + } + + // 如果BulkRequest失败,则调用该方法,该方法允许知道失败 + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + logger.error("Failed to execute bulk", failure); + } }; - - BiConsumer> bulkConsumer = - (request2, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener); - //创建一个批量执行的处理器 - BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener).build(); + + BiConsumer> bulkConsumer = (request2, bulkListener) -> client + .bulkAsync(request, RequestOptions.DEFAULT, bulkListener); + // 创建一个批量执行的处理器 + BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener).build(); BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener); - //根据当前添加的操作数量设置刷新新批量请求的时间(默认为1000,使用-1禁用它) - builder.setBulkActions(500); - //根据当前添加的操作大小设置刷新新批量请求的时间(默认为5Mb,使用-1禁用) - builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); - //设置允许执行的并发请求数量(默认为1,使用0只允许执行单个请求) - builder.setConcurrentRequests(0); - //设置刷新间隔如果间隔通过,则刷新任何挂起的BulkRequest(默认为未设置) - builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); - //设置一个常量后退策略,该策略最初等待1秒并重试最多3次。 - builder.setBackoffPolicy(BackoffPolicy - .constantBackoff(TimeValue.timeValueSeconds(1L), 3)); - - - IndexRequest one = new IndexRequest(index, type, "1"). - source(XContentType.JSON, "title", - "In which order are my Elasticsearch queries executed?"); - IndexRequest two = new IndexRequest(index, type, "2") - .source(XContentType.JSON, "title", - "Current status and upcoming changes in Elasticsearch"); - IndexRequest three = new IndexRequest(index, type, "3") - .source(XContentType.JSON, "title", - "The Future of Federated Search in Elasticsearch"); + // 根据当前添加的操作数量设置刷新新批量请求的时间(默认为1000,使用-1禁用它) + builder.setBulkActions(500); + // 根据当前添加的操作大小设置刷新新批量请求的时间(默认为5Mb,使用-1禁用) + builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); + // 设置允许执行的并发请求数量(默认为1,使用0只允许执行单个请求) + builder.setConcurrentRequests(0); + // 设置刷新间隔如果间隔通过,则刷新任何挂起的BulkRequest(默认为未设置) + builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); + // 设置一个常量后退策略,该策略最初等待1秒并重试最多3次。 + builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); + + IndexRequest one = new IndexRequest(index, type, "1").source(XContentType.JSON, "title", + "In which order are my Elasticsearch queries executed?"); + IndexRequest two = new IndexRequest(index, type, "2").source(XContentType.JSON, "title", + "Current status and upcoming changes in Elasticsearch"); + IndexRequest three = new IndexRequest(index, type, "3").source(XContentType.JSON, "title", + "The Future of Federated Search in Elasticsearch"); bulkProcessor.add(one); bulkProcessor.add(two); bulkProcessor.add(three); - - - //如果所有大容量请求都已完成,则该方法返回true;如果在所有大容量请求完成之前的等待时间已经过去,则返回false - boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); - - System.out.println("请求的响应结果:"+terminated); - - + + // 如果所有大容量请求都已完成,则该方法返回true;如果在所有大容量请求完成之前的等待时间已经过去,则返回false + boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); + + System.out.println("请求的响应结果:" + terminated); + } - + } diff --git a/src/main/java/com/pancm/sql/easticsearch/JestTest.java b/src/main/java/com/pancm/sql/easticsearch/JestTest.java index 13c4127..ce41f99 100644 --- a/src/main/java/com/pancm/sql/easticsearch/JestTest.java +++ b/src/main/java/com/pancm/sql/easticsearch/JestTest.java @@ -1,26 +1,20 @@ -package com.pancm.sql.easticsearch; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.builder.SearchSourceBuilder; +package com.pancm.test.esTest; import io.searchbox.client.JestClient; import io.searchbox.client.JestClientFactory; import io.searchbox.client.JestResult; import io.searchbox.client.config.HttpClientConfig; -import io.searchbox.core.Bulk; -import io.searchbox.core.BulkResult; -import io.searchbox.core.Delete; -import io.searchbox.core.DocumentResult; -import io.searchbox.core.Index; -import io.searchbox.core.Search; +import io.searchbox.core.*; import io.searchbox.indices.CreateIndex; import io.searchbox.indices.DeleteIndex; import io.searchbox.indices.mapping.GetMapping; import io.searchbox.indices.mapping.PutMapping; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; /** * @@ -34,7 +28,7 @@ public class JestTest { private static JestClient jestClient; private static String indexName = "userindex"; private static String typeName = "user"; - private static String elasticIps="http://127.0.0.1:9200"; + private static String elasticIps="http://192.169.0.23:9200,http://192.169.0.24:9200"; public static void main(String[] args) throws Exception { @@ -48,8 +42,13 @@ public class JestTest { } private static JestClient getJestClient() { - JestClientFactory factory = new JestClientFactory(); - factory.setHttpClientConfig(new HttpClientConfig.Builder(elasticIps).connTimeout(60000).readTimeout(60000).multiThreaded(true).build()); + JestClientFactory factory = new JestClientFactory(); + //指定集群地址,设置超时时间,开启多线程,并发连接最多两个,最大连接数 + factory.setHttpClientConfig(new HttpClientConfig.Builder(elasticIps) + .connTimeout(60000).readTimeout(60000).multiThreaded(true) + .defaultMaxTotalConnectionPerRoute(2) + .maxTotalConnection(10) + .build()); return factory.getObject(); } @@ -174,7 +173,11 @@ public class JestTest { Index index = new Index.Builder(obj).build(); bulk.addAction(index); } + BulkResult br = jestClient.execute(bulk.build()); + System.out.println("DSL语句:"+br.toString()); + System.out.println("DSL语句:"+br.getJsonString()); + System.out.println("DSL语句:"+br.getJsonObject()); return br.isSucceeded(); } @@ -275,9 +278,9 @@ public class JestTest { } - /** - * 设置编号 - * @param Long id + /** + * 设置编号 + * @param Long id */ public void setId(Long id) { this.id = id; @@ -295,7 +298,7 @@ public class JestTest { /** * 设置姓名 - * @param String name + * @param String name */ public void setName(String name) { this.name = name; @@ -313,7 +316,7 @@ public class JestTest { /** * 设置年龄 - * @param Integer age + * @param age */ public void setAge(Integer age) { this.age = age;