diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 24bcc64..fbf901b 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -44,6 +44,8 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; import java.io.IOException; import java.sql.SQLException; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.*; import java.util.stream.Collectors; @@ -296,20 +298,28 @@ public class IotMachineEventGeneratorJob { if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { // 设备开机 - out.collect(new PowerOnMachineCommand(newState.getMachineId(), - newState.getMachineIotMac(), event.getCurrJobCount())); + PowerOnMachineCommand powerOnMachineCommand = new PowerOnMachineCommand(newState.getMachineId(), + newState.getMachineIotMac(), event.getCurrJobCount()); + powerOnMachineCommand.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + out.collect(powerOnMachineCommand); } else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { // 设备关机 - out.collect(new PowerOffMachineCommand(newState.getMachineId(), - newState.getMachineIotMac(), event.getCurrJobCount())); + PowerOffMachineCommand powerOffMachineCommand = new PowerOffMachineCommand(newState.getMachineId(), + newState.getMachineIotMac(), event.getCurrJobCount()); + powerOffMachineCommand.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + out.collect(powerOffMachineCommand); } else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { // 设备开始待机 - out.collect(new StopMachineWorkingCommand(newState.getMachineId(), - newState.getMachineIotMac(), event.getCurrJobCount())); + StopMachineWorkingCommand stopMachineWorkingCommand = new StopMachineWorkingCommand(newState.getMachineId(), + newState.getMachineIotMac(), event.getCurrJobCount()); + stopMachineWorkingCommand.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + out.collect(stopMachineWorkingCommand); } else if (oldState.getStatus() == 2 && newState.getStatus() == 1) { // 设备开始工作 - out.collect(new StartMachineWorkingCommand(newState.getMachineId(), - newState.getMachineIotMac(), event.getCurrJobCount())); + StartMachineWorkingCommand startMachineWorkingCommand = new StartMachineWorkingCommand(newState.getMachineId(), + newState.getMachineIotMac(), event.getCurrJobCount()); + startMachineWorkingCommand.setTimestamp(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + out.collect(startMachineWorkingCommand); } }