Browse Source

更新

feature_hph_新增rabbitmq_sink
1049970895@qniao.cn 3 years ago
parent
commit
dd6cf3c455
4 changed files with 233 additions and 52 deletions
  1. 25
      src/main/java/com/qniao/iot/gizwits/CloudBoxData.java
  2. 161
      src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java
  3. 82
      src/test/java/Demo1.java
  4. 17
      src/test/java/Demo2.java

25
src/main/java/com/qniao/iot/gizwits/CloudBoxData.java

@ -0,0 +1,25 @@
package com.qniao.iot.gizwits;
import lombok.Data;
import java.time.LocalDateTime;
@Data
public class CloudBoxData {
private LocalDateTime createTime;
private Integer dataSource;
private LocalDateTime dataTimestamp;
private Integer dataType;
private Long mac;
private Long quantity;
private Long spaceOfTime;
private Long totalProduction;
}

161
src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java

@ -1,7 +1,12 @@
package com.qniao.iot.gizwits;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSON;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.gizwits.config.ApolloConfig;
import com.qniao.iot.gizwits.constant.ConfigConstant;
@ -20,6 +25,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
@ -29,23 +36,28 @@ import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.*;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.metrics.ParsedStats;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.sql.Date;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.*;
import static java.time.temporal.ChronoUnit.SECONDS;
@ -146,11 +158,11 @@ public class GizWitsIotMonitoringDataJob {
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
lastWorkingStatState.update(machineWorkingStat);
Long reportTime = receivedEvent.getReportTime();
if(lastedDeviceState == null) {
if (lastedDeviceState == null) {
lastedDeviceState = getDeviceTotalData(receivedEvent);
lastOnData = receivedEvent;
}
if(lastWorkingStat == null) {
if (lastWorkingStat == null) {
lastWorkingStat = 0;
}
if (onData == null) {
@ -209,7 +221,7 @@ public class GizWitsIotMonitoringDataJob {
lastWaitJobDataState.update(receivedEvent);
}
}
if(lastWorkingStat != 1) {
if (lastWorkingStat != 1) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
@ -221,9 +233,9 @@ public class GizWitsIotMonitoringDataJob {
data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
if(lastOnData == null) {
if (lastOnData == null) {
data.setLastBootTime(reportTime * 1000);
}else {
} else {
data.setLastBootTime(lastOnData.getReportTime() * 1000);
}
out.collect(data);
@ -250,7 +262,7 @@ public class GizWitsIotMonitoringDataJob {
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime();
data.setLastBootTime(ldt);
} else {
// es中也没有machine_iot_data_received_event索引中拿
// es中也没有qn_cloud_box_data索引中拿
data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000);
}
deviceTotalDataStat.update(data);
@ -279,23 +291,45 @@ public class GizWitsIotMonitoringDataJob {
return null;
}
private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac,
Long reportTime) throws Exception {
DeviceTotalData deviceTotalData;
/*LocalDateTime startTime = LocalDateTime.of(new Date(reportTime * 1000).toLocalDate(), LocalTime.MIN);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime")
.gte(startTime.atZone(ZoneOffset.of("+8")).toEpochSecond())
.lte(reportTime));
searchSourceBuilder.sort("reportTime");
searchSourceBuilder.size(500);
List<MachineIotDataReceivedEvent> receivedEventList = new ArrayList<>();
EsRestClientUtil.queryDeviceListPageResult(searchSourceBuilder,
receivedEventList::add, MachineIotDataReceivedEvent.class, getIndicesList());
// 一天的作业统计
deviceTotalData = statistics(receivedEventList, reportTime);
private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac, Long reportTime) {
DeviceTotalData deviceTotalData = null;
// 通过http去请求之前的接口拿数据
for (int i = 1; i <= 20; i++) {
String result = HttpUtil
.get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:" + i + "%7D");
Object data = JSONUtil.getByPath(JSONUtil.parse(result), "data");
if(data == null) {
break;
}
Object records = JSONUtil.getByPath(JSONUtil.parse(result), "records");
if(records == null) {
break;
}
JSONArray objects = JSONUtil.parseArray(records);
if(objects.isEmpty()) {
break;
}
for (Object o : objects.toArray()) {
Object mac = JSONUtil.getByPath(JSONUtil.parse(o), "mac");
Long iotMac = (Long)mac;
if(iotMac.equals(machineIotMac)) {
deviceTotalData = new DeviceTotalData();
Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal");
deviceTotalData.setJobTotal((Long)productionTotal);
Object workTotalTotal = JSONUtil.getByPath(JSONUtil.parse(o), "workTotalTotal");
deviceTotalData.setJobDurationTotal(((Long)workTotalTotal) * 3600);
Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime");
LocalDateTime lastBootTime = LocalDateTime
.parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:hh:ss"));
deviceTotalData.setLastBootTime(lastBootTime);
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
break;
}
}
}
if(deviceTotalData == null) {
deviceTotalData = new DeviceTotalData();
deviceTotalData.setJobTotal(0L);
@ -303,24 +337,14 @@ public class GizWitsIotMonitoringDataJob {
deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault()));
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
}*/
deviceTotalData = new DeviceTotalData();
deviceTotalData.setJobTotal(0L);
deviceTotalData.setJobDurationTotal(0L);
deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault()));
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
}
return deviceTotalData;
}
/**
*
* @param receivedEventList
* @return 时长数量
*/
private DeviceTotalData statistics(List<MachineIotDataReceivedEvent> receivedEventList, Long time) throws Exception{
if(CollUtil.isNotEmpty(receivedEventList)) {
/*private DeviceTotalData statistics(List<MachineIotDataReceivedEvent> receivedEventList, Long time) throws Exception {
if (CollUtil.isNotEmpty(receivedEventList)) {
// 一个周期中的开机数据
DeviceTotalData onData = new DeviceTotalData();
@ -349,7 +373,7 @@ public class GizWitsIotMonitoringDataJob {
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
lastWorkingStatState.update(machineWorkingStat);
Long reportTime = receivedEvent.getReportTime();
if(lastOnData == null) {
if (lastOnData == null) {
lastOnData = receivedEvent;
}
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
@ -395,7 +419,7 @@ public class GizWitsIotMonitoringDataJob {
lastWorkingStat = 1;
}
}
if(lastWorkingStat != 1) {
if (lastWorkingStat != 1) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
@ -411,7 +435,7 @@ public class GizWitsIotMonitoringDataJob {
}
}
return null;
}
}*/
private String[] getIndicesList() throws IOException {
@ -456,5 +480,54 @@ public class GizWitsIotMonitoringDataJob {
return null;
}
}).name("machineIotDataReceivedEventDataStream keyBy stream");
// 写入es
sinkEs(machineIotDataReceivedEventDataStream);
}
private static void sinkEs(DataStream<DeviceMonitoringData> dataStream) {
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST),
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)));
ElasticsearchSink.Builder<DeviceMonitoringData> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
(ElasticsearchSinkFunction<DeviceMonitoringData>) (deviceMonitoringData, runtimeContext, requestIndexer) -> {
LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime())
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate();
String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
//创建es 请求
IndexRequest indexRequest = Requests.indexRequest()
.index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix)
.source(BeanUtil.beanToMap(deviceMonitoringData));
requestIndexer.add(indexRequest);
}
);
//刷新前缓冲的最大动作量
esSinkBuilder.setBulkFlushMaxActions(10);
//刷新前缓冲区的最大数据大小以MB为单位
esSinkBuilder.setBulkFlushMaxSizeMb(5);
//无论缓冲操作的数量或大小如何都要刷新的时间间隔
esSinkBuilder.setBulkFlushInterval(5000L);
// 客户端创建配置回调配置账号密码
esSinkBuilder.setRestClientFactory(
restClientBuilder -> {
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
});
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
// 设置es连接超时时间
requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT));
return requestConfigBuilder;
});
}
);
//数据流添加sink
dataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink");
}
}

