Browse Source

更新

feature_hph_新增rabbitmq_sink
hupenghui@qniao.cn 3 years ago
parent
commit
7773f2ac84
3 changed files with 17 additions and 32 deletions
  1. 5
      src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java
  2. 4
      src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java
  3. 40
      src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java

5
src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java

@ -57,11 +57,6 @@ public class DeviceMonitoringData {
*/
private Long receivedTime;
/**
* 机器标识
*/
private Long machineId;
/**
* 累计工作时长单位秒
*/

4
src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java

@ -44,7 +44,7 @@ public class DeviceTotalData {
private LocalDate currLocalDate;
/**
* 上次消息时间
* 消息时间
*/
private Long lastReportTime;
private Long reportTime;
}

40
src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java

@ -2,14 +2,10 @@ package com.qniao.iot.gizwits;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.db.Db;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.gizwits.config.ApolloConfig;
import com.qniao.iot.gizwits.constant.ConfigConstant;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
@ -36,13 +32,11 @@ import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
@ -50,15 +44,12 @@ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.sql.Date;
import java.sql.SQLException;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.*;
import static java.time.temporal.ChronoUnit.SECONDS;
@Slf4j
public class IotMonitoringDataJob {
@ -163,7 +154,7 @@ public class IotMonitoringDataJob {
Integer dataSource = receivedEvent.getDataSource();
// 当前数据
DeviceTotalData nowDeviceState = new DeviceTotalData();
if (lastedDeviceState.getLastReportTime() <= receivedEvent.getReportTime()) {
if (lastedDeviceState.getReportTime() <= receivedEvent.getReportTime()) {
if (lastWorkingStat == null) {
lastWorkingStat = getDeviceStateListJson(receivedEvent.getMachineIotMac());
lastPwStat = lastWorkingStat == 0 ? 0 : 1;
@ -173,7 +164,7 @@ public class IotMonitoringDataJob {
onDataState.update(onData);
}
LocalDate localDate = new Date(reportTime).toLocalDate();
Long lastReportTime = lastedDeviceState.getLastReportTime();
Long lastReportTime = lastedDeviceState.getReportTime();
if (lastReportTime == null) {
// 如果上次的消息时间为空那么不进行计算
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount());
@ -182,7 +173,7 @@ public class IotMonitoringDataJob {
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal());
}
// 直接通过两个消息的时间差进行计算毫秒
Long workingDuration = reportTime - lastedDeviceState.getLastReportTime();
Long workingDuration = reportTime - lastedDeviceState.getReportTime();
// 转为秒
workingDuration = workingDuration / 1000;
if (machinePwrStat.equals(0)) {
@ -211,8 +202,8 @@ public class IotMonitoringDataJob {
}
}
nowDeviceState.setCurrLocalDate(localDate);
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault()));
nowDeviceState.setLastReportTime(reportTime);
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getReportTime()), ZoneId.systemDefault()));
nowDeviceState.setReportTime(reportTime);
deviceTotalDataStat.update(nowDeviceState);
} else {
nowDeviceState = lastedDeviceState;
@ -241,10 +232,10 @@ public class IotMonitoringDataJob {
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
}
nowDeviceState.setCurrLocalDate(localDate);
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault()));
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getReportTime()), ZoneId.systemDefault()));
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
nowDeviceState.setLastReportTime(reportTime);
nowDeviceState.setReportTime(reportTime);
} else {
nowDeviceState = lastedDeviceState;
}
@ -253,7 +244,7 @@ public class IotMonitoringDataJob {
if (lastPwStat == 0) {
// 如果上次是关机消息那么这次就是开机消息
// 记录本次开机作为上次开机时间
nowDeviceState.setLastReportTime(reportTime);
nowDeviceState.setReportTime(reportTime);
// 记录一个周期的开机时间
onDataState.update(nowDeviceState);
onData = nowDeviceState;
@ -271,7 +262,8 @@ public class IotMonitoringDataJob {
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setLastBootTime(onData.getLastReportTime());
data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
data.setLastBootTime(onData.getReportTime());
data.setCurrDuration(nowDeviceState.getTheDayDuration());
out.collect(data);
}
@ -308,13 +300,11 @@ public class IotMonitoringDataJob {
LocalDateTime ldt = new Date(deviceMonitoringData.getLastBootTime())
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime();
data.setLastBootTime(ldt);
data.setLastReportTime(deviceMonitoringData.getReportTime());
data.setReportTime(deviceMonitoringData.getReportTime());
} else {
// es中也没有直接从老接口拿
data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime);
}
// TODO 测试数据记得删除
data.setJobTotal(6218646L);
value = data;
}
// 是否日期是当天的否则需要更新当天工作时长和当天工作量
@ -330,7 +320,7 @@ public class IotMonitoringDataJob {
value.setCurrLocalDate(localDate);
value.setLastBootTime(LocalDateTime.ofInstant(Instant
.ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault()));
value.setLastReportTime(deviceMonitoringData.getReportTime());
value.setReportTime(deviceMonitoringData.getReportTime());
value.setTheDayDuration(0L);
} else {
// value有值但是日期不对说明到了第二天那么只需要对当天数据清零即可
@ -341,7 +331,7 @@ public class IotMonitoringDataJob {
value.setCurrLocalDate(localDate);
value.setLastBootTime(value.getLastBootTime());
value.setTheDayDuration(0L);
value.setLastReportTime(reportTime);
value.setReportTime(reportTime);
}
}
deviceTotalDataStat.update(value);
@ -387,7 +377,7 @@ public class IotMonitoringDataJob {
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
deviceTotalData.setCurrLocalDate(LocalDate.now());
deviceTotalData.setLastReportTime(reportTime);
deviceTotalData.setReportTime(reportTime);
stop = true;
break;
}
@ -401,7 +391,7 @@ public class IotMonitoringDataJob {
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
deviceTotalData.setCurrLocalDate(LocalDate.now());
deviceTotalData.setLastReportTime(reportTime);
deviceTotalData.setReportTime(reportTime);
}
deviceTotalData.setTheDayDuration(0L);
return deviceTotalData;

Loading…
Cancel
Save