diff --git a/iot-machine-state-event-generator-job/pom.xml b/iot-machine-state-event-generator-job/pom.xml index b93d92b..892c5df 100644 --- a/iot-machine-state-event-generator-job/pom.xml +++ b/iot-machine-state-event-generator-job/pom.xml @@ -121,6 +121,36 @@ apollo-core 2.0.1 + + + org.elasticsearch + elasticsearch + 7.5.1 + + + + + org.elasticsearch.client + elasticsearch-rest-client + 7.5.1 + + + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + 7.5.1 + + + org.elasticsearch.client + elasticsearch-rest-client + + + org.elasticsearch + elasticsearch + + + diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/EsBulkProcessor.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/EsBulkProcessor.java new file mode 100644 index 0000000..ea345ef --- /dev/null +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/EsBulkProcessor.java @@ -0,0 +1,123 @@ +/* +package com.qniao.iot.machine.event.generator.job; + +import com.qniao.iot.machine.event.generator.config.ApolloConfig; +import com.qniao.iot.machine.event.generator.constant.ConfigConstant; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpHost; +import org.apache.http.NoHttpResponseException; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.conn.HttpHostConnectException; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.*; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + + +@Slf4j +public class EsBulkProcessor { + + //es操作客户端 + public static final RestHighLevelClient restHighLevelClient; + + //批量操作的对象 + public static final BulkProcessor bulkProcessor; + + static { + List httpHosts = new ArrayList<>(); + //填充数据 + httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST), + ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), + ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); + //填充host节点 + RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0])); + + builder.setRequestConfigCallback(requestConfigBuilder -> { + requestConfigBuilder.setConnectTimeout(1000); + requestConfigBuilder.setSocketTimeout(1000); + requestConfigBuilder.setConnectionRequestTimeout(1000); + return requestConfigBuilder; + }); + + //填充用户名密码 + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("userName", "password")); + + builder.setHttpClientConfigCallback(httpClientBuilder -> { + httpClientBuilder.setMaxConnTotal(30); + httpClientBuilder.setMaxConnPerRoute(30); + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + return httpClientBuilder; + }); + + restHighLevelClient = new RestHighLevelClient(builder); + } + + static { + bulkProcessor=createBulkProcessor(); + } + + private static BulkProcessor createBulkProcessor() { + + BulkProcessor.Listener listener = new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + log.info("1. 【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions()); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, + BulkResponse response) { + if (!response.hasFailures()) { + log.info("2. 【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis()); + } else { + BulkItemResponse[] items = response.getItems(); + for (BulkItemResponse item : items) { + if (item.isFailed()) { + log.info("2. 【afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage()); + break; + } + } + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, + Throwable failure) { + + List> requests = request.requests(); + List esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList()); + log.error("3. 【afterBulk-failure失败】es执行bluk失败,失败的esId为:{}", esIds, failure); + } + }; + + BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> { + restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener); + }), listener); + //到达10000条时刷新 + builder.setBulkActions(10000); + //内存到达8M时刷新 + builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB)); + //设置的刷新间隔10s + builder.setFlushInterval(TimeValue.timeValueSeconds(10)); + //设置允许执行的并发请求数。 + builder.setConcurrentRequests(8); + //设置重试策略 + builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3)); + return builder.build(); + } +} +*/ diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 3cebc2f..6a1c6ed 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -2,7 +2,6 @@ package com.qniao.iot.machine.event.generator.job; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.lang.Tuple; import cn.hutool.db.Db; import com.qniao.domain.BaseCommand; import com.qniao.iot.machine.command.PowerOffMachineCommand; @@ -45,9 +44,12 @@ import org.elasticsearch.client.Requests; import java.io.IOException; import java.sql.SQLException; +import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; +import java.util.Date; import java.util.List; public class IotMachineEventGeneratorJob { @@ -157,7 +159,7 @@ public class IotMachineEventGeneratorJob { DeviceState deviceStateListJson = deviceState.value(); if (deviceStateListJson == null) { List list = Db.use().query(SQL, DeviceState.class, machineIotMac); - if(CollUtil.isNotEmpty(list)) { + if (CollUtil.isNotEmpty(list)) { deviceStateListJson = list.get(0); } if (deviceStateListJson != null) { @@ -227,9 +229,13 @@ public class IotMachineEventGeneratorJob { ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, (ElasticsearchSinkFunction) (machineIotDataReceivedEvent, runtimeContext, requestIndexer) -> { + // 按日期进行分割 + LocalDate reportDate = new Date(machineIotDataReceivedEvent.getReportTime()) + .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); + String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); //创建es 请求 IndexRequest indexRequest = Requests.indexRequest() - .index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX)) + .index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + indexDateSuffix) .source(BeanUtil.beanToMap(machineIotDataReceivedEvent)); requestIndexer.add(indexRequest); }