diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java index f9bfd21..ea8c5fd 100644 --- a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java @@ -15,7 +15,7 @@ import java.util.Properties; public class RootCloudIotDataEventSourceMocker { // 延迟:毫秒 - public static final long DELAY = 5000; + public static final long DELAY = 1000; public static void main(String[] args) throws Exception { // 创建kafka配置属性 diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 0bfa210..99d4d36 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import java.math.BigDecimal; import java.util.Objects; @@ -90,7 +91,7 @@ public class RootCloudIotDataFormatterJob { ).name("MachineIotDataReceivedEvent Sink"); // 发送到OSS存储 - String outputPath = "oss://qn-flink-test/root-cloud-model-hw-reported-data"; + String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH); StreamingFileSink sink = StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder("UTF-8") diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java index d078033..b8878c8 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java @@ -11,4 +11,6 @@ public interface ConfigConstant { String SINK_KAFKA_BOOTSTRAP_SERVERS = "sink.kafka.bootstrap.servers"; String SINK_KAFKA_TOPICS = "sink.kafka.topics"; + + String SINK_OSS_PATH = "sink.oss.path"; }