From 36a43ab51482070db2d675bbb5825f3367274502 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Sat, 3 Sep 2022 02:56:38 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../event/MachineIotDataReceivedEvent.java | 2 +- .../job/IotMachineEventGeneratorJob.java | 23 +++++++++++++++---- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java index 65a20a6..8cef901 100644 --- a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java +++ b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java @@ -18,7 +18,7 @@ public class MachineIotDataReceivedEvent implements Serializable { private Long id; /** - * 数据来源 + * 数据来源(0机智云 1树根) */ private Integer dataSource; diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index d241048..3854bf2 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -15,6 +15,7 @@ import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializati import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializationSchema; import com.qniao.iot.machine.event.generator.config.ApolloConfig; import com.qniao.iot.machine.event.generator.constant.ConfigConstant; +import com.qniao.iot.rc.constant.DataSource; import com.rabbitmq.client.AMQP; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -66,6 +67,8 @@ import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.sql.SQLException; import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.ArrayList; @@ -121,16 +124,28 @@ public class IotMachineEventGeneratorJob { DataStreamSource dataStreamSource = env .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); - // 过滤掉工作状态但是产能为0的信息 SingleOutputStreamOperator streamOperator = dataStreamSource .filter(new RichFilterFunction() { @Override public boolean filter(MachineIotDataReceivedEvent value) { - Integer machineWorkingStat = value.getMachineWorkingStat(); - Long currJobCount = value.getCurrJobCount(); - return !(machineWorkingStat == 2 && currJobCount == 0); + Integer dataSource = value.getDataSource(); + boolean bool = true; + if (DataSource.TACT_CLOUD.equals(dataSource)) { + // 机智云(树根的校验不了) + Long currCount = value.getCurrCount(); + Integer machineWorkingStat = value.getMachineWorkingStat(); + bool = !(machineWorkingStat == 2 && currCount == 0); + } + if (bool && value.getMachinePwrStat() != null + && value.getMachineIotMac() != null + && value.getMachineWorkingStat() != null && value.getReportTime() != null) { + long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + // 晚30分钟的数据就不要了 + return nowTime - value.getReportTime() <= (30 * 60 * 1000); + } + return false; } }).name("machine iot data received event filter");