From f33f4379dcba1858a629c9431250742cd1ea3a9f Mon Sep 17 00:00:00 2001 From: "lizhongkang@qniao.cn" Date: Tue, 18 Oct 2022 17:29:38 +0800 Subject: [PATCH] bug --- .../RootCloudIotDataFormatterJob.java | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java index 40431f8..3eea447 100644 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java @@ -73,10 +73,7 @@ import java.util.Properties; */ public class RootCloudIotDataFormatterJob { - static SnowFlake snowflake = new SnowFlake( - Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_DATACENTER_ID)), - Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_MACHINE_ID)) - ); + static SnowFlake snowflake = new SnowFlake(1L, 1L); public static void main(String[] args) throws Exception { @@ -85,9 +82,9 @@ public class RootCloudIotDataFormatterJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); KafkaSource source = KafkaSource.builder() - .setBootstrapServers("kq.qniao.cn:19092") - .setTopics("ai_root_cloud_warning_report_data_event") - .setGroupId("ai_root_cloud_warning_data_etl") + .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) + .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) + .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID)) .setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") .setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN") .setProperty("sasl.jaas.config", @@ -112,7 +109,8 @@ public class RootCloudIotDataFormatterJob { .build(); // 写入Rabbit - transformDs.addSink(new RMQSink<>(connectionConfig, new AIWarningDataReceivedEventSerializationSchema(), + transformDs.addSink(new RMQSink<>(connectionConfig, + new AIWarningDataReceivedEventSerializationSchema(), new RMQSinkPublishOptions() { @Override @@ -130,25 +128,21 @@ public class RootCloudIotDataFormatterJob { // 交换机名称 return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_EXCHANGE); } - }) + })).name("AIWarningDataStream to rabbitmq Sink"); - ).name("AIWarningDataStream to rabbitmq Sink"); - - // // 发送到OSS存储 - // String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH); - // StreamingFileSink sink = StreamingFileSink.forRowFormat( - // new Path(outputPath), - // new SimpleStringEncoder("UTF-8") - // ).build(); - // transformDs.addSink(sink); + // 发送到OSS存储 + String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH); + StreamingFileSink sink = StreamingFileSink.forRowFormat( + new Path(outputPath), + new SimpleStringEncoder("UTF-8") + ).build(); + transformDs.addSink(sink); env.execute("ai root cloud waring data formatter job"); } private static AIWarningDataReceivedEvent transform(AIRootCloudWarningDataReceivedEvent event) { - System.out.println("********" + event); - AIWarningDataReceivedEvent aiWaringDataReceivedEvent = new AIWarningDataReceivedEvent(); if (Objects.nonNull(event) && Objects.nonNull(event.getPayload())) { aiWaringDataReceivedEvent.setId(snowflake.nextId());