diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index d20b001..977f978 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -31,7 +31,6 @@ import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; @@ -63,21 +62,21 @@ public class RootCloudIotDataFormatterJob { .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .build(); + // 把树根的数据转成我们自己的格式 + SingleOutputStreamOperator transformDs = env + .fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source") + .map((MapFunction) MachineIotDataReceivedEvent::transform) + .name("Transform MachineIotDataReceivedEvent"); + // 发送到OSS存储 - DataStream ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); String outputPath = "oss://qn-flink-test/test"; - StreamingFileSink sink = StreamingFileSink.forRowFormat( + StreamingFileSink sink = StreamingFileSink.forRowFormat( new Path(outputPath), - new SimpleStringEncoder("UTF-8") + new SimpleStringEncoder("UTF-8") ).build(); - ds.addSink(sink); - - // 把树根的数据转成我们自己的格式 - SingleOutputStreamOperator transformDs = ds - .map((MapFunction) MachineIotDataReceivedEvent::transform) - .name("Transform MachineIotDataReceivedEvent"); + transformDs.addSink(sink); - // 转换后的格式发送到kafka + // 再发送到kafka队列中 transformDs.sinkTo( KafkaSink.builder() .setBootstrapServers("kafka:9092")