Browse Source

更新

hph_优化版本
1049970895@qniao.cn 3 years ago
parent
commit
ee6b20d3f8
3 changed files with 23 additions and 9 deletions
  1. 13
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
  2. 8
      root-cloud-statistics/src/test/java/com/qniao/iot/rc/EsRestClientService.java
  3. 11
      root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java

13
root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@ -32,6 +32,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.sink.KafkaSink;
@ -46,6 +47,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
@ -54,11 +56,13 @@ import org.apache.kafka.common.config.SaslConfigs;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.security.Provider; import java.security.Provider;
import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit;
/** /**
* Skeleton for a Flink DataStream Job. * Skeleton for a Flink DataStream Job.
@ -161,7 +165,14 @@ public class RootCloudIotDataFormatterJob {
public SimpleVersionedSerializer<String> getSerializer() { public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE; return SimpleVersionedStringSerializer.INSTANCE;
} }
}).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build();
}).withRollingPolicy(DefaultRollingPolicy.builder()
// 每隔多长时间生成一个文件
.withRolloverInterval(Duration.ofHours(12))
// 默认60秒,未写入数据处于不活跃状态超时会滚动新文件
.withInactivityInterval(Duration.ofHours(12))
// 设置每个文件的最大大小 ,默认是128M
.withMaxPartSize(MemorySize.ofMebiBytes(128 * 1024 * 1024))
.build()).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build();
transformDs.map(new RichMapFunction<MachineIotDataReceivedEvent, String>() { transformDs.map(new RichMapFunction<MachineIotDataReceivedEvent, String>() {

8
root-cloud-statistics/src/test/java/com/qniao/iot/rc/EsRestClientService.java

@ -25,12 +25,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
/**
* 阿里云服务器搭建的ES服务
*
* @author lizixian
* @date 2020/3/16 10:41
*/
public class EsRestClientService { public class EsRestClientService {
private String host = "120.79.137.137:9200"; private String host = "120.79.137.137:9200";
@ -57,8 +51,6 @@ public class EsRestClientService {
/** /**
* 分页查询应设备应用安装列表-使用游标 * 分页查询应设备应用安装列表-使用游标
* *
* @author lizixian
* @date 2020/5/10 18:01
*/ */
public void queryDeviceListPage(String scrollId, Map<String, Object> map, int count) { public void queryDeviceListPage(String scrollId, Map<String, Object> map, int count) {

11
root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java

@ -0,0 +1,11 @@
package com.qniao.iot.rc;
import java.time.Duration;
public class TestDemo {
public static void main(String[] args) {
System.out.println(Duration.ofMillis(5).toMillis());
}
}
Loading…
Cancel
Save