From 741b593fd5b49e2526ce621e5d53f2954bc8542f Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Tue, 19 Jul 2022 17:10:37 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java | 2 +- .../java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java | 3 ++- .../main/java/com/qniao/iot/rc/constant/ConfigConstant.java | 2 ++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java index f9bfd21..ea8c5fd 100644 --- a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java @@ -15,7 +15,7 @@ import java.util.Properties; public class RootCloudIotDataEventSourceMocker { // 延迟:毫秒 - public static final long DELAY = 5000; + public static final long DELAY = 1000; public static void main(String[] args) throws Exception { // 创建kafka配置属性 diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 0bfa210..99d4d36 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import java.math.BigDecimal; import java.util.Objects; @@ -90,7 +91,7 @@ public class RootCloudIotDataFormatterJob { ).name("MachineIotDataReceivedEvent Sink"); // 发送到OSS存储 - String outputPath = "oss://qn-flink-test/root-cloud-model-hw-reported-data"; + String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH); StreamingFileSink sink = StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder("UTF-8") diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java index d078033..b8878c8 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java @@ -11,4 +11,6 @@ public interface ConfigConstant { String SINK_KAFKA_BOOTSTRAP_SERVERS = "sink.kafka.bootstrap.servers"; String SINK_KAFKA_TOPICS = "sink.kafka.topics"; + + String SINK_OSS_PATH = "sink.oss.path"; }