Browse Source

更新

hph-优化版本
1049970895@qniao.cn 3 years ago
parent
commit
31e3b28578
2 changed files with 13 additions and 36 deletions
  1. 43
      iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java
  2. 6
      iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java

43
iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java

@ -1,31 +1,23 @@
package com.qniao.iot.device.power;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.device.power.config.ApolloConfig;
import com.qniao.iot.device.power.constant.ConfigConstant;
import com.qniao.iot.device.power.event.IotDevicePowerOnAndOffDataEvent;
import com.qniao.iot.device.power.utils.SnowFlake;
import com.qniao.iot.machine.command.MachineOutputCommand;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema;
import com.qniao.iot.rc.constant.MachinePwrStatusEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
@ -39,8 +31,6 @@ import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
@ -53,7 +43,6 @@ import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
@ -109,7 +98,7 @@ public class IotDevicePowerOnAndOffDataJob {
.setPort(ApolloConfig.getInt(ConfigConstant.SOURCE_RABBITMQ_PORT))
.setUserName(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_USERNAME))
.setPassword(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_PASSWORD))
.setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUAL_HOST))
.setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUALHOST))
.build();
// 设备数据源转换
@ -157,17 +146,14 @@ public class IotDevicePowerOnAndOffDataJob {
Integer machineWorkingStat = command.getMachineWorkingStat();
if (!((lastMachineWorkingStat == 0 && machineWorkingStat == 0)
|| (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) {
Long lastAccJobCount = lastPowerOnAndOffDataEvent.getAccJobCount();
Long accJobCount = command.getCurrTotalOutput();
Long currJobCount = command.getCurrCount();
Integer dataSource = command.getDataSource();
Long currJobCount = command.getCurrJobCount();
Long currJobDuration = command.getCurrJobDuration();
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
powerOnAndOffDataEvent.setId(snowflake.nextId());
powerOnAndOffDataEvent.setDataSource(command.getDataSource());
powerOnAndOffDataEvent.setMachineIotMac(command.getMac());
powerOnAndOffDataEvent.setMachinePwrStat(command.getMachinePwrStat());
powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat());
powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrDuration());
powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput());
powerOnAndOffDataEvent.setReportTime(command.getTimestamp());
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
@ -175,24 +161,15 @@ public class IotDevicePowerOnAndOffDataJob {
if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) {
if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
// 上次是关机但是这次是开机说明周期产能从新开始
if (dataSource == 1) {
// 根云
powerOnAndOffDataEvent.setCurrJobCount(accJobCount - lastAccJobCount);
} else {
// 机智云
powerOnAndOffDataEvent.setCurrJobCount(currJobCount);
}
powerOnAndOffDataEvent.setCurrJobCount(currJobCount);
powerOnAndOffDataEvent.setCurrJobDuration(currJobDuration);
}
} else {
Long lastCurrJobCount = lastPowerOnAndOffDataEvent.getCurrJobCount();
Long lastCurrJobDuration = lastPowerOnAndOffDataEvent.getCurrJobDuration();
// 直接累加
if (dataSource == 1) {
// 根云
powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + (accJobCount - lastAccJobCount));
} else {
// 机智云
powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount);
}
powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount);
powerOnAndOffDataEvent.setCurrJobDuration(lastCurrJobDuration + currJobDuration);
}
// 上次的状态只有两种要么只有开机时间不为空要么是开机和关机时间都不为空否则不处理
if (lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) {
@ -256,8 +233,8 @@ public class IotDevicePowerOnAndOffDataJob {
powerOnAndOffDataEvent.setDataSource(command.getDataSource());
powerOnAndOffDataEvent.setMachineIotMac(command.getMac());
powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput());
powerOnAndOffDataEvent.setCurrJobCount(command.getCurrCount());
powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrDuration());
powerOnAndOffDataEvent.setCurrJobCount(command.getCurrJobCount());
powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrJobDuration());
Integer machinePwrStat = command.getMachinePwrStat();
powerOnAndOffDataEvent.setMachinePwrStat(machinePwrStat);
powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat());

6
iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java

@ -1,7 +1,7 @@
package com.qniao.iot.device.power.constant;
public interface ConfigConstant {
String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host";
String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post";
@ -22,11 +22,11 @@ public interface ConfigConstant {
String SOURCE_RABBITMQ_HOST = "source.rabbitmq.host";
String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.username";
String SOURCE_RABBITMQ_USERNAME = "source.rabbitmq.userName";
String SOURCE_RABBITMQ_PASSWORD = "source.rabbitmq.password";
String SOURCE_RABBITMQ_VIRTUAL_HOST = "source.rabbitmq.virtual.host";
String SOURCE_RABBITMQ_VIRTUALHOST = "source.rabbitmq.virtualHost";
String SOURCE_RABBITMQ_QUEUE = "source.rabbitmq.queue";

Loading…
Cancel
Save