|
|
|
@ -85,9 +85,9 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
|
|
|
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); |
|
|
|
KafkaSource<AIRootCloudWarningDataReceivedEvent> source = KafkaSource.<AIRootCloudWarningDataReceivedEvent>builder() |
|
|
|
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) |
|
|
|
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) |
|
|
|
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID)) |
|
|
|
.setBootstrapServers("kq.qniao.cn:19092") |
|
|
|
.setTopics("ai_root_cloud_warning_report_data_event") |
|
|
|
.setGroupId("ai_root_cloud_warning_data_etl") |
|
|
|
.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") |
|
|
|
.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN") |
|
|
|
.setProperty("sasl.jaas.config", |
|
|
|
@ -134,19 +134,20 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
|
|
|
|
).name("AIWarningDataStream to rabbitmq Sink"); |
|
|
|
|
|
|
|
// 发送到OSS存储 |
|
|
|
String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH); |
|
|
|
StreamingFileSink<AIWarningDataReceivedEvent> sink = StreamingFileSink.forRowFormat( |
|
|
|
new Path(outputPath), |
|
|
|
new SimpleStringEncoder<AIWarningDataReceivedEvent>("UTF-8") |
|
|
|
).build(); |
|
|
|
transformDs.addSink(sink); |
|
|
|
// // 发送到OSS存储 |
|
|
|
// String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH); |
|
|
|
// StreamingFileSink<AIWarningDataReceivedEvent> sink = StreamingFileSink.forRowFormat( |
|
|
|
// new Path(outputPath), |
|
|
|
// new SimpleStringEncoder<AIWarningDataReceivedEvent>("UTF-8") |
|
|
|
// ).build(); |
|
|
|
// transformDs.addSink(sink); |
|
|
|
|
|
|
|
env.execute("ai root cloud waring data formatter job"); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private static AIWarningDataReceivedEvent transform(AIRootCloudWarningDataReceivedEvent event) { |
|
|
|
System.out.println("********" + event); |
|
|
|
|
|
|
|
AIWarningDataReceivedEvent aiWaringDataReceivedEvent = new AIWarningDataReceivedEvent(); |
|
|
|
if (Objects.nonNull(event) && Objects.nonNull(event.getPayload())) { |
|
|
|
|