
Update

hph_优化版本
1049970895@qniao.cn 3 years ago
parent
commit
9aa21f21c2
19 changed files with 1183 additions and 5 deletions
  1. 142  cloud-box-job/dependency-reduced-pom.xml
  2. 273  cloud-box-job/pom.xml
  3. 43  cloud-box-job/src/main/java/com/qniao/iot/Body.java
  4. 17  cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEvent.java
  5. 32  cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEventDeserializationSchema.java
  6. 107  cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob.java
  7. 61  cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob1.java
  8. 45  cloud-box-job/src/main/java/com/qniao/iot/DruidDataSourceUtil.java
  9. 126  cloud-box-job/src/main/java/com/qniao/iot/EsRestClientService.java
  10. 97  cloud-box-job/src/main/java/com/qniao/iot/SinkMysqlFunc.java
  11. 25  cloud-box-job/src/main/resources/log4j2.properties
  12. 12  cloud-box-job/src/test/java/Test.java
  13. 1  pom.xml
  14. 43  root-cloud-mocker/src/main/java/com/qniao/iot/rc/Body.java
  15. 15  root-cloud-mocker/src/main/java/com/qniao/iot/rc/CloudBoxDataHistoryEvent.java
  16. 27  root-cloud-mocker/src/main/java/com/qniao/iot/rc/CloudBoxEventSerialization.java
  17. 95  root-cloud-mocker/src/main/java/com/qniao/iot/rc/CloudBoxEventSourceMocker.java
  18. 23  root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java
  19. 4  root-cloud-statistics/src/main/resources/META-INF/app.properties

142
cloud-box-job/dependency-reduced-pom.xml

@@ -0,0 +1,142 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>iot-root-cloud-model-hw-formatter</artifactId>
<groupId>com.qniao</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-box-job</artifactId>
<build>
<finalName>new-job</finalName>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer />
<transformer>
<mainClass>com.qniao.iot.CloudBoxEventJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>icu4j</artifactId>
<groupId>com.ibm.icu</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>flink-cep</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>commons-compiler</artifactId>
<groupId>org.codehaus.janino</groupId>
</exclusion>
<exclusion>
<artifactId>janino</artifactId>
<groupId>org.codehaus.janino</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>maven-releases</id>
<name>Nexus releases Repository</name>
<url>http://120.78.76.88:8081/repository/maven-snapshots/</url>
</repository>
</distributionManagement>
<properties>
<target.java.version>1.8</target.java.version>
<log4j.version>2.17.2</log4j.version>
<flink.version>1.15.0</flink.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
</properties>
</project>

273
cloud-box-job/pom.xml

@@ -0,0 +1,273 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>iot-root-cloud-model-hw-formatter</artifactId>
<groupId>com.qniao</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-box-job</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.0</flink.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>root-cloud-event</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-command</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-event</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-constant</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-state-event-generator-job</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_2.12</artifactId>
<version>1.14.5</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.4</version>
</dependency>
<!-- apollo -->
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-core</artifactId>
<version>2.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
</dependency>
<!--table-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.15.0</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_2.12</artifactId>
<version>1.15.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-scala-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.12</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.17.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.qniao.iot.CloudBoxEventJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<finalName>new-job</finalName>
</build>
<distributionManagement>
<repository>
<id>maven-releases</id>
<name>Nexus releases Repository</name>
<url>http://120.78.76.88:8081/repository/maven-snapshots/</url>
</repository>
</distributionManagement>
</project>

43
cloud-box-job/src/main/java/com/qniao/iot/Body.java

@@ -0,0 +1,43 @@
package com.qniao.iot;
import lombok.Data;
import java.io.Serializable;
import java.math.BigDecimal;
@Data
public class Body {
private static final long serialVersionUID = 1L;
private String create_time;
private Integer data_source;
private String data_timestamp;
// 0 = power-on data, 1 = shutdown data, 3 = production data
private Integer data_type;
private String docId;
private Long id;
private Long mac;
private Long quantity;
private BigDecimal runningDuration;
private BigDecimal runningHour;
private Integer space_of_time;
private Long total_production;
private BigDecimal waitingDuration;
private BigDecimal waitingHour;
private Long currentTime;
}

