Browse Source

更新

master
1049970895@qniao.cn 3 years ago
parent
commit
71a1812907
1 changed files with 149 additions and 113 deletions
  1. 262
      iot-machine-state-event-generator-job/src/test/java/DemoTes.java

262
iot-machine-state-event-generator-job/src/test/java/DemoTes.java

@ -27,10 +27,8 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInter
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedSum;
import org.elasticsearch.search.aggregations.metrics.ParsedTopHits;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.*;
import org.elasticsearch.search.aggregations.pipeline.BucketSortPipelineAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortOrder;
@ -41,6 +39,7 @@ import java.time.LocalDateTime;
import java.time.LocalTime; import java.time.LocalTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List; import java.util.List;
public class DemoTes { public class DemoTes {
@ -64,121 +63,19 @@ public class DemoTes {
try { try {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder bool = new BoolQueryBuilder();
//
long l = LocalDate.now().plus(-1, ChronoUnit.DAYS)
.atTime(LocalTime.MAX).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
BoolQueryBuilder boolQueryBuilder = bool.must(QueryBuilders.termQuery("machineIotMac", 102104060037L))
.filter(QueryBuilders.rangeQuery("reportTime")
.from(null).timeZone("Z").to(l).includeLower(false).includeUpper(true));
searchSourceBuilder.size(1);
searchSourceBuilder.sort("reportTime", SortOrder.DESC);
searchSourceBuilder.query(boolQueryBuilder);
SearchRequest request = new SearchRequest("iot_device_monitoring_data");
request.source(searchSourceBuilder);
// 执行请求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
if(RestStatus.OK.equals(response.status())) {
SearchHit[] hits = response.getHits().getHits();
if(hits.length > 0) {
SearchHit hit = hits[0];
System.out.println(hit.getSourceAsString());
}
}
// 条件查询
//boolQuery(restHighLevelClient);
// 按日期分组 // 按日期分组
/*SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
DateHistogramAggregationBuilder dateHistogramAgg = AggregationBuilders.dateHistogram("dateAgg")
.field("reportTime").fixedInterval(DateHistogramInterval.days(1)).format("yyyy-MM-dd");
TopHitsAggregationBuilder topSubAgg = AggregationBuilders.topHits("topSubAgg").size(1).sort("reportTime", SortOrder.DESC);
dateHistogramAgg.subAggregation(topSubAgg);
searchSourceBuilder.aggregation(dateHistogramAgg);
searchSourceBuilder.size(0);
SearchRequest request = new SearchRequest("iot_device_monitoring_data");
request.source(searchSourceBuilder);
// 执行请求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
// 获取响应中的聚合信息
Aggregations aggregations = response.getAggregations();
// 总工作时长
long totalAccJobCountDuration = 0L;
if (RestStatus.OK.equals(response.status()) || aggregations != null) {
Histogram dateAgg = aggregations.get("dateAgg");
List<? extends Histogram.Bucket> buckets = dateAgg.getBuckets();
for (Histogram.Bucket bucket : buckets) {
ParsedTopHits topHits = bucket.getAggregations().get("topSubAgg");
String keyAsString = bucket.getKeyAsString();
System.out.println(keyAsString);
SearchHit[] hits = topHits.getHits().getHits();
if(hits.length > 0) {
SearchHit hit = hits[0];
String sourceAsString = hit.getSourceAsString();
System.out.println(sourceAsString);
}
}
}*/
//queryByGroupByDate(restHighLevelClient);
// 自定义索引 // 自定义索引
// 创建索引 // 创建索引
/*CreateIndexRequest request = new CreateIndexRequest("my_test_231");
// 索引设置3个master分片每个master2个从片
request.settings(Settings.builder()
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 2));
// 字段映射
String mappersStr = "{\n" +
" \"properties\": {\n" +
" \"accJobCount\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"accJobCountDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currJobCount\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currJobDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"dataSource\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"lastBootTime\": {\n" +
" \"type\": \"date\"\n" +
" },\n" +
" \"machineIotMac\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"machinePwrStat\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"machineWorkingStat\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"receivedTime\": {\n" +
" \"type\": \"date\"\n" +
" },\n" +
" \"reportTime\": {\n" +
" \"type\": \"date\"\n" +
" }\n" +
" }\n" +
"}";
request.mapping(mappersStr, XContentType.JSON);
// 设置所有别名
request.alias(new Alias("my_test"));
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
boolean acknowledged = createIndexResponse.isAcknowledged();
boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
if(acknowledged && shardsAcknowledged) {
System.out.println("索引创建成功");
}*/
//createIndex(restHighLevelClient);
// 分组后分页
// 根据设备mac分组并获取reportTime最大的那条数据
pageAfterGroupBy(restHighLevelClient);
} catch (Exception e) { } catch (Exception e) {
@ -186,4 +83,143 @@ public class DemoTes {
restHighLevelClient.close(); restHighLevelClient.close();
} }
private static void pageAfterGroupBy(RestHighLevelClient restHighLevelClient) throws IOException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder bool = new BoolQueryBuilder();
BoolQueryBuilder boolQueryBuilder = bool.filter(QueryBuilders.termQuery("machineIotMac", 102104060038L))
.filter(QueryBuilders.existsQuery("machinePowerOffTime"));
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.from(1);
searchSourceBuilder.size(100);
searchSourceBuilder.sort("machinePowerOnTime", SortOrder.DESC);
SearchRequest request = new SearchRequest("iot_device_power_on_and_off_data_event");
request.source(searchSourceBuilder);
// 执行请求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
// 总工作时长
if (RestStatus.OK.equals(response.status()) && response.getHits().getHits().length > 0) {
SearchHits hits = response.getHits();
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsString());
}
}
}
private static void createIndex(RestHighLevelClient restHighLevelClient) throws IOException {
CreateIndexRequest request = new CreateIndexRequest("my_test_231");
// 索引设置3个master分片每个master2个从片
request.settings(Settings.builder()
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 2));
// 字段映射
String mappersStr = "{\n" +
" \"properties\": {\n" +
" \"accJobCount\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"accJobCountDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currJobCount\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currJobDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"dataSource\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"lastBootTime\": {\n" +
" \"type\": \"date\"\n" +
" },\n" +
" \"machineIotMac\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"machinePwrStat\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"machineWorkingStat\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"receivedTime\": {\n" +
" \"type\": \"date\"\n" +
" },\n" +
" \"reportTime\": {\n" +
" \"type\": \"date\"\n" +
" }\n" +
" }\n" +
"}";
request.mapping(mappersStr, XContentType.JSON);
// 设置所有别名
request.alias(new Alias("my_test"));
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
boolean acknowledged = createIndexResponse.isAcknowledged();
boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
if(acknowledged && shardsAcknowledged) {
System.out.println("索引创建成功");
}
}
private static void queryByGroupByDate(RestHighLevelClient restHighLevelClient) throws IOException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
DateHistogramAggregationBuilder dateHistogramAgg = AggregationBuilders.dateHistogram("dateAgg")
.field("reportTime").fixedInterval(DateHistogramInterval.days(1)).format("yyyy-MM-dd");
TopHitsAggregationBuilder topSubAgg = AggregationBuilders.topHits("topSubAgg").size(1).sort("reportTime", SortOrder.DESC);
dateHistogramAgg.subAggregation(topSubAgg);
searchSourceBuilder.aggregation(dateHistogramAgg);
searchSourceBuilder.size(0);
SearchRequest request = new SearchRequest("iot_device_monitoring_data");
request.source(searchSourceBuilder);
// 执行请求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
// 获取响应中的聚合信息
Aggregations aggregations = response.getAggregations();
// 总工作时长
long totalAccJobCountDuration = 0L;
if (RestStatus.OK.equals(response.status()) || aggregations != null) {
Histogram dateAgg = aggregations.get("dateAgg");
List<? extends Histogram.Bucket> buckets = dateAgg.getBuckets();
for (Histogram.Bucket bucket : buckets) {
ParsedTopHits topHits = bucket.getAggregations().get("topSubAgg");
String keyAsString = bucket.getKeyAsString();
System.out.println(keyAsString);
SearchHit[] hits = topHits.getHits().getHits();
if(hits.length > 0) {
SearchHit hit = hits[0];
String sourceAsString = hit.getSourceAsString();
System.out.println(sourceAsString);
}
}
}
}
private static void boolQuery(RestHighLevelClient restHighLevelClient) throws IOException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder bool = new BoolQueryBuilder();
//
long l = LocalDate.now().plus(-1, ChronoUnit.DAYS)
.atTime(LocalTime.MAX).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
BoolQueryBuilder boolQueryBuilder = bool.must(QueryBuilders.termQuery("machineIotMac", 102104060037L))
.filter(QueryBuilders.rangeQuery("reportTime")
.from(null).timeZone("Z").to(l).includeLower(false).includeUpper(true));
searchSourceBuilder.size(1);
searchSourceBuilder.sort("reportTime", SortOrder.DESC);
searchSourceBuilder.query(boolQueryBuilder);
SearchRequest request = new SearchRequest("iot_device_monitoring_data");
request.source(searchSourceBuilder);
// 执行请求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
if(RestStatus.OK.equals(response.status())) {
SearchHit[] hits = response.getHits().getHits();
if(hits.length > 0) {
SearchHit hit = hits[0];
System.out.println(hit.getSourceAsString());
}
}
}
} }
Loading…
Cancel
Save