diff --git a/iot-device-power-on-and-off-data-job/pom.xml b/iot-device-power-on-and-off-data-job/pom.xml index 44b28dd..adbd2c2 100644 --- a/iot-device-power-on-and-off-data-job/pom.xml +++ b/iot-device-power-on-and-off-data-job/pom.xml @@ -143,6 +143,12 @@ 0.0.1-SNAPSHOT compile + + + com.qniao + iot-machine-data-command + 0.0.1-SNAPSHOT + diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java index 49253a4..d8c8f9f 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java @@ -7,8 +7,10 @@ import com.qniao.iot.device.power.config.ApolloConfig; import com.qniao.iot.device.power.constant.ConfigConstant; import com.qniao.iot.device.power.event.IotDevicePowerOnAndOffDataEvent; import com.qniao.iot.device.power.utils.SnowFlake; +import com.qniao.iot.machine.command.MachineOutputCommand; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; +import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema; import com.qniao.iot.rc.constant.MachinePwrStatusEnum; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -22,12 +24,15 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; +import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; +import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.flink.util.Collector; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; @@ -99,41 +104,22 @@ public class IotDevicePowerOnAndOffDataJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); // 获取设备数据源 - KafkaSource source = KafkaSource.builder() - .setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) - .setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS)) - .setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) - .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) - .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") - .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) + RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() + .setHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_HOST)) + .setPort(ApolloConfig.getInt(ConfigConstant.SOURCE_RABBITMQ_PORT)) + .setUserName(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_USERNAME)) + .setPassword(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_PASSWORD)) + .setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUAL_HOST)) .build(); // 设备数据源转换 - DataStreamSource dataStreamSource = env - .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); + final DataStream dataStreamSource = env + .addSource(new RMQSource<>(connectionConfig, ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_QUEUE), + false, new MachineOutputCommandDeserializationSchema())).setParallelism(1); - // 数据过滤 - SingleOutputStreamOperator streamOperator = dataStreamSource - .filter((FilterFunction) value -> { - - Long reportTime = value.getReportTime(); - if (reportTime != null - && value.getDataSource() != null && value.getMachinePwrStat() != null) { - String reportTimeStr = StrUtil.toString(reportTime); - if (reportTimeStr.length() == 10) { - // 机智云那边的设备可能是秒或毫秒 - reportTime = reportTime * 1000; - } - long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); - // 晚30分钟的数据就不要了 - return nowTime - reportTime <= (30 * 60 * 1000); - } - return false; - }); - - SingleOutputStreamOperator outputStreamOperator = streamOperator - .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) - .process(new KeyedProcessFunction() { + SingleOutputStreamOperator outputStreamOperator = dataStreamSource + .keyBy(MachineOutputCommand::getMac) + .process(new KeyedProcessFunction() { private ValueState powerOnAndOffDataEventValueState; @@ -145,7 +131,8 @@ public class IotDevicePowerOnAndOffDataJob { .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); - ValueStateDescriptor powerOnAndOffDataEventValue = new ValueStateDescriptor<>("powerOnAndOffDataEventValue", + ValueStateDescriptor powerOnAndOffDataEventValue + = new ValueStateDescriptor<>("powerOnAndOffDataEventValue", TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class)); // 设置状态值的过期时间,为了解决机器关机没有消息的情况 powerOnAndOffDataEventValue.enableTimeToLive(ttlConfig); @@ -156,34 +143,33 @@ public class IotDevicePowerOnAndOffDataJob { } @Override - public void processElement(MachineIotDataReceivedEvent event, - KeyedProcessFunction.Context ctx, + public void processElement(MachineOutputCommand command, + KeyedProcessFunction.Context ctx, Collector out) throws Exception { - IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(event); + IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(command); Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime(); - Long reportTime = event.getReportTime(); + Long reportTime = command.getTimestamp(); if (reportTime > lastReportTime) { Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat(); - Integer machinePwrStat = event.getMachinePwrStat(); + Integer machinePwrStat = command.getMachinePwrStat(); Integer lastMachineWorkingStat = lastPowerOnAndOffDataEvent.getMachineWorkingStat(); - Integer machineWorkingStat = event.getMachineWorkingStat(); + Integer machineWorkingStat = command.getMachineWorkingStat(); if (!((lastMachineWorkingStat == 0 && machineWorkingStat == 0) || (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) { Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount(); - Long accJobCount = event.getAccJobCount(); - Long currJobCount = event.getCurrJobCount(); - Integer dataSource = event.getDataSource(); + Long accJobCount = command.getCurrTotalOutput(); + Long currJobCount = command.getCurrCount(); + Integer dataSource = command.getDataSource(); IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); powerOnAndOffDataEvent.setId(snowflake.nextId()); - powerOnAndOffDataEvent.setDataSource(event.getDataSource()); - powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac()); - powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat()); - powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat()); - powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount()); - powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration()); - powerOnAndOffDataEvent.setAccJobCount(event.getAccJobCount()); - powerOnAndOffDataEvent.setReportTime(event.getReportTime()); + powerOnAndOffDataEvent.setDataSource(command.getDataSource()); + powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); + powerOnAndOffDataEvent.setMachinePwrStat(command.getMachinePwrStat()); + powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); + powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrDuration()); + powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); + powerOnAndOffDataEvent.setReportTime(command.getTimestamp()); powerOnAndOffDataEvent.setReceivedTime(LocalDateTime .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) { @@ -213,7 +199,7 @@ public class IotDevicePowerOnAndOffDataJob { if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) { // 只有开机时间不为空 if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(machinePwrStat)) { - powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime()); + powerOnAndOffDataEvent.setMachinePowerOffTime(command.getTimestamp()); } powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); @@ -222,7 +208,7 @@ public class IotDevicePowerOnAndOffDataJob { // // 开机和关机时间都不为空,说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态 if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { // 开机 - powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime()); + powerOnAndOffDataEvent.setMachinePowerOnTime(command.getTimestamp()); powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); out.collect(powerOnAndOffDataEvent); } @@ -232,20 +218,20 @@ public class IotDevicePowerOnAndOffDataJob { } } - private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineIotDataReceivedEvent event) throws IOException { + private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineOutputCommand command) throws IOException { IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value(); if (iotDevicePowerOnAndOffDataEvent == null) { - iotDevicePowerOnAndOffDataEvent = getByEs(event); + iotDevicePowerOnAndOffDataEvent = getByEs(command); } return iotDevicePowerOnAndOffDataEvent; } - private IotDevicePowerOnAndOffDataEvent getByEs(MachineIotDataReceivedEvent event) throws IOException { + private IotDevicePowerOnAndOffDataEvent getByEs(MachineOutputCommand command) throws IOException { // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", event.getMachineIotMac())); + searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", command.getMac())); searchSourceBuilder.sort("receivedTime", SortOrder.DESC); searchSourceBuilder.size(1); // 创建查询请求对象,将查询对象配置到其中 @@ -265,76 +251,28 @@ public class IotDevicePowerOnAndOffDataJob { return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class); } } - // 如果没有就找清洗后的数据 - MachineIotDataReceivedEvent deviceMonitoringData = getMachineIotDataReceivedEvent(event.getMachineIotMac()); IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); powerOnAndOffDataEvent.setId(snowflake.nextId()); - if (deviceMonitoringData != null) { - powerOnAndOffDataEvent.setDataSource(deviceMonitoringData.getDataSource()); - powerOnAndOffDataEvent.setMachineIotMac(deviceMonitoringData.getMachineIotMac()); - powerOnAndOffDataEvent.setAccJobCount(deviceMonitoringData.getAccJobCount()); - powerOnAndOffDataEvent.setCurrJobCount(0L); - powerOnAndOffDataEvent.setCurrJobDuration(0L); - Integer machinePwrStat = deviceMonitoringData.getMachinePwrStat(); - powerOnAndOffDataEvent.setMachinePwrStat(deviceMonitoringData.getMachinePwrStat()); - powerOnAndOffDataEvent.setMachineWorkingStat(deviceMonitoringData.getMachineWorkingStat()); - Long reportTime = deviceMonitoringData.getReportTime(); - // 开机 - powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); - if (machinePwrStat == 0) { - // 关机 - powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); - } - powerOnAndOffDataEvent.setReportTime(reportTime); - } else { - powerOnAndOffDataEvent.setDataSource(event.getDataSource()); - powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac()); - powerOnAndOffDataEvent.setAccJobCount(event.getAccJobCount()); - powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount()); - powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration()); - Integer machinePwrStat = event.getMachinePwrStat(); - powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat()); - powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat()); - Long reportTime = event.getReportTime(); - // 开机 - powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); - if (machinePwrStat == 0) { - // 关机 - powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); - } - powerOnAndOffDataEvent.setReportTime(reportTime); + powerOnAndOffDataEvent.setDataSource(command.getDataSource()); + powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); + powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); + powerOnAndOffDataEvent.setCurrJobCount(command.getCurrCount()); + powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrDuration()); + Integer machinePwrStat = command.getMachinePwrStat(); + powerOnAndOffDataEvent.setMachinePwrStat(machinePwrStat); + powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); + Long reportTime = command.getTimestamp(); + // 开机 + powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); + if (machinePwrStat == 0) { + // 关机 + powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); } + powerOnAndOffDataEvent.setReportTime(reportTime); powerOnAndOffDataEvent.setReceivedTime(LocalDateTime .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); return powerOnAndOffDataEvent; } - - private MachineIotDataReceivedEvent getMachineIotDataReceivedEvent(Long machineIotMac) { - - try { - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - BoolQueryBuilder bool = new BoolQueryBuilder(); - BoolQueryBuilder boolQueryBuilder = bool.must(QueryBuilders.termQuery("machineIotMac", machineIotMac)); - searchSourceBuilder.size(1); - searchSourceBuilder.sort("reportTime", SortOrder.DESC); - searchSourceBuilder.query(boolQueryBuilder); - SearchRequest request = new SearchRequest(ApolloConfig.getStr(ConfigConstant.DATA_ELASTICSEARCH_INDEX)); - request.source(searchSourceBuilder); - // 执行请求 - SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); - if (RestStatus.OK.equals(response.status())) { - SearchHit[] hits = response.getHits().getHits(); - if (hits.length > 0) { - SearchHit hit = hits[0]; - String sourceAsString = hit.getSourceAsString(); - return JSONUtil.toBean(sourceAsString, MachineIotDataReceivedEvent.class); - } - } - } catch (Exception e) { - log.error("获取 machine_iot_data_received_event 索引数据异常"); - } - return null; - } }).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac"); sinkEs(outputStreamOperator); diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java index fa1b96e..2019ecd 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java @@ -1,14 +1,7 @@ package com.qniao.iot.device.power.constant; public interface ConfigConstant { - - - String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "source.kafka.bootstrap.servers"; - - String SOURCE_KAFKA_TOPICS = "source.kafka.topics"; - - String SOURCE_KAFKA_GROUP_ID = "source.kafka.groupId"; - + String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host"; String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post"; @@ -23,23 +16,19 @@ public interface ConfigConstant { String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index"; - String ES_CONNECT_TIMEOUT = "es.connect.timeout"; - String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id"; String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id"; String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host"; - String SOURCE_RABBITMQ_PORT = "source.rabbitmq.port"; - - String SOURCE_RABBITMQ_USER_NAME = "source.rabbitmq.userName"; + String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.username"; String SOURCE_RABBITMQ_PASSWORD = "source.rabbitmq.password"; - String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtualHost"; - - String DATA_ELASTICSEARCH_INDEX = "data.elasticsearch.index"; + String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtual.host"; String SOURCE_RABBITMQ_QUEUE = "source.rabbitmq.queue"; + + String SOURCE_RABBITMQ_PORT = "source.rabbitmq.port"; }