From fc69d3111c53de51dab88e24af273d2ceeebdc88 Mon Sep 17 00:00:00 2001 From: "hupenghui@qniao.cn" <1049970895> Date: Sun, 24 Jul 2022 15:36:39 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/qniao/iot/CloudBoxEventJob1.java | 117 +++++++++++++----- .../com/qniao/iot/DruidDataSourceUtil.java | 6 +- .../com/qniao/iot/EsRestClientService.java | 43 +++---- .../java/com/qniao/iot/SinkMysqlFunc.java | 8 +- 4 files changed, 112 insertions(+), 62 deletions(-) diff --git a/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob1.java b/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob1.java index bcaee02..db70a36 100644 --- a/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob1.java +++ b/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob1.java @@ -1,61 +1,116 @@ package com.qniao.iot; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.util.FileUtils; +import cn.hutool.core.util.StrUtil; +import com.alibaba.druid.pool.DruidDataSource; -import java.io.File; +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.text.SimpleDateFormat; import java.util.List; import java.util.Map; +import java.util.concurrent.*; public class CloudBoxEventJob1 { - private static EsRestClientService esRestClientService = new EsRestClientService(); + private static final EsRestClientService esRestClientService = new EsRestClientService(); - public static void main(String[] args) throws Exception { + private static final DruidDataSource DATA_SOURCE = DruidDataSourceUtil.getInstance(); + public static void main(String[] args) throws Exception { - // set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 查询数据searchResponse String scrollId = null; - DataSet> dataSet = null; - List> dataList = null; + List dataList = null; int count = 0; + int nThreads = Runtime.getRuntime().availableProcessors(); + ExecutorService executorService = Executors.newFixedThreadPool(nThreads); + Map map = new ConcurrentHashMap<>(10000); while (!"none".equals(scrollId)) { - Map map = esRestClientService.queryDeviceListPage(scrollId); - if (map.get("tupleList") instanceof List) - dataList = (List>) map.get("tupleList"); + map.clear(); + esRestClientService.queryDeviceListPage(scrollId, map, count); + if (map.get("tupleList") != null) { + dataList = (List) map.get("tupleList"); + } scrollId = map.get("scrollId").toString(); - if (dataList == null || dataList.size() < 10000 || count > 3) - break; - - // 导入数据 - DataSet> dataSetTemp = env.fromCollection(dataList); - if (dataSet == null) { - dataSet = dataSetTemp; - } else { - dataSet = dataSet.union(dataSetTemp); + if(count>861) { + if (dataList == null || dataList.size() < 10000 || count > 1000) { + executorService.shutdown(); + while (!executorService.isTerminated()) { + System.out.println("任务正在执行中,请稍等。。。"); + Thread.sleep(10000); + } + System.out.println("任务执行完成。。。"); + break; + } + // 导入数据到mysql + List finalDataList = dataList; + executorService.execute(() -> { + try { + invoke(finalDataList); + } catch (Exception e) { + e.printStackTrace(); + } + }); } ++count; } - // 分组计算规则 - dataSet = dataSet.groupBy(0).sum(2); - + } - //dataSet.print(); + public static void invoke(List values) throws Exception { + + if (values.size() > 0) { + Connection connection = DATA_SOURCE.getConnection(); + PreparedStatement ps = connection.prepareStatement(getSql()); + for (Body body : values) { + if (body != null) { + ps.setInt(1, body.getData_source()); + ps.setLong(2, body.getMac()); + Integer dataType = body.getData_type(); + ps.setInt(3, dataType == 1 ? 0 : 1); + ps.setInt(4, dataType == 2 ? 1 : 0); + Long totalProduction = body.getTotal_production(); + ps.setLong(5, totalProduction == null ? 0 : totalProduction); + Long quantity = body.getQuantity(); + ps.setLong(6, quantity == null ? 0L : quantity); + ps.setBigDecimal(7, body.getRunningDuration()); + ps.setBigDecimal(8, body.getWaitingDuration()); + ps.setBigDecimal(9, null); + ps.setBigDecimal(10, null); + String createTimeStr = body.getCreate_time(); + Date createDate = null; + if (StrUtil.isNotEmpty(createTimeStr)) { + long time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(createTimeStr).getTime(); + createDate = new Date(time); + } + ps.setDate(11, createDate); + Long id = body.getId(); + ps.setString(12, id == null ? "0" : StrUtil.toString(id)); + ps.setString(13, body.getDocId()); + ps.addBatch(); + } + } + ps.executeBatch(); + if (connection != null) { + connection.close(); + } + if (ps != null) { + ps.close(); + } + } + } - String output = "C:\\Users\\10499\\Downloads\\1223.txt"; - FileUtils.deleteFileOrDirectory(new File(output)); - dataSet.writeAsText(output); + private static String getSql() { - env.execute("read es"); + return "insert into qn_cloud_box_event(data_source, machine_iot_mac, machine_pwr_stat, machine_working_stat, acc_job_count,\n" + + " curr_job_count, curr_job_duration, curr_waiting_duration, curr_stoping_duration, ig_stat,\n" + + " report_time,event_id, doc_id)\n" + + "values (?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?)"; } } diff --git a/cloud-box-job/src/main/java/com/qniao/iot/DruidDataSourceUtil.java b/cloud-box-job/src/main/java/com/qniao/iot/DruidDataSourceUtil.java index c01b64f..f6c47dd 100644 --- a/cloud-box-job/src/main/java/com/qniao/iot/DruidDataSourceUtil.java +++ b/cloud-box-job/src/main/java/com/qniao/iot/DruidDataSourceUtil.java @@ -14,10 +14,12 @@ public class DruidDataSourceUtil { dataSource.setPassword("qncloudprint5682"); //配置初始化大小、最小、最大 - dataSource.setInitialSize(2); - dataSource.setMinIdle(2); + dataSource.setInitialSize(16); + dataSource.setMinIdle(16); dataSource.setMaxActive(100); + + dataSource.setTestWhileIdle(true); dataSource.setRemoveAbandoned(true); //超时时间;单位为秒。180秒=3分钟 dataSource.setRemoveAbandonedTimeout(180); diff --git a/cloud-box-job/src/main/java/com/qniao/iot/EsRestClientService.java b/cloud-box-job/src/main/java/com/qniao/iot/EsRestClientService.java index 4321632..c11d73f 100644 --- a/cloud-box-job/src/main/java/com/qniao/iot/EsRestClientService.java +++ b/cloud-box-job/src/main/java/com/qniao/iot/EsRestClientService.java @@ -1,13 +1,11 @@ package com.qniao.iot; -import com.alibaba.fastjson2.JSONObject; -import org.apache.flink.api.java.tuple.Tuple3; +import cn.hutool.json.JSONUtil; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.elasticsearch.action.search.*; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; @@ -18,6 +16,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; @@ -62,33 +61,30 @@ public class EsRestClientService { * @author lizixian * @date 2020/5/10 18:01 */ - public Map queryDeviceListPage(String scrollId) { - String brand = "CH"; + public void queryDeviceListPage(String scrollId, Map map, int count) { //设置查询数量 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.size(10000); BoolQueryBuilder bool = QueryBuilders.boolQuery(); - - // 平台 -// bool.must(QueryBuilders.termQuery("brand", brand)); - - sourceBuilder.query(bool);//查询条件 - return queryDeviceListPageResult(sourceBuilder, scrollId); + sourceBuilder.sort("create_time", SortOrder.DESC); + sourceBuilder.query(bool); + queryDeviceListPageResult(sourceBuilder, scrollId, map, count); } - private Map queryDeviceListPageResult(SearchSourceBuilder sourceBuilder, String scrollId) { + private void queryDeviceListPageResult(SearchSourceBuilder sourceBuilder, String scrollId, + Map resultMap, int count) { + SearchRequest searchRequest = new SearchRequest(index) - .scroll("2m") + .scroll("5m") .source(sourceBuilder); if (client == null) { init(); } - Map resultMap = new HashMap<>(5); - List> tupleList = new ArrayList<>(); + List tupleList = new ArrayList<>(); try { - SearchResponse response = null; + SearchResponse response; if (scrollId != null) { SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId).scroll("5m"); @@ -101,26 +97,25 @@ public class EsRestClientService { if (s == RestStatus.OK.getStatus()) { SearchHit[] hits = response.getHits().getHits(); scrollId = response.getScrollId(); - System.out.println("*********************查询es结果"); if (hits != null) { - for (SearchHit hit : hits) { - System.out.println("*********************查询es结果:" + hit.getSourceAsString()); - JSONObject json = JSONObject.parseObject(hit.getSourceAsString()); - tupleList.add(new Tuple3<>(json.getString("mac"), json.getString("data_source"), 1)); + System.out.println("*********************查询es结果数量 :" + hits.length); + if(count > 861) { + for (SearchHit hit : hits) { + tupleList.add(JSONUtil.toBean(hit.getSourceAsString(), Body.class)); + } } } } else { //清除滚屏 ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(scrollId);//也可以选择setScrollIds()将多个scrollId一起使用 + clearScrollRequest.addScrollId(scrollId); ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); - boolean succeeded = clearScrollResponse.isSucceeded(); + clearScrollResponse.isSucceeded(); } resultMap.put("scrollId", scrollId); resultMap.put("tupleList", tupleList); } catch (IOException e) { e.printStackTrace(); } - return resultMap; } } \ No newline at end of file diff --git a/cloud-box-job/src/main/java/com/qniao/iot/SinkMysqlFunc.java b/cloud-box-job/src/main/java/com/qniao/iot/SinkMysqlFunc.java index 7c02dad..a82c7c3 100644 --- a/cloud-box-job/src/main/java/com/qniao/iot/SinkMysqlFunc.java +++ b/cloud-box-job/src/main/java/com/qniao/iot/SinkMysqlFunc.java @@ -46,10 +46,8 @@ public class SinkMysqlFunc extends RichSinkFunction> { Integer dataType = body.getData_type(); ps.setInt(3, dataType == 1 ? 0 : 1); ps.setInt(4, dataType == 2 ? 1 : 0); - Long totalProduction = body.getTotal_production(); - ps.setLong(5, totalProduction == null ? 0 : totalProduction); - Long quantity = body.getQuantity(); - ps.setLong(6, quantity == null ? 0L : quantity); + ps.setLong(5, body.getQuantity()); + ps.setLong(6, body.getQuantity()); ps.setBigDecimal(7, body.getRunningDuration()); ps.setBigDecimal(8, body.getWaitingDuration()); ps.setBigDecimal(9, null); @@ -62,7 +60,7 @@ public class SinkMysqlFunc extends RichSinkFunction> { } ps.setDate(11, createDate); Long id = body.getId(); - ps.setString(12, id == null ? "0" : StrUtil.toString(id)); + ps.setString(12, id == null ? null : StrUtil.toString(id)); ps.setString(13, body.getDocId()); ps.addBatch(); }