diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java b/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java index 17ed170..9b6a475 100644 --- a/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java +++ b/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java @@ -57,11 +57,6 @@ public class DeviceMonitoringData { */ private Long receivedTime; - /** - * 机器标识 - */ - private Long machineId; - /** * 累计工作时长,单位秒 */ diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java index b0f2183..ba1e910 100644 --- a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java +++ b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java @@ -44,7 +44,7 @@ public class DeviceTotalData { private LocalDate currLocalDate; /** - * 上次消息时间 + * 消息时间 */ - private Long lastReportTime; + private Long reportTime; } diff --git a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java index 38d1b20..6bd5736 100644 --- a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java @@ -2,14 +2,10 @@ package com.qniao.iot.gizwits; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.ArrayUtil; -import cn.hutool.core.util.NumberUtil; import cn.hutool.db.Db; import cn.hutool.http.HttpUtil; import cn.hutool.json.JSONArray; import cn.hutool.json.JSONUtil; -import com.qniao.iot.gizwits.config.ApolloConfig; -import com.qniao.iot.gizwits.constant.ConfigConstant; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; import lombok.extern.slf4j.Slf4j; @@ -36,13 +32,11 @@ import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.*; import org.elasticsearch.client.indices.GetIndexRequest; -import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; @@ -50,15 +44,12 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; -import java.io.IOException; import java.sql.Date; import java.sql.SQLException; import java.time.*; import java.time.format.DateTimeFormatter; import java.util.*; -import static java.time.temporal.ChronoUnit.SECONDS; - @Slf4j public class IotMonitoringDataJob { @@ -163,7 +154,7 @@ public class IotMonitoringDataJob { Integer dataSource = receivedEvent.getDataSource(); // 当前数据 DeviceTotalData nowDeviceState = new DeviceTotalData(); - if (lastedDeviceState.getLastReportTime() <= receivedEvent.getReportTime()) { + if (lastedDeviceState.getReportTime() <= receivedEvent.getReportTime()) { if (lastWorkingStat == null) { lastWorkingStat = getDeviceStateListJson(receivedEvent.getMachineIotMac()); lastPwStat = lastWorkingStat == 0 ? 0 : 1; @@ -173,7 +164,7 @@ public class IotMonitoringDataJob { onDataState.update(onData); } LocalDate localDate = new Date(reportTime).toLocalDate(); - Long lastReportTime = lastedDeviceState.getLastReportTime(); + Long lastReportTime = lastedDeviceState.getReportTime(); if (lastReportTime == null) { // 如果上次的消息时间为空,那么不进行计算 nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount()); @@ -182,7 +173,7 @@ public class IotMonitoringDataJob { nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal()); } // 直接通过两个消息的时间差进行计算(毫秒) - Long workingDuration = reportTime - lastedDeviceState.getLastReportTime(); + Long workingDuration = reportTime - lastedDeviceState.getReportTime(); // 转为秒 workingDuration = workingDuration / 1000; if (machinePwrStat.equals(0)) { @@ -211,8 +202,8 @@ public class IotMonitoringDataJob { } } nowDeviceState.setCurrLocalDate(localDate); - nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault())); - nowDeviceState.setLastReportTime(reportTime); + nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getReportTime()), ZoneId.systemDefault())); + nowDeviceState.setReportTime(reportTime); deviceTotalDataStat.update(nowDeviceState); } else { nowDeviceState = lastedDeviceState; @@ -241,10 +232,10 @@ public class IotMonitoringDataJob { nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); } nowDeviceState.setCurrLocalDate(localDate); - nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault())); + nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getReportTime()), ZoneId.systemDefault())); nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); - nowDeviceState.setLastReportTime(reportTime); + nowDeviceState.setReportTime(reportTime); } else { nowDeviceState = lastedDeviceState; } @@ -253,7 +244,7 @@ public class IotMonitoringDataJob { if (lastPwStat == 0) { // 如果上次是关机消息,那么这次就是开机消息 // 记录本次开机作为上次开机时间 - nowDeviceState.setLastReportTime(reportTime); + nowDeviceState.setReportTime(reportTime); // 记录一个周期的开机时间 onDataState.update(nowDeviceState); onData = nowDeviceState; @@ -271,7 +262,8 @@ public class IotMonitoringDataJob { data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration()); data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal()); data.setReportTime(reportTime); - data.setLastBootTime(onData.getLastReportTime()); + data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + data.setLastBootTime(onData.getReportTime()); data.setCurrDuration(nowDeviceState.getTheDayDuration()); out.collect(data); } @@ -308,13 +300,11 @@ public class IotMonitoringDataJob { LocalDateTime ldt = new Date(deviceMonitoringData.getLastBootTime()) .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime(); data.setLastBootTime(ldt); - data.setLastReportTime(deviceMonitoringData.getReportTime()); + data.setReportTime(deviceMonitoringData.getReportTime()); } else { // es中也没有,直接从老接口拿 data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime); } - // TODO 测试数据,记得删除 - data.setJobTotal(6218646L); value = data; } // 是否日期是当天的,否则需要更新当天工作时长和当天工作量 @@ -330,7 +320,7 @@ public class IotMonitoringDataJob { value.setCurrLocalDate(localDate); value.setLastBootTime(LocalDateTime.ofInstant(Instant .ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault())); - value.setLastReportTime(deviceMonitoringData.getReportTime()); + value.setReportTime(deviceMonitoringData.getReportTime()); value.setTheDayDuration(0L); } else { // value有值,但是日期不对,说明到了第二天,那么,只需要对当天数据清零即可 @@ -341,7 +331,7 @@ public class IotMonitoringDataJob { value.setCurrLocalDate(localDate); value.setLastBootTime(value.getLastBootTime()); value.setTheDayDuration(0L); - value.setLastReportTime(reportTime); + value.setReportTime(reportTime); } } deviceTotalDataStat.update(value); @@ -387,7 +377,7 @@ public class IotMonitoringDataJob { deviceTotalData.setTheDayJobDuration(0L); deviceTotalData.setTheDayJobCount(0L); deviceTotalData.setCurrLocalDate(LocalDate.now()); - deviceTotalData.setLastReportTime(reportTime); + deviceTotalData.setReportTime(reportTime); stop = true; break; } @@ -401,7 +391,7 @@ public class IotMonitoringDataJob { deviceTotalData.setTheDayJobDuration(0L); deviceTotalData.setTheDayJobCount(0L); deviceTotalData.setCurrLocalDate(LocalDate.now()); - deviceTotalData.setLastReportTime(reportTime); + deviceTotalData.setReportTime(reportTime); } deviceTotalData.setTheDayDuration(0L); return deviceTotalData;