diff --git a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWaringDataReceivedEvent.java b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java similarity index 96% rename from ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWaringDataReceivedEvent.java rename to ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java index 5343252..f96a868 100644 --- a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWaringDataReceivedEvent.java +++ b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java @@ -12,7 +12,7 @@ import java.util.List; **/ @Data @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY, getterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) -public class AIRootCloudWaringDataReceivedEvent implements Serializable { +public class AIRootCloudWarningDataReceivedEvent implements Serializable { private static final long serialVersionUID = 1L; diff --git a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWaringDataReceivedEvent.java b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java similarity index 75% rename from ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWaringDataReceivedEvent.java rename to ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java index 0c79275..98c823f 100644 --- a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWaringDataReceivedEvent.java +++ b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java @@ -10,7 +10,7 @@ import java.util.List; * @date 2022/10/17 **/ @Data -public class AIWaringDataReceivedEvent implements Serializable { +public class AIWarningDataReceivedEvent implements Serializable { private static final long serialVersionUID = 1L; /** * 唯一标识 @@ -27,6 +27,26 @@ public class AIWaringDataReceivedEvent implements Serializable { */ private String machineIotMac; + /** + * 接受事件时间 + */ + private Long receivedTime; + + /** + * 摄像头id + */ + private Long cameraId; + + /** + * 摄像头所属区域 + */ + private String cameraPosition; + + /** + * 摄像头名称 + */ + private String cameraName; + /** * 告警列表 */ diff --git a/ai-root-cloud-statistics/dependency-reduced-pom.xml b/ai-root-cloud-statistics/dependency-reduced-pom.xml deleted file mode 100644 index 500a320..0000000 --- a/ai-root-cloud-statistics/dependency-reduced-pom.xml +++ /dev/null @@ -1,98 +0,0 @@ - - - - ai-root-cloud-waring-formatter - com.qniao - 0.0.1-SNAPSHOT - - 4.0.0 - ai-root-cloud-statistics - ai-root-cloud-statistics - 0.0.1-SNAPSHOT - ai-root-cloud-statistics - - - - maven-compiler-plugin - 3.1 - - ${target.java.version} - ${target.java.version} - - - - maven-shade-plugin - 3.1.1 - - - package - - shade - - - - - org.apache.flink:flink-shaded-force-shading - com.google.code.findbugs:jsr305 - org.slf4j:* - org.apache.logging.log4j:* - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - - com.qniao.iot.rc.RootCloudIotDataFormatterJob - - - - - - - - - - - org.apache.logging.log4j - log4j-slf4j-impl - 2.17.2 - runtime - - - org.apache.logging.log4j - log4j-api - 2.17.2 - runtime - - - org.apache.logging.log4j - log4j-core - 2.17.2 - runtime - - - - - maven-releases - Nexus releases Repository - http://120.78.76.88:8081/repository/maven-snapshots/ - - - - 1.8 - 2.17.2 - 1.15.0 - ${target.java.version} - UTF-8 - ${target.java.version} - - diff --git a/ai-root-cloud-statistics/pom.xml b/ai-root-cloud-statistics/pom.xml index 20369c9..4a108dd 100644 --- a/ai-root-cloud-statistics/pom.xml +++ b/ai-root-cloud-statistics/pom.xml @@ -4,7 +4,7 @@ com.qniao - ai-root-cloud-waring-formatter + ai-root-cloud-warning-formatter 0.0.1-SNAPSHOT 4.0.0 diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java index 7202e24..5d38a47 100644 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java @@ -21,13 +21,13 @@ package com.qniao.rootcloudstatistics; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; -import com.qniao.rootcloudevent.AIRootCloudWaringDataReceivedEvent; -import com.qniao.rootcloudevent.AIWaringDataReceivedEvent; +import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedEvent; +import com.qniao.rootcloudevent.AIWarningDataReceivedEvent; import com.qniao.rootcloudstatistics.config.ApolloConfig; import com.qniao.rootcloudstatistics.constant.ConfigConstant; import com.qniao.rootcloudstatistics.constant.DataSource; -import com.qniao.rootcloudstatistics.event.AIRootCloudWaringDataReceivedEventDeserializationSchema; -import com.qniao.rootcloudstatistics.event.AIWaringDataReceivedEventSerializationSchema; +import com.qniao.rootcloudstatistics.event.AIRootCloudWarningDataReceivedEventDeserializationSchema; +import com.qniao.rootcloudstatistics.event.AIWarningDataReceivedEventSerializationSchema; import com.qniao.rootcloudstatistics.until.OSSUtils; import com.qniao.rootcloudstatistics.until.SnowFlake; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -76,38 +76,38 @@ public class RootCloudIotDataFormatterJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); - KafkaSource source = KafkaSource.builder() + KafkaSource source = KafkaSource.builder() .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID)) .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) - .setValueOnlyDeserializer(new AIRootCloudWaringDataReceivedEventDeserializationSchema()) + .setValueOnlyDeserializer(new AIRootCloudWarningDataReceivedEventDeserializationSchema()) .build(); // 把树根的数据转成我们自己的格式 - SingleOutputStreamOperator transformDs = env - .fromSource(source, WatermarkStrategy.noWatermarks(), "AIRootCloudWaringDataReceivedEvent Source") - .map((MapFunction) RootCloudIotDataFormatterJob::transform) - .name("Transform AIWaringDataReceivedEvent"); + SingleOutputStreamOperator transformDs = env + .fromSource(source, WatermarkStrategy.noWatermarks(), "AIRootCloudWarningDataReceivedEvent Source") + .map((MapFunction) RootCloudIotDataFormatterJob::transform) + .name("Transform AIWarningDataReceivedEvent"); // 写入kafka transformDs.sinkTo( - KafkaSink.builder() + KafkaSink.builder() .setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS)) - .setValueSerializationSchema(new AIWaringDataReceivedEventSerializationSchema()) + .setValueSerializationSchema(new AIWarningDataReceivedEventSerializationSchema()) .build() ).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build() - ).name("AIWaringDataReceivedEvent Sink"); + ).name("AIWarningDataReceivedEvent Sink"); // 发送到OSS存储 String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH); - StreamingFileSink sink = StreamingFileSink.forRowFormat( + StreamingFileSink sink = StreamingFileSink.forRowFormat( new Path(outputPath), - new SimpleStringEncoder("UTF-8") + new SimpleStringEncoder("UTF-8") ).build(); transformDs.addSink(sink); @@ -115,19 +115,23 @@ public class RootCloudIotDataFormatterJob { } - private static AIWaringDataReceivedEvent transform(AIRootCloudWaringDataReceivedEvent event) { + private static AIWarningDataReceivedEvent transform(AIRootCloudWarningDataReceivedEvent event) { - - AIWaringDataReceivedEvent aiWaringDataReceivedEvent = new AIWaringDataReceivedEvent(); + AIWarningDataReceivedEvent aiWaringDataReceivedEvent = new AIWarningDataReceivedEvent(); if (Objects.nonNull(event) && Objects.nonNull(event.getPayload())) { aiWaringDataReceivedEvent.setId(snowflake.nextId()); aiWaringDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); aiWaringDataReceivedEvent.setMachineIotMac(event.getPayload().getDeviceId()); + aiWaringDataReceivedEvent.setReceivedTime(System.currentTimeMillis()); if (Objects.nonNull(event.getPayload().getInfo()) && CollUtil.isNotEmpty(event.getPayload().getInfo().getAlarmList())) { - List alarmList = new ArrayList<>(); + AIRootCloudWarningDataReceivedEvent.Info info = event.getPayload().getInfo(); + aiWaringDataReceivedEvent.setCameraId(info.getCameraId()); + aiWaringDataReceivedEvent.setCameraName(info.getCameraName()); + aiWaringDataReceivedEvent.setCameraPosition(info.getCameraPosition()); + List alarmList = new ArrayList<>(); event.getPayload().getInfo().getAlarmList().forEach(a -> { - AIWaringDataReceivedEvent.Alarm alarm = Convert.convert(AIWaringDataReceivedEvent.Alarm.class, a); + AIWarningDataReceivedEvent.Alarm alarm = Convert.convert(AIWarningDataReceivedEvent.Alarm.class, a); alarm.setPicUrl(OSSUtils.getFileUrl(a.getPicUrl())); alarm.setThumbnail(OSSUtils.getFileUrl(a.getThumbnail())); alarmList.add(alarm); diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWaringDataReceivedEventDeserializationSchema.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWaringDataReceivedEventDeserializationSchema.java deleted file mode 100644 index 7e43c73..0000000 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWaringDataReceivedEventDeserializationSchema.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.qniao.rootcloudstatistics.event; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.qniao.rootcloudevent.AIRootCloudWaringDataReceivedEvent; -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; - -import java.io.IOException; - -/** - * @author Lzk - */ -public class AIRootCloudWaringDataReceivedEventDeserializationSchema implements DeserializationSchema { - /** - * 注册JavaTimeModule,支持LocalDateTime字段的解析 - */ - final private ObjectMapper objectMapper = new ObjectMapper(); - - @Override - public AIRootCloudWaringDataReceivedEvent deserialize(byte[] message) throws IOException { - return objectMapper.readValue(message, AIRootCloudWaringDataReceivedEvent.class); - } - - @Override - public boolean isEndOfStream(AIRootCloudWaringDataReceivedEvent nextElement) { - return false; - } - - @Override - public TypeInformation getProducedType() { - return TypeInformation.of(AIRootCloudWaringDataReceivedEvent.class); - } -} diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWarningDataReceivedEventDeserializationSchema.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWarningDataReceivedEventDeserializationSchema.java new file mode 100644 index 0000000..3e77193 --- /dev/null +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWarningDataReceivedEventDeserializationSchema.java @@ -0,0 +1,33 @@ +package com.qniao.rootcloudstatistics.event; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedEvent; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; + +/** + * @author Lzk + */ +public class AIRootCloudWarningDataReceivedEventDeserializationSchema implements DeserializationSchema { + /** + * 注册JavaTimeModule,支持LocalDateTime字段的解析 + */ + final private ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public AIRootCloudWarningDataReceivedEvent deserialize(byte[] message) throws IOException { + return objectMapper.readValue(message, AIRootCloudWarningDataReceivedEvent.class); + } + + @Override + public boolean isEndOfStream(AIRootCloudWarningDataReceivedEvent nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(AIRootCloudWarningDataReceivedEvent.class); + } +} diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIWaringDataReceivedEventSerializationSchema.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIWarningDataReceivedEventSerializationSchema.java similarity index 72% rename from ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIWaringDataReceivedEventSerializationSchema.java rename to ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIWarningDataReceivedEventSerializationSchema.java index 9f221a1..52c47d5 100644 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIWaringDataReceivedEventSerializationSchema.java +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIWarningDataReceivedEventSerializationSchema.java @@ -1,6 +1,6 @@ package com.qniao.rootcloudstatistics.event; -import com.qniao.rootcloudevent.AIWaringDataReceivedEvent; +import com.qniao.rootcloudevent.AIWarningDataReceivedEvent; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -9,11 +9,11 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap /** * @author Lzk */ -public class AIWaringDataReceivedEventSerializationSchema implements SerializationSchema { +public class AIWarningDataReceivedEventSerializationSchema implements SerializationSchema { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @Override - public byte[] serialize(AIWaringDataReceivedEvent event) { + public byte[] serialize(AIWarningDataReceivedEvent event) { try { return OBJECT_MAPPER.writeValueAsBytes(event); } catch (JsonProcessingException e) { diff --git a/ai-root-cloud-statistics/src/main/resources/META-INF/app.properties b/ai-root-cloud-statistics/src/main/resources/META-INF/app.properties index 1206951..124ee06 100644 --- a/ai-root-cloud-statistics/src/main/resources/META-INF/app.properties +++ b/ai-root-cloud-statistics/src/main/resources/META-INF/app.properties @@ -1,4 +1,4 @@ -app.id=ai-root-cloud-waring-formatter +app.id=ai-root-cloud-warning-formatter # ???? 8.135.8.221 # ???? 47.112.164.224 diff --git a/pom.xml b/pom.xml index f377853..192c109 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,8 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 com.qniao - ai-root-cloud-waring-formatter + ai-root-cloud-warning-formatter + ai-root-cloud-warning-formatter 0.0.1-SNAPSHOT pom