|
|
|
@ -43,11 +43,14 @@ import org.apache.flink.streaming.api.CheckpointingMode; |
|
|
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; |
|
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|
|
|
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; |
|
|
|
import org.apache.kafka.clients.CommonClientConfigs; |
|
|
|
import org.apache.kafka.clients.consumer.OffsetResetStrategy; |
|
|
|
import org.apache.kafka.common.config.SaslConfigs; |
|
|
|
|
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Objects; |
|
|
|
import java.util.Properties; |
|
|
|
|
|
|
|
/** |
|
|
|
* Skeleton for a Flink DataStream Job. |
|
|
|
@ -80,6 +83,10 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) |
|
|
|
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) |
|
|
|
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID)) |
|
|
|
.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") |
|
|
|
.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN") |
|
|
|
.setProperty("sasl.jaas.config", |
|
|
|
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";") |
|
|
|
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) |
|
|
|
.setValueOnlyDeserializer(new AIRootCloudWarningDataReceivedEventDeserializationSchema()) |
|
|
|
.build(); |
|
|
|
@ -90,10 +97,17 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
.map((MapFunction<AIRootCloudWarningDataReceivedEvent, AIWarningDataReceivedEvent>) RootCloudIotDataFormatterJob::transform) |
|
|
|
.name("Transform AIWarningDataReceivedEvent"); |
|
|
|
|
|
|
|
Properties kafkaProducerConfig = new Properties(); |
|
|
|
kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); |
|
|
|
kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); |
|
|
|
kafkaProducerConfig.setProperty("sasl.jaas.config", |
|
|
|
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";"); |
|
|
|
|
|
|
|
// 写入kafka |
|
|
|
transformDs.sinkTo( |
|
|
|
KafkaSink.<AIWarningDataReceivedEvent>builder() |
|
|
|
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) |
|
|
|
.setKafkaProducerConfig(kafkaProducerConfig) |
|
|
|
.setRecordSerializer( |
|
|
|
KafkaRecordSerializationSchema.builder() |
|
|
|
.setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS)) |
|
|
|
|