Browse Source

更新

feature_hph_新增rabbitmq_sink
hupenghui@qniao.cn 3 years ago
parent
commit
d1e9038e0a
1 changed files with 19 additions and 24 deletions
  1. 43
      src/main/java/com/qniao/iot/IotMonitoringDataJob.java

43
src/main/java/com/qniao/iot/IotMonitoringDataJob.java

@ -122,17 +122,17 @@ public class IotMonitoringDataJob {
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>() {
// 最新的设备数据
// 上次设备数据
private ValueState<DeviceTotalData> deviceTotalDataStat;
// 开机数据
private ValueState<DeviceTotalData> onDataState;
//private ValueState<DeviceTotalData> onDataState;
// 上次的工作状态
private ValueState<Integer> lastWorkingStatState;
//private ValueState<Integer> lastWorkingStatState;
// 上次的开机状态
private ValueState<Integer> lastPwStatState;
//private ValueState<Integer> lastPwStatState;
// 是否存在es中假设都存在
private boolean isExistEs = true;
@ -144,14 +144,14 @@ public class IotMonitoringDataJob {
deviceTotalDataStat = getRuntimeContext()
.getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class)));
onDataState = getRuntimeContext()
/*onDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class)));
lastWorkingStatState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class)));
lastPwStatState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastPwStat", TypeInformation.of(Integer.class)));
.getState(new ValueStateDescriptor<>("lastPwStat", TypeInformation.of(Integer.class)));*/
}
@Override
@ -160,15 +160,14 @@ public class IotMonitoringDataJob {
Collector<DeviceMonitoringData> out) {
try {
DeviceTotalData onData = onDataState.value();
Integer lastWorkingStat = lastWorkingStatState.value();
Integer lastPwStat = lastPwStatState.value();
DeviceTotalData lastedDeviceState = getLastDeviceTotalData(receivedEvent);
Integer lastWorkingStat = lastedDeviceState.getMachineWorkingStat();
Integer lastPwStat = lastedDeviceState.getMachinePwrStat();
// 上次启动时间
Long lastBootTime = lastedDeviceState.getLastBootTime();
// 如果当前消息的时间大于等于上次消息的时间才进行处理
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
lastWorkingStatState.update(machineWorkingStat);
lastPwStatState.update(machinePwrStat);
Long reportTime = receivedEvent.getReportTime();
String reportTimeStr = StrUtil.toString(reportTime);
if(reportTimeStr.length() == 10) {
@ -186,10 +185,6 @@ public class IotMonitoringDataJob {
}
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
if (onData == null) {
onData = lastedDeviceState;
onDataState.update(onData);
}
LocalDate localDate = new Date(reportTime).toLocalDate();
Long lastReportTime = lastedDeviceState.getReportTime();
if (lastReportTime == null) {
@ -213,8 +208,9 @@ public class IotMonitoringDataJob {
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration);
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
// 确认下当天数量CurrJobCount是否会清零
nowDeviceState.setTheDayJobCount(receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(receivedEvent.getAccJobCount());
} else {
// 机智云
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrJobDuration());
@ -231,7 +227,7 @@ public class IotMonitoringDataJob {
nowDeviceState = lastedDeviceState;
}
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(onData.getReportTime());
nowDeviceState.setLastBootTime(lastBootTime);
nowDeviceState.setReportTime(reportTime);
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
@ -240,8 +236,8 @@ public class IotMonitoringDataJob {
// 工作
if (dataSource == 1) {
// 树根今日当前数 + 这次信息点距离上次信息点生产的数量
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
nowDeviceState.setTheDayJobCount(receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(receivedEvent.getAccJobCount());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
} else {
@ -262,15 +258,14 @@ public class IotMonitoringDataJob {
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrJobDuration());
}
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(onData.getReportTime());
nowDeviceState.setLastBootTime(lastBootTime);
nowDeviceState.setReportTime(reportTime);
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
if (lastPwStat == 0) {
// 如果上次是关机消息那么这次就是开机消息
// 记录一个周期的开机时间
onDataState.update(nowDeviceState);
onData = nowDeviceState;
nowDeviceState.setLastBootTime(reportTime);
}
}
deviceTotalDataStat.update(nowDeviceState);
@ -288,7 +283,7 @@ public class IotMonitoringDataJob {
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
data.setLastBootTime(onData.getReportTime());
data.setLastBootTime(nowDeviceState.getLastBootTime());
data.setCurrDuration(nowDeviceState.getTheDayDuration());
if(!isExistEs) {
isExistEs = true;

Loading…
Cancel
Save