17
cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEvent.java

@@ -0,0 +1,17 @@
package com.qniao.iot;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import lombok.Data;
import java.io.Serializable;
import java.math.BigDecimal;
@Data
public class CloudBoxDataHistoryEvent implements Serializable {
private static final long serialVersionUID = 1L;
private String body;
private String eventKey;
}

32
cloud-box-job/src/main/java/com/qniao/iot/CloudBoxDataHistoryEventDeserializationSchema.java

@@ -0,0 +1,32 @@
package com.qniao.iot;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
/**
* @author Lzk
*/
public class CloudBoxDataHistoryEventDeserializationSchema implements DeserializationSchema<CloudBoxDataHistoryEvent> {
/**
* Register a JavaTimeModule to support parsing LocalDateTime fields
*/
final private ObjectMapper objectMapper = new ObjectMapper();
@Override
public CloudBoxDataHistoryEvent deserialize(byte[] message) throws IOException {
return objectMapper.readValue(message, CloudBoxDataHistoryEvent.class);
}
@Override
public boolean isEndOfStream(CloudBoxDataHistoryEvent nextElement) {
return false;
}
@Override
public TypeInformation<CloudBoxDataHistoryEvent> getProducedType() {
return TypeInformation.of(CloudBoxDataHistoryEvent.class);
}
}

107
cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob.java

@@ -0,0 +1,107 @@
package com.qniao.iot;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
public class CloudBoxEventJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
env.getConfig().setAutoWatermarkInterval(1000L);
env.setParallelism(1);
/*Map<TopicPartition, Long> offsets = new HashMap<>();
TopicPartition topicPartition = new TopicPartition("data-message-channel-qn", 0);
offsets.put(topicPartition, 5872534L);*/
KafkaSource<CloudBoxDataHistoryEvent> source = KafkaSource.<CloudBoxDataHistoryEvent>builder()
.setBootstrapServers("172.19.14.225:9092")
.setTopics("data-message-channel-qn")
.setGroupId("cloud_box_event_job")
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new CloudBoxDataHistoryEventDeserializationSchema())
.build();
SingleOutputStreamOperator<CloudBoxDataHistoryEvent> fromSource = env
.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)),
"cloudBoxDataHistoryEvent fromSource")
.filter((FilterFunction<CloudBoxDataHistoryEvent>) value -> {
String eventKey = value.getEventKey();
return StrUtil.isNotEmpty(eventKey) && eventKey.equals("qn_cloud_box_data_history");
});
fromSource.print();
SingleOutputStreamOperator<Body> flatMap = fromSource
.flatMap(new RichFlatMapFunction<CloudBoxDataHistoryEvent, Body>() {
@Override
public void flatMap(CloudBoxDataHistoryEvent event, Collector<Body> out) {
String body = event.getBody();
if (StrUtil.isNotEmpty(body)) {
Body bean = JSONUtil.toBean(body, Body.class);
bean.setCurrentTime(LocalDateTime.now().toInstant(ZoneOffset.ofHours(8)).toEpochMilli());
out.collect(bean);
}
}
}).name("cloudBoxDataHistoryEvent flatmap");
SingleOutputStreamOperator<List<Body>> toMysql = flatMap
.assignTimestampsAndWatermarks(WatermarkStrategy.<Body>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner(((body, recordTimestamp) -> body.getCurrentTime())))
.keyBy(Body::getData_type)
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.process(new ProcessWindowFunction<Body, List<Body>, Integer, TimeWindow>() {
@Override
public void process(Integer aLong, ProcessWindowFunction<Body, List<Body>, Integer, TimeWindow>.Context context,
Iterable<Body> elements, Collector<List<Body>> out) {
List<Body> list = ListUtil.toList(elements);
if (list.size() > 0) {
out.collect(list);
}
}
}).name("to mysql");
toMysql.addSink(new SinkMysqlFunc()).name("sink to mysql");
env.execute("cloud box event job");
}
}

