diff --git a/iot-device-power-on-and-off-data-event/src/main/java/com/qniao/iot/device/power/event/IotDevicePowerOnAndOffDataEvent.java b/iot-device-power-on-and-off-data-event/src/main/java/com/qniao/iot/device/power/event/IotDevicePowerOnAndOffDataEvent.java index 376614f..ee42fd1 100644 --- a/iot-device-power-on-and-off-data-event/src/main/java/com/qniao/iot/device/power/event/IotDevicePowerOnAndOffDataEvent.java +++ b/iot-device-power-on-and-off-data-event/src/main/java/com/qniao/iot/device/power/event/IotDevicePowerOnAndOffDataEvent.java @@ -39,12 +39,12 @@ public class IotDevicePowerOnAndOffDataEvent implements Serializable { private Integer machineWorkingStat; /** - * 当前作业计数 + * 当前作业计数(开机到关机) */ private Long currJobCount; /** - * 当前作业时长 + * 当前作业时长(开机到关机) */ private Long currJobDuration; @@ -67,4 +67,9 @@ public class IotDevicePowerOnAndOffDataEvent implements Serializable { * 实际接收到数据的时间 */ private Long receivedTime; + + /** + * 累加作业总数 + */ + private Long accJobCount; } diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java index af8df5e..968fd39 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java @@ -1,6 +1,8 @@ package com.qniao.iot.device.power; import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.util.NumberUtil; +import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import com.qniao.iot.device.power.config.ApolloConfig; import com.qniao.iot.device.power.constant.ConfigConstant; @@ -37,6 +39,7 @@ import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; @@ -45,11 +48,9 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.ZoneOffset; +import java.time.*; import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; @@ -120,6 +121,11 @@ public class IotDevicePowerOnAndOffDataJob { Collector out) throws Exception { IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(event); + Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat(); + Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount(); + Long accJobCount = event.getAccJobCount(); + Long currJobCount = event.getCurrJobCount(); + Integer dataSource = event.getDataSource(); Integer machinePwrStat = event.getMachinePwrStat(); IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); powerOnAndOffDataEvent.setId(snowflake.nextId()); @@ -132,33 +138,46 @@ public class IotDevicePowerOnAndOffDataJob { powerOnAndOffDataEvent.setReportTime(event.getReportTime()); powerOnAndOffDataEvent.setReceivedTime(LocalDateTime .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); - if(lastPowerOnAndOffDataEvent == null) { - // 如果上一次是空的,那么只能处理开机数据 + if(MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) { if(MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { - powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime()); - powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); - out.collect(powerOnAndOffDataEvent); + // 上次是关机,但是这次是开机,说明周期产能从新开始 + if(dataSource == 1) { + // 根云 + powerOnAndOffDataEvent.setCurrJobCount(accJobCount - lastAccJobCount); + }else { + // 机智云 + powerOnAndOffDataEvent.setCurrJobCount(currJobCount); + } } }else { - // 上次的状态只有两种,要么只有开机时间不为空,要么是开机和关机时间都不为空,否则不处理 - if(lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) { - if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) { - // 只有开机时间不为空 - if(MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { - powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); - }else { - powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime()); - } + Long lastCurrJobCount = lastPowerOnAndOffDataEvent.getCurrJobCount(); + // 直接累加 + if(dataSource == 1) { + // 根云 + powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + (accJobCount - lastAccJobCount)); + }else { + // 机智云 + powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount); + } + } + // 上次的状态只有两种,要么只有开机时间不为空,要么是开机和关机时间都不为空,否则不处理 + if(lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) { + if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) { + // 只有开机时间不为空 + if(MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { + powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); + }else { + powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime()); + } + powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); + out.collect(powerOnAndOffDataEvent); + } else { + // // 开机和关机时间都不为空,说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态 + if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { + // 开机 + powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime()); powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); out.collect(powerOnAndOffDataEvent); - } else { - // // 开机和关机时间都不为空,说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态 - if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { - // 开机 - powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime()); - powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); - out.collect(powerOnAndOffDataEvent); - } } } } @@ -173,56 +192,97 @@ public class IotDevicePowerOnAndOffDataJob { return iotDevicePowerOnAndOffDataEvent; } - private IotDevicePowerOnAndOffDataEvent getByEs(MachineIotDataReceivedEvent event) { + private IotDevicePowerOnAndOffDataEvent getByEs(MachineIotDataReceivedEvent event) throws IOException { + + // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", event.getMachineIotMac())); + searchSourceBuilder.sort("receivedTime", SortOrder.DESC); + searchSourceBuilder.size(1); + // 创建查询请求对象,将查询对象配置到其中 + SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)); + searchRequest.source(searchSourceBuilder); + String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM")); + GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate); + // 先判断索引是否存在 + boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); + if (exists) { + // 执行查询,然后处理响应结果 + SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + // 根据状态和数据条数验证是否返回了数据 + if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { + SearchHits hits = searchResponse.getHits(); + SearchHit reqHit = hits.getHits()[0]; + return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class); + } + } + // 如果没有就找清洗后的数据 + MachineIotDataReceivedEvent deviceMonitoringData = getMachineIotDataReceivedEvent(event.getMachineIotMac()); + IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); + powerOnAndOffDataEvent.setId(snowflake.nextId()); + if(deviceMonitoringData != null) { + powerOnAndOffDataEvent.setDataSource(deviceMonitoringData.getDataSource()); + powerOnAndOffDataEvent.setMachineIotMac(deviceMonitoringData.getMachineIotMac()); + powerOnAndOffDataEvent.setCurrJobCount(0L); + powerOnAndOffDataEvent.setCurrJobDuration(0L); + Integer machinePwrStat = deviceMonitoringData.getMachinePwrStat(); + powerOnAndOffDataEvent.setMachinePwrStat(deviceMonitoringData.getMachinePwrStat()); + powerOnAndOffDataEvent.setMachineWorkingStat(deviceMonitoringData.getMachineWorkingStat()); + Long reportTime = deviceMonitoringData.getReportTime(); + if (machinePwrStat == 1) { + // 开机 + powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); + } else { + // 关机 + powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); + } + powerOnAndOffDataEvent.setReportTime(reportTime); + }else { + powerOnAndOffDataEvent.setDataSource(event.getDataSource()); + powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac()); + powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount()); + powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration()); + Integer machinePwrStat = event.getMachinePwrStat(); + powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat()); + powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat()); + Long reportTime = event.getReportTime(); + if (machinePwrStat == 1) { + // 开机 + powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); + } else { + // 关机 + powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); + } + powerOnAndOffDataEvent.setReportTime(reportTime); + } + powerOnAndOffDataEvent.setReceivedTime(LocalDateTime + .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); + return powerOnAndOffDataEvent; + } + + private MachineIotDataReceivedEvent getMachineIotDataReceivedEvent(Long machineIotMac) { - try { - // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) + try{ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", event.getMachineIotMac())); - searchSourceBuilder.sort("receivedTime", SortOrder.DESC); + BoolQueryBuilder bool = new BoolQueryBuilder(); + BoolQueryBuilder boolQueryBuilder = bool.must(QueryBuilders.termQuery("machineIotMac", machineIotMac)); searchSourceBuilder.size(1); - // 创建查询请求对象,将查询对象配置到其中 - SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)); - searchRequest.source(searchSourceBuilder); - String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM")); - GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate); - // 先判断索引是否存在 - boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); - if (exists) { - // 执行查询,然后处理响应结果 - SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); - // 根据状态和数据条数验证是否返回了数据 - if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { - SearchHits hits = searchResponse.getHits(); - SearchHit reqHit = hits.getHits()[0]; - return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class); - } else { - // 如果没有就自定义 - IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); - powerOnAndOffDataEvent.setId(snowflake.nextId()); - powerOnAndOffDataEvent.setDataSource(event.getDataSource()); - powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac()); - powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount()); - powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration()); - Integer machinePwrStat = event.getMachinePwrStat(); - powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat()); - powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat()); - Long reportTime = event.getReportTime(); - if (machinePwrStat == 1) { - // 开机 - powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); - } else { - // 关机 - powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); - } - powerOnAndOffDataEvent.setReportTime(reportTime); - powerOnAndOffDataEvent.setReceivedTime(LocalDateTime - .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); - return powerOnAndOffDataEvent; + searchSourceBuilder.sort("reportTime", SortOrder.DESC); + searchSourceBuilder.query(boolQueryBuilder); + SearchRequest request = new SearchRequest("iot_device_monitoring_data"); + request.source(searchSourceBuilder); + // 执行请求 + SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); + if (RestStatus.OK.equals(response.status())) { + SearchHit[] hits = response.getHits().getHits(); + if (hits.length > 0) { + SearchHit hit = hits[0]; + String sourceAsString = hit.getSourceAsString(); + return JSONUtil.toBean(sourceAsString, MachineIotDataReceivedEvent.class); } } - } catch (Exception e) { - log.error("获取es数据异常", e); + }catch (Exception e) { + log.error("获取 machine_iot_data_received_event 索引数据异常"); } return null; }