From b762db6d66bca3ad2a34c5d968d56dc3bf4680d4 Mon Sep 17 00:00:00 2001
From: "1049970895@qniao.cn" <1049970895>
Date: Mon, 5 Sep 2022 17:46:50 +0800
Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
iot-device-power-on-and-off-data-job/pom.xml | 6 +
.../power/IotDevicePowerOnAndOffDataJob.java | 174 ++++++------------
.../device/power/constant/ConfigConstant.java | 21 +--
3 files changed, 67 insertions(+), 134 deletions(-)
diff --git a/iot-device-power-on-and-off-data-job/pom.xml b/iot-device-power-on-and-off-data-job/pom.xml
index 44b28dd..adbd2c2 100644
--- a/iot-device-power-on-and-off-data-job/pom.xml
+++ b/iot-device-power-on-and-off-data-job/pom.xml
@@ -143,6 +143,12 @@
0.0.1-SNAPSHOT
compile
+
+
+ com.qniao
+ iot-machine-data-command
+ 0.0.1-SNAPSHOT
+
diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java
index 49253a4..d8c8f9f 100644
--- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java
+++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java
@@ -7,8 +7,10 @@ import com.qniao.iot.device.power.config.ApolloConfig;
import com.qniao.iot.device.power.constant.ConfigConstant;
import com.qniao.iot.device.power.event.IotDevicePowerOnAndOffDataEvent;
import com.qniao.iot.device.power.utils.SnowFlake;
+import com.qniao.iot.machine.command.MachineOutputCommand;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
+import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema;
import com.qniao.iot.rc.constant.MachinePwrStatusEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -22,12 +24,15 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
+import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
@@ -99,41 +104,22 @@ public class IotDevicePowerOnAndOffDataJob {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 获取设备数据源
- KafkaSource source = KafkaSource.builder()
- .setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
- .setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS))
- .setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID))
- .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
- .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
- .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
+ RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+ .setHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_HOST))
+ .setPort(ApolloConfig.getInt(ConfigConstant.SOURCE_RABBITMQ_PORT))
+ .setUserName(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_USERNAME))
+ .setPassword(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_PASSWORD))
+ .setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUAL_HOST))
.build();
// 设备数据源转换
- DataStreamSource dataStreamSource = env
- .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
+ final DataStream dataStreamSource = env
+ .addSource(new RMQSource<>(connectionConfig, ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_QUEUE),
+ false, new MachineOutputCommandDeserializationSchema())).setParallelism(1);
- // 数据过滤
- SingleOutputStreamOperator streamOperator = dataStreamSource
- .filter((FilterFunction) value -> {
-
- Long reportTime = value.getReportTime();
- if (reportTime != null
- && value.getDataSource() != null && value.getMachinePwrStat() != null) {
- String reportTimeStr = StrUtil.toString(reportTime);
- if (reportTimeStr.length() == 10) {
- // 机智云那边的设备可能是秒或毫秒
- reportTime = reportTime * 1000;
- }
- long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
- // 晚30分钟的数据就不要了
- return nowTime - reportTime <= (30 * 60 * 1000);
- }
- return false;
- });
-
- SingleOutputStreamOperator outputStreamOperator = streamOperator
- .keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
- .process(new KeyedProcessFunction() {
+ SingleOutputStreamOperator outputStreamOperator = dataStreamSource
+ .keyBy(MachineOutputCommand::getMac)
+ .process(new KeyedProcessFunction() {
private ValueState powerOnAndOffDataEventValueState;
@@ -145,7 +131,8 @@ public class IotDevicePowerOnAndOffDataJob {
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
- ValueStateDescriptor powerOnAndOffDataEventValue = new ValueStateDescriptor<>("powerOnAndOffDataEventValue",
+ ValueStateDescriptor powerOnAndOffDataEventValue
+ = new ValueStateDescriptor<>("powerOnAndOffDataEventValue",
TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class));
// 设置状态值的过期时间,为了解决机器关机没有消息的情况
powerOnAndOffDataEventValue.enableTimeToLive(ttlConfig);
@@ -156,34 +143,33 @@ public class IotDevicePowerOnAndOffDataJob {
}
@Override
- public void processElement(MachineIotDataReceivedEvent event,
- KeyedProcessFunction.Context ctx,
+ public void processElement(MachineOutputCommand command,
+ KeyedProcessFunction.Context ctx,
Collector out) throws Exception {
- IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(event);
+ IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(command);
Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime();
- Long reportTime = event.getReportTime();
+ Long reportTime = command.getTimestamp();
if (reportTime > lastReportTime) {
Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat();
- Integer machinePwrStat = event.getMachinePwrStat();
+ Integer machinePwrStat = command.getMachinePwrStat();
Integer lastMachineWorkingStat = lastPowerOnAndOffDataEvent.getMachineWorkingStat();
- Integer machineWorkingStat = event.getMachineWorkingStat();
+ Integer machineWorkingStat = command.getMachineWorkingStat();
if (!((lastMachineWorkingStat == 0 && machineWorkingStat == 0)
|| (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) {
Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount();
- Long accJobCount = event.getAccJobCount();
- Long currJobCount = event.getCurrJobCount();
- Integer dataSource = event.getDataSource();
+ Long accJobCount = command.getCurrTotalOutput();
+ Long currJobCount = command.getCurrCount();
+ Integer dataSource = command.getDataSource();
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
powerOnAndOffDataEvent.setId(snowflake.nextId());
- powerOnAndOffDataEvent.setDataSource(event.getDataSource());
- powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac());
- powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat());
- powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat());
- powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount());
- powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration());
- powerOnAndOffDataEvent.setAccJobCount(event.getAccJobCount());
- powerOnAndOffDataEvent.setReportTime(event.getReportTime());
+ powerOnAndOffDataEvent.setDataSource(command.getDataSource());
+ powerOnAndOffDataEvent.setMachineIotMac(command.getMac());
+ powerOnAndOffDataEvent.setMachinePwrStat(command.getMachinePwrStat());
+ powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat());
+ powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrDuration());
+ powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput());
+ powerOnAndOffDataEvent.setReportTime(command.getTimestamp());
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) {
@@ -213,7 +199,7 @@ public class IotDevicePowerOnAndOffDataJob {
if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) {
// 只有开机时间不为空
if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(machinePwrStat)) {
- powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime());
+ powerOnAndOffDataEvent.setMachinePowerOffTime(command.getTimestamp());
}
powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime());
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
@@ -222,7 +208,7 @@ public class IotDevicePowerOnAndOffDataJob {
// // 开机和关机时间都不为空,说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态
if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
// 开机
- powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime());
+ powerOnAndOffDataEvent.setMachinePowerOnTime(command.getTimestamp());
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
out.collect(powerOnAndOffDataEvent);
}
@@ -232,20 +218,20 @@ public class IotDevicePowerOnAndOffDataJob {
}
}
- private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineIotDataReceivedEvent event) throws IOException {
+ private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineOutputCommand command) throws IOException {
IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value();
if (iotDevicePowerOnAndOffDataEvent == null) {
- iotDevicePowerOnAndOffDataEvent = getByEs(event);
+ iotDevicePowerOnAndOffDataEvent = getByEs(command);
}
return iotDevicePowerOnAndOffDataEvent;
}
- private IotDevicePowerOnAndOffDataEvent getByEs(MachineIotDataReceivedEvent event) throws IOException {
+ private IotDevicePowerOnAndOffDataEvent getByEs(MachineOutputCommand command) throws IOException {
// 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询)
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", event.getMachineIotMac()));
+ searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", command.getMac()));
searchSourceBuilder.sort("receivedTime", SortOrder.DESC);
searchSourceBuilder.size(1);
// 创建查询请求对象,将查询对象配置到其中
@@ -265,76 +251,28 @@ public class IotDevicePowerOnAndOffDataJob {
return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class);
}
}
- // 如果没有就找清洗后的数据
- MachineIotDataReceivedEvent deviceMonitoringData = getMachineIotDataReceivedEvent(event.getMachineIotMac());
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
powerOnAndOffDataEvent.setId(snowflake.nextId());
- if (deviceMonitoringData != null) {
- powerOnAndOffDataEvent.setDataSource(deviceMonitoringData.getDataSource());
- powerOnAndOffDataEvent.setMachineIotMac(deviceMonitoringData.getMachineIotMac());
- powerOnAndOffDataEvent.setAccJobCount(deviceMonitoringData.getAccJobCount());
- powerOnAndOffDataEvent.setCurrJobCount(0L);
- powerOnAndOffDataEvent.setCurrJobDuration(0L);
- Integer machinePwrStat = deviceMonitoringData.getMachinePwrStat();
- powerOnAndOffDataEvent.setMachinePwrStat(deviceMonitoringData.getMachinePwrStat());
- powerOnAndOffDataEvent.setMachineWorkingStat(deviceMonitoringData.getMachineWorkingStat());
- Long reportTime = deviceMonitoringData.getReportTime();
- // 开机
- powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime);
- if (machinePwrStat == 0) {
- // 关机
- powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime);
- }
- powerOnAndOffDataEvent.setReportTime(reportTime);
- } else {
- powerOnAndOffDataEvent.setDataSource(event.getDataSource());
- powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac());
- powerOnAndOffDataEvent.setAccJobCount(event.getAccJobCount());
- powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount());
- powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration());
- Integer machinePwrStat = event.getMachinePwrStat();
- powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat());
- powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat());
- Long reportTime = event.getReportTime();
- // 开机
- powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime);
- if (machinePwrStat == 0) {
- // 关机
- powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime);
- }
- powerOnAndOffDataEvent.setReportTime(reportTime);
+ powerOnAndOffDataEvent.setDataSource(command.getDataSource());
+ powerOnAndOffDataEvent.setMachineIotMac(command.getMac());
+ powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput());
+ powerOnAndOffDataEvent.setCurrJobCount(command.getCurrCount());
+ powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrDuration());
+ Integer machinePwrStat = command.getMachinePwrStat();
+ powerOnAndOffDataEvent.setMachinePwrStat(machinePwrStat);
+ powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat());
+ Long reportTime = command.getTimestamp();
+ // 开机
+ powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime);
+ if (machinePwrStat == 0) {
+ // 关机
+ powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime);
}
+ powerOnAndOffDataEvent.setReportTime(reportTime);
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
return powerOnAndOffDataEvent;
}
-
- private MachineIotDataReceivedEvent getMachineIotDataReceivedEvent(Long machineIotMac) {
-
- try {
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- BoolQueryBuilder bool = new BoolQueryBuilder();
- BoolQueryBuilder boolQueryBuilder = bool.must(QueryBuilders.termQuery("machineIotMac", machineIotMac));
- searchSourceBuilder.size(1);
- searchSourceBuilder.sort("reportTime", SortOrder.DESC);
- searchSourceBuilder.query(boolQueryBuilder);
- SearchRequest request = new SearchRequest(ApolloConfig.getStr(ConfigConstant.DATA_ELASTICSEARCH_INDEX));
- request.source(searchSourceBuilder);
- // 执行请求
- SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
- if (RestStatus.OK.equals(response.status())) {
- SearchHit[] hits = response.getHits().getHits();
- if (hits.length > 0) {
- SearchHit hit = hits[0];
- String sourceAsString = hit.getSourceAsString();
- return JSONUtil.toBean(sourceAsString, MachineIotDataReceivedEvent.class);
- }
- }
- } catch (Exception e) {
- log.error("获取 machine_iot_data_received_event 索引数据异常");
- }
- return null;
- }
}).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac");
sinkEs(outputStreamOperator);
diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java
index fa1b96e..2019ecd 100644
--- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java
+++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java
@@ -1,14 +1,7 @@
package com.qniao.iot.device.power.constant;
public interface ConfigConstant {
-
-
- String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "source.kafka.bootstrap.servers";
-
- String SOURCE_KAFKA_TOPICS = "source.kafka.topics";
-
- String SOURCE_KAFKA_GROUP_ID = "source.kafka.groupId";
-
+
String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host";
String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post";
@@ -23,23 +16,19 @@ public interface ConfigConstant {
String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index";
- String ES_CONNECT_TIMEOUT = "es.connect.timeout";
-
String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id";
String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id";
String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host";
- String SOURCE_RABBITMQ_PORT = "source.rabbitmq.port";
-
- String SOURCE_RABBITMQ_USER_NAME = "source.rabbitmq.userName";
+ String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.username";
String SOURCE_RABBITMQ_PASSWORD = "source.rabbitmq.password";
- String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtualHost";
-
- String DATA_ELASTICSEARCH_INDEX = "data.elasticsearch.index";
+ String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtual.host";
String SOURCE_RABBITMQ_QUEUE = "source.rabbitmq.queue";
+
+ String SOURCE_RABBITMQ_PORT = "source.rabbitmq.port";
}