Browse Source

feat: 增加根云接收事件模型

hph_优化版本
lizhongkang@qniao.cn 3 years ago
parent
commit
7758c2ad39
5 changed files with 210 additions and 125 deletions
  1. 125
      pom.xml
  2. 164
      root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudReceiptedEvent.java
  3. 13
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
  4. 33
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java
  5. 0
      root-cloud-statistics/src/main/resources/log4j2.properties

125
pom.xml

@@ -23,123 +23,10 @@ under the License.
<groupId>com.qniao</groupId>
<artifactId>root-cloud-iot-connector</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>IOT Connector</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.0</flink.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.qniao.iot.rc.RootCloudIotDataFormatterJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<modules>
<module>root-cloud-source</module>
<module>root-cloud-event</module>
<module>root-cloud-statistics</module>
</modules>
<packaging>pom</packaging>
</project>

164
root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudReceiptedEvent.java

@@ -0,0 +1,164 @@
package com.qniao.iot.rc;
import lombok.Data;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;
import java.math.BigDecimal;
/**
 * Telemetry event received from the RootCloud IoT platform for a weighing device.
 * <p>
 * Field names deliberately mirror the JSON keys of the platform payload
 * (snake_case metric fields plus double-underscore platform/system fields).
 * Fields whose JSON key Jackson cannot derive from the Lombok-generated
 * accessor carry an explicit {@link JsonProperty}.
 * <p>
 * NOTE(review): fields without {@code @JsonProperty} rely on Jackson inferring
 * the exact JSON key from the generated getter name — confirm against a real
 * payload, in particular the {@code __xxx__} system fields.
 *
 * @author Lzk
 * @date 2022/7/1
 **/
@Data
public class RootCloudReceiptedEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    /**
     * Weighing value.
     */
    private BigDecimal weight;
    /**
     * Button operation:
     * 0 - no operation;
     * 1 - request for the current weight; the value must be pushed down to the
     *     RootCloud platform;
     * 2 - when the weight is non-null and greater than 0, an inbound-storage
     *     request: an MQ message must be sent to the factory edition to create
     *     an inbound record.
     */
    private Integer key_ctl;
    /**
     * Weighing status: 1 = stable, 2 = unstable.
     */
    private Integer weight_status;
    /**
     * Main power on/off status of the device: 1 = powered, 0 = power off.
     */
    @JsonProperty("PWR_sta")
    private Integer PWR_sta;
    /**
     * Today's standby duration, in hours.
     */
    private BigDecimal waiting_hour;
    /**
     * UUID of the device on the RootCloud platform.
     */
    private String __deviceId__;
    /**
     * Minimum cycle setting.
     */
    @JsonProperty("RTD_cycle_min")
    private String RTD_cycle_min;
    /**
     * Id of the (logical) model the device belongs to.
     */
    private String __logicalInterfaceId__;
    /**
     * Device production cycle.
     */
    private BigDecimal eqp_cycle_tm;
    /**
     * Today's downtime duration, in hours.
     */
    private BigDecimal stoping_hour;
    /**
     * Asset ("thing") identifier of the device on the RootCloud platform.
     */
    private String __assetId__;
    /**
     * Downtime duration, in seconds.
     */
    private String stoping_duration;
    /**
     * Id of the device-type model.
     */
    private String __deviceTypeId__;
    /**
     * Counting switch state.
     */
    @JsonProperty("IG_sta")
    private Integer IG_sta;
    /**
     * Standby duration, in seconds.
     */
    private BigDecimal waiting_duration;
    /**
     * Job execution cycle, in seconds.
     */
    @JsonProperty("ACC_cycle_tm")
    private String ACC_cycle_tm;
    /**
     * RootCloud platform timestamp.
     */
    private Long __cloud_time__;
    /**
     * Job execution state of the device.
     */
    @JsonProperty("ACC_sta")
    private Integer ACC_sta;
    /**
     * Number of job executions.
     */
    @JsonProperty("ACC_count")
    private Long ACC_count;
    /**
     * Cumulative number of job executions.
     */
    @JsonProperty("ACC_count_total")
    private Long ACC_count_total;
    /**
     * Working status of the device.
     */
    private Integer working_sta;
    /**
     * Model type (original note: "device").
     */
    private String __modelType__;
    // Metrics type reported by the platform; semantics not documented upstream.
    private Integer __metricsType__;
    /**
     * Today's operating (job) duration, in hours.
     */
    private BigDecimal running_hour;
    // RootCloud tenant id.
    private String __tenantId__;
    // Platform-side calculation timestamp.
    private Long __calculate_time__;
    /**
     * Id of the physical-interface model.
     */
    private String __physicalInterfaceId__;
    /**
     * Creation time (presumably epoch millis — TODO confirm unit).
     */
    private Long __create_time__;
    /**
     * Maximum cycle setting, in seconds.
     */
    @JsonProperty("RTD_cycle_max")
    private String RTD_cycle_max;
    /**
     * Data timestamp.
     */
    private Long data_timestamp;
    // Platform event timestamp.
    private Long __timestamp__;
    /**
     * Running (job) duration, in seconds.
     */
    private BigDecimal running_duration;
    /**
     * Device power-on rate for today.
     */
    private BigDecimal eqp_ope_rt;
    /**
     * Device operating (job) rate for today (eqp_working_rt).
     */
    private BigDecimal eqp_working_rt;
}

