|
|
|
@ -21,6 +21,8 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|
|
|
import org.apache.flink.api.common.state.ValueState; |
|
|
|
import org.apache.flink.api.common.state.ValueStateDescriptor; |
|
|
|
import org.apache.flink.api.common.typeinfo.TypeInformation; |
|
|
|
import org.apache.flink.api.java.tuple.Tuple0; |
|
|
|
import org.apache.flink.api.java.tuple.Tuple2; |
|
|
|
import org.apache.flink.configuration.Configuration; |
|
|
|
import org.apache.flink.connector.kafka.source.KafkaSource; |
|
|
|
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; |
|
|
|
@ -100,20 +102,23 @@ public class IotMachineEventGeneratorJob { |
|
|
|
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>.Context ctx, |
|
|
|
Collector<MachineIotDataReceivedEvent> out) throws Exception { |
|
|
|
|
|
|
|
JSON deviceStateListJson = getDeviceStateListJson(deviceState); |
|
|
|
Tuple2<JSON,Map<Long, DeviceState>> deviceStateListTuple = getDeviceStateListJson(deviceState); |
|
|
|
JSON deviceStateListJson = deviceStateListTuple.f0; |
|
|
|
assert deviceStateListJson != null; |
|
|
|
DeviceState lastedDeviceState = deviceStateListJson |
|
|
|
.getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); |
|
|
|
Integer deviceStatus = getDeviceStatus(event); |
|
|
|
if (deviceStatus != null) { |
|
|
|
Map<Long, DeviceState> deviceStateMap = deviceStateListTuple.f1; |
|
|
|
DeviceState ds = deviceStateMap.get(event.getMachineIotMac()); |
|
|
|
if (deviceStatus != null && ds != null) { |
|
|
|
deviceStateListJson.putByPath(StrUtil.toString(event.getMachineIotMac()), |
|
|
|
new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); |
|
|
|
new DeviceState(ds.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); |
|
|
|
if (lastedDeviceState != null) { |
|
|
|
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), |
|
|
|
deviceStatus, event.getReportTime()); |
|
|
|
collDeviceStatusChange1(out, newState, lastedDeviceState, event); |
|
|
|
} |
|
|
|
deviceState.update(deviceStateListJson); |
|
|
|
this.deviceState.update(deviceStateListJson); |
|
|
|
} |
|
|
|
} |
|
|
|
}).name("machineIotDataReceivedEventDataStream keyBy stream"); |
|
|
|
@ -138,14 +143,17 @@ public class IotMachineEventGeneratorJob { |
|
|
|
Collector<BaseCommand> out) throws Exception { |
|
|
|
|
|
|
|
// 获取最新设备状态 |
|
|
|
JSON deviceStateListJson = getDeviceStateListJson(deviceState); |
|
|
|
Tuple2<JSON,Map<Long, DeviceState>> deviceStateListTuple = getDeviceStateListJson(deviceState); |
|
|
|
JSON deviceStateListJson = deviceStateListTuple.f0; |
|
|
|
assert deviceStateListJson != null; |
|
|
|
DeviceState lastedDeviceState = deviceStateListJson |
|
|
|
.getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); |
|
|
|
Integer deviceStatus = getDeviceStatus(event); |
|
|
|
if (deviceStatus != null) { |
|
|
|
Map<Long, DeviceState> deviceStateMap = deviceStateListTuple.f1; |
|
|
|
DeviceState ds = deviceStateMap.get(event.getMachineIotMac()); |
|
|
|
if (deviceStatus != null && ds != null) { |
|
|
|
deviceStateListJson.putByPath(StrUtil.toString(event.getMachineIotMac()), |
|
|
|
new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); |
|
|
|
new DeviceState(ds.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); |
|
|
|
if (lastedDeviceState != null) { |
|
|
|
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), |
|
|
|
deviceStatus, event.getReportTime()); |
|
|
|
@ -166,12 +174,12 @@ public class IotMachineEventGeneratorJob { |
|
|
|
env.execute("iot machine event generator job"); |
|
|
|
} |
|
|
|
|
|
|
|
private static JSON getDeviceStateListJson(ValueState<JSON> deviceState) throws IOException, SQLException { |
|
|
|
private static Tuple2<JSON,Map<Long, DeviceState>> getDeviceStateListJson(ValueState<JSON> deviceState) throws IOException, SQLException { |
|
|
|
|
|
|
|
// 获取最新设备状态 |
|
|
|
JSON deviceStateListJson = deviceState.value(); |
|
|
|
Map<Long, DeviceState> allMachineMap = new HashMap<>(); |
|
|
|
if(deviceStateListJson == null) { |
|
|
|
Map<Long, DeviceState> allMachineMap = new HashMap<>(); |
|
|
|
List<DeviceState> deviceStateList = Db.use().query(SQL, DeviceState.class); |
|
|
|
if (CollUtil.isNotEmpty(deviceStateList)) { |
|
|
|
allMachineMap = deviceStateList.stream() |
|
|
|
@ -180,7 +188,7 @@ public class IotMachineEventGeneratorJob { |
|
|
|
} |
|
|
|
deviceState.update(deviceStateListJson = JSONUtil.parse(allMachineMap)); |
|
|
|
} |
|
|
|
return deviceStateListJson; |
|
|
|
return new Tuple2<>(deviceStateListJson, allMachineMap); |
|
|
|
} |
|
|
|
|
|
|
|
private static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) { |
|
|
|
|