From 64a64144c110d037a06ce95741313696159563f4 Mon Sep 17 00:00:00 2001 From: zhoukunhua Date: Tue, 28 Jun 2022 16:05:18 +0800 Subject: [PATCH] oss demo --- .../iot/rc/RootCloudIotDataFormatterJob.java | 48 ++++++++++--------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 04ac20c..68b53ee 100644 --- a/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -18,7 +18,16 @@ package com.qniao.iot.rc; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; /** * Skeleton for a Flink DataStream Job. @@ -35,31 +44,24 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class RootCloudIotDataFormatterJob { public static void main(String[] args) throws Exception { - // Sets up the execution environment, which is the main entry point - // to building Flink applications. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); + KafkaSource source = KafkaSource.builder() + .setBootstrapServers("kafka:9092") + .setTopics("test_topic") + .setGroupId("flink-kafka-demo") + .setStartingOffsets(OffsetsInitializer.earliest()) + .setValueOnlyDeserializer(new SimpleStringSchema()) + .build(); - /* - * Here, you can start creating your execution plan for Flink. - * - * Start with getting some data from the environment, like - * env.fromSequence(1, 10); - * - * then, transform the resulting DataStream using operations - * like - * .filter() - * .flatMap() - * .window() - * .process() - * - * and many more. - * Have a look at the programming guide: - * - * https://nightlies.apache.org/flink/flink-docs-stable/ - * - */ + DataStream ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); + String outputPath = "oss://qn-flink-test/test"; + StreamingFileSink sink = StreamingFileSink.forRowFormat( + new Path(outputPath), + new SimpleStringEncoder("UTF-8") + ).build(); + ds.addSink(sink); - // Execute program, beginning computation. - env.execute("Flink Java API Skeleton"); + env.execute("Kafka Job"); } }