Browse Source

first commit

master
lizhongkang@qniao.cn 3 years ago
parent
commit
c9033ed17b
22 changed files with 1192 additions and 0 deletions
  1. 31
      ai-root-cloud-event/.gitignore
  2. 78
      ai-root-cloud-event/pom.xml
  3. 100
      ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWaringDataReceivedEvent.java
  4. 65
      ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWaringDataReceivedEvent.java
  5. 31
      ai-root-cloud-statistics/.gitignore
  6. 118
      ai-root-cloud-statistics/.mvn/wrapper/MavenWrapperDownloader.java
  7. BIN
      ai-root-cloud-statistics/.mvn/wrapper/maven-wrapper.jar
  8. 2
      ai-root-cloud-statistics/.mvn/wrapper/maven-wrapper.properties
  9. 4
      ai-root-cloud-statistics/README.md
  10. 98
      ai-root-cloud-statistics/dependency-reduced-pom.xml
  11. 211
      ai-root-cloud-statistics/pom.xml
  12. 140
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java
  13. 19
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/config/ApolloConfig.java
  14. 20
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java
  15. 10
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/DataSource.java
  16. 33
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWaringDataReceivedEventDeserializationSchema.java
  17. 23
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIWaringDataReceivedEventSerializationSchema.java
  18. 62
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/until/OSSUtils.java
  19. 103
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/until/SnowFlake.java
  20. 5
      ai-root-cloud-statistics/src/main/resources/META-INF/app.properties
  21. 25
      ai-root-cloud-statistics/src/main/resources/log4j2.properties
  22. 14
      pom.xml

31
ai-root-cloud-event/.gitignore

@ -0,0 +1,31 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
### VS Code ###
.vscode/

78
ai-root-cloud-event/pom.xml

@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qniao</groupId>
<artifactId>ai-root-cloud-event</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>ai-root-cloud-event</name>
<description>ai-root-cloud-event</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.0</flink.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
<lombok.version>1.18.24</lombok.version>
<jackson.version>2.13.3</jackson.version>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<distributionManagement>
<repository>
<id>maven-releases</id>
<name>Nexus releases Repository</name>
<url>http://120.78.76.88:8081/repository/maven-snapshots/</url>
</repository>
</distributionManagement>
</project>

100
ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWaringDataReceivedEvent.java

@ -0,0 +1,100 @@
package com.qniao.rootcloudevent;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* @author Lzk
* @date 2022/10/17
**/
@Data
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY, getterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class AIRootCloudWaringDataReceivedEvent implements Serializable {
private static final long serialVersionUID = 1L;
private String payloadId;
private Long eventId;
private Payload payload;
private Long publishTimestamp;
private String tenantId;
private String action;
private Long timestamp;
@Data
public static class Payload {
private String code;
private Long cts;
private Long oriTs;
private Long tgTs;
private String eventType;
/**
* 设备的UUID
*/
private String deviceId;
private Info info;
private Long ts;
}
@Data
public static class Info {
/**
* 摄像头id
*/
private Long cameraId;
/**
* 摄像头所属区域
*/
private String cameraPosition;
private List<Alarm> alarmList;
private String cameraName;
}
@Data
public static class Alarm {
/**
* 图片地址
*/
private String picUrl;
/**
* 缩略图
*/
private String thumbnail;
private Extention extention;
/**
* 告警时间
*/
private Long alarmTime;
/**
* 2 = 人员逗留
* 5 = 人员入侵
* 9 = 吸烟
* 10 = 火苗烟雾
*/
private Integer aiType;
private Long alarmId;
/**
* 告警级别 1:严重;2:普通;3:通知;4:仅记录;
*/
private Integer alarmLevel;
private Long channelId;
private String events;
}
@Data
public static class Extention {
}
}

65
ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWaringDataReceivedEvent.java

@ -0,0 +1,65 @@
package com.qniao.rootcloudevent;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* @author Lzk
* @date 2022/10/17
**/
@Data
public class AIWaringDataReceivedEvent implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 唯一标识
*/
private Long id;
/**
* 数据来源 1-根云
*/
private Integer dataSource;
/**
* 设备物联地址(云盒物理标识)
*/
private String machineIotMac;
/**
* 告警列表
*/
private List<Alarm> alarmList;
@Data
public static class Alarm {
/**
* 图片地址
*/
private String picUrl;
/**
* 缩略图
*/
private String thumbnail;
/**
* 告警时间
*/
private Long alarmTime;
/**
* 2 = 人员逗留
* 5 = 人员入侵
* 9 = 吸烟
* 10 = 火苗烟雾
*/
private Integer aiType;
/**
* 告警级别 1:严重;2:普通;3:通知;4:仅记录;
*/
private Integer alarmLevel;
}
}

