diff --git a/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudReceiptedEvent.java b/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudIotDataReceiptedEvent.java similarity index 97% rename from root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudReceiptedEvent.java rename to root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudIotDataReceiptedEvent.java index c0ca056..32b5672 100644 --- a/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudReceiptedEvent.java +++ b/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudIotDataReceiptedEvent.java @@ -11,7 +11,7 @@ import java.math.BigDecimal; * @date 2022/7/1 **/ @Data -public class RootCloudReceiptedEvent implements Serializable { +public class RootCloudIotDataReceiptedEvent implements Serializable { private static final long serialVersionUID = 1L; diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index b3c4f58..08a0221 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -18,7 +18,7 @@ package com.qniao.iot.rc; -import com.qniao.iot.rc.event.RootCloudReceiptedEventDeserializationSchema; +import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.serialization.SimpleStringSchema; @@ -41,25 +41,27 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin * *

If you change the name of the main class (with the public static void main(String[] args)) * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). + * + * @author Lzk */ public class RootCloudIotDataFormatterJob { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); - KafkaSource source = KafkaSource.builder() + KafkaSource source = KafkaSource.builder() .setBootstrapServers("kafka:9092") .setTopics("root_cloud_iot_report_data_event") .setGroupId("flink-kafka-demo") .setStartingOffsets(OffsetsInitializer.earliest()) - .setValueOnlyDeserializer(new RootCloudReceiptedEventDeserializationSchema()) + .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .build(); - DataStream ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); + DataStream ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); String outputPath = "oss://qn-flink-test/test"; - StreamingFileSink sink = StreamingFileSink.forRowFormat( + StreamingFileSink sink = StreamingFileSink.forRowFormat( new Path(outputPath), - new SimpleStringEncoder("UTF-8") + new SimpleStringEncoder("UTF-8") ).build(); ds.addSink(sink); diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java new file mode 100644 index 0000000..f41d6d7 --- /dev/null +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java @@ -0,0 +1,33 @@ +package com.qniao.iot.rc.event; + +import com.qniao.iot.rc.RootCloudIotDataReceiptedEvent; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; + +/** + * @author Lzk + */ +public class RootCloudIotDataReceiptedEventDeserializationSchema implements DeserializationSchema { + /** + * 注册JavaTimeModule,支持LocalDateTime字段的解析 + */ + final private ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public RootCloudIotDataReceiptedEvent deserialize(byte[] message) throws IOException { + return objectMapper.readValue(message, RootCloudIotDataReceiptedEvent.class); + } + + @Override + public boolean isEndOfStream(RootCloudIotDataReceiptedEvent nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(RootCloudIotDataReceiptedEvent.class); + } +} diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java deleted file mode 100644 index fce4568..0000000 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.qniao.iot.rc.event; - -import com.qniao.iot.rc.RootCloudReceiptedEvent; -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; - -/** - * @author Lzk - */ -public class RootCloudReceiptedEventDeserializationSchema implements DeserializationSchema { - /** - * 注册JavaTimeModule,支持LocalDateTime字段的解析 - */ - final private ObjectMapper objectMapper = new ObjectMapper(); - - @Override - public RootCloudReceiptedEvent deserialize(byte[] message) throws IOException { - return objectMapper.readValue(message, RootCloudReceiptedEvent.class); - } - - @Override - public boolean isEndOfStream(RootCloudReceiptedEvent nextElement) { - return false; - } - - @Override - public TypeInformation getProducedType() { - return TypeInformation.of(RootCloudReceiptedEvent.class); - } -}