|
|
|
@ -2,9 +2,11 @@ package com.qniao.iot.machine.event.generator.job; |
|
|
|
|
|
|
|
import cn.hutool.core.bean.BeanUtil; |
|
|
|
import cn.hutool.core.collection.CollUtil; |
|
|
|
import cn.hutool.core.util.ObjectUtil; |
|
|
|
import cn.hutool.core.util.StrUtil; |
|
|
|
import cn.hutool.db.Db; |
|
|
|
import cn.hutool.json.JSON; |
|
|
|
import cn.hutool.json.JSONObject; |
|
|
|
import cn.hutool.json.JSONUtil; |
|
|
|
import com.qniao.domain.BaseCommand; |
|
|
|
import com.qniao.iot.machine.command.PowerOffMachineCommand; |
|
|
|
@ -42,7 +44,9 @@ import org.apache.http.auth.AuthScope; |
|
|
|
import org.apache.http.auth.UsernamePasswordCredentials; |
|
|
|
import org.apache.http.client.CredentialsProvider; |
|
|
|
import org.apache.http.impl.client.BasicCredentialsProvider; |
|
|
|
import org.apache.kafka.clients.consumer.ConsumerConfig; |
|
|
|
import org.apache.kafka.clients.consumer.OffsetResetStrategy; |
|
|
|
import org.apache.kafka.common.TopicPartition; |
|
|
|
import org.elasticsearch.action.index.IndexRequest; |
|
|
|
import org.elasticsearch.client.Requests; |
|
|
|
import java.io.IOException; |
|
|
|
@ -62,11 +66,15 @@ public class IotMachineEventGeneratorJob { |
|
|
|
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); |
|
|
|
// 设置并行度为1,并行度要小于等于kafka topic的分区数,否则其他并行度分配不到数据 |
|
|
|
// env.setParallelism(1); |
|
|
|
TopicPartition topicPartition = new TopicPartition(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS), 0); |
|
|
|
HashSet<TopicPartition> set = new HashSet<>(); |
|
|
|
set.add(topicPartition); |
|
|
|
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder() |
|
|
|
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) |
|
|
|
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) |
|
|
|
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) |
|
|
|
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) |
|
|
|
.setStartingOffsets(OffsetsInitializer.latest()) |
|
|
|
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "80000") |
|
|
|
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) |
|
|
|
.build(); |
|
|
|
|
|
|
|
@ -178,6 +186,11 @@ public class IotMachineEventGeneratorJob { |
|
|
|
state -> state, (state1, state2) -> state1)); |
|
|
|
} |
|
|
|
deviceState.update(deviceStateListJson = JSONUtil.parse(allMachineMap)); |
|
|
|
}else { |
|
|
|
Map<Long, DeviceState> finalAllMachineMap = allMachineMap; |
|
|
|
deviceStateListJson.toBean(Map.class).forEach((a, b) |
|
|
|
-> finalAllMachineMap.put(Long.parseLong(String.valueOf(a)), |
|
|
|
BeanUtil.toBean(b, DeviceState.class))); |
|
|
|
} |
|
|
|
return new Tuple2<>(deviceStateListJson, allMachineMap); |
|
|
|
} |
|
|
|
|