Browse Source

更新

master
hupenghui@qniao.cn 3 years ago
parent
commit
49d687dc56
1 changed files with 104 additions and 37 deletions
  1. 141
      iot-machine-state-event-generator-job/src/test/java/DemoTes.java

141
iot-machine-state-event-generator-job/src/test/java/DemoTes.java

@ -1,22 +1,39 @@
import cn.hutool.json.JSONUtil;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedSum;
import org.elasticsearch.search.aggregations.metrics.ParsedTopHits;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
public class DemoTes {
@ -38,45 +55,95 @@ public class DemoTes {
}));
try {
// 构建查询条件注意termQuery 支持多种格式查询 booleanintdoublestring 这里使用的是 string 的查询
/*SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", "102104060102"));
searchSourceBuilder.sort("reportTime", SortOrder.DESC);
searchSourceBuilder.size(1);
// 创建查询请求对象将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest("machine_iot_data_received_event_202208",
"machine_iot_data_received_event_202207", "machine_iot_data_received_event_197001");
searchRequest.source(searchSourceBuilder);
// 执行查询然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
SearchHits hits = searchResponse.getHits();
SearchHit reqHit = hits.getHits()[0];
MachineIotDataReceivedEvent receivedEvent = JSONUtil
.toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class);
System.out.println(receivedEvent);
}*/
SearchSourceBuilder searchSourceBuilder1 = new SearchSourceBuilder();
searchSourceBuilder1.size(0);
SumAggregationBuilder sumCurrStoppingDuration = AggregationBuilders.sum("sum_currStoppingDuration").field("currStoppingDuration");
SumAggregationBuilder sumCurrWaitingDuration = AggregationBuilders.sum("sum_currWaitingDuration").field("currWaitingDuration");
searchSourceBuilder1.aggregation(sumCurrStoppingDuration);
searchSourceBuilder1.aggregation(sumCurrWaitingDuration);
SearchRequest request1 = new SearchRequest("machine_iot_data_received_event_*");
request1.source(searchSourceBuilder1);
/* 按日期分组
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
DateHistogramAggregationBuilder dateHistogramAgg = AggregationBuilders.dateHistogram("dateAgg")
.field("reportTime").fixedInterval(DateHistogramInterval.hours(1)).format("yyyy-MM-dd HH");
TopHitsAggregationBuilder topSubAgg = AggregationBuilders.topHits("topSubAgg").size(1).sort("reportTime", SortOrder.DESC);
dateHistogramAgg.subAggregation(topSubAgg);
searchSourceBuilder.aggregation(dateHistogramAgg);
searchSourceBuilder.size(0);
SearchRequest request = new SearchRequest("iot_device_monitoring_data_*");
request.source(searchSourceBuilder);
// 执行请求
SearchResponse response1 = restHighLevelClient.search(request1, RequestOptions.DEFAULT);
Aggregations aggregations1 = response1.getAggregations();
if (RestStatus.OK.equals(response1.status()) || aggregations1 != null) {
ParsedSum agg = aggregations1.get("sum_currStoppingDuration");
ParsedSum agg1 = aggregations1.get("sum_currWaitingDuration");
double value = agg.getValue();
BigDecimal bigDecimal = new BigDecimal(value);
System.out.println(bigDecimal.longValueExact());
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
// 获取响应中的聚合信息
Aggregations aggregations = response.getAggregations();
// 总工作时长
long totalAccJobCountDuration = 0L;
if (RestStatus.OK.equals(response.status()) || aggregations != null) {
Histogram dateAgg = aggregations.get("dateAgg");
List<? extends Histogram.Bucket> buckets = dateAgg.getBuckets();
for (Histogram.Bucket bucket : buckets) {
ParsedTopHits topHits = bucket.getAggregations().get("topSubAgg");
String keyAsString = bucket.getKeyAsString();
System.out.println(keyAsString);
SearchHit[] hits = topHits.getHits().getHits();
if(hits.length > 0) {
SearchHit hit = hits[0];
String sourceAsString = hit.getSourceAsString();
System.out.println(sourceAsString);
}
}
}*/
// 自定义索引
// 创建索引
CreateIndexRequest request = new CreateIndexRequest("my_test_231");
// 索引设置3个master分片每个master2个从片
/*request.settings(Settings.builder()
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 2));*/
// 字段映射
String mappersStr = "{\n" +
" \"properties\": {\n" +
" \"accJobCount\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"accJobCountDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currJobCount\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currJobDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"dataSource\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"lastBootTime\": {\n" +
" \"type\": \"date\"\n" +
" },\n" +
" \"machineIotMac\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"machinePwrStat\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"machineWorkingStat\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"receivedTime\": {\n" +
" \"type\": \"date\"\n" +
" },\n" +
" \"reportTime\": {\n" +
" \"type\": \"date\"\n" +
" }\n" +
" }\n" +
"}";
request.mapping(mappersStr, XContentType.JSON);
// 设置所有别名
request.alias(new Alias("my_test"));
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
boolean acknowledged = createIndexResponse.isAcknowledged();
boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
if(acknowledged && shardsAcknowledged) {
System.out.println("索引创建成功");
}
} catch (IOException e) {

Loading…
Cancel
Save