Browse Source

调通了

zk-demo
zangkun 3 years ago
parent
commit
3338c50bb9
5 changed files with 31 additions and 51 deletions
  1. 10
      src/main/java/com/qniao/iot/rc/KafkaMessage.java
  2. 2
      src/main/java/com/qniao/iot/rc/KafkaMessageSchema.java
  3. 16
      src/main/java/com/qniao/iot/rc/Person.java
  4. 53
      src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
  5. 1
      src/main/resources/log4j2.properties

10
src/main/java/com/qniao/iot/rc/KafkaMessage.java

@ -6,5 +6,13 @@ import lombok.Data;
public class KafkaMessage {
private String key;
private String value;
private Integer value;
@Override
public String toString() {
return "KafkaMessage{" +
"key='" + key + '\'' +
", value='" + value + '\'' +
'}';
}
}

2
src/main/java/com/qniao/iot/rc/KafkaMessageSchema.java

@ -19,6 +19,6 @@ public class KafkaMessageSchema implements DeserializationSchema<KafkaMessage> {
@Override
public TypeInformation<KafkaMessage> getProducedType() {
return null;
return TypeInformation.of(KafkaMessage.class);
}
}

16
src/main/java/com/qniao/iot/rc/Person.java

@ -1,16 +0,0 @@
package com.qniao.iot.rc;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class Person {
private String name;
private int age;
public String toString() {
return getClass().getName() + "@ " + "name=" + this.name + "age=" + age;
}
}

53
src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@ -18,46 +18,35 @@
package com.qniao.iot.rc;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
/**
* 根云数据格式化作业
*/
public class RootCloudIotDataFormatterJob {
private static final Logger LOGGER = LoggerFactory.getLogger(RootCloudIotDataFormatterJob.class);
//private static final Logger LOGGER = LoggerFactory.getLogger(RootCloudIotDataFormatterJob.class);
public static void main(String[] args) throws Exception {
// // 创建执行环境
// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//
// KafkaSource<KafkaMessage> source = KafkaSource.<KafkaMessage>builder()
// .setBootstrapServers("localhost:9092")
// .setTopics("test_topic")
// .setGroupId("flink-kafka-demo")
// .setStartingOffsets(OffsetsInitializer.earliest())
// .setValueOnlyDeserializer(new KafkaMessageSchema())
// .build();
//
// DataStream<KafkaMessage> dc = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source");
// //env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
//
// env.execute("Kafka Job");
// final StreamExecutionEnvironment env =
// StreamExecutionEnvironment.getExecutionEnvironment();
// DataStream<Person> flintstones = env.fromElements(
// new Person("Fred", 35),
// new Person("Wilma", 35),
// new Person("Pebbles", 2));
// DataStream<Person> adults = flintstones.filter((FilterFunction<Person>) person -> person.getAge() >= 18);
// LOGGER.info("===================>>> before print");
// adults.print();
// LOGGER.info("===================>>> before execute");
// env.execute("Peron Job");
// LOGGER.info("===================>>> after print");
// 创建执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<KafkaMessage> source = KafkaSource.<KafkaMessage>builder()
.setBootstrapServers("localhost:9092")
.setTopics("test_topic")
.setGroupId("flink-kafka-demo")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new KafkaMessageSchema())
.build();
DataStream<KafkaMessage> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source");
ds.addSink(new PrintSinkFunction<>());
env.execute("Kafka Job");
}
}

1
src/main/resources/log4j2.properties

@ -18,7 +18,6 @@
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.file.ref = MainAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE

Loading…
Cancel
Save