getProducedType() {
+ return null;
+ }
+}
diff --git a/src/main/java/com/qniao/iot/rc/Person.java b/src/main/java/com/qniao/iot/rc/Person.java
new file mode 100644
index 0000000..e323c7b
--- /dev/null
+++ b/src/main/java/com/qniao/iot/rc/Person.java
@@ -0,0 +1,16 @@
+package com.qniao.iot.rc;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class Person {
+ private String name;
+
+ private int age;
+
+ public String toString() {
+ return getClass().getName() + "@ " + "name=" + this.name + ", age=" + age;
+ }
+}
diff --git a/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
index 04ac20c..40b9c8e 100644
--- a/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
+++ b/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
@@ -18,48 +18,46 @@
package com.qniao.iot.rc;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Skeleton for a Flink DataStream Job.
- *
- * For a tutorial how to write a Flink application, check the
- * tutorials and examples on the Flink Website.
- *
- *
- * <p>To package your application into a JAR file for execution, run
- * 'mvn clean package' on the command line.
- *
- *
- * <p>If you change the name of the main class (with the public static void main(String[] args))
- * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
+ * 根云数据格式化作业
*/
public class RootCloudIotDataFormatterJob {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RootCloudIotDataFormatterJob.class);
public static void main(String[] args) throws Exception {
- // Sets up the execution environment, which is the main entry point
- // to building Flink applications.
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- /*
- * Here, you can start creating your execution plan for Flink.
- *
- * Start with getting some data from the environment, like
- * env.fromSequence(1, 10);
- *
- * then, transform the resulting DataStream using operations
- * like
- * .filter()
- * .flatMap()
- * .window()
- * .process()
- *
- * and many more.
- * Have a look at the programming guide:
- *
- * https://nightlies.apache.org/flink/flink-docs-stable/
- *
- */
+// // 创建执行环境
+// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+//
+// KafkaSource source = KafkaSource.builder()
+// .setBootstrapServers("localhost:9092")
+// .setTopics("test_topic")
+// .setGroupId("flink-kafka-demo")
+// .setStartingOffsets(OffsetsInitializer.earliest())
+// .setValueOnlyDeserializer(new KafkaMessageSchema())
+// .build();
+//
+// DataStream dc = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source");
+// //env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
+//
+// env.execute("Kafka Job");
- // Execute program, beginning computation.
- env.execute("Flink Java API Skeleton");
+// final StreamExecutionEnvironment env =
+// StreamExecutionEnvironment.getExecutionEnvironment();
+// DataStream flintstones = env.fromElements(
+// new Person("Fred", 35),
+// new Person("Wilma", 35),
+// new Person("Pebbles", 2));
+// DataStream adults = flintstones.filter((FilterFunction) person -> person.getAge() >= 18);
+// LOGGER.info("===================>>> before print");
+// adults.print();
+// LOGGER.info("===================>>> before execute");
+// env.execute("Person Job");
+// LOGGER.info("===================>>> after print");
}
}
diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties
index 32c696e..32557f2 100644
--- a/src/main/resources/log4j2.properties
+++ b/src/main/resources/log4j2.properties
@@ -18,8 +18,9 @@
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
+rootLogger.appenderRef.file.ref = MainAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file