From a050926ab2bc40f2f535b15f06cd2b994848ed2e Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Mon, 1 Aug 2022 13:48:06 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9oss=20sink?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/rc/RootCloudIotDataFormatterJob.java | 60 +++++++++++++++++-- 1 file changed, 55 insertions(+), 5 deletions(-) diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 3f64b4f..4074bd8 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -18,6 +18,8 @@ package com.qniao.iot.rc; +import cn.hutool.core.util.CharsetUtil; +import cn.hutool.json.JSONUtil; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import com.qniao.iot.rc.config.ApolloConfig; import com.qniao.iot.rc.constant.ConfigConstant; @@ -28,6 +30,7 @@ import com.qniao.iot.rc.until.SnowFlake; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; @@ -35,10 +38,14 @@ import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.ProducerConfig; @@ -47,6 +54,10 @@ import org.apache.kafka.common.config.SaslConfigs; import java.math.BigDecimal; import java.security.Provider; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.util.*; /** @@ -98,7 +109,7 @@ public class RootCloudIotDataFormatterJob { kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); kafkaProducerConfig.setProperty("sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"); + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"); // 写入kafka transformDs.sinkTo( KafkaSink.builder() @@ -115,11 +126,50 @@ public class RootCloudIotDataFormatterJob { // 发送到OSS存储 String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH); - StreamingFileSink sink = StreamingFileSink.forRowFormat( + StreamingFileSink sink = StreamingFileSink.forRowFormat( new Path(outputPath), - new SimpleStringEncoder("UTF-8") - ).build(); - transformDs.addSink(sink); + new SimpleStringEncoder(CharsetUtil.UTF_8) + ).withBucketAssigner(new BucketAssigner() { + @Override + public String getBucketId(String element, Context context) { + + MachineIotDataReceivedEvent receivedEvent = JSONUtil.toBean(element, MachineIotDataReceivedEvent.class); + // 指定以日期的格式生成桶目录 + Long receivedTime = receivedEvent.getReceivedTime(); + // 获取设备状态目录名称 + String deviceStatusStr = getDeviceStatusStr(receivedEvent.getMachineWorkingStat()); + return deviceStatusStr + "/" + LocalDateTime.ofInstant(Instant.ofEpochMilli(receivedTime), ZoneOffset.of("+8")) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + "/" + receivedEvent.getMachineIotMac(); + } + + private String getDeviceStatusStr(Integer machineWorkingStat) { + + if(machineWorkingStat == null) { + return "deviceOff"; + }else { + if (machineWorkingStat == 1) { + return "deviceWorking"; + } else if (machineWorkingStat == 2) { + return "deviceWaiting"; + } else { + return "deviceOff"; + } + } + } + + @Override + public SimpleVersionedSerializer getSerializer() { + return SimpleVersionedStringSerializer.INSTANCE; + } + }).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build(); + + + transformDs.map(new RichMapFunction() { + @Override + public String map(MachineIotDataReceivedEvent value) { + return JSONUtil.toJsonStr(value); + } + }).addSink(sink); env.execute("root cloud iot data formatter job"); }