From 5b0df76952fb6d82fd52c5cc43908298ec7fe76a Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Tue, 6 Sep 2022 14:12:22 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/qniao/iot/DeviceMonitoringData.java | 4 +- .../com/qniao/iot/IotMonitoringDataJob.java | 173 +++++++++--------- .../qniao/iot/constant/ConfigConstant.java | 10 - 3 files changed, 87 insertions(+), 100 deletions(-) diff --git a/src/main/java/com/qniao/iot/DeviceMonitoringData.java b/src/main/java/com/qniao/iot/DeviceMonitoringData.java index 4981549..f4bb8eb 100644 --- a/src/main/java/com/qniao/iot/DeviceMonitoringData.java +++ b/src/main/java/com/qniao/iot/DeviceMonitoringData.java @@ -33,12 +33,12 @@ public class DeviceMonitoringData { private Long accJobCount; /** - * 当前作业计数 + * 当天作业计数(当天的产量) */ private Long currJobCount; /** - * 当前作业时长 + * 当天作业时长(当天的工作时长) */ private Long currJobDuration; diff --git a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java index 4cdf7ed..2f59a2c 100644 --- a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java @@ -63,19 +63,19 @@ import java.util.*; public class IotMonitoringDataJob { private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient - .builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ES_HOST_NAME), - ApolloConfig.getInt(ConfigConstant.ES_POST), - ApolloConfig.getStr(ConfigConstant.ES_SCHEME))) + .builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST), + ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), + ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))) .setHttpClientConfigCallback(httpAsyncClientBuilder -> { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ES_USER_NAME), - ApolloConfig.getStr(ConfigConstant.ES_PASSWORD))); + new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), + ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); }) .setRequestConfigCallback(requestConfigBuilder -> { // 设置es连接超时时间 - requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ES_CONNECT_TIMEOUT)); + requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT)); return requestConfigBuilder; })); @@ -131,95 +131,92 @@ public class IotMonitoringDataJob { DeviceTotalData lastedDeviceState = getLastDeviceTotalData(command); Long lastReportTime = lastedDeviceState.getReportTime(); Long reportTime = command.getTimestamp(); - // 如果这次的消息事件小于上次消息的时间,那么就进行丢弃 - if (lastReportTime <= reportTime) { - Integer lastWorkingStat = lastedDeviceState.getMachineWorkingStat(); - Integer lastPwStat = lastedDeviceState.getMachinePwrStat(); - // 上次启动时间 - Long lastBootTime = lastedDeviceState.getLastBootTime(); - Long lastTheDayDuration = lastedDeviceState.getTheDayDuration(); - Long lastTheDayJobDuration = lastedDeviceState.getTheDayJobDuration(); - Long lastJobDurationTotal = lastedDeviceState.getJobDurationTotal(); - Long lastTheDayJobCount = lastedDeviceState.getTheDayJobCount(); - Long lastJobTotal = lastedDeviceState.getJobTotal(); - // 如果当前消息的时间大于等于上次消息的时间才进行处理 - Integer machinePwrStat = command.getMachinePwrStat(); - Integer machineWorkingStat = command.getMachineWorkingStat(); - Long currDuration = command.getCurrDuration(); - Long currCount = command.getCurrCount(); - // 当前数据 - DeviceTotalData nowDeviceState = new DeviceTotalData(); - nowDeviceState.setMachinePwrStat(machinePwrStat); - nowDeviceState.setMachineWorkingStat(machineWorkingStat); - LocalDate localDate = new Date(reportTime).toLocalDate(); - if (machinePwrStat.equals(0)) { - // 关机 - if (lastPwStat != 0) { - if (lastWorkingStat == 1) { - // 如果上次是工作状态,那么需要记录产量和生产时间 - nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration); - nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration); - nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration); - nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount); - nowDeviceState.setJobTotal(lastJobTotal + currCount); - } else { - nowDeviceState = lastedDeviceState; - } - } else { - nowDeviceState = lastedDeviceState; - } - nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); - nowDeviceState.setLastBootTime(lastBootTime); - nowDeviceState.setReportTime(reportTime); - nowDeviceState.setMachinePwrStat(machinePwrStat); - nowDeviceState.setMachineWorkingStat(machineWorkingStat); - } else { - // 开机 - if (machineWorkingStat.equals(1)) { - // 工作 - nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount); - nowDeviceState.setJobTotal(lastJobTotal + currCount); + Integer lastWorkingStat = lastedDeviceState.getMachineWorkingStat(); + Integer lastPwStat = lastedDeviceState.getMachinePwrStat(); + // 上次启动时间 + Long lastBootTime = lastedDeviceState.getLastBootTime(); + Long lastTheDayDuration = lastedDeviceState.getTheDayDuration(); + Long lastTheDayJobDuration = lastedDeviceState.getTheDayJobDuration(); + Long lastJobDurationTotal = lastedDeviceState.getJobDurationTotal(); + Long lastTheDayJobCount = lastedDeviceState.getTheDayJobCount(); + Long lastJobTotal = lastedDeviceState.getJobTotal(); + // 如果当前消息的时间大于等于上次消息的时间才进行处理 + Integer machinePwrStat = command.getMachinePwrStat(); + Integer machineWorkingStat = command.getMachineWorkingStat(); + Long currDuration = command.getCurrDuration(); + Long currCount = command.getCurrCount(); + // 当前数据 + DeviceTotalData nowDeviceState = new DeviceTotalData(); + nowDeviceState.setMachinePwrStat(machinePwrStat); + nowDeviceState.setMachineWorkingStat(machineWorkingStat); + LocalDate localDate = new Date(reportTime).toLocalDate(); + if (machinePwrStat.equals(0)) { + // 关机 + if (lastPwStat != 0) { + if (lastWorkingStat == 1) { + // 如果上次是工作状态,那么需要记录产量和生产时间 + nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration); nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration); nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration); + nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount); + nowDeviceState.setJobTotal(lastJobTotal + currCount); } else { - // 待机 nowDeviceState = lastedDeviceState; } - // 设置开机时长,待机也要进行累加,所以放这里 - nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration); - nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); - nowDeviceState.setLastBootTime(lastBootTime); - nowDeviceState.setReportTime(reportTime); - nowDeviceState.setMachinePwrStat(machinePwrStat); - nowDeviceState.setMachineWorkingStat(machineWorkingStat); - if (lastPwStat == 0) { - // 如果上次是关机消息,那么这次就是开机消息 - // 记录一个周期的开机时间 - nowDeviceState.setLastBootTime(reportTime); - } + } else { + nowDeviceState = lastedDeviceState; } - deviceTotalDataStat.update(nowDeviceState); - // 如果上次是待机,并且这次也是待机,那么就不需要发送了 - if (((!(lastWorkingStat == 2 && machineWorkingStat == 2)) - && (!(lastPwStat == 0 && machinePwrStat == 0))) || !isExistEs) { - DeviceMonitoringData data = new DeviceMonitoringData(); - data.setDataSource(command.getDataSource()); - data.setMachineIotMac(command.getMac()); - data.setMachinePwrStat(command.getMachinePwrStat()); - data.setMachineWorkingStat(command.getMachineWorkingStat()); - data.setAccJobCount(nowDeviceState.getJobTotal()); - data.setCurrJobCount(nowDeviceState.getTheDayJobCount()); - data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration()); - data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal()); - data.setReportTime(reportTime); - data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); - data.setLastBootTime(nowDeviceState.getLastBootTime()); - data.setCurrDuration(nowDeviceState.getTheDayDuration()); - if (!isExistEs) { - isExistEs = true; - } - out.collect(data); + nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); + nowDeviceState.setLastBootTime(lastBootTime); + nowDeviceState.setReportTime(reportTime); + nowDeviceState.setMachinePwrStat(machinePwrStat); + nowDeviceState.setMachineWorkingStat(machineWorkingStat); + } else { + // 开机 + if (machineWorkingStat.equals(1)) { + // 工作 + nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount); + nowDeviceState.setJobTotal(lastJobTotal + currCount); + nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration); + nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration); + } else { + // 待机 + nowDeviceState = lastedDeviceState; + } + // 设置开机时长,待机也要进行累加,所以放这里 + nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration); + nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); + nowDeviceState.setLastBootTime(lastBootTime); + nowDeviceState.setReportTime(reportTime); + nowDeviceState.setMachinePwrStat(machinePwrStat); + nowDeviceState.setMachineWorkingStat(machineWorkingStat); + if (lastPwStat == 0) { + // 如果上次是关机消息,那么这次就是开机消息 + // 记录一个周期的开机时间 + nowDeviceState.setLastBootTime(reportTime); + } + } + deviceTotalDataStat.update(nowDeviceState); + // 如果上次是待机,并且这次也是待机,那么就不需要发送了 + if (((!(lastWorkingStat == 2 && machineWorkingStat == 2)) + && (!(lastPwStat == 0 && machinePwrStat == 0))) || !isExistEs) { + DeviceMonitoringData data = new DeviceMonitoringData(); + data.setDataSource(command.getDataSource()); + data.setMachineIotMac(command.getMac()); + data.setMachinePwrStat(command.getMachinePwrStat()); + data.setMachineWorkingStat(command.getMachineWorkingStat()); + data.setAccJobCount(nowDeviceState.getJobTotal()); + data.setCurrJobCount(nowDeviceState.getTheDayJobCount()); + data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration()); + data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal()); + data.setReportTime(reportTime); + data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + data.setLastBootTime(nowDeviceState.getLastBootTime()); + data.setCurrDuration(nowDeviceState.getTheDayDuration()); + if (!isExistEs) { + isExistEs = true; } + out.collect(data); } } catch (Exception e) { log.info("导致异常的信息:" + JSONUtil.toJsonStr(command)); diff --git a/src/main/java/com/qniao/iot/constant/ConfigConstant.java b/src/main/java/com/qniao/iot/constant/ConfigConstant.java index c660f2c..0ffd72f 100644 --- a/src/main/java/com/qniao/iot/constant/ConfigConstant.java +++ b/src/main/java/com/qniao/iot/constant/ConfigConstant.java @@ -22,16 +22,6 @@ public interface ConfigConstant { String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index"; - String ES_HOST_NAME = "es.host.name"; - - String ES_POST = "es.post"; - - String ES_SCHEME = "es.scheme"; - - String ES_USER_NAME = "es.user.name"; - - String ES_PASSWORD = "es.password"; - String ES_CONNECT_TIMEOUT = "es.connect.timeout"; String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host";