From 70e5c5b73768dd145403317719e0b4ef6236c83a Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Wed, 7 Sep 2022 17:45:08 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Ekafka=E9=89=B4=E6=9D=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java index ed2d54a..676dc12 100644 --- a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java +++ b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java @@ -65,6 +65,8 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.util.Collector; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SaslConfigs; import java.time.*; import java.time.format.DateTimeFormatter; @@ -105,18 +107,17 @@ public class GizWitsIotDataFormatterJob { }).name("Transform MachineIotDataReceivedEvent"); // kafka 认证配置,暂时注释,后续可能需要放开 - /*Properties kafkaProducerConfig = new Properties(); + Properties kafkaProducerConfig = new Properties(); kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); kafkaProducerConfig.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";"); - */ // 写入kafka transformDs.sinkTo( KafkaSink.builder() .setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) - //.setKafkaProducerConfig(kafkaProducerConfig) + .setKafkaProducerConfig(kafkaProducerConfig) .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS))