From be6d2765c5e7a0603c05de2fdc470031dda7efa6 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Thu, 14 Jul 2022 19:06:14 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../event/MachineIotDataReceivedEvent.java | 2 +- iot-machine-state-event-generator-job/pom.xml | 39 ++-- .../event/generator/job/DeviceState.java | 45 +++++ .../job/IotMachineEventGeneratorJob.java | 174 ++++++++++++++++-- .../src/main/resources/db.setting | 26 +++ .../src/main/resources/log4j2.properties | 25 +++ 6 files changed, 276 insertions(+), 35 deletions(-) create mode 100644 iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java create mode 100644 iot-machine-state-event-generator-job/src/main/resources/db.setting create mode 100644 iot-machine-state-event-generator-job/src/main/resources/log4j2.properties diff --git a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java index b6e68a0..2c2c5c8 100644 --- a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java +++ b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java @@ -33,7 +33,7 @@ public class MachineIotDataReceivedEvent implements Serializable { private Integer machinePwrStat; /** - * 机器工作状态(0停机状态 1工作状态 2待机状态) + * 机器工作状态(0未工作 1工作中) */ private Integer machineWorkingStat; diff --git a/iot-machine-state-event-generator-job/pom.xml b/iot-machine-state-event-generator-job/pom.xml index b18756e..44bf105 100644 --- a/iot-machine-state-event-generator-job/pom.xml +++ b/iot-machine-state-event-generator-job/pom.xml @@ -34,17 +34,17 @@ flink-streaming-java ${flink.version} - + - + @@ -66,16 +66,6 @@ ${log4j.version} runtime - - com.fasterxml.jackson.core - jackson-databind - 2.13.3 - - - com.fasterxml.jackson.datatype - jackson-datatype-jsr310 - 2.13.3 - com.qniao @@ -106,6 +96,17 @@ commons-logging 1.2 + + + cn.hutool + hutool-all + + + + + mysql + mysql-connector-java + @@ -124,12 +125,12 @@ - package @@ -146,8 +147,8 @@ - <!– Do not copy the signatures in the META-INF folder. - Otherwise, this might cause SecurityExceptions when using the JAR. –> + *:* META-INF/*.SF @@ -159,7 +160,7 @@ - --> + diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java new file mode 100644 index 0000000..52cfb1c --- /dev/null +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java @@ -0,0 +1,45 @@ +package com.qniao.iot.machine.event.generator.job; + +import cn.hutool.json.JSONUtil; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.HashMap; +import java.util.Map; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class DeviceState { + + /** + * 机器标识 + */ + private Long machineId; + + /** + * 设备物联地址(云盒物理标识) + */ + private Long machineIotMac; + + /** + * 状态: 0:关机 1:生产中 2:待机 + */ + private Integer status; + + /** + * 发生时间 + */ + private Long updateTime; + + @Override + public String toString() { + return "设备状态:{" + + "machineId='" + machineId + '\'' + + ", status='" + status + + ", updateTime='" + updateTime + + '\'' + + '}'; + } +} diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 72ce839..d8e91d0 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -1,18 +1,38 @@ package com.qniao.iot.machine.event.generator.job; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.db.Db; +import cn.hutool.json.JSON; +import cn.hutool.json.JSONUtil; import com.qniao.domain.BaseCommand; import com.qniao.iot.machine.command.PowerOffMachineCommand; import com.qniao.iot.machine.command.PowerOnMachineCommand; import com.qniao.iot.machine.command.StartMachineWorkingCommand; import com.qniao.iot.machine.command.StopMachineWorkingCommand; +import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; +import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializationSchema; import com.rabbitmq.client.AMQP; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; +import org.apache.flink.util.Collector; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; @@ -21,13 +41,94 @@ import org.apache.http.impl.client.BasicCredentialsProvider; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; +import javax.sql.DataSource; +import java.io.IOException; +import java.sql.SQLException; +import java.util.*; +import java.util.stream.Collectors; public class IotMachineEventGeneratorJob { - public static void sinkRabbitMq(DataStream commandDataStream) { + private final static String sql = "select qm.id as machine_id, qei.mac as machine_iot_mac, qm.status\n" + + "from (select id, status from qn_machine where is_delete = 0) qm\n" + + " left join (select machine_id, equipment_information_id\n" + + " from qn_machine_binding_cloud_box\n" + + " where is_delete = 0) qmbcb ON qm.id = qmbcb.machine_id\n" + + " left join (select id, mac from qn_equipment_information where is_delete = 0) qei\n" + + " on qei.id = qmbcb.equipment_information_id"; + + public static void main(String[] args) throws Exception { + + + final ParameterTool params = ParameterTool.fromArgs(args); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); + KafkaSource source = KafkaSource.builder() + .setBootstrapServers(params.get("source.bootstrap.servers")) + .setTopics("machine_iot_data_received_event") + .setGroupId("root_cloud_iot_data_etl") + .setStartingOffsets(OffsetsInitializer.earliest()) + .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) + .build(); + + + // 设备数据分组 + DataStream commandDataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source") + .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) + .process(new KeyedProcessFunction() { + + private ValueState deviceState; + + @Override + public void open(Configuration parameters) throws SQLException, IOException { + // 必须在 open 生命周期初始化 + // 获取所有设备的最新状态 + List deviceStateList = Db.use().query(sql, DeviceState.class); + System.out.println("已获取到设备最新状态数据:" + deviceStateList.size()); + Map allMachineMap = new HashMap<>(); + if (CollUtil.isNotEmpty(deviceStateList)) { + allMachineMap = deviceStateList.stream() + .collect(Collectors.toMap(DeviceState::getMachineIotMac, + deviceState -> deviceState, (deviceState1, deviceState2) -> deviceState1)); + } + deviceState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(JSON.class), JSONUtil.parse(allMachineMap))); + } + + @Override + public void processElement(MachineIotDataReceivedEvent event, KeyedProcessFunction.Context ctx, Collector out) throws Exception { + + // 获取最新设备状态 + JSON deviceStateListJson = deviceState.value(); + System.out.println("目前处理的事件:" + event); + DeviceState lastedDeviceState = deviceStateListJson.getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); + Integer deviceStatus = getDeviceStatus(event); + if (deviceStatus != null) { + deviceStateListJson.putByPath(StrUtil.toString(event.getMachineIotMac()), + new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); + if (lastedDeviceState != null) { + DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); + collDeviceStatusChange(out, newState, lastedDeviceState, event); + } + deviceState.update(deviceStateListJson); + } + } + }).name("keyBy stream"); + + + // 写入rabbitmq + sinkRabbitMq(commandDataStream); + + // 写入es + sinkEs(commandDataStream); + + env.execute("Kafka Job"); + } + + + private static void sinkRabbitMq(DataStream commandDataStream) { // rabbitmq配置(测试环境) RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() @@ -38,7 +139,8 @@ public class IotMachineEventGeneratorJob { .setPort(5672).build(); // 发送相应的指令到rabbitmq的交换机 - commandDataStream.addSink(new RMQSink<>(connectionConfig, new MachineIotDataReceivedEventRabbitMqSerializationSchema(), new RMQSinkPublishOptions() { + commandDataStream.addSink(new RMQSink<>(connectionConfig, new MachineIotDataReceivedEventRabbitMqSerializationSchema(), + new RMQSinkPublishOptions() { @Override public String computeRoutingKey(BaseCommand command) { @@ -62,7 +164,7 @@ public class IotMachineEventGeneratorJob { // commandDataStream.addSink(new RMQSink<>(connectionConfig, "flink_test_queue", new BaseCommandSerializationSchema())).name("12"); } - public static void sinkEs(DataStream commandDataStream) { + private static void sinkEs(DataStream commandDataStream) { List httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("119.23.41.137", 9200, "http")); @@ -70,27 +172,27 @@ public class IotMachineEventGeneratorJob { (ElasticsearchSinkFunction) (command, runtimeContext, requestIndexer) -> { HashMap map = new HashMap<>(); - if(command instanceof PowerOnMachineCommand) { + if (command instanceof PowerOnMachineCommand) { // 设备开机数据 - PowerOnMachineCommand powerOnMachineCommand = (PowerOnMachineCommand)command; + PowerOnMachineCommand powerOnMachineCommand = (PowerOnMachineCommand) command; map.put("id", powerOnMachineCommand.getId().toString()); map.put("currTotalOutput", powerOnMachineCommand.getCurrTotalOutput().toString()); } - if(command instanceof PowerOffMachineCommand) { + if (command instanceof PowerOffMachineCommand) { // 设备关机数据 - PowerOffMachineCommand powerOffMachineCommand = (PowerOffMachineCommand)command; + PowerOffMachineCommand powerOffMachineCommand = (PowerOffMachineCommand) command; map.put("id", powerOffMachineCommand.getId().toString()); map.put("currTotalOutput", powerOffMachineCommand.getCurrTotalOutput().toString()); } - if(command instanceof StopMachineWorkingCommand) { + if (command instanceof StopMachineWorkingCommand) { // 设备待机数据 - StopMachineWorkingCommand stopMachineWorkingCommand = (StopMachineWorkingCommand)command; + StopMachineWorkingCommand stopMachineWorkingCommand = (StopMachineWorkingCommand) command; map.put("id", stopMachineWorkingCommand.getId().toString()); map.put("currTotalOutput", stopMachineWorkingCommand.getCurrTotalOutput().toString()); } - if(command instanceof StartMachineWorkingCommand) { + if (command instanceof StartMachineWorkingCommand) { // 设备工作数据 - StartMachineWorkingCommand startMachineWorkingCommand = (StartMachineWorkingCommand)command; + StartMachineWorkingCommand startMachineWorkingCommand = (StartMachineWorkingCommand) command; map.put("id", startMachineWorkingCommand.getId().toString()); map.put("currTotalOutput", startMachineWorkingCommand.getCurrTotalOutput().toString()); } @@ -110,7 +212,7 @@ public class IotMachineEventGeneratorJob { restClientBuilder -> { restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic","qn56521")); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "qn56521")); return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); }); restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { @@ -123,4 +225,46 @@ public class IotMachineEventGeneratorJob { //数据流添加sink commandDataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink"); } + + + private static Integer getDeviceStatus(MachineIotDataReceivedEvent event) { + + // 设备状态 + if (Integer.valueOf("0").equals(event.getMachinePwrStat())) { + return 0; + } else if (Integer.valueOf("1").equals(event.getMachinePwrStat()) + && Integer.valueOf("1").equals(event.getMachineWorkingStat())) { + return 1; + } else if (Integer.valueOf("1").equals(event.getMachinePwrStat()) + && Integer.valueOf("0").equals(event.getMachineWorkingStat())) { + return 2; + } + return null; + } + + private static void collDeviceStatusChange(Collector out, DeviceState newState, DeviceState oldState, MachineIotDataReceivedEvent event) { + + if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { + // 设备开机 + out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + } else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { + // 设备关机 + out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + } else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { + // 设备开始待机 + out.collect(new StopMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + } else if (oldState.getStatus() == 2 && newState.getStatus() == 1) { + // 设备开始工作 + out.collect(new StartMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + } + + + /*if (newState.getStatus() == 1 || newState.getStatus() == 2) { + System.out.println("设备开机。相关事件:" + event.toString()); + out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + } else if (newState.getStatus() == 0) { + System.out.println("设备关机。相关事件:" + event.toString()); + out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + }*/ + } } diff --git a/iot-machine-state-event-generator-job/src/main/resources/db.setting b/iot-machine-state-event-generator-job/src/main/resources/db.setting new file mode 100644 index 0000000..3e3aa65 --- /dev/null +++ b/iot-machine-state-event-generator-job/src/main/resources/db.setting @@ -0,0 +1,26 @@ +## db.setting文件 + +url = jdbc:mysql://rm-wz9it4fs5tk7n4tm1zo.mysql.rds.aliyuncs.com:3306/cloud_print_cloud_factory?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useTimezone=true&serverTimezone=GMT%2B8&useSSL=false +user = qn_cloudprint +pass = qncloudprint5682 + +# 是否在日志中显示执行的SQL +showSql = true + +# 是否格式化显示的SQL +formatSql = false + +# 是否显示SQL参数 +showParams = true + +# 打印SQL的日志等级,默认debug,可以是info、warn、error +sqlLevel = debug + +# 初始化时建立物理连接的个数 +initialSize = 0 + +# 最大连接池数量 +maxActive = 20 + +# 最小连接池数量 +minIdle = 0 \ No newline at end of file diff --git a/iot-machine-state-event-generator-job/src/main/resources/log4j2.properties b/iot-machine-state-event-generator-job/src/main/resources/log4j2.properties new file mode 100644 index 0000000..32c696e --- /dev/null +++ b/iot-machine-state-event-generator-job/src/main/resources/log4j2.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n