Browse Source

更新

master
1049970895@qniao.cn 3 years ago
parent
commit
ebfe2a8b48
1 changed files with 2 additions and 4 deletions
  1. 6
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

6
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -67,14 +67,12 @@ public class IotMachineEventGeneratorJob {
// 设置并行度为1并行度要小于等于kafka topic的分区数否则其他并行度分配不到数据 // 设置并行度为1并行度要小于等于kafka topic的分区数否则其他并行度分配不到数据
// env.setParallelism(1); // env.setParallelism(1);
TopicPartition topicPartition = new TopicPartition(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS), 0); TopicPartition topicPartition = new TopicPartition(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS), 0);
HashSet<TopicPartition> set = new HashSet<>();
set.add(topicPartition);
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder() KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID))
.setStartingOffsets(OffsetsInitializer.latest()) .setStartingOffsets(OffsetsInitializer.latest())
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "80000")
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
.build(); .build();
@ -197,7 +195,7 @@ public class IotMachineEventGeneratorJob {
private static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) { private static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) {
// rabbitmq配置测试环境
// rabbitmq配置
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_HOST)) .setHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_HOST))
.setVirtualHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST)) .setVirtualHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST))

Loading…
Cancel
Save