61
cloud-box-job/src/main/java/com/qniao/iot/CloudBoxEventJob1.java

@@ -0,0 +1,61 @@
package com.qniao.iot;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.FileUtils;
import java.io.File;
import java.util.List;
import java.util.Map;
public class CloudBoxEventJob1 {
private static EsRestClientService esRestClientService = new EsRestClientService();
public static void main(String[] args) throws Exception {
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Query data (searchResponse)
String scrollId = null;
DataSet<Tuple3<String, String, Integer>> dataSet = null;
List<Tuple3<String, String, Integer>> dataList = null;
int count = 0;
while (!"none".equals(scrollId)) {
Map<String, Object> map = esRestClientService.queryDeviceListPage(scrollId);
if (map.get("tupleList") instanceof List)
dataList = (List<Tuple3<String, String, Integer>>) map.get("tupleList");
scrollId = map.get("scrollId").toString();
if (dataList == null || dataList.size() < 10000 || count > 3)
break;
// Import the data
DataSet<Tuple3<String, String, Integer>> dataSetTemp = env.fromCollection(dataList);
if (dataSet == null) {
dataSet = dataSetTemp;
} else {
dataSet = dataSet.union(dataSetTemp);
}
++count;
}
// Grouping and aggregation rule
dataSet = dataSet.groupBy(0).sum(2);
//dataSet.print();
String output = "C:\\Users\\10499\\Downloads\\1223.txt";
FileUtils.deleteFileOrDirectory(new File(output));
dataSet.writeAsText(output);
env.execute("read es");
}
}

45
cloud-box-job/src/main/java/com/qniao/iot/DruidDataSourceUtil.java

@@ -0,0 +1,45 @@
package com.qniao.iot;
import com.alibaba.druid.pool.DruidDataSource;
public class DruidDataSourceUtil {
public static DruidDataSource getInstance(){
DruidDataSource dataSource = new DruidDataSource();
// Set connection parameters
dataSource.setUrl("jdbc:mysql://rm-wz9it4fs5tk7n4tm1zo.mysql.rds.aliyuncs.com:3306/cloud_print_cloud_factory?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useTimezone=true&serverTimezone=GMT%2B8&useSSL=false");
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUsername("qn_cloudprint");
dataSource.setPassword("qncloudprint5682");
// Configure the initial, minimum and maximum pool sizes
dataSource.setInitialSize(2);
dataSource.setMinIdle(2);
dataSource.setMaxActive(100);
dataSource.setRemoveAbandoned(true);
// Timeout in seconds: 180 seconds = 3 minutes
dataSource.setRemoveAbandonedTimeout(180);
// Minimum lifetime of a connection in the pool, in milliseconds
dataSource.setMinEvictableIdleTimeMillis(300000);
// Maximum wait time when acquiring a connection, in milliseconds
dataSource.setMaxWait(60000);
// Interval between checks for idle connections that need to be closed, in milliseconds
dataSource.setTimeBetweenEvictionRunsMillis(60000);
// Validation query, to keep connections from going stale
dataSource.setValidationQuery("SELECT 'x'");
dataSource.setTestWhileIdle(true);
dataSource.setTestOnBorrow(false);
dataSource.setTestOnReturn(false);
// Whether to cache PreparedStatements
dataSource.setPoolPreparedStatements(false);
dataSource.setMaxOpenPreparedStatements(100);
// asyncInit was added in Druid 1.1.4; enabling it speeds up application startup when initialSize is large
dataSource.setAsyncInit(true);
return dataSource;
}
}

126
cloud-box-job/src/main/java/com/qniao/iot/EsRestClientService.java

