Browse Source

初始化

master
zangkun 3 years ago
parent
commit
63722e83be
8 changed files with 361 additions and 2 deletions
  1. 2
      README.md
  2. 61
      iot-machine-data-event/pom.xml
  3. 73
      iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java
  4. 32
      iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventKafkaDeserializationSchema.java
  5. 22
      iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventKafkaSerializationSchema.java
  6. 143
      iot-machine-state-event-generator-job/pom.xml
  7. 23
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java
  8. 7
      pom.xml

2
README.md

@ -1,2 +1,2 @@
# iot-machine-status-event-generator
# iot-machine-state-event-generator

61
iot-machine-data-event/pom.xml

@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>iot-machine-state-event-generator</artifactId>
<groupId>com.qniao</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>iot-machine-data-event</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.0</flink.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
<lombok.version>1.18.24</lombok.version>
<jackson.version>2.13.3</jackson.version>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
</dependencies>
</project>

73
iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java

@ -0,0 +1,73 @@
package com.qniao.iot.machine.event;
import lombok.Data;
import java.io.Serializable;
/**
* 机器物联数据已接收事件
**/
@Data
public class MachineIotDataReceivedEvent implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 唯一标识
*/
private Long id;
/**
* 数据来源
*/
private Integer dataSource;
/**
* 设备物联地址(云盒物理标识)
*/
private Long machineIotMac;
/**
* 机器电源状态
*/
private Integer machinePwrStat;
/**
* 机器工作状态
*/
private Integer machineWorkingStat;
/**
* 累加作业总数
*/
private Long accJobCount;
/**
* 当前作业计数
*/
private Long currJobCount;
/**
* 当前作业时长
*/
private Long currJobDuration;
/**
* 当前待机时长
*/
private Long currWaitingDuration;
/**
* 当前停机时长
*/
private Long currStoppingDuration;
/**
* 计数开关状态
*/
private Integer igStat;
/**
* 数据采样时间
*/
private Long reportTime;
}

32
iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventKafkaDeserializationSchema.java

@ -0,0 +1,32 @@
package com.qniao.iot.machine.event;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
/**
* 机器物联数据已接收事件反序列化概要
*/
public class MachineIotDataReceivedEventKafkaDeserializationSchema implements DeserializationSchema<MachineIotDataReceivedEvent> {
/**
* 注册JavaTimeModule支持LocalDateTime字段的解析
*/
final private ObjectMapper objectMapper = new ObjectMapper();
@Override
public MachineIotDataReceivedEvent deserialize(byte[] message) throws IOException {
return objectMapper.readValue(message, MachineIotDataReceivedEvent.class);
}
@Override
public boolean isEndOfStream(MachineIotDataReceivedEvent nextElement) {
return false;
}
@Override
public TypeInformation<MachineIotDataReceivedEvent> getProducedType() {
return TypeInformation.of(MachineIotDataReceivedEvent.class);
}
}

22
iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventKafkaSerializationSchema.java

@ -0,0 +1,22 @@
package com.qniao.iot.machine.event;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
/**
* 机器物联数据已接收事件序列化概要
*/
public class MachineIotDataReceivedEventKafkaSerializationSchema implements SerializationSchema<MachineIotDataReceivedEvent> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override
public byte[] serialize(MachineIotDataReceivedEvent event) {
try {
return OBJECT_MAPPER.writeValueAsBytes(event);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not serialize record: " + event, e);
}
}
}

143
iot-machine-state-event-generator-job/pom.xml

@ -0,0 +1,143 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>iot-machine-state-event-generator</artifactId>
<groupId>com.qniao</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>iot-machine-state-evnet-generator-job</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.0</flink.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>ddd-event</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.13.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.qniao.iot.rc.RootCloudIotDataFormatterJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

23
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -0,0 +1,23 @@
package com.qniao.iot.machine.event.generator.job;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class IotMachineEventGeneratorJob {
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers(params.get("source.bootstrap.servers"))
.setTopics("root_cloud_iot_report_data_event")
.setGroupId("root_cloud_iot_data_etl")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema())
.build();
}
}

7
pom.xml

@ -21,7 +21,12 @@ under the License.
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>com.qniao</groupId> <groupId>com.qniao</groupId>
<artifactId>iot-machine-status-event-generator</artifactId>
<artifactId>iot-machine-state-event-generator</artifactId>
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
<modules>
<module>iot-machine-data-event</module>
<module>iot-machine-state-event-generator-job</module>
<module>iot-machine-data-event</module>
</modules>
<packaging>pom</packaging> <packaging>pom</packaging>
</project> </project>
Loading…
Cancel
Save