Browse Source

增加时间过滤

master
lizhongkang@qniao.cn 3 years ago
parent
commit
32b88321d2
1 changed files with 17 additions and 2 deletions
  1. 19
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java

19
ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java

@ -43,6 +43,7 @@ import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@ -54,12 +55,11 @@ import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SaslConfigs;
import org.apache.flink.core.fs.Path;
import org.checkerframework.checker.units.qual.C;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.time.*;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
@ -102,6 +102,20 @@ public class RootCloudIotDataFormatterJob {
// 把树根的数据转成我们自己的格式 // 把树根的数据转成我们自己的格式
SingleOutputStreamOperator<AIWarningDataReceivedEvent> transformDs = env SingleOutputStreamOperator<AIWarningDataReceivedEvent> transformDs = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "AIRootCloudWarningDataReceivedEvent Source") .fromSource(source, WatermarkStrategy.noWatermarks(), "AIRootCloudWarningDataReceivedEvent Source")
.filter(item -> {
Object time = item.get("timestamp");
if (Objects.nonNull(time)) {
Instant instant = Instant.ofEpochMilli((long) time);
ZoneId zoneId = ZoneId.systemDefault();
LocalDateTime dateTime = LocalDateTime.ofInstant(instant, zoneId);
LocalDateTime compareTime1 = LocalDateTime.of(LocalDate.now(), LocalTime.of(8, 0, 0));
LocalDateTime compareTime2 = LocalDateTime.of(LocalDate.now(), LocalTime.of(12, 30, 0));
LocalDateTime compareTime3 = LocalDateTime.of(LocalDate.now(), LocalTime.of(14, 0, 0));
LocalDateTime compareTime4 = LocalDateTime.of(LocalDate.now(), LocalTime.of(19, 30, 0));
return (dateTime.isBefore(compareTime1) || dateTime.isAfter(compareTime2)) && (dateTime.isBefore(compareTime3) || dateTime.isAfter(compareTime4));
}
return false;
})
.map((MapFunction<JSONObject, AIWarningDataReceivedEvent>) RootCloudIotDataFormatterJob::transform) .map((MapFunction<JSONObject, AIWarningDataReceivedEvent>) RootCloudIotDataFormatterJob::transform)
.name("Transform AIWarningDataReceivedEvent"); .name("Transform AIWarningDataReceivedEvent");
@ -159,6 +173,7 @@ public class RootCloudIotDataFormatterJob {
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
AIWarningDataReceivedEvent aiWaringDataReceivedEvent = new AIWarningDataReceivedEvent(); AIWarningDataReceivedEvent aiWaringDataReceivedEvent = new AIWarningDataReceivedEvent();
Object object = event.get("payload"); Object object = event.get("payload");
if (Objects.nonNull(object)) { if (Objects.nonNull(object)) {

Loading…
Cancel
Save