From 1ba55a66a02bb850a2e3f952000f1039881f8701 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Mon, 3 Jul 2023 09:07:37 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- root-cloud-statistics/pom.xml | 4 ++-- .../com/qniao/iot/rc/RootCloudIotDataFormatterJob.java | 10 +++++++++- .../src/test/java/com/qniao/iot/rc/TestDemo.java | 3 ++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/root-cloud-statistics/pom.xml b/root-cloud-statistics/pom.xml index 6077f77..83fcbc5 100644 --- a/root-cloud-statistics/pom.xml +++ b/root-cloud-statistics/pom.xml @@ -112,11 +112,11 @@ under the License. 0.0.1-SNAPSHOT - + org.apache.flink diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index aff767a..25258ea 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -19,6 +19,7 @@ package com.qniao.iot.rc; import cn.hutool.core.util.CharsetUtil; +import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; @@ -58,6 +59,7 @@ import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.util.Objects; import java.util.Properties; /** @@ -93,6 +95,8 @@ public class RootCloudIotDataFormatterJob { SingleOutputStreamOperator transformDs = env .fromSource(source, WatermarkStrategy.noWatermarks(), "root cloud iot data receipted event Source") .map((MapFunction) RootCloudIotDataFormatterJob::transform) + // 过滤掉转换失败的数据 + .filter(Objects::nonNull) .name("Transform MachineIotDataReceivedEvent"); @@ -182,7 +186,11 @@ public class RootCloudIotDataFormatterJob { // 树根的getACC_count每次关机就会清0,所以应该是一个周期的产量 machineIotDataReceivedEvent.setCountOfThePeriod(event.getACC_count()); machineIotDataReceivedEvent.setId(snowflake.nextId()); - machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__())); + String assetId = event.get__assetId__(); + if (!NumberUtil.isNumber(assetId)) { + return null; + } + machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(assetId)); machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); machineIotDataReceivedEvent.setMachinePwrStat(event.getPWR_sta()); machineIotDataReceivedEvent.setMachineWorkingStat(event.getWorking_sta()); diff --git a/root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java index f7268c1..649a1dc 100644 --- a/root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java +++ b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/TestDemo.java @@ -1,11 +1,12 @@ package com.qniao.iot.rc; +import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.RandomUtil; public class TestDemo { public static void main(String[] args) { - System.out.println(RandomUtil.randomLong(99999999999L, 999999999999L)); + System.out.println(NumberUtil.isNumber("4118111220110083")); } }