Browse Source

更新

hph_优化版本
hupenghui@qniao.cn 3 years ago
parent
commit
fc69d3111c
4 changed files with 112 additions and 62 deletions
  1. 117
      cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob1.java
  2. 6
      cloud-box-job/src/main/java/com/qniao/iot/DruidDataSourceUtil.java
  3. 43
      cloud-box-job/src/main/java/com/qniao/iot/EsRestClientService.java
  4. 8
      cloud-box-job/src/main/java/com/qniao/iot/SinkMysqlFunc.java

117
cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob1.java

@ -1,61 +1,116 @@
package com.qniao.iot;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.FileUtils;
import cn.hutool.core.util.StrUtil;
import com.alibaba.druid.pool.DruidDataSource;
import java.io.File;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
public class CloudBoxEventJob1 {
private static EsRestClientService esRestClientService = new EsRestClientService();
private static final EsRestClientService esRestClientService = new EsRestClientService();
public static void main(String[] args) throws Exception {
private static final DruidDataSource DATA_SOURCE = DruidDataSourceUtil.getInstance();
public static void main(String[] args) throws Exception {
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 查询数据searchResponse
String scrollId = null;
DataSet<Tuple3<String, String, Integer>> dataSet = null;
List<Tuple3<String, String, Integer>> dataList = null;
List<Body> dataList = null;
int count = 0;
int nThreads = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
Map<String, Object> map = new ConcurrentHashMap<>(10000);
while (!"none".equals(scrollId)) {
Map<String, Object> map = esRestClientService.queryDeviceListPage(scrollId);
if (map.get("tupleList") instanceof List)
dataList = (List<Tuple3<String, String, Integer>>) map.get("tupleList");
map.clear();
esRestClientService.queryDeviceListPage(scrollId, map, count);
if (map.get("tupleList") != null) {
dataList = (List<Body>) map.get("tupleList");
}
scrollId = map.get("scrollId").toString();
if (dataList == null || dataList.size() < 10000 || count > 3)
break;
// 导入数据
DataSet<Tuple3<String, String, Integer>> dataSetTemp = env.fromCollection(dataList);
if (dataSet == null) {
dataSet = dataSetTemp;
} else {
dataSet = dataSet.union(dataSetTemp);
if(count>861) {
if (dataList == null || dataList.size() < 10000 || count > 1000) {
executorService.shutdown();
while (!executorService.isTerminated()) {
System.out.println("任务正在执行中,请稍等。。。");
Thread.sleep(10000);
}
System.out.println("任务执行完成。。。");
break;
}
// 导入数据到mysql
List<Body> finalDataList = dataList;
executorService.execute(() -> {
try {
invoke(finalDataList);
} catch (Exception e) {
e.printStackTrace();
}
});
}
++count;
}
// 分组计算规则
dataSet = dataSet.groupBy(0).sum(2);
}
//dataSet.print();
public static void invoke(List<Body> values) throws Exception {
if (values.size() > 0) {
Connection connection = DATA_SOURCE.getConnection();
PreparedStatement ps = connection.prepareStatement(getSql());
for (Body body : values) {
if (body != null) {
ps.setInt(1, body.getData_source());
ps.setLong(2, body.getMac());
Integer dataType = body.getData_type();
ps.setInt(3, dataType == 1 ? 0 : 1);
ps.setInt(4, dataType == 2 ? 1 : 0);
Long totalProduction = body.getTotal_production();
ps.setLong(5, totalProduction == null ? 0 : totalProduction);
Long quantity = body.getQuantity();
ps.setLong(6, quantity == null ? 0L : quantity);
ps.setBigDecimal(7, body.getRunningDuration());
ps.setBigDecimal(8, body.getWaitingDuration());
ps.setBigDecimal(9, null);
ps.setBigDecimal(10, null);
String createTimeStr = body.getCreate_time();
Date createDate = null;
if (StrUtil.isNotEmpty(createTimeStr)) {
long time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(createTimeStr).getTime();
createDate = new Date(time);
}
ps.setDate(11, createDate);
Long id = body.getId();
ps.setString(12, id == null ? "0" : StrUtil.toString(id));
ps.setString(13, body.getDocId());
ps.addBatch();
}
}
ps.executeBatch();
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
}
String output = "C:\\Users\\10499\\Downloads\\1223.txt";
FileUtils.deleteFileOrDirectory(new File(output));
dataSet.writeAsText(output);
private static String getSql() {
env.execute("read es");
return "insert into qn_cloud_box_event(data_source, machine_iot_mac, machine_pwr_stat, machine_working_stat, acc_job_count,\n" +
" curr_job_count, curr_job_duration, curr_waiting_duration, curr_stoping_duration, ig_stat,\n" +
" report_time,event_id, doc_id)\n" +
"values (?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?)";
}
}

6
cloud-box-job/src/main/java/com/qniao/iot/DruidDataSourceUtil.java

@ -14,10 +14,12 @@ public class DruidDataSourceUtil {
dataSource.setPassword("qncloudprint5682");
//配置初始化大小最小最大
dataSource.setInitialSize(2);
dataSource.setMinIdle(2);
dataSource.setInitialSize(16);
dataSource.setMinIdle(16);
dataSource.setMaxActive(100);
dataSource.setTestWhileIdle(true);
dataSource.setRemoveAbandoned(true);
//超时时间单位为秒180秒=3分钟
dataSource.setRemoveAbandonedTimeout(180);

43
cloud-box-job/src/main/java/com/qniao/iot/EsRestClientService.java

@ -1,13 +1,11 @@
package com.qniao.iot;
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.java.tuple.Tuple3;
import cn.hutool.json.JSONUtil;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
@ -18,6 +16,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
@ -62,33 +61,30 @@ public class EsRestClientService {
* @author lizixian
* @date 2020/5/10 18:01
*/
public Map<String, Object> queryDeviceListPage(String scrollId) {
String brand = "CH";
public void queryDeviceListPage(String scrollId, Map<String, Object> map, int count) {
//设置查询数量
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.size(10000);
BoolQueryBuilder bool = QueryBuilders.boolQuery();
// 平台
// bool.must(QueryBuilders.termQuery("brand", brand));
sourceBuilder.query(bool);//查询条件
return queryDeviceListPageResult(sourceBuilder, scrollId);
sourceBuilder.sort("create_time", SortOrder.DESC);
sourceBuilder.query(bool);
queryDeviceListPageResult(sourceBuilder, scrollId, map, count);
}
private Map<String, Object> queryDeviceListPageResult(SearchSourceBuilder sourceBuilder, String scrollId) {
private void queryDeviceListPageResult(SearchSourceBuilder sourceBuilder, String scrollId,
Map<String, Object> resultMap, int count) {
SearchRequest searchRequest = new SearchRequest(index)
.scroll("2m")
.scroll("5m")
.source(sourceBuilder);
if (client == null) {
init();
}
Map<String, Object> resultMap = new HashMap<>(5);
List<Tuple3<String, String, Integer>> tupleList = new ArrayList<>();
List<Body> tupleList = new ArrayList<>();
try {
SearchResponse response = null;
SearchResponse response;
if (scrollId != null) {
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId).scroll("5m");
@ -101,26 +97,25 @@ public class EsRestClientService {
if (s == RestStatus.OK.getStatus()) {
SearchHit[] hits = response.getHits().getHits();
scrollId = response.getScrollId();
System.out.println("*********************查询es结果");
if (hits != null) {
for (SearchHit hit : hits) {
System.out.println("*********************查询es结果:" + hit.getSourceAsString());
JSONObject json = JSONObject.parseObject(hit.getSourceAsString());
tupleList.add(new Tuple3<>(json.getString("mac"), json.getString("data_source"), 1));
System.out.println("*********************查询es结果数量 :" + hits.length);
if(count > 861) {
for (SearchHit hit : hits) {
tupleList.add(JSONUtil.toBean(hit.getSourceAsString(), Body.class));
}
}
}
} else {
//清除滚屏
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);//也可以选择setScrollIds()将多个scrollId一起使用
clearScrollRequest.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
boolean succeeded = clearScrollResponse.isSucceeded();
clearScrollResponse.isSucceeded();
}
resultMap.put("scrollId", scrollId);
resultMap.put("tupleList", tupleList);
} catch (IOException e) {
e.printStackTrace();
}
return resultMap;
}
}

8
cloud-box-job/src/main/java/com/qniao/iot/SinkMysqlFunc.java

@ -46,10 +46,8 @@ public class SinkMysqlFunc extends RichSinkFunction<List<Body>> {
Integer dataType = body.getData_type();
ps.setInt(3, dataType == 1 ? 0 : 1);
ps.setInt(4, dataType == 2 ? 1 : 0);
Long totalProduction = body.getTotal_production();
ps.setLong(5, totalProduction == null ? 0 : totalProduction);
Long quantity = body.getQuantity();
ps.setLong(6, quantity == null ? 0L : quantity);
ps.setLong(5, body.getQuantity());
ps.setLong(6, body.getQuantity());
ps.setBigDecimal(7, body.getRunningDuration());
ps.setBigDecimal(8, body.getWaitingDuration());
ps.setBigDecimal(9, null);
@ -62,7 +60,7 @@ public class SinkMysqlFunc extends RichSinkFunction<List<Body>> {
}
ps.setDate(11, createDate);
Long id = body.getId();
ps.setString(12, id == null ? "0" : StrUtil.toString(id));
ps.setString(12, id == null ? null : StrUtil.toString(id));
ps.setString(13, body.getDocId());
ps.addBatch();
}

Loading…
Cancel
Save