|
|
|
@ -13,8 +13,10 @@ import com.qniao.iot.rc.constant.MachinePwrStatusEnum; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|
|
|
import org.apache.flink.api.common.functions.FilterFunction; |
|
|
|
import org.apache.flink.api.common.state.StateTtlConfig; |
|
|
|
import org.apache.flink.api.common.state.ValueState; |
|
|
|
import org.apache.flink.api.common.state.ValueStateDescriptor; |
|
|
|
import org.apache.flink.api.common.time.Time; |
|
|
|
import org.apache.flink.api.common.typeinfo.TypeInformation; |
|
|
|
import org.apache.flink.configuration.Configuration; |
|
|
|
import org.apache.flink.connector.kafka.source.KafkaSource; |
|
|
|
@ -115,16 +117,16 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
|
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> { |
|
|
|
|
|
|
|
Long reportTime = value.getReportTime(); |
|
|
|
if(reportTime != null |
|
|
|
if (reportTime != null |
|
|
|
&& value.getDataSource() != null && value.getMachinePwrStat() != null) { |
|
|
|
String reportTimeStr = StrUtil.toString(reportTime); |
|
|
|
if(reportTimeStr.length() == 10) { |
|
|
|
if (reportTimeStr.length() == 10) { |
|
|
|
// 机智云那边的设备可能是秒或毫秒 |
|
|
|
reportTime = reportTime * 1000; |
|
|
|
} |
|
|
|
long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); |
|
|
|
// 晚30分钟的数据就不要了 |
|
|
|
return nowTime - reportTime <= (30*60*1000); |
|
|
|
return nowTime - reportTime <= (30 * 60 * 1000); |
|
|
|
} |
|
|
|
return false; |
|
|
|
}); |
|
|
|
@ -138,10 +140,19 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
|
@Override |
|
|
|
public void open(Configuration parameters) { |
|
|
|
|
|
|
|
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1)) |
|
|
|
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) |
|
|
|
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) |
|
|
|
.build(); |
|
|
|
|
|
|
|
ValueStateDescriptor<IotDevicePowerOnAndOffDataEvent> powerOnAndOffDataEventValue = new ValueStateDescriptor<>("powerOnAndOffDataEventValue", |
|
|
|
TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class)); |
|
|
|
// 设置状态值的过期时间,为了解决机器关机没有消息的情况 |
|
|
|
powerOnAndOffDataEventValue.enableTimeToLive(ttlConfig); |
|
|
|
|
|
|
|
// 必须在 open 生命周期初始化 |
|
|
|
powerOnAndOffDataEventValueState = getRuntimeContext() |
|
|
|
.getState(new ValueStateDescriptor<>("powerOnAndOffDataEventValue", |
|
|
|
TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class))); |
|
|
|
.getState(powerOnAndOffDataEventValue); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@ -152,12 +163,12 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
|
IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(event); |
|
|
|
Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime(); |
|
|
|
Long reportTime = event.getReportTime(); |
|
|
|
if(reportTime > lastReportTime) { |
|
|
|
if (reportTime > lastReportTime) { |
|
|
|
Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat(); |
|
|
|
Integer machinePwrStat = event.getMachinePwrStat(); |
|
|
|
Integer lastMachineWorkingStat = lastPowerOnAndOffDataEvent.getMachineWorkingStat(); |
|
|
|
Integer machineWorkingStat = event.getMachineWorkingStat(); |
|
|
|
if(!((lastMachineWorkingStat == 0 && machineWorkingStat == 0) |
|
|
|
if (!((lastMachineWorkingStat == 0 && machineWorkingStat == 0) |
|
|
|
|| (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) { |
|
|
|
Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount(); |
|
|
|
Long accJobCount = event.getAccJobCount(); |
|
|
|
@ -201,11 +212,10 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
|
if (lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) { |
|
|
|
if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) { |
|
|
|
// 只有开机时间不为空 |
|
|
|
if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { |
|
|
|
powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); |
|
|
|
} else { |
|
|
|
if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(machinePwrStat)) { |
|
|
|
powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime()); |
|
|
|
} |
|
|
|
powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); |
|
|
|
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); |
|
|
|
out.collect(powerOnAndOffDataEvent); |
|
|
|
} else { |
|
|
|
@ -269,10 +279,9 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
|
powerOnAndOffDataEvent.setMachinePwrStat(deviceMonitoringData.getMachinePwrStat()); |
|
|
|
powerOnAndOffDataEvent.setMachineWorkingStat(deviceMonitoringData.getMachineWorkingStat()); |
|
|
|
Long reportTime = deviceMonitoringData.getReportTime(); |
|
|
|
if (machinePwrStat == 1) { |
|
|
|
// 开机 |
|
|
|
powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); |
|
|
|
} else { |
|
|
|
// 开机 |
|
|
|
powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); |
|
|
|
if (machinePwrStat == 0) { |
|
|
|
// 关机 |
|
|
|
powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); |
|
|
|
} |
|
|
|
@ -287,10 +296,9 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
|
powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat()); |
|
|
|
powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat()); |
|
|
|
Long reportTime = event.getReportTime(); |
|
|
|
if (machinePwrStat == 1) { |
|
|
|
// 开机 |
|
|
|
powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); |
|
|
|
} else { |
|
|
|
// 开机 |
|
|
|
powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); |
|
|
|
if (machinePwrStat == 0) { |
|
|
|
// 关机 |
|
|
|
powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); |
|
|
|
} |
|
|
|
|