From 1fa4de7d9720da43c5a147054a3a5fd32d6c26a7 Mon Sep 17 00:00:00 2001 From: lizhongkang Date: Wed, 19 Oct 2022 00:13:09 +0800 Subject: [PATCH] bug --- ...CloudWarningDataReceivedPayLoadEvent.java} | 62 ++++++++----------- ai-root-cloud-statistics/pom.xml | 26 +------- .../RootCloudIotDataFormatterJob.java | 29 +++++---- .../constant/ConfigConstant.java | 10 +-- ...ataReceivedEventDeserializationSchema.java | 14 ++--- 5 files changed, 50 insertions(+), 91 deletions(-) rename ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/{AIRootCloudWarningDataReceivedEvent.java => AIRootCloudWarningDataReceivedPayLoadEvent.java} (61%) diff --git a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedPayLoadEvent.java similarity index 61% rename from ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java rename to ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedPayLoadEvent.java index 36c15fe..08bda88 100644 --- a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedEvent.java +++ b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWarningDataReceivedPayLoadEvent.java @@ -1,7 +1,8 @@ package com.qniao.rootcloudevent; import com.fasterxml.jackson.annotation.JsonAutoDetect; -import lombok.AllArgsConstructor; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Data; import lombok.NoArgsConstructor; @@ -13,48 +14,36 @@ import java.util.List; * @date 2022/10/17 **/ @Data -@AllArgsConstructor @NoArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY, getterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) -public class AIRootCloudWarningDataReceivedEvent implements Serializable { +public class AIRootCloudWarningDataReceivedPayLoadEvent implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; + private String code; - private String payloadId; - private Long eventId; - private Payload payload; - private Long publishTimestamp; - private String tenantId; - private String action; - private Long timestamp; + private Long cts; - @Data - @AllArgsConstructor - @NoArgsConstructor - public static class Payload { - private String code; - - private Long cts; + private Long oriTs; - private Long oriTs; + private Long tgTs; - private Long tgTs; - - private String eventType; - /** - * 设备的UUID - */ - private String deviceId; + private String eventType; + /** + * 设备的UUID + */ + private String deviceId; + @JsonProperty("info") + private Info info; - private Info info; - - private Long ts; - } + private Long ts; @Data - @AllArgsConstructor @NoArgsConstructor - public static class Info { + @JsonIgnoreProperties(ignoreUnknown = true) + public static class Info implements Serializable { + + private static final long serialVersionUID = 3L; /** * 摄像头id */ @@ -64,16 +53,18 @@ public class AIRootCloudWarningDataReceivedEvent implements Serializable { * 摄像头所属区域 */ private String cameraPosition; - + @JsonProperty("alarmList") private List alarmList; private String cameraName; } @Data - @AllArgsConstructor @NoArgsConstructor - public static class Alarm { + @JsonIgnoreProperties(ignoreUnknown = true) + public static class Alarm implements Serializable { + + private static final long serialVersionUID = 4L; /** * 图片地址 */ @@ -82,6 +73,7 @@ public class AIRootCloudWarningDataReceivedEvent implements Serializable { * 缩略图 */ private String thumbnail; + @JsonProperty("extention") private Extention extention; /** * 告警时间 diff --git a/ai-root-cloud-statistics/pom.xml b/ai-root-cloud-statistics/pom.xml index 4a108dd..5744faf 100644 --- a/ai-root-cloud-statistics/pom.xml +++ b/ai-root-cloud-statistics/pom.xml @@ -83,30 +83,6 @@ 1.2 - - com.qniao - iot-machine-data-command - 0.0.1-SNAPSHOT - - - - com.qniao - iot-machine-data-event - 0.0.1-SNAPSHOT - - - - com.qniao - iot-machine-data-constant - 0.0.1-SNAPSHOT - - - - com.qniao - iot-machine-state-event-generator-job - 0.0.1-SNAPSHOT - - org.apache.flink flink-connector-rabbitmq_2.12 @@ -191,7 +167,7 @@ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> - com.qniao.iot.rc.RootCloudIotDataFormatterJob + com.qniao.rootcloudstatistics.RootCloudIotDataFormatterJob diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java index 3eea447..49d5d1f 100644 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java @@ -21,8 +21,9 @@ package com.qniao.rootcloudstatistics; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; -import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializationSchema; -import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedEvent; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedPayLoadEvent; import com.qniao.rootcloudevent.AIWarningDataReceivedEvent; import com.qniao.rootcloudstatistics.config.ApolloConfig; import com.qniao.rootcloudstatistics.constant.ConfigConstant; @@ -35,9 +36,6 @@ import com.rabbitmq.client.AMQP; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringEncoder; -import org.apache.flink.connector.base.DeliveryGuarantee; -import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; -import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.core.fs.Path; @@ -55,7 +53,6 @@ import org.apache.kafka.common.config.SaslConfigs; import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.Properties; /** * Skeleton for a Flink DataStream Job. @@ -81,10 +78,10 @@ public class RootCloudIotDataFormatterJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); - KafkaSource source = KafkaSource.builder() + KafkaSource source = KafkaSource.builder() .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) - .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID)) + .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) .setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") .setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN") .setProperty("sasl.jaas.config", @@ -96,7 +93,7 @@ public class RootCloudIotDataFormatterJob { // 把树根的数据转成我们自己的格式 SingleOutputStreamOperator transformDs = env .fromSource(source, WatermarkStrategy.noWatermarks(), "AIRootCloudWarningDataReceivedEvent Source") - .map((MapFunction) RootCloudIotDataFormatterJob::transform) + .map((MapFunction) RootCloudIotDataFormatterJob::transform) .name("Transform AIWarningDataReceivedEvent"); // rabbitmq配置 @@ -142,21 +139,23 @@ public class RootCloudIotDataFormatterJob { } - private static AIWarningDataReceivedEvent transform(AIRootCloudWarningDataReceivedEvent event) { + private static AIWarningDataReceivedEvent transform(JSONObject event) { AIWarningDataReceivedEvent aiWaringDataReceivedEvent = new AIWarningDataReceivedEvent(); - if (Objects.nonNull(event) && Objects.nonNull(event.getPayload())) { + Object object = event.get("payload"); + if (Objects.nonNull(object)) { + AIRootCloudWarningDataReceivedPayLoadEvent payLoadEvent = JSONUtil.toBean(object.toString(), AIRootCloudWarningDataReceivedPayLoadEvent.class); aiWaringDataReceivedEvent.setId(snowflake.nextId()); aiWaringDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); - aiWaringDataReceivedEvent.setMachineIotMac(event.getPayload().getDeviceId()); + aiWaringDataReceivedEvent.setMachineIotMac(payLoadEvent.getDeviceId()); aiWaringDataReceivedEvent.setReceivedTime(System.currentTimeMillis()); - if (Objects.nonNull(event.getPayload().getInfo()) && CollUtil.isNotEmpty(event.getPayload().getInfo().getAlarmList())) { - AIRootCloudWarningDataReceivedEvent.Info info = event.getPayload().getInfo(); + if (Objects.nonNull(payLoadEvent.getInfo()) && CollUtil.isNotEmpty(payLoadEvent.getInfo().getAlarmList())) { + AIRootCloudWarningDataReceivedPayLoadEvent.Info info = payLoadEvent.getInfo(); aiWaringDataReceivedEvent.setCameraId(info.getCameraId()); aiWaringDataReceivedEvent.setCameraName(info.getCameraName()); aiWaringDataReceivedEvent.setCameraPosition(info.getCameraPosition()); List alarmList = new ArrayList<>(); - event.getPayload().getInfo().getAlarmList().forEach(a -> { + payLoadEvent.getInfo().getAlarmList().forEach(a -> { AIWarningDataReceivedEvent.Alarm alarm = Convert.convert(AIWarningDataReceivedEvent.Alarm.class, a); alarm.setPicUrl(OSSUtils.getFileUrl(a.getPicUrl())); alarm.setThumbnail(OSSUtils.getFileUrl(a.getThumbnail())); diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java index 25deff1..8e41759 100644 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java @@ -6,15 +6,7 @@ public interface ConfigConstant { String SOURCE_KAFKA_TOPICS = "source.kafka.topics"; - String SOURCE_KAFKA_GROUPID = "source.kafka.groupId"; - - String SINK_KAFKA_BOOTSTRAP_SERVERS = "sink.kafka.bootstrap.servers"; - - String SINK_KAFKA_TOPICS = "sink.kafka.topics"; - - String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id"; - - String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id"; + String SOURCE_KAFKA_GROUP_ID = "source.kafka.groupId"; String SINK_OSS_PATH = "sink.oss.path"; diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWarningDataReceivedEventDeserializationSchema.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWarningDataReceivedEventDeserializationSchema.java index 3e77193..a5f3b72 100644 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWarningDataReceivedEventDeserializationSchema.java +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWarningDataReceivedEventDeserializationSchema.java @@ -1,7 +1,7 @@ package com.qniao.rootcloudstatistics.event; +import cn.hutool.json.JSONObject; import com.fasterxml.jackson.databind.ObjectMapper; -import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedEvent; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -10,24 +10,24 @@ import java.io.IOException; /** * @author Lzk */ -public class AIRootCloudWarningDataReceivedEventDeserializationSchema implements DeserializationSchema { +public class AIRootCloudWarningDataReceivedEventDeserializationSchema implements DeserializationSchema { /** * 注册JavaTimeModule,支持LocalDateTime字段的解析 */ final private ObjectMapper objectMapper = new ObjectMapper(); @Override - public AIRootCloudWarningDataReceivedEvent deserialize(byte[] message) throws IOException { - return objectMapper.readValue(message, AIRootCloudWarningDataReceivedEvent.class); + public JSONObject deserialize(byte[] message) throws IOException { + return objectMapper.readValue(message, JSONObject.class); } @Override - public boolean isEndOfStream(AIRootCloudWarningDataReceivedEvent nextElement) { + public boolean isEndOfStream(JSONObject nextElement) { return false; } @Override - public TypeInformation getProducedType() { - return TypeInformation.of(AIRootCloudWarningDataReceivedEvent.class); + public TypeInformation getProducedType() { + return TypeInformation.of(JSONObject.class); } }