|
|
|
@ -57,17 +57,17 @@ import org.apache.http.HttpHost; |
|
|
|
import org.apache.http.auth.AuthScope; |
|
|
|
import org.apache.http.auth.UsernamePasswordCredentials; |
|
|
|
import org.apache.http.client.CredentialsProvider; |
|
|
|
import org.apache.http.client.config.RequestConfig; |
|
|
|
import org.apache.http.impl.client.BasicCredentialsProvider; |
|
|
|
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; |
|
|
|
import org.elasticsearch.action.index.IndexRequest; |
|
|
|
import org.elasticsearch.client.Requests; |
|
|
|
import org.elasticsearch.client.RestClient; |
|
|
|
import org.elasticsearch.client.RestClientBuilder; |
|
|
|
|
|
|
|
import java.math.BigDecimal; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.HashMap; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Objects; |
|
|
|
import java.net.InetSocketAddress; |
|
|
|
import java.util.*; |
|
|
|
|
|
|
|
/** |
|
|
|
* Skeleton for a Flink DataStream Job. |
|
|
|
@ -146,10 +146,11 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
}).name(""); |
|
|
|
}).name("keyBy stream"); |
|
|
|
|
|
|
|
|
|
|
|
//sinkRabbitMq(commandDataStream); |
|
|
|
// 写入rabbitmq |
|
|
|
// sinkRabbitMq(commandDataStream); |
|
|
|
|
|
|
|
// 写入es |
|
|
|
List<HttpHost> httpHosts = new ArrayList<>(); |
|
|
|
@ -178,24 +179,22 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
requestIndexer.add(indexRequest); |
|
|
|
} |
|
|
|
); |
|
|
|
/* 必须设置flush参数 */ |
|
|
|
//刷新前缓冲的最大动作量 |
|
|
|
esSinkBuilder.setBulkFlushMaxActions(10); |
|
|
|
//刷新前缓冲区的最大数据大小(以MB为单位) |
|
|
|
esSinkBuilder.setBulkFlushMaxSizeMb(5); |
|
|
|
//论缓冲操作的数量或大小如何都要刷新的时间间隔 |
|
|
|
esSinkBuilder.setBulkFlushInterval(5000L); |
|
|
|
esSinkBuilder.setRestClientFactory((RestClientFactory) restClientBuilder -> { |
|
|
|
CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
|
|
|
credentialsProvider.setCredentials(AuthScope.ANY, |
|
|
|
new UsernamePasswordCredentials("elastic", "qn56521")); |
|
|
|
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { |
|
|
|
httpAsyncClientBuilder.disableAuthCaching(); |
|
|
|
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); |
|
|
|
}); |
|
|
|
}); |
|
|
|
|
|
|
|
esSinkBuilder.setRestClientFactory( |
|
|
|
restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { |
|
|
|
CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
|
|
|
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic","qn56521")); |
|
|
|
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); |
|
|
|
}) |
|
|
|
); |
|
|
|
//数据流添加sink |
|
|
|
commandDataStream.addSink(esSinkBuilder.build()); |
|
|
|
commandDataStream.addSink(esSinkBuilder.build()).name("BaseCommand sink"); |
|
|
|
|
|
|
|
env.execute("Kafka Job"); |
|
|
|
} |
|
|
|
|