Browse Source

新增报警过滤规则

feature_hph_检查规则配置过滤
1049970895@qniao.cn 2 years ago
parent
commit
7665af1b5e
5 changed files with 125 additions and 1 deletions
  1. 1
      .gitignore
  2. 2
      ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java
  3. 7
      ai-root-cloud-statistics/pom.xml
  4. 57
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java
  5. 59
      ai-root-cloud-statistics/src/main/resources/config/redis.setting

1
.gitignore

@ -0,0 +1 @@
/.idea

2
ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java

@ -32,7 +32,7 @@ public class AIWarningDataReceivedEvent implements Serializable {
private String machineIotMac;
/**
* 接受事件时间
* 接受事件时间毫秒
*/
private Long receivedTime;

7
ai-root-cloud-statistics/pom.xml

@ -117,6 +117,13 @@
<artifactId>apollo-core</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.0</version>
</dependency>
</dependencies>
<build>

57
ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java

@ -24,6 +24,7 @@ import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.Db;
import cn.hutool.db.Entity;
import cn.hutool.db.nosql.redis.RedisDS;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedPayLoadEvent;
@ -36,6 +37,7 @@ import com.qniao.rootcloudstatistics.event.AIWarningDataReceivedEventSerializati
import com.qniao.rootcloudstatistics.until.OSSUtils;
import com.qniao.rootcloudstatistics.until.SnowFlake;
import com.rabbitmq.client.AMQP;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
@ -55,6 +57,7 @@ import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.config.SaslConfigs;
import redis.clients.jedis.Jedis;
import java.sql.Connection;
import java.sql.DriverManager;
@ -82,6 +85,17 @@ public class RootCloudIotDataFormatterJob {
static SnowFlake snowflake = new SnowFlake(1L, 1L);
static Jedis jedis = RedisDS.create().getJedis();
static String redisKey = "ai_warning_data_received_event";
static String START_LOCAL_TIME = "startLocalTime";
static String END_LOCAL_TIME = "endLocalTime";
static String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
static ZoneId zoneId = ZoneId.systemDefault();
public static void main(String[] args) throws Exception {
@ -214,6 +228,30 @@ public class RootCloudIotDataFormatterJob {
@Override
public void invoke(AIWarningDataReceivedEvent value, Context context) throws Exception {
// 通过时间范围对数据进行过滤
// 查询redis缓存是否存在该摄像头的休眠时间范围
String redisValue = jedis.hget(redisKey, value.getCameraId().toString());
if (StringUtils.isNotBlank(redisValue)) {
JSONObject jsonObject = JSONUtil.parseObj(redisValue);
String startLocalTime = jsonObject.getStr(START_LOCAL_TIME);
String[] startLocalTimeSplit = startLocalTime.split(":");
String endLocalTime = jsonObject.getStr(END_LOCAL_TIME);
String[] endLocalTimeSplit = endLocalTime.split(":");
if (checkLocalTime(value, startLocalTime, startLocalTimeSplit, endLocalTime, endLocalTimeSplit)) return;
} else {
// 查询数据库
String query = "select * from qn_ai_alarm_sleep_time_config where camera_id = ?";
Entity alarmSleepConfig = Db.use().queryOne(query, value.getCameraId());
if (Objects.nonNull(alarmSleepConfig)) {
String startLocalTime = alarmSleepConfig.getStr("start_local_time");
String[] startLocalTimeSplit = startLocalTime.split(":");
String endLocalTime = alarmSleepConfig.getStr("end_local_time");
String[] endLocalTimeSplit = endLocalTime.split(":");
if (checkLocalTime(value, startLocalTime, startLocalTimeSplit, endLocalTime, endLocalTimeSplit)) return;
}
}
String query = "select * from qn_ai_alarm where id = ?";
AIWarningDataReceivedEvent.Alarm image = value.getAlarmList().get(0);
Entity alarm = Db.use().queryOne(query, image.getAlarmId());
@ -244,6 +282,25 @@ public class RootCloudIotDataFormatterJob {
statement2.executeBatch();
}
private boolean checkLocalTime(AIWarningDataReceivedEvent value,
String startLocalTime,
String[] startLocalTimeSplit,
String endLocalTime,
String[] endLocalTimeSplit) {
if (StringUtils.isNotBlank(startLocalTime) && StringUtils.isNotBlank(endLocalTime)) {
Long receivedTimeL = value.getReceivedTime();
LocalDateTime receivedTime = Instant.ofEpochMilli(receivedTimeL).atZone(zoneId).toLocalDateTime();
LocalDate localDate = receivedTime.toLocalDate();
LocalDateTime localStartDateTime = localDate.atTime(LocalTime.of(Integer.parseInt(startLocalTimeSplit[0]), Integer.parseInt(startLocalTimeSplit[1]),
Integer.parseInt(startLocalTimeSplit[2])));
LocalDateTime localEndDateTime = localDate.atTime(LocalTime.of(Integer.parseInt(endLocalTimeSplit[0]), Integer.parseInt(endLocalTimeSplit[1]),
Integer.parseInt(endLocalTimeSplit[2])));
return receivedTime.isBefore(localStartDateTime) || receivedTime.isAfter(localEndDateTime);
}
return false;
}
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.cj.jdbc.Driver");

59
ai-root-cloud-statistics/src/main/resources/config/redis.setting

@ -0,0 +1,59 @@
#-------------------------------------------------------------------------------
# Redis客户端配置样例
# 每一个分组代表一个Redis实例
# 无分组的Pool配置为所有分组的共用配置,如果分组自己定义Pool配置,则覆盖共用配置
# 池配置来自于:https://www.cnblogs.com/jklk/p/7095067.html
#-------------------------------------------------------------------------------
#----- 默认(公有)配置
# 地址,默认localhost
host = localhost
# 端口,默认6379
port = 6379
# 超时,默认2000
timeout = 2000
# 连接超时,默认timeout
connectionTimeout = 2000
# 读取超时,默认timeout
soTimeout = 2000
# 密码,默认无
password =
# 数据库序号,默认0
database = 0
# 客户端名称
clientName = Qniao
# SSL连接,默认false
ssl = false;
#----- 自定义分组的连接
[custom]
# 地址,默认localhost
host = localhost
# 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true
BlockWhenExhausted = true;
# 设置的逐出策略类名, 默认DefaultEvictionPolicy(当连接超过最大空闲时间,或连接数超过最大空闲连接数)
evictionPolicyClassName = org.apache.commons.pool2.impl.DefaultEvictionPolicy
# 是否启用pool的jmx管理功能, 默认true
jmxEnabled = true;
# 是否启用后进先出, 默认true
lifo = true;
# 最大空闲连接数, 默认8个
maxIdle = 8
# 最小空闲连接数, 默认0
minIdle = 0
# 最大连接数, 默认8个
maxTotal = 8
# 获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间, 默认-1
maxWaitMillis = -1
# 逐出连接的最小空闲时间 默认1800000毫秒(30分钟)
minEvictableIdleTimeMillis = 1800000
# 每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3
numTestsPerEvictionRun = 3;
# 对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略)
SoftMinEvictableIdleTimeMillis = 1800000
# 在获取连接的时候检查有效性, 默认false
testOnBorrow = false
# 在空闲时检查有效性, 默认false
testWhileIdle = false
# 逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1
timeBetweenEvictionRunsMillis = -1
Loading…
Cancel
Save