@@ -0,0 +1,126 @@
package com.qniao.iot;
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.search.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* ES service hosted on an Alibaba Cloud server
*
* @author lizixian
* @date 2020/3/16 10:41
*/
public class EsRestClientService {
private String host = "120.79.137.137:9200";
private String scheme = "http";
private String index = "qn_cloud_box_data_history";
private RestClientBuilder builder = null;
private RestHighLevelClient client = null;
public void init() {
String[] nodeIpInfos = host.split(":");
builder = RestClient.builder(new HttpHost(nodeIpInfos[0], Integer.parseInt(nodeIpInfos[1]), scheme))
.setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setConnectTimeout(10 * 60 * 1000);
requestConfigBuilder.setSocketTimeout(10 * 60 * 1000);
requestConfigBuilder.setConnectionRequestTimeout(10 * 60 * 1000);
return requestConfigBuilder;
});
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "qnol26215"));
builder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider));
client = new RestHighLevelClient(builder);
}
/**
* Paginated query of the device application install list, using a scroll cursor
*
* @author lizixian
* @date 2020/5/10 18:01
*/
public Map<String, Object> queryDeviceListPage(String scrollId) {
String brand = "CH";
// Set the query page size
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.size(10000);
BoolQueryBuilder bool = QueryBuilders.boolQuery();
// Platform
// bool.must(QueryBuilders.termQuery("brand", brand));
sourceBuilder.query(bool); // query conditions
return queryDeviceListPageResult(sourceBuilder, scrollId);
}
private Map<String, Object> queryDeviceListPageResult(SearchSourceBuilder sourceBuilder, String scrollId) {
SearchRequest searchRequest = new SearchRequest(index)
.scroll("2m")
.source(sourceBuilder);
if (client == null) {
init();
}
Map<String, Object> resultMap = new HashMap<>(5);
List<Tuple3<String, String, Integer>> tupleList = new ArrayList<>();
try {
SearchResponse response = null;
if (scrollId != null) {
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId).scroll("5m");
response = client.scroll(scrollRequest, RequestOptions.DEFAULT);
} else {
response = client.search(searchRequest, RequestOptions.DEFAULT);
}
int s = response.status().getStatus();
if (s == RestStatus.OK.getStatus()) {
SearchHit[] hits = response.getHits().getHits();
scrollId = response.getScrollId();
System.out.println("*********************查询es结果");
if (hits != null) {
for (SearchHit hit : hits) {
System.out.println("*********************查询es结果:" + hit.getSourceAsString());
JSONObject json = JSONObject.parseObject(hit.getSourceAsString());
tupleList.add(new Tuple3<>(json.getString("mac"), json.getString("data_source"), 1));
}
}
} else {
// Clear the scroll context
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId); // setScrollIds() can also be used to pass multiple scrollIds at once
ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
boolean succeeded = clearScrollResponse.isSucceeded();
}
resultMap.put("scrollId", scrollId);
resultMap.put("tupleList", tupleList);
} catch (IOException e) {
e.printStackTrace();
}
return resultMap;
}
}

97
cloud-box-job/src/main/java/com/qniao/iot/SinkMysqlFunc.java