31
ai-root-cloud-statistics/.gitignore

@ -0,0 +1,31 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
### VS Code ###
.vscode/

118
ai-root-cloud-statistics/.mvn/wrapper/MavenWrapperDownloader.java

@ -0,0 +1,118 @@
/*
* Copyright 2007-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.net.*;
import java.io.*;
import java.nio.channels.*;
import java.util.Properties;
public class MavenWrapperDownloader {
private static final String WRAPPER_VERSION = "0.5.6";
/**
* Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided.
*/
private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/"
+ WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar";
/**
* Path to the maven-wrapper.properties file, which might contain a downloadUrl property to
* use instead of the default one.
*/
private static final String MAVEN_WRAPPER_PROPERTIES_PATH =
".mvn/wrapper/maven-wrapper.properties";
/**
* Path where the maven-wrapper.jar will be saved to.
*/
private static final String MAVEN_WRAPPER_JAR_PATH =
".mvn/wrapper/maven-wrapper.jar";
/**
* Name of the property which should be used to override the default download url for the wrapper.
*/
private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl";
public static void main(String args[]) {
System.out.println("- Downloader started");
File baseDirectory = new File(args[0]);
System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath());
// If the maven-wrapper.properties exists, read it and check if it contains a custom
// wrapperUrl parameter.
File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH);
String url = DEFAULT_DOWNLOAD_URL;
if (mavenWrapperPropertyFile.exists()) {
FileInputStream mavenWrapperPropertyFileInputStream = null;
try {
mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile);
Properties mavenWrapperProperties = new Properties();
mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream);
url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url);
} catch (IOException e) {
System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'");
} finally {
try {
if (mavenWrapperPropertyFileInputStream != null) {
mavenWrapperPropertyFileInputStream.close();
}
} catch (IOException e) {
// Ignore ...
}
}
}
System.out.println("- Downloading from: " + url);
File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH);
if (!outputFile.getParentFile().exists()) {
if (!outputFile.getParentFile().mkdirs()) {
System.out.println(
"- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'");
}
}
System.out.println("- Downloading to: " + outputFile.getAbsolutePath());
try {
downloadFileFromURL(url, outputFile);
System.out.println("Done");
System.exit(0);
} catch (Throwable e) {
System.out.println("- Error downloading");
e.printStackTrace();
System.exit(1);
}
}
private static void downloadFileFromURL(String urlString, File destination) throws Exception {
if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) {
String username = System.getenv("MVNW_USERNAME");
char[] password = System.getenv("MVNW_PASSWORD").toCharArray();
Authenticator.setDefault(new Authenticator() {
@Override
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(username, password);
}
});
}
URL website = new URL(urlString);
ReadableByteChannel rbc;
rbc = Channels.newChannel(website.openStream());
FileOutputStream fos = new FileOutputStream(destination);
fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
fos.close();
rbc.close();
}
}

BIN
ai-root-cloud-statistics/.mvn/wrapper/maven-wrapper.jar

2
ai-root-cloud-statistics/.mvn/wrapper/maven-wrapper.properties

@ -0,0 +1,2 @@
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip
wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar

4
ai-root-cloud-statistics/README.md

@ -0,0 +1,4 @@
# 工程简介
# 延伸阅读

98
ai-root-cloud-statistics/dependency-reduced-pom.xml

@ -0,0 +1,98 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>ai-root-cloud-waring-formatter</artifactId>
<groupId>com.qniao</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ai-root-cloud-statistics</artifactId>
<name>ai-root-cloud-statistics</name>
<version>0.0.1-SNAPSHOT</version>
<description>ai-root-cloud-statistics</description>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer />
<transformer>
<mainClass>com.qniao.iot.rc.RootCloudIotDataFormatterJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.2</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>maven-releases</id>
<name>Nexus releases Repository</name>
<url>http://120.78.76.88:8081/repository/maven-snapshots/</url>
</repository>
</distributionManagement>
<properties>
<target.java.version>1.8</target.java.version>
<log4j.version>2.17.2</log4j.version>
<flink.version>1.15.0</flink.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
</properties>
</project>

