diff --git a/iot-machine-state-event-generator-job/src/test/java/DemoTes.java b/iot-machine-state-event-generator-job/src/test/java/DemoTes.java index 34b5a54..29bf84d 100644 --- a/iot-machine-state-event-generator-job/src/test/java/DemoTes.java +++ b/iot-machine-state-event-generator-job/src/test/java/DemoTes.java @@ -27,10 +27,8 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInter import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedSum; -import org.elasticsearch.search.aggregations.metrics.ParsedTopHits; -import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.*; +import org.elasticsearch.search.aggregations.pipeline.BucketSortPipelineAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; @@ -41,6 +39,7 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneId; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.List; public class DemoTes { @@ -64,121 +63,19 @@ public class DemoTes { try { - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - BoolQueryBuilder bool = new BoolQueryBuilder(); - // - long l = LocalDate.now().plus(-1, ChronoUnit.DAYS) - .atTime(LocalTime.MAX).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); - BoolQueryBuilder boolQueryBuilder = bool.must(QueryBuilders.termQuery("machineIotMac", 102104060037L)) - .filter(QueryBuilders.rangeQuery("reportTime") - .from(null).timeZone("Z").to(l).includeLower(false).includeUpper(true)); - searchSourceBuilder.size(1); - searchSourceBuilder.sort("reportTime", SortOrder.DESC); - searchSourceBuilder.query(boolQueryBuilder); - SearchRequest request = new SearchRequest("iot_device_monitoring_data"); - request.source(searchSourceBuilder); - // 执行请求 - SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); - if(RestStatus.OK.equals(response.status())) { - SearchHit[] hits = response.getHits().getHits(); - if(hits.length > 0) { - SearchHit hit = hits[0]; - System.out.println(hit.getSourceAsString()); - } - } - + // 条件查询 + //boolQuery(restHighLevelClient); // 按日期分组 - /*SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - DateHistogramAggregationBuilder dateHistogramAgg = AggregationBuilders.dateHistogram("dateAgg") - .field("reportTime").fixedInterval(DateHistogramInterval.days(1)).format("yyyy-MM-dd"); - TopHitsAggregationBuilder topSubAgg = AggregationBuilders.topHits("topSubAgg").size(1).sort("reportTime", SortOrder.DESC); - dateHistogramAgg.subAggregation(topSubAgg); - searchSourceBuilder.aggregation(dateHistogramAgg); - searchSourceBuilder.size(0); - SearchRequest request = new SearchRequest("iot_device_monitoring_data"); - request.source(searchSourceBuilder); - // 执行请求 - SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); - // 获取响应中的聚合信息 - Aggregations aggregations = response.getAggregations(); - // 总工作时长 - long totalAccJobCountDuration = 0L; - if (RestStatus.OK.equals(response.status()) || aggregations != null) { - Histogram dateAgg = aggregations.get("dateAgg"); - List buckets = dateAgg.getBuckets(); - for (Histogram.Bucket bucket : buckets) { - ParsedTopHits topHits = bucket.getAggregations().get("topSubAgg"); - String keyAsString = bucket.getKeyAsString(); - System.out.println(keyAsString); - SearchHit[] hits = topHits.getHits().getHits(); - if(hits.length > 0) { - SearchHit hit = hits[0]; - String sourceAsString = hit.getSourceAsString(); - System.out.println(sourceAsString); - } - } - }*/ + //queryByGroupByDate(restHighLevelClient); // 自定义索引 // 创建索引 - /*CreateIndexRequest request = new CreateIndexRequest("my_test_231"); - // 索引设置,3个master分片,每个master2个从片 - request.settings(Settings.builder() - .put("index.number_of_shards", 3) - .put("index.number_of_replicas", 2)); - // 字段映射 - String mappersStr = "{\n" + - " \"properties\": {\n" + - " \"accJobCount\": {\n" + - " \"type\": \"long\"\n" + - " },\n" + - " \"accJobCountDuration\": {\n" + - " \"type\": \"long\"\n" + - " },\n" + - " \"currDuration\": {\n" + - " \"type\": \"long\"\n" + - " },\n" + - " \"currJobCount\": {\n" + - " \"type\": \"long\"\n" + - " },\n" + - " \"currJobDuration\": {\n" + - " \"type\": \"long\"\n" + - " },\n" + - " \"dataSource\": {\n" + - " \"type\": \"integer\"\n" + - " },\n" + - " \"lastBootTime\": {\n" + - " \"type\": \"date\"\n" + - " },\n" + - " \"machineIotMac\": {\n" + - " \"type\": \"keyword\"\n" + - " },\n" + - " \"machinePwrStat\": {\n" + - " \"type\": \"integer\"\n" + - " },\n" + - " \"machineWorkingStat\": {\n" + - " \"type\": \"integer\"\n" + - " },\n" + - " \"receivedTime\": {\n" + - " \"type\": \"date\"\n" + - " },\n" + - " \"reportTime\": {\n" + - " \"type\": \"date\"\n" + - " }\n" + - " }\n" + - "}"; - request.mapping(mappersStr, XContentType.JSON); - // 设置所有别名 - request.alias(new Alias("my_test")); - CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT); - boolean acknowledged = createIndexResponse.isAcknowledged(); - boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged(); - if(acknowledged && shardsAcknowledged) { - System.out.println("索引创建成功"); - }*/ - + //createIndex(restHighLevelClient); + // 分组后分页 + // 根据设备mac分组,并获取reportTime最大的那条数据 + pageAfterGroupBy(restHighLevelClient); } catch (Exception e) { @@ -186,4 +83,143 @@ public class DemoTes { restHighLevelClient.close(); } + + private static void pageAfterGroupBy(RestHighLevelClient restHighLevelClient) throws IOException { + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + BoolQueryBuilder bool = new BoolQueryBuilder(); + BoolQueryBuilder boolQueryBuilder = bool.filter(QueryBuilders.termQuery("machineIotMac", 102104060038L)) + .filter(QueryBuilders.existsQuery("machinePowerOffTime")); + searchSourceBuilder.query(boolQueryBuilder); + searchSourceBuilder.from(1); + searchSourceBuilder.size(100); + searchSourceBuilder.sort("machinePowerOnTime", SortOrder.DESC); + SearchRequest request = new SearchRequest("iot_device_power_on_and_off_data_event"); + request.source(searchSourceBuilder); + // 执行请求 + SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); + // 总工作时长 + if (RestStatus.OK.equals(response.status()) && response.getHits().getHits().length > 0) { + SearchHits hits = response.getHits(); + for (SearchHit hit : hits) { + System.out.println(hit.getSourceAsString()); + } + } + } + + private static void createIndex(RestHighLevelClient restHighLevelClient) throws IOException { + CreateIndexRequest request = new CreateIndexRequest("my_test_231"); + // 索引设置,3个master分片,每个master2个从片 + request.settings(Settings.builder() + .put("index.number_of_shards", 3) + .put("index.number_of_replicas", 2)); + // 字段映射 + String mappersStr = "{\n" + + " \"properties\": {\n" + + " \"accJobCount\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"accJobCountDuration\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"currDuration\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"currJobCount\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"currJobDuration\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"dataSource\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"lastBootTime\": {\n" + + " \"type\": \"date\"\n" + + " },\n" + + " \"machineIotMac\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"machinePwrStat\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"machineWorkingStat\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"receivedTime\": {\n" + + " \"type\": \"date\"\n" + + " },\n" + + " \"reportTime\": {\n" + + " \"type\": \"date\"\n" + + " }\n" + + " }\n" + + "}"; + request.mapping(mappersStr, XContentType.JSON); + // 设置所有别名 + request.alias(new Alias("my_test")); + CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT); + boolean acknowledged = createIndexResponse.isAcknowledged(); + boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged(); + if(acknowledged && shardsAcknowledged) { + System.out.println("索引创建成功"); + } + } + + private static void queryByGroupByDate(RestHighLevelClient restHighLevelClient) throws IOException { + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + DateHistogramAggregationBuilder dateHistogramAgg = AggregationBuilders.dateHistogram("dateAgg") + .field("reportTime").fixedInterval(DateHistogramInterval.days(1)).format("yyyy-MM-dd"); + TopHitsAggregationBuilder topSubAgg = AggregationBuilders.topHits("topSubAgg").size(1).sort("reportTime", SortOrder.DESC); + dateHistogramAgg.subAggregation(topSubAgg); + searchSourceBuilder.aggregation(dateHistogramAgg); + searchSourceBuilder.size(0); + SearchRequest request = new SearchRequest("iot_device_monitoring_data"); + request.source(searchSourceBuilder); + // 执行请求 + SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); + // 获取响应中的聚合信息 + Aggregations aggregations = response.getAggregations(); + // 总工作时长 + long totalAccJobCountDuration = 0L; + if (RestStatus.OK.equals(response.status()) || aggregations != null) { + Histogram dateAgg = aggregations.get("dateAgg"); + List buckets = dateAgg.getBuckets(); + for (Histogram.Bucket bucket : buckets) { + ParsedTopHits topHits = bucket.getAggregations().get("topSubAgg"); + String keyAsString = bucket.getKeyAsString(); + System.out.println(keyAsString); + SearchHit[] hits = topHits.getHits().getHits(); + if(hits.length > 0) { + SearchHit hit = hits[0]; + String sourceAsString = hit.getSourceAsString(); + System.out.println(sourceAsString); + } + } + } + } + + private static void boolQuery(RestHighLevelClient restHighLevelClient) throws IOException { + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + BoolQueryBuilder bool = new BoolQueryBuilder(); + // + long l = LocalDate.now().plus(-1, ChronoUnit.DAYS) + .atTime(LocalTime.MAX).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + BoolQueryBuilder boolQueryBuilder = bool.must(QueryBuilders.termQuery("machineIotMac", 102104060037L)) + .filter(QueryBuilders.rangeQuery("reportTime") + .from(null).timeZone("Z").to(l).includeLower(false).includeUpper(true)); + searchSourceBuilder.size(1); + searchSourceBuilder.sort("reportTime", SortOrder.DESC); + searchSourceBuilder.query(boolQueryBuilder); + SearchRequest request = new SearchRequest("iot_device_monitoring_data"); + request.source(searchSourceBuilder); + // 执行请求 + SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); + if(RestStatus.OK.equals(response.status())) { + SearchHit[] hits = response.getHits().getHits(); + if(hits.length > 0) { + SearchHit hit = hits[0]; + System.out.println(hit.getSourceAsString()); + } + } + } }