Browse Source

备份

zk-demo
zangkun 3 years ago
parent
commit
bbadd8bab6
6 changed files with 97 additions and 36 deletions
  1. 12
      pom.xml
  2. 10
      src/main/java/com/qniao/iot/rc/KafkaMessage.java
  3. 24
      src/main/java/com/qniao/iot/rc/KafkaMessageSchema.java
  4. 16
      src/main/java/com/qniao/iot/rc/Person.java
  5. 68
      src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
  6. 3
      src/main/resources/log4j2.properties

12
pom.xml

@@ -58,6 +58,18 @@ under the License.
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.7</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>

10
src/main/java/com/qniao/iot/rc/KafkaMessage.java

@@ -0,0 +1,10 @@
package com.qniao.iot.rc;
import lombok.Data;
@Data
public class KafkaMessage {
private String key;
private String value;
}

24
src/main/java/com/qniao/iot/rc/KafkaMessageSchema.java

@@ -0,0 +1,24 @@
package com.qniao.iot.rc;

import java.nio.charset.StandardCharsets;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
public class KafkaMessageSchema implements DeserializationSchema<KafkaMessage> {
@Override
public KafkaMessage deserialize(byte[] message) {
// json 转成对象
return JSON.parseObject(new String(message), KafkaMessage.class);
}
@Override
public boolean isEndOfStream(KafkaMessage nextElement) {
return false;
}
@Override
public TypeInformation<KafkaMessage> getProducedType() {
return null;
}
}

16
src/main/java/com/qniao/iot/rc/Person.java

@@ -0,0 +1,16 @@
package com.qniao.iot.rc;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class Person {
private String name;
private int age;
public String toString() {
return getClass().getName() + "@ " + "name=" + this.name + "age=" + age;
}
}

68
src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@@ -18,48 +18,46 @@
package com.qniao.iot.rc;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Skeleton for a Flink DataStream Job.
*
* <p>For a tutorial how to write a Flink application, check the
* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.
*
* <p>To package your application into a JAR file for execution, run
* 'mvn clean package' on the command line.
*
* <p>If you change the name of the main class (with the public static void main(String[] args))
* method, change the respective entry in the POM.xml file (simply search for 'mainClass').
* 根云数据格式化作业
*/
public class RootCloudIotDataFormatterJob {
private static final Logger LOGGER = LoggerFactory.getLogger(RootCloudIotDataFormatterJob.class);
public static void main(String[] args) throws Exception {
// Sets up the execution environment, which is the main entry point
// to building Flink applications.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/*
* Here, you can start creating your execution plan for Flink.
*
* Start with getting some data from the environment, like
* env.fromSequence(1, 10);
*
* then, transform the resulting DataStream<Long> using operations
* like
* .filter()
* .flatMap()
* .window()
* .process()
*
* and many more.
* Have a look at the programming guide:
*
* https://nightlies.apache.org/flink/flink-docs-stable/
*
*/
// // 创建执行环境
// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//
// KafkaSource<KafkaMessage> source = KafkaSource.<KafkaMessage>builder()
// .setBootstrapServers("localhost:9092")
// .setTopics("test_topic")
// .setGroupId("flink-kafka-demo")
// .setStartingOffsets(OffsetsInitializer.earliest())
// .setValueOnlyDeserializer(new KafkaMessageSchema())
// .build();
//
// DataStream<KafkaMessage> dc = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source");
// //env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
//
// env.execute("Kafka Job");
// Execute program, beginning computation.
env.execute("Flink Java API Skeleton");
// final StreamExecutionEnvironment env =
// StreamExecutionEnvironment.getExecutionEnvironment();
// DataStream<Person> flintstones = env.fromElements(
// new Person("Fred", 35),
// new Person("Wilma", 35),
// new Person("Pebbles", 2));
// DataStream<Person> adults = flintstones.filter((FilterFunction<Person>) person -> person.getAge() >= 18);
// LOGGER.info("===================>>> before print");
// adults.print();
// LOGGER.info("===================>>> before execute");
// env.execute("Peron Job");
// LOGGER.info("===================>>> after print");
}
}

3
src/main/resources/log4j2.properties

@@ -18,8 +18,9 @@
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.file.ref = MainAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
Loading…
Cancel
Save