用于云盒设备数据统计
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

607 lines
38 KiB

package com.qniao.iot;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.Db;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.config.ApolloConfig;
import com.qniao.iot.constant.ConfigConstant;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.sql.Date;
import java.sql.SQLException;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.*;
@Slf4j
public class IotMonitoringDataJob {
private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient
.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ES_HOST_NAME),
ApolloConfig.getInt(ConfigConstant.ES_POST),
ApolloConfig.getStr(ConfigConstant.ES_SCHEME)))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ES_USER_NAME),
ApolloConfig.getStr(ConfigConstant.ES_PASSWORD)));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
})
.setRequestConfigCallback(requestConfigBuilder -> {
// 设置es连接超时时间
requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ES_CONNECT_TIMEOUT));
return requestConfigBuilder;
}));
private final static String SQL = "select qmrs.machine_id, qmrs.iot_mac as machine_iot_mac, qmrs.status, qml.count_unit\n" +
"from qn_machine_realtime_state qmrs\n" +
" LEFT JOIN (select example_id, count_unit from qn_machine_list where is_delete = 0) qml\n" +
" ON qmrs.iot_mac = qml.example_id\n" +
"where qmrs.iot_mac = ?\n" +
" and qmrs.is_delete = 0";
/**
* 当前索引日期后缀
*/
private static String currIndicsDateSuffix;
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 获取设备数据源
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
.build();
// 设备数据源转换
DataStreamSource<MachineIotDataReceivedEvent> dataStreamSource = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
// 数据过滤
SingleOutputStreamOperator<MachineIotDataReceivedEvent> streamOperator = dataStreamSource
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> {
Long reportTime = value.getReportTime();
if(reportTime != null
&& value.getDataSource() != null && value.getMachinePwrStat() != null) {
String reportTimeStr = StrUtil.toString(reportTime);
if(reportTimeStr.length() == 10) {
// 机智云那边的设备可能是秒或毫秒
reportTime = reportTime * 1000;
}
long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
// 晚30分钟的数据就不要了
return nowTime - reportTime <= (30*60*1000);
}
return false;
});
// mac分组并进行工作时长的集合操作
DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = streamOperator
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>() {
// 最新的设备数据
private ValueState<DeviceTotalData> deviceTotalDataStat;
// 开机数据
private ValueState<DeviceTotalData> onDataState;
// 上次的工作状态
private ValueState<Integer> lastWorkingStatState;
// 上次的开机状态
private ValueState<Integer> lastPwStatState;
// 是否存在es中(假设都存在)
private boolean isExistEs = true;
@Override
public void open(Configuration parameters) {
// 必须在 open 生命周期初始化
deviceTotalDataStat = getRuntimeContext()
.getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class)));
onDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class)));
lastWorkingStatState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class)));
lastPwStatState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastPwStat", TypeInformation.of(Integer.class)));
}
@Override
public void processElement(MachineIotDataReceivedEvent receivedEvent,
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>.Context ctx,
Collector<DeviceMonitoringData> out) {
try {
DeviceTotalData onData = onDataState.value();
Integer lastWorkingStat = lastWorkingStatState.value();
Integer lastPwStat = lastPwStatState.value();
DeviceTotalData lastedDeviceState = getLastDeviceTotalData(receivedEvent);
// 如果当前消息的时间大于等于上次消息的时间才进行处理
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
lastWorkingStatState.update(machineWorkingStat);
lastPwStatState.update(machinePwrStat);
Long reportTime = receivedEvent.getReportTime();
String reportTimeStr = StrUtil.toString(reportTime);
if(reportTimeStr.length() == 10) {
// 机智云那边的设备可能是秒或毫秒
reportTime = reportTime * 1000;
}
// 1树根 0机智云
Integer dataSource = receivedEvent.getDataSource();
// 当前数据
DeviceTotalData nowDeviceState = new DeviceTotalData();
if (lastedDeviceState.getReportTime() <= receivedEvent.getReportTime()) {
if (lastWorkingStat == null || lastPwStat == null) {
lastWorkingStat = lastedDeviceState.getMachineWorkingStat();
lastPwStat = lastedDeviceState.getMachinePwrStat();
}
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
if (onData == null) {
onData = lastedDeviceState;
onDataState.update(onData);
}
LocalDate localDate = new Date(reportTime).toLocalDate();
Long lastReportTime = lastedDeviceState.getReportTime();
if (lastReportTime == null) {
// 如果上次的消息时间为空,那么不进行计算
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration());
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal());
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal());
}
// 直接通过两个消息的时间差进行计算(毫秒)
Long workingDuration = reportTime - lastedDeviceState.getReportTime();
// 转为秒
workingDuration = workingDuration / 1000;
if (machinePwrStat.equals(0)) {
if (lastPwStat != 0) {
if (lastWorkingStat == 1) {
// 如果上次是工作中,那就进行累加
if (lastReportTime != null) {
if (dataSource == 1) {
// 树根
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration);
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
} else {
// 机智云
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrJobDuration());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
}
}
} else {
nowDeviceState = lastedDeviceState;
}
} else {
nowDeviceState = lastedDeviceState;
}
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(onData.getReportTime());
nowDeviceState.setReportTime(reportTime);
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
} else {
if (machineWorkingStat.equals(1)) {
// 工作
if (dataSource == 1) {
// 树根(今日当前数 + 这次信息点距离上次信息点生产的数量)
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
} else {
// 机智云
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
}
} else {
// 待机或开机
nowDeviceState = lastedDeviceState;
}
// 设置开机时长,待机也要进行累加,所以放这里
if (dataSource == 1) {
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration);
}else {
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrJobDuration());
}
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(onData.getReportTime());
nowDeviceState.setReportTime(reportTime);
nowDeviceState.setMachinePwrStat(machinePwrStat);
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
if (lastPwStat == 0) {
// 如果上次是关机消息,那么这次就是开机消息
// 记录一个周期的开机时间
onDataState.update(nowDeviceState);
onData = nowDeviceState;
}
}
deviceTotalDataStat.update(nowDeviceState);
// 如果上次是待机,并且这次也是待机,那么就不需要发送了
if (((!(lastWorkingStat == 2 && machineWorkingStat == 2))
&& (!(lastPwStat == 0 && machinePwrStat == 0))) || !isExistEs) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(nowDeviceState.getJobTotal());
data.setCurrJobCount(nowDeviceState.getTheDayJobCount());
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
data.setLastBootTime(onData.getReportTime());
data.setCurrDuration(nowDeviceState.getTheDayDuration());
if(!isExistEs) {
isExistEs = true;
}
out.collect(data);
}
}
}catch (Exception e) {
log.info("导致异常的信息:" + JSONUtil.toJsonStr(receivedEvent));
log.error("处理异常", e);
}
}
private DeviceTotalData getLastDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception {
// 上一次的数据
DeviceTotalData value = deviceTotalDataStat.value();
Long reportTime = event.getReportTime();
LocalDate localDate = new Date(reportTime).toLocalDate();
if (value == null) {
value = new DeviceTotalData();
// 从es中获取
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac());
if (deviceMonitoringData != null) {
value.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
value.setJobTotal(deviceMonitoringData.getAccJobCount());
// 单位秒
value.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
value.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
value.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
value.setLastBootTime(deviceMonitoringData.getLastBootTime());
value.setMachinePwrStat(deviceMonitoringData.getMachinePwrStat());
value.setMachineWorkingStat(deviceMonitoringData.getMachineWorkingStat());
value.setTheDayDuration(deviceMonitoringData.getCurrDuration());
} else {
// es中也没有,直接从老接口拿
isExistEs = false;
value = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime);
}
// 因为ReportTime参与后面的计算,所以如果是第一次取这个数据需要设置为当前消息的时间,要不然会有很大的差值
value.setReportTime(reportTime);
}
// 是否日期是当天的,否则需要更新当天工作时长和当天工作量
if (LocalDate.parse(value.getCurrLocalDate(), DateTimeFormatter.ofPattern("yyyy-MM-dd")).isBefore(localDate)) {
// value有值,但是日期不对,说明到了第二天,那么,只需要对当天数据清零即可
value.setTheDayJobDuration(0L);
value.setTheDayJobCount(0L);
value.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
value.setTheDayDuration(0L);
value.setReportTime(reportTime);
}
deviceTotalDataStat.update(value);
return value;
}
private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac, Long reportTime) {
DeviceTotalData deviceTotalData = null;
// 通过http去请求之前的接口拿数据
int i = 0;
boolean stop = false;
while (i <= 20 && !stop) {
i++;
String result = HttpUtil
.get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:" + i + "%7D");
Object data = JSONUtil.getByPath(JSONUtil.parse(result), "data");
if (data == null) {
break;
}
Object records = JSONUtil.getByPath(JSONUtil.parse(data), "records");
if (records == null) {
break;
}
JSONArray objects = JSONUtil.parseArray(records);
if (objects.isEmpty()) {
break;
}
for (Object o : objects.toArray()) {
Object mac = JSONUtil.getByPath(JSONUtil.parse(o), "mac");
Long iotMac = Long.parseLong((String) mac);
if (iotMac.equals(machineIotMac)) {
deviceTotalData = new DeviceTotalData();
Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal");
deviceTotalData.setJobTotal(Integer.toUnsignedLong(Integer.parseInt(String.valueOf(productionTotal))));
Object workTotalTotal = JSONUtil.getByPath(JSONUtil.parse(o), "workTotalTotal");
deviceTotalData.setJobDurationTotal(Integer.toUnsignedLong(Integer.parseInt(String.valueOf(workTotalTotal))) * 3600);
Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime");
LocalDateTime lastBootTime = LocalDateTime
.parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
deviceTotalData.setLastBootTime(lastBootTime.toInstant(ZoneOffset.of("+8")).toEpochMilli());
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
deviceTotalData.setCurrLocalDate(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
// 是否在线
Object isOnlineObj = JSONUtil.getByPath(JSONUtil.parse(o), "isOnline");
if(isOnlineObj != null) {
int isOnline = Integer.parseInt(String.valueOf(isOnlineObj));
if(isOnline == 0) {
// 开机
deviceTotalData.setMachinePwrStat(1);
deviceTotalData.setMachineWorkingStat(2);
}else {
// 关机
deviceTotalData.setMachinePwrStat(0);
deviceTotalData.setMachineWorkingStat(0);
}
}else {
deviceTotalData.setMachinePwrStat(0);
deviceTotalData.setMachineWorkingStat(0);
}
stop = true;
break;
}
}
}
if (deviceTotalData == null) {
deviceTotalData = new DeviceTotalData();
deviceTotalData.setJobTotal(0L);
deviceTotalData.setJobDurationTotal(0L);
deviceTotalData.setLastBootTime(reportTime);
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
deviceTotalData.setMachinePwrStat(0);
deviceTotalData.setMachineWorkingStat(0);
deviceTotalData.setCurrLocalDate(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
deviceTotalData.setReportTime(reportTime);
}
deviceTotalData.setTheDayDuration(0L);
return deviceTotalData;
}
private DeviceMonitoringData queryLatestDeviceMonitoringData(Long machineIotMac) {
try {
// 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询)
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
searchSourceBuilder.sort("reportTime", SortOrder.DESC);
searchSourceBuilder.size(1);
// 创建查询请求对象,将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX));
searchRequest.source(searchSourceBuilder);
String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate);
// 先判断索引是否存在
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if (exists) {
// 执行查询,然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
SearchHits hits = searchResponse.getHits();
SearchHit reqHit = hits.getHits()[0];
return JSONUtil.toBean(reqHit.getSourceAsString(), DeviceMonitoringData.class);
}
}
} catch (Exception e) {
log.error("获取es数据异常", e);
}
return null;
}
}).name("machineIotDataReceivedEventDataStream keyBy stream");
// 写入es
sinkEs(machineIotDataReceivedEventDataStream);
env.execute("device_monitoring_data");
}
private static void sinkEs(DataStream<DeviceMonitoringData> dataStream) {
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST),
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)));
ElasticsearchSink.Builder<DeviceMonitoringData> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
(ElasticsearchSinkFunction<DeviceMonitoringData>) (deviceMonitoringData, runtimeContext, requestIndexer) -> {
LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime())
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate();
String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
// 索引名称
String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix;
// 校验索引是否存在
checkIndicesIsExists(indexDateSuffix, indicesName);
//创建es 请求
IndexRequest indexRequest = Requests.indexRequest()
.index(indicesName)
.source(BeanUtil.beanToMap(deviceMonitoringData));
requestIndexer.add(indexRequest);
}
);
//刷新前缓冲的最大动作量
esSinkBuilder.setBulkFlushMaxActions(10);
//刷新前缓冲区的最大数据大小(以MB为单位)
esSinkBuilder.setBulkFlushMaxSizeMb(5);
//无论缓冲操作的数量或大小如何,都要刷新的时间间隔
esSinkBuilder.setBulkFlushInterval(5000L);
// 客户端创建配置回调,配置账号密码
esSinkBuilder.setRestClientFactory(
restClientBuilder -> {
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
});
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
// 设置es连接超时时间
requestConfigBuilder
.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT));
return requestConfigBuilder;
});
}
);
//数据流添加sink
dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink");
}
private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) {
if(currIndicsDateSuffix == null) {
// 当前月的索引为空
createIndices(indicesName, indexDateSuffix);
}else {
// 校验当前消息能否符合当前索引
if(!indexDateSuffix.equals(currIndicsDateSuffix)) {
// 如果不符合,需要重建索引
createIndices(indicesName, indexDateSuffix);
}
}
}
private static void createIndices(String indicesName, String indexDateSuffix) {
// 判断索引是否存在
GetIndexRequest exist = new GetIndexRequest(indicesName);
// 先判断客户端是否存在
try {
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if(!exists) {
// 创建索引
CreateIndexRequest request = new CreateIndexRequest(indicesName);
// 字段映射
String mappersStr = "{\n" +
" \"properties\": {\n" +
" \"accJobCount\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"accJobCountDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currJobCount\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currJobDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"dataSource\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"lastBootTime\": {\n" +
" \"type\": \"date\"\n" +
" },\n" +
" \"machineIotMac\": {\n" +
" \"type\": \"keyword\"\n" +
" },\n" +
" \"machinePwrStat\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"machineWorkingStat\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"receivedTime\": {\n" +
" \"type\": \"date\"\n" +
" },\n" +
" \"reportTime\": {\n" +
" \"type\": \"date\"\n" +
" }\n" +
" }\n" +
"}";
request.mapping(mappersStr, XContentType.JSON);
// 设置索引别名
request.alias(new Alias(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)));
// 暂时不管是否创建成功
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
boolean acknowledged = createIndexResponse.isAcknowledged();
boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
if(!acknowledged || !shardsAcknowledged) {
throw new Exception("自定义索引创建失败!!!");
}
currIndicsDateSuffix = indexDateSuffix;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}