From 723795ddab95de08f1bc904871928a602193305d Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Tue, 6 Sep 2022 15:26:09 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/qniao/iot/IotMonitoringDataJob.java | 183 ++++++++---------- .../qniao/iot/constant/ConfigConstant.java | 14 +- 2 files changed, 89 insertions(+), 108 deletions(-) diff --git a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java index 2f59a2c..acb49e0 100644 --- a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java @@ -1,29 +1,19 @@ package com.qniao.iot; import cn.hutool.core.bean.BeanUtil; -import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpUtil; import cn.hutool.json.JSONArray; import cn.hutool.json.JSONUtil; import com.qniao.iot.config.ApolloConfig; import com.qniao.iot.constant.ConfigConstant; import com.qniao.iot.machine.command.MachineOutputCommand; -import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; -import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema; import lombok.extern.slf4j.Slf4j; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.kafka.source.KafkaSource; -import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; @@ -36,8 +26,6 @@ import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; @@ -92,10 +80,10 @@ public class IotMonitoringDataJob { // 获取设备数据源 RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_HOST)) - .setPort(ApolloConfig.getInt(ConfigConstant.SOURCE_RABBITMQ_POST)) + .setPort(ApolloConfig.getInt(ConfigConstant.SOURCE_RABBITMQ_PORT)) .setUserName(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_USERNAME)) .setPassword(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_PASSWORD)) - .setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUAL_HOST)) + .setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUALHOST)) .build(); final DataStream stream = env @@ -128,95 +116,96 @@ public class IotMonitoringDataJob { Collector out) { try { - DeviceTotalData lastedDeviceState = getLastDeviceTotalData(command); - Long lastReportTime = lastedDeviceState.getReportTime(); + DeviceTotalData lastDeviceState = getLastDeviceTotalData(command); Long reportTime = command.getTimestamp(); - Integer lastWorkingStat = lastedDeviceState.getMachineWorkingStat(); - Integer lastPwStat = lastedDeviceState.getMachinePwrStat(); - // 上次启动时间 - Long lastBootTime = lastedDeviceState.getLastBootTime(); - Long lastTheDayDuration = lastedDeviceState.getTheDayDuration(); - Long lastTheDayJobDuration = lastedDeviceState.getTheDayJobDuration(); - Long lastJobDurationTotal = lastedDeviceState.getJobDurationTotal(); - Long lastTheDayJobCount = lastedDeviceState.getTheDayJobCount(); - Long lastJobTotal = lastedDeviceState.getJobTotal(); - // 如果当前消息的时间大于等于上次消息的时间才进行处理 - Integer machinePwrStat = command.getMachinePwrStat(); - Integer machineWorkingStat = command.getMachineWorkingStat(); - Long currDuration = command.getCurrDuration(); - Long currCount = command.getCurrCount(); - // 当前数据 - DeviceTotalData nowDeviceState = new DeviceTotalData(); - nowDeviceState.setMachinePwrStat(machinePwrStat); - nowDeviceState.setMachineWorkingStat(machineWorkingStat); - LocalDate localDate = new Date(reportTime).toLocalDate(); - if (machinePwrStat.equals(0)) { - // 关机 - if (lastPwStat != 0) { - if (lastWorkingStat == 1) { - // 如果上次是工作状态,那么需要记录产量和生产时间 - nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration); - nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration); - nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration); - nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount); - nowDeviceState.setJobTotal(lastJobTotal + currCount); - } else { - nowDeviceState = lastedDeviceState; - } - } else { - nowDeviceState = lastedDeviceState; - } - nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); - nowDeviceState.setLastBootTime(lastBootTime); - nowDeviceState.setReportTime(reportTime); + Long lastReportTime = lastDeviceState.getReportTime(); + if(lastReportTime <= reportTime) { + Integer lastWorkingStat = lastDeviceState.getMachineWorkingStat(); + Integer lastPwStat = lastDeviceState.getMachinePwrStat(); + // 上次启动时间 + Long lastBootTime = lastDeviceState.getLastBootTime(); + Long lastTheDayDuration = lastDeviceState.getTheDayDuration(); + Long lastTheDayJobDuration = lastDeviceState.getTheDayJobDuration(); + Long lastJobDurationTotal = lastDeviceState.getJobDurationTotal(); + Long lastTheDayJobCount = lastDeviceState.getTheDayJobCount(); + Long lastJobTotal = lastDeviceState.getJobTotal(); + Integer machinePwrStat = command.getMachinePwrStat(); + Integer machineWorkingStat = command.getMachineWorkingStat(); + Long currJobDuration = command.getCurrJobDuration(); + Long currJobCount = command.getCurrJobCount(); + // 当前数据 + DeviceTotalData nowDeviceState = new DeviceTotalData(); nowDeviceState.setMachinePwrStat(machinePwrStat); nowDeviceState.setMachineWorkingStat(machineWorkingStat); - } else { - // 开机 - if (machineWorkingStat.equals(1)) { - // 工作 - nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount); - nowDeviceState.setJobTotal(lastJobTotal + currCount); - nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration); - nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration); + LocalDate localDate = new Date(reportTime).toLocalDate(); + if (machinePwrStat.equals(0)) { + // 关机 + if (lastPwStat != 0) { + if (lastWorkingStat == 1) { + // 如果上次是工作状态,那么需要记录产量和生产时间 + nowDeviceState.setTheDayDuration(lastTheDayDuration + currJobDuration); + nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currJobDuration); + nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currJobDuration); + nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currJobCount); + nowDeviceState.setJobTotal(lastJobTotal + currJobCount); + } else { + nowDeviceState = lastDeviceState; + } + } else { + nowDeviceState = lastDeviceState; + } + nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); + nowDeviceState.setLastBootTime(lastBootTime); + nowDeviceState.setReportTime(reportTime); + nowDeviceState.setMachinePwrStat(machinePwrStat); + nowDeviceState.setMachineWorkingStat(machineWorkingStat); } else { - // 待机 - nowDeviceState = lastedDeviceState; - } - // 设置开机时长,待机也要进行累加,所以放这里 - nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration); - nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); - nowDeviceState.setLastBootTime(lastBootTime); - nowDeviceState.setReportTime(reportTime); - nowDeviceState.setMachinePwrStat(machinePwrStat); - nowDeviceState.setMachineWorkingStat(machineWorkingStat); - if (lastPwStat == 0) { - // 如果上次是关机消息,那么这次就是开机消息 - // 记录一个周期的开机时间 - nowDeviceState.setLastBootTime(reportTime); + // 开机 + if (machineWorkingStat.equals(1)) { + // 工作 + nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currJobCount); + nowDeviceState.setJobTotal(lastJobTotal + currJobCount); + nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currJobDuration); + nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currJobDuration); + } else { + // 待机 + nowDeviceState = lastDeviceState; + } + // 设置开机时长,待机也要进行累加,所以放这里 + nowDeviceState.setTheDayDuration(lastTheDayDuration + currJobDuration); + nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); + nowDeviceState.setLastBootTime(lastBootTime); + nowDeviceState.setReportTime(reportTime); + nowDeviceState.setMachinePwrStat(machinePwrStat); + nowDeviceState.setMachineWorkingStat(machineWorkingStat); + if (lastPwStat == 0) { + // 如果上次是关机消息,那么这次就是开机消息 + // 记录一个周期的开机时间 + nowDeviceState.setLastBootTime(reportTime); + } } - } - deviceTotalDataStat.update(nowDeviceState); - // 如果上次是待机,并且这次也是待机,那么就不需要发送了 - if (((!(lastWorkingStat == 2 && machineWorkingStat == 2)) - && (!(lastPwStat == 0 && machinePwrStat == 0))) || !isExistEs) { - DeviceMonitoringData data = new DeviceMonitoringData(); - data.setDataSource(command.getDataSource()); - data.setMachineIotMac(command.getMac()); - data.setMachinePwrStat(command.getMachinePwrStat()); - data.setMachineWorkingStat(command.getMachineWorkingStat()); - data.setAccJobCount(nowDeviceState.getJobTotal()); - data.setCurrJobCount(nowDeviceState.getTheDayJobCount()); - data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration()); - data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal()); - data.setReportTime(reportTime); - data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); - data.setLastBootTime(nowDeviceState.getLastBootTime()); - data.setCurrDuration(nowDeviceState.getTheDayDuration()); - if (!isExistEs) { - isExistEs = true; + deviceTotalDataStat.update(nowDeviceState); + // 如果上次是待机,并且这次也是待机,那么就不需要发送了 + if (((!(lastWorkingStat == 2 && machineWorkingStat == 2)) + && (!(lastPwStat == 0 && machinePwrStat == 0))) || !isExistEs) { + DeviceMonitoringData data = new DeviceMonitoringData(); + data.setDataSource(command.getDataSource()); + data.setMachineIotMac(command.getMac()); + data.setMachinePwrStat(command.getMachinePwrStat()); + data.setMachineWorkingStat(command.getMachineWorkingStat()); + data.setAccJobCount(nowDeviceState.getJobTotal()); + data.setCurrJobCount(nowDeviceState.getTheDayJobCount()); + data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration()); + data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal()); + data.setReportTime(reportTime); + data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + data.setLastBootTime(nowDeviceState.getLastBootTime()); + data.setCurrDuration(nowDeviceState.getTheDayDuration()); + if (!isExistEs) { + isExistEs = true; + } + out.collect(data); } - out.collect(data); } } catch (Exception e) { log.info("导致异常的信息:" + JSONUtil.toJsonStr(command)); diff --git a/src/main/java/com/qniao/iot/constant/ConfigConstant.java b/src/main/java/com/qniao/iot/constant/ConfigConstant.java index 0ffd72f..e36e465 100644 --- a/src/main/java/com/qniao/iot/constant/ConfigConstant.java +++ b/src/main/java/com/qniao/iot/constant/ConfigConstant.java @@ -2,12 +2,6 @@ package com.qniao.iot.constant; public interface ConfigConstant { - String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "source.kafka.bootstrap.servers"; - - String SOURCE_KAFKA_TOPICS = "source.kafka.topics"; - - String SOURCE_KAFKA_GROUP_ID = "source.kafka.groupId"; - String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host"; String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post"; @@ -22,17 +16,15 @@ public interface ConfigConstant { String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index"; - String ES_CONNECT_TIMEOUT = "es.connect.timeout"; - String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host"; - String SOURCE_RABBITMQ_POST = "source.rabbitmq.post"; + String SOURCE_RABBITMQ_PORT = "source.rabbitmq.port"; - String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.username"; + String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.userName"; String SOURCE_RABBITMQ_PASSWORD = "source.rabbitmq.password"; - String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtual.host"; + String SOURCE_RABBITMQ_VIRTUALHOST = "source.rabbitmq.virtualHost"; String SOURCE_RABBITMQ_QUEUE = "source.rabbitmq.queue"; }