Compare commits

...

4 Commits

Author SHA1 Message Date
zangkun 24f1855cad Revert "备份" 3 years ago
zangkun 3338c50bb9 调通了 3 years ago
周坤华 780dcd8d54 version 3 years ago
zangkun bbadd8bab6 备份 3 years ago
6 changed files with 77 additions and 36 deletions
Split View
  1. 1
      .gitignore
  2. 15
      pom.xml
  3. 18
      src/main/java/com/qniao/iot/rc/KafkaMessage.java
  4. 24
      src/main/java/com/qniao/iot/rc/KafkaMessageSchema.java
  5. 53
      src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
  6. 2
      src/main/resources/log4j2.properties

1
.gitignore

@@ -7,6 +7,7 @@
# ---> Java
# Compiled class file
*.class
target/
# Log file
*.log

15
pom.xml

@@ -43,13 +43,12 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
@@ -58,6 +57,18 @@ under the License.
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.7</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>

18
src/main/java/com/qniao/iot/rc/KafkaMessage.java

@@ -0,0 +1,18 @@
package com.qniao.iot.rc;
import lombok.Data;
@Data
public class KafkaMessage {

    /** Kafka record key (may be null). */
    private String key;
    /** Kafka record value payload (may be null). */
    private Integer value;

    /**
     * Human-readable representation used by the print sink.
     *
     * <p>Lombok's {@code @Data} would generate a {@code toString()} of its own;
     * this explicit override keeps the project's custom format. Fix: {@code value}
     * is an {@code Integer}, so it is no longer wrapped in single quotes — the
     * original rendered it as {@code value='1'}, misrepresenting it as a string.
     */
    @Override
    public String toString() {
        return "KafkaMessage{" +
                "key='" + key + '\'' +
                ", value=" + value +
                '}';
    }
}

24
src/main/java/com/qniao/iot/rc/KafkaMessageSchema.java

@@ -0,0 +1,24 @@
package com.qniao.iot.rc;
import com.alibaba.fastjson.JSON;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.nio.charset.StandardCharsets;
/**
 * Flink {@link DeserializationSchema} that decodes Kafka record values from
 * UTF-8 JSON (via fastjson) into {@link KafkaMessage} objects.
 */
public class KafkaMessageSchema implements DeserializationSchema<KafkaMessage> {

    /**
     * Parses the raw record bytes as a UTF-8 JSON document into a KafkaMessage.
     *
     * <p>Fix: the original used {@code new String(message)}, which decodes with
     * the JVM's platform default charset and can corrupt non-ASCII payloads;
     * UTF-8 is now specified explicitly.
     *
     * @param message raw Kafka record value, expected to be UTF-8 encoded JSON
     * @return the parsed message (null if fastjson parses the input to null)
     */
    @Override
    public KafkaMessage deserialize(byte[] message) {
        return JSON.parseObject(new String(message, StandardCharsets.UTF_8), KafkaMessage.class);
    }

    /** The Kafka stream is unbounded: no element ever marks end-of-stream. */
    @Override
    public boolean isEndOfStream(KafkaMessage nextElement) {
        return false;
    }

    /** Type information Flink uses to (de)serialize produced elements. */
    @Override
    public TypeInformation<KafkaMessage> getProducedType() {
        return TypeInformation.of(KafkaMessage.class);
    }
}

53
src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@@ -18,48 +18,35 @@
package com.qniao.iot.rc;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
/**
 * RootCloud IoT data formatting job.
 *
 * <p>A Flink DataStream job that consumes {@link KafkaMessage} records from
 * the Kafka topic "test_topic" and prints each element to stdout.
 *
 * <p>To package the application into a JAR file for execution, run
 * 'mvn clean package'. If the main class is renamed, update the 'mainClass'
 * entry in pom.xml accordingly.
 */
public class RootCloudIotDataFormatterJob {
//private static final Logger LOGGER = LoggerFactory.getLogger(RootCloudIotDataFormatterJob.class);
public static void main(String[] args) throws Exception {
// Create the stream execution environment — the main entry point for
// building Flink applications.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka source: topic "test_topic" on broker kafka:9092, consumer group
// "flink-kafka-demo", starting from the latest offsets. Record values are
// deserialized into KafkaMessage objects by KafkaMessageSchema.
KafkaSource<KafkaMessage> source = KafkaSource.<KafkaMessage>builder()
.setBootstrapServers("kafka:9092")
.setTopics("test_topic")
.setGroupId("flink-kafka-demo")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new KafkaMessageSchema())
.build();
// No event-time processing here, so no watermarks are generated.
DataStream<KafkaMessage> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source");
// Sink: print every consumed element to stdout.
ds.addSink(new PrintSinkFunction<>());
// Execute the program, beginning computation.
// NOTE(review): two execute(...) calls appear below because this text is a
// rendered diff; the "Flink Java API Skeleton" line is presumably the old
// (removed) line — confirm only the "Kafka Job" call exists in the real file.
env.execute("Flink Java API Skeleton");
env.execute("Kafka Job");
}
}

2
src/main/resources/log4j2.properties

@@ -22,4 +22,4 @@ rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
Loading…
Cancel
Save