211
ai-root-cloud-statistics/pom.xml

@ -0,0 +1,211 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.qniao</groupId>
<artifactId>ai-root-cloud-waring-formatter</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ai-root-cloud-statistics</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>ai-root-cloud-statistics</name>
<description>ai-root-cloud-statistics</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.0</flink.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>ai-root-cloud-event</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!-- 阿里OSS对象存储 -->
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>2.8.3</version>
</dependency>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-command</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-event</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-constant</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-state-event-generator-job</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_2.12</artifactId>
<version>1.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.14.5</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.4</version>
</dependency>
<!-- apollo -->
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-core</artifactId>
<version>2.0.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.qniao.iot.rc.RootCloudIotDataFormatterJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<distributionManagement>
<repository>
<id>maven-releases</id>
<name>Nexus releases Repository</name>
<url>http://120.78.76.88:8081/repository/maven-snapshots/</url>
</repository>
</distributionManagement>
</project>

140
ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.qniao.rootcloudstatistics;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import com.qniao.rootcloudevent.AIRootCloudWaringDataReceivedEvent;
import com.qniao.rootcloudevent.AIWaringDataReceivedEvent;
import com.qniao.rootcloudstatistics.config.ApolloConfig;
import com.qniao.rootcloudstatistics.constant.ConfigConstant;
import com.qniao.rootcloudstatistics.constant.DataSource;
import com.qniao.rootcloudstatistics.event.AIRootCloudWaringDataReceivedEventDeserializationSchema;
import com.qniao.rootcloudstatistics.event.AIWaringDataReceivedEventSerializationSchema;
import com.qniao.rootcloudstatistics.until.OSSUtils;
import com.qniao.rootcloudstatistics.until.SnowFlake;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* Skeleton for a Flink DataStream Job.
*
* <p>For a tutorial how to write a Flink application, check the
* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.
*
* <p>To package your application into a JAR file for execution, run
* 'mvn clean package' on the command line.
*
* <p>If you change the name of the main class (with the public static void main(String[] args))
* method, change the respective entry in the POM.xml file (simply search for 'mainClass').
*
* @author Lzk
*/
public class RootCloudIotDataFormatterJob {
static SnowFlake snowflake = new SnowFlake(
Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_DATACENTER_ID)),
Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_MACHINE_ID))
);
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
KafkaSource<AIRootCloudWaringDataReceivedEvent> source = KafkaSource.<AIRootCloudWaringDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setValueOnlyDeserializer(new AIRootCloudWaringDataReceivedEventDeserializationSchema())
.build();
// 把树根的数据转成我们自己的格式
SingleOutputStreamOperator<AIWaringDataReceivedEvent> transformDs = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "AIRootCloudWaringDataReceivedEvent Source")
.map((MapFunction<AIRootCloudWaringDataReceivedEvent, AIWaringDataReceivedEvent>) RootCloudIotDataFormatterJob::transform)
.name("Transform AIWaringDataReceivedEvent");
// 写入kafka
transformDs.sinkTo(
KafkaSink.<AIWaringDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS))
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS))
.setValueSerializationSchema(new AIWaringDataReceivedEventSerializationSchema())
.build()
).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build()
).name("AIWaringDataReceivedEvent Sink");
// 发送到OSS存储
String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH);
StreamingFileSink<AIWaringDataReceivedEvent> sink = StreamingFileSink.forRowFormat(
new Path(outputPath),
new SimpleStringEncoder<AIWaringDataReceivedEvent>("UTF-8")
).build();
transformDs.addSink(sink);
env.execute("ai root cloud waring data formatter job");
}
private static AIWaringDataReceivedEvent transform(AIRootCloudWaringDataReceivedEvent event) {
AIWaringDataReceivedEvent aiWaringDataReceivedEvent = new AIWaringDataReceivedEvent();
if (Objects.nonNull(event) && Objects.nonNull(event.getPayload())) {
aiWaringDataReceivedEvent.setId(snowflake.nextId());
aiWaringDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD);
aiWaringDataReceivedEvent.setMachineIotMac(event.getPayload().getDeviceId());
if (Objects.nonNull(event.getPayload().getInfo()) && CollUtil.isNotEmpty(event.getPayload().getInfo().getAlarmList())) {
List<AIWaringDataReceivedEvent.Alarm> alarmList = new ArrayList<>();
event.getPayload().getInfo().getAlarmList().forEach(a -> {
AIWaringDataReceivedEvent.Alarm alarm = Convert.convert(AIWaringDataReceivedEvent.Alarm.class, a);
alarm.setPicUrl(OSSUtils.getFileUrl(a.getPicUrl()));
alarm.setThumbnail(OSSUtils.getFileUrl(a.getThumbnail()));
alarmList.add(alarm);
});
aiWaringDataReceivedEvent.setAlarmList(alarmList);
}
}
return aiWaringDataReceivedEvent;
}
}

