Browse Source

更新

feature_hph_新增rabbitmq_sink
1049970895@qniao.cn 3 years ago
parent
commit
64d7d9caa0
1 changed files with 305 additions and 22 deletions
  1. 327
      src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java

327
src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java

@ -1,7 +1,7 @@
package com.qniao.iot.gizwits;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.gizwits.config.ApolloConfig;
@ -9,14 +9,12 @@ import com.qniao.iot.gizwits.constant.ConfigConstant;
import com.qniao.iot.gizwits.utils.EsRestClientUtil;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
@ -41,7 +39,6 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
@ -52,9 +49,7 @@ import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.sql.Date;
import java.time.*;
import java.time.temporal.TemporalUnit;
import java.util.*;
import java.util.function.Consumer;
import static java.time.temporal.ChronoUnit.SECONDS;
@ -98,22 +93,285 @@ public class GizWitsIotMonitoringDataJob {
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>() {
// 最新的设备数据
private ValueState<DeviceTotalData> deviceTotalData;
// 开机消息
private ValueState<MachineIotDataReceivedEvent> onEventState;
// 下一个可能的电源状态
private ListState<Integer> nextPwrStatListState;
// 下一个可能的工作状态
private ListState<Integer> nextWorkingStatListState;
// 用于工作状态消息的容器
private MapState<String, Long> workingJobMapState;
// 临时容器
private MapState<String, Long> mapState;
// 上一次的待机消息
private ValueState<MachineIotDataReceivedEvent> waitJobEventState;
// 上一个消息是否是待机状态
private ValueState<Boolean> isLastWaitJobState;
// 上次开机时间
private ValueState<Long> lastBootTimeState;
// 机器上次状态
private ValueState<MachineIotDataReceivedEvent> lastOffEventState;
@Override
public void open(Configuration parameters) {
// 必须在 open 生命周期初始化
deviceTotalData = getRuntimeContext()
.getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class)));
onEventState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("startEvent", TypeInformation.of(MachineIotDataReceivedEvent.class)));
nextPwrStatListState = getRuntimeContext()
.getListState(new ListStateDescriptor<>("nextPwrStatList", TypeInformation.of(Integer.class)));
nextWorkingStatListState = getRuntimeContext()
.getListState(new ListStateDescriptor<>("nextWorkingStatList", TypeInformation.of(Integer.class)));
workingJobMapState = getRuntimeContext()
.getMapState(new MapStateDescriptor<>("workingJobMap", Types.STRING, Types.LONG));
mapState = getRuntimeContext()
.getMapState(new MapStateDescriptor<>("map", Types.STRING, Types.LONG));
waitJobEventState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("waitJobEvent", TypeInformation.of(MachineIotDataReceivedEvent.class)));
isLastWaitJobState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("isLastWaitJob", TypeInformation.of(Boolean.class)));
lastBootTimeState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastBootTime", TypeInformation.of(Long.class)));
lastOffEventState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastOffEvent", TypeInformation.of(MachineIotDataReceivedEvent.class)));
}
// 开机数据
private DeviceTotalData onData;
// 上次的关机数据
private MachineIotDataReceivedEvent lastOffData;
// 上次的开机数据
private MachineIotDataReceivedEvent lastOnData;
// 上次待机的数据
private MachineIotDataReceivedEvent lastWaitJobData;
@Override
public void processElement(MachineIotDataReceivedEvent event,
public void processElement(MachineIotDataReceivedEvent receivedEvent,
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>.Context ctx,
Collector<DeviceMonitoringData> out) throws Exception {
DeviceTotalData lastedDeviceState = getDeviceTotalData(event);
/* 勿删
MachineIotDataReceivedEvent onEvent = onEventState.value();
List<Integer> nextPwrStatList = CollUtil.list(false, nextPwrStatListState.get());
List<Integer> nextWorkingStatList = CollUtil.list(false, nextWorkingStatListState.get());
MachineIotDataReceivedEvent waitJobEvent = waitJobEventState.value();
Boolean isLastWaitJob = isLastWaitJobState.value();
Long lastBootTime = lastBootTimeState.value();*/
DeviceTotalData lastedDeviceState = getDeviceTotalData(receivedEvent);
if(lastedDeviceState != null) {
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
Long reportTime = receivedEvent.getReportTime();
// 勿删
// mapState.clear();
if(onData == null) {
onData = lastedDeviceState;
}
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
Long a;
if(machinePwrStat.equals(0)) {
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setCurrLocalDate(localDate);
if(lastOnData != null) {
lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault()));
}else {
lastedDeviceState.setLastBootTime(onData.getLastBootTime());
}
deviceTotalData.update(lastedDeviceState);
// 如果关机
onData = lastedDeviceState;
lastOffData = receivedEvent;
// 关机后将待机数据清除
lastWaitJobData = null;
}else {
if(lastOffData != null) {
lastOnData = receivedEvent;
}
if(machineWorkingStat.equals(1)) {
// 工作中
lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
lastedDeviceState.setCurrLocalDate(localDate);
lastedDeviceState.setLastBootTime(onData.getLastBootTime());
if(lastWaitJobData != null) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
LocalDateTime lastWaitJobTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS);
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a);
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a);
}else {
lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
}
deviceTotalData.update(lastedDeviceState);
}
if(machineWorkingStat.equals(2)) {
// 待机
lastWaitJobData = receivedEvent;
}
}
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(lastedDeviceState.getJobTotal());
data.setCurrJobCount(lastedDeviceState.getTheDayJobCount());
data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setLastBootTime(lastOnData.getReportTime() * 1000);
out.collect(data);
/* 备份勿删
if(nextPwrStatList.contains(machinePwrStat) && nextWorkingStatList.contains(machineWorkingStat)) {
LocalDateTime startTime;
if(onEventState == null) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault());
startTime = LocalDateTime.of(localDateTime.toLocalDate(), LocalTime.MIN);
}else {
startTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(onEvent.getReportTime() * 1000),
ZoneId.systemDefault());
}
if(machinePwrStat.equals(0)) {
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
long l = Duration.between(startTime, localDateTime).get(SECONDS);
if(isLastWaitJob) {
mapState.put("currJobDuration", workingJobMapState.get("currJobDuration"));
}else {
mapState.put("currJobDuration", l);
}
mapState.put("currJobCount", receivedEvent.getCurrJobCount());
mapState.put("accJobCount", mapState.get("currJobCount") + lastedDeviceState.getJobTotal());
mapState.put("accJobCountDuration", mapState.get("currJobDuration") + lastedDeviceState.getJobDurationTotal());
nextPwrStatListState.update(ListUtil.toList(1));
workingJobMapState.clear();
// 记录这次的关机消息用于判断下次开机
lastOffEventState.update(receivedEvent);
Long value = lastBootTimeState.value();
if(value == null) {
// 如果在本次开机的时候设置了开机时间那么上次开机时间就是在当前关机的时候进行设置
lastBootTimeState.update(value);
}
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(mapState.get("accJobCount"));
data.setCurrJobCount(mapState.get("currJobCount"));
data.setCurrJobDuration(mapState.get("currJobDuration"));
data.setAccJobCountDuration(mapState.get("currJobDuration"));
data.setReportTime(reportTime);
data.setLastBootTime(lastBootTime);
out.collect(data);
}else {
if(lastOffEventState.value() != null) {
lastBootTimeState.update(reportTime * 1000);
lastOffEventState.clear();
}
if(machineWorkingStat.equals(1)) {
// 工作中只计算工作时长
if(isLastWaitJob) {
// 如果前面的消息是待机消息
startTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(waitJobEvent.getReportTime() * 1000),
ZoneId.systemDefault());
// 状态重置
waitJobEventState.update(null);
isLastWaitJobState.update(false);
}
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime * 1000),
ZoneId.systemDefault());
long l = Duration.between(startTime, localDateTime).get(SECONDS);
workingJobMapState.put("currJobDuration", l + workingJobMapState.get("currJobDuration"));
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(mapState.get("accJobCount"));
data.setCurrJobCount(mapState.get("currJobCount"));
data.setCurrJobDuration(mapState.get("currJobDuration"));
data.setAccJobCountDuration(mapState.get("currJobDuration"));
data.setReportTime(reportTime);
data.setLastBootTime(lastBootTime);
out.collect(data);
}
if(machineWorkingStat.equals(2)) {
// 待机中
isLastWaitJobState.update(true);
waitJobEventState.update(receivedEvent);
if(!isLastWaitJob) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(mapState.get("accJobCount"));
data.setCurrJobCount(mapState.get("currJobCount"));
data.setCurrJobDuration(mapState.get("currJobDuration"));
data.setAccJobCountDuration(mapState.get("currJobDuration"));
data.setReportTime(reportTime);
data.setLastBootTime(lastBootTime);
out.collect(data);
}
}
}
}else {
onEventState.update(receivedEvent);
}*/
}
}
private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws IOException {
@ -123,7 +381,7 @@ public class GizWitsIotMonitoringDataJob {
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
if (value == null) {
// 从es中获取
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac());
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), null);
DeviceTotalData data = new DeviceTotalData();
if (deviceMonitoringData != null) {
data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
@ -137,34 +395,57 @@ public class GizWitsIotMonitoringDataJob {
data.setLastBootTime(ldt);
} else {
// es中也没有machine_iot_data_received_event索引中拿
queryDeviceMonitoringData(event.getMachineIotMac(), localDate, value);
queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000, data);
}
deviceTotalData.update(data);
}
// 是否日期是当天的否则需要更新当天工作时长和当天工作量
if (!value.getCurrLocalDate().isEqual(localDate)) {
queryDeviceMonitoringData(event.getMachineIotMac(), localDate, value);
// 先从es中拿昨天最新的
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(),
LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond());
if(deviceMonitoringData != null) {
DeviceTotalData data = new DeviceTotalData();
data.setJobTotal(deviceMonitoringData.getAccJobCount());
data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
data.setCurrLocalDate(localDate);
data.setLastBootTime(LocalDateTime.ofInstant(Instant
.ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault()));
deviceTotalData.update(data);
}else {
// value有值但是日期不对说明到了第二天那么只需要对当天数据清零即可
value.setTheDayJobDuration(0L);
value.setTheDayJobCount(0L);
}
}
return null;
}
private Tuple2<Long, Long> queryDeviceMonitoringData(Long machineIotMac,
LocalDate localDate,
DeviceTotalData value) throws IOException {
Long reportTime,
DeviceTotalData data) throws IOException {
LocalDateTime startTime = LocalDateTime.of(localDate, LocalTime.MIN);
LocalDateTime endTime = LocalDateTime.of(localDate, LocalTime.MAX);
LocalDateTime startTime = LocalDateTime.of(new Date(reportTime * 1000).toLocalDate(), LocalTime.MIN);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
/*searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime")
searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime")
.gte(startTime.atZone(ZoneOffset.of("+8")).toEpochSecond())
.lte(endTime.atZone(ZoneOffset.of("+8")).toEpochSecond()));*/
.lte(reportTime));
searchSourceBuilder.sort("reportTime");
searchSourceBuilder.size(500);
List<MachineIotDataReceivedEvent> receivedEventList = new ArrayList<>();
EsRestClientUtil.queryDeviceListPageResult(searchSourceBuilder,
receivedEventList::add, MachineIotDataReceivedEvent.class, getIndicesList());
//List<Tuple2<Long, Long>> tuple3List = statistics(receivedEventList);
// 一天的作业统计
List<Map<String, Long>> currJobStatistics = statistics(receivedEventList);
if(CollUtil.isNotEmpty(currJobStatistics)) {
//
}
return null;
}
@ -214,7 +495,6 @@ public class GizWitsIotMonitoringDataJob {
map.put("currJobDuration", l);
}
map.put("currJobCount", receivedEvent.getCurrJobCount());
mapList.add(map);
nextPwrStatList = ListUtil.toList(1);
workingJobMap.clear();
mapList.add(map);
@ -264,12 +544,15 @@ public class GizWitsIotMonitoringDataJob {
return ArrayUtil.toArray(indicesList, String.class);
}
private DeviceMonitoringData queryLatestDeviceMonitoringData(Long machineIotMac) {
private DeviceMonitoringData queryLatestDeviceMonitoringData(Long machineIotMac, Long reportTime) {
try {
// 构建查询条件注意termQuery 支持多种格式查询 booleanintdoublestring 这里使用的是 string 的查询
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
if(reportTime != null) {
searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime").lt(reportTime));
}
searchSourceBuilder.sort("reportTime", SortOrder.DESC);
searchSourceBuilder.size(1);
// 创建查询请求对象将查询对象配置到其中

Loading…
Cancel
Save