From 9870214bfb474a7129f4e89be685a498f28bbc19 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Tue, 9 Aug 2022 17:58:54 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 + .../gizwits/GizWitsIotDataFormatterJob.java | 81 +++++++++++-------- .../iot/gizwits/util/DataParsingUtils.java | 1 + .../src/test/java/com/qniao/Test1.java | 21 ++++- .../src/test/java/com/qniao/Test2.java | 22 +++++ .../src/test/java/com/qniao/Test3.java | 19 +++++ 6 files changed, 109 insertions(+), 37 deletions(-) create mode 100644 .gitignore create mode 100644 iot-gizwits-statistics/src/test/java/com/qniao/Test2.java create mode 100644 iot-gizwits-statistics/src/test/java/com/qniao/Test3.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..87e3eba --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*/target/* +/.idea/* diff --git a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java index 90717ba..8a108ad 100644 --- a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java +++ b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java @@ -19,6 +19,11 @@ package com.qniao.iot.gizwits; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.date.CalendarUtil; +import cn.hutool.core.date.DateUnit; +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.date.LocalDateTimeUtil; +import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.CharsetUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; @@ -37,47 +42,42 @@ import com.qniao.iot.gizwits.util.SnowFlake; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaSerializationSchema; import com.qniao.iot.rc.constant.DataSource; +import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; -import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; -import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.util.Collector; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.common.config.SaslConfigs; - -import java.io.IOException; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.ZoneOffset; +import java.time.*; import java.time.format.DateTimeFormatter; import java.util.*; /** * 机智云设备数据转换 + * * @author hph */ +@Slf4j public class GizWitsIotDataFormatterJob { static SnowFlake snowflake = new SnowFlake( Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_DATACENTER_ID)), - Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_MACHINE_ID)) - ); + Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_MACHINE_ID))); public static void main(String[] args) throws Exception { @@ -92,13 +92,13 @@ public class GizWitsIotDataFormatterJob { @Override public void flatMap(JSONObject value, Collector out) { List receivedEvents = transform(value); - if(CollUtil.isNotEmpty(receivedEvents)) { + if (CollUtil.isNotEmpty(receivedEvents)) { receivedEvents.forEach(out::collect); } } }).name("Transform MachineIotDataReceivedEvent"); - //kafka 认证配置,暂时注释,后续可能需要放开 + // kafka 认证配置,暂时注释,后续可能需要放开 /*Properties kafkaProducerConfig = new Properties(); kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); @@ -106,7 +106,6 @@ public class GizWitsIotDataFormatterJob { "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";"); */ - // 写入kafka transformDs.sinkTo( KafkaSink.builder() @@ -126,7 +125,14 @@ public class GizWitsIotDataFormatterJob { StreamingFileSink sink = StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder(CharsetUtil.UTF_8) - ).withBucketAssigner(new BucketAssigner() { + ).withRollingPolicy(DefaultRollingPolicy.builder() + // 每隔多长时间生成一个文件 + .withRolloverInterval(Duration.ofHours(12)) + // 默认60秒,未写入数据处于不活跃状态超时会滚动新文件 + .withInactivityInterval(Duration.ofHours(12)) + // 设置每个文件的最大大小 ,默认是128M + .withMaxPartSize(MemorySize.ofMebiBytes(128 * 1024 * 1024)) + .build()).withBucketAssigner(new BucketAssigner() { @Override public String getBucketId(String element, Context context) { @@ -136,16 +142,16 @@ public class GizWitsIotDataFormatterJob { // 获取设备状态目录名称 String deviceStatusStr = getDeviceStatusStr(receivedEvent.getMachineWorkingStat()); return deviceStatusStr + "/" + LocalDateTime.ofInstant(Instant.ofEpochMilli(receivedTime), ZoneOffset.of("+8")) - .format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + "/" + receivedEvent.getMachineIotMac(); + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + "/" + receivedEvent.getMachineIotMac(); } private String getDeviceStatusStr(Integer machineWorkingStat) { - if(machineWorkingStat == 1){ + if (machineWorkingStat == 1) { return "deviceWorking"; - }else if(machineWorkingStat == 2){ + } else if (machineWorkingStat == 2) { return "deviceWaiting"; - }else { + } else { return "deviceOff"; } } @@ -155,6 +161,7 @@ public class GizWitsIotDataFormatterJob { return SimpleVersionedStringSerializer.INSTANCE; } }).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build(); + transformDs.map(new RichMapFunction() { @Override public String map(MachineIotDataReceivedEvent value) { @@ -172,11 +179,11 @@ public class GizWitsIotDataFormatterJob { MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent(); if (ObjectUtil.isNotEmpty(event)) { String pushEventCode = CommandUtils.getPushEventCode(event); - if(NotiRespPushEvents.DEVICE_OFFLINE.getCode().equals(pushEventCode)) { - // 设备下线 + if (NotiRespPushEvents.DEVICE_OFFLINE.getCode().equals(pushEventCode)) { + // 设备下线(云盒下线) OffLineEventBody offLineEventBody = CommandUtils.parsePushEvent(event, OffLineEventBody.class); String mac = offLineEventBody.getMac(); - if(StrUtil.isNotEmpty(mac)) { + if (StrUtil.isNotEmpty(mac)) { machineIotDataReceivedEvent.setId(snowflake.nextId()); machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD); machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(mac)); @@ -192,11 +199,11 @@ public class GizWitsIotDataFormatterJob { receivedEventList.add(machineIotDataReceivedEvent); return receivedEventList; } - }else if(NotiRespPushEvents.DEVICE_ONLINE.getCode().equals(pushEventCode)) { - // 设备上线 + } else if (NotiRespPushEvents.DEVICE_ONLINE.getCode().equals(pushEventCode)) { + // 设备上线(云盒上线,连接网络) OnLineEventBody onLineEventBody = CommandUtils.parsePushEvent(event, OnLineEventBody.class); String mac = onLineEventBody.getMac(); - if(StrUtil.isNotEmpty(mac)) { + if (StrUtil.isNotEmpty(mac)) { machineIotDataReceivedEvent.setId(snowflake.nextId()); machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD); machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(mac)); @@ -211,22 +218,26 @@ public class GizWitsIotDataFormatterJob { receivedEventList.add(machineIotDataReceivedEvent); return receivedEventList; } - }else if(NotiRespPushEvents.DEVICE_STATUS_KV.getCode().equals(pushEventCode)) { - // 设备生产数据 - List> mapList = DataParsingUtils.deviceStatusKvParsing(event); - mapList.forEach(e ->{ + } else if (NotiRespPushEvents.DEVICE_STATUS_KV.getCode().equals(pushEventCode)) { + // 设备生产数据(云盒上线,并且上报机器的数据) + List> mapList = DataParsingUtils.deviceStatusKvParsing(event); + mapList.forEach(e -> { JSONObject j = (JSONObject) JSON.toJSON(e); DeviceStatus deviceStatus = j.toJavaObject(DeviceStatus.class); machineIotDataReceivedEvent.setId(snowflake.nextId()); machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD); machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(deviceStatus.getMac())); machineIotDataReceivedEvent.setMachinePwrStat(1); - machineIotDataReceivedEvent.setMachineWorkingStat(1); - machineIotDataReceivedEvent.setCurrJobCount(deviceStatus.getCount()); - machineIotDataReceivedEvent.setCurrJobDuration(deviceStatus.getDuration()); + if (deviceStatus.getTimestamp() == null) { + machineIotDataReceivedEvent.setMachineWorkingStat(2); + } else { + machineIotDataReceivedEvent.setMachineWorkingStat(1); + } + machineIotDataReceivedEvent.setCurrJobCount(deviceStatus.getTotal()); + machineIotDataReceivedEvent.setCurrJobDuration(0L); machineIotDataReceivedEvent.setCurrStoppingDuration(0L); machineIotDataReceivedEvent.setCurrWaitingDuration(0L); - machineIotDataReceivedEvent.setAccJobCount(deviceStatus.getTotal()); + machineIotDataReceivedEvent.setAccJobCount(0L); machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis()); machineIotDataReceivedEvent.setReportTime(deviceStatus.getTimestamp().getTime()); receivedEventList.add(machineIotDataReceivedEvent); diff --git a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/util/DataParsingUtils.java b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/util/DataParsingUtils.java index 1d983bd..b04587b 100644 --- a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/util/DataParsingUtils.java +++ b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/util/DataParsingUtils.java @@ -86,6 +86,7 @@ public class DataParsingUtils { } public static List> deviceStatusKvParsing(JSONObject message){ + String[] row = new String[]{"cmd",CREATED_AT,"delivery_id","did","event_type","mac","msg_id","product_key"}; String[] cl = new String[]{"sn","signal","cur_state","cur_speed"}; List> list = new ArrayList<>(); diff --git a/iot-gizwits-statistics/src/test/java/com/qniao/Test1.java b/iot-gizwits-statistics/src/test/java/com/qniao/Test1.java index 3a4ff79..8cafc66 100644 --- a/iot-gizwits-statistics/src/test/java/com/qniao/Test1.java +++ b/iot-gizwits-statistics/src/test/java/com/qniao/Test1.java @@ -1,15 +1,32 @@ package com.qniao; +import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.HexUtil; +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSONObject; +import com.gizwits.noti.noticlient.util.CommandUtils; +import com.qniao.iot.gizwits.source.GizWitsIotSource; +import com.qniao.iot.gizwits.util.DataParsingUtils; +import org.apache.commons.lang3.StringUtils; + import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.List; +import java.util.Map; public class Test1 { public static void main(String[] args) { - Long receivedTime = LocalDateTime.now().toInstant(ZoneOffset.ofHours(+8)).toEpochMilli(); + /*Long receivedTime = LocalDateTime.now().toInstant(ZoneOffset.ofHours(+8)).toEpochMilli(); System.out.println(LocalDateTime.ofInstant(Instant.ofEpochMilli(receivedTime), ZoneOffset.of("+8")) - .format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));*/ + + JSONObject event = JSONObject.parseObject("{\"country\":\"China\",\"data\":{\"duration_2\":60,\"duration_1\":60,\"cur_speed\":0,\"state_1\":2,\"timestamp_3\":\"53b6f062\",\"timestamp_2\":\"17b6f062\",\"timestamp_1\":\"dbb5f062\",\"state_3\":2,\"state_2\":2,\"duration_3\":60,\"count_3\":4,\"count_2\":0,\"total_3\":2255,\"count_1\":1,\"total_2\":2251,\"sn\":75,\"total_1\":2251,\"signal\":3,\"cur_state\":2},\"city\":\"Unkown\",\"delivery_id\":4,\"ip\":\"117.132.198.66\",\"created_at\":1659942513.65440893173,\"mac\":\"861193040823503\",\"event_type\":\"device_status_kv\",\"product_key\":\"816caf9e2b2141be916f204214461df4\",\"cmd\":\"event_push\",\"msg_id\":\"vDBYUYSLQrmg7dtg+DlqJg\",\"region\":\"Unkown\",\"did\":\"apGAtdzI\"}"); + List> mapList = DataParsingUtils.deviceStatusKvParsing(event); + System.out.println(StrUtil.toString(mapList)); + } } diff --git a/iot-gizwits-statistics/src/test/java/com/qniao/Test2.java b/iot-gizwits-statistics/src/test/java/com/qniao/Test2.java new file mode 100644 index 0000000..640fdc6 --- /dev/null +++ b/iot-gizwits-statistics/src/test/java/com/qniao/Test2.java @@ -0,0 +1,22 @@ +package com.qniao; + +import com.gizwits.noti.noticlient.util.CommandUtils; +import com.qniao.iot.gizwits.source.GizWitsIotSource; + +import java.util.Arrays; + +public class Test2 { + + public static void main(String[] args) { + /*Long receivedTime = LocalDateTime.now().toInstant(ZoneOffset.ofHours(+8)).toEpochMilli(); + System.out.println(LocalDateTime.ofInstant(Instant.ofEpochMilli(receivedTime), ZoneOffset.of("+8")) + .format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));*/ + + GizWitsIotSource gizWitsIotSource = new GizWitsIotSource(); + + gizWitsIotSource.open(null); + + gizWitsIotSource.run(null); + + } +} diff --git a/iot-gizwits-statistics/src/test/java/com/qniao/Test3.java b/iot-gizwits-statistics/src/test/java/com/qniao/Test3.java new file mode 100644 index 0000000..558c595 --- /dev/null +++ b/iot-gizwits-statistics/src/test/java/com/qniao/Test3.java @@ -0,0 +1,19 @@ +package com.qniao; + +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import com.qniao.iot.gizwits.source.GizWitsIotSource; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +public class Test3 { + + public static void main(String[] args) { + + JSONObject obj = JSONUtil.parseObj("{\"country\":\"China\",\"data\":{\"duration_2\":60,\"duration_1\":60,\"cur_speed\":0,\"state_1\":2,\"timestamp_3\":\"53b6f062\",\"timestamp_2\":\"17b6f062\",\"timestamp_1\":\"dbb5f062\",\"state_3\":2,\"state_2\":2,\"duration_3\":60,\"count_3\":4,\"count_2\":0,\"total_3\":2255,\"count_1\":1,\"total_2\":2251,\"sn\":75,\"total_1\":2251,\"signal\":3,\"cur_state\":2},\"city\":\"Unkown\",\"delivery_id\":4,\"ip\":\"117.132.198.66\",\"created_at\":\"2022-08-08T15:08:33\",\"mac\":\"861193040823503\",\"event_type\":\"device_status_kv\",\"product_key\":\"816caf9e2b2141be916f204214461df4\",\"cmd\":\"event_push\",\"msg_id\":\"vDBYUYSLQrmg7dtg+DlqJg\",\"region\":\"Unkown\",\"did\":\"apGAtdzI\"}"); + String created_at = obj.getStr("created_at"); + System.out.println(LocalDateTime.parse(created_at)); + + } +}