diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index fbf901b..579f41c 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -21,6 +21,8 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; @@ -100,20 +102,23 @@ public class IotMachineEventGeneratorJob { KeyedProcessFunction.Context ctx, Collector out) throws Exception { - JSON deviceStateListJson = getDeviceStateListJson(deviceState); + Tuple2> deviceStateListTuple = getDeviceStateListJson(deviceState); + JSON deviceStateListJson = deviceStateListTuple.f0; assert deviceStateListJson != null; DeviceState lastedDeviceState = deviceStateListJson .getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); Integer deviceStatus = getDeviceStatus(event); - if (deviceStatus != null) { + Map deviceStateMap = deviceStateListTuple.f1; + DeviceState ds = deviceStateMap.get(event.getMachineIotMac()); + if (deviceStatus != null && ds != null) { deviceStateListJson.putByPath(StrUtil.toString(event.getMachineIotMac()), - new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); + new DeviceState(ds.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); if (lastedDeviceState != null) { DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); collDeviceStatusChange1(out, newState, lastedDeviceState, event); } - deviceState.update(deviceStateListJson); + this.deviceState.update(deviceStateListJson); } } }).name("machineIotDataReceivedEventDataStream keyBy stream"); @@ -138,14 +143,17 @@ public class IotMachineEventGeneratorJob { Collector out) throws Exception { // 获取最新设备状态 - JSON deviceStateListJson = getDeviceStateListJson(deviceState); + Tuple2> deviceStateListTuple = getDeviceStateListJson(deviceState); + JSON deviceStateListJson = deviceStateListTuple.f0; assert deviceStateListJson != null; DeviceState lastedDeviceState = deviceStateListJson .getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); Integer deviceStatus = getDeviceStatus(event); - if (deviceStatus != null) { + Map deviceStateMap = deviceStateListTuple.f1; + DeviceState ds = deviceStateMap.get(event.getMachineIotMac()); + if (deviceStatus != null && ds != null) { deviceStateListJson.putByPath(StrUtil.toString(event.getMachineIotMac()), - new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); + new DeviceState(ds.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); if (lastedDeviceState != null) { DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); @@ -166,12 +174,12 @@ public class IotMachineEventGeneratorJob { env.execute("iot machine event generator job"); } - private static JSON getDeviceStateListJson(ValueState deviceState) throws IOException, SQLException { + private static Tuple2> getDeviceStateListJson(ValueState deviceState) throws IOException, SQLException { // 获取最新设备状态 JSON deviceStateListJson = deviceState.value(); + Map allMachineMap = new HashMap<>(); if(deviceStateListJson == null) { - Map allMachineMap = new HashMap<>(); List deviceStateList = Db.use().query(SQL, DeviceState.class); if (CollUtil.isNotEmpty(deviceStateList)) { allMachineMap = deviceStateList.stream() @@ -180,7 +188,7 @@ public class IotMachineEventGeneratorJob { } deviceState.update(deviceStateListJson = JSONUtil.parse(allMachineMap)); } - return deviceStateListJson; + return new Tuple2<>(deviceStateListJson, allMachineMap); } private static void sinkRabbitMq(DataStream commandDataStream) { diff --git a/iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties b/iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties index ef428d4..d2538f2 100644 --- a/iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties +++ b/iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties @@ -1,3 +1,4 @@ app.id=machine-state-event-generator -apollo.meta=http://8.135.8.221:5000 \ No newline at end of file + +apollo.meta=http://47.112.164.224:5000 \ No newline at end of file