diff --git a/pom.xml b/pom.xml
index 393078c..f345aa6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,6 +122,18 @@
druid
1.2.6
+
+
+ org.apache.flink
+ flink-connector-rabbitmq_2.12
+ 1.14.5
+
+
+
+ com.qniao
+ iot-machine-data-command
+ 0.0.1-SNAPSHOT
+
diff --git a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java
index 30ec501..679a742 100644
--- a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java
+++ b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java
@@ -7,11 +7,14 @@ import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.config.ApolloConfig;
import com.qniao.iot.constant.ConfigConstant;
+import com.qniao.iot.machine.command.MachineOutputCommand;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
+import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
@@ -25,6 +28,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
+import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
@@ -85,37 +90,22 @@ public class IotMonitoringDataJob {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 获取设备数据源
- KafkaSource source = KafkaSource.builder()
- .setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
- .setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS))
- .setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID))
- .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
- .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
- .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
+ RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+ .setHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_HOST))
+ .setPort(ApolloConfig.getInt(ConfigConstant.SOURCE_RABBITMQ_POST))
+ .setUserName(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_USERNAME))
+ .setPassword(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_PASSWORD))
+ .setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUAL_HOST))
.build();
- // 设备数据源转换
- DataStreamSource dataStreamSource = env
- .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
-
- // 数据过滤
- SingleOutputStreamOperator streamOperator = dataStreamSource
- .filter((FilterFunction) value -> {
-
- Long reportTime = value.getReportTime();
- if (reportTime != null
- && value.getDataSource() != null && value.getMachinePwrStat() != null) {
- long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
- // 晚30分钟的数据就不要了
- return nowTime - reportTime <= (30 * 60 * 1000);
- }
- return false;
- });
+ final DataStream stream = env
+ .addSource(new RMQSource<>(connectionConfig, ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_QUEUE),
+ true, new MachineOutputCommandDeserializationSchema())).setParallelism(1);
// mac分组并进行工作时长的集合操作
- DataStream machineIotDataReceivedEventDataStream = streamOperator
- .keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
- .process(new KeyedProcessFunction() {
+ DataStream machineIotDataReceivedEventDataStream = stream
+ .keyBy(MachineOutputCommand::getMac)
+ .process(new KeyedProcessFunction() {
// 上次设备数据
private ValueState deviceTotalDataStat;
@@ -133,23 +123,30 @@ public class IotMonitoringDataJob {
}
@Override
- public void processElement(MachineIotDataReceivedEvent receivedEvent,
- KeyedProcessFunction.Context ctx,
+ public void processElement(MachineOutputCommand command,
+ KeyedProcessFunction.Context ctx,
Collector out) {
try {
- DeviceTotalData lastedDeviceState = getLastDeviceTotalData(receivedEvent);
+ DeviceTotalData lastedDeviceState = getLastDeviceTotalData(command);
Long lastReportTime = lastedDeviceState.getReportTime();
- Long reportTime = receivedEvent.getReportTime();
+ Long reportTime = command.getTimestamp();
// 如果这次的消息事件小于上次消息的时间,那么就进行丢弃
if (lastReportTime <= reportTime) {
Integer lastWorkingStat = lastedDeviceState.getMachineWorkingStat();
Integer lastPwStat = lastedDeviceState.getMachinePwrStat();
// 上次启动时间
Long lastBootTime = lastedDeviceState.getLastBootTime();
+ Long lastTheDayDuration = lastedDeviceState.getTheDayDuration();
+ Long lastTheDayJobDuration = lastedDeviceState.getTheDayJobDuration();
+ Long lastJobDurationTotal = lastedDeviceState.getJobDurationTotal();
+ Long lastTheDayJobCount = lastedDeviceState.getTheDayJobCount();
+ Long lastJobTotal = lastedDeviceState.getJobTotal();
// 如果当前消息的时间大于等于上次消息的时间才进行处理
- Integer machinePwrStat = receivedEvent.getMachinePwrStat();
- Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
+ Integer machinePwrStat = command.getMachinePwrStat();
+ Integer machineWorkingStat = command.getMachineWorkingStat();
+ Long currDuration = command.getCurrDuration();
+ Long currCount = command.getCurrCount();
// 当前数据
DeviceTotalData nowDeviceState = new DeviceTotalData();
nowDeviceState.setMachinePwrStat(machinePwrStat);
@@ -160,11 +157,11 @@ public class IotMonitoringDataJob {
if (lastPwStat != 0) {
if (lastWorkingStat == 1) {
// 如果上次是工作状态,那么需要记录产量和生产时间
- nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrDuration());
- nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + receivedEvent.getCurrDuration());
- nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + receivedEvent.getCurrDuration());
- nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + receivedEvent.getCurrCount());
- nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrCount());
+ nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration);
+ nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration);
+ nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration);
+ nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount);
+ nowDeviceState.setJobTotal(lastJobTotal + currCount);
} else {
nowDeviceState = lastedDeviceState;
}
@@ -180,16 +177,16 @@ public class IotMonitoringDataJob {
// 开机
if (machineWorkingStat.equals(1)) {
// 工作
- nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + receivedEvent.getCurrCount());
- nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrCount());
- nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + receivedEvent.getCurrDuration());
- nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + receivedEvent.getCurrDuration());
+ nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount);
+ nowDeviceState.setJobTotal(lastJobTotal + currCount);
+ nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration);
+ nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration);
} else {
// 待机
nowDeviceState = lastedDeviceState;
}
// 设置开机时长,待机也要进行累加,所以放这里
- nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrDuration());
+ nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration);
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(lastBootTime);
nowDeviceState.setReportTime(reportTime);
@@ -206,10 +203,10 @@ public class IotMonitoringDataJob {
if (((!(lastWorkingStat == 2 && machineWorkingStat == 2))
&& (!(lastPwStat == 0 && machinePwrStat == 0))) || !isExistEs) {
DeviceMonitoringData data = new DeviceMonitoringData();
- data.setDataSource(receivedEvent.getDataSource());
- data.setMachineIotMac(receivedEvent.getMachineIotMac());
- data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
- data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
+ data.setDataSource(command.getDataSource());
+ data.setMachineIotMac(command.getMac());
+ data.setMachinePwrStat(command.getMachinePwrStat());
+ data.setMachineWorkingStat(command.getMachineWorkingStat());
data.setAccJobCount(nowDeviceState.getJobTotal());
data.setCurrJobCount(nowDeviceState.getTheDayJobCount());
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
@@ -225,21 +222,22 @@ public class IotMonitoringDataJob {
}
}
} catch (Exception e) {
- log.info("导致异常的信息:" + JSONUtil.toJsonStr(receivedEvent));
+ log.info("导致异常的信息:" + JSONUtil.toJsonStr(command));
log.error("处理异常", e);
}
}
- private DeviceTotalData getLastDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception {
+ private DeviceTotalData getLastDeviceTotalData(MachineOutputCommand command) throws Exception {
// 上一次的数据
DeviceTotalData value = deviceTotalDataStat.value();
- Long reportTime = event.getReportTime();
+ Long reportTime = command.getTimestamp();
LocalDate localDate = new Date(reportTime).toLocalDate();
+ Long mac = command.getMac();
if (value == null) {
value = new DeviceTotalData();
// 从es中获取
- DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac());
+ DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(mac);
if (deviceMonitoringData != null) {
value.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
value.setJobTotal(deviceMonitoringData.getAccJobCount());
@@ -254,7 +252,7 @@ public class IotMonitoringDataJob {
} else {
// es中也没有,直接从老接口拿
isExistEs = false;
- value = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime);
+ value = queryDeviceMonitoringData(mac, reportTime);
}
// 因为ReportTime参与后面的计算,所以如果是第一次取这个数据需要设置为当前消息的时间,要不然会有很大的差值
value.setReportTime(reportTime);
diff --git a/src/main/java/com/qniao/iot/constant/ConfigConstant.java b/src/main/java/com/qniao/iot/constant/ConfigConstant.java
index ea5cf40..c660f2c 100644
--- a/src/main/java/com/qniao/iot/constant/ConfigConstant.java
+++ b/src/main/java/com/qniao/iot/constant/ConfigConstant.java
@@ -33,4 +33,16 @@ public interface ConfigConstant {
String ES_PASSWORD = "es.password";
String ES_CONNECT_TIMEOUT = "es.connect.timeout";
+
+ String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host";
+
+ String SOURCE_RABBITMQ_POST = "source.rabbitmq.post";
+
+ String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.username";
+
+ String SOURCE_RABBITMQ_PASSWORD = "source.rabbitmq.password";
+
+ String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtual.host";
+
+ String SOURCE_RABBITMQ_QUEUE = "source.rabbitmq.queue";
}