diff --git a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java index e6da999..4108c0b 100644 --- a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java @@ -4,10 +4,14 @@ import cn.hutool.core.bean.BeanUtil; import cn.hutool.http.HttpUtil; import cn.hutool.json.JSONArray; import cn.hutool.json.JSONUtil; +import com.qniao.domain.BaseCommand; import com.qniao.iot.config.ApolloConfig; import com.qniao.iot.constant.ConfigConstant; -import com.qniao.iot.machine.command.MachineOutputCommand; +import com.qniao.iot.machine.command.*; +import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializationSchema; import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema; +import com.qniao.iot.utils.DeviceMonitoringDataRabbitMqSerializationSchema; +import com.rabbitmq.client.AMQP; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.time.Time; @@ -19,6 +23,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; +import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; +import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.flink.util.Collector; @@ -131,7 +137,7 @@ public class IotMonitoringDataJob { DeviceTotalData lastDeviceState = getLastDeviceTotalData(command); Long reportTime = command.getTimestamp(); Long lastReportTime = lastDeviceState.getReportTime(); - if(lastReportTime <= reportTime) { + if (lastReportTime <= reportTime) { Integer lastWorkingStat = lastDeviceState.getMachineWorkingStat(); Integer lastPwStat = lastDeviceState.getMachinePwrStat(); // 上次启动时间 @@ -306,15 +312,15 @@ public class IotMonitoringDataJob { if (iotMac.equals(machineIotMac)) { deviceTotalData = new DeviceTotalData(); Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal"); - if(productionTotal == null) { + if (productionTotal == null) { deviceTotalData.setJobTotal(0L); - }else { + } else { deviceTotalData.setJobTotal(Integer.toUnsignedLong(Integer.parseInt(String.valueOf(productionTotal)))); } Object workTotalTotal = JSONUtil.getByPath(JSONUtil.parse(o), "workTotalTotal"); - if(workTotalTotal == null) { + if (workTotalTotal == null) { deviceTotalData.setJobDurationTotal(0L); - }else { + } else { deviceTotalData.setJobDurationTotal(Integer.toUnsignedLong(Integer.parseInt(String.valueOf(workTotalTotal))) * 3600); } Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime"); @@ -394,12 +400,50 @@ public class IotMonitoringDataJob { } }).name("machineIotDataReceivedEventDataStream keyBy stream"); + // 写入rabbitmq + sinkRabbitMq(machineIotDataReceivedEventDataStream); + // 写入es sinkEs(machineIotDataReceivedEventDataStream); env.execute("device_monitoring_data"); } + private static void sinkRabbitMq(DataStream dataStream) { + + // rabbitmq配置 + RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() + .setHost(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_HOST)) + .setVirtualHost(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST)) + .setUserName(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_USER_NAME)) + .setPassword(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_PASSWORD)) + .setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT)) + .build(); + + // 发送相应的指令到rabbitmq的交换机 + dataStream + .addSink(new RMQSink<>(connectionConfig, new DeviceMonitoringDataRabbitMqSerializationSchema(), + new RMQSinkPublishOptions() { + + @Override + public String computeRoutingKey(DeviceMonitoringData data) { + return null; + } + + @Override + public AMQP.BasicProperties computeProperties(DeviceMonitoringData data) { + return null; + } + + @Override + public String computeExchange(DeviceMonitoringData data) { + // 交换机名称 + return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_EXCHANGE); + } + })).name("DeviceMonitoringData to rabbitmq Sink"); + } + + private static void sinkEs(DataStream dataStream) { List httpHosts = new ArrayList<>(); diff --git a/src/main/java/com/qniao/iot/constant/ConfigConstant.java b/src/main/java/com/qniao/iot/constant/ConfigConstant.java index e36e465..63a8975 100644 --- a/src/main/java/com/qniao/iot/constant/ConfigConstant.java +++ b/src/main/java/com/qniao/iot/constant/ConfigConstant.java @@ -27,4 +27,16 @@ public interface ConfigConstant { String SOURCE_RABBITMQ_VIRTUALHOST = "source.rabbitmq.virtualHost"; String SOURCE_RABBITMQ_QUEUE = "source.rabbitmq.queue"; + + String SINK_RABBITMQ_HOST = "sink.rabbitmq.host"; + + String SINK_RABBITMQ_PORT = "sink.rabbitmq.port"; + + String SINK_RABBITMQ_VIRTUAL_HOST = "sink.rabbitmq.virtualHost"; + + String SINK_RABBITMQ_USER_NAME = "sink.rabbitmq.userName"; + + String SINK_RABBITMQ_PASSWORD = "sink.rabbitmq.password"; + + String SINK_RABBITMQ_EXCHANGE = "sink.rabbitmq.exchange"; } diff --git a/src/main/java/com/qniao/iot/utils/DeviceMonitoringDataRabbitMqSerializationSchema.java b/src/main/java/com/qniao/iot/utils/DeviceMonitoringDataRabbitMqSerializationSchema.java new file mode 100644 index 0000000..9a85424 --- /dev/null +++ b/src/main/java/com/qniao/iot/utils/DeviceMonitoringDataRabbitMqSerializationSchema.java @@ -0,0 +1,22 @@ +package com.qniao.iot.utils; + +import com.qniao.iot.DeviceMonitoringData; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +public class DeviceMonitoringDataRabbitMqSerializationSchema implements SerializationSchema { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public DeviceMonitoringDataRabbitMqSerializationSchema() { + } + + public byte[] serialize(DeviceMonitoringData data) { + try { + return OBJECT_MAPPER.writeValueAsBytes(data); + } catch (JsonProcessingException var3) { + throw new IllegalArgumentException("Could not serialize record: " + data, var3); + } + } +}