diff --git a/src/main/java/com/qniao/iot/gizwits/CloudBoxData.java b/src/main/java/com/qniao/iot/gizwits/CloudBoxData.java new file mode 100644 index 0000000..1ab8952 --- /dev/null +++ b/src/main/java/com/qniao/iot/gizwits/CloudBoxData.java @@ -0,0 +1,25 @@ +package com.qniao.iot.gizwits; + +import lombok.Data; + +import java.time.LocalDateTime; + +@Data +public class CloudBoxData { + + private LocalDateTime createTime; + + private Integer dataSource; + + private LocalDateTime dataTimestamp; + + private Integer dataType; + + private Long mac; + + private Long quantity; + + private Long spaceOfTime; + + private Long totalProduction; +} diff --git a/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java index 741bea8..8b1610b 100644 --- a/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java @@ -1,7 +1,12 @@ package com.qniao.iot.gizwits; +import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.http.HttpUtil; +import cn.hutool.json.JSON; +import cn.hutool.json.JSONArray; import cn.hutool.json.JSONUtil; import com.qniao.iot.gizwits.config.ApolloConfig; import com.qniao.iot.gizwits.constant.ConfigConstant; @@ -20,6 +25,8 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; import org.apache.flink.util.Collector; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; @@ -29,23 +36,28 @@ import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.GetAliasesResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.*; import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.metrics.ParsedStats; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.sql.Date; import java.time.*; +import java.time.format.DateTimeFormatter; import java.util.*; import static java.time.temporal.ChronoUnit.SECONDS; @@ -146,11 +158,11 @@ public class GizWitsIotMonitoringDataJob { Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); lastWorkingStatState.update(machineWorkingStat); Long reportTime = receivedEvent.getReportTime(); - if(lastedDeviceState == null) { + if (lastedDeviceState == null) { lastedDeviceState = getDeviceTotalData(receivedEvent); lastOnData = receivedEvent; } - if(lastWorkingStat == null) { + if (lastWorkingStat == null) { lastWorkingStat = 0; } if (onData == null) { @@ -209,7 +221,7 @@ public class GizWitsIotMonitoringDataJob { lastWaitJobDataState.update(receivedEvent); } } - if(lastWorkingStat != 1) { + if (lastWorkingStat != 1) { DeviceMonitoringData data = new DeviceMonitoringData(); data.setDataSource(receivedEvent.getDataSource()); data.setMachineIotMac(receivedEvent.getMachineIotMac()); @@ -221,9 +233,9 @@ public class GizWitsIotMonitoringDataJob { data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration()); data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal()); data.setReportTime(reportTime); - if(lastOnData == null) { + if (lastOnData == null) { data.setLastBootTime(reportTime * 1000); - }else { + } else { data.setLastBootTime(lastOnData.getReportTime() * 1000); } out.collect(data); @@ -250,7 +262,7 @@ public class GizWitsIotMonitoringDataJob { .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime(); data.setLastBootTime(ldt); } else { - // es中也没有,从“machine_iot_data_received_event”索引中拿 + // es中也没有,从“qn_cloud_box_data”索引中拿 data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000); } deviceTotalDataStat.update(data); @@ -279,23 +291,45 @@ public class GizWitsIotMonitoringDataJob { return null; } - private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac, - Long reportTime) throws Exception { - - DeviceTotalData deviceTotalData; - /*LocalDateTime startTime = LocalDateTime.of(new Date(reportTime * 1000).toLocalDate(), LocalTime.MIN); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac)); - searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime") - .gte(startTime.atZone(ZoneOffset.of("+8")).toEpochSecond()) - .lte(reportTime)); - searchSourceBuilder.sort("reportTime"); - searchSourceBuilder.size(500); - List receivedEventList = new ArrayList<>(); - EsRestClientUtil.queryDeviceListPageResult(searchSourceBuilder, - receivedEventList::add, MachineIotDataReceivedEvent.class, getIndicesList()); - // 一天的作业统计 - deviceTotalData = statistics(receivedEventList, reportTime); + private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac, Long reportTime) { + + DeviceTotalData deviceTotalData = null; + + // 通过http去请求之前的接口拿数据 + for (int i = 1; i <= 20; i++) { + String result = HttpUtil + .get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:" + i + "%7D"); + Object data = JSONUtil.getByPath(JSONUtil.parse(result), "data"); + if(data == null) { + break; + } + Object records = JSONUtil.getByPath(JSONUtil.parse(result), "records"); + if(records == null) { + break; + } + JSONArray objects = JSONUtil.parseArray(records); + if(objects.isEmpty()) { + break; + } + for (Object o : objects.toArray()) { + Object mac = JSONUtil.getByPath(JSONUtil.parse(o), "mac"); + Long iotMac = (Long)mac; + if(iotMac.equals(machineIotMac)) { + deviceTotalData = new DeviceTotalData(); + Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal"); + deviceTotalData.setJobTotal((Long)productionTotal); + Object workTotalTotal = JSONUtil.getByPath(JSONUtil.parse(o), "workTotalTotal"); + deviceTotalData.setJobDurationTotal(((Long)workTotalTotal) * 3600); + Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime"); + LocalDateTime lastBootTime = LocalDateTime + .parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:hh:ss")); + deviceTotalData.setLastBootTime(lastBootTime); + deviceTotalData.setTheDayJobDuration(0L); + deviceTotalData.setTheDayJobCount(0L); + break; + } + } + } if(deviceTotalData == null) { deviceTotalData = new DeviceTotalData(); deviceTotalData.setJobTotal(0L); @@ -303,24 +337,14 @@ public class GizWitsIotMonitoringDataJob { deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault())); deviceTotalData.setTheDayJobDuration(0L); deviceTotalData.setTheDayJobCount(0L); - }*/ - deviceTotalData = new DeviceTotalData(); - deviceTotalData.setJobTotal(0L); - deviceTotalData.setJobDurationTotal(0L); - deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault())); - deviceTotalData.setTheDayJobDuration(0L); - deviceTotalData.setTheDayJobCount(0L); + } return deviceTotalData; } - /** - * - * @param receivedEventList - * @return 时长,数量 - */ - private DeviceTotalData statistics(List receivedEventList, Long time) throws Exception{ - if(CollUtil.isNotEmpty(receivedEventList)) { + /*private DeviceTotalData statistics(List receivedEventList, Long time) throws Exception { + + if (CollUtil.isNotEmpty(receivedEventList)) { // 一个周期中的开机数据 DeviceTotalData onData = new DeviceTotalData(); @@ -349,7 +373,7 @@ public class GizWitsIotMonitoringDataJob { Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); lastWorkingStatState.update(machineWorkingStat); Long reportTime = receivedEvent.getReportTime(); - if(lastOnData == null) { + if (lastOnData == null) { lastOnData = receivedEvent; } LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); @@ -395,7 +419,7 @@ public class GizWitsIotMonitoringDataJob { lastWorkingStat = 1; } } - if(lastWorkingStat != 1) { + if (lastWorkingStat != 1) { DeviceMonitoringData data = new DeviceMonitoringData(); data.setDataSource(receivedEvent.getDataSource()); data.setMachineIotMac(receivedEvent.getMachineIotMac()); @@ -411,7 +435,7 @@ public class GizWitsIotMonitoringDataJob { } } return null; - } + }*/ private String[] getIndicesList() throws IOException { @@ -456,5 +480,54 @@ public class GizWitsIotMonitoringDataJob { return null; } }).name("machineIotDataReceivedEventDataStream keyBy stream"); + + // 写入es + sinkEs(machineIotDataReceivedEventDataStream); + } + + private static void sinkEs(DataStream dataStream) { + + List httpHosts = new ArrayList<>(); + httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST), + ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), + ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); + ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, + (ElasticsearchSinkFunction) (deviceMonitoringData, runtimeContext, requestIndexer) -> { + + LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime()) + .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); + String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM")); + //创建es 请求 + IndexRequest indexRequest = Requests.indexRequest() + .index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix) + .source(BeanUtil.beanToMap(deviceMonitoringData)); + requestIndexer.add(indexRequest); + } + ); + //刷新前缓冲的最大动作量 + esSinkBuilder.setBulkFlushMaxActions(10); + //刷新前缓冲区的最大数据大小(以MB为单位) + esSinkBuilder.setBulkFlushMaxSizeMb(5); + //无论缓冲操作的数量或大小如何,都要刷新的时间间隔 + esSinkBuilder.setBulkFlushInterval(5000L); + // 客户端创建配置回调,配置账号密码 + esSinkBuilder.setRestClientFactory( + restClientBuilder -> { + restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), + ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); + return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + }); + restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { + // 设置es连接超时时间 + requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT)); + return requestConfigBuilder; + }); + } + ); + //数据流添加sink + dataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink"); } } diff --git a/src/test/java/Demo1.java b/src/test/java/Demo1.java index a43822f..5a0faab 100644 --- a/src/test/java/Demo1.java +++ b/src/test/java/Demo1.java @@ -1,18 +1,84 @@ +import com.qniao.iot.gizwits.CloudBoxData; +import com.qniao.iot.gizwits.utils.EsRestClientUtil; +import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.metrics.ParsedStats; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +import java.io.IOException; import java.sql.Date; import java.time.LocalDate; +import java.time.ZoneOffset; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; public class Demo1 { - public static void main(String[] args) { + public static void main(String[] args) throws IOException { + + String host = "120.79.137.137:9200"; + String[] nodeIpInfos = host.split(":"); + RestClientBuilder builder = RestClient.builder(new HttpHost(nodeIpInfos[0], Integer.parseInt(nodeIpInfos[1]), "http")) + .setRequestConfigCallback(requestConfigBuilder -> { + requestConfigBuilder.setConnectTimeout(10 * 60 * 1000); + requestConfigBuilder.setSocketTimeout(10 * 60 * 1000); + requestConfigBuilder.setConnectionRequestTimeout(10 * 60 * 1000); + return requestConfigBuilder; + }); + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "qnol26215")); + builder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider)); + RestHighLevelClient client = new RestHighLevelClient(builder); + + + - Map workingJobMap = new HashMap<>(); - workingJobMap.put("231231", 2325443523L); - workingJobMap.put("23rwer31", 2325443523L); - workingJobMap.put("2231f3f231", 2325443523L); - workingJobMap.clear(); - workingJobMap.put("231232222231", 2325443523L); - System.out.println(workingJobMap.get("231232222231")); + AggregationBuilder aggr1 = AggregationBuilders.stats("JobDurationTotal").field("space_of_time"); + AggregationBuilder aggr2 = AggregationBuilders.stats("JobTotal").field("quantity"); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("mac", 861193040814411L)) + .must(QueryBuilders.termQuery("data_type", 2)) + .must(QueryBuilders.rangeQuery ("space_of_time") + .lte(60)) + .must(QueryBuilders.rangeQuery("create_time") + .gte("2022-01-01 00:00:00") + .lte("2022-12-31 23:59:59")); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.aggregation(aggr1); + searchSourceBuilder.aggregation(aggr2); + searchSourceBuilder.size(0); + List receivedEventList = new ArrayList<>(); + SearchRequest request = new SearchRequest("qn_cloud_box_data_history_202208"); + request.source(searchSourceBuilder); + // 执行请求 + SearchResponse response = client.search(request, RequestOptions.DEFAULT); + // 获取响应中的聚合信息 + Aggregations aggregations = response.getAggregations(); + if (RestStatus.OK.equals(response.status()) || aggregations != null) { + ParsedStats jobDurationTotal = aggregations.get("JobDurationTotal"); + ParsedStats jobTotal = aggregations.get("JobTotal"); + double max = jobDurationTotal.getSum(); + double max1 = jobTotal.getMax(); + } } } diff --git a/src/test/java/Demo2.java b/src/test/java/Demo2.java new file mode 100644 index 0000000..369b133 --- /dev/null +++ b/src/test/java/Demo2.java @@ -0,0 +1,17 @@ +import cn.hutool.http.HttpUtil; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; + +public class Demo2 { + + public static void main(String[] args) { + + String s = HttpUtil.get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:21%7D"); + + Object data = JSONUtil.getByPath(JSONUtil.parse(s), "data"); + data = JSONUtil.getByPath(JSONUtil.parse(data), "records"); + + + System.out.println(data); + } +}