Browse Source

更新

master
1049970895@qniao.cn 2 years ago
parent
commit
4cfcd9e5f4
4 changed files with 51 additions and 16 deletions
  1. 15
      root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudIotDataReceiptedEvent.java
  2. 11
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
  3. 4
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java
  4. 37
      root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java

15
root-cloud-event/src/main/java/com/qniao/iot/rc/RootCloudIotDataReceiptedEvent.java

@ -2,6 +2,7 @@ package com.qniao.iot.rc;
import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect;
import lombok.Data; import lombok.Data;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable; import java.io.Serializable;
@ -57,7 +58,7 @@ public class RootCloudIotDataReceiptedEvent implements Serializable {
* 最小周期设定/ * 最小周期设定/
*/ */
@JsonProperty("RTD_cycle_min") @JsonProperty("RTD_cycle_min")
private String RTD_cycle_min;
private Integer RTD_cycle_min;
/** /**
* 设备对应的模型id * 设备对应的模型id
*/ */
@ -96,7 +97,7 @@ public class RootCloudIotDataReceiptedEvent implements Serializable {
* 作业执行周期/s * 作业执行周期/s
*/ */
@JsonProperty("ACC_cycle_tm") @JsonProperty("ACC_cycle_tm")
private String ACC_cycle_tm;
private Integer ACC_cycle_tm;
/** /**
* 树根云时间 * 树根云时间
*/ */
@ -143,7 +144,7 @@ public class RootCloudIotDataReceiptedEvent implements Serializable {
* 最大周期设定/s * 最大周期设定/s
*/ */
@JsonProperty("RTD_cycle_max") @JsonProperty("RTD_cycle_max")
private String RTD_cycle_max;
private Integer RTD_cycle_max;
/** /**
* 数据时间戳 * 数据时间戳
*/ */
@ -163,15 +164,15 @@ public class RootCloudIotDataReceiptedEvent implements Serializable {
*/ */
private BigDecimal eqp_working_rt; private BigDecimal eqp_working_rt;
private Online __online__;
//private Online __online__;
@JsonProperty("__thingName__") @JsonProperty("__thingName__")
private String thingName;
private String __thingName__;
@Data
/*@Data
public static class Online { public static class Online {
private boolean connected; private boolean connected;
private boolean directlyLinked; private boolean directlyLinked;
}
}*/
} }

11
root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@ -89,14 +89,11 @@ public class RootCloudIotDataFormatterJob {
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema())
.build(); .build();
// 把树根的数据转成我们自己的格式 // 把树根的数据转成我们自己的格式
SingleOutputStreamOperator<MachineIotDataReceivedEvent> transformDs = env SingleOutputStreamOperator<MachineIotDataReceivedEvent> transformDs = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "root cloud iot data receipted event Source") .fromSource(source, WatermarkStrategy.noWatermarks(), "root cloud iot data receipted event Source")
.map((MapFunction<RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>) RootCloudIotDataFormatterJob::transform) .map((MapFunction<RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>) RootCloudIotDataFormatterJob::transform)
// 过滤掉转换失败的数据
.filter(Objects::nonNull)
.filter(machineIotDataReceivedEvent -> machineIotDataReceivedEvent.getMachineIotMac() != null)
.name("Transform MachineIotDataReceivedEvent"); .name("Transform MachineIotDataReceivedEvent");
@ -177,7 +174,7 @@ public class RootCloudIotDataFormatterJob {
env.execute("root cloud iot data formatter job"); env.execute("root cloud iot data formatter job");
} }
private static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) {
public static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) {
MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent(); MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent();
machineIotDataReceivedEvent.setCurrJobCount(0L); machineIotDataReceivedEvent.setCurrJobCount(0L);
@ -187,8 +184,8 @@ public class RootCloudIotDataFormatterJob {
machineIotDataReceivedEvent.setCountOfThePeriod(event.getACC_count()); machineIotDataReceivedEvent.setCountOfThePeriod(event.getACC_count());
machineIotDataReceivedEvent.setId(snowflake.nextId()); machineIotDataReceivedEvent.setId(snowflake.nextId());
String assetId = event.get__assetId__(); String assetId = event.get__assetId__();
if (!NumberUtil.isNumber(assetId)) {
return null;
if (!NumberUtil.isLong(assetId)) {
return machineIotDataReceivedEvent;
} }
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(assetId)); machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(assetId));
machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD);

