|
|
@ -18,7 +18,7 @@ |
|
|
|
|
|
|
|
|
package com.qniao.iot.rc; |
|
|
package com.qniao.iot.rc; |
|
|
|
|
|
|
|
|
import com.qniao.iot.rc.event.RootCloudReceiptedEventDeserializationSchema; |
|
|
|
|
|
|
|
|
import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema; |
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|
|
import org.apache.flink.api.common.serialization.SimpleStringEncoder; |
|
|
import org.apache.flink.api.common.serialization.SimpleStringEncoder; |
|
|
import org.apache.flink.api.common.serialization.SimpleStringSchema; |
|
|
import org.apache.flink.api.common.serialization.SimpleStringSchema; |
|
|
@ -41,25 +41,27 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin |
|
|
* |
|
|
* |
|
|
* <p>If you change the name of the main class (with the public static void main(String[] args)) |
|
|
* <p>If you change the name of the main class (with the public static void main(String[] args)) |
|
|
* method, change the respective entry in the POM.xml file (simply search for 'mainClass'). |
|
|
* method, change the respective entry in the POM.xml file (simply search for 'mainClass'). |
|
|
|
|
|
* |
|
|
|
|
|
* @author Lzk |
|
|
*/ |
|
|
*/ |
|
|
public class RootCloudIotDataFormatterJob { |
|
|
public class RootCloudIotDataFormatterJob { |
|
|
|
|
|
|
|
|
public static void main(String[] args) throws Exception { |
|
|
public static void main(String[] args) throws Exception { |
|
|
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
|
|
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
|
|
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); |
|
|
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); |
|
|
KafkaSource<RootCloudReceiptedEvent> source = KafkaSource.<RootCloudReceiptedEvent>builder() |
|
|
|
|
|
|
|
|
KafkaSource<RootCloudIotDataReceiptedEvent> source = KafkaSource.<RootCloudIotDataReceiptedEvent>builder() |
|
|
.setBootstrapServers("kafka:9092") |
|
|
.setBootstrapServers("kafka:9092") |
|
|
.setTopics("root_cloud_iot_report_data_event") |
|
|
.setTopics("root_cloud_iot_report_data_event") |
|
|
.setGroupId("flink-kafka-demo") |
|
|
.setGroupId("flink-kafka-demo") |
|
|
.setStartingOffsets(OffsetsInitializer.earliest()) |
|
|
.setStartingOffsets(OffsetsInitializer.earliest()) |
|
|
.setValueOnlyDeserializer(new RootCloudReceiptedEventDeserializationSchema()) |
|
|
|
|
|
|
|
|
.setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) |
|
|
.build(); |
|
|
.build(); |
|
|
|
|
|
|
|
|
DataStream<RootCloudReceiptedEvent> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); |
|
|
|
|
|
|
|
|
DataStream<RootCloudIotDataReceiptedEvent> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); |
|
|
String outputPath = "oss://qn-flink-test/test"; |
|
|
String outputPath = "oss://qn-flink-test/test"; |
|
|
StreamingFileSink<RootCloudReceiptedEvent> sink = StreamingFileSink.forRowFormat( |
|
|
|
|
|
|
|
|
StreamingFileSink<RootCloudIotDataReceiptedEvent> sink = StreamingFileSink.forRowFormat( |
|
|
new Path(outputPath), |
|
|
new Path(outputPath), |
|
|
new SimpleStringEncoder<RootCloudReceiptedEvent>("UTF-8") |
|
|
|
|
|
|
|
|
new SimpleStringEncoder<RootCloudIotDataReceiptedEvent>("UTF-8") |
|
|
).build(); |
|
|
).build(); |
|
|
ds.addSink(sink); |
|
|
ds.addSink(sink); |
|
|
|
|
|
|
|
|
|