From 0d84b81c49173f9f837d34c793f43718abbda450 Mon Sep 17 00:00:00 2001 From: "hupenghui@qniao.cn" <1049970895> Date: Sun, 14 Aug 2022 20:05:47 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/gizwits/DeviceMonitoringData.java | 5 + .../qniao/iot/gizwits/DeviceTotalData.java | 5 + .../iot/gizwits/IotMonitoringDataJob.java | 162 +++++++++--------- src/test/java/SourceMockerDemo.java | 9 +- 4 files changed, 92 insertions(+), 89 deletions(-) diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java b/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java index a25e528..17ed170 100644 --- a/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java +++ b/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java @@ -42,6 +42,11 @@ public class DeviceMonitoringData { */ private Long currJobDuration; + /** + * 当前开机时长 + */ + private Long currDuration; + /** * 数据实际采样时间(单位豪秒) */ diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java index 78c009a..b0f2183 100644 --- a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java +++ b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java @@ -23,6 +23,11 @@ public class DeviceTotalData { */ private Long theDayJobDuration; + /** + * 当天开机时长(单位秒) + */ + private Long theDayDuration; + /** * 累计作业计数 */ diff --git a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java index af34ec4..4f86356 100644 --- a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java @@ -76,7 +76,7 @@ public class IotMonitoringDataJob { return requestConfigBuilder; })); - private final static String SQL = "select qmrs.machine_id, qmrs.iot_mac as machine_iot_mac, qmrs.status, qml.count_unit\n" + + private final static String SQL = "select qmrs.status \n" + "from qn_machine_realtime_state qmrs\n" + " LEFT JOIN (select example_id, count_unit from qn_machine_list where is_delete = 0) qml\n" + " ON qmrs.iot_mac = qml.example_id\n" + @@ -129,9 +129,12 @@ public class IotMonitoringDataJob { // 当前周期的待机数据 private ValueState lastWaitJobDataState; - // 上次的状态 + // 上次的工作状态 private ValueState lastWorkingStatState; + // 上次的开机状态 + private ValueState lastPwStatState; + @Override public void open(Configuration parameters) { @@ -153,6 +156,9 @@ public class IotMonitoringDataJob { lastWorkingStatState = getRuntimeContext() .getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class))); + + lastPwStatState = getRuntimeContext() + .getState(new ValueStateDescriptor<>("lastPwStat", TypeInformation.of(Integer.class))); } @Override @@ -162,28 +168,25 @@ public class IotMonitoringDataJob { DeviceTotalData onData = onDataState.value(); MachineIotDataReceivedEvent lastOffData = lastOffDataState.value(); - //MachineIotDataReceivedEvent lastOnData = lastOnDataState.value(); MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value(); Integer lastWorkingStat = lastWorkingStatState.value(); - DeviceTotalData lastedDeviceState = deviceTotalDataStat.value(); + Integer lastPwStat = lastPwStatState.value(); + DeviceTotalData lastedDeviceState = getDeviceTotalData(receivedEvent); // 如果当前消息的时间大于等于上次消息的时间才进行处理 Integer machinePwrStat = receivedEvent.getMachinePwrStat(); Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); lastWorkingStatState.update(machineWorkingStat); + lastPwStatState.update(machinePwrStat); Long reportTime = receivedEvent.getReportTime(); Long accJobCount = receivedEvent.getAccJobCount(); // 1树根 0机智云 Integer dataSource = receivedEvent.getDataSource(); // 当前数据 DeviceTotalData nowDeviceState = new DeviceTotalData(); - if (lastedDeviceState == null) { - lastedDeviceState = getDeviceTotalData(receivedEvent); - lastedDeviceState.setJobTotal(6218646L); - //lastOnData = receivedEvent; - } - if(lastedDeviceState.getLastReportTime() <= receivedEvent.getReportTime()) { + if (lastedDeviceState.getLastReportTime() <= receivedEvent.getReportTime()) { if (lastWorkingStat == null) { lastWorkingStat = getDeviceStateListJson(receivedEvent.getMachineIotMac()); + lastPwStat = lastWorkingStat == 0 ? 0 : 1; } if (onData == null) { onData = lastedDeviceState; @@ -191,110 +194,95 @@ public class IotMonitoringDataJob { } LocalDate localDate = new Date(reportTime).toLocalDate(); Long a; + Long lastReportTime = lastedDeviceState.getLastReportTime(); + if (lastReportTime == null) { + // 如果上次的消息时间为空,那么不进行计算 + nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount()); + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration()); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal()); + nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal()); + } + // 直接通过两个消息的时间差进行计算(毫秒) + Long workingDuration = reportTime - lastedDeviceState.getLastReportTime(); + // 转为秒 + workingDuration = workingDuration / 1000; if (machinePwrStat.equals(0)) { - if (lastWorkingStat == 1) { - // 如果上次是工作中,那就进行累加 - Long lastReportTime = lastedDeviceState.getLastReportTime(); - if (lastReportTime == null) { - // 如果上次的消息时间为空,那么不进行计算 - nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount()); - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration()); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal()); - nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal()); - } else { - // 直接通过两个消息的时间差进行计算(毫秒) - Long workingDuration = reportTime - lastedDeviceState.getLastReportTime(); - // 转为秒 - workingDuration = workingDuration / 1000; - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); - if(dataSource == 1) { - // 树根 - nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); - }else { - // 机智云 - Long jobTotal = lastedDeviceState.getJobTotal(); - Long workingJon; - if(accJobCount > jobTotal) { - workingJon = accJobCount - lastedDeviceState.getJobTotal(); - }else { - workingJon = 0L; + if (lastPwStat != 0) { + if (lastWorkingStat == 1) { + // 如果上次是工作中,那就进行累加 + if (lastReportTime != null) { + nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration); + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); + if (dataSource == 1) { + // 树根 + nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); + nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); + } else { + // 机智云 + Long jobTotal = lastedDeviceState.getJobTotal(); + Long workingJon; + if (accJobCount > jobTotal) { + workingJon = accJobCount - lastedDeviceState.getJobTotal(); + } else { + workingJon = 0L; + } + nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon); + nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); } - nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon); - nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); } - } - nowDeviceState.setCurrLocalDate(localDate); - /*if (lastOnData != null) { - nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime()), ZoneId.systemDefault())); + nowDeviceState.setCurrLocalDate(localDate); + nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault())); + nowDeviceState.setLastReportTime(reportTime); + deviceTotalDataStat.update(nowDeviceState); + // 关机后将待机数据清除 + lastWaitJobDataState.update(null); } else { - nowDeviceState.setLastBootTime(onData.getLastBootTime()); - }*/ - nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault())); - nowDeviceState.setLastReportTime(reportTime); - deviceTotalDataStat.update(nowDeviceState); - // 关机后将待机数据清除 - lastWaitJobDataState.update(null); - }else { + nowDeviceState = lastedDeviceState; + deviceTotalDataStat.update(nowDeviceState); + } + } else { nowDeviceState = lastedDeviceState; + deviceTotalDataStat.update(nowDeviceState); } - lastOffDataState.update(receivedEvent); } else { if (machineWorkingStat.equals(1)) { - // 工作中 - Long workingDuration = reportTime - lastedDeviceState.getLastReportTime(); - // 转为秒 - workingDuration = workingDuration /1000; - if(dataSource == 1) { + if (dataSource == 1) { // 树根(今日当前数 + 这次信息点距离上次信息点生产的数量) nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); - }else { + } else { // 机智云 Long jobTotal = lastedDeviceState.getJobTotal(); Long workingJon; - if(accJobCount > jobTotal) { + if (accJobCount > jobTotal) { workingJon = accJobCount - lastedDeviceState.getJobTotal(); - }else { + } else { workingJon = 0L; } nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon); nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); } nowDeviceState.setCurrLocalDate(localDate); - //nowDeviceState.setLastBootTime(onData.getLastBootTime()); nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault())); - if (lastWaitJobData != null) { - LocalDateTime lastWaitJobTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(lastWaitJobData.getReportTime()), - ZoneId.systemDefault()); - LocalDateTime localDateTime = LocalDateTime - .ofInstant(Instant.ofEpochMilli(reportTime), - ZoneId.systemDefault()); - a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS); - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + a); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + a); - } else { - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); - } + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); nowDeviceState.setLastReportTime(reportTime); deviceTotalDataStat.update(nowDeviceState); - }else { + } else { // 待机 lastWaitJobDataState.update(receivedEvent); nowDeviceState = lastedDeviceState; + deviceTotalDataStat.update(nowDeviceState); } - if (lastOffData != null) { + nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration); + if (lastPwStat == 0) { // 如果上次是关机消息,那么这次就是开机消息 // 记录本次开机作为上次开机时间 - //lastOnDataState.update(receivedEvent); nowDeviceState.setLastReportTime(reportTime); // 记录一个周期的开机时间 onDataState.update(nowDeviceState); onData = nowDeviceState; - lastOffDataState.update(null); } } // 如果上次是待机,并且这次也是待机,那么就不需要发送了 @@ -310,6 +298,7 @@ public class IotMonitoringDataJob { data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal()); data.setReportTime(reportTime); data.setLastBootTime(onData.getLastReportTime()); + data.setCurrDuration(nowDeviceState.getTheDayDuration()); out.collect(data); } } @@ -350,6 +339,8 @@ public class IotMonitoringDataJob { // es中也没有,直接从老接口拿 data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime); } + // TODO 测试数据,记得删除 + data.setJobTotal(6218646L); value = data; } // 是否日期是当天的,否则需要更新当天工作时长和当天工作量 @@ -366,6 +357,7 @@ public class IotMonitoringDataJob { value.setLastBootTime(LocalDateTime.ofInstant(Instant .ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault())); value.setLastReportTime(deviceMonitoringData.getReportTime()); + value.setTheDayDuration(0L); } else { // value有值,但是日期不对,说明到了第二天,那么,只需要对当天数据清零即可 value.setJobTotal(value.getJobTotal()); @@ -374,7 +366,8 @@ public class IotMonitoringDataJob { value.setTheDayJobCount(0L); value.setCurrLocalDate(localDate); value.setLastBootTime(value.getLastBootTime()); - value.setLastReportTime(value.getLastReportTime()); + value.setTheDayDuration(value.getTheDayDuration()); + value.setLastReportTime(reportTime); } } deviceTotalDataStat.update(value); @@ -406,7 +399,7 @@ public class IotMonitoringDataJob { } for (Object o : objects.toArray()) { Object mac = JSONUtil.getByPath(JSONUtil.parse(o), "mac"); - Long iotMac = Long.parseLong((String)mac); + Long iotMac = Long.parseLong((String) mac); if (iotMac.equals(machineIotMac)) { deviceTotalData = new DeviceTotalData(); Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal"); @@ -436,6 +429,7 @@ public class IotMonitoringDataJob { deviceTotalData.setCurrLocalDate(LocalDate.now()); deviceTotalData.setLastReportTime(reportTime); } + deviceTotalData.setTheDayDuration(0L); return deviceTotalData; } @@ -453,10 +447,10 @@ public class IotMonitoringDataJob { // 创建查询请求对象,将查询对象配置到其中 SearchRequest searchRequest = new SearchRequest("DeviceMonitoringData"); searchRequest.source(searchSourceBuilder); - GetIndexRequest exist=new GetIndexRequest("DeviceMonitoringData"); + GetIndexRequest exist = new GetIndexRequest("DeviceMonitoringData"); // 先判断客户端是否存在 boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); - if(exists) { + if (exists) { // 执行查询,然后处理响应结果 SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); // 根据状态和数据条数验证是否返回了数据 diff --git a/src/test/java/SourceMockerDemo.java b/src/test/java/SourceMockerDemo.java index 01d13f5..d753d93 100644 --- a/src/test/java/SourceMockerDemo.java +++ b/src/test/java/SourceMockerDemo.java @@ -21,7 +21,6 @@ import java.util.concurrent.Future; public class SourceMockerDemo { // 延迟:毫秒 - public static final long DELAY = 1000; public static void main(String[] args) throws Exception { // 创建kafka配置属性 @@ -64,11 +63,11 @@ public class SourceMockerDemo { event.setCurrWaitingDuration(0L); event.setIgStat(0); event.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); - if(f%5 == 0) { - event.setReportTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + event.setReportTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + /*if(f > 20) { }else { event.setReportTime(LocalDateTime.now().plusDays(-1).toInstant(ZoneOffset.of("+8")).toEpochMilli()); - } + }*/ // 递增加一个随机数 event.setCurrJobCount(RandomUtil.randomLong(1, 5)); @@ -90,7 +89,7 @@ public class SourceMockerDemo { Future send = producer.send(record); System.out.println(send.get()); - Thread.sleep(DELAY); + Thread.sleep(RandomUtil.randomLong(1, 5) * 1000); } }