Browse Source

更新

feature_hph_新增rabbitmq_sink
hupenghui@qniao.cn 3 years ago
parent
commit
430b6972d5
2 changed files with 82 additions and 2 deletions
  1. 79
      src/main/java/com/qniao/iot/IotMonitoringDataJob.java
  2. 5
      src/test/java/Demo2.java

79
src/main/java/com/qniao/iot/IotMonitoringDataJob.java

@ -34,11 +34,16 @@ import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.*; import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
@ -46,6 +51,7 @@ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.sql.Date; import java.sql.Date;
import java.sql.SQLException; import java.sql.SQLException;
import java.time.*; import java.time.*;
@ -467,9 +473,13 @@ public class IotMonitoringDataJob {
LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime()) LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime())
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate();
String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM")); String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
// 索引名称
String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix;
// 校验索引是否存在
checkIndicesIsExists(indicesName);
//创建es 请求 //创建es 请求
IndexRequest indexRequest = Requests.indexRequest() IndexRequest indexRequest = Requests.indexRequest()
.index(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix)
.index(indicesName)
.source(BeanUtil.beanToMap(deviceMonitoringData)); .source(BeanUtil.beanToMap(deviceMonitoringData));
requestIndexer.add(indexRequest); requestIndexer.add(indexRequest);
} }
@ -501,4 +511,71 @@ public class IotMonitoringDataJob {
//数据流添加sink //数据流添加sink
dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink"); dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink");
} }
private static void checkIndicesIsExists(String indicesName) {
// 判断索引是否存在
GetIndexRequest exist = new GetIndexRequest(indicesName);
// 先判断客户端是否存在
try {
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if(!exists) {
// 创建索引
CreateIndexRequest request = new CreateIndexRequest(indicesName);
// 字段映射
String mappersStr = "{\n" +
" \"properties\": {\n" +
" \"accJobCount\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"accJobCountDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currJobCount\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currJobDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"dataSource\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"lastBootTime\": {\n" +
" \"type\": \"date\"\n" +
" },\n" +
" \"machineIotMac\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"machinePwrStat\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"machineWorkingStat\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"receivedTime\": {\n" +
" \"type\": \"date\"\n" +
" },\n" +
" \"reportTime\": {\n" +
" \"type\": \"date\"\n" +
" }\n" +
" }\n" +
"}";
request.mapping(mappersStr, XContentType.JSON);
// 设置索引别名
request.alias(new Alias(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)));
// 暂时不管是否创建成功
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
boolean acknowledged = createIndexResponse.isAcknowledged();
boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
if(!acknowledged || !shardsAcknowledged) {
throw new Exception("自定义索引创建失败!!!");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
} }

5
src/test/java/Demo2.java

@ -3,6 +3,7 @@ import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.ZoneOffset;
public class Demo2 { public class Demo2 {
@ -15,6 +16,8 @@ public class Demo2 {
System.out.println(data);*/ System.out.println(data);*/
LocalDate reportDate = new java.util.Date(1660531324418L)
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate();
System.out.println(reportDate);
} }
} }
Loading…
Cancel
Save