From 2a4d7a2b060e226cb9800891fae5e81881b11278 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Mon, 5 Sep 2022 16:00:40 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 12 +++ .../com/qniao/iot/IotMonitoringDataJob.java | 102 +++++++++--------- .../qniao/iot/constant/ConfigConstant.java | 12 +++ 3 files changed, 74 insertions(+), 52 deletions(-) diff --git a/pom.xml b/pom.xml index 393078c..f345aa6 100644 --- a/pom.xml +++ b/pom.xml @@ -122,6 +122,18 @@ druid 1.2.6 + + + org.apache.flink + flink-connector-rabbitmq_2.12 + 1.14.5 + + + + com.qniao + iot-machine-data-command + 0.0.1-SNAPSHOT + diff --git a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java index 30ec501..679a742 100644 --- a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java @@ -7,11 +7,14 @@ import cn.hutool.json.JSONArray; import cn.hutool.json.JSONUtil; import com.qniao.iot.config.ApolloConfig; import com.qniao.iot.constant.ConfigConstant; +import com.qniao.iot.machine.command.MachineOutputCommand; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; +import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; @@ -25,6 +28,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; +import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; +import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.flink.util.Collector; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; @@ -85,37 +90,22 @@ public class IotMonitoringDataJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); // 获取设备数据源 - KafkaSource source = KafkaSource.builder() - .setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) - .setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS)) - .setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) - .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) - .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") - .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) + RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() + .setHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_HOST)) + .setPort(ApolloConfig.getInt(ConfigConstant.SOURCE_RABBITMQ_POST)) + .setUserName(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_USERNAME)) + .setPassword(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_PASSWORD)) + .setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUAL_HOST)) .build(); - // 设备数据源转换 - DataStreamSource dataStreamSource = env - .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); - - // 数据过滤 - SingleOutputStreamOperator streamOperator = dataStreamSource - .filter((FilterFunction) value -> { - - Long reportTime = value.getReportTime(); - if (reportTime != null - && value.getDataSource() != null && value.getMachinePwrStat() != null) { - long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); - // 晚30分钟的数据就不要了 - return nowTime - reportTime <= (30 * 60 * 1000); - } - return false; - }); + final DataStream stream = env + .addSource(new RMQSource<>(connectionConfig, ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_QUEUE), + true, new MachineOutputCommandDeserializationSchema())).setParallelism(1); // mac分组并进行工作时长的集合操作 - DataStream machineIotDataReceivedEventDataStream = streamOperator - .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) - .process(new KeyedProcessFunction() { + DataStream machineIotDataReceivedEventDataStream = stream + .keyBy(MachineOutputCommand::getMac) + .process(new KeyedProcessFunction() { // 上次设备数据 private ValueState deviceTotalDataStat; @@ -133,23 +123,30 @@ public class IotMonitoringDataJob { } @Override - public void processElement(MachineIotDataReceivedEvent receivedEvent, - KeyedProcessFunction.Context ctx, + public void processElement(MachineOutputCommand command, + KeyedProcessFunction.Context ctx, Collector out) { try { - DeviceTotalData lastedDeviceState = getLastDeviceTotalData(receivedEvent); + DeviceTotalData lastedDeviceState = getLastDeviceTotalData(command); Long lastReportTime = lastedDeviceState.getReportTime(); - Long reportTime = receivedEvent.getReportTime(); + Long reportTime = command.getTimestamp(); // 如果这次的消息事件小于上次消息的时间,那么就进行丢弃 if (lastReportTime <= reportTime) { Integer lastWorkingStat = lastedDeviceState.getMachineWorkingStat(); Integer lastPwStat = lastedDeviceState.getMachinePwrStat(); // 上次启动时间 Long lastBootTime = lastedDeviceState.getLastBootTime(); + Long lastTheDayDuration = lastedDeviceState.getTheDayDuration(); + Long lastTheDayJobDuration = lastedDeviceState.getTheDayJobDuration(); + Long lastJobDurationTotal = lastedDeviceState.getJobDurationTotal(); + Long lastTheDayJobCount = lastedDeviceState.getTheDayJobCount(); + Long lastJobTotal = lastedDeviceState.getJobTotal(); // 如果当前消息的时间大于等于上次消息的时间才进行处理 - Integer machinePwrStat = receivedEvent.getMachinePwrStat(); - Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); + Integer machinePwrStat = command.getMachinePwrStat(); + Integer machineWorkingStat = command.getMachineWorkingStat(); + Long currDuration = command.getCurrDuration(); + Long currCount = command.getCurrCount(); // 当前数据 DeviceTotalData nowDeviceState = new DeviceTotalData(); nowDeviceState.setMachinePwrStat(machinePwrStat); @@ -160,11 +157,11 @@ public class IotMonitoringDataJob { if (lastPwStat != 0) { if (lastWorkingStat == 1) { // 如果上次是工作状态,那么需要记录产量和生产时间 - nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrDuration()); - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + receivedEvent.getCurrDuration()); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + receivedEvent.getCurrDuration()); - nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + receivedEvent.getCurrCount()); - nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrCount()); + nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration); + nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration); + nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration); + nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount); + nowDeviceState.setJobTotal(lastJobTotal + currCount); } else { nowDeviceState = lastedDeviceState; } @@ -180,16 +177,16 @@ public class IotMonitoringDataJob { // 开机 if (machineWorkingStat.equals(1)) { // 工作 - nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + receivedEvent.getCurrCount()); - nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrCount()); - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + receivedEvent.getCurrDuration()); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + receivedEvent.getCurrDuration()); + nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount); + nowDeviceState.setJobTotal(lastJobTotal + currCount); + nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration); + nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration); } else { // 待机 nowDeviceState = lastedDeviceState; } // 设置开机时长,待机也要进行累加,所以放这里 - nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrDuration()); + nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration); nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); nowDeviceState.setLastBootTime(lastBootTime); nowDeviceState.setReportTime(reportTime); @@ -206,10 +203,10 @@ public class IotMonitoringDataJob { if (((!(lastWorkingStat == 2 && machineWorkingStat == 2)) && (!(lastPwStat == 0 && machinePwrStat == 0))) || !isExistEs) { DeviceMonitoringData data = new DeviceMonitoringData(); - data.setDataSource(receivedEvent.getDataSource()); - data.setMachineIotMac(receivedEvent.getMachineIotMac()); - data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); - data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); + data.setDataSource(command.getDataSource()); + data.setMachineIotMac(command.getMac()); + data.setMachinePwrStat(command.getMachinePwrStat()); + data.setMachineWorkingStat(command.getMachineWorkingStat()); data.setAccJobCount(nowDeviceState.getJobTotal()); data.setCurrJobCount(nowDeviceState.getTheDayJobCount()); data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration()); @@ -225,21 +222,22 @@ public class IotMonitoringDataJob { } } } catch (Exception e) { - log.info("导致异常的信息:" + JSONUtil.toJsonStr(receivedEvent)); + log.info("导致异常的信息:" + JSONUtil.toJsonStr(command)); log.error("处理异常", e); } } - private DeviceTotalData getLastDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception { + private DeviceTotalData getLastDeviceTotalData(MachineOutputCommand command) throws Exception { // 上一次的数据 DeviceTotalData value = deviceTotalDataStat.value(); - Long reportTime = event.getReportTime(); + Long reportTime = command.getTimestamp(); LocalDate localDate = new Date(reportTime).toLocalDate(); + Long mac = command.getMac(); if (value == null) { value = new DeviceTotalData(); // 从es中获取 - DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac()); + DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(mac); if (deviceMonitoringData != null) { value.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); value.setJobTotal(deviceMonitoringData.getAccJobCount()); @@ -254,7 +252,7 @@ public class IotMonitoringDataJob { } else { // es中也没有,直接从老接口拿 isExistEs = false; - value = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime); + value = queryDeviceMonitoringData(mac, reportTime); } // 因为ReportTime参与后面的计算,所以如果是第一次取这个数据需要设置为当前消息的时间,要不然会有很大的差值 value.setReportTime(reportTime); diff --git a/src/main/java/com/qniao/iot/constant/ConfigConstant.java b/src/main/java/com/qniao/iot/constant/ConfigConstant.java index ea5cf40..c660f2c 100644 --- a/src/main/java/com/qniao/iot/constant/ConfigConstant.java +++ b/src/main/java/com/qniao/iot/constant/ConfigConstant.java @@ -33,4 +33,16 @@ public interface ConfigConstant { String ES_PASSWORD = "es.password"; String ES_CONNECT_TIMEOUT = "es.connect.timeout"; + + String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host"; + + String SOURCE_RABBITMQ_POST = "source.rabbitmq.post"; + + String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.username"; + + String SOURCE_RABBITMQ_PASSWORD = "source.rabbitmq.password"; + + String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtual.host"; + + String SOURCE_RABBITMQ_QUEUE = "source.rabbitmq.queue"; }