From d1e9038e0a46b83058ddb123374d2d06b35e127b Mon Sep 17 00:00:00 2001 From: "hupenghui@qniao.cn" <1049970895> Date: Wed, 24 Aug 2022 23:03:04 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/qniao/iot/IotMonitoringDataJob.java | 43 ++++++++----------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java index 7e68eab..b92ab9e 100644 --- a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java @@ -122,17 +122,17 @@ public class IotMonitoringDataJob { .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { - // 最新的设备数据 + // 上次设备数据 private ValueState deviceTotalDataStat; // 开机数据 - private ValueState onDataState; + //private ValueState onDataState; // 上次的工作状态 - private ValueState lastWorkingStatState; + //private ValueState lastWorkingStatState; // 上次的开机状态 - private ValueState lastPwStatState; + //private ValueState lastPwStatState; // 是否存在es中(假设都存在) private boolean isExistEs = true; @@ -144,14 +144,14 @@ public class IotMonitoringDataJob { deviceTotalDataStat = getRuntimeContext() .getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class))); - onDataState = getRuntimeContext() + /*onDataState = getRuntimeContext() .getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class))); lastWorkingStatState = getRuntimeContext() .getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class))); lastPwStatState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("lastPwStat", TypeInformation.of(Integer.class))); + .getState(new ValueStateDescriptor<>("lastPwStat", TypeInformation.of(Integer.class)));*/ } @Override @@ -160,15 +160,14 @@ public class IotMonitoringDataJob { Collector out) { try { - DeviceTotalData onData = onDataState.value(); - Integer lastWorkingStat = lastWorkingStatState.value(); - Integer lastPwStat = lastPwStatState.value(); DeviceTotalData lastedDeviceState = getLastDeviceTotalData(receivedEvent); + Integer lastWorkingStat = lastedDeviceState.getMachineWorkingStat(); + Integer lastPwStat = lastedDeviceState.getMachinePwrStat(); + // 上次启动时间 + Long lastBootTime = lastedDeviceState.getLastBootTime(); // 如果当前消息的时间大于等于上次消息的时间才进行处理 Integer machinePwrStat = receivedEvent.getMachinePwrStat(); Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); - lastWorkingStatState.update(machineWorkingStat); - lastPwStatState.update(machinePwrStat); Long reportTime = receivedEvent.getReportTime(); String reportTimeStr = StrUtil.toString(reportTime); if(reportTimeStr.length() == 10) { @@ -186,10 +185,6 @@ public class IotMonitoringDataJob { } nowDeviceState.setMachinePwrStat(machinePwrStat); nowDeviceState.setMachineWorkingStat(machineWorkingStat); - if (onData == null) { - onData = lastedDeviceState; - onDataState.update(onData); - } LocalDate localDate = new Date(reportTime).toLocalDate(); Long lastReportTime = lastedDeviceState.getReportTime(); if (lastReportTime == null) { @@ -213,8 +208,9 @@ public class IotMonitoringDataJob { nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration); nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); - nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + // 确认下当天数量CurrJobCount是否会清零 + nowDeviceState.setTheDayJobCount(receivedEvent.getCurrJobCount()); + nowDeviceState.setJobTotal(receivedEvent.getAccJobCount()); } else { // 机智云 nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrJobDuration()); @@ -231,7 +227,7 @@ public class IotMonitoringDataJob { nowDeviceState = lastedDeviceState; } nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); - nowDeviceState.setLastBootTime(onData.getReportTime()); + nowDeviceState.setLastBootTime(lastBootTime); nowDeviceState.setReportTime(reportTime); nowDeviceState.setMachinePwrStat(machinePwrStat); nowDeviceState.setMachineWorkingStat(machineWorkingStat); @@ -240,8 +236,8 @@ public class IotMonitoringDataJob { // 工作 if (dataSource == 1) { // 树根(今日当前数 + 这次信息点距离上次信息点生产的数量) - nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + nowDeviceState.setTheDayJobCount(receivedEvent.getCurrJobCount()); + nowDeviceState.setJobTotal(receivedEvent.getAccJobCount()); nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); } else { @@ -262,15 +258,14 @@ public class IotMonitoringDataJob { nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrJobDuration()); } nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); - nowDeviceState.setLastBootTime(onData.getReportTime()); + nowDeviceState.setLastBootTime(lastBootTime); nowDeviceState.setReportTime(reportTime); nowDeviceState.setMachinePwrStat(machinePwrStat); nowDeviceState.setMachineWorkingStat(machineWorkingStat); if (lastPwStat == 0) { // 如果上次是关机消息,那么这次就是开机消息 // 记录一个周期的开机时间 - onDataState.update(nowDeviceState); - onData = nowDeviceState; + nowDeviceState.setLastBootTime(reportTime); } } deviceTotalDataStat.update(nowDeviceState); @@ -288,7 +283,7 @@ public class IotMonitoringDataJob { data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal()); data.setReportTime(reportTime); data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); - data.setLastBootTime(onData.getReportTime()); + data.setLastBootTime(nowDeviceState.getLastBootTime()); data.setCurrDuration(nowDeviceState.getTheDayDuration()); if(!isExistEs) { isExistEs = true;