Browse Source

更新

feature_hph_新增rabbitmq_sink
hupenghui@qniao.cn 3 years ago
parent
commit
2554a967d6
3 changed files with 424 additions and 327 deletions
  1. 1
      src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java
  2. 536
      src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java
  3. 214
      src/main/java/com/qniao/iot/gizwits/TestJob.java

1
src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java

@ -37,5 +37,4 @@ public class DeviceTotalData {
* 当前日期
*/
private LocalDate currLocalDate;
}

536
src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java

@ -1,7 +1,6 @@
package com.qniao.iot.gizwits;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.gizwits.config.ApolloConfig;
@ -13,8 +12,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
@ -94,289 +91,148 @@ public class GizWitsIotMonitoringDataJob {
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>() {
// 最新的设备数据
private ValueState<DeviceTotalData> deviceTotalData;
private ValueState<DeviceTotalData> deviceTotalDataStat;
// 开机消息
private ValueState<MachineIotDataReceivedEvent> onEventState;
// 下一个可能的电源状态
private ListState<Integer> nextPwrStatListState;
// 下一个可能的工作状态
private ListState<Integer> nextWorkingStatListState;
// 用于工作状态消息的容器
private MapState<String, Long> workingJobMapState;
// 临时容器
private MapState<String, Long> mapState;
// 开机数据
private ValueState<DeviceTotalData> onDataState;
// 一次的待机消息
private ValueState<MachineIotDataReceivedEvent> waitJobEventState;
// 上次的关机数据
private ValueState<MachineIotDataReceivedEvent> lastOffDataState;
// 一个消息是否是待机状态
private ValueState<Boolean> isLastWaitJobState;
// 上次的开机数据
private ValueState<MachineIotDataReceivedEvent> lastOnDataState;
// 上次开机时间
private ValueState<Long> lastBootTimeState;
// 当前周期的待机数据
private ValueState<MachineIotDataReceivedEvent> lastWaitJobDataState;
// 机器上次状态
private ValueState<MachineIotDataReceivedEvent> lastOffEventState;
// 上次的状态
private ValueState<Integer> lastWorkingStatState;
@Override
public void open(Configuration parameters) {
// 必须在 open 生命周期初始化
deviceTotalData = getRuntimeContext()
deviceTotalDataStat = getRuntimeContext()
.getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class)));
onEventState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("startEvent", TypeInformation.of(MachineIotDataReceivedEvent.class)));
nextPwrStatListState = getRuntimeContext()
.getListState(new ListStateDescriptor<>("nextPwrStatList", TypeInformation.of(Integer.class)));
onDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class)));
nextWorkingStatListState = getRuntimeContext()
.getListState(new ListStateDescriptor<>("nextWorkingStatList", TypeInformation.of(Integer.class)));
lastOffDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastOffData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
workingJobMapState = getRuntimeContext()
.getMapState(new MapStateDescriptor<>("workingJobMap", Types.STRING, Types.LONG));
lastOnDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastOnData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
mapState = getRuntimeContext()
.getMapState(new MapStateDescriptor<>("map", Types.STRING, Types.LONG));
lastWaitJobDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastWaitJobData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
waitJobEventState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("waitJobEvent", TypeInformation.of(MachineIotDataReceivedEvent.class)));
isLastWaitJobState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("isLastWaitJob", TypeInformation.of(Boolean.class)));
lastBootTimeState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastBootTime", TypeInformation.of(Long.class)));
lastOffEventState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastOffEvent", TypeInformation.of(MachineIotDataReceivedEvent.class)));
lastWorkingStatState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class)));
}
// 开机数据
private DeviceTotalData onData;
// 上次的关机数据
private MachineIotDataReceivedEvent lastOffData;
// 上次的开机数据
private MachineIotDataReceivedEvent lastOnData;
// 上次待机的数据
private MachineIotDataReceivedEvent lastWaitJobData;
@Override
public void processElement(MachineIotDataReceivedEvent receivedEvent,
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>.Context ctx,
Collector<DeviceMonitoringData> out) throws Exception {
/* 勿删
MachineIotDataReceivedEvent onEvent = onEventState.value();
List<Integer> nextPwrStatList = CollUtil.list(false, nextPwrStatListState.get());
List<Integer> nextWorkingStatList = CollUtil.list(false, nextWorkingStatListState.get());
MachineIotDataReceivedEvent waitJobEvent = waitJobEventState.value();
Boolean isLastWaitJob = isLastWaitJobState.value();
Long lastBootTime = lastBootTimeState.value();*/
DeviceTotalData lastedDeviceState = getDeviceTotalData(receivedEvent);
if(lastedDeviceState != null) {
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
Long reportTime = receivedEvent.getReportTime();
// 勿删
// mapState.clear();
if(onData == null) {
onData = lastedDeviceState;
DeviceTotalData onData = onDataState.value();
MachineIotDataReceivedEvent lastOffData = lastOffDataState.value();
MachineIotDataReceivedEvent lastOnData = lastOnDataState.value();
MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value();
Integer lastWorkingStat = lastWorkingStatState.value();
DeviceTotalData lastedDeviceState = deviceTotalDataStat.value();
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
lastWorkingStatState.update(machineWorkingStat);
Long reportTime = receivedEvent.getReportTime();
if(lastedDeviceState == null) {
lastedDeviceState = getDeviceTotalData(receivedEvent);
lastOnData = receivedEvent;
}
if(lastWorkingStat == null) {
lastWorkingStat = 0;
}
if (onData == null) {
onData = lastedDeviceState;
onDataState.update(onData);
}
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
Long a;
if (machinePwrStat.equals(0)) {
assert lastedDeviceState != null;
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setCurrLocalDate(localDate);
if (lastOnData != null) {
lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault()));
} else {
lastedDeviceState.setLastBootTime(onData.getLastBootTime());
}
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
Long a;
if(machinePwrStat.equals(0)) {
deviceTotalDataStat.update(lastedDeviceState);
// 如果关机
onDataState.update(lastedDeviceState);
lastOffDataState.update(receivedEvent);
// 关机后将待机数据清除
lastWaitJobDataState.update(null);
} else {
if (lastOffData != null) {
lastOnData = receivedEvent;
}
if (machineWorkingStat.equals(1)) {
// 工作中
assert lastedDeviceState != null;
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setCurrLocalDate(localDate);
if(lastOnData != null) {
lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault()));
}else {
lastedDeviceState.setLastBootTime(onData.getLastBootTime());
}
deviceTotalData.update(lastedDeviceState);
// 如果关机
onData = lastedDeviceState;
lastOffData = receivedEvent;
// 关机后将待机数据清除
lastWaitJobData = null;
}else {
if(lastOffData != null) {
lastOnData = receivedEvent;
}
if(machineWorkingStat.equals(1)) {
// 工作中
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setCurrLocalDate(localDate);
lastedDeviceState.setLastBootTime(onData.getLastBootTime());
if(lastWaitJobData != null) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
LocalDateTime lastWaitJobTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS);
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a);
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a);
}else {
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
}
deviceTotalData.update(lastedDeviceState);
}
if(machineWorkingStat.equals(2)) {
// 待机
lastWaitJobData = receivedEvent;
lastedDeviceState.setLastBootTime(onData.getLastBootTime());
if (lastWaitJobData != null) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
LocalDateTime lastWaitJobTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS);
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a);
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a);
} else {
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
}
deviceTotalDataStat.update(lastedDeviceState);
}
if (machineWorkingStat.equals(2)) {
// 待机
lastWaitJobDataState.update(receivedEvent);
}
}
if(lastWorkingStat != 1) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
assert lastedDeviceState != null;
data.setAccJobCount(lastedDeviceState.getJobTotal());
data.setCurrJobCount(lastedDeviceState.getTheDayJobCount());
data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setLastBootTime(lastOnData.getReportTime() * 1000);
out.collect(data);
/* 备份勿删
if(nextPwrStatList.contains(machinePwrStat) && nextWorkingStatList.contains(machineWorkingStat)) {
LocalDateTime startTime;
if(onEventState == null) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault());
startTime = LocalDateTime.of(localDateTime.toLocalDate(), LocalTime.MIN);
}else {
startTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(onEvent.getReportTime() * 1000),
ZoneId.systemDefault());
}
if(machinePwrStat.equals(0)) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
long l = Duration.between(startTime, localDateTime).get(SECONDS);
if(isLastWaitJob) {
mapState.put("currJobDuration", workingJobMapState.get("currJobDuration"));
}else {
mapState.put("currJobDuration", l);
}
mapState.put("currJobCount", receivedEvent.getCurrJobCount());
mapState.put("accJobCount", mapState.get("currJobCount") + lastedDeviceState.getJobTotal());
mapState.put("accJobCountDuration", mapState.get("currJobDuration") + lastedDeviceState.getJobDurationTotal());
nextPwrStatListState.update(ListUtil.toList(1));
workingJobMapState.clear();
// 记录这次的关机消息用于判断下次开机
lastOffEventState.update(receivedEvent);
Long value = lastBootTimeState.value();
if(value == null) {
// 如果在本次开机的时候设置了开机时间那么上次开机时间就是在当前关机的时候进行设置
lastBootTimeState.update(value);
}
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(mapState.get("accJobCount"));
data.setCurrJobCount(mapState.get("currJobCount"));
data.setCurrJobDuration(mapState.get("currJobDuration"));
data.setAccJobCountDuration(mapState.get("currJobDuration"));
data.setReportTime(reportTime);
data.setLastBootTime(lastBootTime);
out.collect(data);
}else {
if(lastOffEventState.value() != null) {
lastBootTimeState.update(reportTime * 1000);
lastOffEventState.clear();
}
if(machineWorkingStat.equals(1)) {
// 工作中只计算工作时长
if(isLastWaitJob) {
// 如果前面的消息是待机消息
startTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(waitJobEvent.getReportTime() * 1000),
ZoneId.systemDefault());
// 状态重置
waitJobEventState.update(null);
isLastWaitJobState.update(false);
}
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
long l = Duration.between(startTime, localDateTime).get(SECONDS);
workingJobMapState.put("currJobDuration", l + workingJobMapState.get("currJobDuration"));
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(mapState.get("accJobCount"));
data.setCurrJobCount(mapState.get("currJobCount"));
data.setCurrJobDuration(mapState.get("currJobDuration"));
data.setAccJobCountDuration(mapState.get("currJobDuration"));
data.setReportTime(reportTime);
data.setLastBootTime(lastBootTime);
out.collect(data);
}
if(machineWorkingStat.equals(2)) {
// 待机中
isLastWaitJobState.update(true);
waitJobEventState.update(receivedEvent);
if(!isLastWaitJob) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(mapState.get("accJobCount"));
data.setCurrJobCount(mapState.get("currJobCount"));
data.setCurrJobDuration(mapState.get("currJobDuration"));
data.setAccJobCountDuration(mapState.get("currJobDuration"));
data.setReportTime(reportTime);
data.setLastBootTime(lastBootTime);
out.collect(data);
}
}
}
if(lastOnData == null) {
data.setLastBootTime(reportTime * 1000);
}else {
onEventState.update(receivedEvent);
}*/
data.setLastBootTime(lastOnData.getReportTime() * 1000);
}
out.collect(data);
}
}
private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws IOException {
private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception {
DeviceTotalData value = deviceTotalData.value();
DeviceTotalData value = deviceTotalDataStat.value();
Long reportTime = event.getReportTime();
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
if (value == null) {
@ -395,16 +251,16 @@ public class GizWitsIotMonitoringDataJob {
data.setLastBootTime(ldt);
} else {
// es中也没有machine_iot_data_received_event索引中拿
queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000, data);
data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000);
}
deviceTotalData.update(data);
deviceTotalDataStat.update(data);
}
// 是否日期是当天的否则需要更新当天工作时长和当天工作量
if (!value.getCurrLocalDate().isEqual(localDate)) {
// 先从es中拿昨天最新的
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(),
LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond());
if(deviceMonitoringData != null) {
if (deviceMonitoringData != null) {
DeviceTotalData data = new DeviceTotalData();
data.setJobTotal(deviceMonitoringData.getAccJobCount());
data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
@ -412,9 +268,9 @@ public class GizWitsIotMonitoringDataJob {
data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
data.setCurrLocalDate(localDate);
data.setLastBootTime(LocalDateTime.ofInstant(Instant
.ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault()));
deviceTotalData.update(data);
}else {
.ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault()));
deviceTotalDataStat.update(data);
} else {
// value有值但是日期不对说明到了第二天那么只需要对当天数据清零即可
value.setTheDayJobDuration(0L);
value.setTheDayJobCount(0L);
@ -423,11 +279,11 @@ public class GizWitsIotMonitoringDataJob {
return null;
}
private Tuple2<Long, Long> queryDeviceMonitoringData(Long machineIotMac,
Long reportTime,
DeviceTotalData data) throws IOException {
private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac,
Long reportTime) throws Exception {
LocalDateTime startTime = LocalDateTime.of(new Date(reportTime * 1000).toLocalDate(), LocalTime.MIN);
DeviceTotalData deviceTotalData;
/*LocalDateTime startTime = LocalDateTime.of(new Date(reportTime * 1000).toLocalDate(), LocalTime.MIN);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime")
@ -439,14 +295,22 @@ public class GizWitsIotMonitoringDataJob {
EsRestClientUtil.queryDeviceListPageResult(searchSourceBuilder,
receivedEventList::add, MachineIotDataReceivedEvent.class, getIndicesList());
// 一天的作业统计
List<Map<String, Long>> currJobStatistics = statistics(receivedEventList);
if(CollUtil.isNotEmpty(currJobStatistics)) {
//
}
return null;
deviceTotalData = statistics(receivedEventList, reportTime);
if(deviceTotalData == null) {
deviceTotalData = new DeviceTotalData();
deviceTotalData.setJobTotal(0L);
deviceTotalData.setJobDurationTotal(0L);
deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault()));
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
}*/
deviceTotalData = new DeviceTotalData();
deviceTotalData.setJobTotal(0L);
deviceTotalData.setJobDurationTotal(0L);
deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault()));
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
return deviceTotalData;
}
/**
@ -454,79 +318,99 @@ public class GizWitsIotMonitoringDataJob {
* @param receivedEventList
* @return 时长数量
*/
private List<Map<String, Long>> statistics(List<MachineIotDataReceivedEvent> receivedEventList) {
List<Map<String, Long>> mapList = new ArrayList<>();
MachineIotDataReceivedEvent startEvent = null;
List<Integer> nextPwrStatList = ListUtil.toList(0, 1);
ArrayList<Integer> nextWorkingStatList = ListUtil.toList(0, 1, 2);
Map<String, Long> workingJobMap = new HashMap<>(2);
Map<String, Long> map = new HashMap<>(2);
MachineIotDataReceivedEvent waitJobEvent = null;
// 一个工作周期期间是否待机过
boolean isHadWaitJob = false;
for (int i = 0; i < receivedEventList.size(); i++) {
MachineIotDataReceivedEvent receivedEvent = receivedEventList.get(i);
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
Long reportTime = receivedEvent.getReportTime();
map.clear();
if(nextPwrStatList.contains(machinePwrStat) && nextWorkingStatList.contains(machineWorkingStat)) {
LocalDateTime startTime;
if(startEvent == null) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault());
startTime = LocalDateTime.of(localDateTime.toLocalDate(), LocalTime.MIN);
}else {
startTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(startEvent.getReportTime() * 1000),
ZoneId.systemDefault());
private DeviceTotalData statistics(List<MachineIotDataReceivedEvent> receivedEventList, Long time) throws Exception{
if(CollUtil.isNotEmpty(receivedEventList)) {
// 一个周期中的开机数据
DeviceTotalData onData = new DeviceTotalData();
onData.setTheDayJobCount(0L);
onData.setJobTotal(0L);
onData.setCurrLocalDate(new Date(time * 1000).toLocalDate());
onData.setTheDayJobDuration(0L);
onData.setJobDurationTotal(0L);
onData.setLastBootTime(LocalDateTime.ofEpochSecond(time, 0, ZoneOffset.ofHours(8)));
// 上次的开机数据
MachineIotDataReceivedEvent lastOnData = null;
// 当前周期的待机数据
MachineIotDataReceivedEvent lastWaitJobData = null;
// 上次的状态
int lastWorkingStat = 0;
// 最新的数据
DeviceTotalData lastedDeviceState = onData;
for (MachineIotDataReceivedEvent receivedEvent : receivedEventList) {
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
lastWorkingStatState.update(machineWorkingStat);
Long reportTime = receivedEvent.getReportTime();
if(lastOnData == null) {
lastOnData = receivedEvent;
}
if(machinePwrStat.equals(0)) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
long l = Duration.between(startTime, localDateTime).get(SECONDS);
if(isHadWaitJob) {
map.put("currJobDuration", workingJobMap.get("currJobDuration"));
}else {
map.put("currJobDuration", l);
}
map.put("currJobCount", receivedEvent.getCurrJobCount());
nextPwrStatList = ListUtil.toList(1);
workingJobMap.clear();
mapList.add(map);
}else {
if(machineWorkingStat.equals(1)) {
// 工作中只计算工作时长
if(isHadWaitJob) {
// 如果前面的消息是待机消息
startTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(waitJobEvent.getReportTime() * 1000),
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
Long a;
if (machinePwrStat.equals(0)) {
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setCurrLocalDate(localDate);
lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault()));
deviceTotalDataStat.update(lastedDeviceState);
// 如果关机
onData = lastedDeviceState;
// 关机后将待机数据清除
lastWaitJobData = null;
} else {
if (machineWorkingStat.equals(1)) {
// 工作中
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setCurrLocalDate(localDate);
lastedDeviceState.setLastBootTime(onData.getLastBootTime());
if (lastWaitJobData != null) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
// 状态重置
waitJobEvent = null;
isHadWaitJob = false;
LocalDateTime lastWaitJobTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS);
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a);
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a);
} else {
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
}
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
long l = Duration.between(startTime, localDateTime).get(SECONDS);
workingJobMap.put("currJobDuration", l + workingJobMap.get("currJobDuration"));
}
if(machineWorkingStat.equals(2)) {
// 待机
isHadWaitJob = true;
waitJobEvent = receivedEvent;
if (machineWorkingStat.equals(2)) {
// 待机
lastWaitJobData = receivedEvent;
lastWorkingStat = 1;
}
}
}else {
startEvent = receivedEvent;
if(lastWorkingStat != 1) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(lastedDeviceState.getJobTotal());
data.setCurrJobCount(lastedDeviceState.getTheDayJobCount());
data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setLastBootTime(lastOnData.getReportTime() * 1000);
}
}
}
return mapList;
return null;
}
private String[] getIndicesList() throws IOException {
@ -550,7 +434,7 @@ public class GizWitsIotMonitoringDataJob {
// 构建查询条件注意termQuery 支持多种格式查询 booleanintdoublestring 这里使用的是 string 的查询
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
if(reportTime != null) {
if (reportTime != null) {
searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime").lt(reportTime));
}
searchSourceBuilder.sort("reportTime", SortOrder.DESC);

214
src/main/java/com/qniao/iot/gizwits/TestJob.java

@ -0,0 +1,214 @@
/*
package com.qniao.iot.gizwits;
import com.qniao.iot.gizwits.config.ApolloConfig;
import com.qniao.iot.gizwits.constant.ConfigConstant;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import java.sql.Date;
import java.time.*;
import static java.time.temporal.ChronoUnit.SECONDS;
@Slf4j
public class TestJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 获取设备数据源
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers("120.25.199.30:19092")
.setTopics("machine_iot_data_received_event")
.setGroupId("123")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
.build();
// 设备数据源转换
SingleOutputStreamOperator<MachineIotDataReceivedEvent> streamOperator = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source")
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> value.getMachinePwrStat() != null && value.getMachineIotMac() != null
&& value.getDataSource() != null && value.getMachineWorkingStat() != null);
streamOperator.print("输出源数据");
// mac分组并进行工作时长的集合操作
DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = streamOperator
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>() {
// 最新的设备数据
private ValueState<DeviceTotalData> deviceTotalData;
// 开机数据
private ValueState<DeviceTotalData> onDataState;
// 上次的关机数据
private ValueState<MachineIotDataReceivedEvent> lastOffDataState;
// 上次的开机数据
private ValueState<MachineIotDataReceivedEvent> lastOnDataState;
// 当前周期的待机数据
private ValueState<MachineIotDataReceivedEvent> lastWaitJobDataState;
// 上次的状态
private ValueState<Integer> lastWorkingStatState;
@Override
public void open(Configuration parameters) {
// 必须在 open 生命周期初始化
deviceTotalData = getRuntimeContext()
.getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class)));
onDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class)));
lastOffDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastOffData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
lastOnDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastOnData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
lastWaitJobDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastWaitJobData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
lastWorkingStatState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class)));
}
@Override
public void processElement(MachineIotDataReceivedEvent receivedEvent,
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>.Context ctx,
Collector<DeviceMonitoringData> out) throws Exception {
DeviceTotalData onData = onDataState.value();
MachineIotDataReceivedEvent lastOffData = lastOffDataState.value();
MachineIotDataReceivedEvent lastOnData = lastOnDataState.value();
MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value();
Integer lastWorkingStat = lastWorkingStatState.value();
DeviceTotalData lastedDeviceState = deviceTotalData.value();
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
lastWorkingStatState.update(machineWorkingStat);
Long reportTime = receivedEvent.getReportTime();
if(lastedDeviceState == null) {
lastedDeviceState = new DeviceTotalData();
lastedDeviceState.setJobTotal(0L);
lastedDeviceState.setJobDurationTotal(0L);
lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault()));
lastedDeviceState.setTheDayJobDuration(0L);
lastedDeviceState.setTheDayJobCount(0L);
lastOnData = receivedEvent;
}
if(lastWorkingStat == null) {
lastWorkingStat = 0;
}
if (onData == null) {
onData = lastedDeviceState;
onDataState.update(onData);
}
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
Long a;
if (machinePwrStat.equals(0)) {
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setCurrLocalDate(localDate);
if (lastOnData != null) {
lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault()));
} else {
lastedDeviceState.setLastBootTime(onData.getLastBootTime());
}
deviceTotalData.update(lastedDeviceState);
// 如果关机
onDataState.update(lastedDeviceState);
lastOffDataState.update(receivedEvent);
// 关机后将待机数据清除
lastWaitJobDataState.update(null);
} else {
if (lastOffData != null) {
lastOnData = receivedEvent;
}
if (machineWorkingStat.equals(1)) {
// 工作中
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setCurrLocalDate(localDate);
lastedDeviceState.setLastBootTime(onData.getLastBootTime());
if (lastWaitJobData != null) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
LocalDateTime lastWaitJobTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS);
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a);
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a);
} else {
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
}
deviceTotalData.update(lastedDeviceState);
}
if (machineWorkingStat.equals(2)) {
// 待机
lastWaitJobDataState.update(receivedEvent);
}
}
if(lastWorkingStat != 1) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(lastedDeviceState.getJobTotal());
data.setCurrJobCount(lastedDeviceState.getTheDayJobCount());
data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
if(lastOnData == null) {
data.setLastBootTime(reportTime * 1000);
}else {
data.setLastBootTime(lastOnData.getReportTime() * 1000);
}
out.collect(data);
}
}
}).name("machineIotDataReceivedEventDataStream keyBy stream");
machineIotDataReceivedEventDataStream.print("输出清洗数据");
env.execute();
}
}
*/
Loading…
Cancel
Save