diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 6608f77..ecaf516 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -218,8 +218,10 @@ public class IotMachineEventGeneratorJob { } if((event.getCurrJobCount() != 0 && event.getCurrJobDuration() != 0) || (event.getCurrJobCount() == 0 && event.getCurrJobDuration() == 0)) { - ctx.output(machineIotDataReceivedEventOutput, event); - collDeviceStatusChange(out, deviceState, lastDataReceivedEvent, event); + MachineIotDataReceivedEvent receivedEvent = collDeviceStatusChange(out, deviceState, lastDataReceivedEvent, event); + if(receivedEvent != null) { + ctx.output(machineIotDataReceivedEventOutput, event); + } lastDataReceivedEventState.update(event); } } @@ -503,7 +505,7 @@ public class IotMachineEventGeneratorJob { } } - private static void collDeviceStatusChange(Collector out, + private static MachineIotDataReceivedEvent collDeviceStatusChange(Collector out, DeviceState deviceState, MachineIotDataReceivedEvent lastDataReceivedEvent, MachineIotDataReceivedEvent event) { @@ -541,6 +543,7 @@ public class IotMachineEventGeneratorJob { command.setTimestamp(reportTime); out.collect(command); out.collect(machineOutputCommand); + return event; } else if ((lastWorkingStat == 1 || lastWorkingStat == 2) && workingStat == 0) { // 设备关机 PowerOffMachineCommand command = new PowerOffMachineCommand(); @@ -553,6 +556,7 @@ public class IotMachineEventGeneratorJob { command.setTimestamp(reportTime); out.collect(command); out.collect(machineOutputCommand); + return event; } else if (lastWorkingStat == 1 && workingStat == 2) { // 设备开始待机 StopMachineWorkingCommand command = new StopMachineWorkingCommand(); @@ -565,6 +569,7 @@ public class IotMachineEventGeneratorJob { command.setTimestamp(reportTime); out.collect(command); out.collect(machineOutputCommand); + return event; } else if ((lastWorkingStat == 2 || lastWorkingStat == 1) && workingStat == 1) { // 设备开始工作 StartMachineWorkingCommand command = new StartMachineWorkingCommand(); @@ -577,6 +582,8 @@ public class IotMachineEventGeneratorJob { command.setTimestamp(reportTime); out.collect(command); out.collect(machineOutputCommand); + return event; } + return null; } }