Browse Source

更新

hph-优化版本
1049970895@qniao.cn 3 years ago
parent
commit
c81159cabe
2 changed files with 138 additions and 73 deletions
  1. 9
      iot-device-power-on-and-off-data-event/src/main/java/com/qniao/iot/device/power/event/IotDevicePowerOnAndOffDataEvent.java
  2. 202
      iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java

9
iot-device-power-on-and-off-data-event/src/main/java/com/qniao/iot/device/power/event/IotDevicePowerOnAndOffDataEvent.java

@ -39,12 +39,12 @@ public class IotDevicePowerOnAndOffDataEvent implements Serializable {
private Integer machineWorkingStat;
/**
* 当前作业计数
* 当前作业计数开机到关机
*/
private Long currJobCount;
/**
* 当前作业时长
* 当前作业时长开机到关机
*/
private Long currJobDuration;
@ -67,4 +67,9 @@ public class IotDevicePowerOnAndOffDataEvent implements Serializable {
* 实际接收到数据的时间
*/
private Long receivedTime;
/**
* 累加作业总数
*/
private Long accJobCount;
}

202
iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java

@ -1,6 +1,8 @@
package com.qniao.iot.device.power;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.device.power.config.ApolloConfig;
import com.qniao.iot.device.power.constant.ConfigConstant;
@ -37,6 +39,7 @@ import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
@ -45,11 +48,9 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
@ -120,6 +121,11 @@ public class IotDevicePowerOnAndOffDataJob {
Collector<IotDevicePowerOnAndOffDataEvent> out) throws Exception {
IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(event);
Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat();
Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount();
Long accJobCount = event.getAccJobCount();
Long currJobCount = event.getCurrJobCount();
Integer dataSource = event.getDataSource();
Integer machinePwrStat = event.getMachinePwrStat();
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
powerOnAndOffDataEvent.setId(snowflake.nextId());
@ -132,33 +138,46 @@ public class IotDevicePowerOnAndOffDataJob {
powerOnAndOffDataEvent.setReportTime(event.getReportTime());
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
if(lastPowerOnAndOffDataEvent == null) {
// 如果上一次是空的那么只能处理开机数据
if(MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) {
if(MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime());
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
out.collect(powerOnAndOffDataEvent);
// 上次是关机但是这次是开机说明周期产能从新开始
if(dataSource == 1) {
// 根云
powerOnAndOffDataEvent.setCurrJobCount(accJobCount - lastAccJobCount);
}else {
// 机智云
powerOnAndOffDataEvent.setCurrJobCount(currJobCount);
}
}
}else {
// 上次的状态只有两种要么只有开机时间不为空要么是开机和关机时间都不为空否则不处理
if(lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) {
if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) {
// 只有开机时间不为空
if(MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime());
}else {
powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime());
}
Long lastCurrJobCount = lastPowerOnAndOffDataEvent.getCurrJobCount();
// 直接累加
if(dataSource == 1) {
// 根云
powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + (accJobCount - lastAccJobCount));
}else {
// 机智云
powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount);
}
}
// 上次的状态只有两种要么只有开机时间不为空要么是开机和关机时间都不为空否则不处理
if(lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) {
if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) {
// 只有开机时间不为空
if(MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime());
}else {
powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime());
}
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
out.collect(powerOnAndOffDataEvent);
} else {
// // 开机和关机时间都不为空说明上一个生产周期已经过了那么当前设备的电源状态必须要是开机状态
if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
// 开机
powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime());
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
out.collect(powerOnAndOffDataEvent);
} else {
// // 开机和关机时间都不为空说明上一个生产周期已经过了那么当前设备的电源状态必须要是开机状态
if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
// 开机
powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime());
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
out.collect(powerOnAndOffDataEvent);
}
}
}
}
@ -173,56 +192,97 @@ public class IotDevicePowerOnAndOffDataJob {
return iotDevicePowerOnAndOffDataEvent;
}
private IotDevicePowerOnAndOffDataEvent getByEs(MachineIotDataReceivedEvent event) {
private IotDevicePowerOnAndOffDataEvent getByEs(MachineIotDataReceivedEvent event) throws IOException {
// 构建查询条件注意termQuery 支持多种格式查询 booleanintdoublestring 这里使用的是 string 的查询
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", event.getMachineIotMac()));
searchSourceBuilder.sort("receivedTime", SortOrder.DESC);
searchSourceBuilder.size(1);
// 创建查询请求对象将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX));
searchRequest.source(searchSourceBuilder);
String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate);
// 先判断索引是否存在
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if (exists) {
// 执行查询然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
SearchHits hits = searchResponse.getHits();
SearchHit reqHit = hits.getHits()[0];
return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class);
}
}
// 如果没有就找清洗后的数据
MachineIotDataReceivedEvent deviceMonitoringData = getMachineIotDataReceivedEvent(event.getMachineIotMac());
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
powerOnAndOffDataEvent.setId(snowflake.nextId());
if(deviceMonitoringData != null) {
powerOnAndOffDataEvent.setDataSource(deviceMonitoringData.getDataSource());
powerOnAndOffDataEvent.setMachineIotMac(deviceMonitoringData.getMachineIotMac());
powerOnAndOffDataEvent.setCurrJobCount(0L);
powerOnAndOffDataEvent.setCurrJobDuration(0L);
Integer machinePwrStat = deviceMonitoringData.getMachinePwrStat();
powerOnAndOffDataEvent.setMachinePwrStat(deviceMonitoringData.getMachinePwrStat());
powerOnAndOffDataEvent.setMachineWorkingStat(deviceMonitoringData.getMachineWorkingStat());
Long reportTime = deviceMonitoringData.getReportTime();
if (machinePwrStat == 1) {
// 开机
powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime);
} else {
// 关机
powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime);
}
powerOnAndOffDataEvent.setReportTime(reportTime);
}else {
powerOnAndOffDataEvent.setDataSource(event.getDataSource());
powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac());
powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount());
powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration());
Integer machinePwrStat = event.getMachinePwrStat();
powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat());
powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat());
Long reportTime = event.getReportTime();
if (machinePwrStat == 1) {
// 开机
powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime);
} else {
// 关机
powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime);
}
powerOnAndOffDataEvent.setReportTime(reportTime);
}
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
return powerOnAndOffDataEvent;
}
private MachineIotDataReceivedEvent getMachineIotDataReceivedEvent(Long machineIotMac) {
try {
// 构建查询条件注意termQuery 支持多种格式查询 booleanintdoublestring 这里使用的是 string 的查询
try{
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", event.getMachineIotMac()));
searchSourceBuilder.sort("receivedTime", SortOrder.DESC);
BoolQueryBuilder bool = new BoolQueryBuilder();
BoolQueryBuilder boolQueryBuilder = bool.must(QueryBuilders.termQuery("machineIotMac", machineIotMac));
searchSourceBuilder.size(1);
// 创建查询请求对象将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX));
searchRequest.source(searchSourceBuilder);
String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate);
// 先判断索引是否存在
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if (exists) {
// 执行查询然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
SearchHits hits = searchResponse.getHits();
SearchHit reqHit = hits.getHits()[0];
return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class);
} else {
// 如果没有就自定义
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
powerOnAndOffDataEvent.setId(snowflake.nextId());
powerOnAndOffDataEvent.setDataSource(event.getDataSource());
powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac());
powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount());
powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration());
Integer machinePwrStat = event.getMachinePwrStat();
powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat());
powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat());
Long reportTime = event.getReportTime();
if (machinePwrStat == 1) {
// 开机
powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime);
} else {
// 关机
powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime);
}
powerOnAndOffDataEvent.setReportTime(reportTime);
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
return powerOnAndOffDataEvent;
searchSourceBuilder.sort("reportTime", SortOrder.DESC);
searchSourceBuilder.query(boolQueryBuilder);
SearchRequest request = new SearchRequest("iot_device_monitoring_data");
request.source(searchSourceBuilder);
// 执行请求
SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
if (RestStatus.OK.equals(response.status())) {
SearchHit[] hits = response.getHits().getHits();
if (hits.length > 0) {
SearchHit hit = hits[0];
String sourceAsString = hit.getSourceAsString();
return JSONUtil.toBean(sourceAsString, MachineIotDataReceivedEvent.class);
}
}
} catch (Exception e) {
log.error("获取es数据异常", e);
}catch (Exception e) {
log.error("获取 machine_iot_data_received_event 索引数据异常");
}
return null;
}

Loading…
Cancel
Save