diff --git a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java
index ed2d54a..676dc12 100644
--- a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java
+++ b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java
@@ -65,6 +65,8 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin
 import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.util.Collector;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
 
 import java.time.*;
 import java.time.format.DateTimeFormatter;
@@ -105,18 +107,17 @@ public class GizWitsIotDataFormatterJob {
         }).name("Transform MachineIotDataReceivedEvent");
 
         // kafka 认证配置,暂时注释,后续可能需要放开
-        /*Properties kafkaProducerConfig = new Properties();
+        Properties kafkaProducerConfig = new Properties();
         kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
         kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
         kafkaProducerConfig.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";");
-        */
 
         // 写入kafka
         transformDs.sinkTo(
                 KafkaSink.builder()
                         .setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS))
-                        //.setKafkaProducerConfig(kafkaProducerConfig)
+                        .setKafkaProducerConfig(kafkaProducerConfig)
                         .setRecordSerializer(
                                 KafkaRecordSerializationSchema.builder()
                                         .setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS))