Browse Source

更新

feature_hph_新增rabbitmq_sink
1049970895@qniao.cn 3 years ago
parent
commit
e97098bcb1
4 changed files with 34 additions and 131 deletions
  1. 2
      src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java
  2. 4
      src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java
  3. 117
      src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java
  4. 42
      src/test/java/Demo1.java

2
src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java

@ -58,7 +58,7 @@ public class DeviceMonitoringData {
private Long machineId;
/**
* 累计工作时长
* 累计工作时长单位秒
*/
private Long accJobCountDuration;

4
src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java

@ -26,12 +26,12 @@ public class DeviceTotalData {
/**
* 累计作业计数
*/
private Long JobTotal;
private Long jobTotal;
/**
* 累计作业时长
*/
private Long JobDurationTotal;
private Long jobDurationTotal;
/**
* 当前日期

117
src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java

@ -1,16 +1,12 @@
package com.qniao.iot.gizwits;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSON;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.gizwits.config.ApolloConfig;
import com.qniao.iot.gizwits.constant.ConfigConstant;
import com.qniao.iot.gizwits.utils.EsRestClientUtil;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
@ -39,18 +35,12 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.*;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.metrics.ParsedStats;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
@ -193,9 +183,9 @@ public class GizWitsIotMonitoringDataJob {
if (lastOffData != null) {
lastOnData = receivedEvent;
}
assert lastedDeviceState != null;
if (machineWorkingStat.equals(1)) {
// 工作中
assert lastedDeviceState != null;
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setCurrLocalDate(localDate);
@ -211,7 +201,13 @@ public class GizWitsIotMonitoringDataJob {
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a);
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a);
} else {
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
if(receivedEvent.getDataSource() == 0) {
// 机智云
lastedDeviceState.setTheDayJobDuration(receivedEvent.getCurrJobDuration());
}else {
// 树根
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
}
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
}
deviceTotalDataStat.update(lastedDeviceState);
@ -227,7 +223,6 @@ public class GizWitsIotMonitoringDataJob {
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
assert lastedDeviceState != null;
data.setAccJobCount(lastedDeviceState.getJobTotal());
data.setCurrJobCount(lastedDeviceState.getTheDayJobCount());
data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration());
@ -268,7 +263,7 @@ public class GizWitsIotMonitoringDataJob {
deviceTotalDataStat.update(data);
}
// 是否日期是当天的否则需要更新当天工作时长和当天工作量
if (!value.getCurrLocalDate().isEqual(localDate)) {
if (value.getCurrLocalDate().isBefore(localDate)) {
// 先从es中拿昨天最新的
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(),
LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond());
@ -342,100 +337,6 @@ public class GizWitsIotMonitoringDataJob {
}
/*private DeviceTotalData statistics(List<MachineIotDataReceivedEvent> receivedEventList, Long time) throws Exception {
if (CollUtil.isNotEmpty(receivedEventList)) {
// 一个周期中的开机数据
DeviceTotalData onData = new DeviceTotalData();
onData.setTheDayJobCount(0L);
onData.setJobTotal(0L);
onData.setCurrLocalDate(new Date(time * 1000).toLocalDate());
onData.setTheDayJobDuration(0L);
onData.setJobDurationTotal(0L);
onData.setLastBootTime(LocalDateTime.ofEpochSecond(time, 0, ZoneOffset.ofHours(8)));
// 上次的开机数据
MachineIotDataReceivedEvent lastOnData = null;
// 当前周期的待机数据
MachineIotDataReceivedEvent lastWaitJobData = null;
// 上次的状态
int lastWorkingStat = 0;
// 最新的数据
DeviceTotalData lastedDeviceState = onData;
for (MachineIotDataReceivedEvent receivedEvent : receivedEventList) {
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
lastWorkingStatState.update(machineWorkingStat);
Long reportTime = receivedEvent.getReportTime();
if (lastOnData == null) {
lastOnData = receivedEvent;
}
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
Long a;
if (machinePwrStat.equals(0)) {
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setCurrLocalDate(localDate);
lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault()));
deviceTotalDataStat.update(lastedDeviceState);
// 如果关机
onData = lastedDeviceState;
// 关机后将待机数据清除
lastWaitJobData = null;
} else {
if (machineWorkingStat.equals(1)) {
// 工作中
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setCurrLocalDate(localDate);
lastedDeviceState.setLastBootTime(onData.getLastBootTime());
if (lastWaitJobData != null) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
LocalDateTime lastWaitJobTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS);
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a);
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a);
} else {
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
}
}
if (machineWorkingStat.equals(2)) {
// 待机
lastWaitJobData = receivedEvent;
lastWorkingStat = 1;
}
}
if (lastWorkingStat != 1) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(lastedDeviceState.getJobTotal());
data.setCurrJobCount(lastedDeviceState.getTheDayJobCount());
data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setLastBootTime(lastOnData.getReportTime() * 1000);
}
}
}
return null;
}*/
private String[] getIndicesList() throws IOException {

42
src/test/java/Demo1.java

@ -19,8 +19,13 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedStats;
import org.elasticsearch.search.aggregations.metrics.ParsedTopHits;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.sql.Date;
@ -51,34 +56,31 @@ public class Demo1 {
AggregationBuilder aggr1 = AggregationBuilders.stats("JobDurationTotal").field("space_of_time");
AggregationBuilder aggr2 = AggregationBuilders.stats("JobTotal").field("quantity");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("mac", 861193040814411L))
.must(QueryBuilders.termQuery("data_type", 2))
.must(QueryBuilders.rangeQuery ("space_of_time")
.lte(60))
.must(QueryBuilders.rangeQuery("create_time")
.gte("2022-01-01 00:00:00")
.lte("2022-12-31 23:59:59"));
searchSourceBuilder.query(queryBuilder);
searchSourceBuilder.aggregation(aggr1);
searchSourceBuilder.aggregation(aggr2);
/*BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("machineIotMac", 102104060100L));
searchSourceBuilder.query(queryBuilder);*/
TermsAggregationBuilder jobDurationTotal1 = AggregationBuilders.terms("stationAgg").size(100).minDocCount(1).field("machineIotMac");
TopHitsAggregationBuilder sort = AggregationBuilders.topHits("top1").size(1).sort("reportTime", SortOrder.DESC);
jobDurationTotal1.subAggregation(sort);
searchSourceBuilder.aggregation(jobDurationTotal1);
searchSourceBuilder.size(0);
List<CloudBoxData> receivedEventList = new ArrayList<>();
SearchRequest request = new SearchRequest("qn_cloud_box_data_history_202208");
SearchRequest request = new SearchRequest("machine_iot_data_received_event_*");
request.source(searchSourceBuilder);
// 执行请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 获取响应中的聚合信息
Aggregations aggregations = response.getAggregations();
if (RestStatus.OK.equals(response.status()) || aggregations != null) {
ParsedStats jobDurationTotal = aggregations.get("JobDurationTotal");
ParsedStats jobTotal = aggregations.get("JobTotal");
double max = jobDurationTotal.getSum();
double max1 = jobTotal.getMax();
Terms jobDurationTotal = aggregations.get("stationAgg");
List<? extends Terms.Bucket> buckets = jobDurationTotal.getBuckets();
ParsedTopHits topHits = buckets.get(0).getAggregations().get("top1");
System.out.println(topHits.getHits().getHits()[0].getSourceAsString());
}
}
}
Loading…
Cancel
Save