Browse Source

新增es按日期分割

master
1049970895@qniao.cn 3 years ago
parent
commit
910668aa50
3 changed files with 162 additions and 3 deletions
  1. 30
      iot-machine-state-event-generator-job/pom.xml
  2. 123
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/EsBulkProcessor.java
  3. 12
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

30
iot-machine-state-event-generator-job/pom.xml

@ -121,6 +121,36 @@
<artifactId>apollo-core</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.5.1</version>
</dependency>
<!-- elasticsearch-rest-client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.5.1</version>
</dependency>
<!-- elasticsearch-rest-high-level-client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.5.1</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>

123
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/EsBulkProcessor.java

@ -0,0 +1,123 @@
/*
package com.qniao.iot.machine.event.generator.job;
import com.qniao.iot.machine.event.generator.config.ApolloConfig;
import com.qniao.iot.machine.event.generator.constant.ConfigConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.NoHttpResponseException;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
public class EsBulkProcessor {
//es操作客户端
public static final RestHighLevelClient restHighLevelClient;
//批量操作的对象
public static final BulkProcessor bulkProcessor;
static {
List<HttpHost> httpHosts = new ArrayList<>();
//填充数据
httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST),
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)));
//填充host节点
RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0]));
builder.setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setConnectTimeout(1000);
requestConfigBuilder.setSocketTimeout(1000);
requestConfigBuilder.setConnectionRequestTimeout(1000);
return requestConfigBuilder;
});
//填充用户名密码
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("userName", "password"));
builder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setMaxConnTotal(30);
httpClientBuilder.setMaxConnPerRoute(30);
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpClientBuilder;
});
restHighLevelClient = new RestHighLevelClient(builder);
}
static {
bulkProcessor=createBulkProcessor();
}
private static BulkProcessor createBulkProcessor() {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
log.info("1. 【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
if (!response.hasFailures()) {
log.info("2. 【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
} else {
BulkItemResponse[] items = response.getItems();
for (BulkItemResponse item : items) {
if (item.isFailed()) {
log.info("2. 【afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage());
break;
}
}
}
}
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
List<DocWriteRequest<?>> requests = request.requests();
List<String> esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());
log.error("3. 【afterBulk-failure失败】es执行bluk失败,失败的esId为:{}", esIds, failure);
}
};
BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
}), listener);
//到达10000条时刷新
builder.setBulkActions(10000);
//内存到达8M时刷新
builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
//设置的刷新间隔10s
builder.setFlushInterval(TimeValue.timeValueSeconds(10));
//设置允许执行的并发请求数
builder.setConcurrentRequests(8);
//设置重试策略
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
return builder.build();
}
}
*/

12
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -2,7 +2,6 @@ package com.qniao.iot.machine.event.generator.job;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Tuple;
import cn.hutool.db.Db;
import com.qniao.domain.BaseCommand;
import com.qniao.iot.machine.command.PowerOffMachineCommand;
@ -45,9 +44,12 @@ import org.elasticsearch.client.Requests;
import java.io.IOException;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class IotMachineEventGeneratorJob {
@ -157,7 +159,7 @@ public class IotMachineEventGeneratorJob {
DeviceState deviceStateListJson = deviceState.value();
if (deviceStateListJson == null) {
List<DeviceState> list = Db.use().query(SQL, DeviceState.class, machineIotMac);
if(CollUtil.isNotEmpty(list)) {
if (CollUtil.isNotEmpty(list)) {
deviceStateListJson = list.get(0);
}
if (deviceStateListJson != null) {
@ -227,9 +229,13 @@ public class IotMachineEventGeneratorJob {
ElasticsearchSink.Builder<MachineIotDataReceivedEvent> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
(ElasticsearchSinkFunction<MachineIotDataReceivedEvent>) (machineIotDataReceivedEvent, runtimeContext, requestIndexer) -> {
// 按日期进行分割
LocalDate reportDate = new Date(machineIotDataReceivedEvent.getReportTime())
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate();
String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
//创建es 请求
IndexRequest indexRequest = Requests.indexRequest()
.index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX))
.index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + indexDateSuffix)
.source(BeanUtil.beanToMap(machineIotDataReceivedEvent));
requestIndexer.add(indexRequest);
}

Loading…
Cancel
Save