From 4cfcd9e5f4131c054815f99c2ef138fa4f43990b Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Mon, 3 Jul 2023 13:39:46 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rc/RootCloudIotDataReceiptedEvent.java | 15 ++++---- .../iot/rc/RootCloudIotDataFormatterJob.java | 11 ++---- ...taReceiptedEventDeserializationSchema.java | 4 ++ .../test/java/com/qniao/iot/rc/TestDemo.java | 37 ++++++++++++++++++- 4 files changed, 51 insertions(+), 16 deletions(-) diff --git a/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudIotDataReceiptedEvent.java b/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudIotDataReceiptedEvent.java index a83eb23..1fb9e0c 100644 --- a/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudIotDataReceiptedEvent.java +++ b/root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudIotDataReceiptedEvent.java @@ -2,6 +2,7 @@ package com.qniao.iot.rc; import com.fasterxml.jackson.annotation.JsonAutoDetect; import lombok.Data; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import java.io.Serializable; @@ -57,7 +58,7 @@ public class RootCloudIotDataReceiptedEvent implements Serializable { * 最小周期设定/秒 */ @JsonProperty("RTD_cycle_min") - private String RTD_cycle_min; + private Integer RTD_cycle_min; /** * 设备对应的模型id */ @@ -96,7 +97,7 @@ public class RootCloudIotDataReceiptedEvent implements Serializable { * 作业执行周期/s */ @JsonProperty("ACC_cycle_tm") - private String ACC_cycle_tm; + private Integer ACC_cycle_tm; /** * 树根云时间 */ @@ -143,7 +144,7 @@ public class RootCloudIotDataReceiptedEvent implements Serializable { * 最大周期设定/s */ @JsonProperty("RTD_cycle_max") - private String RTD_cycle_max; + private Integer RTD_cycle_max; /** * 数据时间戳 */ @@ -163,15 +164,15 @@ public class RootCloudIotDataReceiptedEvent implements Serializable { */ private BigDecimal eqp_working_rt; - private Online __online__; + //private Online __online__; @JsonProperty("__thingName__") - private String thingName; + private String __thingName__; - @Data + /*@Data public static class Online { private boolean connected; private boolean directlyLinked; - } + }*/ } diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 25258ea..c8e3e45 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -89,14 +89,11 @@ public class RootCloudIotDataFormatterJob { .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .build(); - - // 把树根的数据转成我们自己的格式 SingleOutputStreamOperator transformDs = env .fromSource(source, WatermarkStrategy.noWatermarks(), "root cloud iot data receipted event Source") .map((MapFunction) RootCloudIotDataFormatterJob::transform) - // 过滤掉转换失败的数据 - .filter(Objects::nonNull) + .filter(machineIotDataReceivedEvent -> machineIotDataReceivedEvent.getMachineIotMac() != null) .name("Transform MachineIotDataReceivedEvent"); @@ -177,7 +174,7 @@ public class RootCloudIotDataFormatterJob { env.execute("root cloud iot data formatter job"); } - private static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) { + public static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) { MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent(); machineIotDataReceivedEvent.setCurrJobCount(0L); @@ -187,8 +184,8 @@ public class RootCloudIotDataFormatterJob { machineIotDataReceivedEvent.setCountOfThePeriod(event.getACC_count()); machineIotDataReceivedEvent.setId(snowflake.nextId()); String assetId = event.get__assetId__(); - if (!NumberUtil.isNumber(assetId)) { - return null; + if (!NumberUtil.isLong(assetId)) { + return machineIotDataReceivedEvent; } machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(assetId)); machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java index 4ff025f..c0f4785 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java @@ -1,7 +1,9 @@ package com.qniao.iot.rc.event; import cn.hutool.core.util.CharsetUtil; +import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSON; import cn.hutool.json.JSONUtil; import com.fasterxml.jackson.databind.ObjectMapper; import com.qniao.iot.rc.RootCloudIotDataReceiptedEvent; @@ -24,6 +26,8 @@ public class RootCloudIotDataReceiptedEventDeserializationSchema implements Dese @Override public RootCloudIotDataReceiptedEvent deserialize(byte[] message) throws IOException { + + //log.info("message:{}", StrUtil.str(message, CharsetUtil.UTF_8)); return JSONUtil.toBean(StrUtil.str(message, CharsetUtil.UTF_8), RootCloudIotDataReceiptedEvent.class); //return objectMapper.readValue(message, RootCloudIotDataReceiptedEvent.class); } diff --git a/root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java index 649a1dc..6fb9d62 100644 --- a/root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java +++ b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java @@ -1,12 +1,45 @@ package com.qniao.iot.rc; +import cn.hutool.core.util.CharsetUtil; import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.RandomUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSON; +import cn.hutool.json.JSONUtil; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; public class TestDemo { - public static void main(String[] args) { + public static void main(String[] args) throws IOException { + + /*String s = "{\"__thingName__\":\"102104060042\",\"eqp_working_rt\":19.03,\"waiting_hour\":15.99,\"__deviceId__\":\"ZfUMqbHA5y\",\"RTD_cycle_min\":5.00,\"__logicalInterfaceId__\":\"XpbNNBvYS4\",\"eqp_cycle_tm\":35.04,\"stoping_hour\":0.00,\"__assetId__\":\"102104060042\",\"PWR_sta\":1,\"stoping_duration\":0.00,\"__deviceTypeId__\":\"XpbNNBvYS4\",\"IG_sta\":0,\"waiting_duration\":57575.00,\"ACC_cycle_tm\":562.60,\"__cloud_time__\":1688298305613,\"ACC_sta\":1,\"ACC_count\":38194,\"ACC_count_total\":25868518,\"working_sta\":1,\"__modelType__\":\"device\",\"__metricsType__\":0,\"running_hour\":3.76,\"__tenantId__\":\"6007e36dbc127600549a5527\",\"__calculate_time__\":1688298305732,\"__physicalInterfaceId__\":\"XpbNNBvYS4\",\"__create_time__\":1688298303099,\"RTD_cycle_max\":45.00,\"data_timestamp\":1688298303,\"__timestamp__\":1688298303099,\"running_duration\":13528.00,\"eqp_ope_rt\":100.00}"; + //System.out.println(JSONUtil.toBean(s, RootCloudIotDataReceiptedEvent.class)); + //System.out.println(RootCloudIotDataFormatterJob.transform(JSONUtil.toBean(s, RootCloudIotDataReceiptedEvent.class))); + + String str = StrUtil.str(s, CharsetUtil.UTF_8); + JSON parse = JSONUtil.parse(str); + if(str.contains("__online__")) { + Object assetId = JSONUtil.getByPath(parse, "__online__"); + if(NumberUtil.isNumber(StrUtil.toString(assetId))) { + JSONUtil.putByPath(parse, "__online__", null); + } + } + System.out.println(JSONUtil.toBean(JSONUtil.toJsonStr(parse), RootCloudIotDataReceiptedEvent.class));*/ + + + /* String s = "{\"Nu0020\":0,\"fuel_level\":76,\"Nu0021\":0,\"Nu0023\":0.0,\"__logicalInterfaceId__\":\"BUFLT_5006_MiModel_10135\",\"Nu0026\":0.000000,\"GPS_Velocity\":10.4,\"workamt_3\":0,\"workamt_2\":0,\"__assetId__\":\"BUFLT_5006_MiEn_672165063700406272\",\"workamt_5\":0,\"workamt_4\":0,\"__cloud_time__\":1688298309895,\"Bind_Status\":0,\"Timestamp_Local\":\"2023-07-02 11:45:07.000\",\"Nu0011\":0.0000,\"Nu0010\":0.0000,\"Nu0012\":0,\"Tbox_Battery_Voltage\":0.0000,\"modelType\":\"10135\",\"Nu0017\":0,\"Nu0019\":0,\"Nu0018\":0,\"__online__\":0,\"work_amt2\":\"0\",\"op_identify\":0,\"engine_speed\":987.2500,\"rsv18\":0,\"__thingName__\":\"BUFLT_5006_MiEn_672165063700406272\",\"battery_voltage\":27.90,\"deviceTypeId\":\"10135\",\"rsv14\":0,\"rsv15\":0,\"rsv16\":0,\"rsv17\":0,\"rsv10\":0,\"rsv2\":0.000000,\"rsv11\":0,\"rsv12\":0,\"rsv4\":0,\"__disableAlarm__\":false,\"rsv13\":0,\"rsv3\":0,\"baseInfoId\":\"672165063700406272\",\"Active_SPN\":0.0,\"GPS_Satellites_Count\":0,\"GPS_Latitude\":26.4025150,\"Tbox_Status\":0,\"tbox_id\":\"1217740021110\",\"Active_FMI\":0.0,\"Nu0031\":0,\"online_status\":1,\"Nu0033\":0,\"Nu0032\":0,\"Nu0035\":558.0000,\"Nu0034\":1787,\"Tbox_External_Voltage\":0,\"Nu0036\":1229.0000,\"In0001\":0,\"In0002\":0,\"engine_oil_pressure\":0.4,\"Lock_Status\":-1,\"__physicalInterfaceId__\":\"BUFLT_5006_Model_10135\",\"GPS_Latitude_Valid_Value\":26.4025150,\"fuel_rate\":1.470000,\"Network_Type\":0,\"DeviceStatus\":\"drive\",\"work_amt\":18714.640000,\"average_oil_consumption\":0.2500,\"accumulated_fuel_consumption\":50179,\"payload\":0.0000,\"__deviceTypeId__\":\"BUFLT_5006_MiModel_10135\",\"mileage\":18714.640000,\"Roll_Angle\":0,\"Tbox_cellular_type\":\"2\",\"__tenantId__\":\"5f867ef96e5dad004d557644\",\"travel_speed\":0,\"rsv9\":0,\"rsv6\":0,\"rsv5\":0,\"rsv8\":0,\"rsv7\":0,\"__timestamp__\":1688298307000,\"position\":\"\",\"GPS_Longitude_Valid_Value\":99.4362510,\"total_fuel_consumption\":50179.0,\"__deviceId__\":\"BUFLT_5006_MiEn_672165063700406272\",\"Nu0002\":0.4000,\"Nu0001\":75.00,\"Nu0004\":0,\"z_acc_gsensor\":0,\"Nu0003\":0.0,\"Nu0006\":0.0000,\"y_acc_gsensor\":0,\"Nu0005\":0.0000,\"Pitch_Angle\":0,\"Nu0008\":0,\"speed\":0,\"Nu0007\":0,\"duration\":2,\"Nu0009\":0,\"x_acc_gsensor\":0,\"water_temperature\":75,\"Timestamp_CloudM2M\":\"2023-07-02 11:45:09.895\",\"GPS_Positioning_Mode\":0,\"reserve9\":0,\"reserve4\":0,\"__modelType__\":\"thing\",\"engine_hr\":1920.2000,\"__metricsType__\":0,\"model_id\":\"10135\",\"__calculate_time__\":1688298309895,\"__create_time__\":1688298307000,\"__deptScope__\":\"\",\"last_ts\":1688298307000,\"online\":1,\"GPS_Longitude\":99.4362510,\"GPS_Recoup_Marks\":0,\"Tbox_Temperature\":0,\"workamt\":18714}"; + System.out.println(JSONUtil.toBean(s, RootCloudIotDataReceiptedEvent.class)); + System.out.println(RootCloudIotDataFormatterJob.transform(JSONUtil.toBean(s, RootCloudIotDataReceiptedEvent.class))); +*/ + + /*ObjectMapper objectMapper = new ObjectMapper(); + System.out.println(objectMapper.readValue(s.getBytes(StandardCharsets.UTF_8), RootCloudIotDataReceiptedEvent.class));*/ + + System.out.println(NumberUtil.isLong("02000013265")); - System.out.println(NumberUtil.isNumber("4118111220110083")); } }