From 7758c2ad393611ff5276659af24b271692cb9517 Mon Sep 17 00:00:00 2001 From: "lizhongkang@qniao.cn" Date: Fri, 1 Jul 2022 17:49:50 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E6=A0=B9=E4=BA=91?= =?UTF-8?q?=E6=8E=A5=E5=8F=97=E4=BA=8B=E4=BB=B6=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 125 +------------ .../qniao/iot/rc/RootCloudReceiptedEvent.java | 164 ++++++++++++++++++ .../iot/rc/RootCloudIotDataFormatterJob.java | 13 +- ...udReceiptedEventDeserializationSchema.java | 33 ++++ .../src}/main/resources/log4j2.properties | 0 5 files changed, 210 insertions(+), 125 deletions(-) create mode 100644 root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudReceiptedEvent.java rename {src => root-cloud-statistics/src}/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java (81%) create mode 100644 root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java rename {src => root-cloud-statistics/src}/main/resources/log4j2.properties (100%) diff --git a/pom.xml b/pom.xml index 504a339..e3f3a5a 100644 --- a/pom.xml +++ b/pom.xml @@ -23,123 +23,10 @@ under the License. com.qniao root-cloud-iot-connector 0.0.1-SNAPSHOT - jar - - IOT Connector - - - UTF-8 - 1.15.0 - 1.8 - ${target.java.version} - ${target.java.version} - 2.17.2 - - - - - - - org.apache.flink - flink-streaming-java - ${flink.version} - provided - - - org.apache.flink - flink-clients - ${flink.version} - provided - - - - org.apache.flink - flink-connector-kafka - ${flink.version} - - - - - - org.apache.logging.log4j - log4j-slf4j-impl - ${log4j.version} - runtime - - - org.apache.logging.log4j - log4j-api - ${log4j.version} - runtime - - - org.apache.logging.log4j - log4j-core - ${log4j.version} - runtime - - - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.1 - - ${target.java.version} - ${target.java.version} - - - - - - - org.apache.maven.plugins - maven-shade-plugin - 3.1.1 - - - - package - - shade - - - - - org.apache.flink:flink-shaded-force-shading - com.google.code.findbugs:jsr305 - org.slf4j:* - org.apache.logging.log4j:* - - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - - com.qniao.iot.rc.RootCloudIotDataFormatterJob - - - - - - - - + + root-cloud-source + root-cloud-event + root-cloud-statistics + + pom diff --git a/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudReceiptedEvent.java b/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudReceiptedEvent.java new file mode 100644 index 0000000..c0ca056 --- /dev/null +++ b/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudReceiptedEvent.java @@ -0,0 +1,164 @@ +package com.qniao.iot.rc; + +import lombok.Data; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.Serializable; +import java.math.BigDecimal; + +/** + * @author Lzk + * @date 2022/7/1 + **/ +@Data +public class RootCloudReceiptedEvent implements Serializable { + + private static final long serialVersionUID = 1L; + + + /** + * 称重值 + */ + private BigDecimal weight; + + /** + * 按键操作 + * 0-无操作 + * 1-表示申请获取称重值,需要把称重值下发到树根平台 + * 2-当称重值不为空且大于0时表示入库申请,需要发送MQ到工厂版生成入库记录 + */ + private Integer key_ctl; + + + /** + * 称重状态 1表示稳定;2表示不稳定 + */ + private Integer weight_status; + + + /** + * 设备总电源启停状态 1有电 0断电 + */ + @JsonProperty("PWR_sta") + private Integer PWR_sta; + + + /** + * 今日待机时长/h + */ + private BigDecimal waiting_hour; + /** + * 树根设备的UUID + */ + private String __deviceId__; + + /** + * 最小周期设定/秒 + */ + @JsonProperty("RTD_cycle_min") + private String RTD_cycle_min; + /** + * 设备对应的模型id + */ + private String __logicalInterfaceId__; + /** + * 设备生产周期 + */ + private BigDecimal eqp_cycle_tm; + /** + * 今日停机时长/h + */ + private BigDecimal stoping_hour; + /** + * 树根设备的物标识 + */ + private String __assetId__; + + /** + * 停机时长/s + */ + private String stoping_duration; + /** + * 设备对应的模型id + */ + private String __deviceTypeId__; + /** + * 计数开关状态 + */ + @JsonProperty("IG_sta") + private Integer IG_sta; + /** + * 待机时长/s + */ + private BigDecimal waiting_duration; + /** + * 作业执行周期/s + */ + @JsonProperty("ACC_cycle_tm") + private String ACC_cycle_tm; + /** + * 树根云时间 + */ + private Long __cloud_time__; + /** + * 设备作业执行状态 + */ + @JsonProperty("ACC_sta") + private Integer ACC_sta; + /** + * 作业执行次数 + */ + @JsonProperty("ACC_count") + private Long ACC_count; + /** + * 作业执行累计次数 + */ + @JsonProperty("ACC_count_total") + private Long ACC_count_total; + /** + * 设备工作状态 + */ + private Integer working_sta; + /** + * “device” + */ + private String __modelType__; + private Integer __metricsType__; + /** + * 今日作业时长 /h + */ + private BigDecimal running_hour; + private String __tenantId__; + private Long __calculate_time__; + /** + * 模型id + */ + private String __physicalInterfaceId__; + /** + * 创建时间 + */ + private Long __create_time__; + /** + * 最大周期设定/s + */ + @JsonProperty("RTD_cycle_max") + private String RTD_cycle_max; + /** + * 数据时间戳 + */ + private Long data_timestamp; + private Long __timestamp__; + /** + * 作业时长/s + */ + private BigDecimal running_duration; + /** + * 设备今日开机率 + */ + private BigDecimal eqp_ope_rt; + + /** + * 设备今日作业率 (eqp_working_rt) + */ + private BigDecimal eqp_working_rt; +} diff --git a/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java similarity index 81% rename from src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java rename to root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 68b53ee..b3c4f58 100644 --- a/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -18,6 +18,7 @@ package com.qniao.iot.rc; +import com.qniao.iot.rc.event.RootCloudReceiptedEventDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.serialization.SimpleStringSchema; @@ -46,19 +47,19 @@ public class RootCloudIotDataFormatterJob { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); - KafkaSource source = KafkaSource.builder() + KafkaSource source = KafkaSource.builder() .setBootstrapServers("kafka:9092") - .setTopics("test_topic") + .setTopics("root_cloud_iot_report_data_event") .setGroupId("flink-kafka-demo") .setStartingOffsets(OffsetsInitializer.earliest()) - .setValueOnlyDeserializer(new SimpleStringSchema()) + .setValueOnlyDeserializer(new RootCloudReceiptedEventDeserializationSchema()) .build(); - DataStream ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); + DataStream ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source"); String outputPath = "oss://qn-flink-test/test"; - StreamingFileSink sink = StreamingFileSink.forRowFormat( + StreamingFileSink sink = StreamingFileSink.forRowFormat( new Path(outputPath), - new SimpleStringEncoder("UTF-8") + new SimpleStringEncoder("UTF-8") ).build(); ds.addSink(sink); diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java new file mode 100644 index 0000000..fce4568 --- /dev/null +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java @@ -0,0 +1,33 @@ +package com.qniao.iot.rc.event; + +import com.qniao.iot.rc.RootCloudReceiptedEvent; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; + +/** + * @author Lzk + */ +public class RootCloudReceiptedEventDeserializationSchema implements DeserializationSchema { + /** + * 注册JavaTimeModule,支持LocalDateTime字段的解析 + */ + final private ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public RootCloudReceiptedEvent deserialize(byte[] message) throws IOException { + return objectMapper.readValue(message, RootCloudReceiptedEvent.class); + } + + @Override + public boolean isEndOfStream(RootCloudReceiptedEvent nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(RootCloudReceiptedEvent.class); + } +} diff --git a/src/main/resources/log4j2.properties b/root-cloud-statistics/src/main/resources/log4j2.properties similarity index 100% rename from src/main/resources/log4j2.properties rename to root-cloud-statistics/src/main/resources/log4j2.properties