diff --git a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOffMachineCommand.java b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOffMachineCommand.java index 8220c2d..90f350e 100644 --- a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOffMachineCommand.java +++ b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOffMachineCommand.java @@ -14,13 +14,19 @@ import lombok.NoArgsConstructor; @AllArgsConstructor @NoArgsConstructor public class PowerOffMachineCommand extends BaseCommand { + private static final long serialVersionUID = 1L; /** - * 机器标识 + * 机器标识(MachineId) */ private Long id; + /** + * 云盒mac + */ + private Long mac; + /** * 当前总产量 */ diff --git a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOnMachineCommand.java b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOnMachineCommand.java index fc089ad..057dcfd 100644 --- a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOnMachineCommand.java +++ b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/PowerOnMachineCommand.java @@ -14,13 +14,19 @@ import lombok.NoArgsConstructor; @AllArgsConstructor @NoArgsConstructor public class PowerOnMachineCommand extends BaseCommand { + private static final long serialVersionUID = 1L; /** - * 机器标识 + * 机器标识(MachineId) */ private Long id; + /** + * 云盒mac + */ + private Long mac; + /** * 当前总产量 */ diff --git a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StartMachineWorkingCommand.java b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StartMachineWorkingCommand.java index 5f735d0..5b80775 100644 --- a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StartMachineWorkingCommand.java +++ b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StartMachineWorkingCommand.java @@ -14,13 +14,19 @@ import lombok.NoArgsConstructor; @AllArgsConstructor @NoArgsConstructor public class StartMachineWorkingCommand extends BaseCommand { + private static final long serialVersionUID = 1L; /** - * 机器标识 + * 机器标识(MachineId) */ private Long id; + /** + * 云盒mac + */ + private Long mac; + /** * 当前总产量 */ diff --git a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StopMachineWorkingCommand.java b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StopMachineWorkingCommand.java index 59f1771..e0ab0d7 100644 --- a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StopMachineWorkingCommand.java +++ b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/command/StopMachineWorkingCommand.java @@ -14,13 +14,19 @@ import lombok.NoArgsConstructor; @AllArgsConstructor @NoArgsConstructor public class StopMachineWorkingCommand extends BaseCommand { + private static final long serialVersionUID = 1L; /** - * 机器标识 + * 机器标识(MachineId) */ private Long id; + /** + * 云盒mac + */ + private Long mac; + /** * 当前总产量 */ diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java index 85f2366..2838478 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java @@ -18,7 +18,13 @@ public interface ConfigConstant { String SINK_RABBITMQ_PASSWORD = "sink.rabbitmq.password"; - String SINK_RABBITMQ_ROUTING_KEY = "sink.rabbitmq.routingKey"; + String SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY = "sink.rabbitmq.powerOnMachineCommand.routingKey"; + + String SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY = "sink.rabbitmq.powerOffMachineCommand.routingKey"; + + String SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY = "sink.rabbitmq.stopMachineWorkingCommand.routingKey"; + + String SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY = "sink.rabbitmq.startMachineWorkingCommand.routingKey"; String SINK_RABBITMQ_EXCHANGE = "sink.rabbitmq.exchange"; diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 1d81666..24bcc64 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -71,7 +71,7 @@ public class IotMachineEventGeneratorJob { .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) - .setStartingOffsets(OffsetsInitializer.earliest()) + .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) .build(); @@ -189,7 +189,8 @@ public class IotMachineEventGeneratorJob { .setVirtualHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST)) .setUserName(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_USER_NAME)) .setPassword(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_PASSWORD)) - .setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT)).build(); + .setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT)) + .build(); // 发送相应的指令到rabbitmq的交换机 commandDataStream @@ -198,7 +199,22 @@ public class IotMachineEventGeneratorJob { @Override public String computeRoutingKey(BaseCommand command) { - return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_ROUTING_KEY); + + if(command instanceof PowerOnMachineCommand) { + // 机器通电 + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY); + } + if(command instanceof PowerOffMachineCommand) { + // 机器断电 + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY); + } + if(command instanceof StopMachineWorkingCommand) { + // 机器待机 + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY); + }else { + // 机器工作 + return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY); + } } @Override @@ -280,16 +296,20 @@ public class IotMachineEventGeneratorJob { if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { // 设备开机 - out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + out.collect(new PowerOnMachineCommand(newState.getMachineId(), + newState.getMachineIotMac(), event.getCurrJobCount())); } else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { // 设备关机 - out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + out.collect(new PowerOffMachineCommand(newState.getMachineId(), + newState.getMachineIotMac(), event.getCurrJobCount())); } else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { // 设备开始待机 - out.collect(new StopMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + out.collect(new StopMachineWorkingCommand(newState.getMachineId(), + newState.getMachineIotMac(), event.getCurrJobCount())); } else if (oldState.getStatus() == 2 && newState.getStatus() == 1) { // 设备开始工作 - out.collect(new StartMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); + out.collect(new StartMachineWorkingCommand(newState.getMachineId(), + newState.getMachineIotMac(), event.getCurrJobCount())); } }