From e1ed012067ca07f84ee03147d34da118cd0e3e7c Mon Sep 17 00:00:00 2001
From: "1049970895@qniao.cn" <1049970895>
Date: Thu, 1 Sep 2022 16:54:28 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E5=AE=8C=E6=88=90=EF=BC=8C?=
=?UTF-8?q?=E5=BE=85=E6=B5=8B=E8=AF=95?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../iot/rc/RootCloudIotDataFormatterJob.java | 275 +++++++++++++-----
.../com/qniao/iot/rc/config/ApolloConfig.java | 9 +-
.../qniao/iot/rc/constant/ConfigConstant.java | 14 +
3 files changed, 223 insertions(+), 75 deletions(-)
diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
index 834a137..ba45128 100644
--- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
+++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
@@ -19,6 +19,7 @@
package com.qniao.iot.rc;
import cn.hutool.core.util.CharsetUtil;
+import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.rc.config.ApolloConfig;
@@ -27,11 +28,16 @@ import com.qniao.iot.rc.constant.DataSource;
import com.qniao.iot.rc.event.MachineIotDataReceivedEventSerializationSchema;
import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema;
import com.qniao.iot.rc.until.SnowFlake;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
@@ -41,48 +47,52 @@ import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsIni
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.util.Collector;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import java.io.IOException;
import java.math.BigDecimal;
-import java.security.Provider;
-import java.time.Duration;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
+import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.*;
-import java.util.concurrent.TimeUnit;
/**
- * Skeleton for a Flink DataStream Job.
- *
- *
For a tutorial how to write a Flink application, check the
- * tutorials and examples on the Flink Website.
- *
- *
To package your application into a JAR file for execution, run
- * 'mvn clean package' on the command line.
- *
- *
If you change the name of the main class (with the public static void main(String[] args))
- * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
- *
* @author Lzk
*/
+@Slf4j
public class RootCloudIotDataFormatterJob {
static SnowFlake snowflake = new SnowFlake(
- Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_DATACENTER_ID)),
- Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_MACHINE_ID))
+ Long.parseLong(ApolloConfig.getStr(ConfigConstant.SNOW_FLAKE_DATACENTER_ID)),
+ Long.parseLong(ApolloConfig.getStr(ConfigConstant.SNOW_FLAKE_MACHINE_ID))
);
public static void main(String[] args) throws Exception {
@@ -91,9 +101,9 @@ public class RootCloudIotDataFormatterJob {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
KafkaSource source = KafkaSource.builder()
- .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
- .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
- .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID))
+ .setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
+ .setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS))
+ .setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUPID))
/*.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN")
.setProperty("sasl.jaas.config",
@@ -104,32 +114,184 @@ public class RootCloudIotDataFormatterJob {
.build();
// 把树根的数据转成我们自己的格式
- SingleOutputStreamOperator transformDs = env
- .fromSource(source, WatermarkStrategy.noWatermarks(), "RootCloudIotDataReceiptedEvent Source")
- .map((MapFunction) RootCloudIotDataFormatterJob::transform)
- .name("Transform MachineIotDataReceivedEvent");
+        DataStreamSource<RootCloudIotDataReceiptedEvent> streamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "RootCloudIotDataReceiptedEvent Source");
+
+ // 数据过滤
+        SingleOutputStreamOperator<RootCloudIotDataReceiptedEvent> streamOperator = streamSource
+                .filter(new RichFilterFunction<RootCloudIotDataReceiptedEvent>() {
+ @Override
+ public boolean filter(RootCloudIotDataReceiptedEvent value) {
+
+ Long reportTime = value.get__timestamp__();
+ if (reportTime != null) {
+ String reportTimeStr = StrUtil.toString(reportTime);
+ if (reportTimeStr.length() == 10) {
+ value.set__timestamp__(reportTime * 1000);
+ }
+ }
+ return value.getWorking_sta() != null
+ && value.get__assetId__() != null
+ && value.getPWR_sta() != null && reportTime != null && value.getACC_count() != null;
+ }
+ }).name("machine iot data received event filter operator");
+
+ // 分组操作
+        SingleOutputStreamOperator<MachineIotDataReceivedEvent> outputStreamOperator = streamOperator
+                .keyBy(RootCloudIotDataReceiptedEvent::get__assetId__)
+                .process(new KeyedProcessFunction<String, RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>() {
+
+                    private transient RestHighLevelClient restHighLevelClient;
+
+                    private ValueState<MachineIotDataReceivedEvent> eventValueState;
+
+                    @Override
+                    public void open(Configuration parameters) {
+                        // Build the ES client here, not in a field initializer: Flink serializes this function at job submission and RestHighLevelClient is not serializable.
+                        restHighLevelClient = new RestHighLevelClient(
+                                RestClient.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_HOST),
+                                        ApolloConfig.getInt(ConfigConstant.ELASTICSEARCH_POST),
+                                        ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_SCHEME)))
+                                        .setHttpClientConfigCallback(httpAsyncClientBuilder -> {
+                                            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+                                            credentialsProvider.setCredentials(AuthScope.ANY,
+                                                    new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_USER_NAME),
+                                                            ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_PASSWORD)));
+                                            return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                                        }).setRequestConfigCallback(requestConfigBuilder -> {
+                                            // ES connect timeout, from Apollo config
+                                            requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ELASTICSEARCH_CONNECT_TIMEOUT));
+                                            return requestConfigBuilder;
+                                        }));
+                        eventValueState = getRuntimeContext().getState(new ValueStateDescriptor<>(
+                                "machineIotDataReceivedEventState", TypeInformation.of(MachineIotDataReceivedEvent.class)));
+                    }
+
+ @Override
+                    public void processElement(RootCloudIotDataReceiptedEvent value,
+                                               Context ctx,
+                                               Collector<MachineIotDataReceivedEvent> out) throws IOException {
+ // 数据清洗
+ Long machineIotMac = Long.valueOf(value.get__assetId__());
+ MachineIotDataReceivedEvent lastReceivedEvent = eventValueState.value();
+ if (lastReceivedEvent == null) {
+ lastReceivedEvent = getMachineIotDataReceivedEvent(machineIotMac, value);
+ }
+ Integer pwrSta = value.getPWR_sta();
+ Integer workingSta = value.getWorking_sta();
+ Long accCount = value.getACC_count();
+ Integer lastPwrStat = lastReceivedEvent.getMachinePwrStat();
+ Integer lastWorkingStat = lastReceivedEvent.getMachineWorkingStat();
+ MachineIotDataReceivedEvent receivedEvent = new MachineIotDataReceivedEvent();
+ receivedEvent.setId(snowflake.nextId());
+ receivedEvent.setDataSource(DataSource.ROOT_CLOUD);
+ receivedEvent.setMachineIotMac(machineIotMac);
+ receivedEvent.setMachinePwrStat(pwrSta);
+ if (pwrSta == 1 && accCount != null && accCount == 0) {
+ // 如果是开机状态并且没有产量就设置为待机
+ receivedEvent.setMachineWorkingStat(2);
+ } else {
+ receivedEvent.setMachineWorkingStat(workingSta);
+ }
+ receivedEvent.setAccJobCount(accCount);
+                        if ((pwrSta == 1 && workingSta == 1)
+                                || (Objects.equals(lastPwrStat, 1) && Objects.equals(lastWorkingStat, 1))) {
+ // 只有当前是工作中或上次是工作中才进行计算
+ Long lastReportTime = lastReceivedEvent.getReportTime();
+ Long reportTime = value.get__timestamp__();
+ // 如果这次的消息和上次的消息相差半个小时,那么不进行计算
+ if (reportTime - lastReportTime <= 30 * 60 * 1000) {
+ receivedEvent.setCurrJobCount(value.getACC_count() - lastReceivedEvent.getAccJobCount());
+ receivedEvent.setCurrJobDuration(reportTime - lastReportTime);
+ }
+ }
+ receivedEvent.setCurrWaitingDuration(0L);
+ receivedEvent.setCurrStoppingDuration(0L);
+ receivedEvent.setIgStat(value.getIG_sta());
+ receivedEvent.setReportTime(value.get__timestamp__());
+ receivedEvent.setReceivedTime(System.currentTimeMillis());
+ eventValueState.update(receivedEvent);
+ out.collect(receivedEvent);
+ }
+
+ private MachineIotDataReceivedEvent getMachineIotDataReceivedEvent(Long machineIotMac,
+ RootCloudIotDataReceiptedEvent event) {
+
+ MachineIotDataReceivedEvent receivedEvent = getFromEs(machineIotMac);
+ if (receivedEvent == null) {
+ // 在es中没有查到,说明是新机器
+ receivedEvent = new MachineIotDataReceivedEvent();
+ receivedEvent.setId(snowflake.nextId());
+ receivedEvent.setDataSource(DataSource.ROOT_CLOUD);
+ receivedEvent.setMachineIotMac(machineIotMac);
+ receivedEvent.setMachinePwrStat(event.getPWR_sta());
+ receivedEvent.setMachineWorkingStat(event.getWorking_sta());
+ receivedEvent.setAccJobCount(event.getACC_count());
+ receivedEvent.setCurrJobCount(0L);
+ receivedEvent.setCurrJobDuration(0L);
+ receivedEvent.setCurrWaitingDuration(0L);
+ receivedEvent.setCurrStoppingDuration(0L);
+ receivedEvent.setIgStat(event.getIG_sta());
+ receivedEvent.setReportTime(event.get__timestamp__());
+ receivedEvent.setReceivedTime(System.currentTimeMillis());
+ }
+ return receivedEvent;
+ }
+
+ private MachineIotDataReceivedEvent getFromEs(Long machineIotMac) {
+
+ try {
+ // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询)
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
+ searchSourceBuilder.sort("reportTime", SortOrder.DESC);
+ searchSourceBuilder.size(1);
+ // 创建查询请求对象,将查询对象配置到其中
+ SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_INDEX));
+ searchRequest.source(searchSourceBuilder);
+ String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
+ GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_INDEX) + "_" + nowDate);
+ // 先判断索引是否存在
+ boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
+ if (exists) {
+ // 执行查询,然后处理响应结果
+ SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
+ // 根据状态和数据条数验证是否返回了数据
+ if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
+ SearchHits hits = searchResponse.getHits();
+ SearchHit reqHit = hits.getHits()[0];
+ return JSONUtil.toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class);
+ }
+ }
+ } catch (Exception e) {
+ log.error("获取es数据异常", e);
+ }
+ return null;
+ }
+ }).name("machine iot data received event keyBy");
+
Properties kafkaProducerConfig = new Properties();
kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
kafkaProducerConfig.setProperty("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";");
+
// 写入kafka
- transformDs.sinkTo(
+ outputStreamOperator.sinkTo(
KafkaSink.builder()
- .setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS))
+ .setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS))
//.setKafkaProducerConfig(kafkaProducerConfig)
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
- .setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS))
+ .setTopic(ApolloConfig.getStr(ConfigConstant.SINK_KAFKA_TOPICS))
.setValueSerializationSchema(new MachineIotDataReceivedEventSerializationSchema())
.build()
).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build()
- ).name("MachineIotDataReceivedEvent Sink");
+ ).name("machine iot data received event Sink");
// 发送到OSS存储
- String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH);
+ String outputPath = ApolloConfig.getStr(ConfigConstant.SINK_OSS_PATH);
StreamingFileSink sink = StreamingFileSink.forRowFormat(
new Path(outputPath),
new SimpleStringEncoder(CharsetUtil.UTF_8)
@@ -148,9 +310,9 @@ public class RootCloudIotDataFormatterJob {
private String getDeviceStatusStr(Integer machineWorkingStat) {
- if(machineWorkingStat == null) {
+ if (machineWorkingStat == null) {
return "deviceOff";
- }else {
+ } else {
if (machineWorkingStat == 1) {
return "deviceWorking";
} else if (machineWorkingStat == 2) {
@@ -167,15 +329,15 @@ public class RootCloudIotDataFormatterJob {
}
}).withRollingPolicy(DefaultRollingPolicy.builder()
// 每隔多长时间生成一个文件
- .withRolloverInterval(Duration.ofHours(12))
- // 默认60秒,未写入数据处于不活跃状态超时会滚动新文件
- .withInactivityInterval(Duration.ofHours(12))
+ .withRolloverInterval(Duration.ofHours(6))
+ // 未写入数据处于不活跃状态超时会滚动新文件
+ .withInactivityInterval(Duration.ofHours(1))
// 设置每个文件的最大大小 ,默认是128M
.withMaxPartSize(MemorySize.ofMebiBytes(128 * 1024 * 1024))
.build()).withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build()).build();
- transformDs.map(new RichMapFunction() {
+        outputStreamOperator.map(new RichMapFunction<MachineIotDataReceivedEvent, String>() {
@Override
public String map(MachineIotDataReceivedEvent value) {
return JSONUtil.toJsonStr(value);
@@ -184,37 +346,4 @@ public class RootCloudIotDataFormatterJob {
env.execute("root cloud iot data formatter job");
}
-
-
- private static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) {
-
-
- MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent();
- if (Objects.nonNull(event)) {
- machineIotDataReceivedEvent.setId(snowflake.nextId());
- machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__()));
- machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD);
- Integer pwrSta = event.getPWR_sta();
- machineIotDataReceivedEvent.setMachinePwrStat(pwrSta);
- machineIotDataReceivedEvent.setIgStat(event.getIG_sta());
- machineIotDataReceivedEvent.setAccJobCount(event.getACC_count_total());
- Long accCount = event.getACC_count();
- machineIotDataReceivedEvent.setCurrJobCount(accCount);
- if(pwrSta != null && pwrSta == 1 && accCount != null && accCount == 0) {
- // 如果是开机状态并且没有产量就设置为待机
- machineIotDataReceivedEvent.setMachineWorkingStat(2);
- }else {
- machineIotDataReceivedEvent.setMachineWorkingStat(event.getWorking_sta());
- }
- machineIotDataReceivedEvent.setCurrJobDuration(Objects.isNull(event.getRunning_duration()) ? null : event.getRunning_duration().longValue());
- if (StringUtils.isNotBlank(event.getStoping_duration())) {
- BigDecimal stoppingDuration = new BigDecimal(event.getStoping_duration());
- machineIotDataReceivedEvent.setCurrStoppingDuration(stoppingDuration.longValue());
- }
- machineIotDataReceivedEvent.setCurrWaitingDuration(Objects.isNull(event.getWaiting_duration()) ? null : event.getWaiting_duration().longValue());
- machineIotDataReceivedEvent.setReceivedTime(System.currentTimeMillis());
- machineIotDataReceivedEvent.setReportTime(event.get__timestamp__());
- }
- return machineIotDataReceivedEvent;
- }
}
diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/config/ApolloConfig.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/config/ApolloConfig.java
index 8e3933a..208bd10 100644
--- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/config/ApolloConfig.java
+++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/config/ApolloConfig.java
@@ -7,13 +7,18 @@ public class ApolloConfig {
private static final Config config = ConfigService.getAppConfig();
- public static String get(String key, String defaultValue) {
+ public static String getStr(String key, String defaultValue) {
return config.getProperty(key, defaultValue);
}
- public static String get(String key) {
+ public static String getStr(String key) {
return config.getProperty(key, null);
}
+
+ public static Integer getInt(String key) {
+
+ return config.getIntProperty(key, null);
+ }
}
diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java
index 38e795e..5acf996 100644
--- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java
+++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/constant/ConfigConstant.java
@@ -17,4 +17,18 @@ public interface ConfigConstant {
String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id";
String SINK_OSS_PATH = "sink.oss.path";
+
+ String ELASTICSEARCH_INDEX = "elasticsearch.index";
+
+ String ELASTICSEARCH_HOST = "elasticsearch.host";
+
+ String ELASTICSEARCH_POST = "elasticsearch.post";
+
+ String ELASTICSEARCH_SCHEME = "elasticsearch.scheme";
+
+ String ELASTICSEARCH_USER_NAME = "elasticsearch.userName";
+
+ String ELASTICSEARCH_PASSWORD = "elasticsearch.password";
+
+ String ELASTICSEARCH_CONNECT_TIMEOUT = "elasticsearch.connect.timeout";
}