|
|
|
@ -19,6 +19,7 @@ |
|
|
|
package com.qniao.iot.rc; |
|
|
|
|
|
|
|
import cn.hutool.core.util.CharsetUtil; |
|
|
|
import cn.hutool.core.util.NumberUtil; |
|
|
|
import cn.hutool.core.util.StrUtil; |
|
|
|
import cn.hutool.json.JSONUtil; |
|
|
|
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; |
|
|
|
@ -58,6 +59,7 @@ import java.time.Instant; |
|
|
|
import java.time.LocalDateTime; |
|
|
|
import java.time.ZoneOffset; |
|
|
|
import java.time.format.DateTimeFormatter; |
|
|
|
import java.util.Objects; |
|
|
|
import java.util.Properties; |
|
|
|
|
|
|
|
/** |
|
|
|
@ -93,6 +95,8 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
SingleOutputStreamOperator<MachineIotDataReceivedEvent> transformDs = env |
|
|
|
.fromSource(source, WatermarkStrategy.noWatermarks(), "root cloud iot data receipted event Source") |
|
|
|
.map((MapFunction<RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>) RootCloudIotDataFormatterJob::transform) |
|
|
|
// 过滤掉转换失败的数据 |
|
|
|
.filter(Objects::nonNull) |
|
|
|
.name("Transform MachineIotDataReceivedEvent"); |
|
|
|
|
|
|
|
|
|
|
|
@ -182,7 +186,11 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
// 树根的getACC_count每次关机就会清0,所以应该是一个周期的产量 |
|
|
|
machineIotDataReceivedEvent.setCountOfThePeriod(event.getACC_count()); |
|
|
|
machineIotDataReceivedEvent.setId(snowflake.nextId()); |
|
|
|
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__())); |
|
|
|
String assetId = event.get__assetId__(); |
|
|
|
if (!NumberUtil.isNumber(assetId)) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(assetId)); |
|
|
|
machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); |
|
|
|
machineIotDataReceivedEvent.setMachinePwrStat(event.getPWR_sta()); |
|
|
|
machineIotDataReceivedEvent.setMachineWorkingStat(event.getWorking_sta()); |
|
|
|
|