|
|
@ -21,6 +21,9 @@ package com.qniao.rootcloudstatistics; |
|
|
|
|
|
|
|
|
import cn.hutool.core.collection.CollUtil; |
|
|
import cn.hutool.core.collection.CollUtil; |
|
|
import cn.hutool.core.convert.Convert; |
|
|
import cn.hutool.core.convert.Convert; |
|
|
|
|
|
import cn.hutool.core.util.StrUtil; |
|
|
|
|
|
import cn.hutool.db.Db; |
|
|
|
|
|
import cn.hutool.db.Entity; |
|
|
import cn.hutool.json.JSONObject; |
|
|
import cn.hutool.json.JSONObject; |
|
|
import cn.hutool.json.JSONUtil; |
|
|
import cn.hutool.json.JSONUtil; |
|
|
import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedPayLoadEvent; |
|
|
import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedPayLoadEvent; |
|
|
@ -33,15 +36,17 @@ import com.qniao.rootcloudstatistics.event.AIWarningDataReceivedEventSerializati |
|
|
import com.qniao.rootcloudstatistics.until.OSSUtils; |
|
|
import com.qniao.rootcloudstatistics.until.OSSUtils; |
|
|
import com.qniao.rootcloudstatistics.until.SnowFlake; |
|
|
import com.qniao.rootcloudstatistics.until.SnowFlake; |
|
|
import com.rabbitmq.client.AMQP; |
|
|
import com.rabbitmq.client.AMQP; |
|
|
|
|
|
import org.apache.commons.lang3.time.DateFormatUtils; |
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|
|
import org.apache.flink.api.common.functions.MapFunction; |
|
|
import org.apache.flink.api.common.functions.MapFunction; |
|
|
import org.apache.flink.api.common.serialization.SimpleStringEncoder; |
|
|
import org.apache.flink.api.common.serialization.SimpleStringEncoder; |
|
|
|
|
|
import org.apache.flink.configuration.Configuration; |
|
|
import org.apache.flink.connector.kafka.source.KafkaSource; |
|
|
import org.apache.flink.connector.kafka.source.KafkaSource; |
|
|
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; |
|
|
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; |
|
|
import org.apache.flink.core.fs.Path; |
|
|
|
|
|
import org.apache.flink.streaming.api.CheckpointingMode; |
|
|
import org.apache.flink.streaming.api.CheckpointingMode; |
|
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; |
|
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; |
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|
|
|
|
|
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; |
|
|
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; |
|
|
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; |
|
|
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; |
|
|
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; |
|
|
import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; |
|
|
import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; |
|
|
@ -49,7 +54,12 @@ import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig |
|
|
import org.apache.kafka.clients.CommonClientConfigs; |
|
|
import org.apache.kafka.clients.CommonClientConfigs; |
|
|
import org.apache.kafka.clients.consumer.OffsetResetStrategy; |
|
|
import org.apache.kafka.clients.consumer.OffsetResetStrategy; |
|
|
import org.apache.kafka.common.config.SaslConfigs; |
|
|
import org.apache.kafka.common.config.SaslConfigs; |
|
|
|
|
|
import org.apache.flink.core.fs.Path; |
|
|
|
|
|
import org.checkerframework.checker.units.qual.C; |
|
|
|
|
|
|
|
|
|
|
|
import java.sql.Connection; |
|
|
|
|
|
import java.sql.DriverManager; |
|
|
|
|
|
import java.sql.PreparedStatement; |
|
|
import java.util.ArrayList; |
|
|
import java.util.ArrayList; |
|
|
import java.util.List; |
|
|
import java.util.List; |
|
|
import java.util.Objects; |
|
|
import java.util.Objects; |
|
|
@ -96,6 +106,7 @@ public class RootCloudIotDataFormatterJob { |
|
|
.map((MapFunction<JSONObject, AIWarningDataReceivedEvent>) RootCloudIotDataFormatterJob::transform) |
|
|
.map((MapFunction<JSONObject, AIWarningDataReceivedEvent>) RootCloudIotDataFormatterJob::transform) |
|
|
.name("Transform AIWarningDataReceivedEvent"); |
|
|
.name("Transform AIWarningDataReceivedEvent"); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// rabbitmq配置 |
|
|
// rabbitmq配置 |
|
|
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() |
|
|
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() |
|
|
.setHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_HOST)) |
|
|
.setHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_HOST)) |
|
|
@ -135,6 +146,9 @@ public class RootCloudIotDataFormatterJob { |
|
|
).build(); |
|
|
).build(); |
|
|
transformDs.addSink(sink); |
|
|
transformDs.addSink(sink); |
|
|
|
|
|
|
|
|
|
|
|
// 写入数据到MySQL |
|
|
|
|
|
transformDs.addSink(new MySqlJdbcSink()); |
|
|
|
|
|
|
|
|
env.execute("ai root cloud waring data formatter job"); |
|
|
env.execute("ai root cloud waring data formatter job"); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@ -166,4 +180,61 @@ public class RootCloudIotDataFormatterJob { |
|
|
} |
|
|
} |
|
|
return aiWaringDataReceivedEvent; |
|
|
return aiWaringDataReceivedEvent; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
static class MySqlJdbcSink extends RichSinkFunction<AIWarningDataReceivedEvent> { |
|
|
|
|
|
|
|
|
|
|
|
private static final String INSERT_ALARM = |
|
|
|
|
|
"INSERT INTO qn_ai_alarm(id,iot_mac,received_time,camera_id,camera_position,camera_name,ai_type,alarm_level,create_time) values(?,?,?,?,?,?,?,?,?);"; |
|
|
|
|
|
|
|
|
|
|
|
private static final String INSERT_ALARM_IMAGE = |
|
|
|
|
|
"INSERT INTO qn_ai_alarm_image(id,ai_alarm_id,url,thumbnail,events,alarm_time,create_time) values(?,?,?,?,?,?,?);"; |
|
|
|
|
|
|
|
|
|
|
|
private PreparedStatement statement; |
|
|
|
|
|
private PreparedStatement statement2; |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
public void invoke(AIWarningDataReceivedEvent value, Context context) throws Exception { |
|
|
|
|
|
String query = "select * from qn_ai_alarm where id = ?"; |
|
|
|
|
|
AIWarningDataReceivedEvent.Alarm image = value.getAlarmList().get(0); |
|
|
|
|
|
Entity alarm = Db.use().queryOne(query, image.getAlarmId()); |
|
|
|
|
|
if (Objects.isNull(alarm)) { |
|
|
|
|
|
statement.setLong(1, image.getAlarmId()); |
|
|
|
|
|
statement.setString(2, value.getMachineIotMac()); |
|
|
|
|
|
statement.setString(3, DateFormatUtils.format(value.getReceivedTime(), "yyyy-MM-dd HH:mm:ss")); |
|
|
|
|
|
statement.setLong(4, value.getCameraId()); |
|
|
|
|
|
statement.setString(5, value.getCameraPosition()); |
|
|
|
|
|
statement.setString(6, value.getCameraName()); |
|
|
|
|
|
statement.setByte(7, image.getAiType().byteValue()); |
|
|
|
|
|
statement.setByte(8, image.getAlarmLevel().byteValue()); |
|
|
|
|
|
statement.setString(9, DateFormatUtils.format(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss")); |
|
|
|
|
|
statement.addBatch(); |
|
|
|
|
|
statement.executeBatch(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
for (AIWarningDataReceivedEvent.Alarm a : value.getAlarmList()) { |
|
|
|
|
|
statement2.setLong(1, snowflake.nextId()); |
|
|
|
|
|
statement2.setLong(2, a.getAlarmId()); |
|
|
|
|
|
statement2.setString(3, a.getPicUrl()); |
|
|
|
|
|
statement2.setString(4, a.getThumbnail()); |
|
|
|
|
|
statement2.setString(5, StrUtil.isBlank(a.getEvents()) ? null : a.getEvents().replace("[[", "").replace("]]", "")); |
|
|
|
|
|
statement2.setString(6, DateFormatUtils.format(a.getAlarmTime(), "yyyy-MM-dd HH:mm:ss")); |
|
|
|
|
|
statement2.setString(7, DateFormatUtils.format(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss")); |
|
|
|
|
|
statement2.addBatch(); |
|
|
|
|
|
} |
|
|
|
|
|
statement2.executeBatch(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
public void open(Configuration parameters) throws Exception { |
|
|
|
|
|
Class.forName("com.mysql.cj.jdbc.Driver"); |
|
|
|
|
|
String mysqlConn = "jdbc:mysql://" + |
|
|
|
|
|
ApolloConfig.get(ConfigConstant.MYSQL_DATABASE_URL) + "/" + |
|
|
|
|
|
ApolloConfig.get(ConfigConstant.MYSQL_DATABASE_NAME) + "?user=" + |
|
|
|
|
|
ApolloConfig.get(ConfigConstant.MYSQL_USER_NAME) + "&password=" + |
|
|
|
|
|
ApolloConfig.get(ConfigConstant.MYSQL_USER_PASSWORD); |
|
|
|
|
|
Connection connection = DriverManager.getConnection(mysqlConn); |
|
|
|
|
|
statement = connection.prepareStatement(INSERT_ALARM); |
|
|
|
|
|
statement2 = connection.prepareStatement(INSERT_ALARM_IMAGE); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} |
|
|
} |