Browse Source

更新

hph-优化版本
1049970895@qniao.cn 3 years ago
parent
commit
5e59e517cc
1 changed files with 64 additions and 21 deletions
  1. 85
      iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java

85
iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java

@ -31,6 +31,7 @@ import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gizwits.noti.noticlient.bean.resp.NotiRespPushEvents;
import com.gizwits.noti.noticlient.bean.resp.body.AbstractPushEventBody;
import com.gizwits.noti.noticlient.bean.resp.body.OffLineEventBody;
import com.gizwits.noti.noticlient.bean.resp.body.OnLineEventBody;
import com.gizwits.noti.noticlient.util.CommandUtils;
@ -43,6 +44,7 @@ import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaSerializationSchema;
import com.qniao.iot.rc.constant.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
@ -63,6 +65,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.Collector;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.*;
@ -84,8 +87,11 @@ public class GizWitsIotDataFormatterJob {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 添加机智云数据源
DataStreamSource<com.alibaba.fastjson.JSONObject> streamSource = env.addSource(new GizWitsIotSource());
// 把机智云的数据转成我们自己的格式
SingleOutputStreamOperator<MachineIotDataReceivedEvent> transformDs = streamSource
.flatMap(new RichFlatMapFunction<JSONObject, MachineIotDataReceivedEvent>() {
@ -187,15 +193,28 @@ public class GizWitsIotDataFormatterJob {
machineIotDataReceivedEvent.setId(snowflake.nextId());
machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD);
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(mac));
machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD);
machineIotDataReceivedEvent.setMachinePwrStat(0);
machineIotDataReceivedEvent.setMachineWorkingStat(0);
machineIotDataReceivedEvent.setCurrJobCount(0L);
machineIotDataReceivedEvent.setCurrJobDuration(0L);
machineIotDataReceivedEvent.setCurrStoppingDuration(0L);
machineIotDataReceivedEvent.setAccJobCount(0L);
machineIotDataReceivedEvent.setCurrCount(0L);
machineIotDataReceivedEvent.setCurrDuration(0L);
machineIotDataReceivedEvent.setJobDurationOfTheDay(0L);
machineIotDataReceivedEvent.setJobCountOfTheDay(0L);
machineIotDataReceivedEvent.setDurationOfThePeriod(0L);
machineIotDataReceivedEvent.setCountOfThePeriod(0L);
machineIotDataReceivedEvent.setCurrWaitingDuration(0L);
machineIotDataReceivedEvent.setCurrStoppingDuration(0L);
machineIotDataReceivedEvent.setIgStat(null);
machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis());
machineIotDataReceivedEvent.setReportTime(offLineEventBody.getCreatedAt());
Long createdAt = offLineEventBody.getCreatedAt();
if(createdAt != null) {
String createdAtStr = StrUtil.toString(createdAt);
if(createdAtStr.length() == 10) {
// 秒转为毫秒
createdAtStr = createdAtStr + "000";
}
machineIotDataReceivedEvent.setReportTime(Long.parseLong(createdAtStr));
}
receivedEventList.add(machineIotDataReceivedEvent);
return receivedEventList;
}
@ -208,14 +227,28 @@ public class GizWitsIotDataFormatterJob {
machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD);
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(mac));
machineIotDataReceivedEvent.setMachinePwrStat(1);
// 上线之后的工作状态是待机中
machineIotDataReceivedEvent.setMachineWorkingStat(2);
machineIotDataReceivedEvent.setCurrJobCount(0L);
machineIotDataReceivedEvent.setCurrJobDuration(0L);
machineIotDataReceivedEvent.setCurrStoppingDuration(0L);
machineIotDataReceivedEvent.setAccJobCount(0L);
machineIotDataReceivedEvent.setCurrCount(0L);
machineIotDataReceivedEvent.setCurrDuration(0L);
machineIotDataReceivedEvent.setJobDurationOfTheDay(0L);
machineIotDataReceivedEvent.setJobCountOfTheDay(0L);
machineIotDataReceivedEvent.setDurationOfThePeriod(0L);
machineIotDataReceivedEvent.setCountOfThePeriod(0L);
machineIotDataReceivedEvent.setCurrWaitingDuration(0L);
machineIotDataReceivedEvent.setCurrStoppingDuration(0L);
machineIotDataReceivedEvent.setIgStat(null);
machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis());
machineIotDataReceivedEvent.setReportTime(onLineEventBody.getCreatedAt());
// 上线之后的工作状态是待机中
Long createdAt = onLineEventBody.getCreatedAt();
if(createdAt != null) {
String createdAtStr = StrUtil.toString(createdAt);
if(createdAtStr.length() == 10) {
// 秒转为毫秒
createdAtStr = createdAtStr + "000";
}
machineIotDataReceivedEvent.setReportTime(Long.parseLong(createdAtStr));
}
receivedEventList.add(machineIotDataReceivedEvent);
return receivedEventList;
}
@ -229,19 +262,29 @@ public class GizWitsIotDataFormatterJob {
machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD);
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(deviceStatus.getMac()));
machineIotDataReceivedEvent.setMachinePwrStat(1);
Long count = deviceStatus.getCount();
if (count == null || count == 0) {
machineIotDataReceivedEvent.setMachineWorkingStat(2);
} else {
machineIotDataReceivedEvent.setMachineWorkingStat(1);
}
machineIotDataReceivedEvent.setCurrJobCount(count);
machineIotDataReceivedEvent.setCurrJobDuration(deviceStatus.getDuration());
machineIotDataReceivedEvent.setCurrStoppingDuration(0L);
machineIotDataReceivedEvent.setCurrWaitingDuration(0L);
machineIotDataReceivedEvent.setMachineWorkingStat(1);
machineIotDataReceivedEvent.setAccJobCount(0L);
machineIotDataReceivedEvent.setCurrDuration(deviceStatus.getDuration());
machineIotDataReceivedEvent.setJobDurationOfTheDay(0L);
machineIotDataReceivedEvent.setJobCountOfTheDay(0L);
machineIotDataReceivedEvent.setDurationOfThePeriod(0L);
machineIotDataReceivedEvent.setCountOfThePeriod(deviceStatus.getTotal());
machineIotDataReceivedEvent.setCurrWaitingDuration(0L);
machineIotDataReceivedEvent.setCurrStoppingDuration(0L);
machineIotDataReceivedEvent.setIgStat(null);
Long count = deviceStatus.getCount();
machineIotDataReceivedEvent.setCurrCount(count);
machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis());
machineIotDataReceivedEvent.setReportTime(deviceStatus.getTimestamp().getTime());
Date timestamp = deviceStatus.getTimestamp();
if(timestamp != null) {
Long createdAt = deviceStatus.getTimestamp().getTime();
String createdAtStr = StrUtil.toString(createdAt);
if(createdAtStr.length() == 10) {
// 秒转为毫秒
createdAtStr = createdAtStr + "000";
}
machineIotDataReceivedEvent.setReportTime(Long.parseLong(createdAtStr));
}
receivedEventList.add(machineIotDataReceivedEvent);
});
return receivedEventList;

Loading…
Cancel
Save