Browse Source

更新

feature_hph_新增rabbitmq_sink
1049970895@qniao.cn 3 years ago
parent
commit
2a4d7a2b06
3 changed files with 74 additions and 52 deletions
  1. 12
      pom.xml
  2. 102
      src/main/java/com/qniao/iot/IotMonitoringDataJob.java
  3. 12
      src/main/java/com/qniao/iot/constant/ConfigConstant.java

12
pom.xml

@ -122,6 +122,18 @@
<artifactId>druid</artifactId>
<version>1.2.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_2.12</artifactId>
<version>1.14.5</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-command</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
<build>

102
src/main/java/com/qniao/iot/IotMonitoringDataJob.java

@ -7,11 +7,14 @@ import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.config.ApolloConfig;
import com.qniao.iot.constant.ConfigConstant;
import com.qniao.iot.machine.command.MachineOutputCommand;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
@ -25,6 +28,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
@ -85,37 +90,22 @@ public class IotMonitoringDataJob {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 获取设备数据源
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_HOST))
.setPort(ApolloConfig.getInt(ConfigConstant.SOURCE_RABBITMQ_POST))
.setUserName(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_USERNAME))
.setPassword(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_PASSWORD))
.setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUAL_HOST))
.build();
// 设备数据源转换
DataStreamSource<MachineIotDataReceivedEvent> dataStreamSource = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
// 数据过滤
SingleOutputStreamOperator<MachineIotDataReceivedEvent> streamOperator = dataStreamSource
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> {
Long reportTime = value.getReportTime();
if (reportTime != null
&& value.getDataSource() != null && value.getMachinePwrStat() != null) {
long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
// 晚30分钟的数据就不要了
return nowTime - reportTime <= (30 * 60 * 1000);
}
return false;
});
final DataStream<MachineOutputCommand> stream = env
.addSource(new RMQSource<>(connectionConfig, ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_QUEUE),
true, new MachineOutputCommandDeserializationSchema())).setParallelism(1);
// mac分组并进行工作时长的集合操作
DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = streamOperator
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>() {
DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = stream
.keyBy(MachineOutputCommand::getMac)
.process(new KeyedProcessFunction<Long, MachineOutputCommand, DeviceMonitoringData>() {
// 上次设备数据
private ValueState<DeviceTotalData> deviceTotalDataStat;
@ -133,23 +123,30 @@ public class IotMonitoringDataJob {
}
@Override
public void processElement(MachineIotDataReceivedEvent receivedEvent,
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>.Context ctx,
public void processElement(MachineOutputCommand command,
KeyedProcessFunction<Long, MachineOutputCommand, DeviceMonitoringData>.Context ctx,
Collector<DeviceMonitoringData> out) {
try {
DeviceTotalData lastedDeviceState = getLastDeviceTotalData(receivedEvent);
DeviceTotalData lastedDeviceState = getLastDeviceTotalData(command);
Long lastReportTime = lastedDeviceState.getReportTime();
Long reportTime = receivedEvent.getReportTime();
Long reportTime = command.getTimestamp();
// 如果这次的消息事件小于上次消息的时间那么就进行丢弃
if (lastReportTime <= reportTime) {
Integer lastWorkingStat = lastedDeviceState.getMachineWorkingStat();
Integer lastPwStat = lastedDeviceState.getMachinePwrStat();
// 上次启动时间
Long lastBootTime = lastedDeviceState.getLastBootTime();
Long lastTheDayDuration = lastedDeviceState.getTheDayDuration();
Long lastTheDayJobDuration = lastedDeviceState.getTheDayJobDuration();
Long lastJobDurationTotal = lastedDeviceState.getJobDurationTotal();
Long lastTheDayJobCount = lastedDeviceState.getTheDayJobCount();
Long lastJobTotal = lastedDeviceState.getJobTotal();
// 如果当前消息的时间大于等于上次消息的时间才进行处理
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
Integer machinePwrStat = command.getMachinePwrStat();
Integer machineWorkingStat = command.getMachineWorkingStat();
Long currDuration = command.getCurrDuration();
Long currCount = command.getCurrCount();
// 当前数据
DeviceTotalData nowDeviceState = new DeviceTotalData();
nowDeviceState.setMachinePwrStat(machinePwrStat);
@ -160,11 +157,11 @@ public class IotMonitoringDataJob {
if (lastPwStat != 0) {
if (lastWorkingStat == 1) {
// 如果上次是工作状态那么需要记录产量和生产时间
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrDuration());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + receivedEvent.getCurrDuration());
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + receivedEvent.getCurrDuration());
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + receivedEvent.getCurrCount());
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrCount());
nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration);
nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration);
nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration);
nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount);
nowDeviceState.setJobTotal(lastJobTotal + currCount);
} else {
nowDeviceState = lastedDeviceState;
}
@ -180,16 +177,16 @@ public class IotMonitoringDataJob {
// 开机
if (machineWorkingStat.equals(1)) {
// 工作
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + receivedEvent.getCurrCount());
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrCount());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + receivedEvent.getCurrDuration());
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + receivedEvent.getCurrDuration());
nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currCount);
nowDeviceState.setJobTotal(lastJobTotal + currCount);
nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currDuration);
nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currDuration);
} else {
// 待机
nowDeviceState = lastedDeviceState;
}
// 设置开机时长待机也要进行累加所以放这里
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrDuration());
nowDeviceState.setTheDayDuration(lastTheDayDuration + currDuration);
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(lastBootTime);
nowDeviceState.setReportTime(reportTime);
@ -206,10 +203,10 @@ public class IotMonitoringDataJob {
if (((!(lastWorkingStat == 2 && machineWorkingStat == 2))
&& (!(lastPwStat == 0 && machinePwrStat == 0))) || !isExistEs) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setDataSource(command.getDataSource());
data.setMachineIotMac(command.getMac());
data.setMachinePwrStat(command.getMachinePwrStat());
data.setMachineWorkingStat(command.getMachineWorkingStat());
data.setAccJobCount(nowDeviceState.getJobTotal());
data.setCurrJobCount(nowDeviceState.getTheDayJobCount());
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
@ -225,21 +222,22 @@ public class IotMonitoringDataJob {
}
}
} catch (Exception e) {
log.info("导致异常的信息:" + JSONUtil.toJsonStr(receivedEvent));
log.info("导致异常的信息:" + JSONUtil.toJsonStr(command));
log.error("处理异常", e);
}
}
private DeviceTotalData getLastDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception {
private DeviceTotalData getLastDeviceTotalData(MachineOutputCommand command) throws Exception {
// 上一次的数据
DeviceTotalData value = deviceTotalDataStat.value();
Long reportTime = event.getReportTime();
Long reportTime = command.getTimestamp();
LocalDate localDate = new Date(reportTime).toLocalDate();
Long mac = command.getMac();
if (value == null) {
value = new DeviceTotalData();
// 从es中获取
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac());
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(mac);
if (deviceMonitoringData != null) {
value.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
value.setJobTotal(deviceMonitoringData.getAccJobCount());
@ -254,7 +252,7 @@ public class IotMonitoringDataJob {
} else {
// es中也没有直接从老接口拿
isExistEs = false;
value = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime);
value = queryDeviceMonitoringData(mac, reportTime);
}
// 因为ReportTime参与后面的计算所以如果是第一次取这个数据需要设置为当前消息的时间要不然会有很大的差值
value.setReportTime(reportTime);

12
src/main/java/com/qniao/iot/constant/ConfigConstant.java

@ -33,4 +33,16 @@ public interface ConfigConstant {
String ES_PASSWORD = "es.password";
String ES_CONNECT_TIMEOUT = "es.connect.timeout";
String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host";
String SOURCE_RABBITMQ_POST = "source.rabbitmq.post";
String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.username";
String SOURCE_RABBITMQ_PASSWORD = "source.rabbitmq.password";
String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtual.host";
String SOURCE_RABBITMQ_QUEUE = "source.rabbitmq.queue";
}
Loading…
Cancel
Save