diff --git a/pom.xml b/pom.xml index f5178cd..1ec550f 100644 --- a/pom.xml +++ b/pom.xml @@ -165,7 +165,7 @@ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> - com.qniao.iot.gizwits.GizWitsIotMonitoringDataJob + com.qniao.iot.gizwits.IotMonitoringDataJob diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java index 49ec54e..6186837 100644 --- a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java +++ b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java @@ -37,4 +37,9 @@ public class DeviceTotalData { * 当前日期 */ private LocalDate currLocalDate; + + /** + * 上次消息事件 + */ + private Long lastReportTime; } diff --git a/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java similarity index 78% rename from src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java rename to src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java index 692510a..f938cc2 100644 --- a/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java @@ -53,7 +53,7 @@ import java.util.*; import static java.time.temporal.ChronoUnit.SECONDS; @Slf4j -public class GizWitsIotMonitoringDataJob { +public class IotMonitoringDataJob { private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient .builder(new HttpHost("120.79.137.137", 9200, "http")) @@ -69,15 +69,15 @@ public class GizWitsIotMonitoringDataJob { return requestConfigBuilder; })); - public static void main(String[] args) { + public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); // 获取设备数据源 KafkaSource source = KafkaSource.builder() - .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) - .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) - .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) + .setBootstrapServers("120.25.199.30:19092") + .setTopics("machine_iot_data_received_event") + .setGroupId("123") .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) @@ -148,8 +148,12 @@ public class GizWitsIotMonitoringDataJob { Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); lastWorkingStatState.update(machineWorkingStat); Long reportTime = receivedEvent.getReportTime(); + Long accJobCount = receivedEvent.getAccJobCount(); + // 1树根 0机智云 + Integer dataSource = receivedEvent.getDataSource(); + // 当前数据 + DeviceTotalData nowDeviceState = getDeviceTotalData(receivedEvent); if (lastedDeviceState == null) { - lastedDeviceState = getDeviceTotalData(receivedEvent); lastOnData = receivedEvent; } if (lastWorkingStat == null) { @@ -161,24 +165,47 @@ public class GizWitsIotMonitoringDataJob { } LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); Long a; + if (machinePwrStat.equals(0)) { - assert lastedDeviceState != null; - lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); - lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); - lastedDeviceState.setCurrLocalDate(localDate); - if (lastOnData != null) { - lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault())); - } else { - lastedDeviceState.setLastBootTime(onData.getLastBootTime()); + if (lastWorkingStat == 1) { + // 如果上次是工作中,那就进行累加 + assert lastedDeviceState != null; + Long lastReportTime = lastedDeviceState.getLastReportTime(); + if (lastReportTime == null) { + // 如果上次的消息时间为空,那么不进行计算 + nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount()); + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration()); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal()); + } else { + // 直接通过两个消息的时间差进行计算 + Long workingDuration = reportTime - lastedDeviceState.getLastReportTime(); + nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobDuration() + workingDuration); + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); + } + if(dataSource == 1) { + // 树根 + nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); + nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + }else { + // 机智云 + Long workingJon = accJobCount - lastedDeviceState.getJobTotal(); + nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon); + nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); + } + nowDeviceState.setCurrLocalDate(localDate); + if (lastOnData != null) { + nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault())); + } else { + nowDeviceState.setLastBootTime(onData.getLastBootTime()); + } + deviceTotalDataStat.update(nowDeviceState); + // 如果关机 + onDataState.update(nowDeviceState); + lastOffDataState.update(receivedEvent); + // 关机后将待机数据清除 + lastWaitJobDataState.update(null); } - deviceTotalDataStat.update(lastedDeviceState); - // 如果关机 - onDataState.update(lastedDeviceState); - lastOffDataState.update(receivedEvent); - // 关机后将待机数据清除 - lastWaitJobDataState.update(null); } else { if (lastOffData != null) { lastOnData = receivedEvent; @@ -186,10 +213,13 @@ public class GizWitsIotMonitoringDataJob { assert lastedDeviceState != null; if (machineWorkingStat.equals(1)) { // 工作中 - lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setCurrLocalDate(localDate); - lastedDeviceState.setLastBootTime(onData.getLastBootTime()); + Long workingDuration = reportTime - lastedDeviceState.getLastReportTime(); + nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobDuration() + workingDuration); + + //nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); + nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + nowDeviceState.setCurrLocalDate(localDate); + nowDeviceState.setLastBootTime(onData.getLastBootTime()); if (lastWaitJobData != null) { LocalDateTime localDateTime = LocalDateTime .ofInstant(Instant.ofEpochMilli(reportTime * 1000), @@ -198,35 +228,30 @@ public class GizWitsIotMonitoringDataJob { .ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault()); a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS); - lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a); - lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a); + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + a); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + a); } else { - if(receivedEvent.getDataSource() == 0) { - // 机智云 - lastedDeviceState.setTheDayJobDuration(receivedEvent.getCurrJobDuration()); - }else { - // 树根 - lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); - } - lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); } - deviceTotalDataStat.update(lastedDeviceState); + deviceTotalDataStat.update(nowDeviceState); } if (machineWorkingStat.equals(2)) { // 待机 lastWaitJobDataState.update(receivedEvent); } } - if (lastWorkingStat != 1) { + // 如果上次是待机,并且这次也是待机,那么就不需要发送了 + if (!(lastWorkingStat == 2 && machineWorkingStat == 2)) { DeviceMonitoringData data = new DeviceMonitoringData(); data.setDataSource(receivedEvent.getDataSource()); data.setMachineIotMac(receivedEvent.getMachineIotMac()); data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); - data.setAccJobCount(lastedDeviceState.getJobTotal()); - data.setCurrJobCount(lastedDeviceState.getTheDayJobCount()); - data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration()); - data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal()); + data.setAccJobCount(nowDeviceState.getJobTotal()); + data.setCurrJobCount(nowDeviceState.getTheDayJobCount()); + data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration()); + data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal()); data.setReportTime(reportTime); if (lastOnData == null) { data.setLastBootTime(reportTime * 1000); @@ -240,12 +265,12 @@ public class GizWitsIotMonitoringDataJob { private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception { DeviceTotalData value = deviceTotalDataStat.value(); + DeviceTotalData data = new DeviceTotalData(); Long reportTime = event.getReportTime(); LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); if (value == null) { // 从es中获取 DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), null); - DeviceTotalData data = new DeviceTotalData(); if (deviceMonitoringData != null) { data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); data.setJobTotal(deviceMonitoringData.getAccJobCount()); @@ -257,10 +282,9 @@ public class GizWitsIotMonitoringDataJob { .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime(); data.setLastBootTime(ldt); } else { - // es中也没有,从“qn_cloud_box_data”索引中拿 + // es中也没有,直接从老接口拿 data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000); } - deviceTotalDataStat.update(data); } // 是否日期是当天的,否则需要更新当天工作时长和当天工作量 if (value.getCurrLocalDate().isBefore(localDate)) { @@ -268,7 +292,6 @@ public class GizWitsIotMonitoringDataJob { DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond()); if (deviceMonitoringData != null) { - DeviceTotalData data = new DeviceTotalData(); data.setJobTotal(deviceMonitoringData.getAccJobCount()); data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration()); @@ -281,9 +304,10 @@ public class GizWitsIotMonitoringDataJob { // value有值,但是日期不对,说明到了第二天,那么,只需要对当天数据清零即可 value.setTheDayJobDuration(0L); value.setTheDayJobCount(0L); + data = value; } } - return null; + return data; } private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac, Long reportTime) { @@ -295,26 +319,26 @@ public class GizWitsIotMonitoringDataJob { String result = HttpUtil .get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:" + i + "%7D"); Object data = JSONUtil.getByPath(JSONUtil.parse(result), "data"); - if(data == null) { + if (data == null) { break; } Object records = JSONUtil.getByPath(JSONUtil.parse(result), "records"); - if(records == null) { + if (records == null) { break; } JSONArray objects = JSONUtil.parseArray(records); - if(objects.isEmpty()) { + if (objects.isEmpty()) { break; } for (Object o : objects.toArray()) { Object mac = JSONUtil.getByPath(JSONUtil.parse(o), "mac"); - Long iotMac = (Long)mac; - if(iotMac.equals(machineIotMac)) { + Long iotMac = (Long) mac; + if (iotMac.equals(machineIotMac)) { deviceTotalData = new DeviceTotalData(); Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal"); - deviceTotalData.setJobTotal((Long)productionTotal); + deviceTotalData.setJobTotal((Long) productionTotal); Object workTotalTotal = JSONUtil.getByPath(JSONUtil.parse(o), "workTotalTotal"); - deviceTotalData.setJobDurationTotal(((Long)workTotalTotal) * 3600); + deviceTotalData.setJobDurationTotal(((Long) workTotalTotal) * 3600); Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime"); LocalDateTime lastBootTime = LocalDateTime .parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:hh:ss")); @@ -325,7 +349,7 @@ public class GizWitsIotMonitoringDataJob { } } } - if(deviceTotalData == null) { + if (deviceTotalData == null) { deviceTotalData = new DeviceTotalData(); deviceTotalData.setJobTotal(0L); deviceTotalData.setJobDurationTotal(0L); @@ -337,7 +361,6 @@ public class GizWitsIotMonitoringDataJob { } - private String[] getIndicesList() throws IOException { GetAliasesRequest request = new GetAliasesRequest(); @@ -384,6 +407,8 @@ public class GizWitsIotMonitoringDataJob { // 写入es sinkEs(machineIotDataReceivedEventDataStream); + + env.execute("iot_monitoring_data_job"); } private static void sinkEs(DataStream dataStream) { diff --git a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob_bak.java b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob_bak.java new file mode 100644 index 0000000..9c31ad4 --- /dev/null +++ b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob_bak.java @@ -0,0 +1,443 @@ +//package com.qniao.iot.gizwits; +// +//import cn.hutool.core.bean.BeanUtil; +//import cn.hutool.core.util.ArrayUtil; +//import cn.hutool.http.HttpUtil; +//import cn.hutool.json.JSONArray; +//import cn.hutool.json.JSONUtil; +//import com.qniao.iot.gizwits.config.ApolloConfig; +//import com.qniao.iot.gizwits.constant.ConfigConstant; +//import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; +//import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; +//import lombok.extern.slf4j.Slf4j; +//import org.apache.flink.api.common.eventtime.WatermarkStrategy; +//import org.apache.flink.api.common.state.ValueState; +//import org.apache.flink.api.common.state.ValueStateDescriptor; +//import org.apache.flink.api.common.typeinfo.TypeInformation; +//import org.apache.flink.configuration.Configuration; +//import org.apache.flink.connector.kafka.source.KafkaSource; +//import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +//import org.apache.flink.streaming.api.CheckpointingMode; +//import org.apache.flink.streaming.api.datastream.DataStream; +//import org.apache.flink.streaming.api.datastream.DataStreamSource; +//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +//import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +//import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +//import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; +//import org.apache.flink.util.Collector; +//import org.apache.http.HttpHost; +//import org.apache.http.auth.AuthScope; +//import org.apache.http.auth.UsernamePasswordCredentials; +//import org.apache.http.client.CredentialsProvider; +//import org.apache.http.impl.client.BasicCredentialsProvider; +//import org.apache.kafka.clients.consumer.ConsumerConfig; +//import org.apache.kafka.clients.consumer.OffsetResetStrategy; +//import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; +//import org.elasticsearch.action.index.IndexRequest; +//import org.elasticsearch.action.search.SearchRequest; +//import org.elasticsearch.action.search.SearchResponse; +//import org.elasticsearch.client.*; +//import org.elasticsearch.cluster.metadata.AliasMetaData; +//import org.elasticsearch.index.query.QueryBuilders; +//import org.elasticsearch.rest.RestStatus; +//import org.elasticsearch.search.SearchHit; +//import org.elasticsearch.search.SearchHits; +//import org.elasticsearch.search.builder.SearchSourceBuilder; +//import org.elasticsearch.search.sort.SortOrder; +// +//import java.io.IOException; +//import java.sql.Date; +//import java.time.*; +//import java.time.format.DateTimeFormatter; +//import java.util.ArrayList; +//import java.util.List; +//import java.util.Map; +//import java.util.Set; +// +//import static java.time.temporal.ChronoUnit.SECONDS; +// +///** +// * 备份代码 +// */ +//@Slf4j +//public class IotMonitoringDataJob_bak { +// +// private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient +// .builder(new HttpHost("120.79.137.137", 9200, "http")) +// .setHttpClientConfigCallback(httpAsyncClientBuilder -> { +// CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); +// credentialsProvider.setCredentials(AuthScope.ANY, +// new UsernamePasswordCredentials("elastic", "qnol26215")); +// return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); +// }) +// .setRequestConfigCallback(requestConfigBuilder -> { +// // 设置es连接超时时间 +// requestConfigBuilder.setConnectTimeout(3000); +// return requestConfigBuilder; +// })); +// +// public static void main(String[] args) throws Exception { +// +// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +// env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); +// // 获取设备数据源 +// KafkaSource source = KafkaSource.builder() +// .setBootstrapServers("120.25.199.30:19092") +// .setTopics("machine_iot_data_received_event") +// .setGroupId("123") +// .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) +// .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") +// .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) +// .build(); +// +// // 设备数据源转换 +// DataStreamSource dataStreamSource = env +// .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); +// +// // mac分组并进行工作时长的集合操作 +// DataStream machineIotDataReceivedEventDataStream = dataStreamSource +// .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) +// .process(new KeyedProcessFunction() { +// +// // 最新的设备数据 +// private ValueState deviceTotalDataStat; +// +// // 开机数据 +// private ValueState onDataState; +// +// // 上次的关机数据 +// private ValueState lastOffDataState; +// +// // 上次的开机数据 +// private ValueState lastOnDataState; +// +// // 当前周期的待机数据 +// private ValueState lastWaitJobDataState; +// +// // 上次的状态 +// private ValueState lastWorkingStatState; +// +// @Override +// public void open(Configuration parameters) { +// +// // 必须在 open 生命周期初始化 +// deviceTotalDataStat = getRuntimeContext() +// .getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class))); +// +// onDataState = getRuntimeContext() +// .getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class))); +// +// lastOffDataState = getRuntimeContext() +// .getState(new ValueStateDescriptor<>("lastOffData", TypeInformation.of(MachineIotDataReceivedEvent.class))); +// +// lastOnDataState = getRuntimeContext() +// .getState(new ValueStateDescriptor<>("lastOnData", TypeInformation.of(MachineIotDataReceivedEvent.class))); +// +// lastWaitJobDataState = getRuntimeContext() +// .getState(new ValueStateDescriptor<>("lastWaitJobData", TypeInformation.of(MachineIotDataReceivedEvent.class))); +// +// lastWorkingStatState = getRuntimeContext() +// .getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class))); +// } +// +// @Override +// public void processElement(MachineIotDataReceivedEvent receivedEvent, +// KeyedProcessFunction.Context ctx, +// Collector out) throws Exception { +// +// DeviceTotalData onData = onDataState.value(); +// MachineIotDataReceivedEvent lastOffData = lastOffDataState.value(); +// MachineIotDataReceivedEvent lastOnData = lastOnDataState.value(); +// MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value(); +// Integer lastWorkingStat = lastWorkingStatState.value(); +// DeviceTotalData lastedDeviceState = deviceTotalDataStat.value(); +// Integer machinePwrStat = receivedEvent.getMachinePwrStat(); +// Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); +// lastWorkingStatState.update(machineWorkingStat); +// Long reportTime = receivedEvent.getReportTime(); +// if (lastedDeviceState == null) { +// lastedDeviceState = getDeviceTotalData(receivedEvent); +// lastOnData = receivedEvent; +// } +// if (lastWorkingStat == null) { +// lastWorkingStat = 0; +// } +// if (onData == null) { +// onData = lastedDeviceState; +// onDataState.update(onData); +// } +// LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); +// Long a; +// if (machinePwrStat.equals(0)) { +// assert lastedDeviceState != null; +// lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); +// lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); +// lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); +// lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); +// lastedDeviceState.setCurrLocalDate(localDate); +// if (lastOnData != null) { +// lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault())); +// } else { +// lastedDeviceState.setLastBootTime(onData.getLastBootTime()); +// } +// deviceTotalDataStat.update(lastedDeviceState); +// // 如果关机 +// onDataState.update(lastedDeviceState); +// lastOffDataState.update(receivedEvent); +// // 关机后将待机数据清除 +// lastWaitJobDataState.update(null); +// } else { +// if (lastOffData != null) { +// lastOnData = receivedEvent; +// } +// assert lastedDeviceState != null; +// if (machineWorkingStat.equals(1)) { +// // 工作中 +// lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); +// lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); +// lastedDeviceState.setCurrLocalDate(localDate); +// lastedDeviceState.setLastBootTime(onData.getLastBootTime()); +// if (lastWaitJobData != null) { +// LocalDateTime localDateTime = LocalDateTime +// .ofInstant(Instant.ofEpochMilli(reportTime * 1000), +// ZoneId.systemDefault()); +// LocalDateTime lastWaitJobTime = LocalDateTime +// .ofInstant(Instant.ofEpochMilli(reportTime * 1000), +// ZoneId.systemDefault()); +// a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS); +// lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a); +// lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a); +// } else { +// if(receivedEvent.getDataSource() == 0) { +// // 机智云 +// lastedDeviceState.setTheDayJobDuration(receivedEvent.getCurrJobDuration()); +// }else { +// // 树根 +// lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); +// } +// lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); +// } +// deviceTotalDataStat.update(lastedDeviceState); +// } +// if (machineWorkingStat.equals(2)) { +// // 待机 +// lastWaitJobDataState.update(receivedEvent); +// } +// } +// if (lastWorkingStat != 1) { +// DeviceMonitoringData data = new DeviceMonitoringData(); +// data.setDataSource(receivedEvent.getDataSource()); +// data.setMachineIotMac(receivedEvent.getMachineIotMac()); +// data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); +// data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); +// data.setAccJobCount(lastedDeviceState.getJobTotal()); +// data.setCurrJobCount(lastedDeviceState.getTheDayJobCount()); +// data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration()); +// data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal()); +// data.setReportTime(reportTime); +// if (lastOnData == null) { +// data.setLastBootTime(reportTime * 1000); +// } else { +// data.setLastBootTime(lastOnData.getReportTime() * 1000); +// } +// out.collect(data); +// } +// } +// +// private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception { +// +// DeviceTotalData value = deviceTotalDataStat.value(); +// Long reportTime = event.getReportTime(); +// LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); +// if (value == null) { +// // 从es中获取 +// DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), null); +// DeviceTotalData data = new DeviceTotalData(); +// if (deviceMonitoringData != null) { +// data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); +// data.setJobTotal(deviceMonitoringData.getAccJobCount()); +// // 单位秒 +// data.setCurrLocalDate(localDate); +// data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration()); +// data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount()); +// LocalDateTime ldt = new Date(deviceMonitoringData.getLastBootTime()) +// .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime(); +// data.setLastBootTime(ldt); +// } else { +// // es中也没有,从“qn_cloud_box_data”索引中拿 +// data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000); +// } +// deviceTotalDataStat.update(data); +// } +// // 是否日期是当天的,否则需要更新当天工作时长和当天工作量 +// if (value.getCurrLocalDate().isBefore(localDate)) { +// // 先从es中拿昨天最新的 +// DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), +// LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond()); +// if (deviceMonitoringData != null) { +// DeviceTotalData data = new DeviceTotalData(); +// data.setJobTotal(deviceMonitoringData.getAccJobCount()); +// data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); +// data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration()); +// data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount()); +// data.setCurrLocalDate(localDate); +// data.setLastBootTime(LocalDateTime.ofInstant(Instant +// .ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault())); +// deviceTotalDataStat.update(data); +// } else { +// // value有值,但是日期不对,说明到了第二天,那么,只需要对当天数据清零即可 +// value.setTheDayJobDuration(0L); +// value.setTheDayJobCount(0L); +// } +// } +// return null; +// } +// +// private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac, Long reportTime) { +// +// DeviceTotalData deviceTotalData = null; +// +// // 通过http去请求之前的接口拿数据 +// for (int i = 1; i <= 20; i++) { +// String result = HttpUtil +// .get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:" + i + "%7D"); +// Object data = JSONUtil.getByPath(JSONUtil.parse(result), "data"); +// if(data == null) { +// break; +// } +// Object records = JSONUtil.getByPath(JSONUtil.parse(result), "records"); +// if(records == null) { +// break; +// } +// JSONArray objects = JSONUtil.parseArray(records); +// if(objects.isEmpty()) { +// break; +// } +// for (Object o : objects.toArray()) { +// Object mac = JSONUtil.getByPath(JSONUtil.parse(o), "mac"); +// Long iotMac = (Long)mac; +// if(iotMac.equals(machineIotMac)) { +// deviceTotalData = new DeviceTotalData(); +// Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal"); +// deviceTotalData.setJobTotal((Long)productionTotal); +// Object workTotalTotal = JSONUtil.getByPath(JSONUtil.parse(o), "workTotalTotal"); +// deviceTotalData.setJobDurationTotal(((Long)workTotalTotal) * 3600); +// Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime"); +// LocalDateTime lastBootTime = LocalDateTime +// .parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:hh:ss")); +// deviceTotalData.setLastBootTime(lastBootTime); +// deviceTotalData.setTheDayJobDuration(0L); +// deviceTotalData.setTheDayJobCount(0L); +// break; +// } +// } +// } +// if(deviceTotalData == null) { +// deviceTotalData = new DeviceTotalData(); +// deviceTotalData.setJobTotal(0L); +// deviceTotalData.setJobDurationTotal(0L); +// deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault())); +// deviceTotalData.setTheDayJobDuration(0L); +// deviceTotalData.setTheDayJobCount(0L); +// } +// return deviceTotalData; +// } +// +// +// +// private String[] getIndicesList() throws IOException { +// +// GetAliasesRequest request = new GetAliasesRequest(); +// GetAliasesResponse getAliasesResponse = restHighLevelClient.indices().getAlias(request, RequestOptions.DEFAULT); +// Map> map = getAliasesResponse.getAliases(); +// Set indices = map.keySet(); +// List indicesList = new ArrayList<>(); +// for (String key : indices) { +// if (key.contains(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX))) { +// indicesList.add(key); +// } +// } +// return ArrayUtil.toArray(indicesList, String.class); +// } +// +// private DeviceMonitoringData queryLatestDeviceMonitoringData(Long machineIotMac, Long reportTime) { +// +// try { +// // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) +// SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); +// searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac)); +// if (reportTime != null) { +// searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime").lt(reportTime)); +// } +// searchSourceBuilder.sort("reportTime", SortOrder.DESC); +// searchSourceBuilder.size(1); +// // 创建查询请求对象,将查询对象配置到其中 +// SearchRequest searchRequest = new SearchRequest("DeviceMonitoringData"); +// searchRequest.source(searchSourceBuilder); +// // 执行查询,然后处理响应结果 +// SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); +// // 根据状态和数据条数验证是否返回了数据 +// if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { +// SearchHits hits = searchResponse.getHits(); +// SearchHit reqHit = hits.getHits()[0]; +// return JSONUtil.toBean(reqHit.getSourceAsString(), DeviceMonitoringData.class); +// } +// } catch (IOException e) { +// log.error("获取es数据异常", e); +// } +// return null; +// } +// }).name("machineIotDataReceivedEventDataStream keyBy stream"); +// +// // 写入es +// sinkEs(machineIotDataReceivedEventDataStream); +// +// env.execute("iot_monitoring_data_job"); +// } +// +// private static void sinkEs(DataStream dataStream) { +// +// List httpHosts = new ArrayList<>(); +// httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST), +// ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), +// ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); +// ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, +// (ElasticsearchSinkFunction) (deviceMonitoringData, runtimeContext, requestIndexer) -> { +// +// LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime()) +// .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); +// String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM")); +// //创建es 请求 +// IndexRequest indexRequest = Requests.indexRequest() +// .index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix) +// .source(BeanUtil.beanToMap(deviceMonitoringData)); +// requestIndexer.add(indexRequest); +// } +// ); +// //刷新前缓冲的最大动作量 +// esSinkBuilder.setBulkFlushMaxActions(10); +// //刷新前缓冲区的最大数据大小(以MB为单位) +// esSinkBuilder.setBulkFlushMaxSizeMb(5); +// //无论缓冲操作的数量或大小如何,都要刷新的时间间隔 +// esSinkBuilder.setBulkFlushInterval(5000L); +// // 客户端创建配置回调,配置账号密码 +// esSinkBuilder.setRestClientFactory( +// restClientBuilder -> { +// restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { +// CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); +// credentialsProvider.setCredentials(AuthScope.ANY, +// new UsernamePasswordCredentials(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), +// ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); +// return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); +// }); +// restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { +// // 设置es连接超时时间 +// requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT)); +// return requestConfigBuilder; +// }); +// } +// ); +// //数据流添加sink +// dataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink"); +// } +//} diff --git a/src/main/java/com/qniao/iot/gizwits/TestJob.java b/src/main/java/com/qniao/iot/gizwits/TestJob.java deleted file mode 100644 index da5d53a..0000000 --- a/src/main/java/com/qniao/iot/gizwits/TestJob.java +++ /dev/null @@ -1,214 +0,0 @@ -/* -package com.qniao.iot.gizwits; - -import com.qniao.iot.gizwits.config.ApolloConfig; -import com.qniao.iot.gizwits.constant.ConfigConstant; -import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; -import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.kafka.source.KafkaSource; -import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; -import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.util.Collector; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; - -import java.sql.Date; -import java.time.*; - -import static java.time.temporal.ChronoUnit.SECONDS; - -@Slf4j -public class TestJob { - - public static void main(String[] args) throws Exception { - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); - // 获取设备数据源 - KafkaSource source = KafkaSource.builder() - .setBootstrapServers("120.25.199.30:19092") - .setTopics("machine_iot_data_received_event") - .setGroupId("123") - .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) - .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") - .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) - .build(); - - // 设备数据源转换 - SingleOutputStreamOperator streamOperator = env - .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source") - .filter((FilterFunction) value -> value.getMachinePwrStat() != null && value.getMachineIotMac() != null - && value.getDataSource() != null && value.getMachineWorkingStat() != null); - - - streamOperator.print("输出源数据"); - - // mac分组并进行工作时长的集合操作 - DataStream machineIotDataReceivedEventDataStream = streamOperator - .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) - .process(new KeyedProcessFunction() { - - // 最新的设备数据 - private ValueState deviceTotalData; - - // 开机数据 - private ValueState onDataState; - - // 上次的关机数据 - private ValueState lastOffDataState; - - // 上次的开机数据 - private ValueState lastOnDataState; - - // 当前周期的待机数据 - private ValueState lastWaitJobDataState; - - // 上次的状态 - private ValueState lastWorkingStatState; - - @Override - public void open(Configuration parameters) { - - // 必须在 open 生命周期初始化 - deviceTotalData = getRuntimeContext() - .getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class))); - - onDataState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class))); - - lastOffDataState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("lastOffData", TypeInformation.of(MachineIotDataReceivedEvent.class))); - - lastOnDataState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("lastOnData", TypeInformation.of(MachineIotDataReceivedEvent.class))); - - lastWaitJobDataState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("lastWaitJobData", TypeInformation.of(MachineIotDataReceivedEvent.class))); - - lastWorkingStatState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class))); - } - - @Override - public void processElement(MachineIotDataReceivedEvent receivedEvent, - KeyedProcessFunction.Context ctx, - Collector out) throws Exception { - - - DeviceTotalData onData = onDataState.value(); - MachineIotDataReceivedEvent lastOffData = lastOffDataState.value(); - MachineIotDataReceivedEvent lastOnData = lastOnDataState.value(); - MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value(); - Integer lastWorkingStat = lastWorkingStatState.value(); - DeviceTotalData lastedDeviceState = deviceTotalData.value(); - Integer machinePwrStat = receivedEvent.getMachinePwrStat(); - Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); - lastWorkingStatState.update(machineWorkingStat); - Long reportTime = receivedEvent.getReportTime(); - if(lastedDeviceState == null) { - lastedDeviceState = new DeviceTotalData(); - lastedDeviceState.setJobTotal(0L); - lastedDeviceState.setJobDurationTotal(0L); - lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault())); - lastedDeviceState.setTheDayJobDuration(0L); - lastedDeviceState.setTheDayJobCount(0L); - lastOnData = receivedEvent; - } - if(lastWorkingStat == null) { - lastWorkingStat = 0; - } - - if (onData == null) { - onData = lastedDeviceState; - onDataState.update(onData); - } - LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); - Long a; - if (machinePwrStat.equals(0)) { - lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); - lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); - lastedDeviceState.setCurrLocalDate(localDate); - if (lastOnData != null) { - lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault())); - } else { - lastedDeviceState.setLastBootTime(onData.getLastBootTime()); - } - deviceTotalData.update(lastedDeviceState); - // 如果关机 - onDataState.update(lastedDeviceState); - lastOffDataState.update(receivedEvent); - // 关机后将待机数据清除 - lastWaitJobDataState.update(null); - } else { - if (lastOffData != null) { - lastOnData = receivedEvent; - } - if (machineWorkingStat.equals(1)) { - // 工作中 - lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); - lastedDeviceState.setCurrLocalDate(localDate); - lastedDeviceState.setLastBootTime(onData.getLastBootTime()); - if (lastWaitJobData != null) { - LocalDateTime localDateTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(reportTime * 1000), - ZoneId.systemDefault()); - LocalDateTime lastWaitJobTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(reportTime * 1000), - ZoneId.systemDefault()); - a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS); - lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a); - lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a); - } else { - lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); - lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); - } - deviceTotalData.update(lastedDeviceState); - } - if (machineWorkingStat.equals(2)) { - // 待机 - lastWaitJobDataState.update(receivedEvent); - } - } - if(lastWorkingStat != 1) { - DeviceMonitoringData data = new DeviceMonitoringData(); - data.setDataSource(receivedEvent.getDataSource()); - data.setMachineIotMac(receivedEvent.getMachineIotMac()); - data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); - data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); - data.setAccJobCount(lastedDeviceState.getJobTotal()); - data.setCurrJobCount(lastedDeviceState.getTheDayJobCount()); - data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration()); - data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal()); - data.setReportTime(reportTime); - if(lastOnData == null) { - data.setLastBootTime(reportTime * 1000); - }else { - data.setLastBootTime(lastOnData.getReportTime() * 1000); - } - out.collect(data); - } - } - }).name("machineIotDataReceivedEventDataStream keyBy stream"); - - - machineIotDataReceivedEventDataStream.print("输出清洗数据"); - - env.execute(); - } -} -*/