4
root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java

@ -1,7 +1,9 @@
package com.qniao.iot.rc.event; package com.qniao.iot.rc.event;
import cn.hutool.core.util.CharsetUtil; import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSON;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.qniao.iot.rc.RootCloudIotDataReceiptedEvent; import com.qniao.iot.rc.RootCloudIotDataReceiptedEvent;
@ -24,6 +26,8 @@ public class RootCloudIotDataReceiptedEventDeserializationSchema implements Dese
@Override @Override
public RootCloudIotDataReceiptedEvent deserialize(byte[] message) throws IOException { public RootCloudIotDataReceiptedEvent deserialize(byte[] message) throws IOException {
//log.info("message:{}", StrUtil.str(message, CharsetUtil.UTF_8));
return JSONUtil.toBean(StrUtil.str(message, CharsetUtil.UTF_8), RootCloudIotDataReceiptedEvent.class); return JSONUtil.toBean(StrUtil.str(message, CharsetUtil.UTF_8), RootCloudIotDataReceiptedEvent.class);
//return objectMapper.readValue(message, RootCloudIotDataReceiptedEvent.class); //return objectMapper.readValue(message, RootCloudIotDataReceiptedEvent.class);
} }

37
root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java

@ -1,12 +1,45 @@
package com.qniao.iot.rc; package com.qniao.iot.rc;
import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.RandomUtil; import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSON;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
public class TestDemo { public class TestDemo {
public static void main(String[] args) {
public static void main(String[] args) throws IOException {
/*String s = "{\"__thingName__\":\"102104060042\",\"eqp_working_rt\":19.03,\"waiting_hour\":15.99,\"__deviceId__\":\"ZfUMqbHA5y\",\"RTD_cycle_min\":5.00,\"__logicalInterfaceId__\":\"XpbNNBvYS4\",\"eqp_cycle_tm\":35.04,\"stoping_hour\":0.00,\"__assetId__\":\"102104060042\",\"PWR_sta\":1,\"stoping_duration\":0.00,\"__deviceTypeId__\":\"XpbNNBvYS4\",\"IG_sta\":0,\"waiting_duration\":57575.00,\"ACC_cycle_tm\":562.60,\"__cloud_time__\":1688298305613,\"ACC_sta\":1,\"ACC_count\":38194,\"ACC_count_total\":25868518,\"working_sta\":1,\"__modelType__\":\"device\",\"__metricsType__\":0,\"running_hour\":3.76,\"__tenantId__\":\"6007e36dbc127600549a5527\",\"__calculate_time__\":1688298305732,\"__physicalInterfaceId__\":\"XpbNNBvYS4\",\"__create_time__\":1688298303099,\"RTD_cycle_max\":45.00,\"data_timestamp\":1688298303,\"__timestamp__\":1688298303099,\"running_duration\":13528.00,\"eqp_ope_rt\":100.00}";
//System.out.println(JSONUtil.toBean(s, RootCloudIotDataReceiptedEvent.class));
//System.out.println(RootCloudIotDataFormatterJob.transform(JSONUtil.toBean(s, RootCloudIotDataReceiptedEvent.class)));
String str = StrUtil.str(s, CharsetUtil.UTF_8);
JSON parse = JSONUtil.parse(str);
if(str.contains("__online__")) {
Object assetId = JSONUtil.getByPath(parse, "__online__");
if(NumberUtil.isNumber(StrUtil.toString(assetId))) {
JSONUtil.putByPath(parse, "__online__", null);
}
}
System.out.println(JSONUtil.toBean(JSONUtil.toJsonStr(parse), RootCloudIotDataReceiptedEvent.class));*/
/* String s = "{\"Nu0020\":0,\"fuel_level\":76,\"Nu0021\":0,\"Nu0023\":0.0,\"__logicalInterfaceId__\":\"BUFLT_5006_MiModel_10135\",\"Nu0026\":0.000000,\"GPS_Velocity\":10.4,\"workamt_3\":0,\"workamt_2\":0,\"__assetId__\":\"BUFLT_5006_MiEn_672165063700406272\",\"workamt_5\":0,\"workamt_4\":0,\"__cloud_time__\":1688298309895,\"Bind_Status\":0,\"Timestamp_Local\":\"2023-07-02 11:45:07.000\",\"Nu0011\":0.0000,\"Nu0010\":0.0000,\"Nu0012\":0,\"Tbox_Battery_Voltage\":0.0000,\"modelType\":\"10135\",\"Nu0017\":0,\"Nu0019\":0,\"Nu0018\":0,\"__online__\":0,\"work_amt2\":\"0\",\"op_identify\":0,\"engine_speed\":987.2500,\"rsv18\":0,\"__thingName__\":\"BUFLT_5006_MiEn_672165063700406272\",\"battery_voltage\":27.90,\"deviceTypeId\":\"10135\",\"rsv14\":0,\"rsv15\":0,\"rsv16\":0,\"rsv17\":0,\"rsv10\":0,\"rsv2\":0.000000,\"rsv11\":0,\"rsv12\":0,\"rsv4\":0,\"__disableAlarm__\":false,\"rsv13\":0,\"rsv3\":0,\"baseInfoId\":\"672165063700406272\",\"Active_SPN\":0.0,\"GPS_Satellites_Count\":0,\"GPS_Latitude\":26.4025150,\"Tbox_Status\":0,\"tbox_id\":\"1217740021110\",\"Active_FMI\":0.0,\"Nu0031\":0,\"online_status\":1,\"Nu0033\":0,\"Nu0032\":0,\"Nu0035\":558.0000,\"Nu0034\":1787,\"Tbox_External_Voltage\":0,\"Nu0036\":1229.0000,\"In0001\":0,\"In0002\":0,\"engine_oil_pressure\":0.4,\"Lock_Status\":-1,\"__physicalInterfaceId__\":\"BUFLT_5006_Model_10135\",\"GPS_Latitude_Valid_Value\":26.4025150,\"fuel_rate\":1.470000,\"Network_Type\":0,\"DeviceStatus\":\"drive\",\"work_amt\":18714.640000,\"average_oil_consumption\":0.2500,\"accumulated_fuel_consumption\":50179,\"payload\":0.0000,\"__deviceTypeId__\":\"BUFLT_5006_MiModel_10135\",\"mileage\":18714.640000,\"Roll_Angle\":0,\"Tbox_cellular_type\":\"2\",\"__tenantId__\":\"5f867ef96e5dad004d557644\",\"travel_speed\":0,\"rsv9\":0,\"rsv6\":0,\"rsv5\":0,\"rsv8\":0,\"rsv7\":0,\"__timestamp__\":1688298307000,\"position\":\"\",\"GPS_Longitude_Valid_Value\":99.4362510,\"total_fuel_consumption\":50179.0,\"__deviceId__\":\"BUFLT_5006_MiEn_672165063700406272\",\"Nu0002\":0.4000,\"Nu0001\":75.00,\"Nu0004\":0,\"z_acc_gsensor\":0,\"Nu0003\":0.0,\"Nu0006\":0.0000,\"y_acc_gsensor\":0,\"Nu0005\":0.0000,\"Pitch_Angle\":0,\"Nu0008\":0,\"speed\":0,\"Nu0007\":0,\"duration\":2,\"Nu0009\":0,\"x_acc_gsensor\":0,\"water_temperature\":75,\"Timestamp_CloudM2M\":\"2023-07-02 11:45:09.895\",\"GPS_Positioning_Mode\":0,\"reserve9\":0,\"reserve4\":0,\"__modelType__\":\"thing\",\"engine_hr\":1920.2000,\"__metricsType__\":0,\"model_id\":\"10135\",\"__calculate_time__\":1688298309895,\"__create_time__\":1688298307000,\"__deptScope__\":\"\",\"last_ts\":1688298307000,\"online\":1,\"GPS_Longitude\":99.4362510,\"GPS_Recoup_Marks\":0,\"Tbox_Temperature\":0,\"workamt\":18714}";
System.out.println(JSONUtil.toBean(s, RootCloudIotDataReceiptedEvent.class));
System.out.println(RootCloudIotDataFormatterJob.transform(JSONUtil.toBean(s, RootCloudIotDataReceiptedEvent.class)));
*/
/*ObjectMapper objectMapper = new ObjectMapper();
System.out.println(objectMapper.readValue(s.getBytes(StandardCharsets.UTF_8), RootCloudIotDataReceiptedEvent.class));*/
System.out.println(NumberUtil.isLong("02000013265"));
System.out.println(NumberUtil.isNumber("4118111220110083"));
} }
} }
Loading…
Cancel
Save