From e158ccec4db096b0c395c95cd9e5c02daeec070b Mon Sep 17 00:00:00 2001 From: "lizhongkang@qniao.cn" Date: Fri, 1 Jul 2022 17:52:48 +0800 Subject: [PATCH] =?UTF-8?q?style=20:=20=E6=9B=B4=E6=94=B9=E7=B1=BB?= =?UTF-8?q?=E5=90=8D=E7=A7=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...va => RootCloudIotDataReceiptedEvent.java} | 2 +- .../iot/rc/RootCloudIotDataFormatterJob.java | 14 ++++---- ...taReceiptedEventDeserializationSchema.java | 33 +++++++++++++++++++ ...udReceiptedEventDeserializationSchema.java | 33 ------------------- 4 files changed, 42 insertions(+), 40 deletions(-) rename root-cloud-event/src/main/java/com/qniao/iot/rc/{RootCloudReceiptedEvent.java => RootCloudIotDataReceiptedEvent.java} (97%) create mode 100644 root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java delete mode 100644 root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java diff --git a/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudReceiptedEvent.java b/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudIotDataReceiptedEvent.java similarity index 97% rename from root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudReceiptedEvent.java rename to root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudIotDataReceiptedEvent.java index c0ca056..32b5672 100644 --- a/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudReceiptedEvent.java +++ b/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudIotDataReceiptedEvent.java @@ -11,7 +11,7 @@ import java.math.BigDecimal; * @date 2022/7/1 **/ @Data -public class RootCloudReceiptedEvent implements Serializable { +public class RootCloudIotDataReceiptedEvent implements Serializable { private static final long serialVersionUID = 1L; diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index b3c4f58..08a0221 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -18,7 +18,7 @@ package com.qniao.iot.rc; -import com.qniao.iot.rc.event.RootCloudReceiptedEventDeserializationSchema; +import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.serialization.SimpleStringSchema; @@ -41,25 +41,27 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin * *

If you change the name of the main class (with the public static void main(String[] args)) * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). + * + * @author Lzk */ public class RootCloudIotDataFormatterJob { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); - KafkaSource source = KafkaSource.builder() + KafkaSource source = KafkaSource.builder() .setBootstrapServers("kafka:9092") .setTopics("root_cloud_iot_report_data_event") .setGroupId("flink-kafka-demo") .setStartingOffsets(OffsetsInitializer.earliest()) - .setValueOnlyDeserializer(new RootCloudReceiptedEventDeserializationSchema()) + .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .build(); - DataStream ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); + DataStream ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); String outputPath = "oss://qn-flink-test/test"; - StreamingFileSink sink = StreamingFileSink.forRowFormat( + StreamingFileSink sink = StreamingFileSink.forRowFormat( new Path(outputPath), - new SimpleStringEncoder("UTF-8") + new SimpleStringEncoder("UTF-8") ).build(); ds.addSink(sink); diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java new file mode 100644 index 0000000..f41d6d7 --- /dev/null +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java @@ -0,0 +1,33 @@ +package com.qniao.iot.rc.event; + +import com.qniao.iot.rc.RootCloudIotDataReceiptedEvent; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; + +/** + * @author Lzk + */ +public class RootCloudIotDataReceiptedEventDeserializationSchema implements DeserializationSchema { + /** + * 注册JavaTimeModule,支持LocalDateTime字段的解析 + */ + final private ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public RootCloudIotDataReceiptedEvent deserialize(byte[] message) throws IOException { + return objectMapper.readValue(message, RootCloudIotDataReceiptedEvent.class); + } + + @Override + public boolean isEndOfStream(RootCloudIotDataReceiptedEvent nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(RootCloudIotDataReceiptedEvent.class); + } +} diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java deleted file mode 100644 index fce4568..0000000 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.qniao.iot.rc.event; - -import com.qniao.iot.rc.RootCloudReceiptedEvent; -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; - -/** - * @author Lzk - */ -public class RootCloudReceiptedEventDeserializationSchema implements DeserializationSchema { - /** - * 注册JavaTimeModule,支持LocalDateTime字段的解析 - */ - final private ObjectMapper objectMapper = new ObjectMapper(); - - @Override - public RootCloudReceiptedEvent deserialize(byte[] message) throws IOException { - return objectMapper.readValue(message, RootCloudReceiptedEvent.class); - } - - @Override - public boolean isEndOfStream(RootCloudReceiptedEvent nextElement) { - return false; - } - - @Override - public TypeInformation getProducedType() { - return TypeInformation.of(RootCloudReceiptedEvent.class); - } -}