From cca9463edb09acaff1029a307db0467834d42c6d Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Tue, 19 Jul 2022 20:58:03 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../job/IotMachineEventGeneratorJob.java | 133 ++++++++---------- 1 file changed, 59 insertions(+), 74 deletions(-) diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index b42e97c..273bf71 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -5,6 +5,7 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.db.Db; +import cn.hutool.db.handler.RsHandler; import cn.hutool.json.JSON; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; @@ -49,7 +50,9 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; + import java.io.IOException; +import java.sql.ResultSet; import java.sql.SQLException; import java.time.LocalDateTime; import java.time.ZoneOffset; @@ -58,7 +61,8 @@ import java.util.stream.Collectors; public class IotMachineEventGeneratorJob { - private final static String SQL = "select machine_id, iot_mac as machine_iot_mac, status from qn_machine_realtime_state where is_delete = 0"; + private final static String SQL = "select machine_id, iot_mac as machine_iot_mac, status " + + "from qn_machine_realtime_state where iot_mac = ? and is_delete = 0"; public static void main(String[] args) throws Exception { @@ -84,13 +88,13 @@ public class IotMachineEventGeneratorJob { .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { - private ValueState deviceState; + private ValueState deviceState; @Override public void open(Configuration parameters) { // 必须在 open 生命周期初始化 deviceState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("deviceState1", TypeInformation.of(JSON.class))); + .getState(new ValueStateDescriptor<>("deviceState1", TypeInformation.of(DeviceState.class))); } @Override @@ -98,23 +102,16 @@ public class IotMachineEventGeneratorJob { KeyedProcessFunction.Context ctx, Collector out) throws Exception { - Tuple2> deviceStateListTuple = getDeviceStateListJson(deviceState); - JSON deviceStateListJson = deviceStateListTuple.f0; - assert deviceStateListJson != null; - DeviceState lastedDeviceState = deviceStateListJson - .getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); + DeviceState lastedDeviceState = getDeviceStateListJson(deviceState, event.getMachineIotMac()); Integer deviceStatus = getDeviceStatus(event); - Map deviceStateMap = deviceStateListTuple.f1; - DeviceState ds = deviceStateMap.get(event.getMachineIotMac()); - if (deviceStatus != null && ds != null) { - deviceStateListJson.putByPath(StrUtil.toString(event.getMachineIotMac()), - new DeviceState(ds.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); + if (deviceStatus != null) { + // 更新状态 if (lastedDeviceState != null) { DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); collDeviceStatusChange1(out, newState, lastedDeviceState, event); + this.deviceState.update(newState); } - this.deviceState.update(deviceStateListJson); } } }).name("machineIotDataReceivedEventDataStream keyBy stream"); @@ -124,13 +121,13 @@ public class IotMachineEventGeneratorJob { .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { - private ValueState deviceState; + private ValueState deviceState; @Override public void open(Configuration parameters) { // 必须在 open 生命周期初始化 deviceState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(JSON.class))); + .getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(DeviceState.class))); } @Override @@ -139,23 +136,16 @@ public class IotMachineEventGeneratorJob { Collector out) throws Exception { // 获取最新设备状态 - Tuple2> deviceStateListTuple = getDeviceStateListJson(deviceState); - JSON deviceStateListJson = deviceStateListTuple.f0; - assert deviceStateListJson != null; - DeviceState lastedDeviceState = deviceStateListJson - .getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); + DeviceState lastedDeviceState = getDeviceStateListJson(deviceState, event.getMachineIotMac()); Integer deviceStatus = getDeviceStatus(event); - Map deviceStateMap = deviceStateListTuple.f1; - DeviceState ds = deviceStateMap.get(event.getMachineIotMac()); - if (deviceStatus != null && ds != null) { - deviceStateListJson.putByPath(StrUtil.toString(event.getMachineIotMac()), - new DeviceState(ds.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); + if (deviceStatus != null) { + // 更新状态 if (lastedDeviceState != null) { DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); collDeviceStatusChange(out, newState, lastedDeviceState, event); + this.deviceState.update(newState); } - deviceState.update(deviceStateListJson); } } }).name("keyBy stream"); @@ -170,26 +160,21 @@ public class IotMachineEventGeneratorJob { env.execute("iot machine event generator job"); } - private static Tuple2> getDeviceStateListJson(ValueState deviceState) throws IOException, SQLException { + private static DeviceState getDeviceStateListJson(ValueState deviceState, + Long machineIotMac) throws IOException, SQLException { // 获取最新设备状态 - JSON deviceStateListJson = deviceState.value(); - Map allMachineMap = new HashMap<>(); - if(deviceStateListJson == null) { - List deviceStateList = Db.use().query(SQL, DeviceState.class); - if (CollUtil.isNotEmpty(deviceStateList)) { - allMachineMap = deviceStateList.stream() - .collect(Collectors.toMap(DeviceState::getMachineIotMac, - state -> state, (state1, state2) -> state1)); + DeviceState deviceStateListJson = deviceState.value(); + if (deviceStateListJson == null) { + deviceStateListJson = Db.use().query(SQL, (RsHandler) rs -> new DeviceState(rs.getLong(1), + rs.getLong(2), + rs.getInt(3), System.currentTimeMillis()), machineIotMac); + if (deviceStateListJson != null) { + // 如果是空的,并且在表中都没找到,说明是没有被记录的设备,不用管 + deviceState.update(deviceStateListJson); } - deviceState.update(deviceStateListJson = JSONUtil.parse(allMachineMap)); - }else { - Map finalAllMachineMap = allMachineMap; - deviceStateListJson.toBean(Map.class).forEach((a, b) - -> finalAllMachineMap.put(Long.parseLong(String.valueOf(a)), - BeanUtil.toBean(b, DeviceState.class))); } - return new Tuple2<>(deviceStateListJson, allMachineMap); + return deviceStateListJson; } private static void sinkRabbitMq(DataStream commandDataStream) { @@ -206,40 +191,40 @@ public class IotMachineEventGeneratorJob { // 发送相应的指令到rabbitmq的交换机 commandDataStream .addSink(new RMQSink<>(connectionConfig, new MachineIotDataReceivedEventRabbitMqSerializationSchema(), - new RMQSinkPublishOptions() { - - @Override - public String computeRoutingKey(BaseCommand command) { - - if(command instanceof PowerOnMachineCommand) { - // 机器通电 - return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY); - } - if(command instanceof PowerOffMachineCommand) { - // 机器断电 - return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY); - } - if(command instanceof StopMachineWorkingCommand) { - // 机器待机 - return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY); - }else { - // 机器工作 - return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY); - } - } + new RMQSinkPublishOptions() { + + @Override + public String computeRoutingKey(BaseCommand command) { + + if (command instanceof PowerOnMachineCommand) { + // 机器通电 + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY); + } + if (command instanceof PowerOffMachineCommand) { + // 机器断电 + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY); + } + if (command instanceof StopMachineWorkingCommand) { + // 机器待机 + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY); + } else { + // 机器工作 + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY); + } + } - @Override - public AMQP.BasicProperties computeProperties(BaseCommand command) { - return null; - } + @Override + public AMQP.BasicProperties computeProperties(BaseCommand command) { + return null; + } - @Override - public String computeExchange(BaseCommand command) { + @Override + public String computeExchange(BaseCommand command) { - // 交换机名称 - return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_EXCHANGE); - } - })).name("commandDataStream to rabbitmq Sink"); + // 交换机名称 + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_EXCHANGE); + } + })).name("commandDataStream to rabbitmq Sink"); } private static void sinkEs(DataStream dataStream) {