diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 971619b..71868b9 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -67,14 +67,12 @@ public class IotMachineEventGeneratorJob { // 设置并行度为1,并行度要小于等于kafka topic的分区数,否则其他并行度分配不到数据 // env.setParallelism(1); TopicPartition topicPartition = new TopicPartition(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS), 0); - HashSet set = new HashSet<>(); - set.add(topicPartition); KafkaSource source = KafkaSource.builder() .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) .setStartingOffsets(OffsetsInitializer.latest()) - .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "80000") + .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) .build(); @@ -197,7 +195,7 @@ public class IotMachineEventGeneratorJob { private static void sinkRabbitMq(DataStream commandDataStream) { - // rabbitmq配置(测试环境) + // rabbitmq配置 RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_HOST)) .setVirtualHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST))