From 32b88321d208c26b387e69306fe2b915ae241d64 Mon Sep 17 00:00:00 2001 From: "lizhongkang@qniao.cn" Date: Fri, 30 Dec 2022 15:23:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=97=B6=E9=97=B4=E8=BF=87?= =?UTF-8?q?=E6=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RootCloudIotDataFormatterJob.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java index d8dbd1d..9ac0690 100644 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java @@ -43,6 +43,7 @@ import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -54,12 +55,11 @@ import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.config.SaslConfigs; -import org.apache.flink.core.fs.Path; -import org.checkerframework.checker.units.qual.C; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; +import java.time.*; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -102,6 +102,20 @@ public class RootCloudIotDataFormatterJob { // 把树根的数据转成我们自己的格式 SingleOutputStreamOperator transformDs = env .fromSource(source, WatermarkStrategy.noWatermarks(), "AIRootCloudWarningDataReceivedEvent Source") + .filter(item -> { + Object time = item.get("timestamp"); + if (Objects.nonNull(time)) { + Instant instant = Instant.ofEpochMilli((long) time); + ZoneId zoneId = ZoneId.systemDefault(); + LocalDateTime dateTime = LocalDateTime.ofInstant(instant, zoneId); + LocalDateTime compareTime1 = LocalDateTime.of(LocalDate.now(), LocalTime.of(8, 0, 0)); + LocalDateTime compareTime2 = LocalDateTime.of(LocalDate.now(), LocalTime.of(12, 30, 0)); + LocalDateTime compareTime3 = LocalDateTime.of(LocalDate.now(), LocalTime.of(14, 0, 0)); + LocalDateTime compareTime4 = LocalDateTime.of(LocalDate.now(), LocalTime.of(19, 30, 0)); + return (dateTime.isBefore(compareTime1) || dateTime.isAfter(compareTime2)) && (dateTime.isBefore(compareTime3) || dateTime.isAfter(compareTime4)); + } + return false; + }) .map((MapFunction) RootCloudIotDataFormatterJob::transform) .name("Transform AIWarningDataReceivedEvent"); @@ -159,6 +173,7 @@ public class RootCloudIotDataFormatterJob { } catch (InterruptedException e) { e.printStackTrace(); } + AIWarningDataReceivedEvent aiWaringDataReceivedEvent = new AIWarningDataReceivedEvent(); Object object = event.get("payload"); if (Objects.nonNull(object)) {