Browse Source

更新

hph-优化版本
1049970895@qniao.cn 3 years ago
parent
commit
9870214bfb
6 changed files with 109 additions and 37 deletions
  1. 2
      .gitignore
  2. 81
      iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java
  3. 1
      iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/util/DataParsingUtils.java
  4. 21
      iot-gizwits-statistics/src/test/java/com/qniao/Test1.java
  5. 22
      iot-gizwits-statistics/src/test/java/com/qniao/Test2.java
  6. 19
      iot-gizwits-statistics/src/test/java/com/qniao/Test3.java

2
.gitignore

@ -0,0 +1,2 @@
*/target/*
/.idea/*

81
iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java

@ -19,6 +19,11 @@
package com.qniao.iot.gizwits;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.CalendarUtil;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
@ -37,47 +42,42 @@ import com.qniao.iot.gizwits.util.SnowFlake;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaSerializationSchema;
import com.qniao.iot.rc.constant.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.*;
/**
* 机智云设备数据转换
*
* @author hph
*/
@Slf4j
public class GizWitsIotDataFormatterJob {
static SnowFlake snowflake = new SnowFlake(
Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_DATACENTER_ID)),
Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_MACHINE_ID))
);
Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_MACHINE_ID)));
public static void main(String[] args) throws Exception {
@ -92,13 +92,13 @@ public class GizWitsIotDataFormatterJob {
@Override
public void flatMap(JSONObject value, Collector<MachineIotDataReceivedEvent> out) {
List<MachineIotDataReceivedEvent> receivedEvents = transform(value);
if(CollUtil.isNotEmpty(receivedEvents)) {
if (CollUtil.isNotEmpty(receivedEvents)) {
receivedEvents.forEach(out::collect);
}
}
}).name("Transform MachineIotDataReceivedEvent");
//kafka 认证配置暂时注释后续可能需要放开
// kafka 认证配置暂时注释后续可能需要放开
/*Properties kafkaProducerConfig = new Properties();
kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
@ -106,7 +106,6 @@ public class GizWitsIotDataFormatterJob {
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";");
*/
// 写入kafka
transformDs.sinkTo(
KafkaSink.<MachineIotDataReceivedEvent>builder()
@ -126,7 +125,14 @@ public class GizWitsIotDataFormatterJob {
StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
new Path(outputPath),
new SimpleStringEncoder<String>(CharsetUtil.UTF_8)
).withBucketAssigner(new BucketAssigner<String, String>() {
).withRollingPolicy(DefaultRollingPolicy.builder()
// 每隔多长时间生成一个文件
.withRolloverInterval(Duration.ofHours(12))
// 默认60秒,未写入数据处于不活跃状态超时会滚动新文件
.withInactivityInterval(Duration.ofHours(12))
// 设置每个文件的最大大小 ,默认是128M
.withMaxPartSize(MemorySize.ofMebiBytes(128 * 1024 * 1024))
.build()).withBucketAssigner(new BucketAssigner<String, String>() {
@Override
public String getBucketId(String element, Context context) {
@ -136,16 +142,16 @@ public class GizWitsIotDataFormatterJob {
// 获取设备状态目录名称
String deviceStatusStr = getDeviceStatusStr(receivedEvent.getMachineWorkingStat());
return deviceStatusStr + "/" + LocalDateTime.ofInstant(Instant.ofEpochMilli(receivedTime), ZoneOffset.of("+8"))
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + "/" + receivedEvent.getMachineIotMac();
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + "/" + receivedEvent.getMachineIotMac();
}
private String getDeviceStatusStr(Integer machineWorkingStat) {
if(machineWorkingStat == 1){
if (machineWorkingStat == 1) {
return "deviceWorking";
}else if(machineWorkingStat == 2){
} else if (machineWorkingStat == 2) {
return "deviceWaiting";
}else {
} else {
return "deviceOff";
}
}
@ -155,6 +161,7 @@ public class GizWitsIotDataFormatterJob {
return SimpleVersionedStringSerializer.INSTANCE;
}
}).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build();
transformDs.map(new RichMapFunction<MachineIotDataReceivedEvent, String>() {
@Override
public String map(MachineIotDataReceivedEvent value) {
@ -172,11 +179,11 @@ public class GizWitsIotDataFormatterJob {
MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent();
if (ObjectUtil.isNotEmpty(event)) {
String pushEventCode = CommandUtils.getPushEventCode(event);
if(NotiRespPushEvents.DEVICE_OFFLINE.getCode().equals(pushEventCode)) {
// 设备下线
if (NotiRespPushEvents.DEVICE_OFFLINE.getCode().equals(pushEventCode)) {
// 设备下线云盒下线
OffLineEventBody offLineEventBody = CommandUtils.parsePushEvent(event, OffLineEventBody.class);
String mac = offLineEventBody.getMac();
if(StrUtil.isNotEmpty(mac)) {
if (StrUtil.isNotEmpty(mac)) {
machineIotDataReceivedEvent.setId(snowflake.nextId());
machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD);
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(mac));
@ -192,11 +199,11 @@ public class GizWitsIotDataFormatterJob {
receivedEventList.add(machineIotDataReceivedEvent);
return receivedEventList;
}
}else if(NotiRespPushEvents.DEVICE_ONLINE.getCode().equals(pushEventCode)) {
// 设备上线
} else if (NotiRespPushEvents.DEVICE_ONLINE.getCode().equals(pushEventCode)) {
// 设备上线云盒上线连接网络
OnLineEventBody onLineEventBody = CommandUtils.parsePushEvent(event, OnLineEventBody.class);
String mac = onLineEventBody.getMac();
if(StrUtil.isNotEmpty(mac)) {
if (StrUtil.isNotEmpty(mac)) {
machineIotDataReceivedEvent.setId(snowflake.nextId());
machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD);
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(mac));
@ -211,22 +218,26 @@ public class GizWitsIotDataFormatterJob {
receivedEventList.add(machineIotDataReceivedEvent);
return receivedEventList;
}
}else if(NotiRespPushEvents.DEVICE_STATUS_KV.getCode().equals(pushEventCode)) {
// 设备生产数据
List<Map<String,Object>> mapList = DataParsingUtils.deviceStatusKvParsing(event);
mapList.forEach(e ->{
} else if (NotiRespPushEvents.DEVICE_STATUS_KV.getCode().equals(pushEventCode)) {
// 设备生产数据云盒上线并且上报机器的数据
List<Map<String, Object>> mapList = DataParsingUtils.deviceStatusKvParsing(event);
mapList.forEach(e -> {
JSONObject j = (JSONObject) JSON.toJSON(e);
DeviceStatus deviceStatus = j.toJavaObject(DeviceStatus.class);
machineIotDataReceivedEvent.setId(snowflake.nextId());
machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD);
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(deviceStatus.getMac()));
machineIotDataReceivedEvent.setMachinePwrStat(1);
machineIotDataReceivedEvent.setMachineWorkingStat(1);
machineIotDataReceivedEvent.setCurrJobCount(deviceStatus.getCount());
machineIotDataReceivedEvent.setCurrJobDuration(deviceStatus.getDuration());
if (deviceStatus.getTimestamp() == null) {
machineIotDataReceivedEvent.setMachineWorkingStat(2);
} else {
machineIotDataReceivedEvent.setMachineWorkingStat(1);
}
machineIotDataReceivedEvent.setCurrJobCount(deviceStatus.getTotal());
machineIotDataReceivedEvent.setCurrJobDuration(0L);
machineIotDataReceivedEvent.setCurrStoppingDuration(0L);
machineIotDataReceivedEvent.setCurrWaitingDuration(0L);
machineIotDataReceivedEvent.setAccJobCount(deviceStatus.getTotal());
machineIotDataReceivedEvent.setAccJobCount(0L);
machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis());
machineIotDataReceivedEvent.setReportTime(deviceStatus.getTimestamp().getTime());
receivedEventList.add(machineIotDataReceivedEvent);

1
iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/util/DataParsingUtils.java

@ -86,6 +86,7 @@ public class DataParsingUtils {
}
public static List<Map<String,Object>> deviceStatusKvParsing(JSONObject message){
String[] row = new String[]{"cmd",CREATED_AT,"delivery_id","did","event_type","mac","msg_id","product_key"};
String[] cl = new String[]{"sn","signal","cur_state","cur_speed"};
List<Map<String,Object>> list = new ArrayList<>();

21
iot-gizwits-statistics/src/test/java/com/qniao/Test1.java

@ -1,15 +1,32 @@
package com.qniao;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.gizwits.noti.noticlient.util.CommandUtils;
import com.qniao.iot.gizwits.source.GizWitsIotSource;
import com.qniao.iot.gizwits.util.DataParsingUtils;
import org.apache.commons.lang3.StringUtils;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class Test1 {
public static void main(String[] args) {
Long receivedTime = LocalDateTime.now().toInstant(ZoneOffset.ofHours(+8)).toEpochMilli();
/*Long receivedTime = LocalDateTime.now().toInstant(ZoneOffset.ofHours(+8)).toEpochMilli();
System.out.println(LocalDateTime.ofInstant(Instant.ofEpochMilli(receivedTime), ZoneOffset.of("+8"))
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));*/
JSONObject event = JSONObject.parseObject("{\"country\":\"China\",\"data\":{\"duration_2\":60,\"duration_1\":60,\"cur_speed\":0,\"state_1\":2,\"timestamp_3\":\"53b6f062\",\"timestamp_2\":\"17b6f062\",\"timestamp_1\":\"dbb5f062\",\"state_3\":2,\"state_2\":2,\"duration_3\":60,\"count_3\":4,\"count_2\":0,\"total_3\":2255,\"count_1\":1,\"total_2\":2251,\"sn\":75,\"total_1\":2251,\"signal\":3,\"cur_state\":2},\"city\":\"Unkown\",\"delivery_id\":4,\"ip\":\"117.132.198.66\",\"created_at\":1659942513.65440893173,\"mac\":\"861193040823503\",\"event_type\":\"device_status_kv\",\"product_key\":\"816caf9e2b2141be916f204214461df4\",\"cmd\":\"event_push\",\"msg_id\":\"vDBYUYSLQrmg7dtg+DlqJg\",\"region\":\"Unkown\",\"did\":\"apGAtdzI\"}");
List<Map<String,Object>> mapList = DataParsingUtils.deviceStatusKvParsing(event);
System.out.println(StrUtil.toString(mapList));
}
}

22
iot-gizwits-statistics/src/test/java/com/qniao/Test2.java

@ -0,0 +1,22 @@
package com.qniao;
import com.gizwits.noti.noticlient.util.CommandUtils;
import com.qniao.iot.gizwits.source.GizWitsIotSource;
import java.util.Arrays;
public class Test2 {
public static void main(String[] args) {
/*Long receivedTime = LocalDateTime.now().toInstant(ZoneOffset.ofHours(+8)).toEpochMilli();
System.out.println(LocalDateTime.ofInstant(Instant.ofEpochMilli(receivedTime), ZoneOffset.of("+8"))
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));*/
GizWitsIotSource gizWitsIotSource = new GizWitsIotSource();
gizWitsIotSource.open(null);
gizWitsIotSource.run(null);
}
}

19
iot-gizwits-statistics/src/test/java/com/qniao/Test3.java

@ -0,0 +1,19 @@
package com.qniao;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.gizwits.source.GizWitsIotSource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class Test3 {
public static void main(String[] args) {
JSONObject obj = JSONUtil.parseObj("{\"country\":\"China\",\"data\":{\"duration_2\":60,\"duration_1\":60,\"cur_speed\":0,\"state_1\":2,\"timestamp_3\":\"53b6f062\",\"timestamp_2\":\"17b6f062\",\"timestamp_1\":\"dbb5f062\",\"state_3\":2,\"state_2\":2,\"duration_3\":60,\"count_3\":4,\"count_2\":0,\"total_3\":2255,\"count_1\":1,\"total_2\":2251,\"sn\":75,\"total_1\":2251,\"signal\":3,\"cur_state\":2},\"city\":\"Unkown\",\"delivery_id\":4,\"ip\":\"117.132.198.66\",\"created_at\":\"2022-08-08T15:08:33\",\"mac\":\"861193040823503\",\"event_type\":\"device_status_kv\",\"product_key\":\"816caf9e2b2141be916f204214461df4\",\"cmd\":\"event_push\",\"msg_id\":\"vDBYUYSLQrmg7dtg+DlqJg\",\"region\":\"Unkown\",\"did\":\"apGAtdzI\"}");
String created_at = obj.getStr("created_at");
System.out.println(LocalDateTime.parse(created_at));
}
}
Loading…
Cancel
Save