From 260c9deec23088ea0c74a91372411a0a58e3fb25 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Wed, 13 Jul 2022 17:30:18 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- iot-machine-data-command/pom.xml | 2 +- iot-machine-data-constant/pom.xml | 2 +- iot-machine-data-event/pom.xml | 2 +- .../event/MachineIotDataReceivedEvent.java | 4 +- ...eivedEventRabbitMqSerializationSchema.java | 21 +++ .../dependency-reduced-pom.xml | 69 ++++++++- iot-machine-state-event-generator-job/pom.xml | 67 ++++++--- .../job/IotMachineEventGeneratorJob.java | 141 +++++++++++++++--- pom.xml | 4 +- 9 files changed, 258 insertions(+), 54 deletions(-) create mode 100644 iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventRabbitMqSerializationSchema.java diff --git a/iot-machine-data-command/pom.xml b/iot-machine-data-command/pom.xml index 263af16..21bbb40 100644 --- a/iot-machine-data-command/pom.xml +++ b/iot-machine-data-command/pom.xml @@ -3,8 +3,8 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - iot-machine-state-event-generator com.qniao + java-dependency 0.0.1-SNAPSHOT 4.0.0 diff --git a/iot-machine-data-constant/pom.xml b/iot-machine-data-constant/pom.xml index 3d22f37..28280f4 100644 --- a/iot-machine-data-constant/pom.xml +++ b/iot-machine-data-constant/pom.xml @@ -3,8 +3,8 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - iot-machine-state-event-generator com.qniao + java-dependency 0.0.1-SNAPSHOT 4.0.0 diff --git a/iot-machine-data-event/pom.xml b/iot-machine-data-event/pom.xml index 78a4d0f..1f59073 100644 --- a/iot-machine-data-event/pom.xml +++ b/iot-machine-data-event/pom.xml @@ -3,8 +3,8 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - iot-machine-state-event-generator com.qniao + java-dependency 0.0.1-SNAPSHOT 4.0.0 diff --git a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java index e8fc9eb..b6e68a0 100644 --- a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java +++ b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java @@ -28,12 +28,12 @@ public class MachineIotDataReceivedEvent implements Serializable { private Long machineIotMac; /** - * 机器电源状态 + * 机器电源状态(0断电 1供电) */ private Integer machinePwrStat; /** - * 机器工作状态 + * 机器工作状态(0停机状态 1工作状态 2待机状态) */ private Integer machineWorkingStat; diff --git a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventRabbitMqSerializationSchema.java b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventRabbitMqSerializationSchema.java new file mode 100644 index 0000000..ad53212 --- /dev/null +++ b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventRabbitMqSerializationSchema.java @@ -0,0 +1,21 @@ +package com.qniao.iot.machine.event; + +import com.qniao.domain.BaseCommand; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + + +public class MachineIotDataReceivedEventRabbitMqSerializationSchema implements SerializationSchema { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public byte[] serialize(BaseCommand command) { + try { + return OBJECT_MAPPER.writeValueAsBytes(command); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Could not serialize record: " + command, e); + } + } +} diff --git a/iot-machine-state-event-generator-job/dependency-reduced-pom.xml b/iot-machine-state-event-generator-job/dependency-reduced-pom.xml index 6eb25e6..3440136 100644 --- a/iot-machine-state-event-generator-job/dependency-reduced-pom.xml +++ b/iot-machine-state-event-generator-job/dependency-reduced-pom.xml @@ -1,7 +1,7 @@ - iot-machine-state-event-generator + java-dependency com.qniao 0.0.1-SNAPSHOT @@ -45,12 +45,6 @@ - - - - com.qniao.iot.machine.event.generator.job.IotMachineEventGeneratorJob - - @@ -58,6 +52,18 @@ + + com.qniao + iot-machine-data-event + 0.0.1-SNAPSHOT + provided + + + org.apache.flink + flink-streaming-java + 1.15.0 + provided + org.apache.logging.log4j log4j-slf4j-impl @@ -76,7 +82,56 @@ 2.17.2 runtime + + com.fasterxml.jackson.core + jackson-databind + 2.13.3 + provided + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + 2.13.3 + provided + + + com.qniao + iot-machine-data-command + 0.0.1-SNAPSHOT + provided + + + com.qniao + iot-machine-data-constant + 0.0.1-SNAPSHOT + provided + + + org.apache.flink + flink-connector-rabbitmq_2.12 + 1.14.5 + provided + + + org.apache.flink + flink-connector-elasticsearch7_2.11 + 1.14.5 + provided + + + commons-logging + commons-logging + 1.2 + provided + + + + maven-releases + Nexus releases Repository + http://120.78.76.88:8081/repository/maven-snapshots/ + + 1.8 2.17.2 diff --git a/iot-machine-state-event-generator-job/pom.xml b/iot-machine-state-event-generator-job/pom.xml index e1fab28..b18756e 100644 --- a/iot-machine-state-event-generator-job/pom.xml +++ b/iot-machine-state-event-generator-job/pom.xml @@ -3,8 +3,8 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - iot-machine-state-event-generator com.qniao + java-dependency 0.0.1-SNAPSHOT 4.0.0 @@ -34,17 +34,17 @@ flink-streaming-java ${flink.version} - + - + @@ -76,6 +76,36 @@ jackson-datatype-jsr310 2.13.3 + + + com.qniao + iot-machine-data-command + 0.0.1-SNAPSHOT + + + + com.qniao + iot-machine-data-constant + 0.0.1-SNAPSHOT + + + + org.apache.flink + flink-connector-rabbitmq_2.12 + 1.14.5 + + + + org.apache.flink + flink-connector-elasticsearch7_2.11 + 1.14.5 + + + + commons-logging + commons-logging + 1.2 + @@ -94,12 +124,12 @@ - + + <!– Run shade goal on package phase –> package @@ -116,8 +146,8 @@ - + <!– Do not copy the signatures in the META-INF folder. + Otherwise, this might cause SecurityExceptions when using the JAR. –> *:* META-INF/*.SF @@ -126,19 +156,18 @@ - - - - com.qniao.iot.machine.event.generator.job.IotMachineEventGeneratorJob - - - - + --> + + + + maven-releases + Nexus releases Repository + http://120.78.76.88:8081/repository/maven-snapshots/ + + \ No newline at end of file diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 9671ba4..72ce839 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -1,27 +1,126 @@ package com.qniao.iot.machine.event.generator.job; -import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; -import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.connector.kafka.source.KafkaSource; -import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; -import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import com.qniao.domain.BaseCommand; +import com.qniao.iot.machine.command.PowerOffMachineCommand; +import com.qniao.iot.machine.command.PowerOnMachineCommand; +import com.qniao.iot.machine.command.StartMachineWorkingCommand; +import com.qniao.iot.machine.command.StopMachineWorkingCommand; +import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializationSchema; +import com.rabbitmq.client.AMQP; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; +import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; +import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; +import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; public class IotMachineEventGeneratorJob { - public static void main(String[] args) throws Exception { - final ParameterTool params = ParameterTool.fromArgs(args); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); - - // 定义Kafka数据源 - KafkaSource source = KafkaSource.builder() - .setBootstrapServers(params.get("source.bootstrap.servers")) - .setTopics("root_cloud_iot_report_data_event") - .setGroupId("iot.machine.generator") - .setStartingOffsets(OffsetsInitializer.earliest()) - .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) - .build(); + + public static void sinkRabbitMq(DataStream commandDataStream) { + + // rabbitmq配置(测试环境) + RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() + .setHost("8.135.8.221") + .setVirtualHost("/") + .setUserName("qniao") + .setPassword("Qianniao2020") + .setPort(5672).build(); + + // 发送相应的指令到rabbitmq的交换机 + commandDataStream.addSink(new RMQSink<>(connectionConfig, new MachineIotDataReceivedEventRabbitMqSerializationSchema(), new RMQSinkPublishOptions() { + + @Override + public String computeRoutingKey(BaseCommand command) { + return "machine-iot-data-received-event"; + } + + @Override + public AMQP.BasicProperties computeProperties(BaseCommand command) { + return null; + } + + @Override + public String computeExchange(BaseCommand command) { + + // 交换机名称 + return "flink_test_exchange"; + } + })).name("commandDataStream to rabbitmq Sink"); + + // 直接发队列 + // commandDataStream.addSink(new RMQSink<>(connectionConfig, "flink_test_queue", new BaseCommandSerializationSchema())).name("12"); + } + + public static void sinkEs(DataStream commandDataStream) { + + List httpHosts = new ArrayList<>(); + httpHosts.add(new HttpHost("119.23.41.137", 9200, "http")); + ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, + (ElasticsearchSinkFunction) (command, runtimeContext, requestIndexer) -> { + + HashMap map = new HashMap<>(); + if(command instanceof PowerOnMachineCommand) { + // 设备开机数据 + PowerOnMachineCommand powerOnMachineCommand = (PowerOnMachineCommand)command; + map.put("id", powerOnMachineCommand.getId().toString()); + map.put("currTotalOutput", powerOnMachineCommand.getCurrTotalOutput().toString()); + } + if(command instanceof PowerOffMachineCommand) { + // 设备关机数据 + PowerOffMachineCommand powerOffMachineCommand = (PowerOffMachineCommand)command; + map.put("id", powerOffMachineCommand.getId().toString()); + map.put("currTotalOutput", powerOffMachineCommand.getCurrTotalOutput().toString()); + } + if(command instanceof StopMachineWorkingCommand) { + // 设备待机数据 + StopMachineWorkingCommand stopMachineWorkingCommand = (StopMachineWorkingCommand)command; + map.put("id", stopMachineWorkingCommand.getId().toString()); + map.put("currTotalOutput", stopMachineWorkingCommand.getCurrTotalOutput().toString()); + } + if(command instanceof StartMachineWorkingCommand) { + // 设备工作数据 + StartMachineWorkingCommand startMachineWorkingCommand = (StartMachineWorkingCommand)command; + map.put("id", startMachineWorkingCommand.getId().toString()); + map.put("currTotalOutput", startMachineWorkingCommand.getCurrTotalOutput().toString()); + } + //创建es 请求 + IndexRequest indexRequest = Requests.indexRequest().index("machine-iot-data-received-event").source(map); + requestIndexer.add(indexRequest); + } + ); + //刷新前缓冲的最大动作量 + esSinkBuilder.setBulkFlushMaxActions(10); + //刷新前缓冲区的最大数据大小(以MB为单位) + esSinkBuilder.setBulkFlushMaxSizeMb(5); + //论缓冲操作的数量或大小如何都要刷新的时间间隔 + esSinkBuilder.setBulkFlushInterval(5000L); + // 客户端创建配置回调,配置账号密码 + esSinkBuilder.setRestClientFactory( + restClientBuilder -> { + restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic","qn56521")); + return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + }); + restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { + // 设置es连接超时时间 + requestConfigBuilder.setConnectTimeout(3000); + return requestConfigBuilder; + }); + } + ); + //数据流添加sink + commandDataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink"); } } diff --git a/pom.xml b/pom.xml index fcd5af0..b92b0fd 100644 --- a/pom.xml +++ b/pom.xml @@ -46,14 +46,14 @@ under the License. - + nexus qniao