From 4c62600d1f09492ce1a317dc96bc81e44c7a47cf Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Mon, 5 Sep 2022 14:31:24 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../command/PowerOffMachineCommand.java | 25 ++ .../command/PowerOnMachineCommand.java | 25 ++ .../command/StartMachineWorkingCommand.java | 25 ++ .../command/StopMachineWorkingCommand.java | 25 ++ .../event/MachineIotDataReceivedEvent.java | 4 +- .../generator/constant/ConfigConstant.java | 2 + .../event/generator/job/DeviceState.java | 11 - .../job/IotMachineEventGeneratorJob.java | 271 +++++++++--------- 8 files changed, 239 insertions(+), 149 deletions(-) diff --git a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOffMachineCommand.java b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOffMachineCommand.java index 90f350e..c55746d 100644 --- a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOffMachineCommand.java +++ b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOffMachineCommand.java @@ -27,6 +27,31 @@ public class PowerOffMachineCommand extends BaseCommand { */ private Long mac; + /** + * 机器电源状态(0断电 1供电) + */ + private Integer machinePwrStat; + + /** + * 机器工作状态(0未工作 1工作中 2待机中) + */ + private Integer machineWorkingStat; + + /** + * 数据来源(0机智云 1树根) + */ + private Integer dataSource; + + /** + * 当前产能(距离上次的工作生产数量) + */ + private Long currCount; + + /** + * 当前时长(距离上次的作业时长,单位秒) + */ + private Long currDuration; + /** * 当前总产量 */ diff --git a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOnMachineCommand.java b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOnMachineCommand.java index 057dcfd..5c6305c 100644 --- a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOnMachineCommand.java +++ b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOnMachineCommand.java @@ -27,6 +27,31 @@ public class PowerOnMachineCommand extends BaseCommand { */ private Long mac; + /** + * 机器电源状态(0断电 1供电) + */ + private Integer machinePwrStat; + + /** + * 机器工作状态(0未工作 1工作中 2待机中) + */ + private Integer machineWorkingStat; + + /** + * 数据来源(0机智云 1树根) + */ + private Integer dataSource; + + /** + * 当前产能(距离上次的工作生产数量) + */ + private Long currCount; + + /** + * 当前时长(距离上次的作业时长,单位秒) + */ + private Long currDuration; + /** * 当前总产量 */ diff --git a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StartMachineWorkingCommand.java b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StartMachineWorkingCommand.java index 5b80775..2d9fd45 100644 --- a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StartMachineWorkingCommand.java +++ b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StartMachineWorkingCommand.java @@ -27,6 +27,31 @@ public class StartMachineWorkingCommand extends BaseCommand { */ private Long mac; + /** + * 机器电源状态(0断电 1供电) + */ + private Integer machinePwrStat; + + /** + * 机器工作状态(0未工作 1工作中 2待机中) + */ + private Integer machineWorkingStat; + + /** + * 数据来源(0机智云 1树根) + */ + private Integer dataSource; + + /** + * 当前产能(距离上次的工作生产数量) + */ + private Long currCount; + + /** + * 当前时长(距离上次的作业时长,单位秒) + */ + private Long currDuration; + /** * 当前总产量 */ diff --git a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StopMachineWorkingCommand.java b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StopMachineWorkingCommand.java index e0ab0d7..c52c863 100644 --- a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StopMachineWorkingCommand.java +++ b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StopMachineWorkingCommand.java @@ -27,6 +27,31 @@ public class StopMachineWorkingCommand extends BaseCommand { */ private Long mac; + /** + * 机器电源状态(0断电 1供电) + */ + private Integer machinePwrStat; + + /** + * 机器工作状态(0未工作 1工作中 2待机中) + */ + private Integer machineWorkingStat; + + /** + * 数据来源(0机智云 1树根) + */ + private Integer dataSource; + + /** + * 当前产能(距离上次的工作生产数量) + */ + private Long currCount; + + /** + * 当前时长(距离上次的作业时长,单位秒) + */ + private Long currDuration; + /** * 当前总产量 */ diff --git a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java index 8cef901..deb2535 100644 --- a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java +++ b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java @@ -43,12 +43,12 @@ public class MachineIotDataReceivedEvent implements Serializable { private Long accJobCount; /** - * 当前产能(距离上次的工作生产数量) + * 当前产能(距离上次的生产数量) */ private Long currCount; /** - * 当前时长(距离上次的作业时长,单位秒) + * 当前时长(距离上次的间隔时长,单位秒) */ private Long currDuration; diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java index a8a12ff..9ceda8d 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java @@ -18,6 +18,8 @@ public interface ConfigConstant { String SINK_RABBITMQ_PASSWORD = "sink.rabbitmq.password"; + String SINK_RABBITMQ_MACHINE_COMMAND_ROUTING_KEY = "sink.rabbitmq.machineCommand.routingKey"; + String SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY = "sink.rabbitmq.powerOnMachineCommand.routingKey"; String SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY = "sink.rabbitmq.powerOffMachineCommand.routingKey"; diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java index c358f1c..4b00b42 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/DeviceState.java @@ -14,16 +14,6 @@ public class DeviceState { */ private Long machineId; - /** - * 设备物联地址(云盒物理标识) - */ - private Long machineIotMac; - - /** - * 状态: 0:关机 1:生产中 2:待机 - */ - private Integer status; - /** * 计算单位 */ @@ -38,7 +28,6 @@ public class DeviceState { public String toString() { return "设备状态:{" + "machineId='" + machineId + '\'' + - ", status='" + status + ", updateTime='" + updateTime + '\'' + '}'; diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 3854bf2..a4d0e8d 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -38,6 +38,7 @@ import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; @@ -149,49 +150,32 @@ public class IotMachineEventGeneratorJob { } }).name("machine iot data received event filter"); - DataStream machineIotDataReceivedEventDataStream = streamOperator - .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) - .process(new KeyedProcessFunction() { - - private ValueState deviceState; - - @Override - public void open(Configuration parameters) { - - // 必须在 open 生命周期初始化 - deviceState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(DeviceState.class))); - } - - @Override - public void processElement(MachineIotDataReceivedEvent event, - KeyedProcessFunction.Context ctx, - Collector out) throws Exception { - - DeviceState lastedDeviceState = getDeviceStateListJson(deviceState, event.getMachineIotMac()); - Integer deviceStatus = event.getMachineWorkingStat(); - // 更新状态 - if (lastedDeviceState != null) { - DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), - deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime()); - collDeviceStatusChange1(out, newState, lastedDeviceState, event); - this.deviceState.update(newState); - } - } - }).name("machineIotDataReceivedEventDataStream keyBy stream"); + // 新增一个测输出,用于接收MachineIotDataReceivedEvent类型数据 + OutputTag machineIotDataReceivedEventOutput + = new OutputTag("machine-iot-data-received-event") { + }; - DataStream commandDataStream = dataStreamSource.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) + SingleOutputStreamOperator commandDataStream = streamOperator + .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { private ValueState deviceState; + private ValueState lastDataReceivedEventState; + @Override public void open(Configuration parameters) { // 必须在 open 生命周期初始化 deviceState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(DeviceState.class))); + .getState(new ValueStateDescriptor<>("deviceState", + TypeInformation.of(DeviceState.class))); + + // 必须在 open 生命周期初始化 + lastDataReceivedEventState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("lastDataReceivedEvent1", + TypeInformation.of(MachineIotDataReceivedEvent.class))); } @Override @@ -199,51 +183,81 @@ public class IotMachineEventGeneratorJob { KeyedProcessFunction.Context ctx, Collector out) throws Exception { - // 获取最新设备状态 - DeviceState lastedDeviceState = getDeviceStateListJson(deviceState, event.getMachineIotMac()); - Integer deviceStatus = event.getMachineWorkingStat(); - // 更新状态 - if (lastedDeviceState != null) { - DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), - deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime()); - collDeviceStatusChange(out, newState, lastedDeviceState, event); - this.deviceState.update(newState); + Long machineIotMac = event.getMachineIotMac(); + DeviceState deviceState = getDeviceState(this.deviceState, machineIotMac); + if (deviceState != null) { + MachineIotDataReceivedEvent lastDataReceivedEvent = getLastDataReceivedEvent(lastDataReceivedEventState, machineIotMac); + if (lastDataReceivedEvent == null) { + lastDataReceivedEvent = event; + } + Integer pwrSta = event.getMachinePwrStat(); + Integer workingSta = event.getMachineWorkingStat(); + // 总产量 + Long accCount = event.getAccJobCount(); + Long reportTime = event.getReportTime(); + Integer lastPwrStat = lastDataReceivedEvent.getMachinePwrStat(); + Integer lastWorkingStat = lastDataReceivedEvent.getMachineWorkingStat(); + Long lastReportTime = lastDataReceivedEvent.getReportTime(); + Long lastAccJobCount = lastDataReceivedEvent.getAccJobCount(); + // 只有当前消息的时间大于等于上一次消息的时间才要,否则丢弃 + if (reportTime >= lastReportTime) { + if ((pwrSta == 1 && workingSta == 1) + || (lastPwrStat == 1 && lastWorkingStat == 1)) { + // 只有当前是工作中或上次是工作中才进行计算 + // 如果这次的消息和上次的消息相差半个小时,那么不进行计算 + if (reportTime - lastReportTime <= 30 * 60 * 1000) { + event.setCurrCount(accCount - lastAccJobCount); + // 单位是秒 + event.setCurrDuration((reportTime - lastReportTime) / 3600); + } + } + ctx.output(machineIotDataReceivedEventOutput, event); + collDeviceStatusChange(out, deviceState, lastDataReceivedEvent, event); + lastDataReceivedEventState.update(event); + } } } }).name("keyBy stream"); + // 获取测输出数据 + DataStream sideOutput = commandDataStream.getSideOutput(machineIotDataReceivedEventOutput); + // 写入rabbitmq sinkRabbitMq(commandDataStream); // 写入es - sinkEs(machineIotDataReceivedEventDataStream); + sinkEs(sideOutput); env.execute("iot machine event generator job"); } - private static DeviceState getDeviceStateListJson(ValueState deviceState, - Long machineIotMac) throws IOException, SQLException { + private static MachineIotDataReceivedEvent getLastDataReceivedEvent(ValueState lastDataReceivedEventState, + Long machineIotMac) throws IOException { + + MachineIotDataReceivedEvent value = lastDataReceivedEventState.value(); + if (value == null) { + value = queryLastDataReceivedEvent(machineIotMac); + } + return value; + } + + + private static DeviceState getDeviceState(ValueState deviceState, + Long machineIotMac) throws SQLException, IOException { - // 获取最新设备状态 - DeviceState deviceStateListJson = deviceState.value(); - if (deviceStateListJson == null) { - int countUnit = 1; + DeviceState value = deviceState.value(); + if (value == null) { // 查询数据库最新的设备状态 List list = Db.use().query(SQL, DeviceState.class, machineIotMac); if (CollUtil.isNotEmpty(list)) { - deviceStateListJson = list.get(0); + value = list.get(0); } - DeviceState latestDeviceState = queryLatestDeviceState(machineIotMac); - if (latestDeviceState != null && deviceStateListJson != null) { - latestDeviceState.setCountUnit(countUnit); - latestDeviceState.setMachineId(deviceStateListJson.getMachineId()); - } - deviceStateListJson = latestDeviceState; + // 数据库找不到的话说明设备没记录 } - return deviceStateListJson; + return value; } - private static DeviceState queryLatestDeviceState(Long machineIotMac) { + private static MachineIotDataReceivedEvent queryLastDataReceivedEvent(Long machineIotMac) { try { // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) @@ -265,13 +279,8 @@ public class IotMachineEventGeneratorJob { if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { SearchHits hits = searchResponse.getHits(); SearchHit reqHit = hits.getHits()[0]; - MachineIotDataReceivedEvent receivedEvent = JSONUtil + return JSONUtil .toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class); - DeviceState deviceState = new DeviceState(); - deviceState.setMachineIotMac(machineIotMac); - deviceState.setStatus(receivedEvent.getMachineWorkingStat()); - deviceState.setUpdateTime(receivedEvent.getReportTime()); - return deviceState; } } } catch (Exception e) { @@ -299,21 +308,7 @@ public class IotMachineEventGeneratorJob { @Override public String computeRoutingKey(BaseCommand command) { - if (command instanceof PowerOnMachineCommand) { - // 机器通电 - return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY); - } - if (command instanceof PowerOffMachineCommand) { - // 机器断电 - return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY); - } - if (command instanceof StopMachineWorkingCommand) { - // 机器待机 - return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY); - } else { - // 机器工作 - return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY); - } + return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_MACHINE_COMMAND_ROUTING_KEY); } @Override @@ -327,8 +322,6 @@ public class IotMachineEventGeneratorJob { // 交换机名称 return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_EXCHANGE); } - - })).name("commandDataStream to rabbitmq Sink"); } @@ -470,66 +463,72 @@ public class IotMachineEventGeneratorJob { } private static void collDeviceStatusChange(Collector out, - DeviceState newState, - DeviceState oldState, MachineIotDataReceivedEvent event) { - - Integer countUnit = newState.getCountUnit(); - countUnit = countUnit == null ? 1 : countUnit; - Long currJobCount = event.getCurrJobCount(); - currJobCount = currJobCount == null ? 0 : currJobCount * countUnit; - if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { + DeviceState deviceState, + MachineIotDataReceivedEvent lastDataReceivedEvent, + MachineIotDataReceivedEvent event) { + + Long machineIotMac = event.getMachineIotMac(); + Integer dataSource = event.getDataSource(); + Long machineId = deviceState.getMachineId(); + Long currCount = event.getCurrCount(); + Long currDuration = event.getCurrDuration(); + Long accJobCount = event.getAccJobCount(); + Long reportTime = event.getReportTime(); + Integer lastWorkingStat = lastDataReceivedEvent.getMachineWorkingStat(); + Integer workingStat = event.getMachineWorkingStat(); + Integer pwrStat = event.getMachinePwrStat(); + if (lastWorkingStat == 0 && (workingStat == 1 || workingStat == 2)) { // 设备开机 - PowerOnMachineCommand powerOnMachineCommand = new PowerOnMachineCommand(newState.getMachineId(), - newState.getMachineIotMac(), currJobCount); - powerOnMachineCommand.setTimestamp(event.getReportTime()); - out.collect(powerOnMachineCommand); - } else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { + PowerOnMachineCommand command = new PowerOnMachineCommand(); + command.setId(machineId); + command.setMac(machineIotMac); + command.setMachinePwrStat(pwrStat); + command.setMachineWorkingStat(workingStat); + command.setDataSource(dataSource); + command.setCurrCount(currCount); + command.setCurrDuration(currDuration); + command.setCurrTotalOutput(accJobCount); + command.setTimestamp(reportTime); + out.collect(command); + } else if ((lastWorkingStat == 1 || lastWorkingStat == 2) && workingStat == 0) { // 设备关机 - PowerOffMachineCommand powerOffMachineCommand = new PowerOffMachineCommand(newState.getMachineId(), - newState.getMachineIotMac(), currJobCount); - powerOffMachineCommand.setTimestamp(event.getReportTime()); - out.collect(powerOffMachineCommand); - } else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { + PowerOffMachineCommand command = new PowerOffMachineCommand(); + command.setId(machineId); + command.setMac(machineIotMac); + command.setMachinePwrStat(pwrStat); + command.setMachineWorkingStat(workingStat); + command.setDataSource(dataSource); + command.setCurrCount(currCount); + command.setCurrDuration(currDuration); + command.setCurrTotalOutput(accJobCount); + command.setTimestamp(reportTime); + out.collect(command); + } else if (lastWorkingStat == 1 && workingStat == 2) { // 设备开始待机 - StopMachineWorkingCommand stopMachineWorkingCommand = new StopMachineWorkingCommand(newState.getMachineId(), - newState.getMachineIotMac(), currJobCount); - stopMachineWorkingCommand.setTimestamp(event.getReportTime()); - out.collect(stopMachineWorkingCommand); - } else if ((oldState.getStatus() == 2 || oldState.getStatus() == 1) && newState.getStatus() == 1) { + StopMachineWorkingCommand command = new StopMachineWorkingCommand(); + command.setId(machineId); + command.setMac(machineIotMac); + command.setMachinePwrStat(pwrStat); + command.setMachineWorkingStat(workingStat); + command.setDataSource(dataSource); + command.setCurrCount(currCount); + command.setCurrDuration(currDuration); + command.setCurrTotalOutput(accJobCount); + command.setTimestamp(reportTime); + out.collect(command); + } else if ((lastWorkingStat == 2 || lastWorkingStat == 1) && workingStat == 1) { // 设备开始工作 - StartMachineWorkingCommand startMachineWorkingCommand = new StartMachineWorkingCommand(newState.getMachineId(), - newState.getMachineIotMac(), currJobCount); - startMachineWorkingCommand.setTimestamp(event.getReportTime()); - out.collect(startMachineWorkingCommand); - } - } - - private static void collDeviceStatusChange1(Collector out, - DeviceState newState, - DeviceState oldState, MachineIotDataReceivedEvent event) { - - Integer countUnit = newState.getCountUnit(); - countUnit = countUnit == null ? 1 : countUnit; - Long currJobCount = event.getCurrJobCount(); - if (currJobCount != null) { - event.setCurrJobCount(currJobCount * countUnit); - } - Long accJobCount = event.getAccJobCount(); - if (accJobCount != null) { - event.setAccJobCount(accJobCount * countUnit); - } - if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { - // 设备开机 - out.collect(event); - } else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { - // 设备关机 - out.collect(event); - } else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { - // 设备待机中 - out.collect(event); - } else if ((oldState.getStatus() == 2 || oldState.getStatus() == 1) && newState.getStatus() == 1) { - // 设备工作中 - out.collect(event); + StartMachineWorkingCommand command = new StartMachineWorkingCommand(); + command.setId(machineId); + command.setMac(machineIotMac); + command.setMachinePwrStat(pwrStat); + command.setMachineWorkingStat(workingStat); + command.setDataSource(dataSource); + command.setCurrCount(currCount); + command.setCurrDuration(currDuration); + command.setCurrTotalOutput(accJobCount); + command.setTimestamp(reportTime); + out.collect(command); } } }