Compare commits

..

1 Commits

Author SHA1 Message Date
dependabot[bot]
035656604d Bump storm-kafka from 1.2.2 to 1.2.3
Bumps storm-kafka from 1.2.2 to 1.2.3.

Signed-off-by: dependabot[bot] <support@github.com>
2020-02-20 10:06:42 +00:00
5 changed files with 47 additions and 443 deletions

87
.gitignore vendored
View File

@@ -1,44 +1,43 @@
/target/
/classes/
/log/
/logs/
.classpath
.project
.settings
.myeclipse
##filter databfile<6C><65>sln file##
*.mdb
*.ldb
*.sln
##class file##
*.com
*.class
*.dll
*.exe
*.o
*.so
# compression file
*.7z
*.dmg
*.gz
*.iso
*.jar
*.rar
*.tar
*.zip
*.via
*.iml
*.tmp
*.err
*.log
# OS generated files #
/.idea
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
Icon?
ehthumbs.db
Thumbs.db
/.github/
/target/
/classes/
/log/
/logs/
.classpath
.project
.settings
.myeclipse
##filter databfile<6C><65>sln file##
*.mdb
*.ldb
*.sln
##class file##
*.com
*.class
*.dll
*.exe
*.o
*.so
# compression file
*.7z
*.dmg
*.gz
*.iso
*.jar
*.rar
*.tar
*.zip
*.via
*.iml
*.tmp
*.err
*.log
# OS generated files #
/.idea
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
Icon?
ehthumbs.db
Thumbs.db

View File

