From b762db6d66bca3ad2a34c5d968d56dc3bf4680d4 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Mon, 5 Sep 2022 17:46:50 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- iot-device-power-on-and-off-data-job/pom.xml | 6 + .../power/IotDevicePowerOnAndOffDataJob.java | 174 ++++++------------ .../device/power/constant/ConfigConstant.java | 21 +-- 3 files changed, 67 insertions(+), 134 deletions(-) diff --git a/iot-device-power-on-and-off-data-job/pom.xml b/iot-device-power-on-and-off-data-job/pom.xml index 44b28dd..adbd2c2 100644 --- a/iot-device-power-on-and-off-data-job/pom.xml +++ b/iot-device-power-on-and-off-data-job/pom.xml @@ -143,6 +143,12 @@ 0.0.1-SNAPSHOT compile + + + com.qniao + iot-machine-data-command + 0.0.1-SNAPSHOT + diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java index 49253a4..d8c8f9f 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java @@ -7,8 +7,10 @@ import com.qniao.iot.device.power.config.ApolloConfig; import com.qniao.iot.device.power.constant.ConfigConstant; import com.qniao.iot.device.power.event.IotDevicePowerOnAndOffDataEvent; import com.qniao.iot.device.power.utils.SnowFlake; +import com.qniao.iot.machine.command.MachineOutputCommand; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; +import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema; import com.qniao.iot.rc.constant.MachinePwrStatusEnum; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -22,12 +24,15 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; +import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; +import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.flink.util.Collector; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; @@ -99,41 +104,22 @@ public class IotDevicePowerOnAndOffDataJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); // 获取设备数据源 - KafkaSource source = KafkaSource.builder() - .setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) - .setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS)) - .setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) - .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) - .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") - .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) + RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() + .setHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_HOST)) + .setPort(ApolloConfig.getInt(ConfigConstant.SOURCE_RABBITMQ_PORT)) + .setUserName(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_USERNAME)) + .setPassword(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_PASSWORD)) + .setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUAL_HOST)) .build(); // 设备数据源转换 - DataStreamSource dataStreamSource = env - .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); + final DataStream dataStreamSource = env + .addSource(new RMQSource<>(connectionConfig, ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_QUEUE), + false, new MachineOutputCommandDeserializationSchema())).setParallelism(1); - // 数据过滤 - SingleOutputStreamOperator streamOperator = dataStreamSource - .filter((FilterFunction) value -> { - - Long reportTime = value.getReportTime(); - if (reportTime != null - && value.getDataSource() != null && value.getMachinePwrStat() != null) { - String reportTimeStr = StrUtil.toString(reportTime); - if (reportTimeStr.length() == 10) { - // 机智云那边的设备可能是秒或毫秒 - reportTime = reportTime * 1000; - } - long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); - // 晚30分钟的数据就不要了 - return nowTime - reportTime <= (30 * 60 * 1000); - } - return false; - }); - - SingleOutputStreamOperator outputStreamOperator = streamOperator - .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) - .process(new KeyedProcessFunction() { + SingleOutputStreamOperator outputStreamOperator = dataStreamSource + .keyBy(MachineOutputCommand::getMac) + .process(new KeyedProcessFunction() { private ValueState powerOnAndOffDataEventValueState; @@ -145,7 +131,8 @@ public class IotDevicePowerOnAndOffDataJob { .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); - ValueStateDescriptor powerOnAndOffDataEventValue = new ValueStateDescriptor<>("powerOnAndOffDataEventValue", + ValueStateDescriptor powerOnAndOffDataEventValue + = new ValueStateDescriptor<>("powerOnAndOffDataEventValue", TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class)); // 设置状态值的过期时间,为了解决机器关机没有消息的情况 powerOnAndOffDataEventValue.enableTimeToLive(ttlConfig); @@ -156,34 +143,33 @@ public class IotDevicePowerOnAndOffDataJob { } @Override - public void processElement(MachineIotDataReceivedEvent event, - KeyedProcessFunction.Context ctx, + public void processElement(MachineOutputCommand command, + KeyedProcessFunction.Context ctx, Collector out) throws Exception { - IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(event); + IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(command); Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime(); - Long reportTime = event.getReportTime(); + Long reportTime = command.getTimestamp(); if (reportTime > lastReportTime) { Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat(); - Integer machinePwrStat = event.getMachinePwrStat(); + Integer machinePwrStat = command.getMachinePwrStat(); Integer lastMachineWorkingStat = lastPowerOnAndOffDataEvent.getMachineWorkingStat(); - Integer machineWorkingStat = event.getMachineWorkingStat(); + Integer machineWorkingStat = command.getMachineWorkingStat(); if (!((lastMachineWorkingStat == 0 && machineWorkingStat == 0) || (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) { Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount(); - Long accJobCount = event.getAccJobCount(); - Long currJobCount = event.getCurrJobCount(); - Integer dataSource = event.getDataSource(); + Long accJobCount = command.getCurrTotalOutput(); + Long currJobCount = command.getCurrCount(); + Integer dataSource = command.getDataSource(); IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); powerOnAndOffDataEvent.setId(snowflake.nextId()); - powerOnAndOffDataEvent.setDataSource(event.getDataSource()); - powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac()); - powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat()); - powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat()); - powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount()); - powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration()); - powerOnAndOffDataEvent.setAccJobCount(event.getAccJobCount()); - powerOnAndOffDataEvent.setReportTime(event.getReportTime()); + powerOnAndOffDataEvent.setDataSource(command.getDataSource()); + powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); + powerOnAndOffDataEvent.setMachinePwrStat(command.getMachinePwrStat()); + powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); + powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrDuration()); + powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); + powerOnAndOffDataEvent.setReportTime(command.getTimestamp()); powerOnAndOffDataEvent.setReceivedTime(LocalDateTime .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) { @@ -213,7 +199,7 @@ public class IotDevicePowerOnAndOffDataJob { if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) { // 只有开机时间不为空 if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(machinePwrStat)) { - powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime()); + powerOnAndOffDataEvent.setMachinePowerOffTime(command.getTimestamp()); } powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); @@ -222,7 +208,7 @@ public class IotDevicePowerOnAndOffDataJob { // // 开机和关机时间都不为空,说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态 if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { // 开机 - powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime()); + powerOnAndOffDataEvent.setMachinePowerOnTime(command.getTimestamp()); powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); out.collect(powerOnAndOffDataEvent); } @@ -232,20 +218,20 @@ public class IotDevicePowerOnAndOffDataJob { } } - private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineIotDataReceivedEvent event) throws IOException { + private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineOutputCommand command) throws IOException { IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value(); if (iotDevicePowerOnAndOffDataEvent == null) { - iotDevicePowerOnAndOffDataEvent = getByEs(event); + iotDevicePowerOnAndOffDataEvent = getByEs(command); } return iotDevicePowerOnAndOffDataEvent; } - private IotDevicePowerOnAndOffDataEvent getByEs(MachineIotDataReceivedEvent event) throws IOException { + private IotDevicePowerOnAndOffDataEvent getByEs(MachineOutputCommand command) throws IOException { // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", event.getMachineIotMac())); + searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", command.getMac())); searchSourceBuilder.sort("receivedTime", SortOrder.DESC); searchSourceBuilder.size(1); // 创建查询请求对象,将查询对象配置到其中 @@ -265,76 +251,28 @@ public class IotDevicePowerOnAndOffDataJob { return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class); } } - // 如果没有就找清洗后的数据 - MachineIotDataReceivedEvent deviceMonitoringData = getMachineIotDataReceivedEvent(event.getMachineIotMac()); IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); powerOnAndOffDataEvent.setId(snowflake.nextId()); - if (deviceMonitoringData != null) { - powerOnAndOffDataEvent.setDataSource(deviceMonitoringData.getDataSource()); - powerOnAndOffDataEvent.setMachineIotMac(deviceMonitoringData.getMachineIotMac()); - powerOnAndOffDataEvent.setAccJobCount(deviceMonitoringData.getAccJobCount()); - powerOnAndOffDataEvent.setCurrJobCount(0L); - powerOnAndOffDataEvent.setCurrJobDuration(0L); - Integer machinePwrStat = deviceMonitoringData.getMachinePwrStat(); - powerOnAndOffDataEvent.setMachinePwrStat(deviceMonitoringData.getMachinePwrStat()); - powerOnAndOffDataEvent.setMachineWorkingStat(deviceMonitoringData.getMachineWorkingStat()); - Long reportTime = deviceMonitoringData.getReportTime(); - // 开机 - powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); - if (machinePwrStat == 0) { - // 关机 - powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); - } - powerOnAndOffDataEvent.setReportTime(reportTime); - } else { - powerOnAndOffDataEvent.setDataSource(event.getDataSource()); - powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac()); - powerOnAndOffDataEvent.setAccJobCount(event.getAccJobCount()); - powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount()); - powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration()); - Integer machinePwrStat = event.getMachinePwrStat(); - powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat()); - powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat()); - Long reportTime = event.getReportTime(); - // 开机 - powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); - if (machinePwrStat == 0) { - // 关机 - powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); - } - powerOnAndOffDataEvent.setReportTime(reportTime); + powerOnAndOffDataEvent.setDataSource(command.getDataSource()); + powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); + powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); + powerOnAndOffDataEvent.setCurrJobCount(command.getCurrCount()); + powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrDuration()); + Integer machinePwrStat = command.getMachinePwrStat(); + powerOnAndOffDataEvent.setMachinePwrStat(machinePwrStat); + powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); + Long reportTime = command.getTimestamp(); + // 开机 + powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); + if (machinePwrStat == 0) { + // 关机 + powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); } + powerOnAndOffDataEvent.setReportTime(reportTime); powerOnAndOffDataEvent.setReceivedTime(LocalDateTime .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); return powerOnAndOffDataEvent; } - - private MachineIotDataReceivedEvent getMachineIotDataReceivedEvent(Long machineIotMac) { - - try { - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - BoolQueryBuilder bool = new BoolQueryBuilder(); - BoolQueryBuilder boolQueryBuilder = bool.must(QueryBuilders.termQuery("machineIotMac", machineIotMac)); - searchSourceBuilder.size(1); - searchSourceBuilder.sort("reportTime", SortOrder.DESC); - searchSourceBuilder.query(boolQueryBuilder); - SearchRequest request = new SearchRequest(ApolloConfig.getStr(ConfigConstant.DATA_ELASTICSEARCH_INDEX)); - request.source(searchSourceBuilder); - // 执行请求 - SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); - if (RestStatus.OK.equals(response.status())) { - SearchHit[] hits = response.getHits().getHits(); - if (hits.length > 0) { - SearchHit hit = hits[0]; - String sourceAsString = hit.getSourceAsString(); - return JSONUtil.toBean(sourceAsString, MachineIotDataReceivedEvent.class); - } - } - } catch (Exception e) { - log.error("获取 machine_iot_data_received_event 索引数据异常"); - } - return null; - } }).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac"); sinkEs(outputStreamOperator); diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java index fa1b96e..2019ecd 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java @@ -1,14 +1,7 @@ package com.qniao.iot.device.power.constant; public interface ConfigConstant { - - - String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "source.kafka.bootstrap.servers"; - - String SOURCE_KAFKA_TOPICS = "source.kafka.topics"; - - String SOURCE_KAFKA_GROUP_ID = "source.kafka.groupId"; - + String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host"; String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post"; @@ -23,23 +16,19 @@ public interface ConfigConstant { String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index"; - String ES_CONNECT_TIMEOUT = "es.connect.timeout"; - String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id"; String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id"; String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host"; - String SOURCE_RABBITMQ_PORT = "source.rabbitmq.port"; - - String SOURCE_RABBITMQ_USER_NAME = "source.rabbitmq.userName"; + String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.username"; String SOURCE_RABBITMQ_PASSWORD = "source.rabbitmq.password"; - String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtualHost"; - - String DATA_ELASTICSEARCH_INDEX = "data.elasticsearch.index"; + String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtual.host"; String SOURCE_RABBITMQ_QUEUE = "source.rabbitmq.queue"; + + String SOURCE_RABBITMQ_PORT = "source.rabbitmq.port"; } From 31e3b28578782ea76b10e0112e3b16ecb60c4708 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Tue, 6 Sep 2022 15:18:01 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../power/IotDevicePowerOnAndOffDataJob.java | 43 +++++-------------- .../device/power/constant/ConfigConstant.java | 6 +-- 2 files changed, 13 insertions(+), 36 deletions(-) diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java index d8c8f9f..0688e5a 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java @@ -1,31 +1,23 @@ package com.qniao.iot.device.power; import cn.hutool.core.bean.BeanUtil; -import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import com.qniao.iot.device.power.config.ApolloConfig; import com.qniao.iot.device.power.constant.ConfigConstant; import com.qniao.iot.device.power.event.IotDevicePowerOnAndOffDataEvent; import com.qniao.iot.device.power.utils.SnowFlake; import com.qniao.iot.machine.command.MachineOutputCommand; -import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; -import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema; import com.qniao.iot.rc.constant.MachinePwrStatusEnum; import lombok.extern.slf4j.Slf4j; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.kafka.source.KafkaSource; -import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; @@ -39,8 +31,6 @@ import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; @@ -53,7 +43,6 @@ import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; @@ -109,7 +98,7 @@ public class IotDevicePowerOnAndOffDataJob { .setPort(ApolloConfig.getInt(ConfigConstant.SOURCE_RABBITMQ_PORT)) .setUserName(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_USERNAME)) .setPassword(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_PASSWORD)) - .setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUAL_HOST)) + .setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUALHOST)) .build(); // 设备数据源转换 @@ -157,17 +146,14 @@ public class IotDevicePowerOnAndOffDataJob { Integer machineWorkingStat = command.getMachineWorkingStat(); if (!((lastMachineWorkingStat == 0 && machineWorkingStat == 0) || (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) { - Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount(); - Long accJobCount = command.getCurrTotalOutput(); - Long currJobCount = command.getCurrCount(); - Integer dataSource = command.getDataSource(); + Long currJobCount = command.getCurrJobCount(); + Long currJobDuration = command.getCurrJobDuration(); IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); powerOnAndOffDataEvent.setId(snowflake.nextId()); powerOnAndOffDataEvent.setDataSource(command.getDataSource()); powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); powerOnAndOffDataEvent.setMachinePwrStat(command.getMachinePwrStat()); powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); - powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrDuration()); powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); powerOnAndOffDataEvent.setReportTime(command.getTimestamp()); powerOnAndOffDataEvent.setReceivedTime(LocalDateTime @@ -175,24 +161,15 @@ public class IotDevicePowerOnAndOffDataJob { if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) { if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { // 上次是关机,但是这次是开机,说明周期产能从新开始 - if (dataSource == 1) { - // 根云 - powerOnAndOffDataEvent.setCurrJobCount(accJobCount - lastAccJobCount); - } else { - // 机智云 - powerOnAndOffDataEvent.setCurrJobCount(currJobCount); - } + powerOnAndOffDataEvent.setCurrJobCount(currJobCount); + powerOnAndOffDataEvent.setCurrJobDuration(currJobDuration); } } else { Long lastCurrJobCount = lastPowerOnAndOffDataEvent.getCurrJobCount(); + Long lastCurrJobDuration = lastPowerOnAndOffDataEvent.getCurrJobDuration(); // 直接累加 - if (dataSource == 1) { - // 根云 - powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + (accJobCount - lastAccJobCount)); - } else { - // 机智云 - powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount); - } + powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount); + powerOnAndOffDataEvent.setCurrJobDuration(lastCurrJobDuration + currJobDuration); } // 上次的状态只有两种,要么只有开机时间不为空,要么是开机和关机时间都不为空,否则不处理 if (lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) { @@ -256,8 +233,8 @@ public class IotDevicePowerOnAndOffDataJob { powerOnAndOffDataEvent.setDataSource(command.getDataSource()); powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); - powerOnAndOffDataEvent.setCurrJobCount(command.getCurrCount()); - powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrDuration()); + powerOnAndOffDataEvent.setCurrJobCount(command.getCurrJobCount()); + powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrJobDuration()); Integer machinePwrStat = command.getMachinePwrStat(); powerOnAndOffDataEvent.setMachinePwrStat(machinePwrStat); powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java index 2019ecd..81b2909 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java @@ -1,7 +1,7 @@ package com.qniao.iot.device.power.constant; public interface ConfigConstant { - + String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host"; String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post"; @@ -22,11 +22,11 @@ public interface ConfigConstant { String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host"; - String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.username"; + String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.userName"; String SOURCE_RABBITMQ_PASSWORD = "source.rabbitmq.password"; - String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtual.host"; + String SOURCE_RABBITMQ_VIRTUALHOST = "source.rabbitmq.virtualHost"; String SOURCE_RABBITMQ_QUEUE = "source.rabbitmq.queue"; From 79f3544cb1419ff453425c0a12016c3d43e2dd25 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Tue, 6 Sep 2022 16:20:15 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java index 0688e5a..fd4c6c7 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java @@ -139,7 +139,7 @@ public class IotDevicePowerOnAndOffDataJob { IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(command); Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime(); Long reportTime = command.getTimestamp(); - if (reportTime > lastReportTime) { + if (reportTime >= lastReportTime) { Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat(); Integer machinePwrStat = command.getMachinePwrStat(); Integer lastMachineWorkingStat = lastPowerOnAndOffDataEvent.getMachineWorkingStat(); From 0a0e20ae08a2318e3297fe3d035180d225ffd09c Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Tue, 6 Sep 2022 16:44:39 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E5=88=9B=E5=BB=BA=E7=B4=A2=E5=BC=95?= =?UTF-8?q?=E6=97=B6=E6=96=B0=E5=A2=9E=E5=85=AC=E5=B9=B3=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/device/power/IotDevicePowerOnAndOffDataJob.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java index fd4c6c7..a8a6c00 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java @@ -58,6 +58,7 @@ import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.locks.ReentrantLock; @Slf4j public class IotDevicePowerOnAndOffDataJob { @@ -82,6 +83,8 @@ public class IotDevicePowerOnAndOffDataJob { return requestConfigBuilder; })); + private static final ReentrantLock lock = new ReentrantLock(true); + /** * 当前索引日期后缀 */ @@ -328,6 +331,7 @@ public class IotDevicePowerOnAndOffDataJob { GetIndexRequest exist = new GetIndexRequest(indicesName); // 先判断客户端是否存在 try { + lock.lock(); boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); if (!exists) { // 创建索引 @@ -387,6 +391,8 @@ public class IotDevicePowerOnAndOffDataJob { } } catch (Exception e) { e.printStackTrace(); + }finally { + lock.unlock(); } } } From b3e66253cde6611d2c7f4e9822f47e8a28a985f5 Mon Sep 17 00:00:00 2001 From: "hupenghui@qniao.cn" <1049970895> Date: Wed, 7 Sep 2022 00:14:03 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 4 ++++ .../iot/device/power/IotDevicePowerOnAndOffDataJob.java | 7 +------ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index 8452323..6e816d3 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,7 @@ buildNumber.properties .mvn/wrapper/maven-wrapper.jar /.idea +/.iml +/iot-device-power-on-and-off-data.iml +/iot-device-power-on-and-off-data-event/iot-device-power-on-and-off-data-event.iml +/iot-device-power-on-and-off-data-job/iot-device-power-on-and-off-data-job.iml diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java index a8a6c00..636d37e 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java @@ -83,8 +83,6 @@ public class IotDevicePowerOnAndOffDataJob { return requestConfigBuilder; })); - private static final ReentrantLock lock = new ReentrantLock(true); - /** * 当前索引日期后缀 */ @@ -118,7 +116,7 @@ public class IotDevicePowerOnAndOffDataJob { @Override public void open(Configuration parameters) { - StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1)) + StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); @@ -331,7 +329,6 @@ public class IotDevicePowerOnAndOffDataJob { GetIndexRequest exist = new GetIndexRequest(indicesName); // 先判断客户端是否存在 try { - lock.lock(); boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); if (!exists) { // 创建索引 @@ -391,8 +388,6 @@ public class IotDevicePowerOnAndOffDataJob { } } catch (Exception e) { e.printStackTrace(); - }finally { - lock.unlock(); } } }