diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index a6c9fd3..f2e96a0 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -43,6 +43,7 @@ import org.apache.http.impl.client.BasicCredentialsProvider; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; import java.io.IOException; +import java.sql.SQLException; import java.util.*; import java.util.stream.Collectors; @@ -75,8 +76,6 @@ public class IotMachineEventGeneratorJob { .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); - List deviceStateList = Db.use().query(SQL, DeviceState.class); - DataStream machineIotDataReceivedEventDataStream = dataStreamSource .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { @@ -95,7 +94,7 @@ public class IotMachineEventGeneratorJob { KeyedProcessFunction.Context ctx, Collector out) throws Exception { - JSON deviceStateListJson = getDeviceStateListJson(deviceState, deviceStateList); + JSON deviceStateListJson = getDeviceStateListJson(deviceState); assert deviceStateListJson != null; DeviceState lastedDeviceState = deviceStateListJson .getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); @@ -133,7 +132,7 @@ public class IotMachineEventGeneratorJob { Collector out) throws Exception { // 获取最新设备状态 - JSON deviceStateListJson = getDeviceStateListJson(deviceState, deviceStateList); + JSON deviceStateListJson = getDeviceStateListJson(deviceState); assert deviceStateListJson != null; DeviceState lastedDeviceState = deviceStateListJson .getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); @@ -161,19 +160,19 @@ public class IotMachineEventGeneratorJob { env.execute("Kafka Job"); } - private static JSON getDeviceStateListJson(ValueState deviceState, - List deviceStateList) throws IOException { + private static JSON getDeviceStateListJson(ValueState deviceState) throws IOException, SQLException { // 获取最新设备状态 JSON deviceStateListJson = deviceState.value(); if(deviceStateListJson == null) { Map allMachineMap = new HashMap<>(); + List deviceStateList = Db.use().query(SQL, DeviceState.class); if (CollUtil.isNotEmpty(deviceStateList)) { allMachineMap = deviceStateList.stream() .collect(Collectors.toMap(DeviceState::getMachineIotMac, state -> state, (state1, state2) -> state1)); } - deviceState.update(JSONUtil.parse(allMachineMap)); + deviceState.update(deviceStateListJson = JSONUtil.parse(allMachineMap)); } return deviceStateListJson; }