You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
594 lines
38 KiB
594 lines
38 KiB
package com.qniao.iot;
|
|
|
|
import cn.hutool.core.bean.BeanUtil;
|
|
import cn.hutool.core.util.StrUtil;
|
|
import cn.hutool.http.HttpUtil;
|
|
import cn.hutool.json.JSONArray;
|
|
import cn.hutool.json.JSONUtil;
|
|
import com.qniao.iot.config.ApolloConfig;
|
|
import com.qniao.iot.constant.ConfigConstant;
|
|
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
|
|
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
|
|
import org.apache.flink.api.common.functions.FilterFunction;
|
|
import org.apache.flink.api.common.state.*;
|
|
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
|
import org.apache.flink.configuration.Configuration;
|
|
import org.apache.flink.connector.kafka.source.KafkaSource;
|
|
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
|
|
import org.apache.flink.streaming.api.CheckpointingMode;
|
|
import org.apache.flink.streaming.api.datastream.DataStream;
|
|
import org.apache.flink.streaming.api.datastream.DataStreamSource;
|
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
|
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
|
|
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
|
|
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
|
|
import org.apache.flink.util.Collector;
|
|
import org.apache.http.HttpHost;
|
|
import org.apache.http.auth.AuthScope;
|
|
import org.apache.http.auth.UsernamePasswordCredentials;
|
|
import org.apache.http.client.CredentialsProvider;
|
|
import org.apache.http.impl.client.BasicCredentialsProvider;
|
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
|
|
import org.elasticsearch.action.admin.indices.alias.Alias;
|
|
import org.elasticsearch.action.index.IndexRequest;
|
|
import org.elasticsearch.action.search.SearchRequest;
|
|
import org.elasticsearch.action.search.SearchResponse;
|
|
import org.elasticsearch.client.*;
|
|
import org.elasticsearch.client.indices.CreateIndexRequest;
|
|
import org.elasticsearch.client.indices.CreateIndexResponse;
|
|
import org.elasticsearch.client.indices.GetIndexRequest;
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
|
import org.elasticsearch.index.query.QueryBuilders;
|
|
import org.elasticsearch.rest.RestStatus;
|
|
import org.elasticsearch.search.SearchHit;
|
|
import org.elasticsearch.search.SearchHits;
|
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
import org.elasticsearch.search.sort.SortOrder;
|
|
import java.sql.Date;
|
|
import java.time.*;
|
|
import java.time.format.DateTimeFormatter;
|
|
import java.util.*;
|
|
|
|
@Slf4j
|
|
public class IotMonitoringDataJob {
|
|
|
|
private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient
|
|
.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ES_HOST_NAME),
|
|
ApolloConfig.getInt(ConfigConstant.ES_POST),
|
|
ApolloConfig.getStr(ConfigConstant.ES_SCHEME)))
|
|
.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
|
|
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
|
credentialsProvider.setCredentials(AuthScope.ANY,
|
|
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ES_USER_NAME),
|
|
ApolloConfig.getStr(ConfigConstant.ES_PASSWORD)));
|
|
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
|
|
})
|
|
.setRequestConfigCallback(requestConfigBuilder -> {
|
|
// 设置es连接超时时间
|
|
requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ES_CONNECT_TIMEOUT));
|
|
return requestConfigBuilder;
|
|
}));
|
|
|
|
/**
|
|
* 当前索引日期后缀
|
|
*/
|
|
private static String currIndicesDateSuffix;
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
|
|
|
|
|
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
|
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
|
|
// 获取设备数据源
|
|
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
|
|
.setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
|
|
.setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS))
|
|
.setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID))
|
|
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
|
|
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
|
|
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
|
|
.build();
|
|
|
|
// 设备数据源转换
|
|
DataStreamSource<MachineIotDataReceivedEvent> dataStreamSource = env
|
|
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
|
|
|
|
// 数据过滤
|
|
SingleOutputStreamOperator<MachineIotDataReceivedEvent> streamOperator = dataStreamSource
|
|
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> {
|
|
|
|
Long reportTime = value.getReportTime();
|
|
if(reportTime != null
|
|
&& value.getDataSource() != null && value.getMachinePwrStat() != null) {
|
|
String reportTimeStr = StrUtil.toString(reportTime);
|
|
if(reportTimeStr.length() == 10) {
|
|
// 机智云那边的设备可能是秒或毫秒
|
|
reportTime = reportTime * 1000;
|
|
}
|
|
long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
|
|
// 晚30分钟的数据就不要了
|
|
return nowTime - reportTime <= (30*60*1000);
|
|
}
|
|
return false;
|
|
});
|
|
|
|
// mac分组并进行工作时长的集合操作
|
|
DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = streamOperator
|
|
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
|
|
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>() {
|
|
|
|
// 最新的设备数据
|
|
private ValueState<DeviceTotalData> deviceTotalDataStat;
|
|
|
|
// 开机数据
|
|
private ValueState<DeviceTotalData> onDataState;
|
|
|
|
// 上次的工作状态
|
|
private ValueState<Integer> lastWorkingStatState;
|
|
|
|
// 上次的开机状态
|
|
private ValueState<Integer> lastPwStatState;
|
|
|
|
// 是否存在es中(假设都存在)
|
|
private boolean isExistEs = true;
|
|
|
|
@Override
|
|
public void open(Configuration parameters) {
|
|
|
|
// 必须在 open 生命周期初始化
|
|
deviceTotalDataStat = getRuntimeContext()
|
|
.getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class)));
|
|
|
|
onDataState = getRuntimeContext()
|
|
.getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class)));
|
|
|
|
lastWorkingStatState = getRuntimeContext()
|
|
.getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class)));
|
|
|
|
lastPwStatState = getRuntimeContext()
|
|
.getState(new ValueStateDescriptor<>("lastPwStat", TypeInformation.of(Integer.class)));
|
|
}
|
|
|
|
@Override
|
|
public void processElement(MachineIotDataReceivedEvent receivedEvent,
|
|
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>.Context ctx,
|
|
Collector<DeviceMonitoringData> out) {
|
|
|
|
try {
|
|
DeviceTotalData onData = onDataState.value();
|
|
Integer lastWorkingStat = lastWorkingStatState.value();
|
|
Integer lastPwStat = lastPwStatState.value();
|
|
DeviceTotalData lastedDeviceState = getLastDeviceTotalData(receivedEvent);
|
|
// 如果当前消息的时间大于等于上次消息的时间才进行处理
|
|
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
|
|
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
|
|
lastWorkingStatState.update(machineWorkingStat);
|
|
lastPwStatState.update(machinePwrStat);
|
|
Long reportTime = receivedEvent.getReportTime();
|
|
String reportTimeStr = StrUtil.toString(reportTime);
|
|
if(reportTimeStr.length() == 10) {
|
|
// 机智云那边的设备可能是秒或毫秒
|
|
reportTime = reportTime * 1000;
|
|
}
|
|
// 1树根 0机智云
|
|
Integer dataSource = receivedEvent.getDataSource();
|
|
// 当前数据
|
|
DeviceTotalData nowDeviceState = new DeviceTotalData();
|
|
if (lastedDeviceState.getReportTime() <= receivedEvent.getReportTime()) {
|
|
if (lastWorkingStat == null || lastPwStat == null) {
|
|
lastWorkingStat = lastedDeviceState.getMachineWorkingStat();
|
|
lastPwStat = lastedDeviceState.getMachinePwrStat();
|
|
}
|
|
nowDeviceState.setMachinePwrStat(machinePwrStat);
|
|
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
|
|
if (onData == null) {
|
|
onData = lastedDeviceState;
|
|
onDataState.update(onData);
|
|
}
|
|
LocalDate localDate = new Date(reportTime).toLocalDate();
|
|
Long lastReportTime = lastedDeviceState.getReportTime();
|
|
if (lastReportTime == null) {
|
|
// 如果上次的消息时间为空,那么不进行计算
|
|
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount());
|
|
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration());
|
|
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal());
|
|
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal());
|
|
}
|
|
// 直接通过两个消息的时间差进行计算(毫秒)
|
|
Long workingDuration = reportTime - lastedDeviceState.getReportTime();
|
|
// 转为秒
|
|
workingDuration = workingDuration / 1000;
|
|
if (machinePwrStat.equals(0)) {
|
|
if (lastPwStat != 0) {
|
|
if (lastWorkingStat == 1) {
|
|
// 如果上次是工作中,那就进行累加
|
|
if (lastReportTime != null) {
|
|
if (dataSource == 1) {
|
|
// 树根
|
|
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration);
|
|
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
|
|
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
|
|
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
|
|
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
|
|
} else {
|
|
// 机智云
|
|
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrJobDuration());
|
|
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
|
|
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
|
|
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + receivedEvent.getCurrJobCount());
|
|
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
|
|
}
|
|
}
|
|
} else {
|
|
nowDeviceState = lastedDeviceState;
|
|
}
|
|
} else {
|
|
nowDeviceState = lastedDeviceState;
|
|
}
|
|
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
|
|
nowDeviceState.setLastBootTime(onData.getReportTime());
|
|
nowDeviceState.setReportTime(reportTime);
|
|
nowDeviceState.setMachinePwrStat(machinePwrStat);
|
|
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
|
|
} else {
|
|
if (machineWorkingStat.equals(1)) {
|
|
// 工作
|
|
if (dataSource == 1) {
|
|
// 树根(今日当前数 + 这次信息点距离上次信息点生产的数量)
|
|
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
|
|
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
|
|
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
|
|
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
|
|
} else {
|
|
// 机智云
|
|
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + receivedEvent.getCurrJobCount());
|
|
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
|
|
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
|
|
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
|
|
}
|
|
} else {
|
|
// 待机
|
|
nowDeviceState = lastedDeviceState;
|
|
}
|
|
// 设置开机时长,待机也要进行累加,所以放这里
|
|
if (dataSource == 1) {
|
|
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration);
|
|
}else {
|
|
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrJobDuration());
|
|
}
|
|
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
|
|
nowDeviceState.setLastBootTime(onData.getReportTime());
|
|
nowDeviceState.setReportTime(reportTime);
|
|
nowDeviceState.setMachinePwrStat(machinePwrStat);
|
|
nowDeviceState.setMachineWorkingStat(machineWorkingStat);
|
|
if (lastPwStat == 0) {
|
|
// 如果上次是关机消息,那么这次就是开机消息
|
|
// 记录一个周期的开机时间
|
|
onDataState.update(nowDeviceState);
|
|
onData = nowDeviceState;
|
|
}
|
|
}
|
|
deviceTotalDataStat.update(nowDeviceState);
|
|
// 如果上次是待机,并且这次也是待机,那么就不需要发送了
|
|
if (((!(lastWorkingStat == 2 && machineWorkingStat == 2))
|
|
&& (!(lastPwStat == 0 && machinePwrStat == 0))) || !isExistEs) {
|
|
DeviceMonitoringData data = new DeviceMonitoringData();
|
|
data.setDataSource(receivedEvent.getDataSource());
|
|
data.setMachineIotMac(receivedEvent.getMachineIotMac());
|
|
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
|
|
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
|
|
data.setAccJobCount(nowDeviceState.getJobTotal());
|
|
data.setCurrJobCount(nowDeviceState.getTheDayJobCount());
|
|
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
|
|
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
|
|
data.setReportTime(reportTime);
|
|
data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
|
|
data.setLastBootTime(onData.getReportTime());
|
|
data.setCurrDuration(nowDeviceState.getTheDayDuration());
|
|
if(!isExistEs) {
|
|
isExistEs = true;
|
|
}
|
|
out.collect(data);
|
|
}
|
|
}
|
|
}catch (Exception e) {
|
|
log.info("导致异常的信息:" + JSONUtil.toJsonStr(receivedEvent));
|
|
log.error("处理异常", e);
|
|
}
|
|
}
|
|
|
|
private DeviceTotalData getLastDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception {
|
|
|
|
// 上一次的数据
|
|
DeviceTotalData value = deviceTotalDataStat.value();
|
|
Long reportTime = event.getReportTime();
|
|
LocalDate localDate = new Date(reportTime).toLocalDate();
|
|
if (value == null) {
|
|
value = new DeviceTotalData();
|
|
// 从es中获取
|
|
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac());
|
|
if (deviceMonitoringData != null) {
|
|
value.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
|
|
value.setJobTotal(deviceMonitoringData.getAccJobCount());
|
|
// 单位秒
|
|
value.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
|
|
value.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
|
|
value.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
|
|
value.setLastBootTime(deviceMonitoringData.getLastBootTime());
|
|
value.setMachinePwrStat(deviceMonitoringData.getMachinePwrStat());
|
|
value.setMachineWorkingStat(deviceMonitoringData.getMachineWorkingStat());
|
|
value.setTheDayDuration(deviceMonitoringData.getCurrDuration());
|
|
} else {
|
|
// es中也没有,直接从老接口拿
|
|
isExistEs = false;
|
|
value = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime);
|
|
}
|
|
// 因为ReportTime参与后面的计算,所以如果是第一次取这个数据需要设置为当前消息的时间,要不然会有很大的差值
|
|
value.setReportTime(reportTime);
|
|
}
|
|
// 是否日期是当天的,否则需要更新当天工作时长和当天工作量
|
|
if (LocalDate.parse(value.getCurrLocalDate(), DateTimeFormatter.ofPattern("yyyy-MM-dd")).isBefore(localDate)) {
|
|
// value有值,但是日期不对,说明到了第二天,那么,只需要对当天数据清零即可
|
|
value.setTheDayJobDuration(0L);
|
|
value.setTheDayJobCount(0L);
|
|
value.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
|
|
value.setTheDayDuration(0L);
|
|
value.setReportTime(reportTime);
|
|
}
|
|
deviceTotalDataStat.update(value);
|
|
return value;
|
|
}
|
|
|
|
private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac, Long reportTime) {
|
|
|
|
DeviceTotalData deviceTotalData = null;
|
|
|
|
// 通过http去请求之前的接口拿数据
|
|
int i = 0;
|
|
boolean stop = false;
|
|
while (i <= 20 && !stop) {
|
|
i++;
|
|
String result = HttpUtil
|
|
.get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:" + i + "%7D");
|
|
Object data = JSONUtil.getByPath(JSONUtil.parse(result), "data");
|
|
if (data == null) {
|
|
break;
|
|
}
|
|
Object records = JSONUtil.getByPath(JSONUtil.parse(data), "records");
|
|
if (records == null) {
|
|
break;
|
|
}
|
|
JSONArray objects = JSONUtil.parseArray(records);
|
|
if (objects.isEmpty()) {
|
|
break;
|
|
}
|
|
for (Object o : objects.toArray()) {
|
|
Object mac = JSONUtil.getByPath(JSONUtil.parse(o), "mac");
|
|
Long iotMac = Long.parseLong((String) mac);
|
|
if (iotMac.equals(machineIotMac)) {
|
|
deviceTotalData = new DeviceTotalData();
|
|
Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal");
|
|
deviceTotalData.setJobTotal(Integer.toUnsignedLong(Integer.parseInt(String.valueOf(productionTotal))));
|
|
Object workTotalTotal = JSONUtil.getByPath(JSONUtil.parse(o), "workTotalTotal");
|
|
deviceTotalData.setJobDurationTotal(Integer.toUnsignedLong(Integer.parseInt(String.valueOf(workTotalTotal))) * 3600);
|
|
Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime");
|
|
LocalDateTime lastBootTime = LocalDateTime
|
|
.parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
|
|
deviceTotalData.setLastBootTime(lastBootTime.toInstant(ZoneOffset.of("+8")).toEpochMilli());
|
|
deviceTotalData.setTheDayJobDuration(0L);
|
|
deviceTotalData.setTheDayJobCount(0L);
|
|
deviceTotalData.setCurrLocalDate(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
|
|
// 是否在线
|
|
Object isOnlineObj = JSONUtil.getByPath(JSONUtil.parse(o), "isOnline");
|
|
if(isOnlineObj != null) {
|
|
int isOnline = Integer.parseInt(String.valueOf(isOnlineObj));
|
|
if(isOnline == 0) {
|
|
// 开机
|
|
deviceTotalData.setMachinePwrStat(1);
|
|
deviceTotalData.setMachineWorkingStat(2);
|
|
}else {
|
|
// 关机
|
|
deviceTotalData.setMachinePwrStat(0);
|
|
deviceTotalData.setMachineWorkingStat(0);
|
|
}
|
|
}else {
|
|
deviceTotalData.setMachinePwrStat(0);
|
|
deviceTotalData.setMachineWorkingStat(0);
|
|
}
|
|
stop = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
if (deviceTotalData == null) {
|
|
deviceTotalData = new DeviceTotalData();
|
|
deviceTotalData.setJobTotal(0L);
|
|
deviceTotalData.setJobDurationTotal(0L);
|
|
deviceTotalData.setLastBootTime(reportTime);
|
|
deviceTotalData.setTheDayJobDuration(0L);
|
|
deviceTotalData.setTheDayJobCount(0L);
|
|
deviceTotalData.setMachinePwrStat(0);
|
|
deviceTotalData.setMachineWorkingStat(0);
|
|
deviceTotalData.setCurrLocalDate(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
|
|
deviceTotalData.setReportTime(reportTime);
|
|
}
|
|
deviceTotalData.setTheDayDuration(0L);
|
|
return deviceTotalData;
|
|
}
|
|
|
|
private DeviceMonitoringData queryLatestDeviceMonitoringData(Long machineIotMac) {
|
|
|
|
try {
|
|
// 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询)
|
|
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
|
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
|
|
searchSourceBuilder.sort("reportTime", SortOrder.DESC);
|
|
searchSourceBuilder.size(1);
|
|
// 创建查询请求对象,将查询对象配置到其中
|
|
SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX));
|
|
searchRequest.source(searchSourceBuilder);
|
|
String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
|
|
GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate);
|
|
// 先判断索引是否存在
|
|
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
|
|
if (exists) {
|
|
// 执行查询,然后处理响应结果
|
|
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
|
|
// 根据状态和数据条数验证是否返回了数据
|
|
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
|
|
SearchHits hits = searchResponse.getHits();
|
|
SearchHit reqHit = hits.getHits()[0];
|
|
return JSONUtil.toBean(reqHit.getSourceAsString(), DeviceMonitoringData.class);
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
log.error("获取es数据异常", e);
|
|
}
|
|
return null;
|
|
}
|
|
}).name("machineIotDataReceivedEventDataStream keyBy stream");
|
|
|
|
// 写入es
|
|
sinkEs(machineIotDataReceivedEventDataStream);
|
|
|
|
env.execute("device_monitoring_data");
|
|
}
|
|
|
|
private static void sinkEs(DataStream<DeviceMonitoringData> dataStream) {
|
|
|
|
List<HttpHost> httpHosts = new ArrayList<>();
|
|
httpHosts.add(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST),
|
|
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
|
|
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)));
|
|
ElasticsearchSink.Builder<DeviceMonitoringData> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
|
|
(ElasticsearchSinkFunction<DeviceMonitoringData>) (deviceMonitoringData, runtimeContext, requestIndexer) -> {
|
|
|
|
LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime())
|
|
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate();
|
|
String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
|
|
// 索引名称
|
|
String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix;
|
|
// 校验索引是否存在
|
|
checkIndicesIsExists(indexDateSuffix, indicesName);
|
|
//创建es 请求
|
|
IndexRequest indexRequest = Requests.indexRequest()
|
|
.index(indicesName)
|
|
.source(BeanUtil.beanToMap(deviceMonitoringData));
|
|
requestIndexer.add(indexRequest);
|
|
}
|
|
);
|
|
//刷新前缓冲的最大动作量
|
|
esSinkBuilder.setBulkFlushMaxActions(10);
|
|
//刷新前缓冲区的最大数据大小(以MB为单位)
|
|
esSinkBuilder.setBulkFlushMaxSizeMb(5);
|
|
//无论缓冲操作的数量或大小如何,都要刷新的时间间隔
|
|
esSinkBuilder.setBulkFlushInterval(5000L);
|
|
// 客户端创建配置回调,配置账号密码
|
|
esSinkBuilder.setRestClientFactory(
|
|
restClientBuilder -> {
|
|
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
|
|
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
|
credentialsProvider.setCredentials(AuthScope.ANY,
|
|
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
|
|
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
|
|
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
|
|
});
|
|
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
|
|
// 设置es连接超时时间
|
|
requestConfigBuilder
|
|
.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT));
|
|
return requestConfigBuilder;
|
|
});
|
|
}
|
|
);
|
|
//数据流添加sink
|
|
dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink");
|
|
}
|
|
|
|
private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) {
|
|
|
|
if(currIndicesDateSuffix == null) {
|
|
// 当前月的索引为空
|
|
createIndices(indicesName, indexDateSuffix);
|
|
}else {
|
|
// 校验当前消息能否符合当前索引
|
|
if(!indexDateSuffix.equals(currIndicesDateSuffix)) {
|
|
// 如果不符合,需要重建索引
|
|
createIndices(indicesName, indexDateSuffix);
|
|
}
|
|
}
|
|
}
|
|
|
|
private static void createIndices(String indicesName, String indexDateSuffix) {
|
|
|
|
// 判断索引是否存在
|
|
GetIndexRequest exist = new GetIndexRequest(indicesName);
|
|
// 先判断客户端是否存在
|
|
try {
|
|
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
|
|
if(!exists) {
|
|
// 创建索引
|
|
CreateIndexRequest request = new CreateIndexRequest(indicesName);
|
|
// 字段映射
|
|
String mappersStr = "{\n" +
|
|
" \"properties\": {\n" +
|
|
" \"accJobCount\": {\n" +
|
|
" \"type\": \"long\"\n" +
|
|
" },\n" +
|
|
" \"accJobCountDuration\": {\n" +
|
|
" \"type\": \"long\"\n" +
|
|
" },\n" +
|
|
" \"currDuration\": {\n" +
|
|
" \"type\": \"long\"\n" +
|
|
" },\n" +
|
|
" \"currJobCount\": {\n" +
|
|
" \"type\": \"long\"\n" +
|
|
" },\n" +
|
|
" \"currJobDuration\": {\n" +
|
|
" \"type\": \"long\"\n" +
|
|
" },\n" +
|
|
" \"dataSource\": {\n" +
|
|
" \"type\": \"integer\"\n" +
|
|
" },\n" +
|
|
" \"lastBootTime\": {\n" +
|
|
" \"type\": \"date\"\n" +
|
|
" },\n" +
|
|
" \"machineIotMac\": {\n" +
|
|
" \"type\": \"keyword\"\n" +
|
|
" },\n" +
|
|
" \"machinePwrStat\": {\n" +
|
|
" \"type\": \"integer\"\n" +
|
|
" },\n" +
|
|
" \"machineWorkingStat\": {\n" +
|
|
" \"type\": \"integer\"\n" +
|
|
" },\n" +
|
|
" \"receivedTime\": {\n" +
|
|
" \"type\": \"date\"\n" +
|
|
" },\n" +
|
|
" \"reportTime\": {\n" +
|
|
" \"type\": \"date\"\n" +
|
|
" }\n" +
|
|
" }\n" +
|
|
"}";
|
|
request.mapping(mappersStr, XContentType.JSON);
|
|
// 设置索引别名
|
|
request.alias(new Alias(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)));
|
|
// 暂时不管是否创建成功
|
|
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
|
|
boolean acknowledged = createIndexResponse.isAcknowledged();
|
|
boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
|
|
if(!acknowledged || !shardsAcknowledged) {
|
|
throw new Exception("自定义索引创建失败!!!");
|
|
}
|
|
currIndicesDateSuffix = indexDateSuffix;
|
|
}
|
|
} catch (Exception e) {
|
|
e.printStackTrace();
|
|
}
|
|
}
|
|
}
|