Browse Source

更新

hph_优化版本
1049970895@qniao.cn 3 years ago
parent
commit
abb2af7255
14 changed files with 50 additions and 509 deletions
  1. 142
      cloud-box-job/dependency-reduced-pom.xml
  2. 273
      cloud-box-job/pom.xml
  3. 25
      cloud-box-job/src/main/resources/log4j2.properties
  4. 12
      cloud-box-job/src/test/java/Test.java
  5. 1
      pom.xml
  6. 12
      root-cloud-statistics/pom.xml
  7. 2
      root-cloud-statistics/src/test/java/com/qniao/iot/rc/Body.java
  8. 2
      root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxDataHistoryEvent.java
  9. 2
      root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxDataHistoryEventDeserializationSchema.java
  10. 19
      root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob.java
  11. 50
      root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob1.java
  12. 2
      root-cloud-statistics/src/test/java/com/qniao/iot/rc/DruidDataSourceUtil.java
  13. 11
      root-cloud-statistics/src/test/java/com/qniao/iot/rc/EsRestClientService.java
  14. 6
      root-cloud-statistics/src/test/java/com/qniao/iot/rc/SinkMysqlFunc.java

142
cloud-box-job/dependency-reduced-pom.xml

@ -1,142 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>iot-root-cloud-model-hw-formatter</artifactId>
<groupId>com.qniao</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-box-job</artifactId>
<build>
<finalName>new-job</finalName>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer />
<transformer>
<mainClass>com.qniao.iot.CloudBoxEventJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>icu4j</artifactId>
<groupId>com.ibm.icu</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>flink-cep</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>commons-compiler</artifactId>
<groupId>org.codehaus.janino</groupId>
</exclusion>
<exclusion>
<artifactId>janino</artifactId>
<groupId>org.codehaus.janino</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>maven-releases</id>
<name>Nexus releases Repository</name>
<url>http://120.78.76.88:8081/repository/maven-snapshots/</url>
</repository>
</distributionManagement>
<properties>
<target.java.version>1.8</target.java.version>
<log4j.version>2.17.2</log4j.version>
<flink.version>1.15.0</flink.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
</properties>
</project>

273
cloud-box-job/pom.xml

@ -1,273 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>iot-root-cloud-model-hw-formatter</artifactId>
<groupId>com.qniao</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-box-job</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.0</flink.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>root-cloud-event</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-command</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-event</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-constant</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-state-event-generator-job</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_2.12</artifactId>
<version>1.14.5</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.4</version>
</dependency>
<!-- apollo -->
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-core</artifactId>
<version>2.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
</dependency>
<!--table-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_2.12</artifactId>
<version>1.15.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-scala-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.12</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.17.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.qniao.iot.CloudBoxEventJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<finalName>new-job</finalName>
</build>
<distributionManagement>
<repository>
<id>maven-releases</id>
<name>Nexus releases Repository</name>
<url>http://120.78.76.88:8081/repository/maven-snapshots/</url>
</repository>
</distributionManagement>
</project>

25
cloud-box-job/src/main/resources/log4j2.properties

@ -1,25 +0,0 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

12
cloud-box-job/src/test/java/Test.java

@ -1,12 +0,0 @@
import cn.hutool.json.JSONUtil;
import com.qniao.iot.Body;
import org.apache.kafka.common.protocol.types.Field;
public class Test {
public static void main(String[] args) {
String s = "{\"create_time\":\"2022-07-12 12:56:15\",\"data_source\":0,\"data_timestamp\":\"2022-07-12 12:56:15\",\"data_type\":2,\"docId\":\"86119304081409802286021474\",\"id\":744217778243899392,\"mac\":861193040814098,\"quantity\":28,\"space_of_time\":60,\"total_production\":21474}";
System.out.println(JSONUtil.toBean(s, Body.class));
}
}

1
pom.xml

@ -28,7 +28,6 @@ under the License.
<module>root-cloud-event</module>
<module>root-cloud-mocker</module>
<module>root-cloud-statistics</module>
<module>cloud-box-job</module>
</modules>
<packaging>pom</packaging>
</project>

12
root-cloud-statistics/pom.xml

@ -147,6 +147,18 @@ under the License.
<artifactId>apollo-core</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
</dependencies>
<build>

cloud-box-job/src/main/java/com/qniao/iot/Body.java → root-cloud-statistics/src/test/java/com/qniao/iot/rc/Body.java

@ -1,4 +1,4 @@
package com.qniao.iot;
package com.qniao.iot.rc;
import lombok.Data;

cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEvent.java → root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxDataHistoryEvent.java

@ -1,4 +1,4 @@
package com.qniao.iot;
package com.qniao.iot.rc;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import lombok.Data;

cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEventDeserializationSchema.java → root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxDataHistoryEventDeserializationSchema.java

@ -1,4 +1,4 @@
package com.qniao.iot;
package com.qniao.iot.rc;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;

cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob.java → root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob.java

@ -1,43 +1,26 @@
package com.qniao.iot;
package com.qniao.iot.rc;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;

cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob1.java → root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob1.java

