diff --git a/src/main/java/com/qniao/iot/rc/KafkaMessage.java b/src/main/java/com/qniao/iot/rc/KafkaMessage.java index a2f293f..767233e 100644 --- a/src/main/java/com/qniao/iot/rc/KafkaMessage.java +++ b/src/main/java/com/qniao/iot/rc/KafkaMessage.java @@ -6,5 +6,13 @@ import lombok.Data; public class KafkaMessage { private String key; - private String value; + private Integer value; + + @Override + public String toString() { + return "KafkaMessage{" + + "key='" + key + '\'' + + ", value='" + value + '\'' + + '}'; + } } diff --git a/src/main/java/com/qniao/iot/rc/KafkaMessageSchema.java b/src/main/java/com/qniao/iot/rc/KafkaMessageSchema.java index af913a6..751c9cd 100644 --- a/src/main/java/com/qniao/iot/rc/KafkaMessageSchema.java +++ b/src/main/java/com/qniao/iot/rc/KafkaMessageSchema.java @@ -19,6 +19,6 @@ public class KafkaMessageSchema implements DeserializationSchema { @Override public TypeInformation getProducedType() { - return null; + return TypeInformation.of(KafkaMessage.class); } } diff --git a/src/main/java/com/qniao/iot/rc/Person.java b/src/main/java/com/qniao/iot/rc/Person.java deleted file mode 100644 index e323c7b..0000000 --- a/src/main/java/com/qniao/iot/rc/Person.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.qniao.iot.rc; - -import lombok.AllArgsConstructor; -import lombok.Data; - -@Data -@AllArgsConstructor -public class Person { - private String name; - - private int age; - - public String toString() { - return getClass().getName() + "@ " + "name=" + this.name + "age=" + age; - } -} diff --git a/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 40b9c8e..7d5c90e 100644 --- a/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -18,46 +18,35 @@ package com.qniao.iot.rc; -import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; /** * 根云数据格式化作业 */ public class RootCloudIotDataFormatterJob { - private static final Logger LOGGER = LoggerFactory.getLogger(RootCloudIotDataFormatterJob.class); + //private static final Logger LOGGER = LoggerFactory.getLogger(RootCloudIotDataFormatterJob.class); public static void main(String[] args) throws Exception { -// // 创建执行环境 -// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -// -// KafkaSource source = KafkaSource.builder() -// .setBootstrapServers("localhost:9092") -// .setTopics("test_topic") -// .setGroupId("flink-kafka-demo") -// .setStartingOffsets(OffsetsInitializer.earliest()) -// .setValueOnlyDeserializer(new KafkaMessageSchema()) -// .build(); -// -// DataStream dc = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); -// //env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); -// -// env.execute("Kafka Job"); - -// final StreamExecutionEnvironment env = -// StreamExecutionEnvironment.getExecutionEnvironment(); -// DataStream flintstones = env.fromElements( -// new Person("Fred", 35), -// new Person("Wilma", 35), -// new Person("Pebbles", 2)); -// DataStream adults = flintstones.filter((FilterFunction) person -> person.getAge() >= 18); -// LOGGER.info("===================>>> before print"); -// adults.print(); -// LOGGER.info("===================>>> before execute"); -// env.execute("Peron Job"); -// LOGGER.info("===================>>> after print"); + // 创建执行环境 + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + KafkaSource source = KafkaSource.builder() + .setBootstrapServers("localhost:9092") + .setTopics("test_topic") + .setGroupId("flink-kafka-demo") + .setStartingOffsets(OffsetsInitializer.latest()) + .setValueOnlyDeserializer(new KafkaMessageSchema()) + .build(); + + DataStream ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); + + ds.addSink(new PrintSinkFunction<>()); + + env.execute("Kafka Job"); } } diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties index 32557f2..8b3186f 100644 --- a/src/main/resources/log4j2.properties +++ b/src/main/resources/log4j2.properties @@ -18,7 +18,6 @@ rootLogger.level = INFO rootLogger.appenderRef.console.ref = ConsoleAppender -rootLogger.appenderRef.file.ref = MainAppender appender.console.name = ConsoleAppender appender.console.type = CONSOLE