diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java index ba1e910..8c92d84 100644 --- a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java +++ b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java @@ -9,9 +9,9 @@ import java.time.LocalDateTime; public class DeviceTotalData { /** - * 上次开机时间 + * 上次开机时间(时间戳) */ - private LocalDateTime lastBootTime; + private Long lastBootTime; /** * 当天作业计数 @@ -39,9 +39,9 @@ public class DeviceTotalData { private Long jobDurationTotal; /** - * 当前日期 + * 当前日期(格式:yyyy-MM-dd) */ - private LocalDate currLocalDate; + private String currLocalDate; /** * 消息时间 diff --git a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java index 5305e53..6b65a57 100644 --- a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java @@ -102,6 +102,8 @@ public class IotMonitoringDataJob { .filter((FilterFunction) value -> value.getReportTime() != null && value.getDataSource() != null && value.getMachinePwrStat() != null); + streamOperator.print().name("kafka 数据源:"); + // mac分组并进行工作时长的集合操作 DataStream machineIotDataReceivedEventDataStream = streamOperator .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) @@ -139,138 +141,142 @@ public class IotMonitoringDataJob { @Override public void processElement(MachineIotDataReceivedEvent receivedEvent, KeyedProcessFunction.Context ctx, - Collector out) throws Exception { - - DeviceTotalData onData = onDataState.value(); - Integer lastWorkingStat = lastWorkingStatState.value(); - Integer lastPwStat = lastPwStatState.value(); - DeviceTotalData lastedDeviceState = getDeviceTotalData(receivedEvent); - // 如果当前消息的时间大于等于上次消息的时间才进行处理 - Integer machinePwrStat = receivedEvent.getMachinePwrStat(); - Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); - lastWorkingStatState.update(machineWorkingStat); - lastPwStatState.update(machinePwrStat); - Long reportTime = receivedEvent.getReportTime(); - Long accJobCount = receivedEvent.getAccJobCount(); - // 1树根 0机智云 - Integer dataSource = receivedEvent.getDataSource(); - // 当前数据 - DeviceTotalData nowDeviceState = new DeviceTotalData(); - if (lastedDeviceState.getReportTime() <= receivedEvent.getReportTime()) { - if (lastWorkingStat == null) { - DeviceState deviceState = getDeviceStateListJson(receivedEvent.getMachineIotMac()); - Integer status = deviceState == null ? null : deviceState.getStatus(); - lastWorkingStat = status == null ? 0 : status; - lastPwStat = lastWorkingStat == 0 ? 0 : 1; - } - if (onData == null) { - onData = lastedDeviceState; - onDataState.update(onData); - } - LocalDate localDate = new Date(reportTime).toLocalDate(); - Long lastReportTime = lastedDeviceState.getReportTime(); - if (lastReportTime == null) { - // 如果上次的消息时间为空,那么不进行计算 - nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount()); - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration()); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal()); - nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal()); - } - // 直接通过两个消息的时间差进行计算(毫秒) - Long workingDuration = reportTime - lastedDeviceState.getReportTime(); - // 转为秒 - workingDuration = workingDuration / 1000; - if (machinePwrStat.equals(0)) { - if (lastPwStat != 0) { - if (lastWorkingStat == 1) { - // 如果上次是工作中,那就进行累加 - if (lastReportTime != null) { - nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration); - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); - if (dataSource == 1) { - // 树根 - nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); - } else { - // 机智云 - Long jobTotal = lastedDeviceState.getJobTotal(); - Long workingJon; - if (accJobCount > jobTotal) { - workingJon = accJobCount - lastedDeviceState.getJobTotal(); + Collector out) { + + try { + DeviceTotalData onData = onDataState.value(); + Integer lastWorkingStat = lastWorkingStatState.value(); + Integer lastPwStat = lastPwStatState.value(); + DeviceTotalData lastedDeviceState = getDeviceTotalData(receivedEvent); + // 如果当前消息的时间大于等于上次消息的时间才进行处理 + Integer machinePwrStat = receivedEvent.getMachinePwrStat(); + Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); + lastWorkingStatState.update(machineWorkingStat); + lastPwStatState.update(machinePwrStat); + Long reportTime = receivedEvent.getReportTime(); + Long accJobCount = receivedEvent.getAccJobCount(); + // 1树根 0机智云 + Integer dataSource = receivedEvent.getDataSource(); + // 当前数据 + DeviceTotalData nowDeviceState = new DeviceTotalData(); + if (lastedDeviceState.getReportTime() <= receivedEvent.getReportTime()) { + if (lastWorkingStat == null) { + DeviceState deviceState = getDeviceStateListJson(receivedEvent.getMachineIotMac()); + Integer status = deviceState == null ? null : deviceState.getStatus(); + lastWorkingStat = status == null ? 0 : status; + lastPwStat = lastWorkingStat == 0 ? 0 : 1; + } + if (onData == null) { + onData = lastedDeviceState; + onDataState.update(onData); + } + LocalDate localDate = new Date(reportTime).toLocalDate(); + Long lastReportTime = lastedDeviceState.getReportTime(); + if (lastReportTime == null) { + // 如果上次的消息时间为空,那么不进行计算 + nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount()); + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration()); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal()); + nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal()); + } + // 直接通过两个消息的时间差进行计算(毫秒) + Long workingDuration = reportTime - lastedDeviceState.getReportTime(); + // 转为秒 + workingDuration = workingDuration / 1000; + if (machinePwrStat.equals(0)) { + if (lastPwStat != 0) { + if (lastWorkingStat == 1) { + // 如果上次是工作中,那就进行累加 + if (lastReportTime != null) { + nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration); + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); + if (dataSource == 1) { + // 树根 + nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); + nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); } else { - workingJon = 0L; + // 机智云 + Long jobTotal = lastedDeviceState.getJobTotal(); + Long workingJon; + if (accJobCount > jobTotal) { + workingJon = accJobCount - lastedDeviceState.getJobTotal(); + } else { + workingJon = 0L; + } + nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon); + nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); } - nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon); - nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); } + nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); + nowDeviceState.setLastBootTime(onData.getReportTime()); + nowDeviceState.setReportTime(reportTime); + } else { + nowDeviceState = lastedDeviceState; } - nowDeviceState.setCurrLocalDate(localDate); - nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getReportTime()), ZoneId.systemDefault())); - nowDeviceState.setReportTime(reportTime); deviceTotalDataStat.update(nowDeviceState); } else { nowDeviceState = lastedDeviceState; deviceTotalDataStat.update(nowDeviceState); } } else { - nowDeviceState = lastedDeviceState; - deviceTotalDataStat.update(nowDeviceState); - } - } else { - if (machineWorkingStat.equals(1)) { - if (dataSource == 1) { - // 树根(今日当前数 + 这次信息点距离上次信息点生产的数量) - nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); - } else { - // 机智云 - Long jobTotal = lastedDeviceState.getJobTotal(); - Long workingJon; - if (accJobCount > jobTotal) { - workingJon = accJobCount - lastedDeviceState.getJobTotal(); + if (machineWorkingStat.equals(1)) { + if (dataSource == 1) { + // 树根(今日当前数 + 这次信息点距离上次信息点生产的数量) + nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); + nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); } else { - workingJon = 0L; + // 机智云 + Long jobTotal = lastedDeviceState.getJobTotal(); + Long workingJon; + if (accJobCount > jobTotal) { + workingJon = accJobCount - lastedDeviceState.getJobTotal(); + } else { + workingJon = 0L; + } + nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon); + nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); } - nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon); - nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); + nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); + nowDeviceState.setLastBootTime(onData.getReportTime()); + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); + nowDeviceState.setReportTime(reportTime); + } else { + nowDeviceState = lastedDeviceState; + } + deviceTotalDataStat.update(nowDeviceState); + nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration); + if (lastPwStat == 0) { + // 如果上次是关机消息,那么这次就是开机消息 + // 记录本次开机作为上次开机时间 + nowDeviceState.setReportTime(reportTime); + // 记录一个周期的开机时间 + onDataState.update(nowDeviceState); + onData = nowDeviceState; } - nowDeviceState.setCurrLocalDate(localDate); - nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getReportTime()), ZoneId.systemDefault())); - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); - nowDeviceState.setReportTime(reportTime); - } else { - nowDeviceState = lastedDeviceState; } - deviceTotalDataStat.update(nowDeviceState); - nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration); - if (lastPwStat == 0) { - // 如果上次是关机消息,那么这次就是开机消息 - // 记录本次开机作为上次开机时间 - nowDeviceState.setReportTime(reportTime); - // 记录一个周期的开机时间 - onDataState.update(nowDeviceState); - onData = nowDeviceState; + // 如果上次是待机,并且这次也是待机,那么就不需要发送了 + if (!(lastWorkingStat == 2 && machineWorkingStat == 2)) { + DeviceMonitoringData data = new DeviceMonitoringData(); + data.setDataSource(receivedEvent.getDataSource()); + data.setMachineIotMac(receivedEvent.getMachineIotMac()); + data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); + data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); + data.setAccJobCount(nowDeviceState.getJobTotal()); + data.setCurrJobCount(nowDeviceState.getTheDayJobCount()); + data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration()); + data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal()); + data.setReportTime(reportTime); + data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + data.setLastBootTime(onData.getReportTime()); + data.setCurrDuration(nowDeviceState.getTheDayDuration()); + out.collect(data); } } - // 如果上次是待机,并且这次也是待机,那么就不需要发送了 - if (!(lastWorkingStat == 2 && machineWorkingStat == 2)) { - DeviceMonitoringData data = new DeviceMonitoringData(); - data.setDataSource(receivedEvent.getDataSource()); - data.setMachineIotMac(receivedEvent.getMachineIotMac()); - data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); - data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); - data.setAccJobCount(nowDeviceState.getJobTotal()); - data.setCurrJobCount(nowDeviceState.getTheDayJobCount()); - data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration()); - data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal()); - data.setReportTime(reportTime); - data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); - data.setLastBootTime(onData.getReportTime()); - data.setCurrDuration(nowDeviceState.getTheDayDuration()); - out.collect(data); - } + }catch (Exception e) { + log.info("导致异常的信息:" + JSONUtil.toJsonStr(receivedEvent)); + log.error("处理异常", e); } } @@ -298,12 +304,10 @@ public class IotMonitoringDataJob { data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); data.setJobTotal(deviceMonitoringData.getAccJobCount()); // 单位秒 - data.setCurrLocalDate(localDate); + data.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration()); data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount()); - LocalDateTime ldt = new Date(deviceMonitoringData.getLastBootTime()) - .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime(); - data.setLastBootTime(ldt); + data.setLastBootTime(deviceMonitoringData.getLastBootTime()); data.setReportTime(deviceMonitoringData.getReportTime()); } else { // es中也没有,直接从老接口拿 @@ -312,7 +316,7 @@ public class IotMonitoringDataJob { value = data; } // 是否日期是当天的,否则需要更新当天工作时长和当天工作量 - if (value.getCurrLocalDate().isBefore(localDate)) { + if (LocalDate.parse(value.getCurrLocalDate(), DateTimeFormatter.ofPattern("yyyy-MM-dd")).isBefore(localDate)) { // 先从es中拿昨天最新的 DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond()); @@ -321,9 +325,8 @@ public class IotMonitoringDataJob { value.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); value.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration()); value.setTheDayJobCount(deviceMonitoringData.getCurrJobCount()); - value.setCurrLocalDate(localDate); - value.setLastBootTime(LocalDateTime.ofInstant(Instant - .ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault())); + value.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); + value.setLastBootTime(deviceMonitoringData.getLastBootTime()); value.setReportTime(deviceMonitoringData.getReportTime()); value.setTheDayDuration(0L); } else { @@ -332,7 +335,7 @@ public class IotMonitoringDataJob { value.setJobDurationTotal(value.getJobDurationTotal()); value.setTheDayJobDuration(0L); value.setTheDayJobCount(0L); - value.setCurrLocalDate(localDate); + value.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); value.setLastBootTime(value.getLastBootTime()); value.setTheDayDuration(0L); value.setReportTime(reportTime); @@ -377,10 +380,10 @@ public class IotMonitoringDataJob { Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime"); LocalDateTime lastBootTime = LocalDateTime .parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); - deviceTotalData.setLastBootTime(lastBootTime); + deviceTotalData.setLastBootTime(lastBootTime.toInstant(ZoneOffset.of("+8")).toEpochMilli()); deviceTotalData.setTheDayJobDuration(0L); deviceTotalData.setTheDayJobCount(0L); - deviceTotalData.setCurrLocalDate(LocalDate.now()); + deviceTotalData.setCurrLocalDate(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); deviceTotalData.setReportTime(reportTime); stop = true; break; @@ -391,10 +394,10 @@ public class IotMonitoringDataJob { deviceTotalData = new DeviceTotalData(); deviceTotalData.setJobTotal(0L); deviceTotalData.setJobDurationTotal(0L); - deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime), ZoneId.systemDefault())); + deviceTotalData.setLastBootTime(reportTime); deviceTotalData.setTheDayJobDuration(0L); deviceTotalData.setTheDayJobCount(0L); - deviceTotalData.setCurrLocalDate(LocalDate.now()); + deviceTotalData.setCurrLocalDate(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); deviceTotalData.setReportTime(reportTime); } deviceTotalData.setTheDayDuration(0L); diff --git a/src/test/java/Demo2.java b/src/test/java/Demo2.java index 5e0d16e..2765cfc 100644 --- a/src/test/java/Demo2.java +++ b/src/test/java/Demo2.java @@ -2,6 +2,8 @@ import cn.hutool.http.HttpUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; +import java.time.LocalDate; + public class Demo2 { public static void main(String[] args) { @@ -14,9 +16,5 @@ public class Demo2 { System.out.println(data);*/ - Long a = 2314L; -a = a/1000; - - System.out.println(a); } }