diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/EsBulkProcessor.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/EsBulkProcessor.java deleted file mode 100644 index ea345ef..0000000 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/EsBulkProcessor.java +++ /dev/null @@ -1,123 +0,0 @@ -/* -package com.qniao.iot.machine.event.generator.job; - -import com.qniao.iot.machine.event.generator.config.ApolloConfig; -import com.qniao.iot.machine.event.generator.constant.ConfigConstant; -import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpHost; -import org.apache.http.NoHttpResponseException; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.conn.ConnectTimeoutException; -import org.apache.http.conn.HttpHostConnectException; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.bulk.*; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - - -@Slf4j -public class EsBulkProcessor { - - //es操作客户端 - public static final RestHighLevelClient restHighLevelClient; - - //批量操作的对象 - public static final BulkProcessor bulkProcessor; - - static { - List httpHosts = new ArrayList<>(); - //填充数据 - httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST), - ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), - ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); - //填充host节点 - RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0])); - - builder.setRequestConfigCallback(requestConfigBuilder -> { - requestConfigBuilder.setConnectTimeout(1000); - requestConfigBuilder.setSocketTimeout(1000); - requestConfigBuilder.setConnectionRequestTimeout(1000); - return requestConfigBuilder; - }); - - //填充用户名密码 - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("userName", "password")); - - builder.setHttpClientConfigCallback(httpClientBuilder -> { - httpClientBuilder.setMaxConnTotal(30); - httpClientBuilder.setMaxConnPerRoute(30); - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - return httpClientBuilder; - }); - - restHighLevelClient = new RestHighLevelClient(builder); - } - - static { - bulkProcessor=createBulkProcessor(); - } - - private static BulkProcessor createBulkProcessor() { - - BulkProcessor.Listener listener = new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - log.info("1. 【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions()); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, - BulkResponse response) { - if (!response.hasFailures()) { - log.info("2. 【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis()); - } else { - BulkItemResponse[] items = response.getItems(); - for (BulkItemResponse item : items) { - if (item.isFailed()) { - log.info("2. 【afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage()); - break; - } - } - } - } - - @Override - public void afterBulk(long executionId, BulkRequest request, - Throwable failure) { - - List> requests = request.requests(); - List esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList()); - log.error("3. 【afterBulk-failure失败】es执行bluk失败,失败的esId为:{}", esIds, failure); - } - }; - - BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> { - restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener); - }), listener); - //到达10000条时刷新 - builder.setBulkActions(10000); - //内存到达8M时刷新 - builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB)); - //设置的刷新间隔10s - builder.setFlushInterval(TimeValue.timeValueSeconds(10)); - //设置允许执行的并发请求数。 - builder.setConcurrentRequests(8); - //设置重试策略 - builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3)); - return builder.build(); - } -} -*/ diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 4c67e93..3b402dc 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -38,10 +38,12 @@ import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; +import org.elasticsearch.client.RestClientBuilder; import java.io.IOException; import java.sql.SQLException; @@ -56,10 +58,10 @@ public class IotMachineEventGeneratorJob { private final static String SQL = "select qmrs.machine_id, qmrs.iot_mac as machine_iot_mac, qmrs.status, qml.count_unit\n" + "from qn_machine_realtime_state qmrs\n" + - " LEFT JOIN qn_machine_list qml ON qmrs.iot_mac = qml.example_id\n" + + " LEFT JOIN (select example_id, count_unit from qn_machine_list where is_delete = 0) qml\n" + + " ON qmrs.iot_mac = qml.example_id\n" + "where qmrs.iot_mac = ?\n" + - " and qmrs.is_delete = 0\n" + - " and qml.is_delete = 0"; + " and qmrs.is_delete = 0"; public static void main(String[] args) throws Exception { @@ -219,6 +221,8 @@ public class IotMachineEventGeneratorJob { // 交换机名称 return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_EXCHANGE); } + + })).name("commandDataStream to rabbitmq Sink"); }