From 1a1ef48eb99821e750222c728f8e2f42eee99f25 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Mon, 18 Jul 2022 09:45:08 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../generator/job/IotMachineEventGeneratorJob.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index a6c9fd3..f2e96a0 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -43,6 +43,7 @@ import org.apache.http.impl.client.BasicCredentialsProvider; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; import java.io.IOException; +import java.sql.SQLException; import java.util.*; import java.util.stream.Collectors; @@ -75,8 +76,6 @@ public class IotMachineEventGeneratorJob { .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); - List deviceStateList = Db.use().query(SQL, DeviceState.class); - DataStream machineIotDataReceivedEventDataStream = dataStreamSource .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { @@ -95,7 +94,7 @@ public class IotMachineEventGeneratorJob { KeyedProcessFunction.Context ctx, Collector out) throws Exception { - JSON deviceStateListJson = getDeviceStateListJson(deviceState, deviceStateList); + JSON deviceStateListJson = getDeviceStateListJson(deviceState); assert deviceStateListJson != null; DeviceState lastedDeviceState = deviceStateListJson .getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); @@ -133,7 +132,7 @@ public class IotMachineEventGeneratorJob { Collector out) throws Exception { // 获取最新设备状态 - JSON deviceStateListJson = getDeviceStateListJson(deviceState, deviceStateList); + JSON deviceStateListJson = getDeviceStateListJson(deviceState); assert deviceStateListJson != null; DeviceState lastedDeviceState = deviceStateListJson .getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); @@ -161,19 +160,19 @@ public class IotMachineEventGeneratorJob { env.execute("Kafka Job"); } - private static JSON getDeviceStateListJson(ValueState deviceState, - List deviceStateList) throws IOException { + private static JSON getDeviceStateListJson(ValueState deviceState) throws IOException, SQLException { // 获取最新设备状态 JSON deviceStateListJson = deviceState.value(); if(deviceStateListJson == null) { Map allMachineMap = new HashMap<>(); + List deviceStateList = Db.use().query(SQL, DeviceState.class); if (CollUtil.isNotEmpty(deviceStateList)) { allMachineMap = deviceStateList.stream() .collect(Collectors.toMap(DeviceState::getMachineIotMac, state -> state, (state1, state2) -> state1)); } - deviceState.update(JSONUtil.parse(allMachineMap)); + deviceState.update(deviceStateListJson = JSONUtil.parse(allMachineMap)); } return deviceStateListJson; }