19
ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/config/ApolloConfig.java

@ -0,0 +1,19 @@
package com.qniao.rootcloudstatistics.config;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
public class ApolloConfig {
private static final Config config = ConfigService.getAppConfig();
public static String get(String key, String defaultValue) {
return config.getProperty(key, defaultValue);
}
public static String get(String key) {
return config.getProperty(key, null);
}
}

20
ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java

@ -0,0 +1,20 @@
package com.qniao.rootcloudstatistics.constant;
public interface ConfigConstant {
String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "source.kafka.bootstrap.servers";
String SOURCE_KAFKA_TOPICS = "source.kafka.topics";
String SOURCE_KAFKA_GROUPID = "source.kafka.groupId";
String SINK_KAFKA_BOOTSTRAP_SERVERS = "sink.kafka.bootstrap.servers";
String SINK_KAFKA_TOPICS = "sink.kafka.topics";
String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id";
String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id";
String SINK_OSS_PATH = "sink.oss.path";
}

10
ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/DataSource.java

@ -0,0 +1,10 @@
package com.qniao.rootcloudstatistics.constant;
/**
* @author Lzk
* @date 2022/10/17
**/
public interface DataSource {
Integer ROOT_CLOUD = 1;
Integer TACT_CLOUD = 0;
}

33
ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWaringDataReceivedEventDeserializationSchema.java

@ -0,0 +1,33 @@
package com.qniao.rootcloudstatistics.event;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qniao.rootcloudevent.AIRootCloudWaringDataReceivedEvent;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
/**
* @author Lzk
*/
public class AIRootCloudWaringDataReceivedEventDeserializationSchema implements DeserializationSchema<AIRootCloudWaringDataReceivedEvent> {
/**
* 注册JavaTimeModule支持LocalDateTime字段的解析
*/
final private ObjectMapper objectMapper = new ObjectMapper();
@Override
public AIRootCloudWaringDataReceivedEvent deserialize(byte[] message) throws IOException {
return objectMapper.readValue(message, AIRootCloudWaringDataReceivedEvent.class);
}
@Override
public boolean isEndOfStream(AIRootCloudWaringDataReceivedEvent nextElement) {
return false;
}
@Override
public TypeInformation<AIRootCloudWaringDataReceivedEvent> getProducedType() {
return TypeInformation.of(AIRootCloudWaringDataReceivedEvent.class);
}
}

23
ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIWaringDataReceivedEventSerializationSchema.java

@ -0,0 +1,23 @@
package com.qniao.rootcloudstatistics.event;
import com.qniao.rootcloudevent.AIWaringDataReceivedEvent;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author Lzk
*/
public class AIWaringDataReceivedEventSerializationSchema implements SerializationSchema<AIWaringDataReceivedEvent> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override
public byte[] serialize(AIWaringDataReceivedEvent event) {
try {
return OBJECT_MAPPER.writeValueAsBytes(event);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not serialize record: " + event, e);
}
}
}

62
ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/until/OSSUtils.java

