diff --git a/ai-root-cloud-event/.gitignore b/ai-root-cloud-event/.gitignore new file mode 100644 index 0000000..a2a3040 --- /dev/null +++ b/ai-root-cloud-event/.gitignore @@ -0,0 +1,31 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/** +!**/src/test/** + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ + +### VS Code ### +.vscode/ diff --git a/ai-root-cloud-event/pom.xml b/ai-root-cloud-event/pom.xml new file mode 100644 index 0000000..9b5244f --- /dev/null +++ b/ai-root-cloud-event/pom.xml @@ -0,0 +1,78 @@ + + + 4.0.0 + com.qniao + ai-root-cloud-event + 0.0.1-SNAPSHOT + ai-root-cloud-event + ai-root-cloud-event + + + UTF-8 + 1.15.0 + 1.8 + ${target.java.version} + ${target.java.version} + 2.17.2 + 1.18.24 + 2.13.3 + + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + + org.apache.flink + flink-connector-kafka + ${flink.version} + + + + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} + + + org.projectlombok + lombok + ${lombok.version} + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + + + + maven-releases + Nexus releases Repository + http://120.78.76.88:8081/repository/maven-snapshots/ + + + diff --git a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWaringDataReceivedEvent.java b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWaringDataReceivedEvent.java new file mode 100644 index 0000000..5343252 --- /dev/null +++ b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIRootCloudWaringDataReceivedEvent.java @@ -0,0 +1,100 @@ +package com.qniao.rootcloudevent; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +/** + * @author Lzk + * @date 2022/10/17 + **/ +@Data +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY, getterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) +public class AIRootCloudWaringDataReceivedEvent implements Serializable { + + private static final long serialVersionUID = 1L; + + private String payloadId; + private Long eventId; + private Payload payload; + private Long publishTimestamp; + private String tenantId; + private String action; + private Long timestamp; + + @Data + public static class Payload { + private String code; + + private Long cts; + + private Long oriTs; + + private Long tgTs; + + private String eventType; + /** + * 设备的UUID + */ + private String deviceId; + + private Info info; + + private Long ts; + } + + @Data + public static class Info { + /** + * 摄像头id + */ + private Long cameraId; + + /** + * 摄像头所属区域 + */ + private String cameraPosition; + + private List alarmList; + + private String cameraName; + } + + @Data + public static class Alarm { + /** + * 图片地址 + */ + private String picUrl; + /** + * 缩略图 + */ + private String thumbnail; + private Extention extention; + /** + * 告警时间 + */ + private Long alarmTime; + /** + * 2 = 人员逗留 + * 5 = 人员入侵 + * 9 = 吸烟 + * 10 = 火苗烟雾 + */ + private Integer aiType; + private Long alarmId; + /** + * 告警级别 1:严重;2:普通;3:通知;4:仅记录; + */ + private Integer alarmLevel; + private Long channelId; + private String events; + } + + @Data + public static class Extention { + + } +} diff --git a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWaringDataReceivedEvent.java b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWaringDataReceivedEvent.java new file mode 100644 index 0000000..0c79275 --- /dev/null +++ b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWaringDataReceivedEvent.java @@ -0,0 +1,65 @@ +package com.qniao.rootcloudevent; + +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +/** + * @author Lzk + * @date 2022/10/17 + **/ +@Data +public class AIWaringDataReceivedEvent implements Serializable { + private static final long serialVersionUID = 1L; + /** + * 唯一标识 + */ + private Long id; + + /** + * 数据来源 1-根云 + */ + private Integer dataSource; + + /** + * 设备物联地址(云盒物理标识) + */ + private String machineIotMac; + + /** + * 告警列表 + */ + private List alarmList; + + @Data + public static class Alarm { + /** + * 图片地址 + */ + private String picUrl; + + /** + * 缩略图 + */ + private String thumbnail; + + /** + * 告警时间 + */ + private Long alarmTime; + + /** + * 2 = 人员逗留 + * 5 = 人员入侵 + * 9 = 吸烟 + * 10 = 火苗烟雾 + */ + private Integer aiType; + + /** + * 告警级别 1:严重;2:普通;3:通知;4:仅记录; + */ + private Integer alarmLevel; + } +} diff --git a/ai-root-cloud-statistics/.gitignore b/ai-root-cloud-statistics/.gitignore new file mode 100644 index 0000000..a2a3040 --- /dev/null +++ b/ai-root-cloud-statistics/.gitignore @@ -0,0 +1,31 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/** +!**/src/test/** + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ + +### VS Code ### +.vscode/ diff --git a/ai-root-cloud-statistics/.mvn/wrapper/MavenWrapperDownloader.java b/ai-root-cloud-statistics/.mvn/wrapper/MavenWrapperDownloader.java new file mode 100644 index 0000000..a45eb6b --- /dev/null +++ b/ai-root-cloud-statistics/.mvn/wrapper/MavenWrapperDownloader.java @@ -0,0 +1,118 @@ +/* + * Copyright 2007-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.net.*; +import java.io.*; +import java.nio.channels.*; +import java.util.Properties; + +public class MavenWrapperDownloader { + + private static final String WRAPPER_VERSION = "0.5.6"; + /** + * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided. + */ + private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/" + + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar"; + + /** + * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to + * use instead of the default one. + */ + private static final String MAVEN_WRAPPER_PROPERTIES_PATH = + ".mvn/wrapper/maven-wrapper.properties"; + + /** + * Path where the maven-wrapper.jar will be saved to. + */ + private static final String MAVEN_WRAPPER_JAR_PATH = + ".mvn/wrapper/maven-wrapper.jar"; + + /** + * Name of the property which should be used to override the default download url for the wrapper. + */ + private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl"; + + public static void main(String args[]) { + System.out.println("- Downloader started"); + File baseDirectory = new File(args[0]); + System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath()); + + // If the maven-wrapper.properties exists, read it and check if it contains a custom + // wrapperUrl parameter. + File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH); + String url = DEFAULT_DOWNLOAD_URL; + if (mavenWrapperPropertyFile.exists()) { + FileInputStream mavenWrapperPropertyFileInputStream = null; + try { + mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile); + Properties mavenWrapperProperties = new Properties(); + mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream); + url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url); + } catch (IOException e) { + System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'"); + } finally { + try { + if (mavenWrapperPropertyFileInputStream != null) { + mavenWrapperPropertyFileInputStream.close(); + } + } catch (IOException e) { + // Ignore ... + } + } + } + System.out.println("- Downloading from: " + url); + + File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH); + if (!outputFile.getParentFile().exists()) { + if (!outputFile.getParentFile().mkdirs()) { + System.out.println( + "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'"); + } + } + System.out.println("- Downloading to: " + outputFile.getAbsolutePath()); + try { + downloadFileFromURL(url, outputFile); + System.out.println("Done"); + System.exit(0); + } catch (Throwable e) { + System.out.println("- Error downloading"); + e.printStackTrace(); + System.exit(1); + } + } + + private static void downloadFileFromURL(String urlString, File destination) throws Exception { + if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) { + String username = System.getenv("MVNW_USERNAME"); + char[] password = System.getenv("MVNW_PASSWORD").toCharArray(); + Authenticator.setDefault(new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(username, password); + } + }); + } + URL website = new URL(urlString); + ReadableByteChannel rbc; + rbc = Channels.newChannel(website.openStream()); + FileOutputStream fos = new FileOutputStream(destination); + fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); + fos.close(); + rbc.close(); + } + +} diff --git a/ai-root-cloud-statistics/.mvn/wrapper/maven-wrapper.jar b/ai-root-cloud-statistics/.mvn/wrapper/maven-wrapper.jar new file mode 100644 index 0000000..2cc7d4a Binary files /dev/null and b/ai-root-cloud-statistics/.mvn/wrapper/maven-wrapper.jar differ diff --git a/ai-root-cloud-statistics/.mvn/wrapper/maven-wrapper.properties b/ai-root-cloud-statistics/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 0000000..642d572 --- /dev/null +++ b/ai-root-cloud-statistics/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1,2 @@ +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip +wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar diff --git a/ai-root-cloud-statistics/README.md b/ai-root-cloud-statistics/README.md new file mode 100644 index 0000000..c0d9ab2 --- /dev/null +++ b/ai-root-cloud-statistics/README.md @@ -0,0 +1,4 @@ +# 工程简介 + +# 延伸阅读 + diff --git a/ai-root-cloud-statistics/dependency-reduced-pom.xml b/ai-root-cloud-statistics/dependency-reduced-pom.xml new file mode 100644 index 0000000..500a320 --- /dev/null +++ b/ai-root-cloud-statistics/dependency-reduced-pom.xml @@ -0,0 +1,98 @@ + + + + ai-root-cloud-waring-formatter + com.qniao + 0.0.1-SNAPSHOT + + 4.0.0 + ai-root-cloud-statistics + ai-root-cloud-statistics + 0.0.1-SNAPSHOT + ai-root-cloud-statistics + + + + maven-compiler-plugin + 3.1 + + ${target.java.version} + ${target.java.version} + + + + maven-shade-plugin + 3.1.1 + + + package + + shade + + + + + org.apache.flink:flink-shaded-force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.qniao.iot.rc.RootCloudIotDataFormatterJob + + + + + + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.17.2 + runtime + + + org.apache.logging.log4j + log4j-api + 2.17.2 + runtime + + + org.apache.logging.log4j + log4j-core + 2.17.2 + runtime + + + + + maven-releases + Nexus releases Repository + http://120.78.76.88:8081/repository/maven-snapshots/ + + + + 1.8 + 2.17.2 + 1.15.0 + ${target.java.version} + UTF-8 + ${target.java.version} + + diff --git a/ai-root-cloud-statistics/pom.xml b/ai-root-cloud-statistics/pom.xml new file mode 100644 index 0000000..20369c9 --- /dev/null +++ b/ai-root-cloud-statistics/pom.xml @@ -0,0 +1,211 @@ + + + + + com.qniao + ai-root-cloud-waring-formatter + 0.0.1-SNAPSHOT + + 4.0.0 + ai-root-cloud-statistics + 0.0.1-SNAPSHOT + jar + ai-root-cloud-statistics + ai-root-cloud-statistics + + + UTF-8 + 1.15.0 + 1.8 + ${target.java.version} + ${target.java.version} + 2.17.2 + + + + + + com.qniao + ai-root-cloud-event + 0.0.1-SNAPSHOT + + + + com.aliyun.oss + aliyun-sdk-oss + 2.8.3 + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + + + org.apache.flink + flink-clients + ${flink.version} + + + + org.apache.flink + flink-connector-kafka + ${flink.version} + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + runtime + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + runtime + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + runtime + + + + commons-logging + commons-logging + 1.2 + + + + com.qniao + iot-machine-data-command + 0.0.1-SNAPSHOT + + + + com.qniao + iot-machine-data-event + 0.0.1-SNAPSHOT + + + + com.qniao + iot-machine-data-constant + 0.0.1-SNAPSHOT + + + + com.qniao + iot-machine-state-event-generator-job + 0.0.1-SNAPSHOT + + + + org.apache.flink + flink-connector-rabbitmq_2.12 + 1.14.5 + + + + org.apache.flink + flink-connector-elasticsearch7_2.11 + 1.14.5 + + + + cn.hutool + hutool-all + 5.8.4 + + + + + com.ctrip.framework.apollo + apollo-client + 2.0.1 + + + com.ctrip.framework.apollo + apollo-core + 2.0.1 + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${target.java.version} + ${target.java.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + + package + + shade + + + + + org.apache.flink:flink-shaded-force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.qniao.iot.rc.RootCloudIotDataFormatterJob + + + + + + + + + + + + maven-releases + Nexus releases Repository + http://120.78.76.88:8081/repository/maven-snapshots/ + + + diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java new file mode 100644 index 0000000..7202e24 --- /dev/null +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qniao.rootcloudstatistics; + + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.convert.Convert; +import com.qniao.rootcloudevent.AIRootCloudWaringDataReceivedEvent; +import com.qniao.rootcloudevent.AIWaringDataReceivedEvent; +import com.qniao.rootcloudstatistics.config.ApolloConfig; +import com.qniao.rootcloudstatistics.constant.ConfigConstant; +import com.qniao.rootcloudstatistics.constant.DataSource; +import com.qniao.rootcloudstatistics.event.AIRootCloudWaringDataReceivedEventDeserializationSchema; +import com.qniao.rootcloudstatistics.event.AIWaringDataReceivedEventSerializationSchema; +import com.qniao.rootcloudstatistics.until.OSSUtils; +import com.qniao.rootcloudstatistics.until.SnowFlake; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaSink; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Skeleton for a Flink DataStream Job. + * + *

For a tutorial how to write a Flink application, check the + * tutorials and examples on the Flink Website. + * + *

To package your application into a JAR file for execution, run + * 'mvn clean package' on the command line. + * + *

If you change the name of the main class (with the public static void main(String[] args)) + * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). + * + * @author Lzk + */ +public class RootCloudIotDataFormatterJob { + + static SnowFlake snowflake = new SnowFlake( + Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_DATACENTER_ID)), + Long.parseLong(ApolloConfig.get(ConfigConstant.SNOW_FLAKE_MACHINE_ID)) + ); + + + public static void main(String[] args) throws Exception { + + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); + KafkaSource source = KafkaSource.builder() + .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) + .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) + .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID)) + .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) + .setValueOnlyDeserializer(new AIRootCloudWaringDataReceivedEventDeserializationSchema()) + .build(); + + // 把树根的数据转成我们自己的格式 + SingleOutputStreamOperator transformDs = env + .fromSource(source, WatermarkStrategy.noWatermarks(), "AIRootCloudWaringDataReceivedEvent Source") + .map((MapFunction) RootCloudIotDataFormatterJob::transform) + .name("Transform AIWaringDataReceivedEvent"); + + // 写入kafka + transformDs.sinkTo( + KafkaSink.builder() + .setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) + .setRecordSerializer( + KafkaRecordSerializationSchema.builder() + .setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS)) + .setValueSerializationSchema(new AIWaringDataReceivedEventSerializationSchema()) + .build() + ).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .build() + ).name("AIWaringDataReceivedEvent Sink"); + + // 发送到OSS存储 + String outputPath = ApolloConfig.get(ConfigConstant.SINK_OSS_PATH); + StreamingFileSink sink = StreamingFileSink.forRowFormat( + new Path(outputPath), + new SimpleStringEncoder("UTF-8") + ).build(); + transformDs.addSink(sink); + + env.execute("ai root cloud waring data formatter job"); + } + + + private static AIWaringDataReceivedEvent transform(AIRootCloudWaringDataReceivedEvent event) { + + + AIWaringDataReceivedEvent aiWaringDataReceivedEvent = new AIWaringDataReceivedEvent(); + if (Objects.nonNull(event) && Objects.nonNull(event.getPayload())) { + aiWaringDataReceivedEvent.setId(snowflake.nextId()); + aiWaringDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); + aiWaringDataReceivedEvent.setMachineIotMac(event.getPayload().getDeviceId()); + + if (Objects.nonNull(event.getPayload().getInfo()) && CollUtil.isNotEmpty(event.getPayload().getInfo().getAlarmList())) { + List alarmList = new ArrayList<>(); + event.getPayload().getInfo().getAlarmList().forEach(a -> { + AIWaringDataReceivedEvent.Alarm alarm = Convert.convert(AIWaringDataReceivedEvent.Alarm.class, a); + alarm.setPicUrl(OSSUtils.getFileUrl(a.getPicUrl())); + alarm.setThumbnail(OSSUtils.getFileUrl(a.getThumbnail())); + alarmList.add(alarm); + }); + aiWaringDataReceivedEvent.setAlarmList(alarmList); + } + } + return aiWaringDataReceivedEvent; + } +} diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/config/ApolloConfig.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/config/ApolloConfig.java new file mode 100644 index 0000000..75d04a2 --- /dev/null +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/config/ApolloConfig.java @@ -0,0 +1,19 @@ +package com.qniao.rootcloudstatistics.config; + +import com.ctrip.framework.apollo.Config; +import com.ctrip.framework.apollo.ConfigService; + +public class ApolloConfig { + + private static final Config config = ConfigService.getAppConfig(); + + public static String get(String key, String defaultValue) { + + return config.getProperty(key, defaultValue); + } + + public static String get(String key) { + + return config.getProperty(key, null); + } +} diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java new file mode 100644 index 0000000..5add8cc --- /dev/null +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java @@ -0,0 +1,20 @@ +package com.qniao.rootcloudstatistics.constant; + +public interface ConfigConstant { + + String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "source.kafka.bootstrap.servers"; + + String SOURCE_KAFKA_TOPICS = "source.kafka.topics"; + + String SOURCE_KAFKA_GROUPID = "source.kafka.groupId"; + + String SINK_KAFKA_BOOTSTRAP_SERVERS = "sink.kafka.bootstrap.servers"; + + String SINK_KAFKA_TOPICS = "sink.kafka.topics"; + + String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id"; + + String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id"; + + String SINK_OSS_PATH = "sink.oss.path"; +} diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/DataSource.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/DataSource.java new file mode 100644 index 0000000..6654a2e --- /dev/null +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/DataSource.java @@ -0,0 +1,10 @@ +package com.qniao.rootcloudstatistics.constant; + +/** + * @author Lzk + * @date 2022/10/17 + **/ +public interface DataSource { + Integer ROOT_CLOUD = 1; + Integer TACT_CLOUD = 0; +} diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWaringDataReceivedEventDeserializationSchema.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWaringDataReceivedEventDeserializationSchema.java new file mode 100644 index 0000000..7e43c73 --- /dev/null +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIRootCloudWaringDataReceivedEventDeserializationSchema.java @@ -0,0 +1,33 @@ +package com.qniao.rootcloudstatistics.event; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.qniao.rootcloudevent.AIRootCloudWaringDataReceivedEvent; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; + +/** + * @author Lzk + */ +public class AIRootCloudWaringDataReceivedEventDeserializationSchema implements DeserializationSchema { + /** + * 注册JavaTimeModule,支持LocalDateTime字段的解析 + */ + final private ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public AIRootCloudWaringDataReceivedEvent deserialize(byte[] message) throws IOException { + return objectMapper.readValue(message, AIRootCloudWaringDataReceivedEvent.class); + } + + @Override + public boolean isEndOfStream(AIRootCloudWaringDataReceivedEvent nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(AIRootCloudWaringDataReceivedEvent.class); + } +} diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIWaringDataReceivedEventSerializationSchema.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIWaringDataReceivedEventSerializationSchema.java new file mode 100644 index 0000000..9f221a1 --- /dev/null +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/event/AIWaringDataReceivedEventSerializationSchema.java @@ -0,0 +1,23 @@ +package com.qniao.rootcloudstatistics.event; + +import com.qniao.rootcloudevent.AIWaringDataReceivedEvent; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + + +/** + * @author Lzk + */ +public class AIWaringDataReceivedEventSerializationSchema implements SerializationSchema { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public byte[] serialize(AIWaringDataReceivedEvent event) { + try { + return OBJECT_MAPPER.writeValueAsBytes(event); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Could not serialize record: " + event, e); + } + } +} diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/until/OSSUtils.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/until/OSSUtils.java new file mode 100644 index 0000000..437ad44 --- /dev/null +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/until/OSSUtils.java @@ -0,0 +1,62 @@ +package com.qniao.rootcloudstatistics.until; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClient; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +/** + * @author Lzk + * @date 2022/10/17 + **/ +public class OSSUtils { + // Endpoint以杭州为例,其它Region请按实际情况填写。 + private static final String endpoint = "http://oss-cn-shenzhen.aliyuncs.com"; + // 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。 + private static final String accessKeyId = "LTAINmC91NqIGN38"; + private static final String accessKeySecret = "Hh10dQPjq1jMLLSpbDAR05ZzR3nXsU"; + private static final Map type_contentType = new HashMap() { + { + put("gif", "image/gif"); + put("bmp", "image/bmp"); + put("webp", "image/webp"); + put("jpg", "image/jpg"); + put("png", "multipart/form-data"); + put("txt", "text/plain"); + put("mp4", "video/mpeg4"); + put("html", "text/html"); + put("pdf", "application/pdf"); + } + }; + + + public static String getFileUrl(String url) { + + String[] list = url.split("\\."); + String type = list[list.length - 1]; + + String folderName = "cloudprint"; + String bucketName = "qncloudprintfiletest"; + String objectName = folderName + "/" + UUID.randomUUID() + "." + type; + OSS ossClient = new OSSClient(endpoint, accessKeyId, accessKeySecret); + // 填写网络流地址。 + InputStream inputStream; + try { + inputStream = new URL(url).openStream(); + // 依次填写Bucket名称(例如examplebucket)和Object完整路径(例如exampledir/exampleobject.txt)。Object完整路径中不能包含Bucket名称。 + ossClient.putObject(bucketName, objectName, inputStream); + } catch (IOException e) { + e.printStackTrace(); + } finally { + // 关闭OSSClient。 + ossClient.shutdown(); + } + + return "https://" + bucketName + ".oss-cn-shenzhen.aliyuncs.com/" + objectName; + } +} diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/until/SnowFlake.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/until/SnowFlake.java new file mode 100644 index 0000000..c9b981b --- /dev/null +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/until/SnowFlake.java @@ -0,0 +1,103 @@ +package com.qniao.rootcloudstatistics.until; + +/** + * @description: Twitter的分布式自增ID雪花算法snowflake + * @author: zp + * @date: 2019-10-29 10:05 + */ +public class SnowFlake { + + /** + * 起始的时间戳 + */ + private final static long START_STMP = 1480166465631L; + + /** + * 每一部分占用的位数 + */ + private final static long SEQUENCE_BIT = 12; //序列号占用的位数 + private final static long MACHINE_BIT = 5; //机器标识占用的位数 + private final static long DATACENTER_BIT = 5;//数据中心占用的位数 + + /** + * 每一部分的最大值 + */ + private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT); + private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT); + private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT); + + /** + * 每一部分向左的位移 + */ + private final static long MACHINE_LEFT = SEQUENCE_BIT; + private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT; + private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT; + + private long datacenterId = 1L; //数据中心 + private long machineId = 1L; //机器标识 + private long sequence = 0L; //序列号 + private long lastStmp = -1L;//上一次时间戳 + +// public SnowFlake(){ +// } + + public SnowFlake(long datacenterId, + long machineId) { + if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) { + throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0"); + } + if (machineId > MAX_MACHINE_NUM || machineId < 0) { + throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0"); + } + this.datacenterId = datacenterId; + this.machineId = machineId; + } + + /** + * 产生下一个ID + * + * @return + */ + public synchronized Long nextId() { + long currStmp = getNewstmp(); + if (currStmp < lastStmp) { + throw new RuntimeException("Clock moved backwards. Refusing to generate id"); + } + + if (currStmp == lastStmp) { + //相同毫秒内,序列号自增 + sequence = (sequence + 1) & MAX_SEQUENCE; + //同一毫秒的序列数已经达到最大 + if (sequence == 0L) { + currStmp = getNextMill(); + } + } else { + //不同毫秒内,序列号置为0 + sequence = 0L; + } + + lastStmp = currStmp; + + return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分 + | datacenterId << DATACENTER_LEFT //数据中心部分 + | machineId << MACHINE_LEFT //机器标识部分 + | sequence; //序列号部分 + } + + private long getNextMill() { + long mill = getNewstmp(); + while (mill <= lastStmp) { + mill = getNewstmp(); + } + return mill; + } + + private long getNewstmp() { + return System.currentTimeMillis(); + } + + public static void main(String[] args) { + SnowFlake s = new SnowFlake(1, 1); + System.out.println(s.nextId()); + } +} \ No newline at end of file diff --git a/ai-root-cloud-statistics/src/main/resources/META-INF/app.properties b/ai-root-cloud-statistics/src/main/resources/META-INF/app.properties new file mode 100644 index 0000000..1206951 --- /dev/null +++ b/ai-root-cloud-statistics/src/main/resources/META-INF/app.properties @@ -0,0 +1,5 @@ +app.id=ai-root-cloud-waring-formatter + +# ???? 8.135.8.221 +# ???? 47.112.164.224 +apollo.meta=http://47.112.164.224:5000 \ No newline at end of file diff --git a/ai-root-cloud-statistics/src/main/resources/log4j2.properties b/ai-root-cloud-statistics/src/main/resources/log4j2.properties new file mode 100644 index 0000000..32c696e --- /dev/null +++ b/ai-root-cloud-statistics/src/main/resources/log4j2.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..f377853 --- /dev/null +++ b/pom.xml @@ -0,0 +1,14 @@ + + + 4.0.0 + com.qniao + ai-root-cloud-waring-formatter + 0.0.1-SNAPSHOT + pom + + + ai-root-cloud-event + ai-root-cloud-statistics + +