From 359ad98653fa27428cfb9c8dba8961b4c629ca00 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Tue, 19 Jul 2022 11:04:45 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../command/PowerOffMachineCommand.java | 8 ++++- .../command/PowerOnMachineCommand.java | 8 ++++- .../command/StartMachineWorkingCommand.java | 8 ++++- .../command/StopMachineWorkingCommand.java | 8 ++++- .../generator/constant/ConfigConstant.java | 8 ++++- .../job/IotMachineEventGeneratorJob.java | 34 +++++++++++++++---- 6 files changed, 62 insertions(+), 12 deletions(-) diff --git a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOffMachineCommand.java b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOffMachineCommand.java index 8220c2d..90f350e 100644 --- a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOffMachineCommand.java +++ b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOffMachineCommand.java @@ -14,13 +14,19 @@ import lombok.NoArgsConstructor; @AllArgsConstructor @NoArgsConstructor public class PowerOffMachineCommand extends BaseCommand { + private static final long serialVersionUID = 1L; /** - * 机器标识 + * 机器标识(MachineId) */ private Long id; + /** + * 云盒mac + */ + private Long mac; + /** * 当前总产量 */ diff --git a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOnMachineCommand.java b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOnMachineCommand.java index fc089ad..057dcfd 100644 --- a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOnMachineCommand.java +++ b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOnMachineCommand.java @@ -14,13 +14,19 @@ import lombok.NoArgsConstructor; @AllArgsConstructor @NoArgsConstructor public class PowerOnMachineCommand extends BaseCommand { + private static final long serialVersionUID = 1L; /** - * 机器标识 + * 机器标识(MachineId) */ private Long id; + /** + * 云盒mac + */ + private Long mac; + /** * 当前总产量 */ diff --git a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StartMachineWorkingCommand.java b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StartMachineWorkingCommand.java index 5f735d0..5b80775 100644 --- a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StartMachineWorkingCommand.java +++ b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StartMachineWorkingCommand.java @@ -14,13 +14,19 @@ import lombok.NoArgsConstructor; @AllArgsConstructor @NoArgsConstructor public class StartMachineWorkingCommand extends BaseCommand { + private static final long serialVersionUID = 1L; /** - * 机器标识 + * 机器标识(MachineId) */ private Long id; + /** + * 云盒mac + */ + private Long mac; + /** * 当前总产量 */ diff --git a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StopMachineWorkingCommand.java b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StopMachineWorkingCommand.java index 59f1771..e0ab0d7 100644 --- a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StopMachineWorkingCommand.java +++ b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StopMachineWorkingCommand.java @@ -14,13 +14,19 @@ import lombok.NoArgsConstructor; @AllArgsConstructor @NoArgsConstructor public class StopMachineWorkingCommand extends BaseCommand { + private static final long serialVersionUID = 1L; /** - * 机器标识 + * 机器标识(MachineId) */ private Long id; + /** + * 云盒mac + */ + private Long mac; + /** * 当前总产量 */ diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java index 85f2366..2838478 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java @@ -18,7 +18,13 @@ public interface ConfigConstant { String SINK_RABBITMQ_PASSWORD = "sink.rabbitmq.password"; - String SINK_RABBITMQ_ROUTING_KEY = "sink.rabbitmq.routingKey"; + String SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY = "sink.rabbitmq.powerOnMachineCommand.routingKey"; + + String SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY = "sink.rabbitmq.powerOffMachineCommand.routingKey"; + + String SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY = "sink.rabbitmq.stopMachineWorkingCommand.routingKey"; + + String SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY = "sink.rabbitmq.startMachineWorkingCommand.routingKey"; String SINK_RABBITMQ_EXCHANGE = "sink.rabbitmq.exchange"; diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 1d81666..24bcc64 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -71,7 +71,7 @@ public class IotMachineEventGeneratorJob { .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) - .setStartingOffsets(OffsetsInitializer.earliest()) + .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) .build(); @@ -189,7 +189,8 @@ public class IotMachineEventGeneratorJob { .setVirtualHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST)) .setUserName(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_USER_NAME)) .setPassword(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_PASSWORD)) - .setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT)).build(); + .setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT)) + .build(); // 发送相应的指令到rabbitmq的交换机 commandDataStream @@ -198,7 +199,22 @@ public class IotMachineEventGeneratorJob { @Override public String computeRoutingKey(BaseCommand command) { - return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_ROUTING_KEY); + + if(command instanceof PowerOnMachineCommand) { + // 机器通电 + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY); + } + if(command instanceof PowerOffMachineCommand) { + // 机器断电 + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY); + } + if(command instanceof StopMachineWorkingCommand) { + // 机器待机 + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY); + }else { + // 机器工作 + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY); + } } @Override @@ -280,16 +296,20 @@ public class IotMachineEventGeneratorJob { if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { // 设备开机 - out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + out.collect(new PowerOnMachineCommand(newState.getMachineId(), + newState.getMachineIotMac(), event.getCurrJobCount())); } else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { // 设备关机 - out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + out.collect(new PowerOffMachineCommand(newState.getMachineId(), + newState.getMachineIotMac(), event.getCurrJobCount())); } else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { // 设备开始待机 - out.collect(new StopMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + out.collect(new StopMachineWorkingCommand(newState.getMachineId(), + newState.getMachineIotMac(), event.getCurrJobCount())); } else if (oldState.getStatus() == 2 && newState.getStatus() == 1) { // 设备开始工作 - out.collect(new StartMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + out.collect(new StartMachineWorkingCommand(newState.getMachineId(), + newState.getMachineIotMac(), event.getCurrJobCount())); } }