src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java → root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@@ -18,6 +18,7 @@
package com.qniao.iot.rc;
import com.qniao.iot.rc.event.RootCloudReceiptedEventDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
@@ -46,19 +47,19 @@ public class RootCloudIotDataFormatterJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
KafkaSource<String> source = KafkaSource.<String>builder()
KafkaSource<RootCloudReceiptedEvent> source = KafkaSource.<RootCloudReceiptedEvent>builder()
.setBootstrapServers("kafka:9092")
.setTopics("test_topic")
.setTopics("root_cloud_iot_report_data_event")
.setGroupId("flink-kafka-demo")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.setValueOnlyDeserializer(new RootCloudReceiptedEventDeserializationSchema())
.build();
DataStream<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source");
DataStream<RootCloudReceiptedEvent> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source");
String outputPath = "oss://qn-flink-test/test";
StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
StreamingFileSink<RootCloudReceiptedEvent> sink = StreamingFileSink.forRowFormat(
new Path(outputPath),
new SimpleStringEncoder<String>("UTF-8")
new SimpleStringEncoder<RootCloudReceiptedEvent>("UTF-8")
).build();
ds.addSink(sink);

33
root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java

@@ -0,0 +1,33 @@
package com.qniao.iot.rc.event;
import com.qniao.iot.rc.RootCloudReceiptedEvent;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
/**
 * Deserializes raw Kafka record bytes (JSON) into {@link RootCloudReceiptedEvent}.
 * <p>
 * Used as the value-only deserializer of the Flink {@code KafkaSource} that reads
 * the RootCloud report-data topic.
 *
 * @author Lzk
 */
public class RootCloudReceiptedEventDeserializationSchema implements DeserializationSchema<RootCloudReceiptedEvent> {
    /**
     * Jackson mapper reused for every record. {@code ObjectMapper} is
     * {@code Serializable}, so the schema can be shipped to the task managers as
     * part of the job graph.
     * (The previous comment claimed a JavaTimeModule was registered for
     * LocalDateTime parsing, but none was — and all time fields on the event are
     * plain {@code Long}, so no module is required.)
     */
    private final ObjectMapper objectMapper = new ObjectMapper();
    /**
     * Parses one Kafka record value into an event.
     *
     * @param message raw record value; may be null/empty for Kafka tombstones
     * @return the parsed event, or {@code null} for empty payloads so the source
     *         skips the record (per the {@code DeserializationSchema} contract)
     * @throws IOException if the payload is not valid JSON for the event model
     */
    @Override
    public RootCloudReceiptedEvent deserialize(byte[] message) throws IOException {
        // Guard against tombstone / empty records: a single bad record would
        // otherwise fail the whole streaming job with a parse exception.
        if (message == null || message.length == 0) {
            return null;
        }
        return objectMapper.readValue(message, RootCloudReceiptedEvent.class);
    }
    @Override
    public boolean isEndOfStream(RootCloudReceiptedEvent nextElement) {
        // Unbounded Kafka stream: never terminate based on element content.
        return false;
    }
    @Override
    public TypeInformation<RootCloudReceiptedEvent> getProducedType() {
        return TypeInformation.of(RootCloudReceiptedEvent.class);
    }
}

src/main/resources/log4j2.properties → root-cloud-statistics/src/main/resources/log4j2.properties

Loading…
Cancel
Save