|
|
|
@ -18,48 +18,35 @@ |
|
|
|
|
|
|
|
package com.qniao.iot.rc; |
|
|
|
|
|
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|
|
|
import org.apache.flink.connector.kafka.source.KafkaSource; |
|
|
|
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; |
|
|
|
import org.apache.flink.streaming.api.datastream.DataStream; |
|
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|
|
|
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; |
|
|
|
|
|
|
|
/** |
|
|
|
* Skeleton for a Flink DataStream Job. |
|
|
|
* |
|
|
|
* <p>For a tutorial how to write a Flink application, check the |
|
|
|
* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>. |
|
|
|
* |
|
|
|
* <p>To package your application into a JAR file for execution, run |
|
|
|
* 'mvn clean package' on the command line. |
|
|
|
* |
|
|
|
* <p>If you change the name of the main class (with the public static void main(String[] args)) |
|
|
|
* method, change the respective entry in the POM.xml file (simply search for 'mainClass'). |
|
|
|
 * RootCloud IoT data formatting job (根云数据格式化作业).
|
|
|
*/ |
|
|
|
public class RootCloudIotDataFormatterJob { |
|
|
|
//private static final Logger LOGGER = LoggerFactory.getLogger(RootCloudIotDataFormatterJob.class); |
|
|
|
|
|
|
|
public static void main(String[] args) throws Exception { |
|
|
|
// Sets up the execution environment, which is the main entry point |
|
|
|
// to building Flink applications. |
|
|
|
// 创建执行环境 |
|
|
|
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
|
|
|
|
|
|
|
/* |
|
|
|
* Here, you can start creating your execution plan for Flink. |
|
|
|
* |
|
|
|
* Start with getting some data from the environment, like |
|
|
|
* env.fromSequence(1, 10); |
|
|
|
* |
|
|
|
* then, transform the resulting DataStream<Long> using operations |
|
|
|
* like |
|
|
|
* .filter() |
|
|
|
* .flatMap() |
|
|
|
* .window() |
|
|
|
* .process() |
|
|
|
* |
|
|
|
* and many more. |
|
|
|
* Have a look at the programming guide: |
|
|
|
* |
|
|
|
* https://nightlies.apache.org/flink/flink-docs-stable/ |
|
|
|
* |
|
|
|
*/ |
|
|
|
KafkaSource<KafkaMessage> source = KafkaSource.<KafkaMessage>builder() |
|
|
|
.setBootstrapServers("kafka:9092") |
|
|
|
.setTopics("test_topic") |
|
|
|
.setGroupId("flink-kafka-demo") |
|
|
|
.setStartingOffsets(OffsetsInitializer.latest()) |
|
|
|
.setValueOnlyDeserializer(new KafkaMessageSchema()) |
|
|
|
.build(); |
|
|
|
|
|
|
|
DataStream<KafkaMessage> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); |
|
|
|
|
|
|
|
ds.addSink(new PrintSinkFunction<>()); |
|
|
|
|
|
|
|
// Execute program, beginning computation. |
|
|
|
env.execute("Flink Java API Skeleton"); |
|
|
|
env.execute("Kafka Job"); |
|
|
|
} |
|
|
|
} |