Browse Source

更新

feature_hph_新增rabbitmq_sink
1049970895@qniao.cn 3 years ago
parent
commit
64da2631dd
1 changed files with 51 additions and 87 deletions
  1. 138
      src/main/java/com/qniao/iot/IotMonitoringDataJob.java

138
src/main/java/com/qniao/iot/IotMonitoringDataJob.java

@ -48,6 +48,7 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.sql.Date;
import java.time.*;
import java.time.format.DateTimeFormatter;
@ -81,7 +82,6 @@ public class IotMonitoringDataJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 获取设备数据源
@ -103,16 +103,11 @@ public class IotMonitoringDataJob {
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> {
Long reportTime = value.getReportTime();
if(reportTime != null
if (reportTime != null
&& value.getDataSource() != null && value.getMachinePwrStat() != null) {
String reportTimeStr = StrUtil.toString(reportTime);
if(reportTimeStr.length() == 10) {
// 机智云那边的设备可能是秒或毫秒
reportTime = reportTime * 1000;
}
long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
// 晚30分钟的数据就不要了
return nowTime - reportTime <= (30*60*1000);
return nowTime - reportTime <= (30 * 60 * 1000);
}
return false;
});
@ -125,15 +120,6 @@ public class IotMonitoringDataJob {
// 上次设备数据
private ValueState<DeviceTotalData> deviceTotalDataStat;
// 开机数据
//private ValueState<DeviceTotalData> onDataState;
// 上次的工作状态
//private ValueState<Integer> lastWorkingStatState;
// 上次的开机状态
//private ValueState<Integer> lastPwStatState;
// 是否存在es中假设都存在
private boolean isExistEs = true;
@ -144,14 +130,6 @@ public class IotMonitoringDataJob {
deviceTotalDataStat = getRuntimeContext()
.getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class)));
/*onDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class)));
lastWorkingStatState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class)));
lastPwStatState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastPwStat", TypeInformation.of(Integer.class)));*/
}
@Override
@ -161,66 +139,50 @@ public class IotMonitoringDataJob {
try {
DeviceTotalData lastedDeviceState = getLastDeviceTotalData(receivedEvent);
Integer lastWorkingStat = lastedDeviceState.getMachineWorkingStat();
Integer lastPwStat = lastedDeviceState.getMachinePwrStat();
// 上次启动时间
Long lastBootTime = lastedDeviceState.getLastBootTime();
// 如果当前消息的时间大于等于上次消息的时间才进行处理
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
Long lastReportTime = lastedDeviceState.getReportTime();
Long reportTime = receivedEvent.getReportTime();
String reportTimeStr = StrUtil.toString(reportTime);
if(reportTimeStr.length() == 10) {
// 机智云那边的设备可能是秒或毫秒
reportTime = reportTime * 1000;
}
// 1树根 0机智云
Integer dataSource = receivedEvent.getDataSource();
// 当前数据
DeviceTotalData nowDeviceState = new DeviceTotalData();
if (lastedDeviceState.getReportTime() <= receivedEvent.getReportTime()) {
if (lastWorkingStat == null || lastPwStat == null) {
lastWorkingStat = lastedDeviceState.getMachineWorkingStat();
lastPwStat = lastedDeviceState.getMachinePwrStat();
}
// 如果这次的消息事件小于上次消息的时间那么就进行丢弃
if (lastReportTime <= reportTime) {
Integer lastWorkingStat = lastedDeviceState.getMachineWorkingStat();
Integer lastPwStat = lastedDeviceState.getMachinePwrStat();
// 上次启动时间
Long lastBootTime = lastedDeviceState.getLastBootTime();
// 如果当前消息的时间大于等于上次消息的时间才进行处理
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
// 1树根 0机智云
Integer dataSource = receivedEvent.getDataSource();
// 当前数据
DeviceTotalData nowDeviceState = new DeviceTotalData();
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
LocalDate localDate = new Date(reportTime).toLocalDate();
Long lastReportTime = lastedDeviceState.getReportTime();
if (lastReportTime == null) {
// 如果上次的消息时间为空那么不进行计算
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration());
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal());
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal());
}
// 直接通过两个消息的时间差进行计算毫秒
Long workingDuration = reportTime - lastedDeviceState.getReportTime();
// 转为秒
workingDuration = workingDuration / 1000;
if (machinePwrStat.equals(0)) {
// 关机
if (lastPwStat != 0) {
if (lastWorkingStat == 1) {
// 如果上次是工作中那就进行累加
if (lastReportTime != null) {
if (dataSource == 1) {
// 树根
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration);
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
Long lastJobTotal = lastedDeviceState.getJobTotal();
Long accJobCount = receivedEvent.getAccJobCount();
// 直接往上类
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + (accJobCount - lastJobTotal));
nowDeviceState.setJobTotal(accJobCount);
} else {
// 机智云
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrJobDuration());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
}
// 如果上次是工作状态那么需要记录产量和生产时间
if (dataSource == 1) {
// 树根
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration);
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
Long lastJobTotal = lastedDeviceState.getJobTotal();
Long accJobCount = receivedEvent.getAccJobCount();
// 直接往上累加
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + (accJobCount - lastJobTotal));
nowDeviceState.setJobTotal(accJobCount);
} else {
// 机智云
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrJobDuration());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
}
} else {
nowDeviceState = lastedDeviceState;
@ -234,6 +196,7 @@ public class IotMonitoringDataJob {
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
} else {
// 开机
if (machineWorkingStat.equals(1)) {
// 工作
if (dataSource == 1) {
@ -259,7 +222,7 @@ public class IotMonitoringDataJob {
// 设置开机时长待机也要进行累加所以放这里
if (dataSource == 1) {
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration);
}else {
} else {
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrJobDuration());
}
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
@ -290,13 +253,13 @@ public class IotMonitoringDataJob {
data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
data.setLastBootTime(nowDeviceState.getLastBootTime());
data.setCurrDuration(nowDeviceState.getTheDayDuration());
if(!isExistEs) {
if (!isExistEs) {
isExistEs = true;
}
out.collect(data);
}
}
}catch (Exception e) {
} catch (Exception e) {
log.info("导致异常的信息:" + JSONUtil.toJsonStr(receivedEvent));
log.error("处理异常", e);
}
@ -332,7 +295,8 @@ public class IotMonitoringDataJob {
value.setReportTime(reportTime);
}
// 是否日期是当天的否则需要更新当天工作时长和当天工作量
if (LocalDate.parse(value.getCurrLocalDate(), DateTimeFormatter.ofPattern("yyyy-MM-dd")).isBefore(localDate)) {
if (LocalDate.parse(value.getCurrLocalDate(), DateTimeFormatter.ofPattern("yyyy-MM-dd"))
.isBefore(localDate)) {
// value有值但是日期不对说明到了第二天那么只需要对当天数据清零即可
value.setTheDayJobDuration(0L);
value.setTheDayJobCount(0L);
@ -385,18 +349,18 @@ public class IotMonitoringDataJob {
deviceTotalData.setCurrLocalDate(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
// 是否在线
Object isOnlineObj = JSONUtil.getByPath(JSONUtil.parse(o), "isOnline");
if(isOnlineObj != null) {
if (isOnlineObj != null) {
int isOnline = Integer.parseInt(String.valueOf(isOnlineObj));
if(isOnline == 0) {
if (isOnline == 0) {
// 开机
deviceTotalData.setMachinePwrStat(1);
deviceTotalData.setMachineWorkingStat(2);
}else {
} else {
// 关机
deviceTotalData.setMachinePwrStat(0);
deviceTotalData.setMachineWorkingStat(0);
}
}else {
} else {
deviceTotalData.setMachinePwrStat(0);
deviceTotalData.setMachineWorkingStat(0);
}
@ -512,12 +476,12 @@ public class IotMonitoringDataJob {
private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) {
if(currIndicesDateSuffix == null) {
if (currIndicesDateSuffix == null) {
// 当前月的索引为空
createIndices(indicesName, indexDateSuffix);
}else {
} else {
// 校验当前消息能否符合当前索引
if(!indexDateSuffix.equals(currIndicesDateSuffix)) {
if (!indexDateSuffix.equals(currIndicesDateSuffix)) {
// 如果不符合需要重建索引
createIndices(indicesName, indexDateSuffix);
}
@ -531,7 +495,7 @@ public class IotMonitoringDataJob {
// 先判断客户端是否存在
try {
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if(!exists) {
if (!exists) {
// 创建索引
CreateIndexRequest request = new CreateIndexRequest(indicesName);
// 字段映射
@ -582,7 +546,7 @@ public class IotMonitoringDataJob {
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
boolean acknowledged = createIndexResponse.isAcknowledged();
boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
if(!acknowledged || !shardsAcknowledged) {
if (!acknowledged || !shardsAcknowledged) {
throw new Exception("自定义索引创建失败!!!");
}
currIndicesDateSuffix = indexDateSuffix;

Loading…
Cancel
Save