From 8cd36871a1c34d36aedfa64cea863e8763a40b56 Mon Sep 17 00:00:00 2001 From: "lizhongkang@qniao.cn" Date: Tue, 18 Oct 2022 16:39:59 +0800 Subject: [PATCH] bug --- .../AIRootCloudWarningDataReceivedEvent.java | 6 ++++++ .../AIWarningDataReceivedEvent.java | 2 ++ .../RootCloudIotDataFormatterJob.java | 21 ++++++++++--------- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java index 91036ed..36c15fe 100644 --- a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java +++ b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java @@ -29,6 +29,8 @@ public class AIRootCloudWarningDataReceivedEvent implements Serializable { private Long timestamp; @Data + @AllArgsConstructor + @NoArgsConstructor public static class Payload { private String code; @@ -50,6 +52,8 @@ public class AIRootCloudWarningDataReceivedEvent implements Serializable { } @Data + @AllArgsConstructor + @NoArgsConstructor public static class Info { /** * 摄像头id @@ -67,6 +71,8 @@ public class AIRootCloudWarningDataReceivedEvent implements Serializable { } @Data + @AllArgsConstructor + @NoArgsConstructor public static class Alarm { /** * 图片地址 diff --git a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java index 1428a42..c1e8f9b 100644 --- a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java +++ b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java @@ -57,6 +57,8 @@ public class AIWarningDataReceivedEvent implements Serializable { private List alarmList; @Data + @AllArgsConstructor + @NoArgsConstructor public static class Alarm { /** * 图片地址 diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java index bdc814c..40431f8 100644 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java @@ -85,9 +85,9 @@ public class RootCloudIotDataFormatterJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); KafkaSource source = KafkaSource.builder() - .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) - .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) - .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID)) + .setBootstrapServers("kq.qniao.cn:19092") + .setTopics("ai_root_cloud_warning_report_data_event") + .setGroupId("ai_root_cloud_warning_data_etl") .setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") .setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN") .setProperty("sasl.jaas.config", @@ -134,19 +134,20 @@ public class RootCloudIotDataFormatterJob { ).name("AIWarningDataStream to rabbitmq Sink"); - // 发送到OSS存储 - String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH); - StreamingFileSink sink = StreamingFileSink.forRowFormat( - new Path(outputPath), - new SimpleStringEncoder("UTF-8") - ).build(); - transformDs.addSink(sink); + // // 发送到OSS存储 + // String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH); + // StreamingFileSink sink = StreamingFileSink.forRowFormat( + // new Path(outputPath), + // new SimpleStringEncoder("UTF-8") + // ).build(); + // transformDs.addSink(sink); env.execute("ai root cloud waring data formatter job"); } private static AIWarningDataReceivedEvent transform(AIRootCloudWarningDataReceivedEvent event) { + System.out.println("********" + event); AIWarningDataReceivedEvent aiWaringDataReceivedEvent = new AIWarningDataReceivedEvent(); if (Objects.nonNull(event) && Objects.nonNull(event.getPayload())) {