@ -1,4 +1,4 @@
package com.qniao.iot;
package com.qniao.iot.rc;
import cn.hutool.core.util.StrUtil;
import com.alibaba.druid.pool.DruidDataSource;
@ -6,7 +6,7 @@ import com.alibaba.druid.pool.DruidDataSource;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.text.SimpleDateFormat;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@ -39,26 +39,26 @@ public class CloudBoxEventJob1 {
}
scrollId = map.get("scrollId").toString();
if(count>861) {
if (dataList == null || dataList.size() < 10000 || count > 1000) {
executorService.shutdown();
while (!executorService.isTerminated()) {
System.out.println("任务正在执行中,请稍等。。。");
Thread.sleep(10000);
}
System.out.println("任务执行完成。。。");
break;
/*if(count>501) {
}*/
if (dataList == null || dataList.size() < 10000 || count > 1000) {
executorService.shutdown();
while (!executorService.isTerminated()) {
System.out.println("任务正在执行中,请稍等。。。");
Thread.sleep(10000);
}
// 导入数据到mysql
List<Body> finalDataList = dataList;
executorService.execute(() -> {
try {
invoke(finalDataList);
} catch (Exception e) {
e.printStackTrace();
}
});
System.out.println("任务执行完成。。。");
break;
}
// 导入数据到mysql
List<Body> finalDataList = dataList;
executorService.execute(() -> {
try {
invoke(finalDataList);
} catch (Exception e) {
e.printStackTrace();
}
});
++count;
}
}
@ -85,11 +85,13 @@ public class CloudBoxEventJob1 {
ps.setBigDecimal(10, null);
String createTimeStr = body.getCreate_time();
Date createDate = null;
Timestamp timestamp = null;
if (StrUtil.isNotEmpty(createTimeStr)) {
long time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(createTimeStr).getTime();
createDate = new Date(time);
/*long time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(createTimeStr).getTime();
createDate = new Date(time);*/
timestamp = Timestamp.valueOf(createTimeStr);
}
ps.setDate(11, createDate);
ps.setTimestamp(11, timestamp);
Long id = body.getId();
ps.setString(12, id == null ? "0" : StrUtil.toString(id));
ps.setString(13, body.getDocId());
@ -108,7 +110,7 @@ public class CloudBoxEventJob1 {
private static String getSql() {
return "insert into qn_cloud_box_event(data_source, machine_iot_mac, machine_pwr_stat, machine_working_stat, acc_job_count,\n" +
return "insert into qn_cloud_box_event_copy1(data_source, machine_iot_mac, machine_pwr_stat, machine_working_stat, acc_job_count,\n" +
" curr_job_count, curr_job_duration, curr_waiting_duration, curr_stoping_duration, ig_stat,\n" +
" report_time,event_id, doc_id)\n" +
"values (?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?)";

cloud-box-job/src/main/java/com/qniao/iot/DruidDataSourceUtil.java → root-cloud-statistics/src/test/java/com/qniao/iot/rc/DruidDataSourceUtil.java

@ -1,4 +1,4 @@
package com.qniao.iot;
package com.qniao.iot.rc;
import com.alibaba.druid.pool.DruidDataSource;

cloud-box-job/src/main/java/com/qniao/iot/EsRestClientService.java → root-cloud-statistics/src/test/java/com/qniao/iot/rc/EsRestClientService.java

@ -1,4 +1,4 @@
package com.qniao.iot;
package com.qniao.iot.rc;
import cn.hutool.json.JSONUtil;
import org.apache.http.HttpHost;
@ -21,7 +21,6 @@ import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -99,10 +98,10 @@ public class EsRestClientService {
scrollId = response.getScrollId();
if (hits != null) {
System.out.println("*********************查询es结果数量 :" + hits.length);
if(count > 861) {
for (SearchHit hit : hits) {
tupleList.add(JSONUtil.toBean(hit.getSourceAsString(), Body.class));
}
/*if(count > 501) {
}*/
for (SearchHit hit : hits) {
tupleList.add(JSONUtil.toBean(hit.getSourceAsString(), Body.class));
}
}
} else {

cloud-box-job/src/main/java/com/qniao/iot/SinkMysqlFunc.java → root-cloud-statistics/src/test/java/com/qniao/iot/rc/SinkMysqlFunc.java

@ -1,11 +1,9 @@
package com.qniao.iot;
package com.qniao.iot.rc;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.druid.pool.DruidDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.sql.Connection;
import java.sql.Date;
@ -86,7 +84,7 @@ public class SinkMysqlFunc extends RichSinkFunction<List<Body>> {
private String getSql() {
return "insert into qn_cloud_box_event(data_source, machine_iot_mac, machine_pwr_stat, machine_working_stat, acc_job_count,\n" +
return "insert into qn_cloud_box_event_copy1(data_source, machine_iot_mac, machine_pwr_stat, machine_working_stat, acc_job_count,\n" +
" curr_job_count, curr_job_duration, curr_waiting_duration, curr_stoping_duration, ig_stat,\n" +
" report_time,event_id, doc_id)\n" +
"values (?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?)";
Loading…
Cancel
Save