82
src/test/java/Demo1.java

@ -1,18 +1,84 @@
import com.qniao.iot.gizwits.CloudBoxData;
import com.qniao.iot.gizwits.utils.EsRestClientUtil;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.metrics.ParsedStats;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.sql.Date;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class Demo1 {
public static void main(String[] args) {
public static void main(String[] args) throws IOException {
String host = "120.79.137.137:9200";
String[] nodeIpInfos = host.split(":");
RestClientBuilder builder = RestClient.builder(new HttpHost(nodeIpInfos[0], Integer.parseInt(nodeIpInfos[1]), "http"))
.setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setConnectTimeout(10 * 60 * 1000);
requestConfigBuilder.setSocketTimeout(10 * 60 * 1000);
requestConfigBuilder.setConnectionRequestTimeout(10 * 60 * 1000);
return requestConfigBuilder;
});
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "qnol26215"));
builder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider));
RestHighLevelClient client = new RestHighLevelClient(builder);
Map<String, Long> workingJobMap = new HashMap<>();
workingJobMap.put("231231", 2325443523L);
workingJobMap.put("23rwer31", 2325443523L);
workingJobMap.put("2231f3f231", 2325443523L);
workingJobMap.clear();
workingJobMap.put("231232222231", 2325443523L);
System.out.println(workingJobMap.get("231232222231"));
AggregationBuilder aggr1 = AggregationBuilders.stats("JobDurationTotal").field("space_of_time");
AggregationBuilder aggr2 = AggregationBuilders.stats("JobTotal").field("quantity");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("mac", 861193040814411L))
.must(QueryBuilders.termQuery("data_type", 2))
.must(QueryBuilders.rangeQuery ("space_of_time")
.lte(60))
.must(QueryBuilders.rangeQuery("create_time")
.gte("2022-01-01 00:00:00")
.lte("2022-12-31 23:59:59"));
searchSourceBuilder.query(queryBuilder);
searchSourceBuilder.aggregation(aggr1);
searchSourceBuilder.aggregation(aggr2);
searchSourceBuilder.size(0);
List<CloudBoxData> receivedEventList = new ArrayList<>();
SearchRequest request = new SearchRequest("qn_cloud_box_data_history_202208");
request.source(searchSourceBuilder);
// 执行请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 获取响应中的聚合信息
Aggregations aggregations = response.getAggregations();
if (RestStatus.OK.equals(response.status()) || aggregations != null) {
ParsedStats jobDurationTotal = aggregations.get("JobDurationTotal");
ParsedStats jobTotal = aggregations.get("JobTotal");
double max = jobDurationTotal.getSum();
double max1 = jobTotal.getMax();
}
}
}

17
src/test/java/Demo2.java

@ -0,0 +1,17 @@
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
public class Demo2 {
public static void main(String[] args) {
String s = HttpUtil.get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:21%7D");
Object data = JSONUtil.getByPath(JSONUtil.parse(s), "data");
data = JSONUtil.getByPath(JSONUtil.parse(data), "records");
System.out.println(data);
}
}
Loading…
Cancel
Save