From d4b501f8034bd8eb4c24bf964125d658f1ae9d19 Mon Sep 17 00:00:00 2001 From: "lizhongkang@qniao.cn" Date: Tue, 18 Oct 2022 16:26:02 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=94=B9=E4=B8=BArabbit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../AIRootCloudWarningDataReceivedEvent.java | 12 ++++ .../AIWarningDataReceivedEvent.java | 6 ++ .../RootCloudIotDataFormatterJob.java | 55 ++++++++++++------- .../config/ApolloConfig.java | 5 ++ .../constant/ConfigConstant.java | 9 +++ 5 files changed, 68 insertions(+), 19 deletions(-) diff --git a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java index f96a868..fb9a8fa 100644 --- a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java +++ b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java @@ -1,7 +1,9 @@ package com.qniao.rootcloudevent; import com.fasterxml.jackson.annotation.JsonAutoDetect; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; import java.io.Serializable; import java.util.List; @@ -11,6 +13,8 @@ import java.util.List; * @date 2022/10/17 **/ @Data +@AllArgsConstructor +@NoArgsConstructor @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY, getterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) public class AIRootCloudWarningDataReceivedEvent implements Serializable { @@ -25,6 +29,8 @@ public class AIRootCloudWarningDataReceivedEvent implements Serializable { private Long timestamp; @Data + @AllArgsConstructor + @NoArgsConstructor public static class Payload { private String code; @@ -46,6 +52,8 @@ public class AIRootCloudWarningDataReceivedEvent implements Serializable { } @Data + @AllArgsConstructor + @NoArgsConstructor public static class Info { /** * 摄像头id @@ -63,6 +71,8 @@ public class AIRootCloudWarningDataReceivedEvent implements Serializable { } @Data + @AllArgsConstructor + @NoArgsConstructor public static class Alarm { /** * 图片地址 @@ -94,6 +104,8 @@ public class AIRootCloudWarningDataReceivedEvent implements Serializable { } @Data + @AllArgsConstructor + @NoArgsConstructor public static class Extention { } diff --git a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java index 98c823f..c1e8f9b 100644 --- a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java +++ b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java @@ -1,6 +1,8 @@ package com.qniao.rootcloudevent; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; import java.io.Serializable; import java.util.List; @@ -10,6 +12,8 @@ import java.util.List; * @date 2022/10/17 **/ @Data +@AllArgsConstructor +@NoArgsConstructor public class AIWarningDataReceivedEvent implements Serializable { private static final long serialVersionUID = 1L; /** @@ -53,6 +57,8 @@ public class AIWarningDataReceivedEvent implements Serializable { private List alarmList; @Data + @AllArgsConstructor + @NoArgsConstructor public static class Alarm { /** * 图片地址 diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java index aeef7cc..bdc814c 100644 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java @@ -21,6 +21,7 @@ package com.qniao.rootcloudstatistics; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; +import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializationSchema; import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedEvent; import com.qniao.rootcloudevent.AIWarningDataReceivedEvent; import com.qniao.rootcloudstatistics.config.ApolloConfig; @@ -30,6 +31,7 @@ import com.qniao.rootcloudstatistics.event.AIRootCloudWarningDataReceivedEventDe import com.qniao.rootcloudstatistics.event.AIWarningDataReceivedEventSerializationSchema; import com.qniao.rootcloudstatistics.until.OSSUtils; import com.qniao.rootcloudstatistics.until.SnowFlake; +import com.rabbitmq.client.AMQP; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringEncoder; @@ -43,6 +45,9 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; +import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; +import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.config.SaslConfigs; @@ -97,25 +102,37 @@ public class RootCloudIotDataFormatterJob { .map((MapFunction) RootCloudIotDataFormatterJob::transform) .name("Transform AIWarningDataReceivedEvent"); - Properties kafkaProducerConfig = new Properties(); - kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); - kafkaProducerConfig.setProperty("sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";"); - - // 写入kafka - transformDs.sinkTo( - KafkaSink.builder() - .setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) - .setKafkaProducerConfig(kafkaProducerConfig) - .setRecordSerializer( - KafkaRecordSerializationSchema.builder() - .setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS)) - .setValueSerializationSchema(new AIWarningDataReceivedEventSerializationSchema()) - .build() - ).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) - .build() - ).name("AIWarningDataReceivedEvent Sink"); + // rabbitmq配置 + RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() + .setHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_HOST)) + .setVirtualHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST)) + .setUserName(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_USER_NAME)) + .setPassword(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_PASSWORD)) + .setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT)) + .build(); + + // 写入Rabbit + transformDs.addSink(new RMQSink<>(connectionConfig, new AIWarningDataReceivedEventSerializationSchema(), + new RMQSinkPublishOptions() { + + @Override + public String computeRoutingKey(AIWarningDataReceivedEvent aiWarningDataReceivedEvent) { + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_AI_WARNING_ROUTING_KEY); + } + + @Override + public AMQP.BasicProperties computeProperties(AIWarningDataReceivedEvent aiWarningDataReceivedEvent) { + return null; + } + + @Override + public String computeExchange(AIWarningDataReceivedEvent aiWarningDataReceivedEvent) { + // 交换机名称 + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_EXCHANGE); + } + }) + + ).name("AIWarningDataStream to rabbitmq Sink"); // 发送到OSS存储 String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH); diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/config/ApolloConfig.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/config/ApolloConfig.java index 75d04a2..2cc7a24 100644 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/config/ApolloConfig.java +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/config/ApolloConfig.java @@ -16,4 +16,9 @@ public class ApolloConfig { return config.getProperty(key, null); } + + public static Integer getInt(String key) { + + return config.getIntProperty(key, null); + } } diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java index 5add8cc..25deff1 100644 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java @@ -17,4 +17,13 @@ public interface ConfigConstant { String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id"; String SINK_OSS_PATH = "sink.oss.path"; + + + String SINK_RABBITMQ_HOST = "sink.rabbitmq.host"; + String SINK_RABBITMQ_PORT = "sink.rabbitmq.port"; + String SINK_RABBITMQ_VIRTUAL_HOST = "sink.rabbitmq.virtualHost"; + String SINK_RABBITMQ_USER_NAME = "sink.rabbitmq.userName"; + String SINK_RABBITMQ_PASSWORD = "sink.rabbitmq.password"; + String SINK_RABBITMQ_AI_WARNING_ROUTING_KEY = "sink.rabbitmq.aiWarning.routingKey"; + String SINK_RABBITMQ_EXCHANGE = "sink.rabbitmq.exchange"; }