Compare commits

...

3 Commits

Author SHA1 Message Date
1049970895@qniao.cn 8fc6c4e3c0 新增报警过滤规则 2 years ago
1049970895@qniao.cn ccd68944b5 新增报警过滤规则 2 years ago
1049970895@qniao.cn 7665af1b5e 新增报警过滤规则 2 years ago
5 changed files with 93 additions and 1 deletions
Split View
  1. 1
      .gitignore
  2. 2
      ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java
  3. 7
      ai-root-cloud-statistics/pom.xml
  4. 58
      ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java
  5. 26
      ai-root-cloud-statistics/src/main/resources/config/redis.setting

1
.gitignore

@ -0,0 +1 @@
/.idea

2
ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java

@ -32,7 +32,7 @@ public class AIWarningDataReceivedEvent implements Serializable {
private String machineIotMac;
/**
* 接受事件时间
* 接受事件时间毫秒
*/
private Long receivedTime;

7
ai-root-cloud-statistics/pom.xml

@ -117,6 +117,13 @@
<artifactId>apollo-core</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.0</version>
</dependency>
</dependencies>
<build>

58
ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java

@ -24,6 +24,7 @@ import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.Db;
import cn.hutool.db.Entity;
import cn.hutool.db.nosql.redis.RedisDS;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedPayLoadEvent;
@ -36,6 +37,7 @@ import com.qniao.rootcloudstatistics.event.AIWarningDataReceivedEventSerializati
import com.qniao.rootcloudstatistics.until.OSSUtils;
import com.qniao.rootcloudstatistics.until.SnowFlake;
import com.rabbitmq.client.AMQP;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
@ -55,6 +57,7 @@ import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.config.SaslConfigs;
import redis.clients.jedis.Jedis;
import java.sql.Connection;
import java.sql.DriverManager;
@ -82,6 +85,17 @@ public class RootCloudIotDataFormatterJob {
static SnowFlake snowflake = new SnowFlake(1L, 1L);
static Jedis jedis = RedisDS.create().getJedis();
static String redisKey = "ai_warning_data_received_event";
static String START_LOCAL_TIME = "startLocalTime";
static String END_LOCAL_TIME = "endLocalTime";
static String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
static ZoneId zoneId = ZoneId.systemDefault();
public static void main(String[] args) throws Exception {
@ -214,6 +228,31 @@ public class RootCloudIotDataFormatterJob {
@Override
public void invoke(AIWarningDataReceivedEvent value, Context context) throws Exception {
// 通过时间范围对数据进行过滤
// 查询redis缓存是否存在该摄像头的休眠时间范围
// todo 待放开
/*String redisValue = jedis.hget(redisKey, value.getCameraId().toString());
if (StringUtils.isNotBlank(redisValue)) {
JSONObject jsonObject = JSONUtil.parseObj(redisValue);
String startLocalTime = jsonObject.getStr(START_LOCAL_TIME);
String[] startLocalTimeSplit = startLocalTime.split(":");
String endLocalTime = jsonObject.getStr(END_LOCAL_TIME);
String[] endLocalTimeSplit = endLocalTime.split(":");
if (checkLocalTime(value, startLocalTime, startLocalTimeSplit, endLocalTime, endLocalTimeSplit)) return;
} else {
// 查询数据库
String query = "select * from qn_ai_alarm_sleep_time_config where camera_id = ?";
Entity alarmSleepConfig = Db.use().queryOne(query, value.getCameraId());
if (Objects.nonNull(alarmSleepConfig)) {
String startLocalTime = alarmSleepConfig.getStr("start_local_time");
String[] startLocalTimeSplit = startLocalTime.split(":");
String endLocalTime = alarmSleepConfig.getStr("end_local_time");
String[] endLocalTimeSplit = endLocalTime.split(":");
if (checkLocalTime(value, startLocalTime, startLocalTimeSplit, endLocalTime, endLocalTimeSplit)) return;
}
}*/
String query = "select * from qn_ai_alarm where id = ?";
AIWarningDataReceivedEvent.Alarm image = value.getAlarmList().get(0);
Entity alarm = Db.use().queryOne(query, image.getAlarmId());
@ -244,6 +283,25 @@ public class RootCloudIotDataFormatterJob {
statement2.executeBatch();
}
private boolean checkLocalTime(AIWarningDataReceivedEvent value,
String startLocalTime,
String[] startLocalTimeSplit,
String endLocalTime,
String[] endLocalTimeSplit) {
if (StringUtils.isNotBlank(startLocalTime) && StringUtils.isNotBlank(endLocalTime)) {
Long receivedTimeL = value.getReceivedTime();
LocalDateTime receivedTime = Instant.ofEpochMilli(receivedTimeL).atZone(zoneId).toLocalDateTime();
LocalDate localDate = receivedTime.toLocalDate();
LocalDateTime localStartDateTime = localDate.atTime(LocalTime.of(Integer.parseInt(startLocalTimeSplit[0]), Integer.parseInt(startLocalTimeSplit[1]),
Integer.parseInt(startLocalTimeSplit[2])));
LocalDateTime localEndDateTime = localDate.atTime(LocalTime.of(Integer.parseInt(endLocalTimeSplit[0]), Integer.parseInt(endLocalTimeSplit[1]),
Integer.parseInt(endLocalTimeSplit[2])));
return receivedTime.isBefore(localStartDateTime) || receivedTime.isAfter(localEndDateTime);
}
return false;
}
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.cj.jdbc.Driver");

26
ai-root-cloud-statistics/src/main/resources/config/redis.setting

@ -0,0 +1,26 @@
#-------------------------------------------------------------------------------
# Redis客户端配置样例
# 每一个分组代表一个Redis实例
# 无分组的Pool配置为所有分组的共用配置,如果分组自己定义Pool配置,则覆盖共用配置
# 池配置来自于:https://www.cnblogs.com/jklk/p/7095067.html
#-------------------------------------------------------------------------------
#----- 默认(公有)配置
# 地址,默认localhost
host = r-wz90lusc9p6wi53glmpd.redis.rds.aliyuncs.com
# 端口,默认6379
port = 6379
# 超时,默认2000
timeout = 2000
# 连接超时,默认timeout
connectionTimeout = 2000
# 读取超时,默认timeout
soTimeout = 2000
# 密码,默认无
password = 1M24%$dc&
# 数据库序号,默认0
database = 0
# 客户端名称
clientName = Qniao
# SSL连接,默认false
ssl = false;
Loading…
Cancel
Save