From 3e061053af849e58803d3bb71ad1e2b3a31b21c7 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Tue, 19 Jul 2022 19:18:02 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0kafka=E6=B6=88=E8=B4=B9?= =?UTF-8?q?=E7=9A=84=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java | 5 ++++- .../java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java | 2 +- .../main/java/com/qniao/iot/rc/constant/ConfigConstant.java | 2 +- .../src/main/resources/META-INF/app.properties | 2 ++ 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java index ea8c5fd..18c58fc 100644 --- a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java @@ -15,7 +15,7 @@ import java.util.Properties; public class RootCloudIotDataEventSourceMocker { // 延迟:毫秒 - public static final long DELAY = 1000; + public static final long DELAY = 3000; public static void main(String[] args) throws Exception { // 创建kafka配置属性 @@ -75,6 +75,9 @@ public class RootCloudIotDataEventSourceMocker { private static Properties createKafkaProperties() { Properties kafkaProps = new Properties(); + // 测试环境 + //kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092"); + // 正式环境 kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092"); kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 89a2ea1..0bc211f 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -74,7 +74,7 @@ public class RootCloudIotDataFormatterJob { .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID)) - .setStartingOffsets(OffsetsInitializer.latest()) + .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .build(); diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java index 253f9b4..38e795e 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java @@ -12,7 +12,7 @@ public interface ConfigConstant { String SINK_KAFKA_TOPICS = "sink.kafka.topics"; - String SNOW_FLAKE_DATACENTER_ID = "snow-flake.datacenter.id"; + String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id"; String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id"; diff --git a/root-cloud-statistics/src/main/resources/META-INF/app.properties b/root-cloud-statistics/src/main/resources/META-INF/app.properties index 1452591..313a4c2 100644 --- a/root-cloud-statistics/src/main/resources/META-INF/app.properties +++ b/root-cloud-statistics/src/main/resources/META-INF/app.properties @@ -1,3 +1,5 @@ app.id=root-cloud-model-hw-formatter +# ???? 8.135.8.221 +# ???? 47.112.164.224 apollo.meta=http://47.112.164.224:5000 \ No newline at end of file