diff --git a/pom.xml b/pom.xml
index 504a339..e3f3a5a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,123 +23,10 @@ under the License.
com.qniao
root-cloud-iot-connector
0.0.1-SNAPSHOT
- jar
-
- IOT Connector
-
-
- UTF-8
- 1.15.0
- 1.8
- ${target.java.version}
- ${target.java.version}
- 2.17.2
-
-
-
-
-
-
- org.apache.flink
- flink-streaming-java
- ${flink.version}
- provided
-
-
- org.apache.flink
- flink-clients
- ${flink.version}
- provided
-
-
-
- org.apache.flink
- flink-connector-kafka
- ${flink.version}
-
-
-
-
-
- org.apache.logging.log4j
- log4j-slf4j-impl
- ${log4j.version}
- runtime
-
-
- org.apache.logging.log4j
- log4j-api
- ${log4j.version}
- runtime
-
-
- org.apache.logging.log4j
- log4j-core
- ${log4j.version}
- runtime
-
-
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.1
-
- ${target.java.version}
- ${target.java.version}
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 3.1.1
-
-
-
- package
-
- shade
-
-
-
-
- org.apache.flink:flink-shaded-force-shading
- com.google.code.findbugs:jsr305
- org.slf4j:*
- org.apache.logging.log4j:*
-
-
-
-
-
- *:*
-
- META-INF/*.SF
- META-INF/*.DSA
- META-INF/*.RSA
-
-
-
-
-
-
- com.qniao.iot.rc.RootCloudIotDataFormatterJob
-
-
-
-
-
-
-
-
+
+ root-cloud-source
+ root-cloud-event
+ root-cloud-statistics
+
+ pom
diff --git a/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudReceiptedEvent.java b/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudReceiptedEvent.java
new file mode 100644
index 0000000..c0ca056
--- /dev/null
+++ b/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudReceiptedEvent.java
@@ -0,0 +1,164 @@
+package com.qniao.iot.rc;
+
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+
+/**
+ * @author Lzk
+ * @date 2022/7/1
+ **/
+@Data
+public class RootCloudReceiptedEvent implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+
+ /**
+ * 称重值
+ */
+ private BigDecimal weight;
+
+ /**
+ * 按键操作
+ * 0-无操作
+ * 1-表示申请获取称重值,需要把称重值下发到树根平台
+ * 2-当称重值不为空且大于0时表示入库申请,需要发送MQ到工厂版生成入库记录
+ */
+ private Integer key_ctl;
+
+
+ /**
+ * 称重状态 1表示稳定;2表示不稳定
+ */
+ private Integer weight_status;
+
+
+ /**
+ * 设备总电源启停状态 1有电 0断电
+ */
+ @JsonProperty("PWR_sta")
+ private Integer PWR_sta;
+
+
+ /**
+ * 今日待机时长/h
+ */
+ private BigDecimal waiting_hour;
+ /**
+ * 树根设备的UUID
+ */
+ private String __deviceId__;
+
+ /**
+ * 最小周期设定/秒
+ */
+ @JsonProperty("RTD_cycle_min")
+ private String RTD_cycle_min;
+ /**
+ * 设备对应的模型id
+ */
+ private String __logicalInterfaceId__;
+ /**
+ * 设备生产周期
+ */
+ private BigDecimal eqp_cycle_tm;
+ /**
+ * 今日停机时长/h
+ */
+ private BigDecimal stoping_hour;
+ /**
+ * 树根设备的物标识
+ */
+ private String __assetId__;
+
+ /**
+ * 停机时长/s
+ */
+ private String stoping_duration;
+ /**
+ * 设备对应的模型id
+ */
+ private String __deviceTypeId__;
+ /**
+ * 计数开关状态
+ */
+ @JsonProperty("IG_sta")
+ private Integer IG_sta;
+ /**
+ * 待机时长/s
+ */
+ private BigDecimal waiting_duration;
+ /**
+ * 作业执行周期/s
+ */
+ @JsonProperty("ACC_cycle_tm")
+ private String ACC_cycle_tm;
+ /**
+ * 树根云时间
+ */
+ private Long __cloud_time__;
+ /**
+ * 设备作业执行状态
+ */
+ @JsonProperty("ACC_sta")
+ private Integer ACC_sta;
+ /**
+ * 作业执行次数
+ */
+ @JsonProperty("ACC_count")
+ private Long ACC_count;
+ /**
+ * 作业执行累计次数
+ */
+ @JsonProperty("ACC_count_total")
+ private Long ACC_count_total;
+ /**
+ * 设备工作状态
+ */
+ private Integer working_sta;
+ /**
+ * “device”
+ */
+ private String __modelType__;
+ private Integer __metricsType__;
+ /**
+ * 今日作业时长 /h
+ */
+ private BigDecimal running_hour;
+ private String __tenantId__;
+ private Long __calculate_time__;
+ /**
+ * 模型id
+ */
+ private String __physicalInterfaceId__;
+ /**
+ * 创建时间
+ */
+ private Long __create_time__;
+ /**
+ * 最大周期设定/s
+ */
+ @JsonProperty("RTD_cycle_max")
+ private String RTD_cycle_max;
+ /**
+ * 数据时间戳
+ */
+ private Long data_timestamp;
+ private Long __timestamp__;
+ /**
+ * 作业时长/s
+ */
+ private BigDecimal running_duration;
+ /**
+ * 设备今日开机率
+ */
+ private BigDecimal eqp_ope_rt;
+
+ /**
+ * 设备今日作业率 (eqp_working_rt)
+ */
+ private BigDecimal eqp_working_rt;
+}
diff --git a/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
similarity index 81%
rename from src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
rename to root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
index 68b53ee..b3c4f58 100644
--- a/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
+++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
@@ -18,6 +18,7 @@
package com.qniao.iot.rc;
+import com.qniao.iot.rc.event.RootCloudReceiptedEventDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
@@ -46,19 +47,19 @@ public class RootCloudIotDataFormatterJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
- KafkaSource source = KafkaSource.builder()
+ KafkaSource source = KafkaSource.builder()
.setBootstrapServers("kafka:9092")
- .setTopics("test_topic")
+ .setTopics("root_cloud_iot_report_data_event")
.setGroupId("flink-kafka-demo")
.setStartingOffsets(OffsetsInitializer.earliest())
- .setValueOnlyDeserializer(new SimpleStringSchema())
+ .setValueOnlyDeserializer(new RootCloudReceiptedEventDeserializationSchema())
.build();
- DataStream ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source");
+ DataStream ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "My Kafka Source");
String outputPath = "oss://qn-flink-test/test";
- StreamingFileSink sink = StreamingFileSink.forRowFormat(
+ StreamingFileSink sink = StreamingFileSink.forRowFormat(
new Path(outputPath),
- new SimpleStringEncoder("UTF-8")
+ new SimpleStringEncoder("UTF-8")
).build();
ds.addSink(sink);
diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java
new file mode 100644
index 0000000..fce4568
--- /dev/null
+++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudReceiptedEventDeserializationSchema.java
@@ -0,0 +1,33 @@
+package com.qniao.iot.rc.event;
+
+import com.qniao.iot.rc.RootCloudReceiptedEvent;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * @author Lzk
+ */
+public class RootCloudReceiptedEventDeserializationSchema implements DeserializationSchema {
+ /**
+ * 注册JavaTimeModule,支持LocalDateTime字段的解析
+ */
+ final private ObjectMapper objectMapper = new ObjectMapper();
+
+ @Override
+ public RootCloudReceiptedEvent deserialize(byte[] message) throws IOException {
+ return objectMapper.readValue(message, RootCloudReceiptedEvent.class);
+ }
+
+ @Override
+ public boolean isEndOfStream(RootCloudReceiptedEvent nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return TypeInformation.of(RootCloudReceiptedEvent.class);
+ }
+}
diff --git a/src/main/resources/log4j2.properties b/root-cloud-statistics/src/main/resources/log4j2.properties
similarity index 100%
rename from src/main/resources/log4j2.properties
rename to root-cloud-statistics/src/main/resources/log4j2.properties