Browse Source

更新

feature_hph_新增rabbitmq_sink
hupenghui@qniao.cn 3 years ago
parent
commit
6054d3c623
1 changed files with 33 additions and 23 deletions
  1. 56
      src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java

56
src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java

@ -102,7 +102,7 @@ public class IotMonitoringDataJob {
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> value.getReportTime() != null
&& value.getDataSource() != null && value.getMachinePwrStat() != null);
streamOperator.print().name("kafka 数据源:");
streamOperator.print().name("数据源:");
// mac分组并进行工作时长的集合操作
DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = streamOperator
@ -121,6 +121,9 @@ public class IotMonitoringDataJob {
// 上次的开机状态
private ValueState<Integer> lastPwStatState;
// 是否存在es中假设都存在
private boolean isExistEs = true;
@Override
public void open(Configuration parameters) {
@ -147,7 +150,7 @@ public class IotMonitoringDataJob {
DeviceTotalData onData = onDataState.value();
Integer lastWorkingStat = lastWorkingStatState.value();
Integer lastPwStat = lastPwStatState.value();
DeviceTotalData lastedDeviceState = getDeviceTotalData(receivedEvent);
DeviceTotalData lastedDeviceState = getLastDeviceTotalData(receivedEvent);
// 如果当前消息的时间大于等于上次消息的时间才进行处理
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
@ -214,11 +217,10 @@ public class IotMonitoringDataJob {
} else {
nowDeviceState = lastedDeviceState;
}
deviceTotalDataStat.update(nowDeviceState);
} else {
nowDeviceState = lastedDeviceState;
deviceTotalDataStat.update(nowDeviceState);
}
deviceTotalDataStat.update(nowDeviceState);
} else {
if (machineWorkingStat.equals(1)) {
if (dataSource == 1) {
@ -257,7 +259,7 @@ public class IotMonitoringDataJob {
}
}
// 如果上次是待机并且这次也是待机那么就不需要发送了
if (!(lastWorkingStat == 2 && machineWorkingStat == 2)) {
if (!(lastWorkingStat == 2 && machineWorkingStat == 2 && isExistEs)) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
@ -271,6 +273,9 @@ public class IotMonitoringDataJob {
data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
data.setLastBootTime(onData.getReportTime());
data.setCurrDuration(nowDeviceState.getTheDayDuration());
if(!isExistEs) {
isExistEs = true;
}
out.collect(data);
}
}
@ -291,9 +296,11 @@ public class IotMonitoringDataJob {
return null;
}
private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception {
private DeviceTotalData getLastDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception {
// 上一次的数据
DeviceTotalData value = deviceTotalDataStat.value();
// 用来存放这次的数据作为下一个的上一次数据
DeviceTotalData data = new DeviceTotalData();
Long reportTime = event.getReportTime();
LocalDate localDate = new Date(reportTime).toLocalDate();
@ -308,9 +315,11 @@ public class IotMonitoringDataJob {
data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
data.setLastBootTime(deviceMonitoringData.getLastBootTime());
data.setTheDayDuration(deviceMonitoringData.getCurrDuration());
data.setReportTime(deviceMonitoringData.getReportTime());
} else {
// es中也没有直接从老接口拿
isExistEs = false;
data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime);
}
value = data;
@ -321,27 +330,28 @@ public class IotMonitoringDataJob {
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(),
LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond());
if (deviceMonitoringData != null) {
value.setJobTotal(deviceMonitoringData.getAccJobCount());
value.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
value.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
value.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
value.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
value.setLastBootTime(deviceMonitoringData.getLastBootTime());
value.setReportTime(deviceMonitoringData.getReportTime());
value.setTheDayDuration(0L);
data.setJobTotal(deviceMonitoringData.getAccJobCount());
data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
data.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
data.setLastBootTime(deviceMonitoringData.getLastBootTime());
data.setReportTime(deviceMonitoringData.getReportTime());
data.setTheDayDuration(0L);
} else {
// value有值但是日期不对说明到了第二天那么只需要对当天数据清零即可
value.setJobTotal(value.getJobTotal());
value.setJobDurationTotal(value.getJobDurationTotal());
value.setTheDayJobDuration(0L);
value.setTheDayJobCount(0L);
value.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
value.setLastBootTime(value.getLastBootTime());
value.setTheDayDuration(0L);
value.setReportTime(reportTime);
data.setJobTotal(value.getJobTotal());
data.setJobDurationTotal(value.getJobDurationTotal());
data.setTheDayJobDuration(0L);
data.setTheDayJobCount(0L);
data.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
data.setLastBootTime(value.getLastBootTime());
data.setTheDayDuration(0L);
data.setReportTime(reportTime);
}
value = data;
}
deviceTotalDataStat.update(value);
deviceTotalDataStat.update(data);
return value;
}

Loading…
Cancel
Save