diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java b/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java index a8f0283..3d23f96 100644 --- a/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java +++ b/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java @@ -58,7 +58,7 @@ public class DeviceMonitoringData { private Long machineId; /** - * 累计工作时长 + * 累计工作时长,单位秒 */ private Long accJobCountDuration; diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java index 94da50d..49ec54e 100644 --- a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java +++ b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java @@ -26,12 +26,12 @@ public class DeviceTotalData { /** * 累计作业计数 */ - private Long JobTotal; + private Long jobTotal; /** * 累计作业时长 */ - private Long JobDurationTotal; + private Long jobDurationTotal; /** * 当前日期 diff --git a/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java index 8b1610b..692510a 100644 --- a/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java @@ -1,16 +1,12 @@ package com.qniao.iot.gizwits; import cn.hutool.core.bean.BeanUtil; -import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ArrayUtil; -import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpUtil; -import cn.hutool.json.JSON; import cn.hutool.json.JSONArray; import cn.hutool.json.JSONUtil; import com.qniao.iot.gizwits.config.ApolloConfig; import com.qniao.iot.gizwits.constant.ConfigConstant; -import com.qniao.iot.gizwits.utils.EsRestClientUtil; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; import lombok.extern.slf4j.Slf4j; @@ -39,18 +35,12 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.*; import org.elasticsearch.cluster.metadata.AliasMetaData; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; -import org.elasticsearch.search.aggregations.AggregationBuilder; -import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.Aggregations; -import org.elasticsearch.search.aggregations.metrics.ParsedStats; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; @@ -193,9 +183,9 @@ public class GizWitsIotMonitoringDataJob { if (lastOffData != null) { lastOnData = receivedEvent; } + assert lastedDeviceState != null; if (machineWorkingStat.equals(1)) { // 工作中 - assert lastedDeviceState != null; lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); lastedDeviceState.setCurrLocalDate(localDate); @@ -211,7 +201,13 @@ public class GizWitsIotMonitoringDataJob { lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a); lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a); } else { - lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); + if(receivedEvent.getDataSource() == 0) { + // 机智云 + lastedDeviceState.setTheDayJobDuration(receivedEvent.getCurrJobDuration()); + }else { + // 树根 + lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); + } lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); } deviceTotalDataStat.update(lastedDeviceState); @@ -227,7 +223,6 @@ public class GizWitsIotMonitoringDataJob { data.setMachineIotMac(receivedEvent.getMachineIotMac()); data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); - assert lastedDeviceState != null; data.setAccJobCount(lastedDeviceState.getJobTotal()); data.setCurrJobCount(lastedDeviceState.getTheDayJobCount()); data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration()); @@ -268,7 +263,7 @@ public class GizWitsIotMonitoringDataJob { deviceTotalDataStat.update(data); } // 是否日期是当天的,否则需要更新当天工作时长和当天工作量 - if (!value.getCurrLocalDate().isEqual(localDate)) { + if (value.getCurrLocalDate().isBefore(localDate)) { // 先从es中拿昨天最新的 DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond()); @@ -342,100 +337,6 @@ public class GizWitsIotMonitoringDataJob { } - /*private DeviceTotalData statistics(List receivedEventList, Long time) throws Exception { - - if (CollUtil.isNotEmpty(receivedEventList)) { - - // 一个周期中的开机数据 - DeviceTotalData onData = new DeviceTotalData(); - onData.setTheDayJobCount(0L); - onData.setJobTotal(0L); - onData.setCurrLocalDate(new Date(time * 1000).toLocalDate()); - onData.setTheDayJobDuration(0L); - onData.setJobDurationTotal(0L); - onData.setLastBootTime(LocalDateTime.ofEpochSecond(time, 0, ZoneOffset.ofHours(8))); - - // 上次的开机数据 - MachineIotDataReceivedEvent lastOnData = null; - - // 当前周期的待机数据 - MachineIotDataReceivedEvent lastWaitJobData = null; - - // 上次的状态 - int lastWorkingStat = 0; - - // 最新的数据 - DeviceTotalData lastedDeviceState = onData; - - for (MachineIotDataReceivedEvent receivedEvent : receivedEventList) { - - Integer machinePwrStat = receivedEvent.getMachinePwrStat(); - Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); - lastWorkingStatState.update(machineWorkingStat); - Long reportTime = receivedEvent.getReportTime(); - if (lastOnData == null) { - lastOnData = receivedEvent; - } - LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); - Long a; - if (machinePwrStat.equals(0)) { - lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); - lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); - lastedDeviceState.setCurrLocalDate(localDate); - lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault())); - deviceTotalDataStat.update(lastedDeviceState); - // 如果关机 - onData = lastedDeviceState; - // 关机后将待机数据清除 - lastWaitJobData = null; - } else { - - if (machineWorkingStat.equals(1)) { - // 工作中 - lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setCurrLocalDate(localDate); - lastedDeviceState.setLastBootTime(onData.getLastBootTime()); - if (lastWaitJobData != null) { - LocalDateTime localDateTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(reportTime * 1000), - ZoneId.systemDefault()); - LocalDateTime lastWaitJobTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(reportTime * 1000), - ZoneId.systemDefault()); - a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS); - lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a); - lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a); - } else { - lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); - lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); - } - } - if (machineWorkingStat.equals(2)) { - // 待机 - lastWaitJobData = receivedEvent; - lastWorkingStat = 1; - } - } - if (lastWorkingStat != 1) { - DeviceMonitoringData data = new DeviceMonitoringData(); - data.setDataSource(receivedEvent.getDataSource()); - data.setMachineIotMac(receivedEvent.getMachineIotMac()); - data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); - data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); - data.setAccJobCount(lastedDeviceState.getJobTotal()); - data.setCurrJobCount(lastedDeviceState.getTheDayJobCount()); - data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration()); - data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal()); - data.setReportTime(reportTime); - data.setLastBootTime(lastOnData.getReportTime() * 1000); - } - } - } - return null; - }*/ private String[] getIndicesList() throws IOException { diff --git a/src/test/java/Demo1.java b/src/test/java/Demo1.java index 5a0faab..8215244 100644 --- a/src/test/java/Demo1.java +++ b/src/test/java/Demo1.java @@ -19,8 +19,13 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ParsedStats; +import org.elasticsearch.search.aggregations.metrics.ParsedTopHits; +import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.sql.Date; @@ -51,34 +56,31 @@ public class Demo1 { - - AggregationBuilder aggr1 = AggregationBuilders.stats("JobDurationTotal").field("space_of_time"); - AggregationBuilder aggr2 = AggregationBuilders.stats("JobTotal").field("quantity"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery("mac", 861193040814411L)) - .must(QueryBuilders.termQuery("data_type", 2)) - .must(QueryBuilders.rangeQuery ("space_of_time") - .lte(60)) - .must(QueryBuilders.rangeQuery("create_time") - .gte("2022-01-01 00:00:00") - .lte("2022-12-31 23:59:59")); - searchSourceBuilder.query(queryBuilder); - searchSourceBuilder.aggregation(aggr1); - searchSourceBuilder.aggregation(aggr2); + /*BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("machineIotMac", 102104060100L)); + searchSourceBuilder.query(queryBuilder);*/ + + TermsAggregationBuilder jobDurationTotal1 = AggregationBuilders.terms("stationAgg").size(100).minDocCount(1).field("machineIotMac"); + TopHitsAggregationBuilder sort = AggregationBuilders.topHits("top1").size(1).sort("reportTime", SortOrder.DESC); + + jobDurationTotal1.subAggregation(sort); + searchSourceBuilder.aggregation(jobDurationTotal1); + + searchSourceBuilder.size(0); - List receivedEventList = new ArrayList<>(); - SearchRequest request = new SearchRequest("qn_cloud_box_data_history_202208"); + SearchRequest request = new SearchRequest("machine_iot_data_received_event_*"); request.source(searchSourceBuilder); // 执行请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); // 获取响应中的聚合信息 Aggregations aggregations = response.getAggregations(); if (RestStatus.OK.equals(response.status()) || aggregations != null) { - ParsedStats jobDurationTotal = aggregations.get("JobDurationTotal"); - ParsedStats jobTotal = aggregations.get("JobTotal"); - double max = jobDurationTotal.getSum(); - double max1 = jobTotal.getMax(); + Terms jobDurationTotal = aggregations.get("stationAgg"); + List buckets = jobDurationTotal.getBuckets(); + ParsedTopHits topHits = buckets.get(0).getAggregations().get("top1"); + System.out.println(topHits.getHits().getHits()[0].getSourceAsString()); + } } }