diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 7c08103..a6c9fd3 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -2,7 +2,6 @@ package com.qniao.iot.machine.event.generator.job; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.db.Db; import cn.hutool.json.JSON; @@ -43,14 +42,13 @@ import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; - -import java.sql.SQLException; +import java.io.IOException; import java.util.*; import java.util.stream.Collectors; public class IotMachineEventGeneratorJob { - private final static String sql = "select qm.id as machine_id, qei.mac as machine_iot_mac, qm.status\n" + + private final static String SQL = "select qm.id as machine_id, qei.mac as machine_iot_mac, qm.status\n" + "from (select id, status from qn_machine where is_delete = 0) qm\n" + " left join (select machine_id, equipment_information_id\n" + " from qn_machine_binding_cloud_box\n" + @@ -77,33 +75,28 @@ public class IotMachineEventGeneratorJob { .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); - DataStream commandDataStream1 = dataStreamSource.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) + List deviceStateList = Db.use().query(SQL, DeviceState.class); + + DataStream machineIotDataReceivedEventDataStream = dataStreamSource + .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { private ValueState deviceState; @Override - public void open(Configuration parameters) throws SQLException { + public void open(Configuration parameters) { // 必须在 open 生命周期初始化 - // 获取所有设备的最新状态 - List deviceStateList = Db.use().query(sql, DeviceState.class); - Map allMachineMap = new HashMap<>(); - if (CollUtil.isNotEmpty(deviceStateList)) { - allMachineMap = deviceStateList.stream() - .collect(Collectors.toMap(DeviceState::getMachineIotMac, - deviceState -> deviceState, (deviceState1, deviceState2) -> deviceState1)); - } deviceState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("deviceState1", - TypeInformation.of(JSON.class), JSONUtil.parse(allMachineMap))); + .getState(new ValueStateDescriptor<>("deviceState1", TypeInformation.of(JSON.class))); } @Override - public void processElement(MachineIotDataReceivedEvent event, KeyedProcessFunction.Context ctx, Collector out) throws Exception { + public void processElement(MachineIotDataReceivedEvent event, + KeyedProcessFunction.Context ctx, + Collector out) throws Exception { - // 获取最新设备状态 - JSON deviceStateListJson = deviceState.value(); + JSON deviceStateListJson = getDeviceStateListJson(deviceState, deviceStateList); + assert deviceStateListJson != null; DeviceState lastedDeviceState = deviceStateListJson .getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); Integer deviceStatus = getDeviceStatus(event); @@ -118,36 +111,30 @@ public class IotMachineEventGeneratorJob { deviceState.update(deviceStateListJson); } } - }).name("commandDataStream1 keyBy stream"); + }).name("machineIotDataReceivedEventDataStream keyBy stream"); - DataStream commandDataStream = dataStreamSource.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) + DataStream commandDataStream = dataStreamSource + .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { private ValueState deviceState; @Override - public void open(Configuration parameters) throws SQLException { + public void open(Configuration parameters) { // 必须在 open 生命周期初始化 - // 获取所有设备的最新状态 - List deviceStateList = Db.use().query(sql, DeviceState.class); - Map allMachineMap = new HashMap<>(); - if (CollUtil.isNotEmpty(deviceStateList)) { - allMachineMap = deviceStateList.stream() - .collect(Collectors.toMap(DeviceState::getMachineIotMac, - deviceState -> deviceState, (deviceState1, deviceState2) -> deviceState1)); - } deviceState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("deviceState", - TypeInformation.of(JSON.class), JSONUtil.parse(allMachineMap))); + .getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(JSON.class))); } @Override - public void processElement(MachineIotDataReceivedEvent event, KeyedProcessFunction.Context ctx, Collector out) throws Exception { + public void processElement(MachineIotDataReceivedEvent event, + KeyedProcessFunction.Context ctx, + Collector out) throws Exception { // 获取最新设备状态 - JSON deviceStateListJson = deviceState.value(); + JSON deviceStateListJson = getDeviceStateListJson(deviceState, deviceStateList); + assert deviceStateListJson != null; DeviceState lastedDeviceState = deviceStateListJson .getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); Integer deviceStatus = getDeviceStatus(event); @@ -169,11 +156,27 @@ public class IotMachineEventGeneratorJob { sinkRabbitMq(commandDataStream); // 写入es - sinkEs(commandDataStream1); + sinkEs(machineIotDataReceivedEventDataStream); env.execute("Kafka Job"); } + private static JSON getDeviceStateListJson(ValueState deviceState, + List deviceStateList) throws IOException { + + // 获取最新设备状态 + JSON deviceStateListJson = deviceState.value(); + if(deviceStateListJson == null) { + Map allMachineMap = new HashMap<>(); + if (CollUtil.isNotEmpty(deviceStateList)) { + allMachineMap = deviceStateList.stream() + .collect(Collectors.toMap(DeviceState::getMachineIotMac, + state -> state, (state1, state2) -> state1)); + } + deviceState.update(JSONUtil.parse(allMachineMap)); + } + return deviceStateListJson; + } private static void sinkRabbitMq(DataStream commandDataStream) { @@ -186,7 +189,8 @@ public class IotMachineEventGeneratorJob { .setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT)).build(); // 发送相应的指令到rabbitmq的交换机 - commandDataStream.addSink(new RMQSink<>(connectionConfig, new MachineIotDataReceivedEventRabbitMqSerializationSchema(), + commandDataStream + .addSink(new RMQSink<>(connectionConfig, new MachineIotDataReceivedEventRabbitMqSerializationSchema(), new RMQSinkPublishOptions() { @Override @@ -267,7 +271,9 @@ public class IotMachineEventGeneratorJob { return null; } - private static void collDeviceStatusChange(Collector out, DeviceState newState, DeviceState oldState, MachineIotDataReceivedEvent event) { + private static void collDeviceStatusChange(Collector out, + DeviceState newState, + DeviceState oldState, MachineIotDataReceivedEvent event) { if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { // 设备开机 @@ -284,8 +290,9 @@ public class IotMachineEventGeneratorJob { } } - private static void collDeviceStatusChange1(Collector out, DeviceState newState, DeviceState oldState, - MachineIotDataReceivedEvent event) { + private static void collDeviceStatusChange1(Collector out, + DeviceState newState, + DeviceState oldState, MachineIotDataReceivedEvent event) { if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { // 设备开机