diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 4074bd8..27ed43d 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -32,6 +32,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; @@ -46,6 +47,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.ProducerConfig; @@ -54,11 +56,13 @@ import org.apache.kafka.common.config.SaslConfigs; import java.math.BigDecimal; import java.security.Provider; +import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.concurrent.TimeUnit; /** * Skeleton for a Flink DataStream Job. @@ -161,7 +165,14 @@ public class RootCloudIotDataFormatterJob { public SimpleVersionedSerializer getSerializer() { return SimpleVersionedStringSerializer.INSTANCE; } - }).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build(); + }).withRollingPolicy(DefaultRollingPolicy.builder() + // 每隔多长时间生成一个文件 + .withRolloverInterval(Duration.ofHours(12)) + // 默认60秒,未写入数据处于不活跃状态超时会滚动新文件 + .withInactivityInterval(Duration.ofHours(12)) + // 设置每个文件的最大大小 ,默认是128M + .withMaxPartSize(MemorySize.ofMebiBytes(128 * 1024 * 1024)) + .build()).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build(); transformDs.map(new RichMapFunction() { diff --git a/root-cloud-statistics/src/test/java/com/qniao/iot/rc/EsRestClientService.java b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/EsRestClientService.java index 4213a49..773fbe1 100644 --- a/root-cloud-statistics/src/test/java/com/qniao/iot/rc/EsRestClientService.java +++ b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/EsRestClientService.java @@ -25,12 +25,6 @@ import java.util.List; import java.util.Map; -/** - * 阿里云服务器搭建的ES服务 - * - * @author lizixian - * @date 2020/3/16 10:41 - */ public class EsRestClientService { private String host = "120.79.137.137:9200"; @@ -57,8 +51,6 @@ public class EsRestClientService { /** * 分页查询应设备应用安装列表-使用游标 * - * @author lizixian - * @date 2020/5/10 18:01 */ public void queryDeviceListPage(String scrollId, Map map, int count) { diff --git a/root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java new file mode 100644 index 0000000..526a1ee --- /dev/null +++ b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java @@ -0,0 +1,11 @@ +package com.qniao.iot.rc; + +import java.time.Duration; + +public class TestDemo { + + public static void main(String[] args) { + + System.out.println(Duration.ofMillis(5).toMillis()); + } +}