@@ -0,0 +1,97 @@
package com.qniao.iot;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.druid.pool.DruidDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.text.SimpleDateFormat;
import java.util.List;
public class SinkMysqlFunc extends RichSinkFunction<List<Body>> {
private DruidDataSource dataSource = null;
/**
* Initialization method, executed before invoke
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Obtain the database connection pool configuration (omitted here)
if (dataSource == null) {
dataSource = DruidDataSourceUtil.getInstance();
}
}
@Override
public void invoke(List<Body> values, Context context) throws Exception {
if (values.size() != 0) {
Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(getSql());
for (Body body : values) {
if (body != null) {
ps.setInt(1, body.getData_source());
ps.setLong(2, body.getMac());
Integer dataType = body.getData_type();
ps.setInt(3, dataType == 1 ? 0 : 1);
ps.setInt(4, dataType == 2 ? 1 : 0);
Long totalProduction = body.getTotal_production();
ps.setLong(5, totalProduction == null ? 0 : totalProduction);
Long quantity = body.getQuantity();
ps.setLong(6, quantity == null ? 0L : quantity);
ps.setBigDecimal(7, body.getRunningDuration());
ps.setBigDecimal(8, body.getWaitingDuration());
ps.setBigDecimal(9, null);
ps.setBigDecimal(10, null);
String createTimeStr = body.getCreate_time();
Date createDate = null;
if (StrUtil.isNotEmpty(createTimeStr)) {
long time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(createTimeStr).getTime();
createDate = new Date(time);
}
ps.setDate(11, createDate);
Long id = body.getId();
ps.setString(12, id == null ? "0" : StrUtil.toString(id));
ps.setString(13, body.getDocId());
ps.addBatch();
}
}
ps.executeBatch();
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
}
@Override
public void close() throws Exception {
super.close();
// Close the connection pool and release resources
if (dataSource != null) {
dataSource.close();
}
}
private String getSql() {
return "insert into qn_cloud_box_event(data_source, machine_iot_mac, machine_pwr_stat, machine_working_stat, acc_job_count,\n" +
" curr_job_count, curr_job_duration, curr_waiting_duration, curr_stoping_duration, ig_stat,\n" +
" report_time,event_id, doc_id)\n" +
"values (?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?)";
}
}

25
cloud-box-job/src/main/resources/log4j2.properties

@@ -0,0 +1,25 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

12
cloud-box-job/src/test/java/Test.java

@@ -0,0 +1,12 @@
import cn.hutool.json.JSONUtil;
import com.qniao.iot.Body;
import org.apache.kafka.common.protocol.types.Field;
public class Test {
public static void main(String[] args) {
String s = "{\"create_time\":\"2022-07-12 12:56:15\",\"data_source\":0,\"data_timestamp\":\"2022-07-12 12:56:15\",\"data_type\":2,\"docId\":\"86119304081409802286021474\",\"id\":744217778243899392,\"mac\":861193040814098,\"quantity\":28,\"space_of_time\":60,\"total_production\":21474}";
System.out.println(JSONUtil.toBean(s, Body.class));
}
}

1
pom.xml

@@ -28,6 +28,7 @@ under the License.
<module>root-cloud-event</module> <module>root-cloud-event</module>
<module>root-cloud-mocker</module> <module>root-cloud-mocker</module>
<module>root-cloud-statistics</module> <module>root-cloud-statistics</module>
<module>cloud-box-job</module>
</modules> </modules>
<packaging>pom</packaging> <packaging>pom</packaging>
</project> </project>

43
root-cloud-mocker/src/main/java/com/qniao/iot/rc/Body.java

@@ -0,0 +1,43 @@
package com.qniao.iot.rc;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import lombok.Data;
import java.io.Serializable;
import java.math.BigDecimal;
@Data
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY, getterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class Body implements Serializable {
private static final long serialVersionUID = 1L;
private String createTime;
private Integer dataSource;
private String dataTimestamp;
// 0 = power-on data, 1 = shutdown data, 3 = production data
private Integer dataType;
private String docId;
private Long id;
private Long mac;
private Long quantity;
private BigDecimal runningDuration;
private BigDecimal runningHour;
private Integer spaceOfTime;
private Long totalProduction;
private BigDecimal waitingDuration;
private BigDecimal waitingHour;
}

15
root-cloud-mocker/src/main/java/com/qniao/iot/rc/CloudBoxDataHistoryEvent.java

@@ -0,0 +1,15 @@
package com.qniao.iot.rc;
import lombok.Data;
import java.io.Serializable;
@Data
public class CloudBoxDataHistoryEvent implements Serializable {
private static final long serialVersionUID = 1L;
private Body body;
private String eventKey;
}

27
root-cloud-mocker/src/main/java/com/qniao/iot/rc/CloudBoxEventSerialization.java

@@ -0,0 +1,27 @@
package com.qniao.iot.rc;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
public class CloudBoxEventSerialization {
private static final ObjectMapper objectMapper = new ObjectMapper();
private final String topic;
public CloudBoxEventSerialization(String topic) {
this.topic = topic;
}
public ProducerRecord<String, byte[]> serialize(
final CloudBoxDataHistoryEvent message, @Nullable final Long timestamp) {
try {
//if topic is null, default topic will be used
return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not serialize record: " + message, e);
}
}
}

95
root-cloud-mocker/src/main/java/com/qniao/iot/rc/CloudBoxEventSourceMocker.java

@@ -0,0 +1,95 @@
package com.qniao.iot.rc;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.Db;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
public class CloudBoxEventSourceMocker {
// Delay in milliseconds
public static final long DELAY = 500;
public static void main(String[] args) throws Exception {
// Create Kafka configuration properties
Properties kafkaProps = createKafkaProperties();
// Create the Kafka message producer
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(kafkaProps);
String topic = "data-message-channel-qn";
// Send events in a loop
while (true) {
CloudBoxDataHistoryEvent event = new CloudBoxDataHistoryEvent();
Body body = new Body();
body.setDataSource(1);
body.setCreateTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss")));
body.setDataTimestamp(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss")));
body.setId(RandomUtil.randomLong(99999999));
body.setMac(RandomUtil.randomLong(9999999999999999L));
body.setDataType(1);
body.setQuantity(RandomUtil.randomLong(9999));
body.setRunningDuration(RandomUtil.randomBigDecimal(BigDecimal.valueOf(500)));
body.setRunningHour(RandomUtil.randomBigDecimal(BigDecimal.valueOf(500)));
body.setTotalProduction(RandomUtil.randomLong(999999));
event.setBody(body);
ProducerRecord<String, byte[]> record = new CloudBoxEventSerialization(topic).serialize(
event,
null);
Future<RecordMetadata> send = producer.send(record);
System.out.println(send.get());
Thread.sleep(DELAY);
}
}
private static Integer generateWorkingSta(Integer workingSta, Integer pwrSta) {
if(pwrSta.equals(0)) {
return 0;
}else {
return workingSta;
}
}
private static Properties createKafkaProperties() {
Properties kafkaProps = new Properties();
// Local environment
//kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SASL_PLAINTEXT://localhost:9093");
// Test environment
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.29.115.145:9092");
// Production environment
//kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092");
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
/*kafkaProps.put(ProducerConfig.ACKS_CONFIG, "1");
// Add authentication configuration
kafkaProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
kafkaProps.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
kafkaProps.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";");
*/
return kafkaProps;
}
}

