From 250b7a0c0277417ea0c04280c7c33ec6b53ad5cb Mon Sep 17 00:00:00 2001 From: "lizhongkang@qniao.cn" Date: Tue, 18 Oct 2022 15:53:51 +0800 Subject: [PATCH] =?UTF-8?q?=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RootCloudIotDataFormatterJob.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java index 5d38a47..aeef7cc 100644 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java @@ -43,11 +43,14 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.config.SaslConfigs; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Properties; /** * Skeleton for a Flink DataStream Job. @@ -80,6 +83,10 @@ public class RootCloudIotDataFormatterJob { .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID)) + .setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") + .setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN") + .setProperty("sasl.jaas.config", + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";") .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setValueOnlyDeserializer(new AIRootCloudWarningDataReceivedEventDeserializationSchema()) .build(); @@ -90,10 +97,17 @@ public class RootCloudIotDataFormatterJob { .map((MapFunction) RootCloudIotDataFormatterJob::transform) .name("Transform AIWarningDataReceivedEvent"); + Properties kafkaProducerConfig = new Properties(); + kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); + kafkaProducerConfig.setProperty("sasl.jaas.config", + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";"); + // 写入kafka transformDs.sinkTo( KafkaSink.builder() .setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) + .setKafkaProducerConfig(kafkaProducerConfig) .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS))