Browse Source

更新kafka消费的策略

master
1049970895@qniao.cn 3 years ago
parent
commit
716ca2754d
2 changed files with 2 additions and 3 deletions
  1. 3
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java
  2. 2
      iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties

3
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -66,12 +66,11 @@ public class IotMachineEventGeneratorJob {
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 设置并行度为1并行度要小于等于kafka topic的分区数否则其他并行度分配不到数据 // 设置并行度为1并行度要小于等于kafka topic的分区数否则其他并行度分配不到数据
// env.setParallelism(1); // env.setParallelism(1);
TopicPartition topicPartition = new TopicPartition(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS), 0);
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder() KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID))
.setStartingOffsets(OffsetsInitializer.latest())
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
.build(); .build();

2
iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties

@ -3,4 +3,4 @@ app.id=machine-state-event-generator
# ???? 8.135.8.221 # ???? 8.135.8.221
# ???? 47.112.164.224 # ???? 47.112.164.224
apollo.meta=http://8.135.8.221:5000
apollo.meta=http://47.112.164.224:5000
Loading…
Cancel
Save