23
root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java

@@ -2,9 +2,12 @@ package com.qniao.iot.rc;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.db.Db;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -12,6 +15,7 @@ import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
public class RootCloudIotDataEventSourceMocker {
// Delay in milliseconds
@@ -58,7 +62,8 @@ public class RootCloudIotDataEventSourceMocker {
event,
null);
producer.send(record);
Future<RecordMetadata> send = producer.send(record);
System.out.println(send.get());
Thread.sleep(DELAY);
}
@@ -75,12 +80,24 @@ public class RootCloudIotDataEventSourceMocker {
private static Properties createKafkaProperties() {
Properties kafkaProps = new Properties();
// Local environment
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SASL_PLAINTEXT://localhost:9093");
// Test environment
//kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092");
//kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.29.115.145:9092");
// Production environment
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092");
//kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092");
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
kafkaProps.put(ProducerConfig.ACKS_CONFIG, "1");
// Add authentication configuration
kafkaProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
kafkaProps.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
kafkaProps.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";");
return kafkaProps;
}
}

4
root-cloud-statistics/src/main/resources/META-INF/app.properties

@@ -1,5 +1,5 @@
app.id=root-cloud-model-hw-formatter
# ???? 8.135.8.221
# ???? 47.112.164.224
# test 8.135.8.221
# prod 47.112.164.224
apollo.meta=http://47.112.164.224:5000