es 测试类提交

This commit is contained in:
xuwujing
2019-03-30 19:01:51 +08:00
parent 3f26e9165d
commit a4660c8c10
3 changed files with 294 additions and 233 deletions

View File

@@ -1,9 +1,4 @@
package com.pancm.sql.easticsearch;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
package com.pancm.test.esTest;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
@@ -12,6 +7,7 @@ import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.common.unit.TimeValue;
@@ -35,6 +31,11 @@ import org.elasticsearch.search.suggest.term.TermSuggestion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* @Title: EsHighLevelRestSearchTest
@@ -49,7 +50,6 @@ public class EsHighLevelRestSearchTest {
private static String elasticIp = "192.169.0.23";
private static int elasticPort = 9200;
private static Logger logger = LoggerFactory.getLogger(EsHighLevelRestSearchTest.class);
private static RestHighLevelClient client = null;
@@ -76,7 +76,8 @@ public class EsHighLevelRestSearchTest {
* 初始化服务
*/
private static void init() {
client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticIp, elasticPort, "http")));
RestClientBuilder restClientBuilder =RestClient.builder(new HttpHost(elasticIp, elasticPort));
client = new RestHighLevelClient(restClientBuilder);
}
@@ -89,6 +90,8 @@ public class EsHighLevelRestSearchTest {
client.close();
} catch (IOException e) {
e.printStackTrace();
}finally{
client=null;
}
}
}
@@ -331,14 +334,10 @@ public class EsHighLevelRestSearchTest {
}
}
}
}

View File

