|
|
|
@ -31,7 +31,6 @@ import org.apache.flink.connector.kafka.source.KafkaSource; |
|
|
|
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; |
|
|
|
import org.apache.flink.core.fs.Path; |
|
|
|
import org.apache.flink.streaming.api.CheckpointingMode; |
|
|
|
import org.apache.flink.streaming.api.datastream.DataStream; |
|
|
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; |
|
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|
|
|
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; |
|
|
|
@ -63,21 +62,21 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
.setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) |
|
|
|
.build(); |
|
|
|
|
|
|
|
// 把树根的数据转成我们自己的格式 |
|
|
|
SingleOutputStreamOperator<MachineIotDataReceivedEvent> transformDs = env |
|
|
|
.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source") |
|
|
|
.map((MapFunction<RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>) MachineIotDataReceivedEvent::transform) |
|
|
|
.name("Transform MachineIotDataReceivedEvent"); |
|
|
|
|
|
|
|
// 发送到OSS存储 |
|
|
|
DataStream<RootCloudIotDataReceiptedEvent> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); |
|
|
|
String outputPath = "oss://qn-flink-test/test"; |
|
|
|
StreamingFileSink<RootCloudIotDataReceiptedEvent> sink = StreamingFileSink.forRowFormat( |
|
|
|
StreamingFileSink<MachineIotDataReceivedEvent> sink = StreamingFileSink.forRowFormat( |
|
|
|
new Path(outputPath), |
|
|
|
new SimpleStringEncoder<RootCloudIotDataReceiptedEvent>("UTF-8") |
|
|
|
new SimpleStringEncoder<MachineIotDataReceivedEvent>("UTF-8") |
|
|
|
).build(); |
|
|
|
ds.addSink(sink); |
|
|
|
|
|
|
|
// 把树根的数据转成我们自己的格式 |
|
|
|
SingleOutputStreamOperator<MachineIotDataReceivedEvent> transformDs = ds |
|
|
|
.map((MapFunction<RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>) MachineIotDataReceivedEvent::transform) |
|
|
|
.name("Transform MachineIotDataReceivedEvent"); |
|
|
|
transformDs.addSink(sink); |
|
|
|
|
|
|
|
// 转换后的格式发送到kafka |
|
|
|
// 再发送到kafka队列中 |
|
|
|
transformDs.sinkTo( |
|
|
|
KafkaSink.<MachineIotDataReceivedEvent>builder() |
|
|
|
.setBootstrapServers("kafka:9092") |
|
|
|
|