From 49d687dc565d665a0b93dbd3b46f7b57e41593d4 Mon Sep 17 00:00:00 2001 From: "hupenghui@qniao.cn" <1049970895> Date: Mon, 15 Aug 2022 23:48:53 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/test/java/DemoTes.java | 141 +++++++++++++----- 1 file changed, 104 insertions(+), 37 deletions(-) diff --git a/iot-machine-state-event-generator-job/src/test/java/DemoTes.java b/iot-machine-state-event-generator-job/src/test/java/DemoTes.java index 7e63687..4428f33 100644 --- a/iot-machine-state-event-generator-job/src/test/java/DemoTes.java +++ b/iot-machine-state-event-generator-job/src/test/java/DemoTes.java @@ -1,22 +1,39 @@ +import cn.hutool.json.JSONUtil; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.CreateIndexResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ParsedSum; +import org.elasticsearch.search.aggregations.metrics.ParsedTopHits; import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.math.BigDecimal; +import java.util.List; public class DemoTes { @@ -38,45 +55,95 @@ public class DemoTes { })); try { - // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) - /*SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", "102104060102")); - searchSourceBuilder.sort("reportTime", SortOrder.DESC); - searchSourceBuilder.size(1); - // 创建查询请求对象,将查询对象配置到其中 - SearchRequest searchRequest = new SearchRequest("machine_iot_data_received_event_202208", - "machine_iot_data_received_event_202207", "machine_iot_data_received_event_197001"); - searchRequest.source(searchSourceBuilder); - // 执行查询,然后处理响应结果 - SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); - // 根据状态和数据条数验证是否返回了数据 - if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { - SearchHits hits = searchResponse.getHits(); - SearchHit reqHit = hits.getHits()[0]; - MachineIotDataReceivedEvent receivedEvent = JSONUtil - .toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class); - System.out.println(receivedEvent); - }*/ - - - SearchSourceBuilder searchSourceBuilder1 = new SearchSourceBuilder(); - searchSourceBuilder1.size(0); - SumAggregationBuilder sumCurrStoppingDuration = AggregationBuilders.sum("sum_currStoppingDuration").field("currStoppingDuration"); - SumAggregationBuilder sumCurrWaitingDuration = AggregationBuilders.sum("sum_currWaitingDuration").field("currWaitingDuration"); - searchSourceBuilder1.aggregation(sumCurrStoppingDuration); - searchSourceBuilder1.aggregation(sumCurrWaitingDuration); - SearchRequest request1 = new SearchRequest("machine_iot_data_received_event_*"); - request1.source(searchSourceBuilder1); + /* 按日期分组 + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + DateHistogramAggregationBuilder dateHistogramAgg = AggregationBuilders.dateHistogram("dateAgg") + .field("reportTime").fixedInterval(DateHistogramInterval.hours(1)).format("yyyy-MM-dd HH"); + TopHitsAggregationBuilder topSubAgg = AggregationBuilders.topHits("topSubAgg").size(1).sort("reportTime", SortOrder.DESC); + dateHistogramAgg.subAggregation(topSubAgg); + searchSourceBuilder.aggregation(dateHistogramAgg); + searchSourceBuilder.size(0); + SearchRequest request = new SearchRequest("iot_device_monitoring_data_*"); + request.source(searchSourceBuilder); // 执行请求 - SearchResponse response1 = restHighLevelClient.search(request1, RequestOptions.DEFAULT); - Aggregations aggregations1 = response1.getAggregations(); - if (RestStatus.OK.equals(response1.status()) || aggregations1 != null) { - ParsedSum agg = aggregations1.get("sum_currStoppingDuration"); - ParsedSum agg1 = aggregations1.get("sum_currWaitingDuration"); - double value = agg.getValue(); - BigDecimal bigDecimal = new BigDecimal(value); - System.out.println(bigDecimal.longValueExact()); + SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); + // 获取响应中的聚合信息 + Aggregations aggregations = response.getAggregations(); + // 总工作时长 + long totalAccJobCountDuration = 0L; + if (RestStatus.OK.equals(response.status()) || aggregations != null) { + Histogram dateAgg = aggregations.get("dateAgg"); + List buckets = dateAgg.getBuckets(); + for (Histogram.Bucket bucket : buckets) { + ParsedTopHits topHits = bucket.getAggregations().get("topSubAgg"); + String keyAsString = bucket.getKeyAsString(); + System.out.println(keyAsString); + SearchHit[] hits = topHits.getHits().getHits(); + if(hits.length > 0) { + SearchHit hit = hits[0]; + String sourceAsString = hit.getSourceAsString(); + System.out.println(sourceAsString); + } + } + }*/ + + // 自定义索引 + // 创建索引 + CreateIndexRequest request = new CreateIndexRequest("my_test_231"); + // 索引设置,3个master分片,每个master2个从片 + /*request.settings(Settings.builder() + .put("index.number_of_shards", 3) + .put("index.number_of_replicas", 2));*/ + // 字段映射 + String mappersStr = "{\n" + + " \"properties\": {\n" + + " \"accJobCount\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"accJobCountDuration\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"currDuration\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"currJobCount\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"currJobDuration\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"dataSource\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"lastBootTime\": {\n" + + " \"type\": \"date\"\n" + + " },\n" + + " \"machineIotMac\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"machinePwrStat\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"machineWorkingStat\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"receivedTime\": {\n" + + " \"type\": \"date\"\n" + + " },\n" + + " \"reportTime\": {\n" + + " \"type\": \"date\"\n" + + " }\n" + + " }\n" + + "}"; + request.mapping(mappersStr, XContentType.JSON); + // 设置所有别名 + request.alias(new Alias("my_test")); + CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT); + boolean acknowledged = createIndexResponse.isAcknowledged(); + boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged(); + if(acknowledged && shardsAcknowledged) { + System.out.println("索引创建成功"); } } catch (IOException e) {