diff --git a/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java index 8801708..387376f 100644 --- a/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java @@ -1,7 +1,7 @@ package com.qniao.iot.gizwits; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.ListUtil; -import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.util.ArrayUtil; import cn.hutool.json.JSONUtil; import com.qniao.iot.gizwits.config.ApolloConfig; @@ -9,14 +9,12 @@ import com.qniao.iot.gizwits.constant.ConfigConstant; import com.qniao.iot.gizwits.utils.EsRestClientUtil; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; @@ -41,7 +39,6 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.cluster.metadata.AliasMetaData; -import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; @@ -52,9 +49,7 @@ import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.sql.Date; import java.time.*; -import java.time.temporal.TemporalUnit; import java.util.*; -import java.util.function.Consumer; import static java.time.temporal.ChronoUnit.SECONDS; @@ -98,22 +93,285 @@ public class GizWitsIotMonitoringDataJob { .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { + // 最新的设备数据 private ValueState deviceTotalData; + // 开机消息 + private ValueState onEventState; + + // 下一个可能的电源状态 + private ListState nextPwrStatListState; + + // 下一个可能的工作状态 + private ListState nextWorkingStatListState; + + // 用于工作状态消息的容器 + private MapState workingJobMapState; + + // 临时容器 + private MapState mapState; + + // 上一次的待机消息 + private ValueState waitJobEventState; + + // 上一个消息是否是待机状态 + private ValueState isLastWaitJobState; + + // 上次开机时间 + private ValueState lastBootTimeState; + + // 机器上次状态 + private ValueState lastOffEventState; + @Override public void open(Configuration parameters) { // 必须在 open 生命周期初始化 deviceTotalData = getRuntimeContext() .getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class))); + + onEventState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("startEvent", TypeInformation.of(MachineIotDataReceivedEvent.class))); + + nextPwrStatListState = getRuntimeContext() + .getListState(new ListStateDescriptor<>("nextPwrStatList", TypeInformation.of(Integer.class))); + + nextWorkingStatListState = getRuntimeContext() + .getListState(new ListStateDescriptor<>("nextWorkingStatList", TypeInformation.of(Integer.class))); + + workingJobMapState = getRuntimeContext() + .getMapState(new MapStateDescriptor<>("workingJobMap", Types.STRING, Types.LONG)); + + mapState = getRuntimeContext() + .getMapState(new MapStateDescriptor<>("map", Types.STRING, Types.LONG)); + + waitJobEventState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("waitJobEvent", TypeInformation.of(MachineIotDataReceivedEvent.class))); + + isLastWaitJobState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("isLastWaitJob", TypeInformation.of(Boolean.class))); + + lastBootTimeState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("lastBootTime", TypeInformation.of(Long.class))); + + lastOffEventState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("lastOffEvent", TypeInformation.of(MachineIotDataReceivedEvent.class))); } + + // 开机数据 + private DeviceTotalData onData; + + // 上次的关机数据 + private MachineIotDataReceivedEvent lastOffData; + + // 上次的开机数据 + private MachineIotDataReceivedEvent lastOnData; + + // 上次待机的数据 + private MachineIotDataReceivedEvent lastWaitJobData; + @Override - public void processElement(MachineIotDataReceivedEvent event, + public void processElement(MachineIotDataReceivedEvent receivedEvent, KeyedProcessFunction.Context ctx, Collector out) throws Exception { - DeviceTotalData lastedDeviceState = getDeviceTotalData(event); + /* 勿删 + MachineIotDataReceivedEvent onEvent = onEventState.value(); + List nextPwrStatList = CollUtil.list(false, nextPwrStatListState.get()); + List nextWorkingStatList = CollUtil.list(false, nextWorkingStatListState.get()); + MachineIotDataReceivedEvent waitJobEvent = waitJobEventState.value(); + Boolean isLastWaitJob = isLastWaitJobState.value(); + Long lastBootTime = lastBootTimeState.value();*/ + + DeviceTotalData lastedDeviceState = getDeviceTotalData(receivedEvent); + + if(lastedDeviceState != null) { + Integer machinePwrStat = receivedEvent.getMachinePwrStat(); + Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); + Long reportTime = receivedEvent.getReportTime(); + // 勿删 + // mapState.clear(); + + + if(onData == null) { + onData = lastedDeviceState; + } + LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); + + Long a; + if(machinePwrStat.equals(0)) { + lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); + lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); + lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); + lastedDeviceState.setCurrLocalDate(localDate); + if(lastOnData != null) { + lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault())); + }else { + lastedDeviceState.setLastBootTime(onData.getLastBootTime()); + } + deviceTotalData.update(lastedDeviceState); + // 如果关机 + onData = lastedDeviceState; + lastOffData = receivedEvent; + // 关机后将待机数据清除 + lastWaitJobData = null; + }else { + if(lastOffData != null) { + lastOnData = receivedEvent; + } + if(machineWorkingStat.equals(1)) { + // 工作中 + lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); + lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + lastedDeviceState.setCurrLocalDate(localDate); + lastedDeviceState.setLastBootTime(onData.getLastBootTime()); + if(lastWaitJobData != null) { + LocalDateTime localDateTime = LocalDateTime + .ofInstant(Instant.ofEpochMilli(reportTime * 1000), + ZoneId.systemDefault()); + LocalDateTime lastWaitJobTime = LocalDateTime + .ofInstant(Instant.ofEpochMilli(reportTime * 1000), + ZoneId.systemDefault()); + a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS); + lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a); + lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a); + }else { + lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); + lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); + } + deviceTotalData.update(lastedDeviceState); + } + if(machineWorkingStat.equals(2)) { + // 待机 + lastWaitJobData = receivedEvent; + } + } + DeviceMonitoringData data = new DeviceMonitoringData(); + data.setDataSource(receivedEvent.getDataSource()); + data.setMachineIotMac(receivedEvent.getMachineIotMac()); + data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); + data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); + data.setAccJobCount(lastedDeviceState.getJobTotal()); + data.setCurrJobCount(lastedDeviceState.getTheDayJobCount()); + data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration()); + data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal()); + data.setReportTime(reportTime); + data.setLastBootTime(lastOnData.getReportTime() * 1000); + out.collect(data); + + + /* 备份,勿删 + if(nextPwrStatList.contains(machinePwrStat) && nextWorkingStatList.contains(machineWorkingStat)) { + + LocalDateTime startTime; + if(onEventState == null) { + LocalDateTime localDateTime = LocalDateTime + .ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault()); + startTime = LocalDateTime.of(localDateTime.toLocalDate(), LocalTime.MIN); + }else { + startTime = LocalDateTime + .ofInstant(Instant.ofEpochMilli(onEvent.getReportTime() * 1000), + ZoneId.systemDefault()); + } + if(machinePwrStat.equals(0)) { + LocalDateTime localDateTime = LocalDateTime + .ofInstant(Instant.ofEpochMilli(reportTime * 1000), + ZoneId.systemDefault()); + long l = Duration.between(startTime, localDateTime).get(SECONDS); + if(isLastWaitJob) { + mapState.put("currJobDuration", workingJobMapState.get("currJobDuration")); + }else { + mapState.put("currJobDuration", l); + } + mapState.put("currJobCount", receivedEvent.getCurrJobCount()); + mapState.put("accJobCount", mapState.get("currJobCount") + lastedDeviceState.getJobTotal()); + mapState.put("accJobCountDuration", mapState.get("currJobDuration") + lastedDeviceState.getJobDurationTotal()); + nextPwrStatListState.update(ListUtil.toList(1)); + workingJobMapState.clear(); + // 记录这次的关机消息,用于判断下次开机 + lastOffEventState.update(receivedEvent); + + Long value = lastBootTimeState.value(); + if(value == null) { + // 如果在本次开机的时候设置了开机时间,那么上次开机时间就是在当前关机的时候进行设置 + lastBootTimeState.update(value); + } + + DeviceMonitoringData data = new DeviceMonitoringData(); + data.setDataSource(receivedEvent.getDataSource()); + data.setMachineIotMac(receivedEvent.getMachineIotMac()); + data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); + data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); + data.setAccJobCount(mapState.get("accJobCount")); + data.setCurrJobCount(mapState.get("currJobCount")); + data.setCurrJobDuration(mapState.get("currJobDuration")); + data.setAccJobCountDuration(mapState.get("currJobDuration")); + data.setReportTime(reportTime); + data.setLastBootTime(lastBootTime); + out.collect(data); + }else { + + if(lastOffEventState.value() != null) { + lastBootTimeState.update(reportTime * 1000); + lastOffEventState.clear(); + } + + if(machineWorkingStat.equals(1)) { + // 工作中,只计算工作时长 + if(isLastWaitJob) { + // 如果前面的消息是待机消息 + startTime = LocalDateTime + .ofInstant(Instant.ofEpochMilli(waitJobEvent.getReportTime() * 1000), + ZoneId.systemDefault()); + // 状态重置 + waitJobEventState.update(null); + isLastWaitJobState.update(false); + } + LocalDateTime localDateTime = LocalDateTime + .ofInstant(Instant.ofEpochMilli(reportTime * 1000), + ZoneId.systemDefault()); + long l = Duration.between(startTime, localDateTime).get(SECONDS); + workingJobMapState.put("currJobDuration", l + workingJobMapState.get("currJobDuration")); + DeviceMonitoringData data = new DeviceMonitoringData(); + data.setDataSource(receivedEvent.getDataSource()); + data.setMachineIotMac(receivedEvent.getMachineIotMac()); + data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); + data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); + data.setAccJobCount(mapState.get("accJobCount")); + data.setCurrJobCount(mapState.get("currJobCount")); + data.setCurrJobDuration(mapState.get("currJobDuration")); + data.setAccJobCountDuration(mapState.get("currJobDuration")); + data.setReportTime(reportTime); + data.setLastBootTime(lastBootTime); + out.collect(data); + } + if(machineWorkingStat.equals(2)) { + // 待机中 + isLastWaitJobState.update(true); + waitJobEventState.update(receivedEvent); + + if(!isLastWaitJob) { + DeviceMonitoringData data = new DeviceMonitoringData(); + data.setDataSource(receivedEvent.getDataSource()); + data.setMachineIotMac(receivedEvent.getMachineIotMac()); + data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); + data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); + data.setAccJobCount(mapState.get("accJobCount")); + data.setCurrJobCount(mapState.get("currJobCount")); + data.setCurrJobDuration(mapState.get("currJobDuration")); + data.setAccJobCountDuration(mapState.get("currJobDuration")); + data.setReportTime(reportTime); + data.setLastBootTime(lastBootTime); + out.collect(data); + } + } + } + }else { + onEventState.update(receivedEvent); + }*/ + } } private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws IOException { @@ -123,7 +381,7 @@ public class GizWitsIotMonitoringDataJob { LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); if (value == null) { // 从es中获取 - DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac()); + DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), null); DeviceTotalData data = new DeviceTotalData(); if (deviceMonitoringData != null) { data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); @@ -137,34 +395,57 @@ public class GizWitsIotMonitoringDataJob { data.setLastBootTime(ldt); } else { // es中也没有,从“machine_iot_data_received_event”索引中拿 - queryDeviceMonitoringData(event.getMachineIotMac(), localDate, value); + queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000, data); } deviceTotalData.update(data); } // 是否日期是当天的,否则需要更新当天工作时长和当天工作量 if (!value.getCurrLocalDate().isEqual(localDate)) { - queryDeviceMonitoringData(event.getMachineIotMac(), localDate, value); + // 先从es中拿昨天最新的 + DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), + LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond()); + if(deviceMonitoringData != null) { + DeviceTotalData data = new DeviceTotalData(); + data.setJobTotal(deviceMonitoringData.getAccJobCount()); + data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); + data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration()); + data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount()); + data.setCurrLocalDate(localDate); + data.setLastBootTime(LocalDateTime.ofInstant(Instant + .ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault())); + deviceTotalData.update(data); + }else { + // value有值,但是日期不对,说明到了第二天,那么,只需要对当天数据清零即可 + value.setTheDayJobDuration(0L); + value.setTheDayJobCount(0L); + } } return null; } private Tuple2 queryDeviceMonitoringData(Long machineIotMac, - LocalDate localDate, - DeviceTotalData value) throws IOException { + Long reportTime, + DeviceTotalData data) throws IOException { - LocalDateTime startTime = LocalDateTime.of(localDate, LocalTime.MIN); - LocalDateTime endTime = LocalDateTime.of(localDate, LocalTime.MAX); + LocalDateTime startTime = LocalDateTime.of(new Date(reportTime * 1000).toLocalDate(), LocalTime.MIN); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac)); - /*searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime") + searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime") .gte(startTime.atZone(ZoneOffset.of("+8")).toEpochSecond()) - .lte(endTime.atZone(ZoneOffset.of("+8")).toEpochSecond()));*/ + .lte(reportTime)); searchSourceBuilder.sort("reportTime"); searchSourceBuilder.size(500); List receivedEventList = new ArrayList<>(); EsRestClientUtil.queryDeviceListPageResult(searchSourceBuilder, receivedEventList::add, MachineIotDataReceivedEvent.class, getIndicesList()); - //List> tuple3List = statistics(receivedEventList); + // 一天的作业统计 + List> currJobStatistics = statistics(receivedEventList); + if(CollUtil.isNotEmpty(currJobStatistics)) { + // + } + + + return null; } @@ -214,7 +495,6 @@ public class GizWitsIotMonitoringDataJob { map.put("currJobDuration", l); } map.put("currJobCount", receivedEvent.getCurrJobCount()); - mapList.add(map); nextPwrStatList = ListUtil.toList(1); workingJobMap.clear(); mapList.add(map); @@ -264,12 +544,15 @@ public class GizWitsIotMonitoringDataJob { return ArrayUtil.toArray(indicesList, String.class); } - private DeviceMonitoringData queryLatestDeviceMonitoringData(Long machineIotMac) { + private DeviceMonitoringData queryLatestDeviceMonitoringData(Long machineIotMac, Long reportTime) { try { // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac)); + if(reportTime != null) { + searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime").lt(reportTime)); + } searchSourceBuilder.sort("reportTime", SortOrder.DESC); searchSourceBuilder.size(1); // 创建查询请求对象,将查询对象配置到其中