Browse Source

更新

feature_hph_新增rabbitmq_sink
1049970895@qniao.cn 3 years ago
parent
commit
5b0df76952
3 changed files with 87 additions and 100 deletions
  1. 4
      src/main/java/com/qniao/iot/DeviceMonitoringData.java
  2. 173
      src/main/java/com/qniao/iot/IotMonitoringDataJob.java
  3. 10
      src/main/java/com/qniao/iot/constant/ConfigConstant.java

4
src/main/java/com/qniao/iot/DeviceMonitoringData.java

@ -33,12 +33,12 @@ public class DeviceMonitoringData {
private Long accJobCount;
/**
* 前作业计数
* 天作业计数当天的产量
*/
private Long currJobCount;
/**
* 前作业时长
* 天作业时长当天的工作时长
*/
private Long currJobDuration;

173
src/main/java/com/qniao/iot/IotMonitoringDataJob.java

@ -63,19 +63,19 @@ import java.util.*;
public class IotMonitoringDataJob {
private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient
.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ES_HOST_NAME),
ApolloConfig.getInt(ConfigConstant.ES_POST),
ApolloConfig.getStr(ConfigConstant.ES_SCHEME)))
.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST),
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ES_USER_NAME),
ApolloConfig.getStr(ConfigConstant.ES_PASSWORD)));
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
})
.setRequestConfigCallback(requestConfigBuilder -> {
// 设置es连接超时时间
requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ES_CONNECT_TIMEOUT));
requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT));
return requestConfigBuilder;
}));
@ -131,95 +131,92 @@ public class IotMonitoringDataJob {
DeviceTotalData lastedDeviceState = getLastDeviceTotalData(command);
Long lastReportTime = lastedDeviceState.getReportTime();
Long reportTime = command.getTimestamp();
// 如果这次的消息事件小于上次消息的时间那么就进行丢弃
if (lastReportTime <= reportTime) {
Integer lastWorkingStat = lastedDeviceState.getMachineWorkingStat();
Integer lastPwStat = lastedDeviceState.getMachinePwrStat();
// 上次启动时间
Long lastBootTime = lastedDeviceState.getLastBootTime();
Long lastTheDayDuration = lastedDeviceState.getTheDayDuration();
Long lastTheDayJobDuration = lastedDeviceState.getTheDayJobDuration();
Long lastJobDurationTotal = lastedDeviceState.getJobDurationTotal();
Long lastTheDayJobCount = lastedDeviceState.getTheDayJobCount();
Long lastJobTotal = lastedDeviceState.getJobTotal();
// 如果当前消息的时间大于等于上次消息的时间才进行处理
Integer machinePwrStat = command.getMachinePwrStat();
Integer machineWorkingStat = command.getMachineWorkingStat();
Long currDuration = command.getCurrDuration();
Long currCount = command.getCurrCount();
// 当前数据
DeviceTotalData nowDeviceState = new DeviceTotalData();
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
LocalDate localDate = new Date(reportTime).toLocalDate();
if (machinePwrStat.equals(0)) {
// 关机
if (lastPwStat != 0) {
if (lastWorkingStat == 1) {
// 如果上次是工作状态那么需要记录产量和生产时间
nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration);
nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration);
nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration);
nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount);
nowDeviceState.setJobTotal(lastJobTotal + currCount);
} else {
nowDeviceState = lastedDeviceState;
}
} else {
nowDeviceState = lastedDeviceState;
}
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(lastBootTime);
nowDeviceState.setReportTime(reportTime);
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
} else {
// 开机
if (machineWorkingStat.equals(1)) {
// 工作
nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount);
nowDeviceState.setJobTotal(lastJobTotal + currCount);
Integer lastWorkingStat = lastedDeviceState.getMachineWorkingStat();
Integer lastPwStat = lastedDeviceState.getMachinePwrStat();
// 上次启动时间
Long lastBootTime = lastedDeviceState.getLastBootTime();
Long lastTheDayDuration = lastedDeviceState.getTheDayDuration();
Long lastTheDayJobDuration = lastedDeviceState.getTheDayJobDuration();
Long lastJobDurationTotal = lastedDeviceState.getJobDurationTotal();
Long lastTheDayJobCount = lastedDeviceState.getTheDayJobCount();
Long lastJobTotal = lastedDeviceState.getJobTotal();
// 如果当前消息的时间大于等于上次消息的时间才进行处理
Integer machinePwrStat = command.getMachinePwrStat();
Integer machineWorkingStat = command.getMachineWorkingStat();
Long currDuration = command.getCurrDuration();
Long currCount = command.getCurrCount();
// 当前数据
DeviceTotalData nowDeviceState = new DeviceTotalData();
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
LocalDate localDate = new Date(reportTime).toLocalDate();
if (machinePwrStat.equals(0)) {
// 关机
if (lastPwStat != 0) {
if (lastWorkingStat == 1) {
// 如果上次是工作状态那么需要记录产量和生产时间
nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration);
nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration);
nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration);
nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount);
nowDeviceState.setJobTotal(lastJobTotal + currCount);
} else {
// 待机
nowDeviceState = lastedDeviceState;
}
// 设置开机时长待机也要进行累加所以放这里
nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration);
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(lastBootTime);
nowDeviceState.setReportTime(reportTime);
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
if (lastPwStat == 0) {
// 如果上次是关机消息那么这次就是开机消息
// 记录一个周期的开机时间
nowDeviceState.setLastBootTime(reportTime);
}
} else {
nowDeviceState = lastedDeviceState;
}
deviceTotalDataStat.update(nowDeviceState);
// 如果上次是待机并且这次也是待机那么就不需要发送了
if (((!(lastWorkingStat == 2 && machineWorkingStat == 2))
&& (!(lastPwStat == 0 && machinePwrStat == 0))) || !isExistEs) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(command.getDataSource());
data.setMachineIotMac(command.getMac());
data.setMachinePwrStat(command.getMachinePwrStat());
data.setMachineWorkingStat(command.getMachineWorkingStat());
data.setAccJobCount(nowDeviceState.getJobTotal());
data.setCurrJobCount(nowDeviceState.getTheDayJobCount());
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
data.setLastBootTime(nowDeviceState.getLastBootTime());
data.setCurrDuration(nowDeviceState.getTheDayDuration());
if (!isExistEs) {
isExistEs = true;
}
out.collect(data);
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(lastBootTime);
nowDeviceState.setReportTime(reportTime);
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
} else {
// 开机
if (machineWorkingStat.equals(1)) {
// 工作
nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount);
nowDeviceState.setJobTotal(lastJobTotal + currCount);
nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration);
nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration);
} else {
// 待机
nowDeviceState = lastedDeviceState;
}
// 设置开机时长待机也要进行累加所以放这里
nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration);
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(lastBootTime);
nowDeviceState.setReportTime(reportTime);
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
if (lastPwStat == 0) {
// 如果上次是关机消息那么这次就是开机消息
// 记录一个周期的开机时间
nowDeviceState.setLastBootTime(reportTime);
}
}
deviceTotalDataStat.update(nowDeviceState);
// 如果上次是待机并且这次也是待机那么就不需要发送了
if (((!(lastWorkingStat == 2 && machineWorkingStat == 2))
&& (!(lastPwStat == 0 && machinePwrStat == 0))) || !isExistEs) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(command.getDataSource());
data.setMachineIotMac(command.getMac());
data.setMachinePwrStat(command.getMachinePwrStat());
data.setMachineWorkingStat(command.getMachineWorkingStat());
data.setAccJobCount(nowDeviceState.getJobTotal());
data.setCurrJobCount(nowDeviceState.getTheDayJobCount());
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
data.setLastBootTime(nowDeviceState.getLastBootTime());
data.setCurrDuration(nowDeviceState.getTheDayDuration());
if (!isExistEs) {
isExistEs = true;
}
out.collect(data);
}
} catch (Exception e) {
log.info("导致异常的信息:" + JSONUtil.toJsonStr(command));

10
src/main/java/com/qniao/iot/constant/ConfigConstant.java

@ -22,16 +22,6 @@ public interface ConfigConstant {
String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index";
String ES_HOST_NAME = "es.host.name";
String ES_POST = "es.post";
String ES_SCHEME = "es.scheme";
String ES_USER_NAME = "es.user.name";
String ES_PASSWORD = "es.password";
String ES_CONNECT_TIMEOUT = "es.connect.timeout";
String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host";

Loading…
Cancel
Save