Browse Source

新增kafka鉴权

hph-优化版本
1049970895@qniao.cn 3 years ago
parent
commit
70e5c5b737
1 changed files with 4 additions and 3 deletions
  1. 7
      iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java

7
iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java

@ -65,6 +65,8 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import java.time.*; import java.time.*;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
@ -105,18 +107,17 @@ public class GizWitsIotDataFormatterJob {
}).name("Transform MachineIotDataReceivedEvent"); }).name("Transform MachineIotDataReceivedEvent");
// kafka 认证配置暂时注释后续可能需要放开 // kafka 认证配置暂时注释后续可能需要放开
/*Properties kafkaProducerConfig = new Properties();
Properties kafkaProducerConfig = new Properties();
kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
kafkaProducerConfig.setProperty("sasl.jaas.config", kafkaProducerConfig.setProperty("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";"); "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";");
*/
// 写入kafka // 写入kafka
transformDs.sinkTo( transformDs.sinkTo(
KafkaSink.<MachineIotDataReceivedEvent>builder() KafkaSink.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) .setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS))
//.setKafkaProducerConfig(kafkaProducerConfig)
.setKafkaProducerConfig(kafkaProducerConfig)
.setRecordSerializer( .setRecordSerializer(
KafkaRecordSerializationSchema.builder() KafkaRecordSerializationSchema.builder()
.setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS)) .setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS))

Loading…
Cancel
Save