diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java index 91d8105..94da50d 100644 --- a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java +++ b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java @@ -37,5 +37,4 @@ public class DeviceTotalData { * 当前日期 */ private LocalDate currLocalDate; - } diff --git a/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java index 387376f..741bea8 100644 --- a/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java @@ -1,7 +1,6 @@ package com.qniao.iot.gizwits; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.ArrayUtil; import cn.hutool.json.JSONUtil; import com.qniao.iot.gizwits.config.ApolloConfig; @@ -13,8 +12,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; @@ -94,289 +91,148 @@ public class GizWitsIotMonitoringDataJob { .process(new KeyedProcessFunction() { // 最新的设备数据 - private ValueState deviceTotalData; + private ValueState deviceTotalDataStat; - // 开机消息 - private ValueState onEventState; - - // 下一个可能的电源状态 - private ListState nextPwrStatListState; - - // 下一个可能的工作状态 - private ListState nextWorkingStatListState; - - // 用于工作状态消息的容器 - private MapState workingJobMapState; - - // 临时容器 - private MapState mapState; + // 开机数据 + private ValueState onDataState; - // 上一次的待机消息 - private ValueState waitJobEventState; + // 上次的关机数据 + private ValueState lastOffDataState; - // 上一个消息是否是待机状态 - private ValueState isLastWaitJobState; + // 上次的开机数据 + private ValueState lastOnDataState; - // 上次开机时间 - private ValueState lastBootTimeState; + // 当前周期的待机数据 + private ValueState lastWaitJobDataState; - // 机器上次状态 - private ValueState lastOffEventState; + // 上次的状态 + private ValueState lastWorkingStatState; @Override public void open(Configuration parameters) { // 必须在 open 生命周期初始化 - deviceTotalData = getRuntimeContext() + deviceTotalDataStat = getRuntimeContext() .getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class))); - onEventState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("startEvent", TypeInformation.of(MachineIotDataReceivedEvent.class))); - - nextPwrStatListState = getRuntimeContext() - .getListState(new ListStateDescriptor<>("nextPwrStatList", TypeInformation.of(Integer.class))); + onDataState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class))); - nextWorkingStatListState = getRuntimeContext() - .getListState(new ListStateDescriptor<>("nextWorkingStatList", TypeInformation.of(Integer.class))); + lastOffDataState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("lastOffData", TypeInformation.of(MachineIotDataReceivedEvent.class))); - workingJobMapState = getRuntimeContext() - .getMapState(new MapStateDescriptor<>("workingJobMap", Types.STRING, Types.LONG)); + lastOnDataState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("lastOnData", TypeInformation.of(MachineIotDataReceivedEvent.class))); - mapState = getRuntimeContext() - .getMapState(new MapStateDescriptor<>("map", Types.STRING, Types.LONG)); + lastWaitJobDataState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("lastWaitJobData", TypeInformation.of(MachineIotDataReceivedEvent.class))); - waitJobEventState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("waitJobEvent", TypeInformation.of(MachineIotDataReceivedEvent.class))); - - isLastWaitJobState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("isLastWaitJob", TypeInformation.of(Boolean.class))); - - lastBootTimeState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("lastBootTime", TypeInformation.of(Long.class))); - - lastOffEventState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("lastOffEvent", TypeInformation.of(MachineIotDataReceivedEvent.class))); + lastWorkingStatState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class))); } - - // 开机数据 - private DeviceTotalData onData; - - // 上次的关机数据 - private MachineIotDataReceivedEvent lastOffData; - - // 上次的开机数据 - private MachineIotDataReceivedEvent lastOnData; - - // 上次待机的数据 - private MachineIotDataReceivedEvent lastWaitJobData; - @Override public void processElement(MachineIotDataReceivedEvent receivedEvent, KeyedProcessFunction.Context ctx, Collector out) throws Exception { - /* 勿删 - MachineIotDataReceivedEvent onEvent = onEventState.value(); - List nextPwrStatList = CollUtil.list(false, nextPwrStatListState.get()); - List nextWorkingStatList = CollUtil.list(false, nextWorkingStatListState.get()); - MachineIotDataReceivedEvent waitJobEvent = waitJobEventState.value(); - Boolean isLastWaitJob = isLastWaitJobState.value(); - Long lastBootTime = lastBootTimeState.value();*/ - - DeviceTotalData lastedDeviceState = getDeviceTotalData(receivedEvent); - - if(lastedDeviceState != null) { - Integer machinePwrStat = receivedEvent.getMachinePwrStat(); - Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); - Long reportTime = receivedEvent.getReportTime(); - // 勿删 - // mapState.clear(); - - - if(onData == null) { - onData = lastedDeviceState; + DeviceTotalData onData = onDataState.value(); + MachineIotDataReceivedEvent lastOffData = lastOffDataState.value(); + MachineIotDataReceivedEvent lastOnData = lastOnDataState.value(); + MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value(); + Integer lastWorkingStat = lastWorkingStatState.value(); + DeviceTotalData lastedDeviceState = deviceTotalDataStat.value(); + Integer machinePwrStat = receivedEvent.getMachinePwrStat(); + Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); + lastWorkingStatState.update(machineWorkingStat); + Long reportTime = receivedEvent.getReportTime(); + if(lastedDeviceState == null) { + lastedDeviceState = getDeviceTotalData(receivedEvent); + lastOnData = receivedEvent; + } + if(lastWorkingStat == null) { + lastWorkingStat = 0; + } + if (onData == null) { + onData = lastedDeviceState; + onDataState.update(onData); + } + LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); + Long a; + if (machinePwrStat.equals(0)) { + assert lastedDeviceState != null; + lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); + lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); + lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); + lastedDeviceState.setCurrLocalDate(localDate); + if (lastOnData != null) { + lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault())); + } else { + lastedDeviceState.setLastBootTime(onData.getLastBootTime()); } - LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); - - Long a; - if(machinePwrStat.equals(0)) { + deviceTotalDataStat.update(lastedDeviceState); + // 如果关机 + onDataState.update(lastedDeviceState); + lastOffDataState.update(receivedEvent); + // 关机后将待机数据清除 + lastWaitJobDataState.update(null); + } else { + if (lastOffData != null) { + lastOnData = receivedEvent; + } + if (machineWorkingStat.equals(1)) { + // 工作中 + assert lastedDeviceState != null; lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); lastedDeviceState.setCurrLocalDate(localDate); - if(lastOnData != null) { - lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault())); - }else { - lastedDeviceState.setLastBootTime(onData.getLastBootTime()); - } - deviceTotalData.update(lastedDeviceState); - // 如果关机 - onData = lastedDeviceState; - lastOffData = receivedEvent; - // 关机后将待机数据清除 - lastWaitJobData = null; - }else { - if(lastOffData != null) { - lastOnData = receivedEvent; - } - if(machineWorkingStat.equals(1)) { - // 工作中 - lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setCurrLocalDate(localDate); - lastedDeviceState.setLastBootTime(onData.getLastBootTime()); - if(lastWaitJobData != null) { - LocalDateTime localDateTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(reportTime * 1000), - ZoneId.systemDefault()); - LocalDateTime lastWaitJobTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(reportTime * 1000), - ZoneId.systemDefault()); - a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS); - lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a); - lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a); - }else { - lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); - lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); - } - deviceTotalData.update(lastedDeviceState); - } - if(machineWorkingStat.equals(2)) { - // 待机 - lastWaitJobData = receivedEvent; + lastedDeviceState.setLastBootTime(onData.getLastBootTime()); + if (lastWaitJobData != null) { + LocalDateTime localDateTime = LocalDateTime + .ofInstant(Instant.ofEpochMilli(reportTime * 1000), + ZoneId.systemDefault()); + LocalDateTime lastWaitJobTime = LocalDateTime + .ofInstant(Instant.ofEpochMilli(reportTime * 1000), + ZoneId.systemDefault()); + a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS); + lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a); + lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a); + } else { + lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); + lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); } + deviceTotalDataStat.update(lastedDeviceState); } + if (machineWorkingStat.equals(2)) { + // 待机 + lastWaitJobDataState.update(receivedEvent); + } + } + if(lastWorkingStat != 1) { DeviceMonitoringData data = new DeviceMonitoringData(); data.setDataSource(receivedEvent.getDataSource()); data.setMachineIotMac(receivedEvent.getMachineIotMac()); data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); + assert lastedDeviceState != null; data.setAccJobCount(lastedDeviceState.getJobTotal()); data.setCurrJobCount(lastedDeviceState.getTheDayJobCount()); data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration()); data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal()); data.setReportTime(reportTime); - data.setLastBootTime(lastOnData.getReportTime() * 1000); - out.collect(data); - - - /* 备份,勿删 - if(nextPwrStatList.contains(machinePwrStat) && nextWorkingStatList.contains(machineWorkingStat)) { - - LocalDateTime startTime; - if(onEventState == null) { - LocalDateTime localDateTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault()); - startTime = LocalDateTime.of(localDateTime.toLocalDate(), LocalTime.MIN); - }else { - startTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(onEvent.getReportTime() * 1000), - ZoneId.systemDefault()); - } - if(machinePwrStat.equals(0)) { - LocalDateTime localDateTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(reportTime * 1000), - ZoneId.systemDefault()); - long l = Duration.between(startTime, localDateTime).get(SECONDS); - if(isLastWaitJob) { - mapState.put("currJobDuration", workingJobMapState.get("currJobDuration")); - }else { - mapState.put("currJobDuration", l); - } - mapState.put("currJobCount", receivedEvent.getCurrJobCount()); - mapState.put("accJobCount", mapState.get("currJobCount") + lastedDeviceState.getJobTotal()); - mapState.put("accJobCountDuration", mapState.get("currJobDuration") + lastedDeviceState.getJobDurationTotal()); - nextPwrStatListState.update(ListUtil.toList(1)); - workingJobMapState.clear(); - // 记录这次的关机消息,用于判断下次开机 - lastOffEventState.update(receivedEvent); - - Long value = lastBootTimeState.value(); - if(value == null) { - // 如果在本次开机的时候设置了开机时间,那么上次开机时间就是在当前关机的时候进行设置 - lastBootTimeState.update(value); - } - - DeviceMonitoringData data = new DeviceMonitoringData(); - data.setDataSource(receivedEvent.getDataSource()); - data.setMachineIotMac(receivedEvent.getMachineIotMac()); - data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); - data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); - data.setAccJobCount(mapState.get("accJobCount")); - data.setCurrJobCount(mapState.get("currJobCount")); - data.setCurrJobDuration(mapState.get("currJobDuration")); - data.setAccJobCountDuration(mapState.get("currJobDuration")); - data.setReportTime(reportTime); - data.setLastBootTime(lastBootTime); - out.collect(data); - }else { - - if(lastOffEventState.value() != null) { - lastBootTimeState.update(reportTime * 1000); - lastOffEventState.clear(); - } - - if(machineWorkingStat.equals(1)) { - // 工作中,只计算工作时长 - if(isLastWaitJob) { - // 如果前面的消息是待机消息 - startTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(waitJobEvent.getReportTime() * 1000), - ZoneId.systemDefault()); - // 状态重置 - waitJobEventState.update(null); - isLastWaitJobState.update(false); - } - LocalDateTime localDateTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(reportTime * 1000), - ZoneId.systemDefault()); - long l = Duration.between(startTime, localDateTime).get(SECONDS); - workingJobMapState.put("currJobDuration", l + workingJobMapState.get("currJobDuration")); - DeviceMonitoringData data = new DeviceMonitoringData(); - data.setDataSource(receivedEvent.getDataSource()); - data.setMachineIotMac(receivedEvent.getMachineIotMac()); - data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); - data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); - data.setAccJobCount(mapState.get("accJobCount")); - data.setCurrJobCount(mapState.get("currJobCount")); - data.setCurrJobDuration(mapState.get("currJobDuration")); - data.setAccJobCountDuration(mapState.get("currJobDuration")); - data.setReportTime(reportTime); - data.setLastBootTime(lastBootTime); - out.collect(data); - } - if(machineWorkingStat.equals(2)) { - // 待机中 - isLastWaitJobState.update(true); - waitJobEventState.update(receivedEvent); - - if(!isLastWaitJob) { - DeviceMonitoringData data = new DeviceMonitoringData(); - data.setDataSource(receivedEvent.getDataSource()); - data.setMachineIotMac(receivedEvent.getMachineIotMac()); - data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); - data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); - data.setAccJobCount(mapState.get("accJobCount")); - data.setCurrJobCount(mapState.get("currJobCount")); - data.setCurrJobDuration(mapState.get("currJobDuration")); - data.setAccJobCountDuration(mapState.get("currJobDuration")); - data.setReportTime(reportTime); - data.setLastBootTime(lastBootTime); - out.collect(data); - } - } - } + if(lastOnData == null) { + data.setLastBootTime(reportTime * 1000); }else { - onEventState.update(receivedEvent); - }*/ + data.setLastBootTime(lastOnData.getReportTime() * 1000); + } + out.collect(data); } } - private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws IOException { + private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception { - DeviceTotalData value = deviceTotalData.value(); + DeviceTotalData value = deviceTotalDataStat.value(); Long reportTime = event.getReportTime(); LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); if (value == null) { @@ -395,16 +251,16 @@ public class GizWitsIotMonitoringDataJob { data.setLastBootTime(ldt); } else { // es中也没有,从“machine_iot_data_received_event”索引中拿 - queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000, data); + data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000); } - deviceTotalData.update(data); + deviceTotalDataStat.update(data); } // 是否日期是当天的,否则需要更新当天工作时长和当天工作量 if (!value.getCurrLocalDate().isEqual(localDate)) { // 先从es中拿昨天最新的 DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond()); - if(deviceMonitoringData != null) { + if (deviceMonitoringData != null) { DeviceTotalData data = new DeviceTotalData(); data.setJobTotal(deviceMonitoringData.getAccJobCount()); data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); @@ -412,9 +268,9 @@ public class GizWitsIotMonitoringDataJob { data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount()); data.setCurrLocalDate(localDate); data.setLastBootTime(LocalDateTime.ofInstant(Instant - .ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault())); - deviceTotalData.update(data); - }else { + .ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault())); + deviceTotalDataStat.update(data); + } else { // value有值,但是日期不对,说明到了第二天,那么,只需要对当天数据清零即可 value.setTheDayJobDuration(0L); value.setTheDayJobCount(0L); @@ -423,11 +279,11 @@ public class GizWitsIotMonitoringDataJob { return null; } - private Tuple2 queryDeviceMonitoringData(Long machineIotMac, - Long reportTime, - DeviceTotalData data) throws IOException { + private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac, + Long reportTime) throws Exception { - LocalDateTime startTime = LocalDateTime.of(new Date(reportTime * 1000).toLocalDate(), LocalTime.MIN); + DeviceTotalData deviceTotalData; + /*LocalDateTime startTime = LocalDateTime.of(new Date(reportTime * 1000).toLocalDate(), LocalTime.MIN); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac)); searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime") @@ -439,14 +295,22 @@ public class GizWitsIotMonitoringDataJob { EsRestClientUtil.queryDeviceListPageResult(searchSourceBuilder, receivedEventList::add, MachineIotDataReceivedEvent.class, getIndicesList()); // 一天的作业统计 - List> currJobStatistics = statistics(receivedEventList); - if(CollUtil.isNotEmpty(currJobStatistics)) { - // - } - - - - return null; + deviceTotalData = statistics(receivedEventList, reportTime); + if(deviceTotalData == null) { + deviceTotalData = new DeviceTotalData(); + deviceTotalData.setJobTotal(0L); + deviceTotalData.setJobDurationTotal(0L); + deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault())); + deviceTotalData.setTheDayJobDuration(0L); + deviceTotalData.setTheDayJobCount(0L); + }*/ + deviceTotalData = new DeviceTotalData(); + deviceTotalData.setJobTotal(0L); + deviceTotalData.setJobDurationTotal(0L); + deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault())); + deviceTotalData.setTheDayJobDuration(0L); + deviceTotalData.setTheDayJobCount(0L); + return deviceTotalData; } /** @@ -454,79 +318,99 @@ public class GizWitsIotMonitoringDataJob { * @param receivedEventList * @return 时长,数量 */ - private List> statistics(List receivedEventList) { - - List> mapList = new ArrayList<>(); - MachineIotDataReceivedEvent startEvent = null; - List nextPwrStatList = ListUtil.toList(0, 1); - ArrayList nextWorkingStatList = ListUtil.toList(0, 1, 2); - Map workingJobMap = new HashMap<>(2); - Map map = new HashMap<>(2); - MachineIotDataReceivedEvent waitJobEvent = null; - // 一个工作周期期间是否待机过 - boolean isHadWaitJob = false; - for (int i = 0; i < receivedEventList.size(); i++) { - - MachineIotDataReceivedEvent receivedEvent = receivedEventList.get(i); - Integer machinePwrStat = receivedEvent.getMachinePwrStat(); - Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); - Long reportTime = receivedEvent.getReportTime(); - map.clear(); - if(nextPwrStatList.contains(machinePwrStat) && nextWorkingStatList.contains(machineWorkingStat)) { - - LocalDateTime startTime; - if(startEvent == null) { - LocalDateTime localDateTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault()); - startTime = LocalDateTime.of(localDateTime.toLocalDate(), LocalTime.MIN); - }else { - startTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(startEvent.getReportTime() * 1000), - ZoneId.systemDefault()); + private DeviceTotalData statistics(List receivedEventList, Long time) throws Exception{ + + if(CollUtil.isNotEmpty(receivedEventList)) { + + // 一个周期中的开机数据 + DeviceTotalData onData = new DeviceTotalData(); + onData.setTheDayJobCount(0L); + onData.setJobTotal(0L); + onData.setCurrLocalDate(new Date(time * 1000).toLocalDate()); + onData.setTheDayJobDuration(0L); + onData.setJobDurationTotal(0L); + onData.setLastBootTime(LocalDateTime.ofEpochSecond(time, 0, ZoneOffset.ofHours(8))); + + // 上次的开机数据 + MachineIotDataReceivedEvent lastOnData = null; + + // 当前周期的待机数据 + MachineIotDataReceivedEvent lastWaitJobData = null; + + // 上次的状态 + int lastWorkingStat = 0; + + // 最新的数据 + DeviceTotalData lastedDeviceState = onData; + + for (MachineIotDataReceivedEvent receivedEvent : receivedEventList) { + + Integer machinePwrStat = receivedEvent.getMachinePwrStat(); + Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); + lastWorkingStatState.update(machineWorkingStat); + Long reportTime = receivedEvent.getReportTime(); + if(lastOnData == null) { + lastOnData = receivedEvent; } - if(machinePwrStat.equals(0)) { - LocalDateTime localDateTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(reportTime * 1000), - ZoneId.systemDefault()); - long l = Duration.between(startTime, localDateTime).get(SECONDS); - if(isHadWaitJob) { - map.put("currJobDuration", workingJobMap.get("currJobDuration")); - }else { - map.put("currJobDuration", l); - } - map.put("currJobCount", receivedEvent.getCurrJobCount()); - nextPwrStatList = ListUtil.toList(1); - workingJobMap.clear(); - mapList.add(map); - }else { - if(machineWorkingStat.equals(1)) { - // 工作中,只计算工作时长 - if(isHadWaitJob) { - // 如果前面的消息是待机消息 - startTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(waitJobEvent.getReportTime() * 1000), + LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); + Long a; + if (machinePwrStat.equals(0)) { + lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); + lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); + lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); + lastedDeviceState.setCurrLocalDate(localDate); + lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault())); + deviceTotalDataStat.update(lastedDeviceState); + // 如果关机 + onData = lastedDeviceState; + // 关机后将待机数据清除 + lastWaitJobData = null; + } else { + + if (machineWorkingStat.equals(1)) { + // 工作中 + lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); + lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + lastedDeviceState.setCurrLocalDate(localDate); + lastedDeviceState.setLastBootTime(onData.getLastBootTime()); + if (lastWaitJobData != null) { + LocalDateTime localDateTime = LocalDateTime + .ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault()); - // 状态重置 - waitJobEvent = null; - isHadWaitJob = false; + LocalDateTime lastWaitJobTime = LocalDateTime + .ofInstant(Instant.ofEpochMilli(reportTime * 1000), + ZoneId.systemDefault()); + a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS); + lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a); + lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a); + } else { + lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); + lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); } - LocalDateTime localDateTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(reportTime * 1000), - ZoneId.systemDefault()); - long l = Duration.between(startTime, localDateTime).get(SECONDS); - workingJobMap.put("currJobDuration", l + workingJobMap.get("currJobDuration")); } - if(machineWorkingStat.equals(2)) { - // 待机中 - isHadWaitJob = true; - waitJobEvent = receivedEvent; + if (machineWorkingStat.equals(2)) { + // 待机 + lastWaitJobData = receivedEvent; + lastWorkingStat = 1; } } - }else { - startEvent = receivedEvent; + if(lastWorkingStat != 1) { + DeviceMonitoringData data = new DeviceMonitoringData(); + data.setDataSource(receivedEvent.getDataSource()); + data.setMachineIotMac(receivedEvent.getMachineIotMac()); + data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); + data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); + data.setAccJobCount(lastedDeviceState.getJobTotal()); + data.setCurrJobCount(lastedDeviceState.getTheDayJobCount()); + data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration()); + data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal()); + data.setReportTime(reportTime); + data.setLastBootTime(lastOnData.getReportTime() * 1000); + } } } - return mapList; + return null; } private String[] getIndicesList() throws IOException { @@ -550,7 +434,7 @@ public class GizWitsIotMonitoringDataJob { // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac)); - if(reportTime != null) { + if (reportTime != null) { searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime").lt(reportTime)); } searchSourceBuilder.sort("reportTime", SortOrder.DESC); diff --git a/src/main/java/com/qniao/iot/gizwits/TestJob.java b/src/main/java/com/qniao/iot/gizwits/TestJob.java new file mode 100644 index 0000000..da5d53a --- /dev/null +++ b/src/main/java/com/qniao/iot/gizwits/TestJob.java @@ -0,0 +1,214 @@ +/* +package com.qniao.iot.gizwits; + +import com.qniao.iot.gizwits.config.ApolloConfig; +import com.qniao.iot.gizwits.constant.ConfigConstant; +import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; +import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; + +import java.sql.Date; +import java.time.*; + +import static java.time.temporal.ChronoUnit.SECONDS; + +@Slf4j +public class TestJob { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); + // 获取设备数据源 + KafkaSource source = KafkaSource.builder() + .setBootstrapServers("120.25.199.30:19092") + .setTopics("machine_iot_data_received_event") + .setGroupId("123") + .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) + .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") + .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) + .build(); + + // 设备数据源转换 + SingleOutputStreamOperator streamOperator = env + .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source") + .filter((FilterFunction) value -> value.getMachinePwrStat() != null && value.getMachineIotMac() != null + && value.getDataSource() != null && value.getMachineWorkingStat() != null); + + + streamOperator.print("输出源数据"); + + // mac分组并进行工作时长的集合操作 + DataStream machineIotDataReceivedEventDataStream = streamOperator + .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) + .process(new KeyedProcessFunction() { + + // 最新的设备数据 + private ValueState deviceTotalData; + + // 开机数据 + private ValueState onDataState; + + // 上次的关机数据 + private ValueState lastOffDataState; + + // 上次的开机数据 + private ValueState lastOnDataState; + + // 当前周期的待机数据 + private ValueState lastWaitJobDataState; + + // 上次的状态 + private ValueState lastWorkingStatState; + + @Override + public void open(Configuration parameters) { + + // 必须在 open 生命周期初始化 + deviceTotalData = getRuntimeContext() + .getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class))); + + onDataState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class))); + + lastOffDataState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("lastOffData", TypeInformation.of(MachineIotDataReceivedEvent.class))); + + lastOnDataState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("lastOnData", TypeInformation.of(MachineIotDataReceivedEvent.class))); + + lastWaitJobDataState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("lastWaitJobData", TypeInformation.of(MachineIotDataReceivedEvent.class))); + + lastWorkingStatState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class))); + } + + @Override + public void processElement(MachineIotDataReceivedEvent receivedEvent, + KeyedProcessFunction.Context ctx, + Collector out) throws Exception { + + + DeviceTotalData onData = onDataState.value(); + MachineIotDataReceivedEvent lastOffData = lastOffDataState.value(); + MachineIotDataReceivedEvent lastOnData = lastOnDataState.value(); + MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value(); + Integer lastWorkingStat = lastWorkingStatState.value(); + DeviceTotalData lastedDeviceState = deviceTotalData.value(); + Integer machinePwrStat = receivedEvent.getMachinePwrStat(); + Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); + lastWorkingStatState.update(machineWorkingStat); + Long reportTime = receivedEvent.getReportTime(); + if(lastedDeviceState == null) { + lastedDeviceState = new DeviceTotalData(); + lastedDeviceState.setJobTotal(0L); + lastedDeviceState.setJobDurationTotal(0L); + lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault())); + lastedDeviceState.setTheDayJobDuration(0L); + lastedDeviceState.setTheDayJobCount(0L); + lastOnData = receivedEvent; + } + if(lastWorkingStat == null) { + lastWorkingStat = 0; + } + + if (onData == null) { + onData = lastedDeviceState; + onDataState.update(onData); + } + LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); + Long a; + if (machinePwrStat.equals(0)) { + lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); + lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); + lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); + lastedDeviceState.setCurrLocalDate(localDate); + if (lastOnData != null) { + lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault())); + } else { + lastedDeviceState.setLastBootTime(onData.getLastBootTime()); + } + deviceTotalData.update(lastedDeviceState); + // 如果关机 + onDataState.update(lastedDeviceState); + lastOffDataState.update(receivedEvent); + // 关机后将待机数据清除 + lastWaitJobDataState.update(null); + } else { + if (lastOffData != null) { + lastOnData = receivedEvent; + } + if (machineWorkingStat.equals(1)) { + // 工作中 + lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); + lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + lastedDeviceState.setCurrLocalDate(localDate); + lastedDeviceState.setLastBootTime(onData.getLastBootTime()); + if (lastWaitJobData != null) { + LocalDateTime localDateTime = LocalDateTime + .ofInstant(Instant.ofEpochMilli(reportTime * 1000), + ZoneId.systemDefault()); + LocalDateTime lastWaitJobTime = LocalDateTime + .ofInstant(Instant.ofEpochMilli(reportTime * 1000), + ZoneId.systemDefault()); + a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS); + lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a); + lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a); + } else { + lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); + lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); + } + deviceTotalData.update(lastedDeviceState); + } + if (machineWorkingStat.equals(2)) { + // 待机 + lastWaitJobDataState.update(receivedEvent); + } + } + if(lastWorkingStat != 1) { + DeviceMonitoringData data = new DeviceMonitoringData(); + data.setDataSource(receivedEvent.getDataSource()); + data.setMachineIotMac(receivedEvent.getMachineIotMac()); + data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); + data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); + data.setAccJobCount(lastedDeviceState.getJobTotal()); + data.setCurrJobCount(lastedDeviceState.getTheDayJobCount()); + data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration()); + data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal()); + data.setReportTime(reportTime); + if(lastOnData == null) { + data.setLastBootTime(reportTime * 1000); + }else { + data.setLastBootTime(lastOnData.getReportTime() * 1000); + } + out.collect(data); + } + } + }).name("machineIotDataReceivedEventDataStream keyBy stream"); + + + machineIotDataReceivedEventDataStream.print("输出清洗数据"); + + env.execute(); + } +} +*/