From 97acc8d5668efbeadcf3dcf9875ef5c46581998e Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Wed, 13 Jul 2022 17:31:06 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- root-cloud-mocker/pom.xml | 6 + .../rc/RootCloudIotDataEventSourceMocker.java | 19 ++- root-cloud-statistics/pom.xml | 28 ++-- .../iot/rc/RootCloudIotDataFormatterJob.java | 143 +++--------------- .../src/main/resources/db.setting | 18 +++ 5 files changed, 77 insertions(+), 137 deletions(-) create mode 100644 root-cloud-statistics/src/main/resources/db.setting diff --git a/root-cloud-mocker/pom.xml b/root-cloud-mocker/pom.xml index 94893a1..526eea1 100644 --- a/root-cloud-mocker/pom.xml +++ b/root-cloud-mocker/pom.xml @@ -35,6 +35,12 @@ flink-connector-kafka ${flink.version} + + + cn.hutool + hutool-all + 5.8.4 + diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java index efa66e2..68e8ce0 100644 --- a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java @@ -1,12 +1,17 @@ package com.qniao.iot.rc; +import cn.hutool.core.collection.ListUtil; +import cn.hutool.core.util.RandomUtil; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; +import javax.swing.plaf.ListUI; import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; import java.util.Properties; public class RootCloudIotDataEventSourceMocker { @@ -22,19 +27,23 @@ public class RootCloudIotDataEventSourceMocker { String topic = "root_cloud_iot_report_data_event"; - + // 设备标识 + List assetIdList = Arrays.asList("10000","20000","30000","40000","5000","6000"); + // 电源状态 + List pwrStaList = Arrays.asList(0, 1); + // 设备工作状态(0停机 1工作 2待机) + List accStaList = Arrays.asList(0, 1, 2); // 循环发送事件 while (true) { RootCloudIotDataReceiptedEvent event = new RootCloudIotDataReceiptedEvent(); - event.set__assetId__("10000"); + event.set__assetId__(RandomUtil.randomEles(assetIdList, 1).get(0)); event.setACC_count(50L); event.setACC_count_total(500L); - event.setPWR_sta(1); - event.setWorking_sta(1); + event.setPWR_sta(RandomUtil.randomEles(pwrStaList, 1).get(0)); + event.setWorking_sta(RandomUtil.randomEles(accStaList, 1).get(0)); event.setStoping_duration("100"); event.setRunning_duration(new BigDecimal(1250)); - event.setIG_sta(1); event.setWaiting_duration(new BigDecimal(500)); ProducerRecord record = new RootCloudIotDataEventSerialization(topic).serialize( event, diff --git a/root-cloud-statistics/pom.xml b/root-cloud-statistics/pom.xml index d4c9d35..723ac16 100644 --- a/root-cloud-statistics/pom.xml +++ b/root-cloud-statistics/pom.xml @@ -94,16 +94,6 @@ under the License. commons-logging 1.2 - com.qniao @@ -123,6 +113,12 @@ under the License. 0.0.1-SNAPSHOT + + com.qniao + iot-machine-state-event-generator-job + 0.0.1-SNAPSHOT + + org.apache.flink flink-connector-rabbitmq_2.12 @@ -134,6 +130,18 @@ under the License. flink-connector-elasticsearch7_2.11 1.14.5 + + + cn.hutool + hutool-all + + + + + mysql + mysql-connector-java + 8.0.29 + diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index b2b47f5..1798ee8 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -18,6 +18,8 @@ package com.qniao.iot.rc; +import cn.hutool.db.Db; +import cn.hutool.db.Entity; import com.fasterxml.jackson.databind.util.JSONPObject; import com.qniao.domain.BaseCommand; import com.qniao.iot.machine.command.PowerOffMachineCommand; @@ -25,6 +27,7 @@ import com.qniao.iot.machine.command.PowerOnMachineCommand; import com.qniao.iot.machine.command.StartMachineWorkingCommand; import com.qniao.iot.machine.command.StopMachineWorkingCommand; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; +import com.qniao.iot.machine.event.generator.job.IotMachineEventGeneratorJob; import com.qniao.iot.rc.command.BaseCommandSerializationSchema; import com.qniao.iot.rc.constant.DataSource; import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema; @@ -32,7 +35,9 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.tools.json.JSONUtil; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; @@ -45,28 +50,10 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; -import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; -import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; -import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory; -import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; -import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; -import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.flink.util.Collector; -import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; import java.math.BigDecimal; -import java.net.InetSocketAddress; +import java.sql.SQLException; import java.util.*; /** @@ -112,28 +99,28 @@ public class RootCloudIotDataFormatterJob { private ValueState deviceState; @Override - public void open(Configuration parameters) { + public void open(Configuration parameters) throws SQLException { // 必须在 open 生命周期初始化 + // TODO 获取所有设备的最新状态 + //Db.use().findAll(new Entity(), Object.class); deviceState = getRuntimeContext() .getState(new ValueStateDescriptor<>("deviceState", DeviceState.class)); } + + @Override public void processElement(MachineIotDataReceivedEvent event, KeyedProcessFunction.Context ctx, Collector out) throws Exception { - System.out.println("收到事件数据-------------------------:" + event); // 获取最新设备状态 DeviceState lastedDeviceState = deviceState.value(); Integer deviceStatus = getDeviceStatus(event); - if (deviceStatus == null) { out.collect(null); } else { if (lastedDeviceState == null) { - // TODO 后续优化,先从数据库中获取设备最新的状态 deviceState.update(new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); - System.out.println("初始化设备状态" + deviceState.value().toString()); } else { DeviceState newState = new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); DeviceState oldState = deviceState.value(); @@ -142,95 +129,21 @@ public class RootCloudIotDataFormatterJob { } } } - }).name("keyBy stream"); + }).filter((FilterFunction) Objects::nonNull).name("keyBy stream"); // 写入rabbitmq - sinkRabbitMq(commandDataStream); + IotMachineEventGeneratorJob.sinkRabbitMq(commandDataStream); // 写入es - sinkEs(commandDataStream); + IotMachineEventGeneratorJob.sinkEs(commandDataStream); env.execute("Kafka Job"); } - private static void sinkEs(DataStream commandDataStream) { - - List httpHosts = new ArrayList<>(); - httpHosts.add(new HttpHost("119.23.41.137", 9200, "http")); - ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, - (ElasticsearchSinkFunction) (command, runtimeContext, requestIndexer) -> { - - HashMap map = new HashMap<>(); - if(command instanceof PowerOnMachineCommand) { - PowerOnMachineCommand powerOnMachineCommand = (PowerOnMachineCommand)command; - map.put("id", powerOnMachineCommand.getId().toString()); - map.put("currTotalOutput", powerOnMachineCommand.getCurrTotalOutput().toString()); - } - - if(command instanceof PowerOffMachineCommand) { - PowerOffMachineCommand powerOffMachineCommand = (PowerOffMachineCommand)command; - map.put("id", powerOffMachineCommand.getId().toString()); - map.put("currTotalOutput", powerOffMachineCommand.getCurrTotalOutput().toString()); - } - //创建es 请求 - IndexRequest indexRequest = Requests.indexRequest().index("flink").source(map); - requestIndexer.add(indexRequest); - } - ); - //刷新前缓冲的最大动作量 - esSinkBuilder.setBulkFlushMaxActions(10); - //刷新前缓冲区的最大数据大小(以MB为单位) - esSinkBuilder.setBulkFlushMaxSizeMb(5); - //论缓冲操作的数量或大小如何都要刷新的时间间隔 - esSinkBuilder.setBulkFlushInterval(5000L); - // 客户端创建配置回调,配置账号密码 - esSinkBuilder.setRestClientFactory( - restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic","qn56521")); - return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - }) - ); - //数据流添加sink - commandDataStream.addSink(esSinkBuilder.build()).name("BaseCommand sink"); - } - - private static void sinkRabbitMq(DataStream commandDataStream) { - - // rabbitmq配置(测试环境) - RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() - .setHost("8.135.8.221") - .setVirtualHost("/") - .setUserName("qniao") - .setPassword("Qianniao2020") - .setPort(5672).build(); - // 发送相应的指令到rabbitmq的交换机 - commandDataStream.addSink(new RMQSink<>(connectionConfig, new BaseCommandSerializationSchema(), new RMQSinkPublishOptions() { - @Override - public String computeRoutingKey(BaseCommand command) { - return "flink"; - } - - @Override - public AMQP.BasicProperties computeProperties(BaseCommand command) { - return null; - } - @Override - public String computeExchange(BaseCommand command) { - - // 交换机名称 - System.out.println("发送消息:------------------" + command.toString()); - return "flink_test_exchange"; - } - })).name("PowerOffMachineCommand Sink"); - - // 直接发队列 - // commandDataStream.addSink(new RMQSink<>(connectionConfig, "flink_test_queue", new BaseCommandSerializationSchema())).name("12"); - } private static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) { @@ -258,38 +171,24 @@ public class RootCloudIotDataFormatterJob { private static Integer getDeviceStatus(MachineIotDataReceivedEvent event) { - if (event.getMachinePwrStat() == 0) { - return 0; - } else if (event.getMachinePwrStat() == 1 && event.getMachineWorkingStat() == 1) { - return 1; - } else if (event.getMachinePwrStat() == 1 && event.getMachineWorkingStat() == 0) { - return 2; - } - - return null; + // 设备状态 + return event.getMachineWorkingStat(); } private static void collDeviceStatusChange(Collector out, DeviceState newState, DeviceState oldState, MachineIotDataReceivedEvent event) { - /*if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { - System.out.println("设备开机。相关事件:" + event.toString()); + if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { + // 设备开机 out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); } else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { - System.out.println("设备关机。相关事件:" + event.toString()); + // 设备关机 out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); } else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { - System.out.println("设备开始待机。相关事件:" + event.toString()); + // 设备开始待机 out.collect(new StopMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); } else if (oldState.getStatus() == 2 && newState.getStatus() == 1) { - System.out.println("设备开始工作。相关事件:" + event.toString()); + // 设备开始工作 out.collect(new StartMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); - }*/ - if (newState.getStatus() == 1 || newState.getStatus() == 2) { - System.out.println("设备开机。相关事件:" + event.toString()); - out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); - } else if (newState.getStatus() == 0) { - System.out.println("设备关机。相关事件:" + event.toString()); - out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); } } } diff --git a/root-cloud-statistics/src/main/resources/db.setting b/root-cloud-statistics/src/main/resources/db.setting new file mode 100644 index 0000000..467c2bf --- /dev/null +++ b/root-cloud-statistics/src/main/resources/db.setting @@ -0,0 +1,18 @@ +## db.setting文件 + +url = jdbc:mysql://localhost:3306/test +user = root +pass = 123456 + + +# 是否在日志中显示执行的SQL +showSql = true + +# 是否格式化显示的SQL +formatSql = false + +# 是否显示SQL参数 +showParams = true + +# 打印SQL的日志等级,默认debug,可以是info、warn、error +sqlLevel = debug \ No newline at end of file