From bac70e001e088745dbd071adf98813db806d824b Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Wed, 7 Sep 2022 17:44:39 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Ekafka=E9=89=B4=E6=9D=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/qniao/iot/rc/RootCloudIotDataFormatterJob.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 7fc1581..4ec1f5d 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -105,12 +105,11 @@ public class RootCloudIotDataFormatterJob { .setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUPID)) - /*.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") + .setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") .setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN") .setProperty("sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";")*/ + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";") .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) - //.setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .build(); @@ -126,13 +125,13 @@ public class RootCloudIotDataFormatterJob { kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); kafkaProducerConfig.setProperty("sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"); + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";"); // 写入kafka transformDs.sinkTo( KafkaSink.builder() .setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) - //.setKafkaProducerConfig(kafkaProducerConfig) + .setKafkaProducerConfig(kafkaProducerConfig) .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic(ApolloConfig.getStr(ConfigConstant.SINK_KAFKA_TOPICS))