From bbadd8bab6c27efa18dc3a55304114ada93016fa Mon Sep 17 00:00:00 2001 From: zangkun Date: Tue, 21 Jun 2022 16:00:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=87=E4=BB=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 12 ++++ .../java/com/qniao/iot/rc/KafkaMessage.java | 10 +++ .../com/qniao/iot/rc/KafkaMessageSchema.java | 24 +++++++ src/main/java/com/qniao/iot/rc/Person.java | 16 +++++ .../iot/rc/RootCloudIotDataFormatterJob.java | 68 +++++++++---------- src/main/resources/log4j2.properties | 3 +- 6 files changed, 97 insertions(+), 36 deletions(-) create mode 100644 src/main/java/com/qniao/iot/rc/KafkaMessage.java create mode 100644 src/main/java/com/qniao/iot/rc/KafkaMessageSchema.java create mode 100644 src/main/java/com/qniao/iot/rc/Person.java diff --git a/pom.xml b/pom.xml index 504a339..895bf18 100644 --- a/pom.xml +++ b/pom.xml @@ -58,6 +58,18 @@ under the License. ${flink.version} + + org.projectlombok + lombok + 1.18.24 + + + + com.alibaba + fastjson + 2.0.7 + + diff --git a/src/main/java/com/qniao/iot/rc/KafkaMessage.java b/src/main/java/com/qniao/iot/rc/KafkaMessage.java new file mode 100644 index 0000000..a2f293f --- /dev/null +++ b/src/main/java/com/qniao/iot/rc/KafkaMessage.java @@ -0,0 +1,10 @@ +package com.qniao.iot.rc; + +import lombok.Data; + +@Data +public class KafkaMessage { + private String key; + + private String value; +} diff --git a/src/main/java/com/qniao/iot/rc/KafkaMessageSchema.java b/src/main/java/com/qniao/iot/rc/KafkaMessageSchema.java new file mode 100644 index 0000000..af913a6 --- /dev/null +++ b/src/main/java/com/qniao/iot/rc/KafkaMessageSchema.java @@ -0,0 +1,24 @@ +package com.qniao.iot.rc; + + +import com.alibaba.fastjson.JSON; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +public class KafkaMessageSchema implements DeserializationSchema { + @Override + public KafkaMessage deserialize(byte[] message) { + // json 转成对象 + return JSON.parseObject(new String(message), KafkaMessage.class); + } + + @Override + public boolean isEndOfStream(KafkaMessage nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return null; + } +} diff --git a/src/main/java/com/qniao/iot/rc/Person.java b/src/main/java/com/qniao/iot/rc/Person.java new file mode 100644 index 0000000..e323c7b --- /dev/null +++ b/src/main/java/com/qniao/iot/rc/Person.java @@ -0,0 +1,16 @@ +package com.qniao.iot.rc; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class Person { + private String name; + + private int age; + + public String toString() { + return getClass().getName() + "@ " + "name=" + this.name + "age=" + age; + } +} diff --git a/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 04ac20c..40b9c8e 100644 --- a/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -18,48 +18,46 @@ package com.qniao.iot.rc; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Skeleton for a Flink DataStream Job. - * - *

For a tutorial how to write a Flink application, check the - * tutorials and examples on the Flink Website. - * - *

To package your application into a JAR file for execution, run - * 'mvn clean package' on the command line. - * - *

If you change the name of the main class (with the public static void main(String[] args)) - * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). + * 根云数据格式化作业 */ public class RootCloudIotDataFormatterJob { + private static final Logger LOGGER = LoggerFactory.getLogger(RootCloudIotDataFormatterJob.class); public static void main(String[] args) throws Exception { - // Sets up the execution environment, which is the main entry point - // to building Flink applications. - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - /* - * Here, you can start creating your execution plan for Flink. - * - * Start with getting some data from the environment, like - * env.fromSequence(1, 10); - * - * then, transform the resulting DataStream using operations - * like - * .filter() - * .flatMap() - * .window() - * .process() - * - * and many more. - * Have a look at the programming guide: - * - * https://nightlies.apache.org/flink/flink-docs-stable/ - * - */ +// // 创建执行环境 +// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +// +// KafkaSource source = KafkaSource.builder() +// .setBootstrapServers("localhost:9092") +// .setTopics("test_topic") +// .setGroupId("flink-kafka-demo") +// .setStartingOffsets(OffsetsInitializer.earliest()) +// .setValueOnlyDeserializer(new KafkaMessageSchema()) +// .build(); +// +// DataStream dc = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); +// //env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); +// +// env.execute("Kafka Job"); - // Execute program, beginning computation. - env.execute("Flink Java API Skeleton"); +// final StreamExecutionEnvironment env = +// StreamExecutionEnvironment.getExecutionEnvironment(); +// DataStream flintstones = env.fromElements( +// new Person("Fred", 35), +// new Person("Wilma", 35), +// new Person("Pebbles", 2)); +// DataStream adults = flintstones.filter((FilterFunction) person -> person.getAge() >= 18); +// LOGGER.info("===================>>> before print"); +// adults.print(); +// LOGGER.info("===================>>> before execute"); +// env.execute("Peron Job"); +// LOGGER.info("===================>>> after print"); } } diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties index 32c696e..32557f2 100644 --- a/src/main/resources/log4j2.properties +++ b/src/main/resources/log4j2.properties @@ -18,8 +18,9 @@ rootLogger.level = INFO rootLogger.appenderRef.console.ref = ConsoleAppender +rootLogger.appenderRef.file.ref = MainAppender appender.console.name = ConsoleAppender appender.console.type = CONSOLE appender.console.layout.type = PatternLayout -appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file