@ -0,0 +1,62 @@
package com.qniao.rootcloudstatistics.until;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClient;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @author Lzk
* @date 2022/10/17
**/
public class OSSUtils {
// Endpoint以杭州为例其它Region请按实际情况填写
private static final String endpoint = "http://oss-cn-shenzhen.aliyuncs.com";
// 阿里云主账号AccessKey拥有所有API的访问权限风险很高强烈建议您创建并使用RAM账号进行API访问或日常运维请登录 https://ram.console.aliyun.com 创建RAM账号
private static final String accessKeyId = "LTAINmC91NqIGN38";
private static final String accessKeySecret = "Hh10dQPjq1jMLLSpbDAR05ZzR3nXsU";
private static final Map<String, String> type_contentType = new HashMap<String, String>() {
{
put("gif", "image/gif");
put("bmp", "image/bmp");
put("webp", "image/webp");
put("jpg", "image/jpg");
put("png", "multipart/form-data");
put("txt", "text/plain");
put("mp4", "video/mpeg4");
put("html", "text/html");
put("pdf", "application/pdf");
}
};
public static String getFileUrl(String url) {
String[] list = url.split("\\.");
String type = list[list.length - 1];
String folderName = "cloudprint";
String bucketName = "qncloudprintfiletest";
String objectName = folderName + "/" + UUID.randomUUID() + "." + type;
OSS ossClient = new OSSClient(endpoint, accessKeyId, accessKeySecret);
// 填写网络流地址
InputStream inputStream;
try {
inputStream = new URL(url).openStream();
// 依次填写Bucket名称例如examplebucket和Object完整路径例如exampledir/exampleobject.txtObject完整路径中不能包含Bucket名称
ossClient.putObject(bucketName, objectName, inputStream);
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭OSSClient
ossClient.shutdown();
}
return "https://" + bucketName + ".oss-cn-shenzhen.aliyuncs.com/" + objectName;
}
}

103
ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/until/SnowFlake.java

@ -0,0 +1,103 @@
package com.qniao.rootcloudstatistics.until;
/**
* @description: Twitter的分布式自增ID雪花算法snowflake
* @author: zp
* @date: 2019-10-29 10:05
*/
public class SnowFlake {
/**
* 起始的时间戳
*/
private final static long START_STMP = 1480166465631L;
/**
* 每一部分占用的位数
*/
private final static long SEQUENCE_BIT = 12; //序列号占用的位数
private final static long MACHINE_BIT = 5; //机器标识占用的位数
private final static long DATACENTER_BIT = 5;//数据中心占用的位数
/**
* 每一部分的最大值
*/
private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT);
private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);
private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);
/**
* 每一部分向左的位移
*/
private final static long MACHINE_LEFT = SEQUENCE_BIT;
private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT;
private long datacenterId = 1L; //数据中心
private long machineId = 1L; //机器标识
private long sequence = 0L; //序列号
private long lastStmp = -1L;//上一次时间戳
// public SnowFlake(){
// }
public SnowFlake(long datacenterId,
long machineId) {
if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) {
throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0");
}
if (machineId > MAX_MACHINE_NUM || machineId < 0) {
throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
}
this.datacenterId = datacenterId;
this.machineId = machineId;
}
/**
* 产生下一个ID
*
* @return
*/
public synchronized Long nextId() {
long currStmp = getNewstmp();
if (currStmp < lastStmp) {
throw new RuntimeException("Clock moved backwards. Refusing to generate id");
}
if (currStmp == lastStmp) {
//相同毫秒内序列号自增
sequence = (sequence + 1) & MAX_SEQUENCE;
//同一毫秒的序列数已经达到最大
if (sequence == 0L) {
currStmp = getNextMill();
}
} else {
//不同毫秒内序列号置为0
sequence = 0L;
}
lastStmp = currStmp;
return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分
| datacenterId << DATACENTER_LEFT //数据中心部分
| machineId << MACHINE_LEFT //机器标识部分
| sequence; //序列号部分
}
private long getNextMill() {
long mill = getNewstmp();
while (mill <= lastStmp) {
mill = getNewstmp();
}
return mill;
}
private long getNewstmp() {
return System.currentTimeMillis();
}
public static void main(String[] args) {
SnowFlake s = new SnowFlake(1, 1);
System.out.println(s.nextId());
}
}

5
ai-root-cloud-statistics/src/main/resources/META-INF/app.properties

@ -0,0 +1,5 @@
app.id=ai-root-cloud-waring-formatter
# ???? 8.135.8.221
# ???? 47.112.164.224
apollo.meta=http://47.112.164.224:5000

25
ai-root-cloud-statistics/src/main/resources/log4j2.properties

@ -0,0 +1,25 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

14
pom.xml

@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qniao</groupId>
<artifactId>ai-root-cloud-waring-formatter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>ai-root-cloud-event</module>
<module>ai-root-cloud-statistics</module>
</modules>
</project>
Loading…
Cancel
Save