|
|
|
@ -21,6 +21,7 @@ package com.qniao.rootcloudstatistics; |
|
|
|
|
|
|
|
import cn.hutool.core.collection.CollUtil; |
|
|
|
import cn.hutool.core.convert.Convert; |
|
|
|
import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializationSchema; |
|
|
|
import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedEvent; |
|
|
|
import com.qniao.rootcloudevent.AIWarningDataReceivedEvent; |
|
|
|
import com.qniao.rootcloudstatistics.config.ApolloConfig; |
|
|
|
@ -30,6 +31,7 @@ import com.qniao.rootcloudstatistics.event.AIRootCloudWarningDataReceivedEventDe |
|
|
|
import com.qniao.rootcloudstatistics.event.AIWarningDataReceivedEventSerializationSchema; |
|
|
|
import com.qniao.rootcloudstatistics.until.OSSUtils; |
|
|
|
import com.qniao.rootcloudstatistics.until.SnowFlake; |
|
|
|
import com.rabbitmq.client.AMQP; |
|
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|
|
|
import org.apache.flink.api.common.functions.MapFunction; |
|
|
|
import org.apache.flink.api.common.serialization.SimpleStringEncoder; |
|
|
|
@ -43,6 +45,9 @@ import org.apache.flink.streaming.api.CheckpointingMode; |
|
|
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; |
|
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|
|
|
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; |
|
|
|
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; |
|
|
|
import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; |
|
|
|
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; |
|
|
|
import org.apache.kafka.clients.CommonClientConfigs; |
|
|
|
import org.apache.kafka.clients.consumer.OffsetResetStrategy; |
|
|
|
import org.apache.kafka.common.config.SaslConfigs; |
|
|
|
@ -97,25 +102,37 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
.map((MapFunction<AIRootCloudWarningDataReceivedEvent, AIWarningDataReceivedEvent>) RootCloudIotDataFormatterJob::transform) |
|
|
|
.name("Transform AIWarningDataReceivedEvent"); |
|
|
|
|
|
|
|
Properties kafkaProducerConfig = new Properties(); |
|
|
|
kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); |
|
|
|
kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); |
|
|
|
kafkaProducerConfig.setProperty("sasl.jaas.config", |
|
|
|
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";"); |
|
|
|
|
|
|
|
// 写入kafka |
|
|
|
transformDs.sinkTo( |
|
|
|
KafkaSink.<AIWarningDataReceivedEvent>builder() |
|
|
|
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) |
|
|
|
.setKafkaProducerConfig(kafkaProducerConfig) |
|
|
|
.setRecordSerializer( |
|
|
|
KafkaRecordSerializationSchema.builder() |
|
|
|
.setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS)) |
|
|
|
.setValueSerializationSchema(new AIWarningDataReceivedEventSerializationSchema()) |
|
|
|
.build() |
|
|
|
).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) |
|
|
|
.build() |
|
|
|
).name("AIWarningDataReceivedEvent Sink"); |
|
|
|
// rabbitmq配置 |
|
|
|
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() |
|
|
|
.setHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_HOST)) |
|
|
|
.setVirtualHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST)) |
|
|
|
.setUserName(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_USER_NAME)) |
|
|
|
.setPassword(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_PASSWORD)) |
|
|
|
.setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT)) |
|
|
|
.build(); |
|
|
|
|
|
|
|
// 写入Rabbit |
|
|
|
transformDs.addSink(new RMQSink<>(connectionConfig, new AIWarningDataReceivedEventSerializationSchema(), |
|
|
|
new RMQSinkPublishOptions<AIWarningDataReceivedEvent>() { |
|
|
|
|
|
|
|
@Override |
|
|
|
public String computeRoutingKey(AIWarningDataReceivedEvent aiWarningDataReceivedEvent) { |
|
|
|
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_AI_WARNING_ROUTING_KEY); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public AMQP.BasicProperties computeProperties(AIWarningDataReceivedEvent aiWarningDataReceivedEvent) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public String computeExchange(AIWarningDataReceivedEvent aiWarningDataReceivedEvent) { |
|
|
|
// 交换机名称 |
|
|
|
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_EXCHANGE); |
|
|
|
} |
|
|
|
}) |
|
|
|
|
|
|
|
).name("AIWarningDataStream to rabbitmq Sink"); |
|
|
|
|
|
|
|
// 发送到OSS存储 |
|
|
|
String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH); |
|
|
|
|