From c6d84dc3a4f5d297a50e47329bd5d9393c57a916 Mon Sep 17 00:00:00 2001 From: "lizhongkang@qniao.cn" Date: Sat, 2 Jul 2022 14:29:19 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9B=E6=9B=B4=E6=94=B9=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E5=88=B0OSS=E7=9A=84=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/rc/RootCloudIotDataFormatterJob.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index d20b001..977f978 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -31,7 +31,6 @@ import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; @@ -63,21 +62,21 @@ public class RootCloudIotDataFormatterJob { .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .build(); + // 把树根的数据转成我们自己的格式 + SingleOutputStreamOperator transformDs = env + .fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source") + .map((MapFunction) MachineIotDataReceivedEvent::transform) + .name("Transform MachineIotDataReceivedEvent"); + // 发送到OSS存储 - DataStream ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); String outputPath = "oss://qn-flink-test/test"; - StreamingFileSink sink = StreamingFileSink.forRowFormat( + StreamingFileSink sink = StreamingFileSink.forRowFormat( new Path(outputPath), - new SimpleStringEncoder("UTF-8") + new SimpleStringEncoder("UTF-8") ).build(); - ds.addSink(sink); - - // 把树根的数据转成我们自己的格式 - SingleOutputStreamOperator transformDs = ds - .map((MapFunction) MachineIotDataReceivedEvent::transform) - .name("Transform MachineIotDataReceivedEvent"); + transformDs.addSink(sink); - // 转换后的格式发送到kafka + // 再发送到kafka队列中 transformDs.sinkTo( KafkaSink.builder() .setBootstrapServers("kafka:9092")