Browse Source

更新

feature_hph_新增rabbitmq_sink
1049970895@qniao.cn 3 years ago
parent
commit
723795ddab
2 changed files with 89 additions and 108 deletions
  1. 183
      src/main/java/com/qniao/iot/IotMonitoringDataJob.java
  2. 14
      src/main/java/com/qniao/iot/constant/ConfigConstant.java

183
src/main/java/com/qniao/iot/IotMonitoringDataJob.java

@ -1,29 +1,19 @@
package com.qniao.iot;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.config.ApolloConfig;
import com.qniao.iot.constant.ConfigConstant;
import com.qniao.iot.machine.command.MachineOutputCommand;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
@ -36,8 +26,6 @@ import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
@ -92,10 +80,10 @@ public class IotMonitoringDataJob {
// 获取设备数据源
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_HOST))
.setPort(ApolloConfig.getInt(ConfigConstant.SOURCE_RABBITMQ_POST))
.setPort(ApolloConfig.getInt(ConfigConstant.SOURCE_RABBITMQ_PORT))
.setUserName(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_USERNAME))
.setPassword(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_PASSWORD))
.setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUAL_HOST))
.setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUALHOST))
.build();
final DataStream<MachineOutputCommand> stream = env
@ -128,95 +116,96 @@ public class IotMonitoringDataJob {
Collector<DeviceMonitoringData> out) {
try {
DeviceTotalData lastedDeviceState = getLastDeviceTotalData(command);
Long lastReportTime = lastedDeviceState.getReportTime();
DeviceTotalData lastDeviceState = getLastDeviceTotalData(command);
Long reportTime = command.getTimestamp();
Integer lastWorkingStat = lastedDeviceState.getMachineWorkingStat();
Integer lastPwStat = lastedDeviceState.getMachinePwrStat();
// 上次启动时间
Long lastBootTime = lastedDeviceState.getLastBootTime();
Long lastTheDayDuration = lastedDeviceState.getTheDayDuration();
Long lastTheDayJobDuration = lastedDeviceState.getTheDayJobDuration();
Long lastJobDurationTotal = lastedDeviceState.getJobDurationTotal();
Long lastTheDayJobCount = lastedDeviceState.getTheDayJobCount();
Long lastJobTotal = lastedDeviceState.getJobTotal();
// 如果当前消息的时间大于等于上次消息的时间才进行处理
Integer machinePwrStat = command.getMachinePwrStat();
Integer machineWorkingStat = command.getMachineWorkingStat();
Long currDuration = command.getCurrDuration();
Long currCount = command.getCurrCount();
// 当前数据
DeviceTotalData nowDeviceState = new DeviceTotalData();
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
LocalDate localDate = new Date(reportTime).toLocalDate();
if (machinePwrStat.equals(0)) {
// 关机
if (lastPwStat != 0) {
if (lastWorkingStat == 1) {
// 如果上次是工作状态那么需要记录产量和生产时间
nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration);
nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration);
nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration);
nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount);
nowDeviceState.setJobTotal(lastJobTotal + currCount);
} else {
nowDeviceState = lastedDeviceState;
}
} else {
nowDeviceState = lastedDeviceState;
}
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(lastBootTime);
nowDeviceState.setReportTime(reportTime);
Long lastReportTime = lastDeviceState.getReportTime();
if(lastReportTime <= reportTime) {
Integer lastWorkingStat = lastDeviceState.getMachineWorkingStat();
Integer lastPwStat = lastDeviceState.getMachinePwrStat();
// 上次启动时间
Long lastBootTime = lastDeviceState.getLastBootTime();
Long lastTheDayDuration = lastDeviceState.getTheDayDuration();
Long lastTheDayJobDuration = lastDeviceState.getTheDayJobDuration();
Long lastJobDurationTotal = lastDeviceState.getJobDurationTotal();
Long lastTheDayJobCount = lastDeviceState.getTheDayJobCount();
Long lastJobTotal = lastDeviceState.getJobTotal();
Integer machinePwrStat = command.getMachinePwrStat();
Integer machineWorkingStat = command.getMachineWorkingStat();
Long currJobDuration = command.getCurrJobDuration();
Long currJobCount = command.getCurrJobCount();
// 当前数据
DeviceTotalData nowDeviceState = new DeviceTotalData();
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
} else {
// 开机
if (machineWorkingStat.equals(1)) {
// 工作
nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount);
nowDeviceState.setJobTotal(lastJobTotal + currCount);
nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration);
nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration);
LocalDate localDate = new Date(reportTime).toLocalDate();
if (machinePwrStat.equals(0)) {
// 关机
if (lastPwStat != 0) {
if (lastWorkingStat == 1) {
// 如果上次是工作状态那么需要记录产量和生产时间
nowDeviceState.setTheDayDuration(lastTheDayDuration + currJobDuration);
nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currJobDuration);
nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currJobDuration);
nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currJobCount);
nowDeviceState.setJobTotal(lastJobTotal + currJobCount);
} else {
nowDeviceState = lastDeviceState;
}
} else {
nowDeviceState = lastDeviceState;
}
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(lastBootTime);
nowDeviceState.setReportTime(reportTime);
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
} else {
// 待机
nowDeviceState = lastedDeviceState;
}
// 设置开机时长待机也要进行累加所以放这里
nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration);
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(lastBootTime);
nowDeviceState.setReportTime(reportTime);
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
if (lastPwStat == 0) {
// 如果上次是关机消息那么这次就是开机消息
// 记录一个周期的开机时间
nowDeviceState.setLastBootTime(reportTime);
// 开机
if (machineWorkingStat.equals(1)) {
// 工作
nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currJobCount);
nowDeviceState.setJobTotal(lastJobTotal + currJobCount);
nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currJobDuration);
nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currJobDuration);
} else {
// 待机
nowDeviceState = lastDeviceState;
}
// 设置开机时长待机也要进行累加所以放这里
nowDeviceState.setTheDayDuration(lastTheDayDuration + currJobDuration);
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(lastBootTime);
nowDeviceState.setReportTime(reportTime);
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
if (lastPwStat == 0) {
// 如果上次是关机消息那么这次就是开机消息
// 记录一个周期的开机时间
nowDeviceState.setLastBootTime(reportTime);
}
}
}
deviceTotalDataStat.update(nowDeviceState);
// 如果上次是待机并且这次也是待机那么就不需要发送了
if (((!(lastWorkingStat == 2 && machineWorkingStat == 2))
&& (!(lastPwStat == 0 && machinePwrStat == 0))) || !isExistEs) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(command.getDataSource());
data.setMachineIotMac(command.getMac());
data.setMachinePwrStat(command.getMachinePwrStat());
data.setMachineWorkingStat(command.getMachineWorkingStat());
data.setAccJobCount(nowDeviceState.getJobTotal());
data.setCurrJobCount(nowDeviceState.getTheDayJobCount());
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
data.setLastBootTime(nowDeviceState.getLastBootTime());
data.setCurrDuration(nowDeviceState.getTheDayDuration());
if (!isExistEs) {
isExistEs = true;
deviceTotalDataStat.update(nowDeviceState);
// 如果上次是待机并且这次也是待机那么就不需要发送了
if (((!(lastWorkingStat == 2 && machineWorkingStat == 2))
&& (!(lastPwStat == 0 && machinePwrStat == 0))) || !isExistEs) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(command.getDataSource());
data.setMachineIotMac(command.getMac());
data.setMachinePwrStat(command.getMachinePwrStat());
data.setMachineWorkingStat(command.getMachineWorkingStat());
data.setAccJobCount(nowDeviceState.getJobTotal());
data.setCurrJobCount(nowDeviceState.getTheDayJobCount());
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
data.setLastBootTime(nowDeviceState.getLastBootTime());
data.setCurrDuration(nowDeviceState.getTheDayDuration());
if (!isExistEs) {
isExistEs = true;
}
out.collect(data);
}
out.collect(data);
}
} catch (Exception e) {
log.info("导致异常的信息:" + JSONUtil.toJsonStr(command));

14
src/main/java/com/qniao/iot/constant/ConfigConstant.java

@ -2,12 +2,6 @@ package com.qniao.iot.constant;
public interface ConfigConstant {
String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "source.kafka.bootstrap.servers";
String SOURCE_KAFKA_TOPICS = "source.kafka.topics";
String SOURCE_KAFKA_GROUP_ID = "source.kafka.groupId";
String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host";
String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post";
@ -22,17 +16,15 @@ public interface ConfigConstant {
String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index";
String ES_CONNECT_TIMEOUT = "es.connect.timeout";
String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host";
String SOURCE_RABBITMQ_POST = "source.rabbitmq.post";
String SOURCE_RABBITMQ_PORT = "source.rabbitmq.port";
String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.username";
String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.userName";
String SOURCE_RABBITMQ_PASSWORD = "source.rabbitmq.password";
String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtual.host";
String SOURCE_RABBITMQ_VIRTUALHOST = "source.rabbitmq.virtualHost";
String SOURCE_RABBITMQ_QUEUE = "source.rabbitmq.queue";
}
Loading…
Cancel
Save