@@ -1,4 +1,4 @@
package com.pancm.sql.easticsearch;
package com.pancm.test.esTest;
import java.io.IOException;
import java.util.HashMap;
@@ -11,6 +11,10 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
@@ -41,8 +45,8 @@ import org.slf4j.LoggerFactory;
/**
* @Title: EsHighLevelRestTest1
* @Description: Java High Level REST Client Es高级客户端使用教程一 (基本CRUD使用)
官方文档地址: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html
* @Description: Java High Level REST Client Es高级客户端使用教程一 (基本CRUD使用) 官方文档地址:
* https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html
* @since jdk 1.8
* @Version:1.0.0
* @author pancm
@@ -54,7 +58,7 @@ public class EsHighLevelRestTest1 {
private static int elasticPort = 9200;
private static Logger logger = LoggerFactory.getLogger(EsHighLevelRestTest1.class);
private static RestHighLevelClient client = null;
public static void main(String[] args) {
@@ -77,7 +81,8 @@ public class EsHighLevelRestTest1 {
* 初始化服务
*/
private static void init() {
client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticIp, elasticPort, "http")));
client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticIp, elasticPort)));
}
@@ -111,7 +116,7 @@ public class EsHighLevelRestTest1 {
* 第一种方式通过jsonString进行创建
*/
// json
String jsonString = "{" + "\"user\":\"pancm\"," + "\"postDate\":\"2019-03-08\","+ "\"age\":\"18\","
String jsonString = "{" + "\"user\":\"pancm\"," + "\"postDate\":\"2019-03-08\"," + "\"age\":\"18\","
+ "\"message\":\"study Elasticsearch\"" + "}";
request.source(jsonString, XContentType.JSON);
@@ -144,11 +149,9 @@ public class EsHighLevelRestTest1 {
request.source(builder);
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
//对响应结果进行处理
// 对响应结果进行处理
String index1 = indexResponse.getIndex();
String type1 = indexResponse.getType();
String id1 = indexResponse.getId();
@@ -169,7 +172,82 @@ public class EsHighLevelRestTest1 {
}
}
System.out.println("创建成功!");
System.out.println("创建索引库成功!");
// 类型
String type2 = "student";
String index2 = "student";
// setting 的值
Map<String, Object> setmapping = new HashMap<>();
// 分区数、路由分片数、副本数、缓存刷新时间
setmapping.put("number_of_shards", 12);
setmapping.put("number_of_routing_shards", 24);
setmapping.put("number_of_replicas", 1);
setmapping.put("refresh_interval", "5s");
// mapping 的值
// Map<String, Object> mapping = new HashMap<>();
//
// mapping.put("id", "long");
// mapping.put("name", "keyword");
Map<String, Object> jsonMap2 = new HashMap<>();
Map<String, Object> message = new HashMap<>();
//设置类型
message.put("type", "text");
Map<String, Object> properties = new HashMap<>();
//设置字段message信息
properties.put("message", message);
Map<String, Object> mapping = new HashMap<>();
mapping.put("properties", properties);
jsonMap2.put(type2, mapping);
GetIndexRequest getRequest2 = new GetIndexRequest();
getRequest2.indices(index);
getRequest2.local(false);
getRequest2.humanReadable(true);
boolean exists2 = client.indices().exists(getRequest2, RequestOptions.DEFAULT);
//如果存在就不创建了
if(exists2) {
System.out.println(type2+"索引库已经存在!");
return;
}
// 开始创建库
CreateIndexRequest request2 = new CreateIndexRequest(index2);
try {
// 加载数据类型
request2.settings(setmapping);
//第二种方式
// request2.settings(Settings.builder()
// .put("index.number_of_shards", 3)
// .put("index.number_of_replicas", 1));
//设置mapping参数
request2.mapping(type2, jsonMap2);
//设置别名
request2.alias(new Alias("user_alias"));
CreateIndexResponse createIndexResponse = client.indices().create(request2, RequestOptions.DEFAULT);
boolean falg = createIndexResponse.isAcknowledged();
logger.info("创建索引库" + index + ",状态为:" + falg);
} catch (IOException e) {
logger.error("创建INDEX报错", e);
} catch (NullPointerException e) {
logger.error("模板文件中的mappings或settings不能为空", e);
}
}
/**
@@ -194,7 +272,7 @@ public class EsHighLevelRestTest1 {
} catch (ElasticsearchException e) {
// 如果是索引不存在
if (e.status() == RestStatus.NOT_FOUND) {
System.out.println("该索引库不存在!"+index);
System.out.println("该索引库不存在!" + index);
}
}
@@ -204,17 +282,17 @@ public class EsHighLevelRestTest1 {
String sourceAsString = getResponse.getSourceAsString();
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
byte[] sourceAsBytes = getResponse.getSourceAsBytes();
System.out.println("查询返回结果String:"+sourceAsString);
System.out.println("查询返回结果Map:"+sourceAsMap);
System.out.println("查询返回结果String:" + sourceAsString);
System.out.println("查询返回结果Map:" + sourceAsMap);
} else {
System.out.println("没有找到该数据!");
}
}
/**
* 是否存在
* @throws IOException
* 是否存在
*
* @throws IOException
*/
private static void exists() throws IOException {
String index = "user";
@@ -223,30 +301,30 @@ public class EsHighLevelRestTest1 {
String id = "1";
// 创建查询请求
GetRequest getRequest = new GetRequest(index, type, id);
boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
ActionListener<Boolean> listener = new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean exists) {
System.out.println("=="+exists);
}
@Override
public void onResponse(Boolean exists) {
System.out.println("==" + exists);
}
@Override
public void onFailure(Exception e) {
System.out.println("失败的原因:"+e.getMessage());
}
@Override
public void onFailure(Exception e) {
System.out.println("失败的原因:" + e.getMessage());
}
};
//进行异步监听
// 进行异步监听
// client.existsAsync(getRequest, RequestOptions.DEFAULT, listener);
System.out.println("是否存在:"+exists);
System.out.println("数据是否存在:" + exists);
}
/**
* 更新操作
* 更新操作
*
* @throws IOException
*/
private static void update() throws IOException {
@@ -254,31 +332,32 @@ public class EsHighLevelRestTest1 {
String type = "userindex";
// 唯一编号
String id = "1";
UpdateRequest upateRequest=new UpdateRequest();
UpdateRequest upateRequest = new UpdateRequest();
upateRequest.id(id);
upateRequest.index(index);
upateRequest.type(type);
//依旧可以使用Map这种集合作为更新条件
// 依旧可以使用Map这种集合作为更新条件
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "xuwujing");
jsonMap.put("postDate", "2019-03-11");
upateRequest.doc(jsonMap);
//
upateRequest.docAsUpsert(true);
// upsert 方法表示如果数据不存在,那么就新增一条
upateRequest.upsert(jsonMap);
client.update(upateRequest, RequestOptions.DEFAULT);
System.out.println("更新成功!");
}
/**
* 删除
* @throws IOException
*
* @throws IOException
*
*/
private static void delete() throws IOException {
@@ -287,189 +366,169 @@ public class EsHighLevelRestTest1 {
String type = "userindex";
// 唯一编号
String id = "1";
DeleteRequest deleteRequest=new DeleteRequest();
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.id(id);
deleteRequest.index(index);
deleteRequest.type(type);
//设置超时时间
deleteRequest.timeout(TimeValue.timeValueMinutes(2));
//设置刷新策略"wait_for"
//保持此请求打开,直到刷新使此请求的内容可以搜索为止。此刷新策略与高索引和搜索吞吐量兼容,但它会导致请求等待响应,直到发生刷新
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
//同步删除
DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
/*
* 异步删除操作
*/
//进行监听
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
System.out.println("响应:"+deleteResponse);
}
@Override
public void onFailure(Exception e) {
System.out.println("删除监听异常:"+e.getMessage());
}
// 设置超时时间
deleteRequest.timeout(TimeValue.timeValueMinutes(2));
// 设置刷新策略"wait_for"
// 保持此请求打开,直到刷新使此请求的内容可以搜索为止。此刷新策略与高索引和搜索吞吐量兼容,但它会导致请求等待响应,直到发生刷新
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
// 同步删除
DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
/*
* 异步删除操作
*/
// 进行监听
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
System.out.println("响应:" + deleteResponse);
}
@Override
public void onFailure(Exception e) {
System.out.println("删除监听异常:" + e.getMessage());
}
};
//异步删除
// 异步删除
// client.deleteAsync(deleteRequest, RequestOptions.DEFAULT, listener);
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
//如果处理成功碎片的数量少于总碎片的情况,说明还在处理或者处理发生异常
// 如果处理成功碎片的数量少于总碎片的情况,说明还在处理或者处理发生异常
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
System.out.println("需要处理的碎片总量:"+shardInfo.getTotal());
System.out.println("处理成功的碎片总量:"+shardInfo.getSuccessful());
System.out.println("需要处理的碎片总量:" + shardInfo.getTotal());
System.out.println("处理成功的碎片总量:" + shardInfo.getSuccessful());
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason();
}
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason();
}
}
System.out.println("删除成功!");
}
/**
* 批量操作示例
* @throws InterruptedException
* 批量操作示例
*
* @throws InterruptedException
*/
private static void bulk() throws IOException, InterruptedException {
private static void bulk() throws IOException, InterruptedException {
String index = "estest";
String type = "estest";
BulkRequest request = new BulkRequest();
//批量新增
request.add(new IndexRequest(index, type, "1")
.source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest(index, type, "2")
.source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest(index, type, "3")
.source(XContentType.JSON,"field", "baz"));
//可以进行修改/删除/新增 操作
request.add(new UpdateRequest(index, type, "2")
.doc(XContentType.JSON,"field", "test"));
request.add(new DeleteRequest(index, type, "3"));
request.add(new IndexRequest(index, type, "4")
.source(XContentType.JSON,"field", "baz"));
BulkRequest request = new BulkRequest();
// 批量新增
request.add(new IndexRequest(index, type, "1").source(XContentType.JSON, "field", "foo"));
request.add(new IndexRequest(index, type, "2").source(XContentType.JSON, "field", "bar"));
request.add(new IndexRequest(index, type, "3").source(XContentType.JSON, "field", "baz"));
// 可以进行修改/删除/新增 操作
request.add(new UpdateRequest(index, type, "2").doc(XContentType.JSON, "field", "test"));
request.add(new DeleteRequest(index, type, "3"));
request.add(new IndexRequest(index, type, "4").source(XContentType.JSON, "field", "baz"));
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
//可以快速检查一个或多个操作是否失败 true是有至少一个失败
if (bulkResponse.hasFailures()) {
// 可以快速检查一个或多个操作是否失败 true是有至少一个失败
if (bulkResponse.hasFailures()) {
System.out.println("有一个操作失败!");
}
//对处理结果进行遍历操作并根据不同的操作进行处理
for (BulkItemResponse bulkItemResponse : bulkResponse) {
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
//操作失败的进行处理
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
}
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
IndexResponse indexResponse = (IndexResponse) itemResponse;
// 对处理结果进行遍历操作并根据不同的操作进行处理
for (BulkItemResponse bulkItemResponse : bulkResponse) {
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
// 操作失败的进行处理
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
}
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
IndexResponse indexResponse = (IndexResponse) itemResponse;
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}
System.out.println("批量执行成功!");
/*
* 批量执行处理器相关示例代码
* 批量执行处理器相关示例代码
*/
//批量处理器的监听器设置
// 批量处理器的监听器设置
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
//在执行BulkRequest的每次执行之前调用这个方法允许知道将要在BulkRequest中执行的操作的数量
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
logger.debug("Executing bulk [{}] with {} requests",
executionId, numberOfActions);
}
//在每次执行BulkRequest之后调用这个方法允许知道BulkResponse是否包含错误
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
if (response.hasFailures()) {
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.debug("Bulk [{}] completed in {} milliseconds",
executionId, response.getTook().getMillis());
}
}
//如果BulkRequest失败则调用该方法该方法允许知道失败
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("Failed to execute bulk", failure);
}
// 在执行BulkRequest的每次执行之前调用这个方法允许知道将要在BulkRequest中执行的操作的数量
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
}
// 在每次执行BulkRequest之后调用这个方法允许知道BulkResponse是否包含错误
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
}
}
// 如果BulkRequest失败则调用该方法该方法允许知道失败
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("Failed to execute bulk", failure);
}
};
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request2, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
//创建一个批量执行的处理器
BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener).build();
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request2, bulkListener) -> client
.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
// 创建一个批量执行的处理器
BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener).build();
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
//根据当前添加的操作数量设置刷新新批量请求的时间(默认为1000使用-1禁用它)
builder.setBulkActions(500);
//根据当前添加的操作大小设置刷新新批量请求的时间(默认为5Mb使用-1禁用)
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
//设置允许执行的并发请求数量(默认为1使用0只允许执行单个请求)
builder.setConcurrentRequests(0);
//设置刷新间隔如果间隔通过则刷新任何挂起的BulkRequest(默认为未设置)
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
//设置一个常量后退策略该策略最初等待1秒并重试最多3次。
builder.setBackoffPolicy(BackoffPolicy
.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
IndexRequest one = new IndexRequest(index, type, "1").
source(XContentType.JSON, "title",
"In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest(index, type, "2")
.source(XContentType.JSON, "title",
"Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest(index, type, "3")
.source(XContentType.JSON, "title",
"The Future of Federated Search in Elasticsearch");
// 根据当前添加的操作数量设置刷新新批量请求的时间(默认为1000使用-1禁用它)
builder.setBulkActions(500);
// 根据当前添加的操作大小设置刷新新批量请求的时间(默认为5Mb使用-1禁用)
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
// 设置允许执行的并发请求数量(默认为1使用0只允许执行单个请求)
builder.setConcurrentRequests(0);
// 设置刷新间隔如果间隔通过则刷新任何挂起的BulkRequest(默认为未设置)
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
// 设置一个常量后退策略该策略最初等待1秒并重试最多3次。
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
IndexRequest one = new IndexRequest(index, type, "1").source(XContentType.JSON, "title",
"In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest(index, type, "2").source(XContentType.JSON, "title",
"Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest(index, type, "3").source(XContentType.JSON, "title",
"The Future of Federated Search in Elasticsearch");
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);
//如果所有大容量请求都已完成则该方法返回true;如果在所有大容量请求完成之前的等待时间已经过去则返回false
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
System.out.println("请求的响应结果:"+terminated);
// 如果所有大容量请求都已完成则该方法返回true;如果在所有大容量请求完成之前的等待时间已经过去则返回false
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
System.out.println("请求的响应结果:" + terminated);
}
}

