1.增加es使用示例
This commit is contained in:
parent
297ed8b53a
commit
d415a280e1
@ -49,7 +49,7 @@ import java.util.concurrent.TimeUnit;
|
||||
*/
|
||||
public class EsHighLevelRestSearchTest {
|
||||
|
||||
private static String elasticIp = "192.169.2.98";
|
||||
private static String elasticIp = "192.169.0.23";
|
||||
private static int elasticPort = 9200;
|
||||
private static Logger logger = LoggerFactory.getLogger(EsHighLevelRestSearchTest.class);
|
||||
|
||||
@ -67,9 +67,11 @@ public class EsHighLevelRestSearchTest {
|
||||
genSearch();
|
||||
orSearch();
|
||||
likeSearch();
|
||||
inSearch();
|
||||
// inSearch();
|
||||
existSearch();
|
||||
rangeSearch();
|
||||
regexpSearch();
|
||||
boolSearch();
|
||||
// search();
|
||||
// search2();
|
||||
|
||||
@ -80,7 +82,7 @@ public class EsHighLevelRestSearchTest {
|
||||
}
|
||||
|
||||
/**
|
||||
* p_test
|
||||
* test1
|
||||
*{
|
||||
* "settings" : {
|
||||
* "number_of_shards" : 10,
|
||||
@ -92,7 +94,7 @@ public class EsHighLevelRestSearchTest {
|
||||
* "uid" : { "type" : "long" },
|
||||
* "phone" : { "type" : "long" },
|
||||
* "userid" : { "type" : "keyword" },
|
||||
* "sendday" : { "type" : "long" },
|
||||
*
|
||||
* "message" : { "type" : "keyword" },
|
||||
* "msgcode" : { "type" : "long" },
|
||||
* "price" : { "type" : "double","index": "false" },
|
||||
@ -114,6 +116,69 @@ public class EsHighLevelRestSearchTest {
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @Author pancm
|
||||
* @Description 组合查询
|
||||
* @Date 2019/9/30
|
||||
* @Param []
|
||||
* @return void
|
||||
**/
|
||||
private static void boolSearch() throws IOException{
|
||||
String type = "_doc";
|
||||
String index = "test1";
|
||||
// 查询指定的索引库
|
||||
SearchRequest searchRequest = new SearchRequest(index);
|
||||
searchRequest.types(type);
|
||||
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
||||
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
|
||||
boolQueryBuilder.must(QueryBuilders.termQuery("uid",12345));
|
||||
boolQueryBuilder.must(QueryBuilders.termQuery("msgcode",1));
|
||||
// 设置查询条件
|
||||
sourceBuilder.query(boolQueryBuilder);
|
||||
searchRequest.source(sourceBuilder);
|
||||
System.out.println("组合查询的DSL语句:"+sourceBuilder.toString());
|
||||
// 同步查询
|
||||
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
|
||||
// 结果
|
||||
searchResponse.getHits().forEach(hit -> {
|
||||
String string = hit.getSourceAsString();
|
||||
System.out.println("组合查询的String结果:" + string);
|
||||
});
|
||||
System.out.println("\n=================\n");
|
||||
}
|
||||
|
||||
/**
|
||||
* @Author pancm
|
||||
* @Description 正则查询
|
||||
* @Date 2019/9/30
|
||||
* @Param []
|
||||
* @return void
|
||||
**/
|
||||
private static void regexpSearch() throws IOException{
|
||||
String type = "_doc";
|
||||
String index = "test1";
|
||||
// 查询指定的索引库
|
||||
SearchRequest searchRequest = new SearchRequest(index);
|
||||
searchRequest.types(type);
|
||||
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
||||
// 设置查询条件
|
||||
sourceBuilder.query(QueryBuilders.regexpQuery("message","xu[0-9]"));
|
||||
searchRequest.source(sourceBuilder);
|
||||
System.out.println("正则查询的DSL语句:"+sourceBuilder.toString());
|
||||
// 同步查询
|
||||
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
|
||||
// 结果
|
||||
searchResponse.getHits().forEach(hit -> {
|
||||
Map<String, Object> map = hit.getSourceAsMap();
|
||||
String string = hit.getSourceAsString();
|
||||
System.out.println("正则查询的Map结果:" + map);
|
||||
System.out.println("正则查询的String结果:" + string);
|
||||
});
|
||||
|
||||
System.out.println("\n=================\n");
|
||||
}
|
||||
|
||||
/**
|
||||
* @Author pancm
|
||||
* @Description 范围查询
|
||||
@ -122,24 +187,23 @@ public class EsHighLevelRestSearchTest {
|
||||
* @return void
|
||||
**/
|
||||
private static void rangeSearch() throws IOException{
|
||||
// 查询指定的索引库
|
||||
SearchRequest searchRequest = new SearchRequest("user");
|
||||
String type = "_doc";
|
||||
String index = "test1";
|
||||
SearchRequest searchRequest = new SearchRequest(index);
|
||||
searchRequest.types(type);
|
||||
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
||||
|
||||
// 设置查询条件
|
||||
sourceBuilder.query(QueryBuilders.rangeQuery("sendtime").gte("2019-01-01 00:00:00.000").lte("2019-12-31 23:59:59.999"));
|
||||
sourceBuilder.query(QueryBuilders.rangeQuery("sendtime").gte("2019-01-01 00:00:00").lte("2019-12-31 23:59:59"));
|
||||
searchRequest.source(sourceBuilder);
|
||||
System.out.println("范围查询的DSL语句:"+sourceBuilder.toString());
|
||||
// 同步查询
|
||||
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
|
||||
// 结果
|
||||
searchResponse.getHits().forEach(hit -> {
|
||||
Map<String, Object> map = hit.getSourceAsMap();
|
||||
String string = hit.getSourceAsString();
|
||||
System.out.println("范围查询的Map结果:" + map);
|
||||
System.out.println("范围查询的String结果:" + string);
|
||||
});
|
||||
|
||||
|
||||
System.out.println("\n=================\n");
|
||||
}
|
||||
|
||||
@ -151,13 +215,17 @@ public class EsHighLevelRestSearchTest {
|
||||
* @Param []
|
||||
**/
|
||||
private static void existSearch() throws IOException {
|
||||
String type = "_doc";
|
||||
String index = "test1";
|
||||
// 查询指定的索引库
|
||||
SearchRequest searchRequest = new SearchRequest("user");
|
||||
SearchRequest searchRequest = new SearchRequest(index);
|
||||
searchRequest.types(type);
|
||||
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
||||
|
||||
// 设置查询条件
|
||||
sourceBuilder.query(QueryBuilders.existsQuery("uid"));
|
||||
sourceBuilder.query(QueryBuilders.existsQuery("msgcode"));
|
||||
searchRequest.source(sourceBuilder);
|
||||
System.out.println("存在查询的DSL语句:"+sourceBuilder.toString());
|
||||
// 同步查询
|
||||
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
|
||||
// 结果
|
||||
@ -167,10 +235,7 @@ public class EsHighLevelRestSearchTest {
|
||||
System.out.println("存在查询的Map结果:" + map);
|
||||
System.out.println("存在查询的String结果:" + string);
|
||||
});
|
||||
|
||||
|
||||
System.out.println("\n=================\n");
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
@ -181,15 +246,18 @@ public class EsHighLevelRestSearchTest {
|
||||
* @Param []
|
||||
**/
|
||||
private static void inSearch() throws IOException {
|
||||
String type = "_doc";
|
||||
String index = "test1";
|
||||
// 查询指定的索引库
|
||||
SearchRequest searchRequest = new SearchRequest("user");
|
||||
SearchRequest searchRequest = new SearchRequest(index,type);
|
||||
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
||||
/**
|
||||
* SELECT * FROM p_test where uid in (1,2)
|
||||
* */
|
||||
// 设置查询条件
|
||||
sourceBuilder.query(QueryBuilders.termsQuery("uid", 1, 2));
|
||||
// sourceBuilder.query(QueryBuilders.termsQuery("uid", 1, 2));
|
||||
searchRequest.source(sourceBuilder);
|
||||
System.out.println("in查询的DSL语句:"+sourceBuilder.toString());
|
||||
// 同步查询
|
||||
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
|
||||
// 结果
|
||||
@ -200,7 +268,6 @@ public class EsHighLevelRestSearchTest {
|
||||
System.out.println("in查询的String结果:" + string);
|
||||
});
|
||||
|
||||
|
||||
System.out.println("\n=================\n");
|
||||
}
|
||||
|
||||
@ -212,25 +279,26 @@ public class EsHighLevelRestSearchTest {
|
||||
* @Param []
|
||||
**/
|
||||
private static void likeSearch() throws IOException {
|
||||
String type = "_doc";
|
||||
String index = "test1";
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.indices("p_test");
|
||||
searchRequest.types("_doc");
|
||||
searchRequest.indices(index);
|
||||
searchRequest.types(type);
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
|
||||
/**
|
||||
* SELECT * FROM p_test where message like '%pan%';
|
||||
* SELECT * FROM p_test where message like '%xu%';
|
||||
* */
|
||||
boolQueryBuilder.must(QueryBuilders.wildcardQuery("message", "*pan*"));
|
||||
boolQueryBuilder.must(QueryBuilders.wildcardQuery("message", "*xu*"));
|
||||
searchSourceBuilder.query(boolQueryBuilder);
|
||||
|
||||
System.out.println("模糊查询语句:" + searchSourceBuilder.toString());
|
||||
searchRequest.source(searchSourceBuilder);
|
||||
// 同步查询
|
||||
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
|
||||
searchResponse.getHits().forEach(documentFields -> {
|
||||
System.out.println("查询结果:" + documentFields.getSourceAsMap());
|
||||
System.out.println("模糊查询结果:" + documentFields.getSourceAsMap());
|
||||
});
|
||||
|
||||
System.out.println("\n=================\n");
|
||||
}
|
||||
|
||||
|
||||
@ -242,11 +310,14 @@ public class EsHighLevelRestSearchTest {
|
||||
* @Param []
|
||||
**/
|
||||
private static void genSearch() throws IOException {
|
||||
String type = "_doc";
|
||||
String index = "test1";
|
||||
// 查询指定的索引库
|
||||
SearchRequest searchRequest = new SearchRequest("user");
|
||||
SearchRequest searchRequest = new SearchRequest(index);
|
||||
searchRequest.types(type);
|
||||
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
||||
// 设置查询条件
|
||||
sourceBuilder.query(QueryBuilders.termQuery("user", "pancm"));
|
||||
sourceBuilder.query(QueryBuilders.termQuery("uid", "1234"));
|
||||
// 设置起止和结束
|
||||
sourceBuilder.from(0);
|
||||
sourceBuilder.size(5);
|
||||
@ -273,7 +344,7 @@ public class EsHighLevelRestSearchTest {
|
||||
// sourceBuilder.fetchSource(includeFields, excludeFields);
|
||||
|
||||
searchRequest.source(sourceBuilder);
|
||||
|
||||
System.out.println("普通查询的DSL语句:"+sourceBuilder.toString());
|
||||
// 同步查询
|
||||
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
|
||||
|
||||
@ -291,18 +362,12 @@ public class EsHighLevelRestSearchTest {
|
||||
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
|
||||
// failures should be handled here
|
||||
}
|
||||
|
||||
// 结果
|
||||
searchResponse.getHits().forEach(hit -> {
|
||||
Map<String, Object> map = hit.getSourceAsMap();
|
||||
String string = hit.getSourceAsString();
|
||||
System.out.println("普通查询的Map结果:" + map);
|
||||
System.out.println("普通查询的String结果:" + string);
|
||||
System.out.println("普通查询的结果:" + map);
|
||||
});
|
||||
|
||||
|
||||
System.out.println("\n=================\n");
|
||||
|
||||
}
|
||||
|
||||
private static void allSearch() throws IOException {
|
||||
@ -316,7 +381,6 @@ public class EsHighLevelRestSearchTest {
|
||||
searchRequestAll.source(searchSourceBuilder);
|
||||
// 同步查询
|
||||
SearchResponse searchResponseAll = client.search(searchRequestAll, RequestOptions.DEFAULT);
|
||||
|
||||
System.out.println("所有查询总数:" + searchResponseAll.getHits().getTotalHits());
|
||||
}
|
||||
|
||||
@ -329,30 +393,27 @@ public class EsHighLevelRestSearchTest {
|
||||
**/
|
||||
private static void orSearch() throws IOException {
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.indices("p_test");
|
||||
searchRequest.indices("test1");
|
||||
searchRequest.types("_doc");
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
|
||||
BoolQueryBuilder boolQueryBuilder2 = new BoolQueryBuilder();
|
||||
/**
|
||||
* SELECT * FROM p_test where (uid = 1 or uid =2) and phone = 12345678919
|
||||
* SELECT * FROM test1 where (uid = 1234 or uid =12345) and phone = 12345678909
|
||||
* */
|
||||
boolQueryBuilder2.should(QueryBuilders.termQuery("uid", 1));
|
||||
boolQueryBuilder2.should(QueryBuilders.termQuery("uid", 2));
|
||||
boolQueryBuilder2.should(QueryBuilders.termQuery("uid", 1234));
|
||||
boolQueryBuilder2.should(QueryBuilders.termQuery("uid", 12345));
|
||||
boolQueryBuilder.must(boolQueryBuilder2);
|
||||
boolQueryBuilder.must(QueryBuilders.termQuery("phone", "12345678919"));
|
||||
boolQueryBuilder.must(QueryBuilders.termQuery("phone", "12345678909"));
|
||||
searchSourceBuilder.query(boolQueryBuilder);
|
||||
|
||||
System.out.println("或查询语句:" + searchSourceBuilder.toString());
|
||||
searchRequest.source(searchSourceBuilder);
|
||||
// 同步查询
|
||||
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
|
||||
|
||||
searchResponse.getHits().forEach(documentFields -> {
|
||||
|
||||
System.out.println("查询结果:" + documentFields.getSourceAsMap());
|
||||
System.out.println("或查询结果:" + documentFields.getSourceAsMap());
|
||||
});
|
||||
|
||||
System.out.println("\n=================\n");
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@ -1,27 +1,23 @@
|
||||
package com.pancm.sql.easticsearch;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiConsumer;
|
||||
package com.pancm.easticsearch;
|
||||
|
||||
import org.apache.http.HttpHost;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.DocWriteRequest;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkProcessor;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
||||
import org.elasticsearch.action.bulk.*;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.get.GetRequest;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
@ -35,14 +31,28 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.query.TermQueryBuilder;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource;
|
||||
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
/**
|
||||
* @Title: EsHighLevelRestTest1
|
||||
* @Description: Java High Level REST Client Es高级客户端使用教程一 (基本CRUD使用)
|
||||
官方文档地址: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html
|
||||
* @Description: Java High Level REST Client Es高级客户端使用教程一 (基本CRUD使用) 官方文档地址:
|
||||
* https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high.html
|
||||
* @since jdk 1.8
|
||||
* @Version:1.0.0
|
||||
* @author pancm
|
||||
@ -54,18 +64,21 @@ public class EsHighLevelRestTest1 {
|
||||
private static int elasticPort = 9200;
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(EsHighLevelRestTest1.class);
|
||||
|
||||
|
||||
private static RestHighLevelClient client = null;
|
||||
|
||||
public static void main(String[] args) {
|
||||
try {
|
||||
init();
|
||||
careatindex();
|
||||
get();
|
||||
createIndex();
|
||||
insert();
|
||||
queryById();
|
||||
exists();
|
||||
update();
|
||||
delete();
|
||||
bulk();
|
||||
// deleteByQuery();
|
||||
// deleteIndex();
|
||||
// delete();
|
||||
// bulk();
|
||||
close();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
@ -73,11 +86,82 @@ public class EsHighLevelRestTest1 {
|
||||
|
||||
}
|
||||
|
||||
private static void insert() throws IOException {
|
||||
String index = "test1";
|
||||
String type = "_doc";
|
||||
// 唯一编号
|
||||
String id = "1";
|
||||
IndexRequest request = new IndexRequest(index, type, id);
|
||||
/*
|
||||
* 第一种方式,通过jsonString进行创建
|
||||
*/
|
||||
// json
|
||||
String jsonString = "{" + "\"uid\":\"1234\","+ "\"phone\":\"12345678909\","+ "\"msgcode\":\"1\"," + "\"sendtime\":\"2019-03-14 01:57:04\","
|
||||
+ "\"message\":\"xuwujing study Elasticsearch\"" + "}";
|
||||
request.source(jsonString, XContentType.JSON);
|
||||
|
||||
/*
|
||||
* 第二种方式,通过map创建,,会自动转换成json的数据
|
||||
*/
|
||||
Map<String, Object> jsonMap = new HashMap<>();
|
||||
jsonMap.put("uid", 1234);
|
||||
jsonMap.put("phone", 12345678909L);
|
||||
jsonMap.put("msgcode", 1);
|
||||
jsonMap.put("sendtime", "2019-03-14 01:57:04");
|
||||
jsonMap.put("message", "xuwujing study Elasticsearch");
|
||||
request.source(jsonMap);
|
||||
|
||||
/*
|
||||
* 第三种方式 : 通过XContentBuilder对象进行创建
|
||||
*/
|
||||
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
builder.startObject();
|
||||
{
|
||||
builder.field("uid", 1234);
|
||||
builder.field("phone", 12345678909L);
|
||||
builder.field("msgcode", 1);
|
||||
builder.timeField("sendtime", "2019-03-14 01:57:04");
|
||||
builder.field("message", "xuwujing study Elasticsearch");
|
||||
}
|
||||
builder.endObject();
|
||||
request.source(builder);
|
||||
|
||||
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
|
||||
|
||||
//如果是200则表示成功,否则就是失败
|
||||
if(200 == indexResponse.status().getStatus()){
|
||||
|
||||
}
|
||||
|
||||
// 对响应结果进行处理
|
||||
String index1 = indexResponse.getIndex();
|
||||
String type1 = indexResponse.getType();
|
||||
String id1 = indexResponse.getId();
|
||||
long version = indexResponse.getVersion();
|
||||
// 如果是新增/修改的话的话
|
||||
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
|
||||
|
||||
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
|
||||
|
||||
}
|
||||
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
|
||||
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
|
||||
|
||||
}
|
||||
if (shardInfo.getFailed() > 0) {
|
||||
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
|
||||
String reason = failure.reason();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* 初始化服务
|
||||
*/
|
||||
private static void init() {
|
||||
client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticIp, elasticPort, "http")));
|
||||
|
||||
client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticIp, elasticPort)));
|
||||
|
||||
}
|
||||
|
||||
@ -99,77 +183,84 @@ public class EsHighLevelRestTest1 {
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private static void careatindex() throws IOException {
|
||||
String index = "user";
|
||||
String type = "userindex";
|
||||
// 唯一编号
|
||||
String id = "1";
|
||||
private static void createIndex() throws IOException {
|
||||
|
||||
IndexRequest request = new IndexRequest(index, type, id);
|
||||
// 类型
|
||||
String type = "_doc";
|
||||
String index = "test1";
|
||||
// setting 的值
|
||||
Map<String, Object> setmapping = new HashMap<>();
|
||||
// 分区数、副本数、缓存刷新时间
|
||||
setmapping.put("number_of_shards", 10);
|
||||
setmapping.put("number_of_replicas", 1);
|
||||
setmapping.put("refresh_interval", "5s");
|
||||
Map<String, Object> keyword = new HashMap<>();
|
||||
//设置类型
|
||||
keyword.put("type", "keyword");
|
||||
Map<String, Object> lon = new HashMap<>();
|
||||
//设置类型
|
||||
lon.put("type", "long");
|
||||
Map<String, Object> date = new HashMap<>();
|
||||
//设置类型
|
||||
date.put("type", "date");
|
||||
date.put("format", "yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
/*
|
||||
* 第一种方式,通过jsonString进行创建
|
||||
*/
|
||||
// json
|
||||
String jsonString = "{" + "\"user\":\"pancm\"," + "\"postDate\":\"2019-03-08\","+ "\"age\":\"18\","
|
||||
+ "\"message\":\"study Elasticsearch\"" + "}";
|
||||
Map<String, Object> jsonMap2 = new HashMap<>();
|
||||
Map<String, Object> properties = new HashMap<>();
|
||||
//设置字段message信息
|
||||
properties.put("uid", lon);
|
||||
properties.put("phone", lon);
|
||||
properties.put("msgcode", lon);
|
||||
properties.put("message", keyword);
|
||||
properties.put("sendtime", date);
|
||||
Map<String, Object> mapping = new HashMap<>();
|
||||
mapping.put("properties", properties);
|
||||
jsonMap2.put(type, mapping);
|
||||
|
||||
request.source(jsonString, XContentType.JSON);
|
||||
|
||||
/*
|
||||
* 第二种方式,通过map创建,,会自动转换成json的数据
|
||||
*/
|
||||
|
||||
Map<String, Object> jsonMap = new HashMap<>();
|
||||
jsonMap.put("user", "pancm");
|
||||
jsonMap.put("postDate", "2019-03-08");
|
||||
jsonMap.put("age", "18");
|
||||
jsonMap.put("message", "study Elasticsearch");
|
||||
|
||||
request.source(jsonMap);
|
||||
|
||||
/*
|
||||
* 第三种方式 : 通过XContentBuilder对象进行创建
|
||||
*/
|
||||
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
builder.startObject();
|
||||
{
|
||||
builder.field("user", "pancm");
|
||||
builder.timeField("postDate", "2019-03-08");
|
||||
builder.field("age", "18");
|
||||
builder.field("message", "study Elasticsearch");
|
||||
GetIndexRequest getRequest = new GetIndexRequest();
|
||||
getRequest.indices(index);
|
||||
getRequest.types(type);
|
||||
getRequest.local(false);
|
||||
getRequest.humanReadable(true);
|
||||
boolean exists2 = client.indices().exists(getRequest, RequestOptions.DEFAULT);
|
||||
//如果存在就不创建了
|
||||
if(exists2) {
|
||||
System.out.println(index+"索引库已经存在!");
|
||||
return;
|
||||
}
|
||||
builder.endObject();
|
||||
request.source(builder);
|
||||
|
||||
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
|
||||
|
||||
|
||||
|
||||
//对响应结果进行处理
|
||||
|
||||
String index1 = indexResponse.getIndex();
|
||||
String type1 = indexResponse.getType();
|
||||
String id1 = indexResponse.getId();
|
||||
long version = indexResponse.getVersion();
|
||||
// 如果是新增/修改的话的话
|
||||
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
|
||||
|
||||
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
|
||||
|
||||
}
|
||||
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
|
||||
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
|
||||
|
||||
}
|
||||
if (shardInfo.getFailed() > 0) {
|
||||
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
|
||||
String reason = failure.reason();
|
||||
// 开始创建库
|
||||
CreateIndexRequest request = new CreateIndexRequest(index);
|
||||
try {
|
||||
// 加载数据类型
|
||||
request.settings(setmapping);
|
||||
//设置mapping参数
|
||||
request.mapping(type, jsonMap2);
|
||||
//设置别名
|
||||
request.alias(new Alias("pancm_alias"));
|
||||
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
|
||||
boolean falg = createIndexResponse.isAcknowledged();
|
||||
if(falg){
|
||||
System.out.println("创建索引库:"+index+"成功!" );
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
System.out.println("创建成功!");
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 删除索引
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private static void deleteIndex() throws IOException {
|
||||
String index = "userindex";
|
||||
DeleteIndexRequest request = new DeleteIndexRequest(index);
|
||||
// 同步删除
|
||||
client.indices().delete(request,RequestOptions.DEFAULT);
|
||||
System.out.println("删除索引库成功!"+index);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
@ -177,9 +268,9 @@ public class EsHighLevelRestTest1 {
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private static void get() {
|
||||
String index = "user";
|
||||
String type = "userindex";
|
||||
private static void queryById() {
|
||||
String type = "_doc";
|
||||
String index = "test1";
|
||||
// 唯一编号
|
||||
String id = "1";
|
||||
// 创建查询请求
|
||||
@ -194,7 +285,7 @@ public class EsHighLevelRestTest1 {
|
||||
} catch (ElasticsearchException e) {
|
||||
// 如果是索引不存在
|
||||
if (e.status() == RestStatus.NOT_FOUND) {
|
||||
System.out.println("该索引库不存在!"+index);
|
||||
System.out.println("该索引库不存在!" + index);
|
||||
}
|
||||
|
||||
}
|
||||
@ -204,272 +295,356 @@ public class EsHighLevelRestTest1 {
|
||||
String sourceAsString = getResponse.getSourceAsString();
|
||||
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
|
||||
byte[] sourceAsBytes = getResponse.getSourceAsBytes();
|
||||
System.out.println("查询返回结果String:"+sourceAsString);
|
||||
System.out.println("查询返回结果Map:"+sourceAsMap);
|
||||
System.out.println("查询返回结果String:" + sourceAsString);
|
||||
System.out.println("查询返回结果Map:" + sourceAsMap);
|
||||
} else {
|
||||
System.out.println("没有找到该数据!");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 是否存在
|
||||
* @throws IOException
|
||||
* 是否存在
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private static void exists() throws IOException {
|
||||
String index = "user";
|
||||
String type = "userindex";
|
||||
String type = "_doc";
|
||||
String index = "test1";
|
||||
// 唯一编号
|
||||
String id = "1";
|
||||
// 创建查询请求
|
||||
GetRequest getRequest = new GetRequest(index, type, id);
|
||||
|
||||
|
||||
boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
|
||||
|
||||
|
||||
ActionListener<Boolean> listener = new ActionListener<Boolean>() {
|
||||
@Override
|
||||
public void onResponse(Boolean exists) {
|
||||
System.out.println("=="+exists);
|
||||
}
|
||||
@Override
|
||||
public void onResponse(Boolean exists) {
|
||||
System.out.println("==" + exists);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
System.out.println("失败的原因:"+e.getMessage());
|
||||
}
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
System.out.println("失败的原因:" + e.getMessage());
|
||||
}
|
||||
};
|
||||
//进行异步监听
|
||||
// 进行异步监听
|
||||
// client.existsAsync(getRequest, RequestOptions.DEFAULT, listener);
|
||||
|
||||
System.out.println("是否存在:"+exists);
|
||||
|
||||
System.out.println("数据是否存在:" + exists);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 更新操作
|
||||
* 更新操作
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private static void update() throws IOException {
|
||||
String index = "user";
|
||||
String type = "userindex";
|
||||
String type = "_doc";
|
||||
String index = "test1";
|
||||
// 唯一编号
|
||||
String id = "1";
|
||||
UpdateRequest upateRequest=new UpdateRequest();
|
||||
UpdateRequest upateRequest = new UpdateRequest();
|
||||
upateRequest.id(id);
|
||||
upateRequest.index(index);
|
||||
upateRequest.type(type);
|
||||
|
||||
//依旧可以使用Map这种集合作为更新条件
|
||||
|
||||
// 依旧可以使用Map这种集合作为更新条件
|
||||
Map<String, Object> jsonMap = new HashMap<>();
|
||||
jsonMap.put("user", "xuwujing");
|
||||
jsonMap.put("postDate", "2019-03-11");
|
||||
|
||||
jsonMap.put("uid", 12345);
|
||||
jsonMap.put("phone", 123456789019L);
|
||||
jsonMap.put("msgcode", 2);
|
||||
jsonMap.put("sendtime", "2019-03-14 01:57:04");
|
||||
jsonMap.put("message", "xuwujing study Elasticsearch");
|
||||
upateRequest.doc(jsonMap);
|
||||
|
||||
//
|
||||
upateRequest.docAsUpsert(true);
|
||||
// upsert 方法表示如果数据不存在,那么就新增一条
|
||||
upateRequest.upsert(jsonMap);
|
||||
|
||||
upateRequest.docAsUpsert(true);
|
||||
client.update(upateRequest, RequestOptions.DEFAULT);
|
||||
System.out.println("更新成功!");
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 根据查询条件更新
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private static void updateByQuery() throws IOException {
|
||||
String type = "_doc";
|
||||
String index = "test1";
|
||||
//
|
||||
UpdateByQueryRequest request = new UpdateByQueryRequest(index,type);
|
||||
// 设置查询条件
|
||||
request.setQuery(new TermQueryBuilder("user", "pancm"));
|
||||
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
|
||||
|
||||
// 设置复制文档的数量
|
||||
request.setSize(10);
|
||||
// 设置一次批量处理的条数,默认是1000
|
||||
request.setBatchSize(100);
|
||||
//设置超时时间
|
||||
request.setTimeout(TimeValue.timeValueMinutes(2));
|
||||
//索引选项
|
||||
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
||||
|
||||
// 同步执行
|
||||
BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
|
||||
|
||||
// 异步执行
|
||||
// client.updateByQueryAsync(request, RequestOptions.DEFAULT, listener);
|
||||
|
||||
// 返回结果
|
||||
TimeValue timeTaken = bulkResponse.getTook();
|
||||
boolean timedOut = bulkResponse.isTimedOut();
|
||||
long totalDocs = bulkResponse.getTotal();
|
||||
long updatedDocs = bulkResponse.getUpdated();
|
||||
long deletedDocs = bulkResponse.getDeleted();
|
||||
long batches = bulkResponse.getBatches();
|
||||
long noops = bulkResponse.getNoops();
|
||||
long versionConflicts = bulkResponse.getVersionConflicts();
|
||||
long bulkRetries = bulkResponse.getBulkRetries();
|
||||
long searchRetries = bulkResponse.getSearchRetries();
|
||||
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled();
|
||||
TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil();
|
||||
List<ScrollableHitSource.SearchFailure> searchFailures = bulkResponse.getSearchFailures();
|
||||
List<BulkItemResponse.Failure> bulkFailures = bulkResponse.getBulkFailures();
|
||||
System.out.println("查询更新总共花费了:" + timeTaken.getMillis() + " 毫秒,总条数:" + totalDocs + ",更新数:" + updatedDocs);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除
|
||||
* @throws IOException
|
||||
*
|
||||
* @throws IOException
|
||||
*
|
||||
*/
|
||||
private static void delete() throws IOException {
|
||||
|
||||
String index = "user";
|
||||
String type = "userindex";
|
||||
String type = "_doc";
|
||||
String index = "test1";
|
||||
// 唯一编号
|
||||
String id = "1";
|
||||
DeleteRequest deleteRequest=new DeleteRequest();
|
||||
DeleteRequest deleteRequest = new DeleteRequest();
|
||||
deleteRequest.id(id);
|
||||
deleteRequest.index(index);
|
||||
deleteRequest.type(type);
|
||||
|
||||
//设置超时时间
|
||||
deleteRequest.timeout(TimeValue.timeValueMinutes(2));
|
||||
//设置刷新策略"wait_for"
|
||||
//保持此请求打开,直到刷新使此请求的内容可以搜索为止。此刷新策略与高索引和搜索吞吐量兼容,但它会导致请求等待响应,直到发生刷新
|
||||
// 设置超时时间
|
||||
deleteRequest.timeout(TimeValue.timeValueMinutes(2));
|
||||
// 设置刷新策略"wait_for"
|
||||
// 保持此请求打开,直到刷新使此请求的内容可以搜索为止。此刷新策略与高索引和搜索吞吐量兼容,但它会导致请求等待响应,直到发生刷新
|
||||
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
|
||||
|
||||
//同步删除
|
||||
// 同步删除
|
||||
DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
|
||||
|
||||
/*
|
||||
* 异步删除操作
|
||||
*/
|
||||
|
||||
//进行监听
|
||||
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
|
||||
@Override
|
||||
public void onResponse(DeleteResponse deleteResponse) {
|
||||
System.out.println("响应:"+deleteResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
System.out.println("删除监听异常:"+e.getMessage());
|
||||
}
|
||||
/*
|
||||
* 异步删除操作
|
||||
*/
|
||||
|
||||
// 进行监听
|
||||
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
|
||||
@Override
|
||||
public void onResponse(DeleteResponse deleteResponse) {
|
||||
System.out.println("响应:" + deleteResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
System.out.println("删除监听异常:" + e.getMessage());
|
||||
}
|
||||
};
|
||||
|
||||
//异步删除
|
||||
|
||||
// 异步删除
|
||||
// client.deleteAsync(deleteRequest, RequestOptions.DEFAULT, listener);
|
||||
|
||||
|
||||
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
|
||||
//如果处理成功碎片的数量少于总碎片的情况,说明还在处理或者处理发生异常
|
||||
// 如果处理成功碎片的数量少于总碎片的情况,说明还在处理或者处理发生异常
|
||||
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
|
||||
System.out.println("需要处理的碎片总量:"+shardInfo.getTotal());
|
||||
System.out.println("处理成功的碎片总量:"+shardInfo.getSuccessful());
|
||||
System.out.println("需要处理的碎片总量:" + shardInfo.getTotal());
|
||||
System.out.println("处理成功的碎片总量:" + shardInfo.getSuccessful());
|
||||
}
|
||||
|
||||
|
||||
if (shardInfo.getFailed() > 0) {
|
||||
for (ReplicationResponse.ShardInfo.Failure failure :
|
||||
shardInfo.getFailures()) {
|
||||
String reason = failure.reason();
|
||||
}
|
||||
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
|
||||
String reason = failure.reason();
|
||||
}
|
||||
}
|
||||
System.out.println("删除成功!");
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 批量操作示例
|
||||
* @throws InterruptedException
|
||||
* 根据查询条件删除
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private static void bulk() throws IOException, InterruptedException {
|
||||
private static void deleteByQuery() throws IOException {
|
||||
String type = "_doc";
|
||||
String index = "test1";
|
||||
DeleteByQueryRequest request = new DeleteByQueryRequest(index,type);
|
||||
// 设置查询条件
|
||||
request.setQuery(QueryBuilders.termQuery("uid",1234));
|
||||
// 同步执行
|
||||
BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);
|
||||
|
||||
// 异步执行
|
||||
// client.updateByQueryAsync(request, RequestOptions.DEFAULT, listener);
|
||||
|
||||
// 返回结果
|
||||
TimeValue timeTaken = bulkResponse.getTook();
|
||||
boolean timedOut = bulkResponse.isTimedOut();
|
||||
long totalDocs = bulkResponse.getTotal();
|
||||
long updatedDocs = bulkResponse.getUpdated();
|
||||
long deletedDocs = bulkResponse.getDeleted();
|
||||
long batches = bulkResponse.getBatches();
|
||||
long noops = bulkResponse.getNoops();
|
||||
long versionConflicts = bulkResponse.getVersionConflicts();
|
||||
long bulkRetries = bulkResponse.getBulkRetries();
|
||||
long searchRetries = bulkResponse.getSearchRetries();
|
||||
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled();
|
||||
TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil();
|
||||
List<ScrollableHitSource.SearchFailure> searchFailures = bulkResponse.getSearchFailures();
|
||||
List<BulkItemResponse.Failure> bulkFailures = bulkResponse.getBulkFailures();
|
||||
System.out.println("查询更新总共花费了:" + timeTaken.getMillis() + " 毫秒,总条数:" + totalDocs + ",更新数:" + updatedDocs);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量操作示例
|
||||
*
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
private static void bulk() throws IOException, InterruptedException {
|
||||
String index = "estest";
|
||||
String type = "estest";
|
||||
|
||||
BulkRequest request = new BulkRequest();
|
||||
//批量新增
|
||||
request.add(new IndexRequest(index, type, "1")
|
||||
.source(XContentType.JSON,"field", "foo"));
|
||||
request.add(new IndexRequest(index, type, "2")
|
||||
.source(XContentType.JSON,"field", "bar"));
|
||||
request.add(new IndexRequest(index, type, "3")
|
||||
.source(XContentType.JSON,"field", "baz"));
|
||||
|
||||
//可以进行修改/删除/新增 操作
|
||||
request.add(new UpdateRequest(index, type, "2")
|
||||
.doc(XContentType.JSON,"field", "test"));
|
||||
request.add(new DeleteRequest(index, type, "3"));
|
||||
request.add(new IndexRequest(index, type, "4")
|
||||
.source(XContentType.JSON,"field", "baz"));
|
||||
|
||||
|
||||
|
||||
BulkRequest request = new BulkRequest();
|
||||
// 批量新增,存在会直接覆盖
|
||||
request.add(new IndexRequest(index, type, "1").source(XContentType.JSON, "field", "foo"));
|
||||
request.add(new IndexRequest(index, type, "2").source(XContentType.JSON, "field", "bar"));
|
||||
request.add(new IndexRequest(index, type, "3").source(XContentType.JSON, "field", "baz"));
|
||||
|
||||
// 可以进行修改/删除/新增 操作
|
||||
//docAsUpsert 为true表示存在更新,不存在插入,为false表示不存在就是不做更新
|
||||
request.add(new UpdateRequest(index, type, "2").doc(XContentType.JSON, "field", "test").docAsUpsert(true));
|
||||
request.add(new DeleteRequest(index, type, "3"));
|
||||
request.add(new IndexRequest(index, type, "4").source(XContentType.JSON, "field", "baz"));
|
||||
|
||||
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
|
||||
|
||||
//可以快速检查一个或多个操作是否失败 true是有至少一个失败!
|
||||
if (bulkResponse.hasFailures()) {
|
||||
|
||||
|
||||
ActionListener<BulkResponse> listener3 = new ActionListener<BulkResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse response) {
|
||||
System.out.println("===="+response.buildFailureMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
System.out.println("====---"+e.getMessage());
|
||||
}
|
||||
};
|
||||
|
||||
client.bulkAsync(request, RequestOptions.DEFAULT,listener3);
|
||||
|
||||
// 可以快速检查一个或多个操作是否失败 true是有至少一个失败!
|
||||
if (bulkResponse.hasFailures()) {
|
||||
System.out.println("有一个操作失败!");
|
||||
}
|
||||
|
||||
//对处理结果进行遍历操作并根据不同的操作进行处理
|
||||
for (BulkItemResponse bulkItemResponse : bulkResponse) {
|
||||
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
|
||||
|
||||
//操作失败的进行处理
|
||||
if (bulkItemResponse.isFailed()) {
|
||||
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
|
||||
|
||||
}
|
||||
|
||||
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|
||||
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
|
||||
IndexResponse indexResponse = (IndexResponse) itemResponse;
|
||||
// 对处理结果进行遍历操作并根据不同的操作进行处理
|
||||
for (BulkItemResponse bulkItemResponse : bulkResponse) {
|
||||
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
|
||||
|
||||
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
|
||||
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
|
||||
// 操作失败的进行处理
|
||||
if (bulkItemResponse.isFailed()) {
|
||||
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
|
||||
|
||||
}
|
||||
|
||||
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|
||||
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
|
||||
IndexResponse indexResponse = (IndexResponse) itemResponse;
|
||||
System.out.println("新增失败!"+indexResponse.toString());
|
||||
|
||||
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
|
||||
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
|
||||
System.out.println("更新失败!"+updateResponse.toString());
|
||||
|
||||
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
|
||||
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
|
||||
System.out.println("删除失败!"+deleteResponse.toString());
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
|
||||
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println("批量执行成功!");
|
||||
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* 批量执行处理器相关示例代码
|
||||
* 批量执行处理器相关示例代码
|
||||
*/
|
||||
|
||||
|
||||
//批量处理器的监听器设置
|
||||
|
||||
|
||||
// 批量处理器的监听器设置
|
||||
|
||||
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
|
||||
|
||||
//在执行BulkRequest的每次执行之前调用,这个方法允许知道将要在BulkRequest中执行的操作的数量
|
||||
@Override
|
||||
public void beforeBulk(long executionId, BulkRequest request) {
|
||||
int numberOfActions = request.numberOfActions();
|
||||
logger.debug("Executing bulk [{}] with {} requests",
|
||||
executionId, numberOfActions);
|
||||
}
|
||||
|
||||
//在每次执行BulkRequest之后调用,这个方法允许知道BulkResponse是否包含错误
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest request,
|
||||
BulkResponse response) {
|
||||
if (response.hasFailures()) {
|
||||
logger.warn("Bulk [{}] executed with failures", executionId);
|
||||
} else {
|
||||
logger.debug("Bulk [{}] completed in {} milliseconds",
|
||||
executionId, response.getTook().getMillis());
|
||||
}
|
||||
}
|
||||
|
||||
//如果BulkRequest失败,则调用该方法,该方法允许知道失败
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
|
||||
logger.error("Failed to execute bulk", failure);
|
||||
}
|
||||
|
||||
// 在执行BulkRequest的每次执行之前调用,这个方法允许知道将要在BulkRequest中执行的操作的数量
|
||||
@Override
|
||||
public void beforeBulk(long executionId, BulkRequest request) {
|
||||
int numberOfActions = request.numberOfActions();
|
||||
logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
|
||||
}
|
||||
|
||||
// 在每次执行BulkRequest之后调用,这个方法允许知道BulkResponse是否包含错误
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
|
||||
if (response.hasFailures()) {
|
||||
logger.warn("Bulk [{}] executed with failures", executionId);
|
||||
} else {
|
||||
logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
|
||||
}
|
||||
}
|
||||
|
||||
// 如果BulkRequest失败,则调用该方法,该方法允许知道失败
|
||||
@Override
|
||||
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
|
||||
logger.error("Failed to execute bulk", failure);
|
||||
}
|
||||
};
|
||||
|
||||
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
|
||||
(request2, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
|
||||
//创建一个批量执行的处理器
|
||||
BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener).build();
|
||||
|
||||
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request2, bulkListener) -> client
|
||||
.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
|
||||
// 创建一个批量执行的处理器
|
||||
BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener).build();
|
||||
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
|
||||
//根据当前添加的操作数量设置刷新新批量请求的时间(默认为1000,使用-1禁用它)
|
||||
builder.setBulkActions(500);
|
||||
//根据当前添加的操作大小设置刷新新批量请求的时间(默认为5Mb,使用-1禁用)
|
||||
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
|
||||
//设置允许执行的并发请求数量(默认为1,使用0只允许执行单个请求)
|
||||
builder.setConcurrentRequests(0);
|
||||
//设置刷新间隔如果间隔通过,则刷新任何挂起的BulkRequest(默认为未设置)
|
||||
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
|
||||
//设置一个常量后退策略,该策略最初等待1秒并重试最多3次。
|
||||
builder.setBackoffPolicy(BackoffPolicy
|
||||
.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
|
||||
|
||||
|
||||
IndexRequest one = new IndexRequest(index, type, "1").
|
||||
source(XContentType.JSON, "title",
|
||||
"In which order are my Elasticsearch queries executed?");
|
||||
IndexRequest two = new IndexRequest(index, type, "2")
|
||||
.source(XContentType.JSON, "title",
|
||||
"Current status and upcoming changes in Elasticsearch");
|
||||
IndexRequest three = new IndexRequest(index, type, "3")
|
||||
.source(XContentType.JSON, "title",
|
||||
"The Future of Federated Search in Elasticsearch");
|
||||
// 根据当前添加的操作数量设置刷新新批量请求的时间(默认为1000,使用-1禁用它)
|
||||
builder.setBulkActions(500);
|
||||
// 根据当前添加的操作大小设置刷新新批量请求的时间(默认为5Mb,使用-1禁用)
|
||||
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
|
||||
// 设置允许执行的并发请求数量(默认为1,使用0只允许执行单个请求)
|
||||
builder.setConcurrentRequests(0);
|
||||
// 设置刷新间隔如果间隔通过,则刷新任何挂起的BulkRequest(默认为未设置)
|
||||
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
|
||||
// 设置一个常量后退策略,该策略最初等待1秒并重试最多3次。
|
||||
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
|
||||
|
||||
IndexRequest one = new IndexRequest(index, type, "1").source(XContentType.JSON, "title",
|
||||
"In which order are my Elasticsearch queries executed?");
|
||||
IndexRequest two = new IndexRequest(index, type, "2").source(XContentType.JSON, "title",
|
||||
"Current status and upcoming changes in Elasticsearch");
|
||||
IndexRequest three = new IndexRequest(index, type, "3").source(XContentType.JSON, "title",
|
||||
"The Future of Federated Search in Elasticsearch");
|
||||
bulkProcessor.add(one);
|
||||
bulkProcessor.add(two);
|
||||
bulkProcessor.add(three);
|
||||
|
||||
|
||||
//如果所有大容量请求都已完成,则该方法返回true;如果在所有大容量请求完成之前的等待时间已经过去,则返回false
|
||||
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
|
||||
|
||||
System.out.println("请求的响应结果:"+terminated);
|
||||
|
||||
|
||||
|
||||
// 如果所有大容量请求都已完成,则该方法返回true;如果在所有大容量请求完成之前的等待时间已经过去,则返回false
|
||||
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
|
||||
|
||||
System.out.println("请求的响应结果:" + terminated);
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -1,11 +1,5 @@
|
||||
package com.pancm.easticsearch;
|
||||
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.http.HttpHost;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
@ -25,11 +19,16 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
|
||||
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
|
||||
import org.elasticsearch.index.reindex.ReindexRequest;
|
||||
import org.elasticsearch.index.reindex.ScrollableHitSource;
|
||||
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
/**
|
||||
* @Title: EsHighLevelRestTest2
|
||||
* @Description: Java High Level REST Client Es高级客户端使用教程二 (关于组合使用)
|
||||
@ -55,7 +54,6 @@ public class EsHighLevelRestTest2 {
|
||||
init();
|
||||
multiGet();
|
||||
reindex();
|
||||
updataByQuery();
|
||||
deleteByQuery();
|
||||
rethrottleByQuery();
|
||||
close();
|
||||
@ -174,7 +172,7 @@ public class EsHighLevelRestTest2 {
|
||||
BulkByScrollResponse bulkResponse = client.reindex(request, RequestOptions.DEFAULT);
|
||||
|
||||
// 异步执行
|
||||
// client.reindexAsync(request, RequestOptions.DEFAULT, listener);
|
||||
// client.reindexAsync(request, RequestOptions.DEFAULT, listener);
|
||||
|
||||
// 响应结果处理
|
||||
|
||||
@ -199,56 +197,7 @@ public class EsHighLevelRestTest2 {
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 根据查询条件更新
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private static void updataByQuery() throws IOException {
|
||||
//
|
||||
UpdateByQueryRequest request = new UpdateByQueryRequest("user");
|
||||
|
||||
// 设置查询条件
|
||||
request.setQuery(new TermQueryBuilder("user", "pancm"));
|
||||
|
||||
// 设置复制文档的数量
|
||||
request.setSize(10);
|
||||
// 设置一次批量处理的条数,默认是1000
|
||||
request.setBatchSize(100);
|
||||
//设置路由
|
||||
request.setRouting("=cat");
|
||||
//设置超时时间
|
||||
request.setTimeout(TimeValue.timeValueMinutes(2));
|
||||
//允许刷新
|
||||
request.setRefresh(true);
|
||||
//索引选项
|
||||
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
|
||||
|
||||
// 同步执行
|
||||
BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);
|
||||
|
||||
// 异步执行
|
||||
// client.updateByQueryAsync(request, RequestOptions.DEFAULT, listener);
|
||||
|
||||
// 返回结果
|
||||
TimeValue timeTaken = bulkResponse.getTook();
|
||||
boolean timedOut = bulkResponse.isTimedOut();
|
||||
long totalDocs = bulkResponse.getTotal();
|
||||
long updatedDocs = bulkResponse.getUpdated();
|
||||
long deletedDocs = bulkResponse.getDeleted();
|
||||
long batches = bulkResponse.getBatches();
|
||||
long noops = bulkResponse.getNoops();
|
||||
long versionConflicts = bulkResponse.getVersionConflicts();
|
||||
long bulkRetries = bulkResponse.getBulkRetries();
|
||||
long searchRetries = bulkResponse.getSearchRetries();
|
||||
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled();
|
||||
TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil();
|
||||
List<ScrollableHitSource.SearchFailure> searchFailures = bulkResponse.getSearchFailures();
|
||||
List<BulkItemResponse.Failure> bulkFailures = bulkResponse.getBulkFailures();
|
||||
System.out.println("查询更新总共花费了:" + timeTaken.getMillis() + " 毫秒,总条数:" + totalDocs + ",更新数:" + updatedDocs);
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
@ -332,7 +281,7 @@ public class EsHighLevelRestTest2 {
|
||||
client.reindexRethrottleAsync(request, RequestOptions.DEFAULT, listener);
|
||||
client.updateByQueryRethrottleAsync(request, RequestOptions.DEFAULT, listener);
|
||||
client.deleteByQueryRethrottleAsync(request, RequestOptions.DEFAULT, listener);
|
||||
|
||||
|
||||
System.out.println("已成功设置!");
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user