diff --git a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java index 6b65a57..1a14645 100644 --- a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java @@ -102,7 +102,7 @@ public class IotMonitoringDataJob { .filter((FilterFunction) value -> value.getReportTime() != null && value.getDataSource() != null && value.getMachinePwrStat() != null); - streamOperator.print().name("kafka 数据源:"); + streamOperator.print().name("数据源:"); // mac分组并进行工作时长的集合操作 DataStream machineIotDataReceivedEventDataStream = streamOperator @@ -121,6 +121,9 @@ public class IotMonitoringDataJob { // 上次的开机状态 private ValueState lastPwStatState; + // 是否存在es中(假设都存在) + private boolean isExistEs = true; + @Override public void open(Configuration parameters) { @@ -147,7 +150,7 @@ public class IotMonitoringDataJob { DeviceTotalData onData = onDataState.value(); Integer lastWorkingStat = lastWorkingStatState.value(); Integer lastPwStat = lastPwStatState.value(); - DeviceTotalData lastedDeviceState = getDeviceTotalData(receivedEvent); + DeviceTotalData lastedDeviceState = getLastDeviceTotalData(receivedEvent); // 如果当前消息的时间大于等于上次消息的时间才进行处理 Integer machinePwrStat = receivedEvent.getMachinePwrStat(); Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); @@ -214,11 +217,10 @@ public class IotMonitoringDataJob { } else { nowDeviceState = lastedDeviceState; } - deviceTotalDataStat.update(nowDeviceState); } else { nowDeviceState = lastedDeviceState; - deviceTotalDataStat.update(nowDeviceState); } + deviceTotalDataStat.update(nowDeviceState); } else { if (machineWorkingStat.equals(1)) { if (dataSource == 1) { @@ -257,7 +259,7 @@ public class IotMonitoringDataJob { } } // 如果上次是待机,并且这次也是待机,那么就不需要发送了 - if (!(lastWorkingStat == 2 && machineWorkingStat == 2)) { + if (!(lastWorkingStat == 2 && machineWorkingStat == 2 && isExistEs)) { DeviceMonitoringData data = new DeviceMonitoringData(); data.setDataSource(receivedEvent.getDataSource()); data.setMachineIotMac(receivedEvent.getMachineIotMac()); @@ -271,6 +273,9 @@ public class IotMonitoringDataJob { data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); data.setLastBootTime(onData.getReportTime()); data.setCurrDuration(nowDeviceState.getTheDayDuration()); + if(!isExistEs) { + isExistEs = true; + } out.collect(data); } } @@ -291,9 +296,11 @@ public class IotMonitoringDataJob { return null; } - private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception { + private DeviceTotalData getLastDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception { + // 上一次的数据 DeviceTotalData value = deviceTotalDataStat.value(); + // 用来存放这次的数据,作为下一个的上一次数据 DeviceTotalData data = new DeviceTotalData(); Long reportTime = event.getReportTime(); LocalDate localDate = new Date(reportTime).toLocalDate(); @@ -308,9 +315,11 @@ public class IotMonitoringDataJob { data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration()); data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount()); data.setLastBootTime(deviceMonitoringData.getLastBootTime()); + data.setTheDayDuration(deviceMonitoringData.getCurrDuration()); data.setReportTime(deviceMonitoringData.getReportTime()); } else { // es中也没有,直接从老接口拿 + isExistEs = false; data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime); } value = data; @@ -321,27 +330,28 @@ public class IotMonitoringDataJob { DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond()); if (deviceMonitoringData != null) { - value.setJobTotal(deviceMonitoringData.getAccJobCount()); - value.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); - value.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration()); - value.setTheDayJobCount(deviceMonitoringData.getCurrJobCount()); - value.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); - value.setLastBootTime(deviceMonitoringData.getLastBootTime()); - value.setReportTime(deviceMonitoringData.getReportTime()); - value.setTheDayDuration(0L); + data.setJobTotal(deviceMonitoringData.getAccJobCount()); + data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); + data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration()); + data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount()); + data.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); + data.setLastBootTime(deviceMonitoringData.getLastBootTime()); + data.setReportTime(deviceMonitoringData.getReportTime()); + data.setTheDayDuration(0L); } else { // value有值,但是日期不对,说明到了第二天,那么,只需要对当天数据清零即可 - value.setJobTotal(value.getJobTotal()); - value.setJobDurationTotal(value.getJobDurationTotal()); - value.setTheDayJobDuration(0L); - value.setTheDayJobCount(0L); - value.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); - value.setLastBootTime(value.getLastBootTime()); - value.setTheDayDuration(0L); - value.setReportTime(reportTime); + data.setJobTotal(value.getJobTotal()); + data.setJobDurationTotal(value.getJobDurationTotal()); + data.setTheDayJobDuration(0L); + data.setTheDayJobCount(0L); + data.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); + data.setLastBootTime(value.getLastBootTime()); + data.setTheDayDuration(0L); + data.setReportTime(reportTime); } + value = data; } - deviceTotalDataStat.update(value); + deviceTotalDataStat.update(data); return value; }