diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 0cd4b26..971619b 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -2,9 +2,11 @@ package com.qniao.iot.machine.event.generator.job; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.db.Db; import cn.hutool.json.JSON; +import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.qniao.domain.BaseCommand; import com.qniao.iot.machine.command.PowerOffMachineCommand; @@ -42,7 +44,9 @@ import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; import java.io.IOException; @@ -62,11 +66,15 @@ public class IotMachineEventGeneratorJob { env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); // 设置并行度为1,并行度要小于等于kafka topic的分区数,否则其他并行度分配不到数据 // env.setParallelism(1); + TopicPartition topicPartition = new TopicPartition(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS), 0); + HashSet set = new HashSet<>(); + set.add(topicPartition); KafkaSource source = KafkaSource.builder() .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) - .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) + .setStartingOffsets(OffsetsInitializer.latest()) + .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "80000") .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) .build(); @@ -178,6 +186,11 @@ public class IotMachineEventGeneratorJob { state -> state, (state1, state2) -> state1)); } deviceState.update(deviceStateListJson = JSONUtil.parse(allMachineMap)); + }else { + Map finalAllMachineMap = allMachineMap; + deviceStateListJson.toBean(Map.class).forEach((a, b) + -> finalAllMachineMap.put(Long.parseLong(String.valueOf(a)), + BeanUtil.toBean(b, DeviceState.class))); } return new Tuple2<>(deviceStateListJson, allMachineMap); } diff --git a/iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties b/iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties index d2538f2..ed4c09b 100644 --- a/iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties +++ b/iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties @@ -1,4 +1,6 @@ app.id=machine-state-event-generator -apollo.meta=http://47.112.164.224:5000 \ No newline at end of file +# ???? 8.135.8.221 +# ???? 47.112.164.224 +apollo.meta=http://8.135.8.221:5000 \ No newline at end of file