diff --git a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java index 7400e53..283046e 100644 --- a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java @@ -85,6 +85,11 @@ public class IotMonitoringDataJob { "where qmrs.iot_mac = ?\n" + " and qmrs.is_delete = 0"; + /** + * 当前索引日期后缀 + */ + private static String currIndicsDateSuffix; + public static void main(String[] args) throws Exception { @@ -476,7 +481,7 @@ public class IotMonitoringDataJob { // 索引名称 String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix; // 校验索引是否存在 - checkIndicesIsExists(indicesName); + checkIndicesIsExists(indexDateSuffix, indicesName); //创建es 请求 IndexRequest indexRequest = Requests.indexRequest() .index(indicesName) @@ -512,7 +517,21 @@ public class IotMonitoringDataJob { dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink"); } - private static void checkIndicesIsExists(String indicesName) { + private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) { + + if(currIndicsDateSuffix == null) { + // 当前月的索引为空 + createIndices(indicesName, indexDateSuffix); + }else { + // 校验当前消息能否符合当前索引 + if(!indexDateSuffix.equals(currIndicsDateSuffix)) { + // 如果不符合,需要重建索引 + createIndices(indicesName, indexDateSuffix); + } + } + } + + private static void createIndices(String indicesName, String indexDateSuffix) { // 判断索引是否存在 GetIndexRequest exist = new GetIndexRequest(indicesName); @@ -573,6 +592,7 @@ public class IotMonitoringDataJob { if(!acknowledged || !shardsAcknowledged) { throw new Exception("自定义索引创建失败!!!"); } + currIndicsDateSuffix = indexDateSuffix; } } catch (Exception e) { e.printStackTrace();