Browse Source

更新

feature_hph_新增rabbitmq_sink
1049970895@qniao.cn 3 years ago
parent
commit
6bcbf65e88
1 changed files with 22 additions and 2 deletions
  1. 24
      src/main/java/com/qniao/iot/IotMonitoringDataJob.java

24
src/main/java/com/qniao/iot/IotMonitoringDataJob.java

@ -85,6 +85,11 @@ public class IotMonitoringDataJob {
"where qmrs.iot_mac = ?\n" + "where qmrs.iot_mac = ?\n" +
" and qmrs.is_delete = 0"; " and qmrs.is_delete = 0";
/**
* 当前索引日期后缀
*/
private static String currIndicsDateSuffix;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@ -476,7 +481,7 @@ public class IotMonitoringDataJob {
// 索引名称 // 索引名称
String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix; String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix;
// 校验索引是否存在 // 校验索引是否存在
checkIndicesIsExists(indicesName);
checkIndicesIsExists(indexDateSuffix, indicesName);
//创建es 请求 //创建es 请求
IndexRequest indexRequest = Requests.indexRequest() IndexRequest indexRequest = Requests.indexRequest()
.index(indicesName) .index(indicesName)
@ -512,7 +517,21 @@ public class IotMonitoringDataJob {
dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink"); dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink");
} }
private static void checkIndicesIsExists(String indicesName) {
private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) {
if(currIndicsDateSuffix == null) {
// 当前月的索引为空
createIndices(indicesName, indexDateSuffix);
}else {
// 校验当前消息能否符合当前索引
if(!indexDateSuffix.equals(currIndicsDateSuffix)) {
// 如果不符合需要重建索引
createIndices(indicesName, indexDateSuffix);
}
}
}
private static void createIndices(String indicesName, String indexDateSuffix) {
// 判断索引是否存在 // 判断索引是否存在
GetIndexRequest exist = new GetIndexRequest(indicesName); GetIndexRequest exist = new GetIndexRequest(indicesName);
@ -573,6 +592,7 @@ public class IotMonitoringDataJob {
if(!acknowledged || !shardsAcknowledged) { if(!acknowledged || !shardsAcknowledged) {
throw new Exception("自定义索引创建失败!!!"); throw new Exception("自定义索引创建失败!!!");
} }
currIndicsDateSuffix = indexDateSuffix;
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();

Loading…
Cancel
Save