From b60342cd9f0085ff69201e827da23d2b38e9559f Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Fri, 2 Sep 2022 14:46:34 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=AF=94=E5=BD=93=E5=89=8D?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E6=99=9A30=E5=88=86=E9=92=9F=E7=9A=84?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E4=B8=A2=E5=BC=83=E7=9A=84=E6=A0=A1=E9=AA=8C?= =?UTF-8?q?=E6=9D=A1=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/rc/RootCloudIotDataFormatterJob.java | 58 +++++++++++-------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index ebf649e..b13a157 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -129,9 +129,14 @@ public class RootCloudIotDataFormatterJob { value.set__timestamp__(reportTime * 1000); } } - return value.getWorking_sta() != null + if (value.getWorking_sta() != null && value.get__assetId__() != null - && value.getPWR_sta() != null && reportTime != null && value.getACC_count() != null; + && value.getPWR_sta() != null && reportTime != null && value.getACC_count() != null) { + long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + // 晚30分钟的数据就不要了 + return nowTime - value.get__timestamp__() <= (30 * 60 * 1000); + } + return false; } }).name("machine iot data received event filter operator"); @@ -181,31 +186,34 @@ public class RootCloudIotDataFormatterJob { Long accCount = value.getACC_count(); Integer lastPwrStat = lastReceivedEvent.getMachinePwrStat(); Integer lastWorkingStat = lastReceivedEvent.getMachineWorkingStat(); - MachineIotDataReceivedEvent receivedEvent = new MachineIotDataReceivedEvent(); - receivedEvent.setId(snowflake.nextId()); - receivedEvent.setDataSource(DataSource.ROOT_CLOUD); - receivedEvent.setMachineIotMac(machineIotMac); - receivedEvent.setMachinePwrStat(pwrSta); - receivedEvent.setMachineWorkingStat(workingSta); - receivedEvent.setAccJobCount(accCount); - if ((pwrSta == 1 && workingSta == 1) - || (lastPwrStat == 1 && lastWorkingStat == 1)) { - // 只有当前是工作中或上次是工作中才进行计算 - Long lastReportTime = lastReceivedEvent.getReportTime(); - Long reportTime = value.get__timestamp__(); - // 如果这次的消息和上次的消息相差半个小时,那么不进行计算 - if (reportTime - lastReportTime <= 30 * 60 * 1000) { - receivedEvent.setCurrJobCount(value.getACC_count() - lastReceivedEvent.getAccJobCount()); - receivedEvent.setCurrJobDuration(reportTime - lastReportTime); + Long lastReportTime = lastReceivedEvent.getReportTime(); + Long reportTime = value.get__timestamp__(); + // 只有当前消息的时间大于等于上一次消息的时间才要,否则丢弃 + if (reportTime >= lastReportTime) { + MachineIotDataReceivedEvent receivedEvent = new MachineIotDataReceivedEvent(); + receivedEvent.setId(snowflake.nextId()); + receivedEvent.setDataSource(DataSource.ROOT_CLOUD); + receivedEvent.setMachineIotMac(machineIotMac); + receivedEvent.setMachinePwrStat(pwrSta); + receivedEvent.setMachineWorkingStat(workingSta); + receivedEvent.setAccJobCount(accCount); + if ((pwrSta == 1 && workingSta == 1) + || (lastPwrStat == 1 && lastWorkingStat == 1)) { + // 只有当前是工作中或上次是工作中才进行计算 + // 如果这次的消息和上次的消息相差半个小时,那么不进行计算 + if (reportTime - lastReportTime <= 30 * 60 * 1000) { + receivedEvent.setCurrJobCount(value.getACC_count() - lastReceivedEvent.getAccJobCount()); + receivedEvent.setCurrJobDuration(reportTime - lastReportTime); + } } + receivedEvent.setCurrWaitingDuration(0L); + receivedEvent.setCurrStoppingDuration(0L); + receivedEvent.setIgStat(value.getIG_sta()); + receivedEvent.setReportTime(value.get__timestamp__()); + receivedEvent.setReceivedTime(System.currentTimeMillis()); + eventValueState.update(receivedEvent); + out.collect(receivedEvent); } - receivedEvent.setCurrWaitingDuration(0L); - receivedEvent.setCurrStoppingDuration(0L); - receivedEvent.setIgStat(value.getIG_sta()); - receivedEvent.setReportTime(value.get__timestamp__()); - receivedEvent.setReceivedTime(System.currentTimeMillis()); - eventValueState.update(receivedEvent); - out.collect(receivedEvent); } private MachineIotDataReceivedEvent getMachineIotDataReceivedEvent(Long machineIotMac,