|
|
@ -105,12 +105,11 @@ public class RootCloudIotDataFormatterJob { |
|
|
.setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) |
|
|
.setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) |
|
|
.setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS)) |
|
|
.setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS)) |
|
|
.setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUPID)) |
|
|
.setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUPID)) |
|
|
/*.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") |
|
|
|
|
|
|
|
|
.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") |
|
|
.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN") |
|
|
.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN") |
|
|
.setProperty("sasl.jaas.config", |
|
|
.setProperty("sasl.jaas.config", |
|
|
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";")*/ |
|
|
|
|
|
|
|
|
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";") |
|
|
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) |
|
|
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) |
|
|
//.setStartingOffsets(OffsetsInitializer.earliest()) |
|
|
|
|
|
.setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) |
|
|
.setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) |
|
|
.build(); |
|
|
.build(); |
|
|
|
|
|
|
|
|
@ -126,13 +125,13 @@ public class RootCloudIotDataFormatterJob { |
|
|
kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); |
|
|
kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); |
|
|
kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); |
|
|
kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); |
|
|
kafkaProducerConfig.setProperty("sasl.jaas.config", |
|
|
kafkaProducerConfig.setProperty("sasl.jaas.config", |
|
|
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"); |
|
|
|
|
|
|
|
|
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";"); |
|
|
|
|
|
|
|
|
// 写入kafka |
|
|
// 写入kafka |
|
|
transformDs.sinkTo( |
|
|
transformDs.sinkTo( |
|
|
KafkaSink.<MachineIotDataReceivedEvent>builder() |
|
|
KafkaSink.<MachineIotDataReceivedEvent>builder() |
|
|
.setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) |
|
|
.setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) |
|
|
//.setKafkaProducerConfig(kafkaProducerConfig) |
|
|
|
|
|
|
|
|
.setKafkaProducerConfig(kafkaProducerConfig) |
|
|
.setRecordSerializer( |
|
|
.setRecordSerializer( |
|
|
KafkaRecordSerializationSchema.builder() |
|
|
KafkaRecordSerializationSchema.builder() |
|
|
.setTopic(ApolloConfig.getStr(ConfigConstant.SINK_KAFKA_TOPICS)) |
|
|
.setTopic(ApolloConfig.getStr(ConfigConstant.SINK_KAFKA_TOPICS)) |
|
|
|