View File

@@ -1,26 +1,20 @@
package com.pancm.sql.easticsearch;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
package com.pancm.test.esTest;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.core.Bulk;
import io.searchbox.core.BulkResult;
import io.searchbox.core.Delete;
import io.searchbox.core.DocumentResult;
import io.searchbox.core.Index;
import io.searchbox.core.Search;
import io.searchbox.core.*;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.DeleteIndex;
import io.searchbox.indices.mapping.GetMapping;
import io.searchbox.indices.mapping.PutMapping;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
*
@@ -34,7 +28,7 @@ public class JestTest {
private static JestClient jestClient;
private static String indexName = "userindex";
private static String typeName = "user";
private static String elasticIps="http://127.0.0.1:9200";
private static String elasticIps="http://192.169.0.23:9200,http://192.169.0.24:9200";
public static void main(String[] args) throws Exception {
@@ -48,8 +42,13 @@ public class JestTest {
}
private static JestClient getJestClient() {
JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(new HttpClientConfig.Builder(elasticIps).connTimeout(60000).readTimeout(60000).multiThreaded(true).build());
JestClientFactory factory = new JestClientFactory();
//指定集群地址,设置超时时间,开启多线程,并发连接最多两个,最大连接数
factory.setHttpClientConfig(new HttpClientConfig.Builder(elasticIps)
.connTimeout(60000).readTimeout(60000).multiThreaded(true)
.defaultMaxTotalConnectionPerRoute(2)
.maxTotalConnection(10)
.build());
return factory.getObject();
}
@@ -174,7 +173,11 @@ public class JestTest {
Index index = new Index.Builder(obj).build();
bulk.addAction(index);
}
BulkResult br = jestClient.execute(bulk.build());
System.out.println("DSL语句:"+br.toString());
System.out.println("DSL语句:"+br.getJsonString());
System.out.println("DSL语句:"+br.getJsonObject());
return br.isSucceeded();
}
@@ -275,9 +278,9 @@ public class JestTest {
}
/**
* 设置编号
* @param Long id
/**
* 设置编号
* @param Long id
*/
public void setId(Long id) {
this.id = id;
@@ -295,7 +298,7 @@ public class JestTest {
/**
* 设置姓名
* @param String name
* @param String name
*/
public void setName(String name) {
this.name = name;
@@ -313,7 +316,7 @@ public class JestTest {
/**
* 设置年龄
* @param Integer age
* @param age
*/
public void setAge(Integer age) {
this.age = age;