Browse Source

更新

master
1049970895@qniao.cn 3 years ago
parent
commit
1a1ef48eb9
1 changed files with 6 additions and 7 deletions
  1. 13
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

13
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -43,6 +43,7 @@ import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import java.io.IOException; import java.io.IOException;
import java.sql.SQLException;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -75,8 +76,6 @@ public class IotMachineEventGeneratorJob {
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
List<DeviceState> deviceStateList = Db.use().query(SQL, DeviceState.class);
DataStream<MachineIotDataReceivedEvent> machineIotDataReceivedEventDataStream = dataStreamSource DataStream<MachineIotDataReceivedEvent> machineIotDataReceivedEventDataStream = dataStreamSource
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>() { .process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>() {
@ -95,7 +94,7 @@ public class IotMachineEventGeneratorJob {
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>.Context ctx, KeyedProcessFunction<Long, MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>.Context ctx,
Collector<MachineIotDataReceivedEvent> out) throws Exception { Collector<MachineIotDataReceivedEvent> out) throws Exception {
JSON deviceStateListJson = getDeviceStateListJson(deviceState, deviceStateList);
JSON deviceStateListJson = getDeviceStateListJson(deviceState);
assert deviceStateListJson != null; assert deviceStateListJson != null;
DeviceState lastedDeviceState = deviceStateListJson DeviceState lastedDeviceState = deviceStateListJson
.getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); .getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class);
@ -133,7 +132,7 @@ public class IotMachineEventGeneratorJob {
Collector<BaseCommand> out) throws Exception { Collector<BaseCommand> out) throws Exception {
// 获取最新设备状态 // 获取最新设备状态
JSON deviceStateListJson = getDeviceStateListJson(deviceState, deviceStateList);
JSON deviceStateListJson = getDeviceStateListJson(deviceState);
assert deviceStateListJson != null; assert deviceStateListJson != null;
DeviceState lastedDeviceState = deviceStateListJson DeviceState lastedDeviceState = deviceStateListJson
.getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); .getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class);
@ -161,19 +160,19 @@ public class IotMachineEventGeneratorJob {
env.execute("Kafka Job"); env.execute("Kafka Job");
} }
private static JSON getDeviceStateListJson(ValueState<JSON> deviceState,
List<DeviceState> deviceStateList) throws IOException {
private static JSON getDeviceStateListJson(ValueState<JSON> deviceState) throws IOException, SQLException {
// 获取最新设备状态 // 获取最新设备状态
JSON deviceStateListJson = deviceState.value(); JSON deviceStateListJson = deviceState.value();
if(deviceStateListJson == null) { if(deviceStateListJson == null) {
Map<Long, DeviceState> allMachineMap = new HashMap<>(); Map<Long, DeviceState> allMachineMap = new HashMap<>();
List<DeviceState> deviceStateList = Db.use().query(SQL, DeviceState.class);
if (CollUtil.isNotEmpty(deviceStateList)) { if (CollUtil.isNotEmpty(deviceStateList)) {
allMachineMap = deviceStateList.stream() allMachineMap = deviceStateList.stream()
.collect(Collectors.toMap(DeviceState::getMachineIotMac, .collect(Collectors.toMap(DeviceState::getMachineIotMac,
state -> state, (state1, state2) -> state1)); state -> state, (state1, state2) -> state1));
} }
deviceState.update(JSONUtil.parse(allMachineMap));
deviceState.update(deviceStateListJson = JSONUtil.parse(allMachineMap));
} }
return deviceStateListJson; return deviceStateListJson;
} }

Loading…
Cancel
Save