|
|
@ -107,37 +107,37 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
final DataStream<MachineOutputCommand> dataStreamSource = env |
|
|
final DataStream<MachineOutputCommand> dataStreamSource = env |
|
|
.addSource(new RMQSource<>(connectionConfig, ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_QUEUE), |
|
|
.addSource(new RMQSource<>(connectionConfig, ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_QUEUE), |
|
|
false, new MachineOutputCommandDeserializationSchema())).setParallelism(1); |
|
|
false, new MachineOutputCommandDeserializationSchema())).setParallelism(1); |
|
|
try { |
|
|
|
|
|
SingleOutputStreamOperator<IotDevicePowerOnAndOffDataEvent> outputStreamOperator = dataStreamSource |
|
|
|
|
|
.keyBy(MachineOutputCommand::getMac) |
|
|
|
|
|
.process(new KeyedProcessFunction<Long, MachineOutputCommand, IotDevicePowerOnAndOffDataEvent>() { |
|
|
|
|
|
|
|
|
|
|
|
private ValueState<IotDevicePowerOnAndOffDataEvent> powerOnAndOffDataEventValueState; |
|
|
|
|
|
|
|
|
SingleOutputStreamOperator<IotDevicePowerOnAndOffDataEvent> outputStreamOperator = dataStreamSource |
|
|
|
|
|
.keyBy(MachineOutputCommand::getMac) |
|
|
|
|
|
.process(new KeyedProcessFunction<Long, MachineOutputCommand, IotDevicePowerOnAndOffDataEvent>() { |
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
public void open(Configuration parameters) { |
|
|
|
|
|
|
|
|
private ValueState<IotDevicePowerOnAndOffDataEvent> powerOnAndOffDataEventValueState; |
|
|
|
|
|
|
|
|
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10)) |
|
|
|
|
|
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) |
|
|
|
|
|
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) |
|
|
|
|
|
.build(); |
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
public void open(Configuration parameters) { |
|
|
|
|
|
|
|
|
ValueStateDescriptor<IotDevicePowerOnAndOffDataEvent> powerOnAndOffDataEventValue |
|
|
|
|
|
= new ValueStateDescriptor<>("powerOnAndOffDataEventValue", |
|
|
|
|
|
TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class)); |
|
|
|
|
|
// 设置状态值的过期时间,为了解决手动修改数据没有同步的问题 |
|
|
|
|
|
powerOnAndOffDataEventValue.enableTimeToLive(ttlConfig); |
|
|
|
|
|
|
|
|
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10)) |
|
|
|
|
|
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) |
|
|
|
|
|
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) |
|
|
|
|
|
.build(); |
|
|
|
|
|
|
|
|
// 必须在 open 生命周期初始化 |
|
|
|
|
|
powerOnAndOffDataEventValueState = getRuntimeContext() |
|
|
|
|
|
.getState(powerOnAndOffDataEventValue); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
ValueStateDescriptor<IotDevicePowerOnAndOffDataEvent> powerOnAndOffDataEventValue |
|
|
|
|
|
= new ValueStateDescriptor<>("powerOnAndOffDataEventValue", |
|
|
|
|
|
TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class)); |
|
|
|
|
|
// 设置状态值的过期时间,为了解决手动修改数据没有同步的问题 |
|
|
|
|
|
powerOnAndOffDataEventValue.enableTimeToLive(ttlConfig); |
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
public void processElement(MachineOutputCommand command, |
|
|
|
|
|
KeyedProcessFunction<Long, MachineOutputCommand, IotDevicePowerOnAndOffDataEvent>.Context ctx, |
|
|
|
|
|
Collector<IotDevicePowerOnAndOffDataEvent> out) throws Exception { |
|
|
|
|
|
|
|
|
// 必须在 open 生命周期初始化 |
|
|
|
|
|
powerOnAndOffDataEventValueState = getRuntimeContext() |
|
|
|
|
|
.getState(powerOnAndOffDataEventValue); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
public void processElement(MachineOutputCommand command, |
|
|
|
|
|
KeyedProcessFunction<Long, MachineOutputCommand, IotDevicePowerOnAndOffDataEvent>.Context ctx, |
|
|
|
|
|
Collector<IotDevicePowerOnAndOffDataEvent> out) { |
|
|
|
|
|
try { |
|
|
IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(command); |
|
|
IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(command); |
|
|
Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime(); |
|
|
Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime(); |
|
|
Long reportTime = command.getTimestamp(); |
|
|
Long reportTime = command.getTimestamp(); |
|
|
@ -195,121 +195,125 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
log.error("iot_device_power_on_and_off_data processElement 执行异常", e); |
|
|
} |
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineOutputCommand command) throws IOException { |
|
|
|
|
|
|
|
|
private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineOutputCommand command) throws IOException { |
|
|
|
|
|
|
|
|
IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value(); |
|
|
|
|
|
if (iotDevicePowerOnAndOffDataEvent == null) { |
|
|
|
|
|
iotDevicePowerOnAndOffDataEvent = getByEs(command); |
|
|
|
|
|
} |
|
|
|
|
|
return iotDevicePowerOnAndOffDataEvent; |
|
|
|
|
|
|
|
|
IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value(); |
|
|
|
|
|
if (iotDevicePowerOnAndOffDataEvent == null) { |
|
|
|
|
|
iotDevicePowerOnAndOffDataEvent = getByEs(command); |
|
|
} |
|
|
} |
|
|
|
|
|
return iotDevicePowerOnAndOffDataEvent; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
private IotDevicePowerOnAndOffDataEvent getByEs(MachineOutputCommand command) throws IOException { |
|
|
|
|
|
|
|
|
private IotDevicePowerOnAndOffDataEvent getByEs(MachineOutputCommand command) throws IOException { |
|
|
|
|
|
|
|
|
// 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) |
|
|
|
|
|
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); |
|
|
|
|
|
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", command.getMac())); |
|
|
|
|
|
searchSourceBuilder.sort("receivedTime", SortOrder.DESC); |
|
|
|
|
|
searchSourceBuilder.size(1); |
|
|
|
|
|
// 创建查询请求对象,将查询对象配置到其中 |
|
|
|
|
|
SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)); |
|
|
|
|
|
searchRequest.source(searchSourceBuilder); |
|
|
|
|
|
String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM")); |
|
|
|
|
|
GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate); |
|
|
|
|
|
// 先判断索引是否存在 |
|
|
|
|
|
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); |
|
|
|
|
|
if (exists) { |
|
|
|
|
|
// 执行查询,然后处理响应结果 |
|
|
|
|
|
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); |
|
|
|
|
|
// 根据状态和数据条数验证是否返回了数据 |
|
|
|
|
|
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { |
|
|
|
|
|
SearchHits hits = searchResponse.getHits(); |
|
|
|
|
|
SearchHit reqHit = hits.getHits()[0]; |
|
|
|
|
|
return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) |
|
|
|
|
|
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); |
|
|
|
|
|
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", command.getMac())); |
|
|
|
|
|
searchSourceBuilder.sort("receivedTime", SortOrder.DESC); |
|
|
|
|
|
searchSourceBuilder.size(1); |
|
|
|
|
|
// 创建查询请求对象,将查询对象配置到其中 |
|
|
|
|
|
SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)); |
|
|
|
|
|
searchRequest.source(searchSourceBuilder); |
|
|
|
|
|
String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM")); |
|
|
|
|
|
GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate); |
|
|
|
|
|
// 先判断索引是否存在 |
|
|
|
|
|
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); |
|
|
|
|
|
if (exists) { |
|
|
|
|
|
// 执行查询,然后处理响应结果 |
|
|
|
|
|
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); |
|
|
|
|
|
// 根据状态和数据条数验证是否返回了数据 |
|
|
|
|
|
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { |
|
|
|
|
|
SearchHits hits = searchResponse.getHits(); |
|
|
|
|
|
SearchHit reqHit = hits.getHits()[0]; |
|
|
|
|
|
return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class); |
|
|
} |
|
|
} |
|
|
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); |
|
|
|
|
|
powerOnAndOffDataEvent.setId(snowflake.nextId()); |
|
|
|
|
|
powerOnAndOffDataEvent.setDataSource(command.getDataSource()); |
|
|
|
|
|
powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); |
|
|
|
|
|
powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobCount(command.getCurrJobCount()); |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrJobDuration()); |
|
|
|
|
|
Integer machinePwrStat = command.getMachinePwrStat(); |
|
|
|
|
|
powerOnAndOffDataEvent.setMachinePwrStat(machinePwrStat); |
|
|
|
|
|
powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); |
|
|
|
|
|
Long reportTime = command.getTimestamp(); |
|
|
|
|
|
// 开机 |
|
|
|
|
|
powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); |
|
|
|
|
|
if (machinePwrStat == 0) { |
|
|
|
|
|
// 关机 |
|
|
|
|
|
powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); |
|
|
|
|
|
} |
|
|
|
|
|
powerOnAndOffDataEvent.setReportTime(reportTime); |
|
|
|
|
|
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime |
|
|
|
|
|
.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); |
|
|
|
|
|
return powerOnAndOffDataEvent; |
|
|
|
|
|
} |
|
|
} |
|
|
}).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac"); |
|
|
|
|
|
|
|
|
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); |
|
|
|
|
|
powerOnAndOffDataEvent.setId(snowflake.nextId()); |
|
|
|
|
|
powerOnAndOffDataEvent.setDataSource(command.getDataSource()); |
|
|
|
|
|
powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); |
|
|
|
|
|
powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobCount(command.getCurrJobCount()); |
|
|
|
|
|
powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrJobDuration()); |
|
|
|
|
|
Integer machinePwrStat = command.getMachinePwrStat(); |
|
|
|
|
|
powerOnAndOffDataEvent.setMachinePwrStat(machinePwrStat); |
|
|
|
|
|
powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); |
|
|
|
|
|
Long reportTime = command.getTimestamp(); |
|
|
|
|
|
// 开机 |
|
|
|
|
|
powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); |
|
|
|
|
|
if (machinePwrStat == 0) { |
|
|
|
|
|
// 关机 |
|
|
|
|
|
powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); |
|
|
|
|
|
} |
|
|
|
|
|
powerOnAndOffDataEvent.setReportTime(reportTime); |
|
|
|
|
|
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime |
|
|
|
|
|
.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); |
|
|
|
|
|
return powerOnAndOffDataEvent; |
|
|
|
|
|
} |
|
|
|
|
|
}).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac"); |
|
|
|
|
|
|
|
|
|
|
|
sinkEs(outputStreamOperator); |
|
|
|
|
|
|
|
|
sinkEs(outputStreamOperator); |
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
log.error("iot_device_power_on_and_off_data 执行异常", e); |
|
|
|
|
|
} |
|
|
|
|
|
env.execute("iot_device_power_on_and_off_data"); |
|
|
env.execute("iot_device_power_on_and_off_data"); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private static void sinkEs(SingleOutputStreamOperator<IotDevicePowerOnAndOffDataEvent> dataStream) { |
|
|
private static void sinkEs(SingleOutputStreamOperator<IotDevicePowerOnAndOffDataEvent> dataStream) { |
|
|
|
|
|
try { |
|
|
|
|
|
List<HttpHost> httpHosts = new ArrayList<>(); |
|
|
|
|
|
httpHosts.add(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST), |
|
|
|
|
|
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), |
|
|
|
|
|
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); |
|
|
|
|
|
ElasticsearchSink.Builder<IotDevicePowerOnAndOffDataEvent> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, |
|
|
|
|
|
(ElasticsearchSinkFunction<IotDevicePowerOnAndOffDataEvent>) (deviceMonitoringData, runtimeContext, requestIndexer) -> { |
|
|
|
|
|
|
|
|
List<HttpHost> httpHosts = new ArrayList<>(); |
|
|
|
|
|
httpHosts.add(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST), |
|
|
|
|
|
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), |
|
|
|
|
|
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); |
|
|
|
|
|
ElasticsearchSink.Builder<IotDevicePowerOnAndOffDataEvent> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, |
|
|
|
|
|
(ElasticsearchSinkFunction<IotDevicePowerOnAndOffDataEvent>) (deviceMonitoringData, runtimeContext, requestIndexer) -> { |
|
|
|
|
|
|
|
|
|
|
|
LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime()) |
|
|
|
|
|
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); |
|
|
|
|
|
String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM")); |
|
|
|
|
|
// 索引名称 |
|
|
|
|
|
String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix; |
|
|
|
|
|
// 校验索引是否存在 |
|
|
|
|
|
checkIndicesIsExists(indexDateSuffix, indicesName); |
|
|
|
|
|
//创建es 请求 |
|
|
|
|
|
IndexRequest indexRequest = Requests.indexRequest() |
|
|
|
|
|
.index(indicesName) |
|
|
|
|
|
.source(BeanUtil.beanToMap(deviceMonitoringData)); |
|
|
|
|
|
requestIndexer.add(indexRequest); |
|
|
|
|
|
} |
|
|
|
|
|
); |
|
|
|
|
|
//刷新前缓冲的最大动作量 |
|
|
|
|
|
esSinkBuilder.setBulkFlushMaxActions(10); |
|
|
|
|
|
//刷新前缓冲区的最大数据大小(以MB为单位) |
|
|
|
|
|
esSinkBuilder.setBulkFlushMaxSizeMb(5); |
|
|
|
|
|
//无论缓冲操作的数量或大小如何,都要刷新的时间间隔 |
|
|
|
|
|
esSinkBuilder.setBulkFlushInterval(5000L); |
|
|
|
|
|
// 客户端创建配置回调,配置账号密码 |
|
|
|
|
|
esSinkBuilder.setRestClientFactory( |
|
|
|
|
|
restClientBuilder -> { |
|
|
|
|
|
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { |
|
|
|
|
|
CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
|
|
|
|
|
credentialsProvider.setCredentials(AuthScope.ANY, |
|
|
|
|
|
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), |
|
|
|
|
|
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); |
|
|
|
|
|
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); |
|
|
|
|
|
}); |
|
|
|
|
|
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { |
|
|
|
|
|
// 设置es连接超时时间 |
|
|
|
|
|
requestConfigBuilder |
|
|
|
|
|
.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT)); |
|
|
|
|
|
return requestConfigBuilder; |
|
|
|
|
|
}); |
|
|
|
|
|
} |
|
|
|
|
|
); |
|
|
|
|
|
//数据流添加sink |
|
|
|
|
|
dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink"); |
|
|
|
|
|
|
|
|
LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime()) |
|
|
|
|
|
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); |
|
|
|
|
|
String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM")); |
|
|
|
|
|
// 索引名称 |
|
|
|
|
|
String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix; |
|
|
|
|
|
// 校验索引是否存在 |
|
|
|
|
|
checkIndicesIsExists(indexDateSuffix, indicesName); |
|
|
|
|
|
//创建es 请求 |
|
|
|
|
|
IndexRequest indexRequest = Requests.indexRequest() |
|
|
|
|
|
.index(indicesName) |
|
|
|
|
|
.source(BeanUtil.beanToMap(deviceMonitoringData)); |
|
|
|
|
|
requestIndexer.add(indexRequest); |
|
|
|
|
|
} |
|
|
|
|
|
); |
|
|
|
|
|
//刷新前缓冲的最大动作量 |
|
|
|
|
|
esSinkBuilder.setBulkFlushMaxActions(10); |
|
|
|
|
|
//刷新前缓冲区的最大数据大小(以MB为单位) |
|
|
|
|
|
esSinkBuilder.setBulkFlushMaxSizeMb(5); |
|
|
|
|
|
//无论缓冲操作的数量或大小如何,都要刷新的时间间隔 |
|
|
|
|
|
esSinkBuilder.setBulkFlushInterval(5000L); |
|
|
|
|
|
// 客户端创建配置回调,配置账号密码 |
|
|
|
|
|
esSinkBuilder.setRestClientFactory( |
|
|
|
|
|
restClientBuilder -> { |
|
|
|
|
|
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { |
|
|
|
|
|
CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
|
|
|
|
|
credentialsProvider.setCredentials(AuthScope.ANY, |
|
|
|
|
|
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), |
|
|
|
|
|
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); |
|
|
|
|
|
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); |
|
|
|
|
|
}); |
|
|
|
|
|
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { |
|
|
|
|
|
// 设置es连接超时时间 |
|
|
|
|
|
requestConfigBuilder |
|
|
|
|
|
.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT)); |
|
|
|
|
|
return requestConfigBuilder; |
|
|
|
|
|
}); |
|
|
|
|
|
} |
|
|
|
|
|
); |
|
|
|
|
|
//数据流添加sink |
|
|
|
|
|
dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink"); |
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
log.error("iot_device_power_on_and_off_data sinkEs 执行异常", e); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) { |
|
|
private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) { |
|
|
|