Browse Source

改名

master
lizhongkang@qniao.cn 3 years ago
parent
commit
f4c3070297
10 changed files with 86 additions and 159 deletions
  1. 2
      ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java
  2. 22
      ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java
  3. 98
      ai-root-cloud-statistics/dependency-reduced-pom.xml
  4. 2
      ai-root-cloud-statistics/pom.xml
  5. 44
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java
  6. 33
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWaringDataReceivedEventDeserializationSchema.java
  7. 33
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWarningDataReceivedEventDeserializationSchema.java
  8. 6
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIWarningDataReceivedEventSerializationSchema.java
  9. 2
      ai-root-cloud-statistics/src/main/resources/META-INF/app.properties
  10. 3
      pom.xml

ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWaringDataReceivedEvent.java → ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java

@ -12,7 +12,7 @@ import java.util.List;
**/
@Data
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY, getterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
public class AIRootCloudWaringDataReceivedEvent implements Serializable {
public class AIRootCloudWarningDataReceivedEvent implements Serializable {
private static final long serialVersionUID = 1L;

ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWaringDataReceivedEvent.java → ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java

@ -10,7 +10,7 @@ import java.util.List;
* @date 2022/10/17
**/
@Data
public class AIWaringDataReceivedEvent implements Serializable {
public class AIWarningDataReceivedEvent implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 唯一标识
@ -27,6 +27,26 @@ public class AIWaringDataReceivedEvent implements Serializable {
*/
private String machineIotMac;
/**
* 接受事件时间
*/
private Long receivedTime;
/**
* 摄像头id
*/
private Long cameraId;
/**
* 摄像头所属区域
*/
private String cameraPosition;
/**
* 摄像头名称
*/
private String cameraName;
/**
* 告警列表
*/

98
ai-root-cloud-statistics/dependency-reduced-pom.xml

@ -1,98 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>ai-root-cloud-waring-formatter</artifactId>
<groupId>com.qniao</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ai-root-cloud-statistics</artifactId>
<name>ai-root-cloud-statistics</name>
<version>0.0.1-SNAPSHOT</version>
<description>ai-root-cloud-statistics</description>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer />
<transformer>
<mainClass>com.qniao.iot.rc.RootCloudIotDataFormatterJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.2</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<distributionManagement>
<repository>
<id>maven-releases</id>
<name>Nexus releases Repository</name>
<url>http://120.78.76.88:8081/repository/maven-snapshots/</url>
</repository>
</distributionManagement>
<properties>
<target.java.version>1.8</target.java.version>
<log4j.version>2.17.2</log4j.version>
<flink.version>1.15.0</flink.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
</properties>
</project>

2
ai-root-cloud-statistics/pom.xml

@ -4,7 +4,7 @@
<parent>
<groupId>com.qniao</groupId>
<artifactId>ai-root-cloud-waring-formatter</artifactId>
<artifactId>ai-root-cloud-warning-formatter</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

44
ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java

@ -21,13 +21,13 @@ package com.qniao.rootcloudstatistics;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import com.qniao.rootcloudevent.AIRootCloudWaringDataReceivedEvent;
import com.qniao.rootcloudevent.AIWaringDataReceivedEvent;
import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedEvent;
import com.qniao.rootcloudevent.AIWarningDataReceivedEvent;
import com.qniao.rootcloudstatistics.config.ApolloConfig;
import com.qniao.rootcloudstatistics.constant.ConfigConstant;
import com.qniao.rootcloudstatistics.constant.DataSource;
import com.qniao.rootcloudstatistics.event.AIRootCloudWaringDataReceivedEventDeserializationSchema;
import com.qniao.rootcloudstatistics.event.AIWaringDataReceivedEventSerializationSchema;
import com.qniao.rootcloudstatistics.event.AIRootCloudWarningDataReceivedEventDeserializationSchema;
import com.qniao.rootcloudstatistics.event.AIWarningDataReceivedEventSerializationSchema;
import com.qniao.rootcloudstatistics.until.OSSUtils;
import com.qniao.rootcloudstatistics.until.SnowFlake;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@ -76,38 +76,38 @@ public class RootCloudIotDataFormatterJob {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
KafkaSource<AIRootCloudWaringDataReceivedEvent> source = KafkaSource.<AIRootCloudWaringDataReceivedEvent>builder()
KafkaSource<AIRootCloudWarningDataReceivedEvent> source = KafkaSource.<AIRootCloudWarningDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setValueOnlyDeserializer(new AIRootCloudWaringDataReceivedEventDeserializationSchema())
.setValueOnlyDeserializer(new AIRootCloudWarningDataReceivedEventDeserializationSchema())
.build();
// 把树根的数据转成我们自己的格式
SingleOutputStreamOperator<AIWaringDataReceivedEvent> transformDs = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "AIRootCloudWaringDataReceivedEvent Source")
.map((MapFunction<AIRootCloudWaringDataReceivedEvent, AIWaringDataReceivedEvent>) RootCloudIotDataFormatterJob::transform)
.name("Transform AIWaringDataReceivedEvent");
SingleOutputStreamOperator<AIWarningDataReceivedEvent> transformDs = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "AIRootCloudWarningDataReceivedEvent Source")
.map((MapFunction<AIRootCloudWarningDataReceivedEvent, AIWarningDataReceivedEvent>) RootCloudIotDataFormatterJob::transform)
.name("Transform AIWarningDataReceivedEvent");
// 写入kafka
transformDs.sinkTo(
KafkaSink.<AIWaringDataReceivedEvent>builder()
KafkaSink.<AIWarningDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS))
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS))
.setValueSerializationSchema(new AIWaringDataReceivedEventSerializationSchema())
.setValueSerializationSchema(new AIWarningDataReceivedEventSerializationSchema())
.build()
).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build()
).name("AIWaringDataReceivedEvent Sink");
).name("AIWarningDataReceivedEvent Sink");
// 发送到OSS存储
String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH);
StreamingFileSink<AIWaringDataReceivedEvent> sink = StreamingFileSink.forRowFormat(
StreamingFileSink<AIWarningDataReceivedEvent> sink = StreamingFileSink.forRowFormat(
new Path(outputPath),
new SimpleStringEncoder<AIWaringDataReceivedEvent>("UTF-8")
new SimpleStringEncoder<AIWarningDataReceivedEvent>("UTF-8")
).build();
transformDs.addSink(sink);
@ -115,19 +115,23 @@ public class RootCloudIotDataFormatterJob {
}
private static AIWaringDataReceivedEvent transform(AIRootCloudWaringDataReceivedEvent event) {
private static AIWarningDataReceivedEvent transform(AIRootCloudWarningDataReceivedEvent event) {
AIWaringDataReceivedEvent aiWaringDataReceivedEvent = new AIWaringDataReceivedEvent();
AIWarningDataReceivedEvent aiWaringDataReceivedEvent = new AIWarningDataReceivedEvent();
if (Objects.nonNull(event) && Objects.nonNull(event.getPayload())) {
aiWaringDataReceivedEvent.setId(snowflake.nextId());
aiWaringDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD);
aiWaringDataReceivedEvent.setMachineIotMac(event.getPayload().getDeviceId());
aiWaringDataReceivedEvent.setReceivedTime(System.currentTimeMillis());
if (Objects.nonNull(event.getPayload().getInfo()) && CollUtil.isNotEmpty(event.getPayload().getInfo().getAlarmList())) {
List<AIWaringDataReceivedEvent.Alarm> alarmList = new ArrayList<>();
AIRootCloudWarningDataReceivedEvent.Info info = event.getPayload().getInfo();
aiWaringDataReceivedEvent.setCameraId(info.getCameraId());
aiWaringDataReceivedEvent.setCameraName(info.getCameraName());
aiWaringDataReceivedEvent.setCameraPosition(info.getCameraPosition());
List<AIWarningDataReceivedEvent.Alarm> alarmList = new ArrayList<>();
event.getPayload().getInfo().getAlarmList().forEach(a -> {
AIWaringDataReceivedEvent.Alarm alarm = Convert.convert(AIWaringDataReceivedEvent.Alarm.class, a);
AIWarningDataReceivedEvent.Alarm alarm = Convert.convert(AIWarningDataReceivedEvent.Alarm.class, a);
alarm.setPicUrl(OSSUtils.getFileUrl(a.getPicUrl()));
alarm.setThumbnail(OSSUtils.getFileUrl(a.getThumbnail()));
alarmList.add(alarm);

33
ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWaringDataReceivedEventDeserializationSchema.java

@ -1,33 +0,0 @@
package com.qniao.rootcloudstatistics.event;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qniao.rootcloudevent.AIRootCloudWaringDataReceivedEvent;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
/**
* @author Lzk
*/
public class AIRootCloudWaringDataReceivedEventDeserializationSchema implements DeserializationSchema<AIRootCloudWaringDataReceivedEvent> {
/**
* 注册JavaTimeModule支持LocalDateTime字段的解析
*/
final private ObjectMapper objectMapper = new ObjectMapper();
@Override
public AIRootCloudWaringDataReceivedEvent deserialize(byte[] message) throws IOException {
return objectMapper.readValue(message, AIRootCloudWaringDataReceivedEvent.class);
}
@Override
public boolean isEndOfStream(AIRootCloudWaringDataReceivedEvent nextElement) {
return false;
}
@Override
public TypeInformation<AIRootCloudWaringDataReceivedEvent> getProducedType() {
return TypeInformation.of(AIRootCloudWaringDataReceivedEvent.class);
}
}

33
ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWarningDataReceivedEventDeserializationSchema.java

@ -0,0 +1,33 @@
package com.qniao.rootcloudstatistics.event;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedEvent;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
/**
* @author Lzk
*/
public class AIRootCloudWarningDataReceivedEventDeserializationSchema implements DeserializationSchema<AIRootCloudWarningDataReceivedEvent> {
/**
* 注册JavaTimeModule支持LocalDateTime字段的解析
*/
final private ObjectMapper objectMapper = new ObjectMapper();
@Override
public AIRootCloudWarningDataReceivedEvent deserialize(byte[] message) throws IOException {
return objectMapper.readValue(message, AIRootCloudWarningDataReceivedEvent.class);
}
@Override
public boolean isEndOfStream(AIRootCloudWarningDataReceivedEvent nextElement) {
return false;
}
@Override
public TypeInformation<AIRootCloudWarningDataReceivedEvent> getProducedType() {
return TypeInformation.of(AIRootCloudWarningDataReceivedEvent.class);
}
}

ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIWaringDataReceivedEventSerializationSchema.java → ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIWarningDataReceivedEventSerializationSchema.java

@ -1,6 +1,6 @@
package com.qniao.rootcloudstatistics.event;
import com.qniao.rootcloudevent.AIWaringDataReceivedEvent;
import com.qniao.rootcloudevent.AIWarningDataReceivedEvent;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@ -9,11 +9,11 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
/**
* @author Lzk
*/
public class AIWaringDataReceivedEventSerializationSchema implements SerializationSchema<AIWaringDataReceivedEvent> {
public class AIWarningDataReceivedEventSerializationSchema implements SerializationSchema<AIWarningDataReceivedEvent> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override
public byte[] serialize(AIWaringDataReceivedEvent event) {
public byte[] serialize(AIWarningDataReceivedEvent event) {
try {
return OBJECT_MAPPER.writeValueAsBytes(event);
} catch (JsonProcessingException e) {

2
ai-root-cloud-statistics/src/main/resources/META-INF/app.properties

@ -1,4 +1,4 @@
app.id=ai-root-cloud-waring-formatter
app.id=ai-root-cloud-warning-formatter
# ???? 8.135.8.221
# ???? 47.112.164.224

3
pom.xml

@ -3,7 +3,8 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qniao</groupId>
<artifactId>ai-root-cloud-waring-formatter</artifactId>
<artifactId>ai-root-cloud-warning-formatter</artifactId>
<name>ai-root-cloud-warning-formatter</name>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>

Loading…
Cancel
Save