|
|
|
@ -71,7 +71,7 @@ public class IotMachineEventGeneratorJob { |
|
|
|
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) |
|
|
|
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) |
|
|
|
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) |
|
|
|
.setStartingOffsets(OffsetsInitializer.earliest()) |
|
|
|
.setStartingOffsets(OffsetsInitializer.latest()) |
|
|
|
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) |
|
|
|
.build(); |
|
|
|
|
|
|
|
@ -189,7 +189,8 @@ public class IotMachineEventGeneratorJob { |
|
|
|
.setVirtualHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST)) |
|
|
|
.setUserName(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_USER_NAME)) |
|
|
|
.setPassword(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_PASSWORD)) |
|
|
|
.setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT)).build(); |
|
|
|
.setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT)) |
|
|
|
.build(); |
|
|
|
|
|
|
|
// 发送相应的指令到rabbitmq的交换机 |
|
|
|
commandDataStream |
|
|
|
@ -198,7 +199,22 @@ public class IotMachineEventGeneratorJob { |
|
|
|
|
|
|
|
@Override |
|
|
|
public String computeRoutingKey(BaseCommand command) { |
|
|
|
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_ROUTING_KEY); |
|
|
|
|
|
|
|
if(command instanceof PowerOnMachineCommand) { |
|
|
|
// 机器通电 |
|
|
|
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY); |
|
|
|
} |
|
|
|
if(command instanceof PowerOffMachineCommand) { |
|
|
|
// 机器断电 |
|
|
|
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY); |
|
|
|
} |
|
|
|
if(command instanceof StopMachineWorkingCommand) { |
|
|
|
// 机器待机 |
|
|
|
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY); |
|
|
|
}else { |
|
|
|
// 机器工作 |
|
|
|
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@ -280,16 +296,20 @@ public class IotMachineEventGeneratorJob { |
|
|
|
|
|
|
|
if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { |
|
|
|
// 设备开机 |
|
|
|
out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
out.collect(new PowerOnMachineCommand(newState.getMachineId(), |
|
|
|
newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { |
|
|
|
// 设备关机 |
|
|
|
out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
out.collect(new PowerOffMachineCommand(newState.getMachineId(), |
|
|
|
newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { |
|
|
|
// 设备开始待机 |
|
|
|
out.collect(new StopMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
out.collect(new StopMachineWorkingCommand(newState.getMachineId(), |
|
|
|
newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} else if (oldState.getStatus() == 2 && newState.getStatus() == 1) { |
|
|
|
// 设备开始工作 |
|
|
|
out.collect(new StartMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
out.collect(new StartMachineWorkingCommand(newState.getMachineId(), |
|
|
|
newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|