@@ -81,15 +81,11 @@
**ElasticSearch相关:**
- [ElasticSearch实战系列一: ElasticSearch集群+Kibana安装教程](https://www.cnblogs.com/xuwujing/p/11385255.html)
- [ElasticSearch实战系列一: ElasticSearch集群+Kinaba安装教程](https://www.cnblogs.com/xuwujing/p/11385255.html)
- [ElasticSearch实战系列二: ElasticSearch的DSL语句使用教程---图文详解](https://www.cnblogs.com/xuwujing/p/11567053.html)
- [ElasticSearch实战系列三: ElasticSearch的JAVA API使用教程](https://www.cnblogs.com/xuwujing/p/11645630.html)
- [ElasticSearch实战系列四: ElasticSearch理论知识介绍](https://www.cnblogs.com/xuwujing/p/12093933.html)
- [ElasticSearch实战系列五: ElasticSearch的聚合查询基础使用教程之度量(Metric)聚合](https://www.cnblogs.com/xuwujing/p/12385903.html)
- [ElasticSearch实战系列六: Logstash快速入门](https://www.cnblogs.com/xuwujing/p/13412108.html)
- [ElasticSearch实战系列七: Logstash实战使用-图文讲解](https://www.cnblogs.com/xuwujing/p/13520666.html)
- [ElasticSearch实战系列八: Filebeat快速入门和使用---图文详解](https://www.cnblogs.com/xuwujing/p/13532125.html)
- [ElasticSearch实战系列九: ELK日志系统介绍和安装](https://www.cnblogs.com/xuwujing/p/13870806.html)
**其他博客:**
@@ -102,7 +98,6 @@
- [个人收集的资源分享](https://www.cnblogs.com/xuwujing/p/10393111.html)
- [一个毕业三年的程序猿对于提升自我的一些建议](https://www.cnblogs.com/xuwujing/p/11735726.html)
- [认清自我不在迷茫2019个人年终总结](https://www.cnblogs.com/xuwujing/p/12174112.html)
- [纵然前路坎坷也要毅然前行2020年终总结](https://www.cnblogs.com/xuwujing/p/14233270.html)
## 其他

View File

@@ -375,7 +375,7 @@
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.2.2</version>
<version>1.2.3</version>
<scope>provided</scope>
</dependency>

View File

@@ -8,17 +8,13 @@ import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
@@ -27,13 +23,10 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInter
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.aggregations.metrics.cardinality.CardinalityAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHits;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -277,7 +270,7 @@ public class EsAggregationSearchTest {
private static void maxSearch() throws IOException{
String buk="t_grade";
AggregationBuilder aggregation = AggregationBuilders.max(buk).field("grade");
logger.info("求班级的最分数:");
logger.info("求班级的最分数:");
agg(aggregation,buk);
}
@@ -400,28 +393,6 @@ public class EsAggregationSearchTest {
});
}
private static void agg(List<Map<String, Object>> list, Aggregations aggregations) {
aggregations.forEach(aggregation -> {
String name = aggregation.getName();
Terms genders = aggregations.get(name);
for (Terms.Bucket entry : genders.getBuckets()) {
String key = entry.getKey().toString();
long t = entry.getDocCount();
Map<String,Object> map =new HashMap<>();
map.put(name,key);
map.put(name+"_"+"count",t);
//判断里面是否还有嵌套的数据
List<Aggregation> list2 = entry.getAggregations().asList();
if (list2.isEmpty()) {
list.add(map);
}else{
agg(list, entry.getAggregations());
}
}
});
System.out.println(list);
}
private static SearchResponse search(AggregationBuilder aggregation) throws IOException {
@@ -502,99 +473,6 @@ public class EsAggregationSearchTest {
}
}
/**
* @Author pancm
* @Description having
* @Date 2020/8/21
* @Param []
* @return void
**/
private static void havingSearch() throws IOException{
String index="";
SearchRequest searchRequest = new SearchRequest(index);
searchRequest.indices(index);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
String alias_name = "nas_ip_address_group";
String group_name = "nas_ip_address";
String query_name = "acct_start_time";
String query_type = "gte,lte";
String query_name_value="2020-08-05 13:25:55,2020-08-20 13:26:55";
String[] query_types= query_type.split(",");
String[] query_name_values= query_name_value.split(",");
for (int i = 0; i < query_types.length; i++) {
if("gte".equals(query_types[i])){
boolQueryBuilder.must(QueryBuilders.rangeQuery(query_name).gte(query_name_values[i]));
}
if("lte".equals(query_types[i])){
boolQueryBuilder.must(QueryBuilders.rangeQuery(query_name).lte(query_name_values[i]));
}
}
AggregationBuilder aggregationBuilder = AggregationBuilders.terms(alias_name).field(group_name).size(Integer.MAX_VALUE);
//声明BucketPath用于后面的bucket筛选
Map<String, String> bucketsPathsMap = new HashMap<>(8);
bucketsPathsMap.put("groupCount", "_count");
//设置脚本
Script script = new Script("params.groupCount >= 1000");
//构建bucket选择器
BucketSelectorPipelineAggregationBuilder bs =
PipelineAggregatorBuilders.bucketSelector("having", bucketsPathsMap, script);
aggregationBuilder.subAggregation(bs);
sourceBuilder.aggregation(aggregationBuilder);
//不需要解释
sourceBuilder.explain(false);
//不需要原始数据
sourceBuilder.fetchSource(false);
//不需要版本号
sourceBuilder.version(false);
sourceBuilder.query(boolQueryBuilder);
searchRequest.source(sourceBuilder);
System.out.println(sourceBuilder);
// 同步查询
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
// 查询条数
long count = searchResponse.getHits().getHits().length;
Aggregations aggregations = searchResponse.getAggregations();
// agg(aggregations);
Map<String,Object> map =new HashMap<>();
List<Map<String,Object>> list =new ArrayList<>();
agg(list,aggregations);
// System.out.println(map);
System.out.println(list);
}
/**
* @Author pancm
* @Description 去重
* @Date 2020/8/26
* @Param []
* @return void
**/
private static void distinctSearch() throws IOException{
String buk="group";
String distinctName="name";
AggregationBuilder aggregation = AggregationBuilders.terms("age").field("age");
CardinalityAggregationBuilder cardinalityBuilder = AggregationBuilders.cardinality(distinctName).field(distinctName);
//根据创建时间按天分组
// AggregationBuilder aggregation3 = AggregationBuilders.dateHistogram("createtm")
// .field("createtm")
// .format("yyyy-MM-dd")
// .dateHistogramInterval(DateHistogramInterval.DAY);
//
// aggregation2.subAggregation(aggregation3);
aggregation.subAggregation(cardinalityBuilder);
agg(aggregation,buk);
}
private static void topSearch() throws IOException{

View File

@@ -1,268 +0,0 @@
package com.pancm.elasticsearch;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest;
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
/**
* @author pancm
* @Title: pancm_project
* @Description: ES的集群相关配置
* @Version:1.0.0
* @Since:jdk1.8
* @date 2020/1/3
*/
public class EsHighLevelCluster {
private static String elasticIp = "192.169.0.1";
private static int elasticPort = 9200;
private static Logger logger = LoggerFactory.getLogger(EsHighLevelRestTest2.class);
private static RestHighLevelClient client = null;
/**
* @param args
*/
public static void main(String[] args) {
try {
init();
// clusterUpdateSetting();
// catHealth();
// clusterGetSetting();
clearCache();
close();
} catch (Exception e) {
e.printStackTrace();
}
}
/*
* 初始化服务
*/
private static void init() {
client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticIp, elasticPort, "http")));
}
/*
* 关闭服务
*/
private static void close() {
if (client != null) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* @return void
* @Author pancm
* @Description 设置该节点为冷节点
* @Date 2020/1/2
* @Param [index]
**/
public static void setCool(String index) throws IOException {
RestClient restClient = null;
try {
Objects.requireNonNull(index, "index is not null");
restClient = client.getLowLevelClient();
String source = "{\"index.routing.allocation.require.box_type\": \"%s\"}";
source = String.format(source, "cool");
HttpEntity entity = new NStringEntity(source, ContentType.APPLICATION_JSON);
restClient.performRequest("PUT", "/" + index + "/_settings", Collections.<String, String>emptyMap(), entity);
} catch (IOException e) {
throw e;
} finally {
if (restClient != null) {
restClient.close();
}
}
}
public void setAlias(String index, String aliasIndex) throws IOException {
RestClient restClient = null;
try {
Objects.requireNonNull(index, "index is not null");
restClient = client.getLowLevelClient();
String msg = "/" + index + "/_alias" + "/" + aliasIndex;
restClient.performRequest("PUT", msg);
} finally {
if (restClient != null) {
restClient.close();
}
}
}
/**
* @return void
* @Author pancm
* @Description
* @Date 2020/1/2
* @Param [index]
**/
public static void clearCache() throws IOException {
ClearIndicesCacheRequest request = new ClearIndicesCacheRequest();
ClearIndicesCacheResponse response = client.indices().clearCache(request,RequestOptions.DEFAULT);
System.out.println(""+response.getTotalShards());
System.out.println(""+response.getStatus());
}
/**
* @return void
* @Author pancm
* @Description 设置集群的配置
* @Date 2020/1/2
* @Param [index]
**/
public static void clusterUpdateSetting() throws IOException {
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
String transientSettingKey =
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey();
int transientSettingValue = 10;
Settings transientSettings =
Settings.builder()
.put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES)
.build();
String persistentSettingKey =
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey();
String persistentSettingValue =
EnableAllocationDecider.Allocation.NONE.name();
Settings persistentSettings =
Settings.builder()
.put(persistentSettingKey, persistentSettingValue)
.build();
/** 方式一 */
Settings.Builder transientSettingsBuilder =
Settings.builder()
.put(transientSettingKey, transientSettingValue, ByteSizeUnit.BYTES);
request.transientSettings(transientSettingsBuilder);
/** 方式二 */
request.transientSettings(
"{\"indices.recovery.max_bytes_per_sec\": \"10b\"}"
, XContentType.JSON);
/** 方式三 */
Map<String, Object> map = new HashMap<>();
map.put(transientSettingKey
, transientSettingValue + ByteSizeUnit.BYTES.getSuffix());
request.transientSettings(map);
ClusterUpdateSettingsResponse response = client.cluster().putSettings(request, RequestOptions.DEFAULT);
Settings setting = response.getPersistentSettings();
Settings setting2 = response.getTransientSettings();
logger.info("setting:{}", setting);
logger.info("setting2:{}", setting2);
}
/**
* @return void
* @Author pancm
* @Description 设获取集群的健康情况
* @Date 2020/1/2
* @Param [index]
**/
public static void catHealth() throws IOException {
ClusterHealthRequest request = new ClusterHealthRequest();
ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
String clusterName = response.getClusterName();
ClusterHealthStatus status = response.getStatus();
boolean timedOut = response.isTimedOut();
RestStatus restStatus = response.status();
int numberOfNodes = response.getNumberOfNodes();
int numberOfDataNodes = response.getNumberOfDataNodes();
int activeShards = response.getActiveShards();
int activePrimaryShards = response.getActivePrimaryShards();
int relocatingShards = response.getRelocatingShards();
int initializingShards = response.getInitializingShards();
int unassignedShards = response.getUnassignedShards();
int delayedUnassignedShards = response.getDelayedUnassignedShards();
double activeShardsPercent = response.getActiveShardsPercent();
logger.info("clusterName:{},status:{},timedOut:{},restStatus:{}", clusterName, status, timedOut, restStatus.getStatus());
List<Map<String, Object>> mapList = new ArrayList<>();
response.getIndices().forEach((k, v) -> {
Map<String, Object> map = new HashMap<>();
String index = v.getIndex();
int replicas = v.getNumberOfReplicas();
int allShards = v.getActiveShards();
int shards = v.getActivePrimaryShards();
int status2 = v.getStatus().value();
map.put("index", index);
map.put("replicas", replicas);
map.put("shards", shards);
map.put("status", status2);
System.out.println(map);
});
}
/**
* @return void
* @Author pancm
* @Description 设获取集群的设置情况
* @Date 2020/1/2
* @Param [index]
**/
public static void clusterGetSetting() throws IOException {
ClusterGetSettingsRequest request = new ClusterGetSettingsRequest();
ClusterGetSettingsResponse response = client.cluster().getSettings(request, RequestOptions.DEFAULT);
Settings setting = response.getPersistentSettings();
Settings setting2 = response.getTransientSettings();
logger.info("setting:{}", setting);
logger.info("setting2:{}", setting2);
}
}