diff --git a/root-cloud-statistics/pom.xml b/root-cloud-statistics/pom.xml
index 6077f77..83fcbc5 100644
--- a/root-cloud-statistics/pom.xml
+++ b/root-cloud-statistics/pom.xml
@@ -112,11 +112,11 @@ under the License.
0.0.1-SNAPSHOT
-
+
org.apache.flink
diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
index aff767a..25258ea 100644
--- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
+++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
@@ -19,6 +19,7 @@
package com.qniao.iot.rc;
import cn.hutool.core.util.CharsetUtil;
+import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
@@ -58,6 +59,7 @@ import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
+import java.util.Objects;
import java.util.Properties;
/**
@@ -93,6 +95,8 @@ public class RootCloudIotDataFormatterJob {
SingleOutputStreamOperator transformDs = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "root cloud iot data receipted event Source")
.map((MapFunction) RootCloudIotDataFormatterJob::transform)
+ // 过滤掉转换失败的数据
+ .filter(Objects::nonNull)
.name("Transform MachineIotDataReceivedEvent");
@@ -182,7 +186,11 @@ public class RootCloudIotDataFormatterJob {
// 树根的getACC_count每次关机就会清0,所以应该是一个周期的产量
machineIotDataReceivedEvent.setCountOfThePeriod(event.getACC_count());
machineIotDataReceivedEvent.setId(snowflake.nextId());
- machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__()));
+ String assetId = event.get__assetId__();
+ if (!NumberUtil.isNumber(assetId)) {
+ return null;
+ }
+ machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(assetId));
machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD);
machineIotDataReceivedEvent.setMachinePwrStat(event.getPWR_sta());
machineIotDataReceivedEvent.setMachineWorkingStat(event.getWorking_sta());
diff --git a/root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java
index f7268c1..649a1dc 100644
--- a/root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java
+++ b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java
@@ -1,11 +1,12 @@
package com.qniao.iot.rc;
+import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.RandomUtil;
public class TestDemo {
public static void main(String[] args) {
- System.out.println(RandomUtil.randomLong(99999999999L, 999999999999L));
+ System.out.println(NumberUtil.isNumber("4118111220110083"));
}
}