Browse Source

创建索引时新增公平锁

feature_hph_新增rabbitmq_sink
1049970895@qniao.cn 3 years ago
parent
commit
7211182a48
1 changed files with 6 additions and 0 deletions
  1. 6
      src/main/java/com/qniao/iot/IotMonitoringDataJob.java

6
src/main/java/com/qniao/iot/IotMonitoringDataJob.java

@ -46,6 +46,7 @@ import java.sql.Date;
import java.time.*; import java.time.*;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j @Slf4j
public class IotMonitoringDataJob { public class IotMonitoringDataJob {
@ -67,6 +68,8 @@ public class IotMonitoringDataJob {
return requestConfigBuilder; return requestConfigBuilder;
})); }));
private static final ReentrantLock lock = new ReentrantLock(true);
/** /**
* 当前索引日期后缀 * 当前索引日期后缀
*/ */
@ -445,6 +448,7 @@ public class IotMonitoringDataJob {
try { try {
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if (!exists) { if (!exists) {
lock.lock();
// 创建索引 // 创建索引
CreateIndexRequest request = new CreateIndexRequest(indicesName); CreateIndexRequest request = new CreateIndexRequest(indicesName);
// 字段映射 // 字段映射
@ -502,6 +506,8 @@ public class IotMonitoringDataJob {
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
}finally {
lock.unlock();
} }
} }
} }
Loading…
Cancel
Save