From 6fe05562e3b7cf116bad4704da8b61906f55d31a Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Wed, 13 Jul 2022 10:58:26 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- root-cloud-statistics/pom.xml | 7 ++-- .../iot/rc/RootCloudIotDataFormatterJob.java | 33 +++++++++---------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/root-cloud-statistics/pom.xml b/root-cloud-statistics/pom.xml index f4ae707..abe5f1c 100644 --- a/root-cloud-statistics/pom.xml +++ b/root-cloud-statistics/pom.xml @@ -126,12 +126,9 @@ under the License. org.apache.flink flink-connector-elasticsearch7_2.11 - 1.14.5 + 1.13.0 + compile - - - - diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index f653971..0a1967c 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -57,17 +57,17 @@ import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; +import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Objects; +import java.net.InetSocketAddress; +import java.util.*; /** * Skeleton for a Flink DataStream Job. @@ -146,10 +146,11 @@ public class RootCloudIotDataFormatterJob { } } } - }).name(""); + }).name("keyBy stream"); - //sinkRabbitMq(commandDataStream); + // 写入rabbitmq + // sinkRabbitMq(commandDataStream); // 写入es List httpHosts = new ArrayList<>(); @@ -178,24 +179,22 @@ public class RootCloudIotDataFormatterJob { requestIndexer.add(indexRequest); } ); - /* 必须设置flush参数 */ //刷新前缓冲的最大动作量 esSinkBuilder.setBulkFlushMaxActions(10); //刷新前缓冲区的最大数据大小(以MB为单位) esSinkBuilder.setBulkFlushMaxSizeMb(5); //论缓冲操作的数量或大小如何都要刷新的时间间隔 esSinkBuilder.setBulkFlushInterval(5000L); - esSinkBuilder.setRestClientFactory((RestClientFactory) restClientBuilder -> { - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials("elastic", "qn56521")); - restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { - httpAsyncClientBuilder.disableAuthCaching(); - return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - }); - }); + + esSinkBuilder.setRestClientFactory( + restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic","qn56521")); + return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + }) + ); //数据流添加sink - commandDataStream.addSink(esSinkBuilder.build()); + commandDataStream.addSink(esSinkBuilder.build()).name("BaseCommand sink"); env.execute("Kafka Job"); }