From abb2af7255c8b778f5f2c6fb518628b5a705b929 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Mon, 25 Jul 2022 17:16:40 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cloud-box-job/dependency-reduced-pom.xml | 142 --------- cloud-box-job/pom.xml | 273 ------------------ .../src/main/resources/log4j2.properties | 25 -- cloud-box-job/src/test/java/Test.java | 12 - pom.xml | 1 - root-cloud-statistics/pom.xml | 12 + .../src/test/java/com/qniao/iot/rc}/Body.java | 2 +- .../iot/rc}/CloudBoxDataHistoryEvent.java | 2 +- ...DataHistoryEventDeserializationSchema.java | 2 +- .../com/qniao/iot/rc}/CloudBoxEventJob.java | 19 +- .../com/qniao/iot/rc}/CloudBoxEventJob1.java | 50 ++-- .../qniao/iot/rc}/DruidDataSourceUtil.java | 2 +- .../qniao/iot/rc}/EsRestClientService.java | 11 +- .../java/com/qniao/iot/rc}/SinkMysqlFunc.java | 6 +- 14 files changed, 50 insertions(+), 509 deletions(-) delete mode 100644 cloud-box-job/dependency-reduced-pom.xml delete mode 100644 cloud-box-job/pom.xml delete mode 100644 cloud-box-job/src/main/resources/log4j2.properties delete mode 100644 cloud-box-job/src/test/java/Test.java rename {cloud-box-job/src/main/java/com/qniao/iot => root-cloud-statistics/src/test/java/com/qniao/iot/rc}/Body.java (96%) rename {cloud-box-job/src/main/java/com/qniao/iot => root-cloud-statistics/src/test/java/com/qniao/iot/rc}/CloudBoxDataHistoryEvent.java (92%) rename {cloud-box-job/src/main/java/com/qniao/iot => root-cloud-statistics/src/test/java/com/qniao/iot/rc}/CloudBoxDataHistoryEventDeserializationSchema.java (97%) rename {cloud-box-job/src/main/java/com/qniao/iot => root-cloud-statistics/src/test/java/com/qniao/iot/rc}/CloudBoxEventJob.java (80%) rename {cloud-box-job/src/main/java/com/qniao/iot => root-cloud-statistics/src/test/java/com/qniao/iot/rc}/CloudBoxEventJob1.java (72%) rename {cloud-box-job/src/main/java/com/qniao/iot => root-cloud-statistics/src/test/java/com/qniao/iot/rc}/DruidDataSourceUtil.java (98%) rename {cloud-box-job/src/main/java/com/qniao/iot => root-cloud-statistics/src/test/java/com/qniao/iot/rc}/EsRestClientService.java (94%) rename {cloud-box-job/src/main/java/com/qniao/iot => root-cloud-statistics/src/test/java/com/qniao/iot/rc}/SinkMysqlFunc.java (92%) diff --git a/cloud-box-job/dependency-reduced-pom.xml b/cloud-box-job/dependency-reduced-pom.xml deleted file mode 100644 index 0cd2bee..0000000 --- a/cloud-box-job/dependency-reduced-pom.xml +++ /dev/null @@ -1,142 +0,0 @@ - - - - iot-root-cloud-model-hw-formatter - com.qniao - 0.0.1-SNAPSHOT - - 4.0.0 - cloud-box-job - - new-job - - - maven-compiler-plugin - 3.1 - - ${target.java.version} - ${target.java.version} - - - - maven-shade-plugin - 3.1.1 - - - package - - shade - - - - - org.apache.flink:flink-shaded-force-shading - com.google.code.findbugs:jsr305 - org.slf4j:* - org.apache.logging.log4j:* - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - - com.qniao.iot.CloudBoxEventJob - - - - - - - - - - - org.apache.logging.log4j - log4j-slf4j-impl - 2.17.2 - runtime - - - org.apache.logging.log4j - log4j-api - 2.17.2 - runtime - - - org.apache.logging.log4j - log4j-core - 2.17.2 - runtime - - - org.apache.flink - flink-table-common - 1.15.0 - provided - - - icu4j - com.ibm.icu - - - - - org.apache.flink - flink-table-runtime - 1.15.0 - provided - - - flink-cep - org.apache.flink - - - - - org.apache.flink - flink-table-planner_2.12 - 1.15.0 - provided - - - commons-compiler - org.codehaus.janino - - - janino - org.codehaus.janino - - - - - org.apache.flink - flink-table-api-java-bridge - 1.15.0 - provided - - - - - maven-releases - Nexus releases Repository - http://120.78.76.88:8081/repository/maven-snapshots/ - - - - 1.8 - 2.17.2 - 1.15.0 - ${target.java.version} - UTF-8 - ${target.java.version} - - diff --git a/cloud-box-job/pom.xml b/cloud-box-job/pom.xml deleted file mode 100644 index b2589cf..0000000 --- a/cloud-box-job/pom.xml +++ /dev/null @@ -1,273 +0,0 @@ - - - - iot-root-cloud-model-hw-formatter - com.qniao - 0.0.1-SNAPSHOT - - 4.0.0 - - cloud-box-job - - - UTF-8 - 1.15.0 - 1.8 - ${target.java.version} - ${target.java.version} - 2.17.2 - - - - - - com.qniao - root-cloud-event - 0.0.1-SNAPSHOT - - - - - - org.apache.flink - flink-streaming-java - ${flink.version} - - - org.apache.flink - flink-clients - ${flink.version} - - - - org.apache.flink - flink-connector-kafka - ${flink.version} - - - - - - org.apache.logging.log4j - log4j-slf4j-impl - ${log4j.version} - runtime - - - org.apache.logging.log4j - log4j-api - ${log4j.version} - runtime - - - org.apache.logging.log4j - log4j-core - ${log4j.version} - runtime - - - - commons-logging - commons-logging - 1.2 - - - - com.qniao - iot-machine-data-command - 0.0.1-SNAPSHOT - - - - com.qniao - iot-machine-data-event - 0.0.1-SNAPSHOT - - - - com.qniao - iot-machine-data-constant - 0.0.1-SNAPSHOT - - - - com.qniao - iot-machine-state-event-generator-job - 0.0.1-SNAPSHOT - - - - org.apache.flink - flink-connector-rabbitmq_2.12 - 1.14.5 - - - - cn.hutool - hutool-all - 5.8.4 - - - - - com.ctrip.framework.apollo - apollo-client - 2.0.1 - - - com.ctrip.framework.apollo - apollo-core - 2.0.1 - - - - - org.apache.flink - flink-table-common - 1.15.0 - provided - - - - - - org.apache.flink - flink-table-runtime - ${flink.version} - provided - - - - org.apache.flink - flink-table-planner_2.12 - 1.15.0 - provided - - - - - org.apache.flink - flink-table-api-java - 1.15.0 - - - - - org.apache.flink - flink-table-api-java-bridge - ${flink.version} - provided - - - - org.apache.flink - flink-table-api-scala_2.12 - 1.15.0 - - - - - org.apache.flink - flink-table-api-scala-bridge_2.12 - 1.15.0 - - - - com.alibaba - druid - 1.1.12 - - - - mysql - mysql-connector-java - 8.0.29 - - - - com.alibaba.fastjson2 - fastjson2 - 2.0.7 - - - - org.elasticsearch.client - elasticsearch-rest-high-level-client - 7.17.3 - - - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.1 - - ${target.java.version} - ${target.java.version} - - - - - - - org.apache.maven.plugins - maven-shade-plugin - 3.1.1 - - - - package - - shade - - - - - org.apache.flink:flink-shaded-force-shading - com.google.code.findbugs:jsr305 - org.slf4j:* - org.apache.logging.log4j:* - - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - - com.qniao.iot.CloudBoxEventJob - - - - - - - - new-job - - - - - maven-releases - Nexus releases Repository - http://120.78.76.88:8081/repository/maven-snapshots/ - - - \ No newline at end of file diff --git a/cloud-box-job/src/main/resources/log4j2.properties b/cloud-box-job/src/main/resources/log4j2.properties deleted file mode 100644 index 32c696e..0000000 --- a/cloud-box-job/src/main/resources/log4j2.properties +++ /dev/null @@ -1,25 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -rootLogger.level = INFO -rootLogger.appenderRef.console.ref = ConsoleAppender - -appender.console.name = ConsoleAppender -appender.console.type = CONSOLE -appender.console.layout.type = PatternLayout -appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/cloud-box-job/src/test/java/Test.java b/cloud-box-job/src/test/java/Test.java deleted file mode 100644 index d20a176..0000000 --- a/cloud-box-job/src/test/java/Test.java +++ /dev/null @@ -1,12 +0,0 @@ -import cn.hutool.json.JSONUtil; -import com.qniao.iot.Body; -import org.apache.kafka.common.protocol.types.Field; - -public class Test { - - public static void main(String[] args) { - - String s = "{\"create_time\":\"2022-07-12 12:56:15\",\"data_source\":0,\"data_timestamp\":\"2022-07-12 12:56:15\",\"data_type\":2,\"docId\":\"86119304081409802286021474\",\"id\":744217778243899392,\"mac\":861193040814098,\"quantity\":28,\"space_of_time\":60,\"total_production\":21474}"; - System.out.println(JSONUtil.toBean(s, Body.class)); - } -} diff --git a/pom.xml b/pom.xml index 1ff66bf..ebe32b0 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,6 @@ under the License. root-cloud-event root-cloud-mocker root-cloud-statistics - cloud-box-job pom diff --git a/root-cloud-statistics/pom.xml b/root-cloud-statistics/pom.xml index 440d130..1921f49 100644 --- a/root-cloud-statistics/pom.xml +++ b/root-cloud-statistics/pom.xml @@ -147,6 +147,18 @@ under the License. apollo-core 2.0.1 + + com.alibaba + druid + 1.1.12 + test + + + + mysql + mysql-connector-java + 8.0.29 + diff --git a/cloud-box-job/src/main/java/com/qniao/iot/Body.java b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/Body.java similarity index 96% rename from cloud-box-job/src/main/java/com/qniao/iot/Body.java rename to root-cloud-statistics/src/test/java/com/qniao/iot/rc/Body.java index 7d6dc9f..d916d3f 100644 --- a/cloud-box-job/src/main/java/com/qniao/iot/Body.java +++ b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/Body.java @@ -1,4 +1,4 @@ -package com.qniao.iot; +package com.qniao.iot.rc; import lombok.Data; diff --git a/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEvent.java b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxDataHistoryEvent.java similarity index 92% rename from cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEvent.java rename to root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxDataHistoryEvent.java index 57c16aa..988c5a1 100644 --- a/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEvent.java +++ b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxDataHistoryEvent.java @@ -1,4 +1,4 @@ -package com.qniao.iot; +package com.qniao.iot.rc; import com.fasterxml.jackson.annotation.JsonAutoDetect; import lombok.Data; diff --git a/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEventDeserializationSchema.java b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxDataHistoryEventDeserializationSchema.java similarity index 97% rename from cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEventDeserializationSchema.java rename to root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxDataHistoryEventDeserializationSchema.java index 5dce9b2..f1f2405 100644 --- a/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEventDeserializationSchema.java +++ b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxDataHistoryEventDeserializationSchema.java @@ -1,4 +1,4 @@ -package com.qniao.iot; +package com.qniao.iot.rc; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.api.common.serialization.DeserializationSchema; diff --git a/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob.java b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob.java similarity index 80% rename from cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob.java rename to root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob.java index 3197ae5..3028283 100644 --- a/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob.java +++ b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob.java @@ -1,43 +1,26 @@ -package com.qniao.iot; +package com.qniao.iot.rc; -import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.ListUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.common.serialization.SimpleStringSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource; -import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; import java.util.*; diff --git a/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob1.java b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob1.java similarity index 72% rename from cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob1.java rename to root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob1.java index db70a36..be6b9d1 100644 --- a/cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob1.java +++ b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob1.java @@ -1,4 +1,4 @@ -package com.qniao.iot; +package com.qniao.iot.rc; import cn.hutool.core.util.StrUtil; import com.alibaba.druid.pool.DruidDataSource; @@ -6,7 +6,7 @@ import com.alibaba.druid.pool.DruidDataSource; import java.sql.Connection; import java.sql.Date; import java.sql.PreparedStatement; -import java.text.SimpleDateFormat; +import java.sql.Timestamp; import java.util.List; import java.util.Map; import java.util.concurrent.*; @@ -39,26 +39,26 @@ public class CloudBoxEventJob1 { } scrollId = map.get("scrollId").toString(); - if(count>861) { - if (dataList == null || dataList.size() < 10000 || count > 1000) { - executorService.shutdown(); - while (!executorService.isTerminated()) { - System.out.println("任务正在执行中,请稍等。。。"); - Thread.sleep(10000); - } - System.out.println("任务执行完成。。。"); - break; + /*if(count>501) { + }*/ + if (dataList == null || dataList.size() < 10000 || count > 1000) { + executorService.shutdown(); + while (!executorService.isTerminated()) { + System.out.println("任务正在执行中,请稍等。。。"); + Thread.sleep(10000); } - // 导入数据到mysql - List finalDataList = dataList; - executorService.execute(() -> { - try { - invoke(finalDataList); - } catch (Exception e) { - e.printStackTrace(); - } - }); + System.out.println("任务执行完成。。。"); + break; } + // 导入数据到mysql + List finalDataList = dataList; + executorService.execute(() -> { + try { + invoke(finalDataList); + } catch (Exception e) { + e.printStackTrace(); + } + }); ++count; } } @@ -85,11 +85,13 @@ public class CloudBoxEventJob1 { ps.setBigDecimal(10, null); String createTimeStr = body.getCreate_time(); Date createDate = null; + Timestamp timestamp = null; if (StrUtil.isNotEmpty(createTimeStr)) { - long time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(createTimeStr).getTime(); - createDate = new Date(time); + /*long time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(createTimeStr).getTime(); + createDate = new Date(time);*/ + timestamp = Timestamp.valueOf(createTimeStr); } - ps.setDate(11, createDate); + ps.setTimestamp(11, timestamp); Long id = body.getId(); ps.setString(12, id == null ? "0" : StrUtil.toString(id)); ps.setString(13, body.getDocId()); @@ -108,7 +110,7 @@ public class CloudBoxEventJob1 { private static String getSql() { - return "insert into qn_cloud_box_event(data_source, machine_iot_mac, machine_pwr_stat, machine_working_stat, acc_job_count,\n" + + return "insert into qn_cloud_box_event_copy1(data_source, machine_iot_mac, machine_pwr_stat, machine_working_stat, acc_job_count,\n" + " curr_job_count, curr_job_duration, curr_waiting_duration, curr_stoping_duration, ig_stat,\n" + " report_time,event_id, doc_id)\n" + "values (?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?)"; diff --git a/cloud-box-job/src/main/java/com/qniao/iot/DruidDataSourceUtil.java b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/DruidDataSourceUtil.java similarity index 98% rename from cloud-box-job/src/main/java/com/qniao/iot/DruidDataSourceUtil.java rename to root-cloud-statistics/src/test/java/com/qniao/iot/rc/DruidDataSourceUtil.java index f6c47dd..774099c 100644 --- a/cloud-box-job/src/main/java/com/qniao/iot/DruidDataSourceUtil.java +++ b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/DruidDataSourceUtil.java @@ -1,4 +1,4 @@ -package com.qniao.iot; +package com.qniao.iot.rc; import com.alibaba.druid.pool.DruidDataSource; diff --git a/cloud-box-job/src/main/java/com/qniao/iot/EsRestClientService.java b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/EsRestClientService.java similarity index 94% rename from cloud-box-job/src/main/java/com/qniao/iot/EsRestClientService.java rename to root-cloud-statistics/src/test/java/com/qniao/iot/rc/EsRestClientService.java index c11d73f..4213a49 100644 --- a/cloud-box-job/src/main/java/com/qniao/iot/EsRestClientService.java +++ b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/EsRestClientService.java @@ -1,4 +1,4 @@ -package com.qniao.iot; +package com.qniao.iot.rc; import cn.hutool.json.JSONUtil; import org.apache.http.HttpHost; @@ -21,7 +21,6 @@ import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -99,10 +98,10 @@ public class EsRestClientService { scrollId = response.getScrollId(); if (hits != null) { System.out.println("*********************查询es结果数量 :" + hits.length); - if(count > 861) { - for (SearchHit hit : hits) { - tupleList.add(JSONUtil.toBean(hit.getSourceAsString(), Body.class)); - } + /*if(count > 501) { + }*/ + for (SearchHit hit : hits) { + tupleList.add(JSONUtil.toBean(hit.getSourceAsString(), Body.class)); } } } else { diff --git a/cloud-box-job/src/main/java/com/qniao/iot/SinkMysqlFunc.java b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/SinkMysqlFunc.java similarity index 92% rename from cloud-box-job/src/main/java/com/qniao/iot/SinkMysqlFunc.java rename to root-cloud-statistics/src/test/java/com/qniao/iot/rc/SinkMysqlFunc.java index a82c7c3..00053df 100644 --- a/cloud-box-job/src/main/java/com/qniao/iot/SinkMysqlFunc.java +++ b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/SinkMysqlFunc.java @@ -1,11 +1,9 @@ -package com.qniao.iot; +package com.qniao.iot.rc; -import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.druid.pool.DruidDataSource; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; import java.sql.Connection; import java.sql.Date; @@ -86,7 +84,7 @@ public class SinkMysqlFunc extends RichSinkFunction> { private String getSql() { - return "insert into qn_cloud_box_event(data_source, machine_iot_mac, machine_pwr_stat, machine_working_stat, acc_job_count,\n" + + return "insert into qn_cloud_box_event_copy1(data_source, machine_iot_mac, machine_pwr_stat, machine_working_stat, acc_job_count,\n" + " curr_job_count, curr_job_duration, curr_waiting_duration, curr_stoping_duration, ig_stat,\n" + " report_time,event_id, doc_id)\n" + "values (?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?)";