用于云盒设备数据统计
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

579 lines
35 KiB

package com.qniao.iot;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.qniao.domain.BaseCommand;
import com.qniao.iot.config.ApolloConfig;
import com.qniao.iot.constant.ConfigConstant;
import com.qniao.iot.machine.command.*;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializationSchema;
import com.qniao.iot.machine.schema.MachineOutputCommandDeserializationSchema;
import com.qniao.iot.utils.DeviceMonitoringDataRabbitMqSerializationSchema;
import com.rabbitmq.client.AMQP;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.sql.Date;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class IotMonitoringDataJob {
private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient
.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST),
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
})
.setRequestConfigCallback(requestConfigBuilder -> {
// 设置es连接超时时间
requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT));
return requestConfigBuilder;
}));
/**
* 当前索引日期后缀
*/
private static String currIndicesDateSuffix;
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 获取设备数据源
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_HOST))
.setPort(ApolloConfig.getInt(ConfigConstant.SOURCE_RABBITMQ_PORT))
.setUserName(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_USERNAME))
.setPassword(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_PASSWORD))
.setVirtualHost(ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_VIRTUALHOST))
.build();
final DataStream<MachineOutputCommand> stream = env
.addSource(new RMQSource<>(connectionConfig, ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_QUEUE),
false, new MachineOutputCommandDeserializationSchema())).setParallelism(1);
// mac分组并进行工作时长的集合操作
DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = stream
.keyBy(MachineOutputCommand::getMac)
.process(new KeyedProcessFunction<Long, MachineOutputCommand, DeviceMonitoringData>() {
// 上次设备数据
private ValueState<DeviceTotalData> deviceTotalDataStat;
// 是否存在es中(假设都存在)
private boolean isExistEs = true;
@Override
public void open(Configuration parameters) {
// 设置10分钟的过期时间
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<DeviceTotalData> deviceTotalDataValue = new ValueStateDescriptor<>("deviceTotalData",
TypeInformation.of(DeviceTotalData.class));
// 设置状态值的过期时间,为了解决手动插入数据但是状态值不同步的问题
deviceTotalDataValue.enableTimeToLive(ttlConfig);
// 必须在 open 生命周期初始化
deviceTotalDataStat = getRuntimeContext().getState(deviceTotalDataValue);
}
@Override
public void processElement(MachineOutputCommand command,
KeyedProcessFunction<Long, MachineOutputCommand, DeviceMonitoringData>.Context ctx,
Collector<DeviceMonitoringData> out) {
try {
DeviceTotalData lastDeviceState = getLastDeviceTotalData(command);
Long reportTime = command.getTimestamp();
Long lastReportTime = lastDeviceState.getReportTime();
if (lastReportTime <= reportTime) {
Integer lastWorkingStat = lastDeviceState.getMachineWorkingStat();
Integer lastPwStat = lastDeviceState.getMachinePwrStat();
// 上次启动时间
Long lastBootTime = lastDeviceState.getLastBootTime();
Long lastTheDayDuration = lastDeviceState.getTheDayDuration();
lastTheDayDuration = lastTheDayDuration == null ? 0L : lastTheDayDuration;
Long lastTheDayJobDuration = lastDeviceState.getTheDayJobDuration();
lastTheDayJobDuration = lastTheDayJobDuration == null ? 0L : lastTheDayJobDuration;
Long lastJobDurationTotal = lastDeviceState.getJobDurationTotal();
lastJobDurationTotal = lastJobDurationTotal == null ? 0L : lastJobDurationTotal;
Long lastTheDayJobCount = lastDeviceState.getTheDayJobCount();
lastTheDayJobCount = lastTheDayJobCount == null ? 0L : lastTheDayJobCount;
Long lastJobTotal = lastDeviceState.getJobTotal();
lastJobTotal = lastJobTotal == null ? 0L : lastJobTotal;
Integer machinePwrStat = command.getMachinePwrStat();
Integer machineWorkingStat = command.getMachineWorkingStat();
Long currJobDuration = command.getCurrJobDuration();
currJobDuration = currJobDuration == null ? 0L : currJobDuration;
Long currJobCount = command.getCurrJobCount();
currJobCount = currJobCount == null ? 0L : currJobCount;
// 当前数据
DeviceTotalData nowDeviceState = new DeviceTotalData();
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
LocalDate localDate = new Date(reportTime).toLocalDate();
if (machinePwrStat.equals(0)) {
// 关机
if (lastPwStat != 0) {
if (lastWorkingStat == 1) {
// 如果上次是工作状态,那么需要记录产量和生产时间
nowDeviceState.setTheDayDuration(lastTheDayDuration + currJobDuration);
nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currJobDuration);
nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currJobDuration);
nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currJobCount);
nowDeviceState.setJobTotal(lastJobTotal + currJobCount);
} else {
nowDeviceState = lastDeviceState;
}
} else {
nowDeviceState = lastDeviceState;
}
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(lastBootTime);
nowDeviceState.setReportTime(reportTime);
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
} else {
// 开机
if (machineWorkingStat.equals(1)) {
// 工作
nowDeviceState.setTheDayJobCount(lastTheDayJobCount + currJobCount);
nowDeviceState.setJobTotal(lastJobTotal + currJobCount);
nowDeviceState.setTheDayJobDuration(lastTheDayJobDuration + currJobDuration);
nowDeviceState.setJobDurationTotal(lastJobDurationTotal + currJobDuration);
} else {
// 待机
nowDeviceState = lastDeviceState;
}
// 设置开机时长,待机也要进行累加,所以放这里
nowDeviceState.setTheDayDuration(lastTheDayDuration + currJobDuration);
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(lastBootTime);
nowDeviceState.setReportTime(reportTime);
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
if (lastPwStat == 0) {
// 如果上次是关机消息,那么这次就是开机消息
// 记录一个周期的开机时间
nowDeviceState.setLastBootTime(reportTime);
}
}
deviceTotalDataStat.update(nowDeviceState);
// 如果上次是待机,并且这次也是待机,那么就不需要发送了
if (((!(lastWorkingStat == 2 && machineWorkingStat == 2))
&& (!(lastPwStat == 0 && machinePwrStat == 0))) || !isExistEs) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(command.getDataSource());
data.setMachineIotMac(command.getMac());
data.setMachinePwrStat(command.getMachinePwrStat());
data.setMachineWorkingStat(command.getMachineWorkingStat());
data.setAccJobCount(nowDeviceState.getJobTotal());
data.setCurrJobCount(nowDeviceState.getTheDayJobCount());
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
data.setLastBootTime(nowDeviceState.getLastBootTime());
data.setCurrDuration(nowDeviceState.getTheDayDuration());
if (!isExistEs) {
isExistEs = true;
}
out.collect(data);
}
}
} catch (Exception e) {
log.info("导致异常的信息:" + JSONUtil.toJsonStr(command));
log.error("处理异常", e);
}
}
private DeviceTotalData getLastDeviceTotalData(MachineOutputCommand command) throws Exception {
// 上一次的数据
DeviceTotalData value = deviceTotalDataStat.value();
Long reportTime = command.getTimestamp();
LocalDate localDate = new Date(reportTime).toLocalDate();
Long mac = command.getMac();
if (value == null) {
value = new DeviceTotalData();
// 从es中获取
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(mac);
if (deviceMonitoringData != null) {
value.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
value.setJobTotal(deviceMonitoringData.getAccJobCount());
Long lastReportTime = deviceMonitoringData.getReportTime();
value.setCurrLocalDate(new Date(lastReportTime)
.toLocalDate().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
value.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
value.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
value.setLastBootTime(deviceMonitoringData.getLastBootTime());
value.setMachinePwrStat(deviceMonitoringData.getMachinePwrStat());
value.setMachineWorkingStat(deviceMonitoringData.getMachineWorkingStat());
value.setTheDayDuration(deviceMonitoringData.getCurrDuration());
} else {
// es中也没有,直接从老接口拿
isExistEs = false;
value = queryDeviceMonitoringData(mac, reportTime);
}
// 因为ReportTime参与后面的计算,所以如果是第一次取这个数据需要设置为当前消息的时间,要不然会有很大的差值
value.setReportTime(reportTime);
}
// 是否日期是当天的,否则需要更新当天工作时长和当天工作量
if (LocalDate.parse(value.getCurrLocalDate(), DateTimeFormatter.ofPattern("yyyy-MM-dd"))
.isBefore(localDate)) {
// value有值,但是日期不对,说明到了第二天,那么,只需要对当天数据清零即可
value.setTheDayJobDuration(0L);
value.setTheDayJobCount(0L);
value.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
value.setTheDayDuration(0L);
value.setReportTime(reportTime);
}
deviceTotalDataStat.update(value);
return value;
}
private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac, Long reportTime) {
DeviceTotalData deviceTotalData = null;
// 通过http去请求之前的接口拿数据
int i = 0;
boolean stop = false;
while (i <= 20 && !stop) {
i++;
String result = HttpUtil
.get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:" + i + "%7D");
Object data = JSONUtil.getByPath(JSONUtil.parse(result), "data");
if (data == null) {
break;
}
Object records = JSONUtil.getByPath(JSONUtil.parse(data), "records");
if (records == null) {
break;
}
JSONArray objects = JSONUtil.parseArray(records);
if (objects.isEmpty()) {
break;
}
for (Object o : objects.toArray()) {
Object mac = JSONUtil.getByPath(JSONUtil.parse(o), "mac");
Long iotMac = Long.parseLong((String) mac);
if (iotMac.equals(machineIotMac)) {
deviceTotalData = new DeviceTotalData();
Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal");
if (productionTotal == null) {
deviceTotalData.setJobTotal(0L);
} else {
deviceTotalData.setJobTotal(Integer.toUnsignedLong(Integer.parseInt(String.valueOf(productionTotal))));
}
Object workTotalTotal = JSONUtil.getByPath(JSONUtil.parse(o), "workTotalTotal");
if (workTotalTotal == null) {
deviceTotalData.setJobDurationTotal(0L);
} else {
deviceTotalData.setJobDurationTotal(Integer.toUnsignedLong(Integer.parseInt(String.valueOf(workTotalTotal))) * 3600);
}
Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime");
LocalDateTime lastBootTime = LocalDateTime
.parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
deviceTotalData.setLastBootTime(lastBootTime.toInstant(ZoneOffset.of("+8")).toEpochMilli());
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
deviceTotalData.setCurrLocalDate(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
// 是否在线
Object isOnlineObj = JSONUtil.getByPath(JSONUtil.parse(o), "isOnline");
if (isOnlineObj != null) {
int isOnline = Integer.parseInt(String.valueOf(isOnlineObj));
if (isOnline == 0) {
// 开机
deviceTotalData.setMachinePwrStat(1);
deviceTotalData.setMachineWorkingStat(2);
} else {
// 关机
deviceTotalData.setMachinePwrStat(0);
deviceTotalData.setMachineWorkingStat(0);
}
} else {
deviceTotalData.setMachinePwrStat(0);
deviceTotalData.setMachineWorkingStat(0);
}
stop = true;
break;
}
}
}
if (deviceTotalData == null) {
deviceTotalData = new DeviceTotalData();
deviceTotalData.setJobTotal(0L);
deviceTotalData.setJobDurationTotal(0L);
deviceTotalData.setLastBootTime(reportTime);
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
deviceTotalData.setMachinePwrStat(0);
deviceTotalData.setMachineWorkingStat(0);
deviceTotalData.setCurrLocalDate(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
deviceTotalData.setReportTime(reportTime);
}
deviceTotalData.setTheDayDuration(0L);
return deviceTotalData;
}
private DeviceMonitoringData queryLatestDeviceMonitoringData(Long machineIotMac) {
try {
// 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询)
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
searchSourceBuilder.sort("reportTime", SortOrder.DESC);
searchSourceBuilder.size(1);
// 创建查询请求对象,将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX));
searchRequest.source(searchSourceBuilder);
String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate);
// 先判断索引是否存在
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if (exists) {
// 执行查询,然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
SearchHits hits = searchResponse.getHits();
SearchHit reqHit = hits.getHits()[0];
return JSONUtil.toBean(reqHit.getSourceAsString(), DeviceMonitoringData.class);
}
}
} catch (Exception e) {
log.error("获取es数据异常", e);
}
return null;
}
}).name("machineIotDataReceivedEventDataStream keyBy stream");
// 写入rabbitmq
sinkRabbitMq(machineIotDataReceivedEventDataStream);
// 写入es
sinkEs(machineIotDataReceivedEventDataStream);
env.execute("device_monitoring_data");
}
private static void sinkRabbitMq(DataStream<DeviceMonitoringData> dataStream) {
// rabbitmq配置
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_HOST))
.setVirtualHost(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST))
.setUserName(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_USER_NAME))
.setPassword(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_PASSWORD))
.setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT))
.build();
// 发送相应的指令到rabbitmq的交换机
dataStream
.addSink(new RMQSink<>(connectionConfig, new DeviceMonitoringDataRabbitMqSerializationSchema(),
new RMQSinkPublishOptions<DeviceMonitoringData>() {
@Override
public String computeRoutingKey(DeviceMonitoringData data) {
return null;
}
@Override
public AMQP.BasicProperties computeProperties(DeviceMonitoringData data) {
return null;
}
@Override
public String computeExchange(DeviceMonitoringData data) {
// 交换机名称
return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_EXCHANGE);
}
})).name("DeviceMonitoringData to rabbitmq Sink");
}
private static void sinkEs(DataStream<DeviceMonitoringData> dataStream) {
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST),
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)));
ElasticsearchSink.Builder<DeviceMonitoringData> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
(ElasticsearchSinkFunction<DeviceMonitoringData>) (deviceMonitoringData, runtimeContext, requestIndexer) -> {
LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime())
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate();
String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
// 索引名称
String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix;
// 校验索引是否存在
checkIndicesIsExists(indexDateSuffix, indicesName);
//创建es 请求
IndexRequest indexRequest = Requests.indexRequest()
.index(indicesName)
.source(BeanUtil.beanToMap(deviceMonitoringData));
requestIndexer.add(indexRequest);
}
);
//刷新前缓冲的最大动作量
esSinkBuilder.setBulkFlushMaxActions(10);
//刷新前缓冲区的最大数据大小(以MB为单位)
esSinkBuilder.setBulkFlushMaxSizeMb(5);
//无论缓冲操作的数量或大小如何,都要刷新的时间间隔
esSinkBuilder.setBulkFlushInterval(5000L);
// 客户端创建配置回调,配置账号密码
esSinkBuilder.setRestClientFactory(
restClientBuilder -> {
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
});
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
// 设置es连接超时时间
requestConfigBuilder
.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT));
return requestConfigBuilder;
});
}
);
//数据流添加sink
dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink");
}
private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) {
if (currIndicesDateSuffix == null) {
// 当前月的索引为空
createIndices(indicesName, indexDateSuffix);
} else {
// 校验当前消息能否符合当前索引
if (!indexDateSuffix.equals(currIndicesDateSuffix)) {
// 如果不符合,需要重建索引
createIndices(indicesName, indexDateSuffix);
}
}
}
private static void createIndices(String indicesName, String indexDateSuffix) {
// 判断索引是否存在
GetIndexRequest exist = new GetIndexRequest(indicesName);
// 先判断客户端是否存在
try {
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if (!exists) {
// 创建索引
CreateIndexRequest request = new CreateIndexRequest(indicesName);
// 字段映射
String mappersStr = "{\n" +
" \"properties\": {\n" +
" \"accJobCount\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"accJobCountDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currJobCount\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currJobDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"dataSource\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"lastBootTime\": {\n" +
" \"type\": \"date\"\n" +
" },\n" +
" \"machineIotMac\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"machinePwrStat\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"machineWorkingStat\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"receivedTime\": {\n" +
" \"type\": \"date\"\n" +
" },\n" +
" \"reportTime\": {\n" +
" \"type\": \"date\"\n" +
" }\n" +
" }\n" +
"}";
request.mapping(mappersStr, XContentType.JSON);
// 设置索引别名
request.alias(new Alias(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)));
// 暂时不管是否创建成功
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
boolean acknowledged = createIndexResponse.isAcknowledged();
boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
if (!acknowledged || !shardsAcknowledged) {
throw new Exception("自定义索引创建失败!!!");
}
currIndicesDateSuffix = indexDateSuffix;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}