Browse Source

oss demo

hph_优化版本
周坤华 3 years ago
parent
commit
64a64144c1
1 changed file with 25 additions and 23 deletions
  1. 48
      src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

48
src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@ -18,7 +18,16 @@
package com.qniao.iot.rc;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
/**
* Skeleton for a Flink DataStream Job.
@ -35,31 +44,24 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Flink streaming job that reads raw IoT string messages from Kafka and
 * writes them, one record per row, to an Alibaba OSS bucket.
 *
 * <p>Pipeline: {@code KafkaSource (test_topic)} → pass-through
 * {@code DataStream<String>} → {@code StreamingFileSink} (row format,
 * UTF-8) rooted at {@code oss://qn-flink-test/test}.
 *
 * <p>Checkpointing is enabled because the row-format file sink only
 * finalizes (commits) in-progress part files on checkpoint completion;
 * without it, output would never become visible in exactly-once mode.
 */
public class RootCloudIotDataFormatterJob {

    public static void main(String[] args) throws Exception {
        // Main entry point for building the Flink application.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 60 s checkpoint interval, exactly-once — required by the file sink (see class doc).
        env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);

        // Kafka source: value-only string deserialization, reading the
        // topic from the earliest available offset on first start.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("test_topic")
                .setGroupId("flink-kafka-demo")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Event time is unused downstream, so no watermarks are generated.
        DataStream<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source");

        // Row-format sink: each record becomes one UTF-8 line in a part
        // file under the OSS prefix below (default bucketing/rolling).
        String outputPath = "oss://qn-flink-test/test";
        StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
                new Path(outputPath),
                new SimpleStringEncoder<String>("UTF-8")
        ).build();
        ds.addSink(sink);

        // Submit the job. execute() must be called exactly once: the
        // original text carried two consecutive calls (a leftover from a
        // diff), and the second call would fail because the job had
        // already been submitted on the first.
        env.execute("Kafka Job");
    }
}
Loading…
Cancel
Save