Browse Source

更新kafka消费的策略

hph_优化版本
1049970895@qniao.cn 3 years ago
parent
commit
3e061053af
4 changed files with 8 additions and 3 deletions
  1. 5
      root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java
  2. 2
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
  3. 2
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java
  4. 2
      root-cloud-statistics/src/main/resources/META-INF/app.properties

5
root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java

@ -15,7 +15,7 @@ import java.util.Properties;
public class RootCloudIotDataEventSourceMocker { public class RootCloudIotDataEventSourceMocker {
// 延迟毫秒 // 延迟毫秒
public static final long DELAY = 1000;
public static final long DELAY = 3000;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// 创建kafka配置属性 // 创建kafka配置属性
@ -75,6 +75,9 @@ public class RootCloudIotDataEventSourceMocker {
private static Properties createKafkaProperties() { private static Properties createKafkaProperties() {
Properties kafkaProps = new Properties(); Properties kafkaProps = new Properties();
// 测试环境
//kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092");
// 正式环境
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092"); kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092");
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());

2
root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@ -74,7 +74,7 @@ public class RootCloudIotDataFormatterJob {
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID)) .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID))
.setStartingOffsets(OffsetsInitializer.latest())
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema())
.build(); .build();

2
root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java

@ -12,7 +12,7 @@ public interface ConfigConstant {
String SINK_KAFKA_TOPICS = "sink.kafka.topics"; String SINK_KAFKA_TOPICS = "sink.kafka.topics";
String SNOW_FLAKE_DATACENTER_ID = "snow-flake.datacenter.id";
String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id";
String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id"; String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id";

2
root-cloud-statistics/src/main/resources/META-INF/app.properties

@ -1,3 +1,5 @@
app.id=root-cloud-model-hw-formatter app.id=root-cloud-model-hw-formatter
# ???? 8.135.8.221
# ???? 47.112.164.224
apollo.meta=http://47.112.164.224:5000 apollo.meta=http://47.112.164.224:5000
Loading…
Cancel
Save