Browse Source

新增比当前时间晚30分钟的数据丢弃的校验条件

hph_优化版本
1049970895@qniao.cn 3 years ago
parent
commit
b60342cd9f
1 changed files with 33 additions and 25 deletions
  1. 58
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

58
root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@ -129,9 +129,14 @@ public class RootCloudIotDataFormatterJob {
value.set__timestamp__(reportTime * 1000);
}
}
return value.getWorking_sta() != null
if (value.getWorking_sta() != null
&& value.get__assetId__() != null
&& value.getPWR_sta() != null && reportTime != null && value.getACC_count() != null;
&& value.getPWR_sta() != null && reportTime != null && value.getACC_count() != null) {
long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
// 晚30分钟的数据就不要了
return nowTime - value.get__timestamp__() <= (30 * 60 * 1000);
}
return false;
}
}).name("machine iot data received event filter operator");
@ -181,31 +186,34 @@ public class RootCloudIotDataFormatterJob {
Long accCount = value.getACC_count();
Integer lastPwrStat = lastReceivedEvent.getMachinePwrStat();
Integer lastWorkingStat = lastReceivedEvent.getMachineWorkingStat();
MachineIotDataReceivedEvent receivedEvent = new MachineIotDataReceivedEvent();
receivedEvent.setId(snowflake.nextId());
receivedEvent.setDataSource(DataSource.ROOT_CLOUD);
receivedEvent.setMachineIotMac(machineIotMac);
receivedEvent.setMachinePwrStat(pwrSta);
receivedEvent.setMachineWorkingStat(workingSta);
receivedEvent.setAccJobCount(accCount);
if ((pwrSta == 1 && workingSta == 1)
|| (lastPwrStat == 1 && lastWorkingStat == 1)) {
// 只有当前是工作中或上次是工作中才进行计算
Long lastReportTime = lastReceivedEvent.getReportTime();
Long reportTime = value.get__timestamp__();
// 如果这次的消息和上次的消息相差半个小时那么不进行计算
if (reportTime - lastReportTime <= 30 * 60 * 1000) {
receivedEvent.setCurrJobCount(value.getACC_count() - lastReceivedEvent.getAccJobCount());
receivedEvent.setCurrJobDuration(reportTime - lastReportTime);
Long lastReportTime = lastReceivedEvent.getReportTime();
Long reportTime = value.get__timestamp__();
// 只有当前消息的时间大于等于上一次消息的时间才要否则丢弃
if (reportTime >= lastReportTime) {
MachineIotDataReceivedEvent receivedEvent = new MachineIotDataReceivedEvent();
receivedEvent.setId(snowflake.nextId());
receivedEvent.setDataSource(DataSource.ROOT_CLOUD);
receivedEvent.setMachineIotMac(machineIotMac);
receivedEvent.setMachinePwrStat(pwrSta);
receivedEvent.setMachineWorkingStat(workingSta);
receivedEvent.setAccJobCount(accCount);
if ((pwrSta == 1 && workingSta == 1)
|| (lastPwrStat == 1 && lastWorkingStat == 1)) {
// 只有当前是工作中或上次是工作中才进行计算
// 如果这次的消息和上次的消息相差半个小时那么不进行计算
if (reportTime - lastReportTime <= 30 * 60 * 1000) {
receivedEvent.setCurrJobCount(value.getACC_count() - lastReceivedEvent.getAccJobCount());
receivedEvent.setCurrJobDuration(reportTime - lastReportTime);
}
}
receivedEvent.setCurrWaitingDuration(0L);
receivedEvent.setCurrStoppingDuration(0L);
receivedEvent.setIgStat(value.getIG_sta());
receivedEvent.setReportTime(value.get__timestamp__());
receivedEvent.setReceivedTime(System.currentTimeMillis());
eventValueState.update(receivedEvent);
out.collect(receivedEvent);
}
receivedEvent.setCurrWaitingDuration(0L);
receivedEvent.setCurrStoppingDuration(0L);
receivedEvent.setIgStat(value.getIG_sta());
receivedEvent.setReportTime(value.get__timestamp__());
receivedEvent.setReceivedTime(System.currentTimeMillis());
eventValueState.update(receivedEvent);
out.collect(receivedEvent);
}
private MachineIotDataReceivedEvent getMachineIotDataReceivedEvent(Long machineIotMac,

Loading…
Cancel
Save