diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java index 52cfb1c..cca3f6d 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java @@ -1,13 +1,9 @@ package com.qniao.iot.machine.event.generator.job; -import cn.hutool.json.JSONUtil; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import java.util.HashMap; -import java.util.Map; - @Data @AllArgsConstructor @NoArgsConstructor diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 273bf71..3cebc2f 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -2,13 +2,8 @@ package com.qniao.iot.machine.event.generator.job; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.util.StrUtil; +import cn.hutool.core.lang.Tuple; import cn.hutool.db.Db; -import cn.hutool.db.handler.RsHandler; -import cn.hutool.json.JSON; -import cn.hutool.json.JSONObject; -import cn.hutool.json.JSONUtil; import com.qniao.domain.BaseCommand; import com.qniao.iot.machine.command.PowerOffMachineCommand; import com.qniao.iot.machine.command.PowerOnMachineCommand; @@ -24,8 +19,6 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple0; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; @@ -47,17 +40,15 @@ import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.apache.kafka.common.TopicPartition; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; import java.io.IOException; -import java.sql.ResultSet; import java.sql.SQLException; import java.time.LocalDateTime; import java.time.ZoneOffset; -import java.util.*; -import java.util.stream.Collectors; +import java.util.ArrayList; +import java.util.List; public class IotMachineEventGeneratorJob { @@ -83,7 +74,6 @@ public class IotMachineEventGeneratorJob { DataStreamSource dataStreamSource = env .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); - DataStream machineIotDataReceivedEventDataStream = dataStreamSource .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { @@ -161,14 +151,15 @@ public class IotMachineEventGeneratorJob { } private static DeviceState getDeviceStateListJson(ValueState deviceState, - Long machineIotMac) throws IOException, SQLException { + Long machineIotMac) throws IOException, SQLException { // 获取最新设备状态 DeviceState deviceStateListJson = deviceState.value(); if (deviceStateListJson == null) { - deviceStateListJson = Db.use().query(SQL, (RsHandler) rs -> new DeviceState(rs.getLong(1), - rs.getLong(2), - rs.getInt(3), System.currentTimeMillis()), machineIotMac); + List list = Db.use().query(SQL, DeviceState.class, machineIotMac); + if(CollUtil.isNotEmpty(list)) { + deviceStateListJson = list.get(0); + } if (deviceStateListJson != null) { // 如果是空的,并且在表中都没找到,说明是没有被记录的设备,不用管 deviceState.update(deviceStateListJson);