diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java index 229e285..49253a4 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java @@ -13,8 +13,10 @@ import com.qniao.iot.rc.constant.MachinePwrStatusEnum; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.source.KafkaSource; @@ -115,16 +117,16 @@ public class IotDevicePowerOnAndOffDataJob { .filter((FilterFunction) value -> { Long reportTime = value.getReportTime(); - if(reportTime != null + if (reportTime != null && value.getDataSource() != null && value.getMachinePwrStat() != null) { String reportTimeStr = StrUtil.toString(reportTime); - if(reportTimeStr.length() == 10) { + if (reportTimeStr.length() == 10) { // 机智云那边的设备可能是秒或毫秒 reportTime = reportTime * 1000; } long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); // 晚30分钟的数据就不要了 - return nowTime - reportTime <= (30*60*1000); + return nowTime - reportTime <= (30 * 60 * 1000); } return false; }); @@ -138,10 +140,19 @@ public class IotDevicePowerOnAndOffDataJob { @Override public void open(Configuration parameters) { + StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1)) + .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) + .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) + .build(); + + ValueStateDescriptor powerOnAndOffDataEventValue = new ValueStateDescriptor<>("powerOnAndOffDataEventValue", + TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class)); + // 设置状态值的过期时间,为了解决机器关机没有消息的情况 + powerOnAndOffDataEventValue.enableTimeToLive(ttlConfig); + // 必须在 open 生命周期初始化 powerOnAndOffDataEventValueState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("powerOnAndOffDataEventValue", - TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class))); + .getState(powerOnAndOffDataEventValue); } @Override @@ -152,12 +163,12 @@ public class IotDevicePowerOnAndOffDataJob { IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(event); Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime(); Long reportTime = event.getReportTime(); - if(reportTime > lastReportTime) { + if (reportTime > lastReportTime) { Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat(); Integer machinePwrStat = event.getMachinePwrStat(); Integer lastMachineWorkingStat = lastPowerOnAndOffDataEvent.getMachineWorkingStat(); Integer machineWorkingStat = event.getMachineWorkingStat(); - if(!((lastMachineWorkingStat == 0 && machineWorkingStat == 0) + if (!((lastMachineWorkingStat == 0 && machineWorkingStat == 0) || (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) { Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount(); Long accJobCount = event.getAccJobCount(); @@ -201,11 +212,10 @@ public class IotDevicePowerOnAndOffDataJob { if (lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) { if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) { // 只有开机时间不为空 - if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { - powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); - } else { + if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(machinePwrStat)) { powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime()); } + powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); out.collect(powerOnAndOffDataEvent); } else { @@ -269,10 +279,9 @@ public class IotDevicePowerOnAndOffDataJob { powerOnAndOffDataEvent.setMachinePwrStat(deviceMonitoringData.getMachinePwrStat()); powerOnAndOffDataEvent.setMachineWorkingStat(deviceMonitoringData.getMachineWorkingStat()); Long reportTime = deviceMonitoringData.getReportTime(); - if (machinePwrStat == 1) { - // 开机 - powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); - } else { + // 开机 + powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); + if (machinePwrStat == 0) { // 关机 powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); } @@ -287,10 +296,9 @@ public class IotDevicePowerOnAndOffDataJob { powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat()); powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat()); Long reportTime = event.getReportTime(); - if (machinePwrStat == 1) { - // 开机 - powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); - } else { + // 开机 + powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); + if (machinePwrStat == 0) { // 关机 powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); } diff --git a/iot-device-power-on-and-off-data-job/src/main/resources/META-INF/app.properties b/iot-device-power-on-and-off-data-job/src/main/resources/META-INF/app.properties index 6a3c421..457e902 100644 --- a/iot-device-power-on-and-off-data-job/src/main/resources/META-INF/app.properties +++ b/iot-device-power-on-and-off-data-job/src/main/resources/META-INF/app.properties @@ -2,4 +2,4 @@ app.id=iot-device-power-on-and-off-data # test 8.135.8.221 # prod 47.112.164.224 -apollo.meta=http://8.135.8.221:5000 \ No newline at end of file +apollo.meta=http://47.112.164.224:5000 \ No newline at end of file