diff --git a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java index c1e8f9b..bb96749 100644 --- a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java +++ b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java @@ -60,6 +60,8 @@ public class AIWarningDataReceivedEvent implements Serializable { @AllArgsConstructor @NoArgsConstructor public static class Alarm { + + private Long alarmId; /** * 图片地址 */ @@ -87,5 +89,9 @@ public class AIWarningDataReceivedEvent implements Serializable { * 告警级别 1:严重;2:普通;3:通知;4:仅记录; */ private Integer alarmLevel; + + private Long channelId; + + private String events; } } diff --git a/ai-root-cloud-statistics/pom.xml b/ai-root-cloud-statistics/pom.xml index 5744faf..6c1fe03 100644 --- a/ai-root-cloud-statistics/pom.xml +++ b/ai-root-cloud-statistics/pom.xml @@ -30,6 +30,11 @@ ai-root-cloud-event 0.0.1-SNAPSHOT + + mysql + mysql-connector-java + 8.0.29 + com.aliyun.oss diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java index 49d5d1f..f76bb55 100644 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java @@ -21,6 +21,9 @@ package com.qniao.rootcloudstatistics; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; +import cn.hutool.core.util.StrUtil; +import cn.hutool.db.Db; +import cn.hutool.db.Entity; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedPayLoadEvent; @@ -33,15 +36,17 @@ import com.qniao.rootcloudstatistics.event.AIWarningDataReceivedEventSerializati import com.qniao.rootcloudstatistics.until.OSSUtils; import com.qniao.rootcloudstatistics.until.SnowFlake; import com.rabbitmq.client.AMQP; +import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; -import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; @@ -49,7 +54,12 @@ import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.config.SaslConfigs; +import org.apache.flink.core.fs.Path; +import org.checkerframework.checker.units.qual.C; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -96,6 +106,7 @@ public class RootCloudIotDataFormatterJob { .map((MapFunction) RootCloudIotDataFormatterJob::transform) .name("Transform AIWarningDataReceivedEvent"); + // rabbitmq配置 RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_HOST)) @@ -135,6 +146,9 @@ public class RootCloudIotDataFormatterJob { ).build(); transformDs.addSink(sink); + // 写入数据到MySQL + transformDs.addSink(new MySqlJdbcSink()); + env.execute("ai root cloud waring data formatter job"); } @@ -166,4 +180,61 @@ public class RootCloudIotDataFormatterJob { } return aiWaringDataReceivedEvent; } + + static class MySqlJdbcSink extends RichSinkFunction { + + private static final String INSERT_ALARM = + "INSERT INTO qn_ai_alarm(id,iot_mac,received_time,camera_id,camera_position,camera_name,ai_type,alarm_level,create_time) values(?,?,?,?,?,?,?,?,?);"; + + private static final String INSERT_ALARM_IMAGE = + "INSERT INTO qn_ai_alarm_image(id,ai_alarm_id,url,thumbnail,events,alarm_time,create_time) values(?,?,?,?,?,?,?);"; + + private PreparedStatement statement; + private PreparedStatement statement2; + + @Override + public void invoke(AIWarningDataReceivedEvent value, Context context) throws Exception { + String query = "select * from qn_ai_alarm where id = ?"; + AIWarningDataReceivedEvent.Alarm image = value.getAlarmList().get(0); + Entity alarm = Db.use().queryOne(query, image.getAlarmId()); + if (Objects.isNull(alarm)) { + statement.setLong(1, image.getAlarmId()); + statement.setString(2, value.getMachineIotMac()); + statement.setString(3, DateFormatUtils.format(value.getReceivedTime(), "yyyy-MM-dd HH:mm:ss")); + statement.setLong(4, value.getCameraId()); + statement.setString(5, value.getCameraPosition()); + statement.setString(6, value.getCameraName()); + statement.setByte(7, image.getAiType().byteValue()); + statement.setByte(8, image.getAlarmLevel().byteValue()); + statement.setString(9, DateFormatUtils.format(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss")); + statement.addBatch(); + statement.executeBatch(); + } + + for (AIWarningDataReceivedEvent.Alarm a : value.getAlarmList()) { + statement2.setLong(1, snowflake.nextId()); + statement2.setLong(2, a.getAlarmId()); + statement2.setString(3, a.getPicUrl()); + statement2.setString(4, a.getThumbnail()); + statement2.setString(5, StrUtil.isBlank(a.getEvents()) ? null : a.getEvents().replace("[[", "").replace("]]", "")); + statement2.setString(6, DateFormatUtils.format(a.getAlarmTime(), "yyyy-MM-dd HH:mm:ss")); + statement2.setString(7, DateFormatUtils.format(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss")); + statement2.addBatch(); + } + statement2.executeBatch(); + } + + @Override + public void open(Configuration parameters) throws Exception { + Class.forName("com.mysql.cj.jdbc.Driver"); + String mysqlConn = "jdbc:mysql://" + + ApolloConfig.get(ConfigConstant.MYSQL_DATABASE_URL) + "/" + + ApolloConfig.get(ConfigConstant.MYSQL_DATABASE_NAME) + "?user=" + + ApolloConfig.get(ConfigConstant.MYSQL_USER_NAME) + "&password=" + + ApolloConfig.get(ConfigConstant.MYSQL_USER_PASSWORD); + Connection connection = DriverManager.getConnection(mysqlConn); + statement = connection.prepareStatement(INSERT_ALARM); + statement2 = connection.prepareStatement(INSERT_ALARM_IMAGE); + } + } } diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java index 69c386b..6be0f52 100644 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/constant/ConfigConstant.java @@ -10,6 +10,11 @@ public interface ConfigConstant { String SINK_OSS_PATH = "sink.oss.path"; + String MYSQL_DATABASE_URL = "mysql-database-url"; + String MYSQL_DATABASE_NAME = "mysql-database-name"; + String MYSQL_USER_NAME = "mysql-user-name"; + String MYSQL_USER_PASSWORD = "mysql-user-password"; + String SINK_RABBITMQ_HOST = "sink.rabbitmq.host"; String SINK_RABBITMQ_PORT = "sink.rabbitmq.port"; diff --git a/ai-root-cloud-statistics/src/main/resources/db.setting b/ai-root-cloud-statistics/src/main/resources/db.setting new file mode 100644 index 0000000..eaa9804 --- /dev/null +++ b/ai-root-cloud-statistics/src/main/resources/db.setting @@ -0,0 +1,26 @@ +## db.setting文件 + +url = jdbc:mysql://rm-wz9it4fs5tk7n4tm190110.mysql.rds.aliyuncs.com:3306/cloud_print_cloud_factory?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useTimezone=true&serverTimezone=GMT%2B8&useSSL=false +user = qn_cloudprint +pass = qncloudprint5682 + +# 是否在日志中显示执行的SQL +showSql = true + +# 是否格式化显示的SQL +formatSql = false + +# 是否显示SQL参数 +showParams = true + +# 打印SQL的日志等级,默认debug,可以是info、warn、error +sqlLevel = debug + +# 初始化时建立物理连接的个数 +initialSize = 0 + +# 最大连接池数量 +maxActive = 20 + +# 最小连接池数量 +minIdle = 0 \ No newline at end of file