diff --git a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java index 9b71164..ab6e18c 100644 --- a/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java +++ b/iot-gizwits-statistics/src/main/java/com/qniao/iot/gizwits/GizWitsIotDataFormatterJob.java @@ -31,6 +31,7 @@ import cn.hutool.json.JSONUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.gizwits.noti.noticlient.bean.resp.NotiRespPushEvents; +import com.gizwits.noti.noticlient.bean.resp.body.AbstractPushEventBody; import com.gizwits.noti.noticlient.bean.resp.body.OffLineEventBody; import com.gizwits.noti.noticlient.bean.resp.body.OnLineEventBody; import com.gizwits.noti.noticlient.util.CommandUtils; @@ -43,6 +44,7 @@ import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaSerializationSchema; import com.qniao.iot.rc.constant.DataSource; import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.serialization.SimpleStringEncoder; @@ -63,6 +65,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.util.Collector; + import java.time.*; import java.time.format.DateTimeFormatter; import java.util.*; @@ -84,8 +87,11 @@ public class GizWitsIotDataFormatterJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); + + // 添加机智云数据源 DataStreamSource streamSource = env.addSource(new GizWitsIotSource()); + // 把机智云的数据转成我们自己的格式 SingleOutputStreamOperator transformDs = streamSource .flatMap(new RichFlatMapFunction() { @@ -187,15 +193,28 @@ public class GizWitsIotDataFormatterJob { machineIotDataReceivedEvent.setId(snowflake.nextId()); machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD); machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(mac)); - machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD); machineIotDataReceivedEvent.setMachinePwrStat(0); machineIotDataReceivedEvent.setMachineWorkingStat(0); - machineIotDataReceivedEvent.setCurrJobCount(0L); - machineIotDataReceivedEvent.setCurrJobDuration(0L); - machineIotDataReceivedEvent.setCurrStoppingDuration(0L); + machineIotDataReceivedEvent.setAccJobCount(0L); + machineIotDataReceivedEvent.setCurrCount(0L); + machineIotDataReceivedEvent.setCurrDuration(0L); + machineIotDataReceivedEvent.setJobDurationOfTheDay(0L); + machineIotDataReceivedEvent.setJobCountOfTheDay(0L); + machineIotDataReceivedEvent.setDurationOfThePeriod(0L); + machineIotDataReceivedEvent.setCountOfThePeriod(0L); machineIotDataReceivedEvent.setCurrWaitingDuration(0L); + machineIotDataReceivedEvent.setCurrStoppingDuration(0L); + machineIotDataReceivedEvent.setIgStat(null); machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis()); - machineIotDataReceivedEvent.setReportTime(offLineEventBody.getCreatedAt()); + Long createdAt = offLineEventBody.getCreatedAt(); + if(createdAt != null) { + String createdAtStr = StrUtil.toString(createdAt); + if(createdAtStr.length() == 10) { + // 秒转为毫秒 + createdAtStr = createdAtStr + "000"; + } + machineIotDataReceivedEvent.setReportTime(Long.parseLong(createdAtStr)); + } receivedEventList.add(machineIotDataReceivedEvent); return receivedEventList; } @@ -208,14 +227,28 @@ public class GizWitsIotDataFormatterJob { machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD); machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(mac)); machineIotDataReceivedEvent.setMachinePwrStat(1); - // 上线之后的工作状态是待机中 machineIotDataReceivedEvent.setMachineWorkingStat(2); - machineIotDataReceivedEvent.setCurrJobCount(0L); - machineIotDataReceivedEvent.setCurrJobDuration(0L); - machineIotDataReceivedEvent.setCurrStoppingDuration(0L); + machineIotDataReceivedEvent.setAccJobCount(0L); + machineIotDataReceivedEvent.setCurrCount(0L); + machineIotDataReceivedEvent.setCurrDuration(0L); + machineIotDataReceivedEvent.setJobDurationOfTheDay(0L); + machineIotDataReceivedEvent.setJobCountOfTheDay(0L); + machineIotDataReceivedEvent.setDurationOfThePeriod(0L); + machineIotDataReceivedEvent.setCountOfThePeriod(0L); machineIotDataReceivedEvent.setCurrWaitingDuration(0L); + machineIotDataReceivedEvent.setCurrStoppingDuration(0L); + machineIotDataReceivedEvent.setIgStat(null); machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis()); - machineIotDataReceivedEvent.setReportTime(onLineEventBody.getCreatedAt()); + // 上线之后的工作状态是待机中 + Long createdAt = onLineEventBody.getCreatedAt(); + if(createdAt != null) { + String createdAtStr = StrUtil.toString(createdAt); + if(createdAtStr.length() == 10) { + // 秒转为毫秒 + createdAtStr = createdAtStr + "000"; + } + machineIotDataReceivedEvent.setReportTime(Long.parseLong(createdAtStr)); + } receivedEventList.add(machineIotDataReceivedEvent); return receivedEventList; } @@ -229,19 +262,29 @@ public class GizWitsIotDataFormatterJob { machineIotDataReceivedEvent.setDataSource(DataSource.TACT_CLOUD); machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(deviceStatus.getMac())); machineIotDataReceivedEvent.setMachinePwrStat(1); - Long count = deviceStatus.getCount(); - if (count == null || count == 0) { - machineIotDataReceivedEvent.setMachineWorkingStat(2); - } else { - machineIotDataReceivedEvent.setMachineWorkingStat(1); - } - machineIotDataReceivedEvent.setCurrJobCount(count); - machineIotDataReceivedEvent.setCurrJobDuration(deviceStatus.getDuration()); - machineIotDataReceivedEvent.setCurrStoppingDuration(0L); - machineIotDataReceivedEvent.setCurrWaitingDuration(0L); + machineIotDataReceivedEvent.setMachineWorkingStat(1); machineIotDataReceivedEvent.setAccJobCount(0L); + machineIotDataReceivedEvent.setCurrDuration(deviceStatus.getDuration()); + machineIotDataReceivedEvent.setJobDurationOfTheDay(0L); + machineIotDataReceivedEvent.setJobCountOfTheDay(0L); + machineIotDataReceivedEvent.setDurationOfThePeriod(0L); + machineIotDataReceivedEvent.setCountOfThePeriod(deviceStatus.getTotal()); + machineIotDataReceivedEvent.setCurrWaitingDuration(0L); + machineIotDataReceivedEvent.setCurrStoppingDuration(0L); + machineIotDataReceivedEvent.setIgStat(null); + Long count = deviceStatus.getCount(); + machineIotDataReceivedEvent.setCurrCount(count); machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis()); - machineIotDataReceivedEvent.setReportTime(deviceStatus.getTimestamp().getTime()); + Date timestamp = deviceStatus.getTimestamp(); + if(timestamp != null) { + Long createdAt = deviceStatus.getTimestamp().getTime(); + String createdAtStr = StrUtil.toString(createdAt); + if(createdAtStr.length() == 10) { + // 秒转为毫秒 + createdAtStr = createdAtStr + "000"; + } + machineIotDataReceivedEvent.setReportTime(Long.parseLong(createdAtStr)); + } receivedEventList.add(machineIotDataReceivedEvent); }); return receivedEventList;