From 7665af1b5edf317efdd8c98229d27c1ed6c753b9 Mon Sep 17 00:00:00 2001
From: "1049970895@qniao.cn" <1049970895>
Date: Tue, 6 Jun 2023 17:40:38 +0800
Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=8A=A5=E8=AD=A6=E8=BF=87?=
=?UTF-8?q?=E6=BB=A4=E8=A7=84=E5=88=99?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.gitignore | 1 +
.../AIWarningDataReceivedEvent.java | 2 +-
ai-root-cloud-statistics/pom.xml | 7 +++
.../RootCloudIotDataFormatterJob.java | 57 ++++++++++++++++++
.../src/main/resources/config/redis.setting | 59 +++++++++++++++++++
5 files changed, 125 insertions(+), 1 deletion(-)
create mode 100644 .gitignore
create mode 100644 ai-root-cloud-statistics/src/main/resources/config/redis.setting
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..a09c56d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+/.idea
diff --git a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java
index bb96749..66c5e3d 100644
--- a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java
+++ b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java
@@ -32,7 +32,7 @@ public class AIWarningDataReceivedEvent implements Serializable {
private String machineIotMac;
/**
- * 接受事件时间
+ * 接受事件时间(毫秒)
*/
private Long receivedTime;
diff --git a/ai-root-cloud-statistics/pom.xml b/ai-root-cloud-statistics/pom.xml
index 6c1fe03..eecce3e 100644
--- a/ai-root-cloud-statistics/pom.xml
+++ b/ai-root-cloud-statistics/pom.xml
@@ -117,6 +117,13 @@
apollo-core
2.0.1
+
+
+ redis.clients
+ jedis
+ 3.7.0
+
+
diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java
index f66538c..226bdc5 100644
--- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java
+++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java
@@ -24,6 +24,7 @@ import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.Db;
import cn.hutool.db.Entity;
+import cn.hutool.db.nosql.redis.RedisDS;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedPayLoadEvent;
@@ -36,6 +37,7 @@ import com.qniao.rootcloudstatistics.event.AIWarningDataReceivedEventSerializati
import com.qniao.rootcloudstatistics.until.OSSUtils;
import com.qniao.rootcloudstatistics.until.SnowFlake;
import com.rabbitmq.client.AMQP;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
@@ -55,6 +57,7 @@ import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.config.SaslConfigs;
+import redis.clients.jedis.Jedis;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -82,6 +85,17 @@ public class RootCloudIotDataFormatterJob {
static SnowFlake snowflake = new SnowFlake(1L, 1L);
+ static Jedis jedis = RedisDS.create().getJedis();
+
+ static String redisKey = "ai_warning_data_received_event";
+
+ static String START_LOCAL_TIME = "startLocalTime";
+
+ static String END_LOCAL_TIME = "endLocalTime";
+
+ static String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+ static ZoneId zoneId = ZoneId.systemDefault();
public static void main(String[] args) throws Exception {
@@ -214,6 +228,30 @@ public class RootCloudIotDataFormatterJob {
@Override
public void invoke(AIWarningDataReceivedEvent value, Context context) throws Exception {
+
+
+ // 通过时间范围对数据进行过滤
+ // 查询redis缓存是否存在该摄像头的休眠时间范围
+ String redisValue = jedis.hget(redisKey, value.getCameraId().toString());
+ if (StringUtils.isNotBlank(redisValue)) {
+ JSONObject jsonObject = JSONUtil.parseObj(redisValue);
+ String startLocalTime = jsonObject.getStr(START_LOCAL_TIME);
+ String[] startLocalTimeSplit = startLocalTime.split(":");
+ String endLocalTime = jsonObject.getStr(END_LOCAL_TIME);
+ String[] endLocalTimeSplit = endLocalTime.split(":");
+ if (checkLocalTime(value, startLocalTime, startLocalTimeSplit, endLocalTime, endLocalTimeSplit)) return;
+ } else {
+ // 查询数据库
+ String query = "select * from qn_ai_alarm_sleep_time_config where camera_id = ?";
+ Entity alarmSleepConfig = Db.use().queryOne(query, value.getCameraId());
+ if (Objects.nonNull(alarmSleepConfig)) {
+ String startLocalTime = alarmSleepConfig.getStr("start_local_time");
+ String[] startLocalTimeSplit = startLocalTime.split(":");
+ String endLocalTime = alarmSleepConfig.getStr("end_local_time");
+ String[] endLocalTimeSplit = endLocalTime.split(":");
+ if (checkLocalTime(value, startLocalTime, startLocalTimeSplit, endLocalTime, endLocalTimeSplit)) return;
+ }
+ }
String query = "select * from qn_ai_alarm where id = ?";
AIWarningDataReceivedEvent.Alarm image = value.getAlarmList().get(0);
Entity alarm = Db.use().queryOne(query, image.getAlarmId());
@@ -244,6 +282,25 @@ public class RootCloudIotDataFormatterJob {
statement2.executeBatch();
}
+ private boolean checkLocalTime(AIWarningDataReceivedEvent value,
+ String startLocalTime,
+ String[] startLocalTimeSplit,
+ String endLocalTime,
+ String[] endLocalTimeSplit) {
+
+ if (StringUtils.isNotBlank(startLocalTime) && StringUtils.isNotBlank(endLocalTime)) {
+ Long receivedTimeL = value.getReceivedTime();
+ LocalDateTime receivedTime = Instant.ofEpochMilli(receivedTimeL).atZone(zoneId).toLocalDateTime();
+ LocalDate localDate = receivedTime.toLocalDate();
+ LocalDateTime localStartDateTime = localDate.atTime(LocalTime.of(Integer.parseInt(startLocalTimeSplit[0]), Integer.parseInt(startLocalTimeSplit[1]),
+ Integer.parseInt(startLocalTimeSplit[2])));
+ LocalDateTime localEndDateTime = localDate.atTime(LocalTime.of(Integer.parseInt(endLocalTimeSplit[0]), Integer.parseInt(endLocalTimeSplit[1]),
+ Integer.parseInt(endLocalTimeSplit[2])));
+ return receivedTime.isBefore(localStartDateTime) || receivedTime.isAfter(localEndDateTime);
+ }
+ return false;
+ }
+
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.cj.jdbc.Driver");
diff --git a/ai-root-cloud-statistics/src/main/resources/config/redis.setting b/ai-root-cloud-statistics/src/main/resources/config/redis.setting
new file mode 100644
index 0000000..253ae6e
--- /dev/null
+++ b/ai-root-cloud-statistics/src/main/resources/config/redis.setting
@@ -0,0 +1,59 @@
+#-------------------------------------------------------------------------------
+# Redis客户端配置样例
+# 每一个分组代表一个Redis实例
+# 无分组的Pool配置为所有分组的共用配置,如果分组自己定义Pool配置,则覆盖共用配置
+# 池配置来自于:https://www.cnblogs.com/jklk/p/7095067.html
+#-------------------------------------------------------------------------------
+
+#----- 默认(公有)配置
+# 地址,默认localhost
+host = localhost
+# 端口,默认6379
+port = 6379
+# 超时,默认2000
+timeout = 2000
+# 连接超时,默认timeout
+connectionTimeout = 2000
+# 读取超时,默认timeout
+soTimeout = 2000
+# 密码,默认无
+password =
+# 数据库序号,默认0
+database = 0
+# 客户端名称
+clientName = Qniao
+# SSL连接,默认false
+ssl = false;
+
+#----- 自定义分组的连接
+[custom]
+# 地址,默认localhost
+host = localhost
+# 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true
+BlockWhenExhausted = true;
+# 设置的逐出策略类名, 默认DefaultEvictionPolicy(当连接超过最大空闲时间,或连接数超过最大空闲连接数)
+evictionPolicyClassName = org.apache.commons.pool2.impl.DefaultEvictionPolicy
+# 是否启用pool的jmx管理功能, 默认true
+jmxEnabled = true;
+# 是否启用后进先出, 默认true
+lifo = true;
+# 最大空闲连接数, 默认8个
+maxIdle = 8
+# 最小空闲连接数, 默认0
+minIdle = 0
+# 最大连接数, 默认8个
+maxTotal = 8
+# 获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间, 默认-1
+maxWaitMillis = -1
+# 逐出连接的最小空闲时间 默认1800000毫秒(30分钟)
+minEvictableIdleTimeMillis = 1800000
+# 每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3
+numTestsPerEvictionRun = 3;
+# 对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略)
+SoftMinEvictableIdleTimeMillis = 1800000
+# 在获取连接的时候检查有效性, 默认false
+testOnBorrow = false
+# 在空闲时检查有效性, 默认false
+testWhileIdle = false
+# 逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1
+timeBetweenEvictionRunsMillis = -1