diff --git a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java index 706b2d2..7400e53 100644 --- a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java @@ -34,11 +34,16 @@ import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.*; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; @@ -46,6 +51,7 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; +import java.io.IOException; import java.sql.Date; import java.sql.SQLException; import java.time.*; @@ -467,9 +473,13 @@ public class IotMonitoringDataJob { LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime()) .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM")); + // 索引名称 + String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix; + // 校验索引是否存在 + checkIndicesIsExists(indicesName); //创建es 请求 IndexRequest indexRequest = Requests.indexRequest() - .index(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix) + .index(indicesName) .source(BeanUtil.beanToMap(deviceMonitoringData)); requestIndexer.add(indexRequest); } @@ -501,4 +511,71 @@ public class IotMonitoringDataJob { //数据流添加sink dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink"); } + + private static void checkIndicesIsExists(String indicesName) { + + // 判断索引是否存在 + GetIndexRequest exist = new GetIndexRequest(indicesName); + // 先判断客户端是否存在 + try { + boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); + if(!exists) { + // 创建索引 + CreateIndexRequest request = new CreateIndexRequest(indicesName); + // 字段映射 + String mappersStr = "{\n" + + " \"properties\": {\n" + + " \"accJobCount\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"accJobCountDuration\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"currDuration\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"currJobCount\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"currJobDuration\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"dataSource\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"lastBootTime\": {\n" + + " \"type\": \"date\"\n" + + " },\n" + + " \"machineIotMac\": {\n" + + " \"type\": \"keyword\"\n" + + " },\n" + + " \"machinePwrStat\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"machineWorkingStat\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"receivedTime\": {\n" + + " \"type\": \"date\"\n" + + " },\n" + + " \"reportTime\": {\n" + + " \"type\": \"date\"\n" + + " }\n" + + " }\n" + + "}"; + request.mapping(mappersStr, XContentType.JSON); + // 设置索引别名 + request.alias(new Alias(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX))); + // 暂时不管是否创建成功 + CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT); + boolean acknowledged = createIndexResponse.isAcknowledged(); + boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged(); + if(!acknowledged || !shardsAcknowledged) { + throw new Exception("自定义索引创建失败!!!"); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } } diff --git a/src/test/java/Demo2.java b/src/test/java/Demo2.java index 2765cfc..e7f2ac2 100644 --- a/src/test/java/Demo2.java +++ b/src/test/java/Demo2.java @@ -3,6 +3,7 @@ import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import java.time.LocalDate; +import java.time.ZoneOffset; public class Demo2 { @@ -15,6 +16,8 @@ public class Demo2 { System.out.println(data);*/ - + LocalDate reportDate = new java.util.Date(1660531324418L) + .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); + System.out.println(reportDate); } }