From a7e8cbd0e0e7e1220cc7f4b92bd30176bffad476 Mon Sep 17 00:00:00 2001 From: "hupenghui@qniao.cn" <1049970895> Date: Wed, 17 Aug 2022 23:29:22 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/qniao/iot/IotMonitoringDataJob.java | 54 ++++++++++--------- src/main/resources/META-INF/app.properties | 2 +- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java index 2d78adb..3228362 100644 --- a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java @@ -113,8 +113,22 @@ public class IotMonitoringDataJob { // 数据过滤 SingleOutputStreamOperator streamOperator = dataStreamSource - .filter((FilterFunction) value -> value.getReportTime() != null - && value.getDataSource() != null && value.getMachinePwrStat() != null); + .filter((FilterFunction) value -> { + + Long reportTime = value.getReportTime(); + if(reportTime != null + && value.getDataSource() != null && value.getMachinePwrStat() != null) { + String reportTimeStr = StrUtil.toString(reportTime); + if(reportTimeStr.length() == 10) { + // 机智云那边的设备可能是秒或毫秒 + reportTime = reportTime * 1000; + } + long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + // 晚30分钟的数据就不要了 + return nowTime - reportTime <= (30*60*1000); + } + return false; + }); // mac分组并进行工作时长的集合操作 DataStream machineIotDataReceivedEventDataStream = streamOperator @@ -223,16 +237,17 @@ public class IotMonitoringDataJob { nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); } } - nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); - nowDeviceState.setLastBootTime(onData.getReportTime()); - nowDeviceState.setReportTime(reportTime); } else { nowDeviceState = lastedDeviceState; } } else { nowDeviceState = lastedDeviceState; } - deviceTotalDataStat.update(nowDeviceState); + nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); + nowDeviceState.setLastBootTime(onData.getReportTime()); + nowDeviceState.setReportTime(reportTime); + nowDeviceState.setMachinePwrStat(machinePwrStat); + nowDeviceState.setMachineWorkingStat(machineWorkingStat); } else { if (machineWorkingStat.equals(1)) { // 工作 @@ -249,9 +264,6 @@ public class IotMonitoringDataJob { nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); } - nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); - nowDeviceState.setLastBootTime(onData.getReportTime()); - nowDeviceState.setReportTime(reportTime); } else { // 待机或开机 nowDeviceState = lastedDeviceState; @@ -262,16 +274,19 @@ public class IotMonitoringDataJob { }else { nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrJobDuration()); } + nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); + nowDeviceState.setLastBootTime(onData.getReportTime()); + nowDeviceState.setReportTime(reportTime); + nowDeviceState.setMachinePwrStat(machinePwrStat); + nowDeviceState.setMachineWorkingStat(machineWorkingStat); if (lastPwStat == 0) { // 如果上次是关机消息,那么这次就是开机消息 - // 记录本次开机作为上次开机时间 - nowDeviceState.setReportTime(reportTime); // 记录一个周期的开机时间 onDataState.update(nowDeviceState); onData = nowDeviceState; } - deviceTotalDataStat.update(nowDeviceState); } + deviceTotalDataStat.update(nowDeviceState); // 如果上次是待机,并且这次也是待机,那么就不需要发送了 if (((!(lastWorkingStat == 2 && machineWorkingStat == 2)) && (!(lastPwStat == 0 && machinePwrStat == 0))) || !isExistEs) { @@ -300,17 +315,6 @@ public class IotMonitoringDataJob { } } - - private DeviceState getDeviceStateListJson(Long machineIotMac) throws SQLException { - - // 查询数据库最新的设备状态 - List list = Db.use().query(SQL, DeviceState.class, machineIotMac); - if (CollUtil.isNotEmpty(list)) { - return list.get(0); - } - return null; - } - private DeviceTotalData getLastDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception { // 上一次的数据 @@ -332,12 +336,13 @@ public class IotMonitoringDataJob { value.setMachinePwrStat(deviceMonitoringData.getMachinePwrStat()); value.setMachineWorkingStat(deviceMonitoringData.getMachineWorkingStat()); value.setTheDayDuration(deviceMonitoringData.getCurrDuration()); - value.setReportTime(deviceMonitoringData.getReportTime()); } else { // es中也没有,直接从老接口拿 isExistEs = false; value = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime); } + // 因为ReportTime参与后面的计算,所以如果是第一次取这个数据需要设置为当前消息的时间,要不然会有很大的差值 + value.setReportTime(reportTime); } // 是否日期是当天的,否则需要更新当天工作时长和当天工作量 if (LocalDate.parse(value.getCurrLocalDate(), DateTimeFormatter.ofPattern("yyyy-MM-dd")).isBefore(localDate)) { @@ -408,7 +413,6 @@ public class IotMonitoringDataJob { deviceTotalData.setMachinePwrStat(0); deviceTotalData.setMachineWorkingStat(0); } - deviceTotalData.setReportTime(reportTime); stop = true; break; } diff --git a/src/main/resources/META-INF/app.properties b/src/main/resources/META-INF/app.properties index d10864b..f979514 100644 --- a/src/main/resources/META-INF/app.properties +++ b/src/main/resources/META-INF/app.properties @@ -2,4 +2,4 @@ app.id=iot-device-monitoring-data # test 8.135.8.221 # prod 47.112.164.224 -apollo.meta=http://47.112.164.224:5000 \ No newline at end of file +apollo.meta=http://8.135.8.221:5000 \ No newline at end of file