diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java index d8c8f9f..0688e5a 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java @@ -1,31 +1,23 @@ package com.qniao.iot.device.power; import cn.hutool.core.bean.BeanUtil; -import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import com.qniao.iot.device.power.config.ApolloConfig; import com.qniao.iot.device.power.constant.ConfigConstant; import com.qniao.iot.device.power.event.IotDevicePowerOnAndOffDataEvent; import com.qniao.iot.device.power.utils.SnowFlake; import com.qniao.iot.machine.command.MachineOutputCommand; -import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; -import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema; import com.qniao.iot.rc.constant.MachinePwrStatusEnum; import lombok.extern.slf4j.Slf4j; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.kafka.source.KafkaSource; -import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; @@ -39,8 +31,6 @@ import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; @@ -53,7 +43,6 @@ import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; @@ -109,7 +98,7 @@ public class IotDevicePowerOnAndOffDataJob { .setPort(ApolloConfig.getInt(ConfigConstant.SOURCE_RABBITMQ_PORT)) .setUserName(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_USERNAME)) .setPassword(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_PASSWORD)) - .setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUAL_HOST)) + .setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUALHOST)) .build(); // 设备数据源转换 @@ -157,17 +146,14 @@ public class IotDevicePowerOnAndOffDataJob { Integer machineWorkingStat = command.getMachineWorkingStat(); if (!((lastMachineWorkingStat == 0 && machineWorkingStat == 0) || (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) { - Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount(); - Long accJobCount = command.getCurrTotalOutput(); - Long currJobCount = command.getCurrCount(); - Integer dataSource = command.getDataSource(); + Long currJobCount = command.getCurrJobCount(); + Long currJobDuration = command.getCurrJobDuration(); IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); powerOnAndOffDataEvent.setId(snowflake.nextId()); powerOnAndOffDataEvent.setDataSource(command.getDataSource()); powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); powerOnAndOffDataEvent.setMachinePwrStat(command.getMachinePwrStat()); powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); - powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrDuration()); powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); powerOnAndOffDataEvent.setReportTime(command.getTimestamp()); powerOnAndOffDataEvent.setReceivedTime(LocalDateTime @@ -175,24 +161,15 @@ public class IotDevicePowerOnAndOffDataJob { if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) { if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { // 上次是关机,但是这次是开机,说明周期产能从新开始 - if (dataSource == 1) { - // 根云 - powerOnAndOffDataEvent.setCurrJobCount(accJobCount - lastAccJobCount); - } else { - // 机智云 - powerOnAndOffDataEvent.setCurrJobCount(currJobCount); - } + powerOnAndOffDataEvent.setCurrJobCount(currJobCount); + powerOnAndOffDataEvent.setCurrJobDuration(currJobDuration); } } else { Long lastCurrJobCount = lastPowerOnAndOffDataEvent.getCurrJobCount(); + Long lastCurrJobDuration = lastPowerOnAndOffDataEvent.getCurrJobDuration(); // 直接累加 - if (dataSource == 1) { - // 根云 - powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + (accJobCount - lastAccJobCount)); - } else { - // 机智云 - powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount); - } + powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount); + powerOnAndOffDataEvent.setCurrJobDuration(lastCurrJobDuration + currJobDuration); } // 上次的状态只有两种,要么只有开机时间不为空,要么是开机和关机时间都不为空,否则不处理 if (lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) { @@ -256,8 +233,8 @@ public class IotDevicePowerOnAndOffDataJob { powerOnAndOffDataEvent.setDataSource(command.getDataSource()); powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); - powerOnAndOffDataEvent.setCurrJobCount(command.getCurrCount()); - powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrDuration()); + powerOnAndOffDataEvent.setCurrJobCount(command.getCurrJobCount()); + powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrJobDuration()); Integer machinePwrStat = command.getMachinePwrStat(); powerOnAndOffDataEvent.setMachinePwrStat(machinePwrStat); powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java index 2019ecd..81b2909 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java @@ -1,7 +1,7 @@ package com.qniao.iot.device.power.constant; public interface ConfigConstant { - + String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host"; String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post"; @@ -22,11 +22,11 @@ public interface ConfigConstant { String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host"; - String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.username"; + String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.userName"; String SOURCE_RABBITMQ_PASSWORD = "source.rabbitmq.password"; - String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtual.host"; + String SOURCE_RABBITMQ_VIRTUALHOST = "source.rabbitmq.virtualHost"; String SOURCE_RABBITMQ_QUEUE = "source.rabbitmq.queue";