diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a09c56d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/.idea diff --git a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java index bb96749..66c5e3d 100644 --- a/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java +++ b/ai-root-cloud-event/src/main/java/com/qniao/rootcloudevent/AIWarningDataReceivedEvent.java @@ -32,7 +32,7 @@ public class AIWarningDataReceivedEvent implements Serializable { private String machineIotMac; /** - * 接受事件时间 + * 接受事件时间(毫秒) */ private Long receivedTime; diff --git a/ai-root-cloud-statistics/pom.xml b/ai-root-cloud-statistics/pom.xml index 6c1fe03..eecce3e 100644 --- a/ai-root-cloud-statistics/pom.xml +++ b/ai-root-cloud-statistics/pom.xml @@ -117,6 +117,13 @@ apollo-core 2.0.1 + + + redis.clients + jedis + 3.7.0 + + diff --git a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java index f66538c..226bdc5 100644 --- a/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java +++ b/ai-root-cloud-statistics/src/main/java/com/qniao/rootcloudstatistics/RootCloudIotDataFormatterJob.java @@ -24,6 +24,7 @@ import cn.hutool.core.convert.Convert; import cn.hutool.core.util.StrUtil; import cn.hutool.db.Db; import cn.hutool.db.Entity; +import cn.hutool.db.nosql.redis.RedisDS; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedPayLoadEvent; @@ -36,6 +37,7 @@ import com.qniao.rootcloudstatistics.event.AIWarningDataReceivedEventSerializati import com.qniao.rootcloudstatistics.until.OSSUtils; import com.qniao.rootcloudstatistics.until.SnowFlake; import com.rabbitmq.client.AMQP; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; @@ -55,6 +57,7 @@ import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.config.SaslConfigs; +import redis.clients.jedis.Jedis; import java.sql.Connection; import java.sql.DriverManager; @@ -82,6 +85,17 @@ public class RootCloudIotDataFormatterJob { static SnowFlake snowflake = new SnowFlake(1L, 1L); + static Jedis jedis = RedisDS.create().getJedis(); + + static String redisKey = "ai_warning_data_received_event"; + + static String START_LOCAL_TIME = "startLocalTime"; + + static String END_LOCAL_TIME = "endLocalTime"; + + static String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + static ZoneId zoneId = ZoneId.systemDefault(); public static void main(String[] args) throws Exception { @@ -214,6 +228,30 @@ public class RootCloudIotDataFormatterJob { @Override public void invoke(AIWarningDataReceivedEvent value, Context context) throws Exception { + + + // 通过时间范围对数据进行过滤 + // 查询redis缓存是否存在该摄像头的休眠时间范围 + String redisValue = jedis.hget(redisKey, value.getCameraId().toString()); + if (StringUtils.isNotBlank(redisValue)) { + JSONObject jsonObject = JSONUtil.parseObj(redisValue); + String startLocalTime = jsonObject.getStr(START_LOCAL_TIME); + String[] startLocalTimeSplit = startLocalTime.split(":"); + String endLocalTime = jsonObject.getStr(END_LOCAL_TIME); + String[] endLocalTimeSplit = endLocalTime.split(":"); + if (checkLocalTime(value, startLocalTime, startLocalTimeSplit, endLocalTime, endLocalTimeSplit)) return; + } else { + // 查询数据库 + String query = "select * from qn_ai_alarm_sleep_time_config where camera_id = ?"; + Entity alarmSleepConfig = Db.use().queryOne(query, value.getCameraId()); + if (Objects.nonNull(alarmSleepConfig)) { + String startLocalTime = alarmSleepConfig.getStr("start_local_time"); + String[] startLocalTimeSplit = startLocalTime.split(":"); + String endLocalTime = alarmSleepConfig.getStr("end_local_time"); + String[] endLocalTimeSplit = endLocalTime.split(":"); + if (checkLocalTime(value, startLocalTime, startLocalTimeSplit, endLocalTime, endLocalTimeSplit)) return; + } + } String query = "select * from qn_ai_alarm where id = ?"; AIWarningDataReceivedEvent.Alarm image = value.getAlarmList().get(0); Entity alarm = Db.use().queryOne(query, image.getAlarmId()); @@ -244,6 +282,25 @@ public class RootCloudIotDataFormatterJob { statement2.executeBatch(); } + private boolean checkLocalTime(AIWarningDataReceivedEvent value, + String startLocalTime, + String[] startLocalTimeSplit, + String endLocalTime, + String[] endLocalTimeSplit) { + + if (StringUtils.isNotBlank(startLocalTime) && StringUtils.isNotBlank(endLocalTime)) { + Long receivedTimeL = value.getReceivedTime(); + LocalDateTime receivedTime = Instant.ofEpochMilli(receivedTimeL).atZone(zoneId).toLocalDateTime(); + LocalDate localDate = receivedTime.toLocalDate(); + LocalDateTime localStartDateTime = localDate.atTime(LocalTime.of(Integer.parseInt(startLocalTimeSplit[0]), Integer.parseInt(startLocalTimeSplit[1]), + Integer.parseInt(startLocalTimeSplit[2]))); + LocalDateTime localEndDateTime = localDate.atTime(LocalTime.of(Integer.parseInt(endLocalTimeSplit[0]), Integer.parseInt(endLocalTimeSplit[1]), + Integer.parseInt(endLocalTimeSplit[2]))); + return receivedTime.isBefore(localStartDateTime) || receivedTime.isAfter(localEndDateTime); + } + return false; + } + @Override public void open(Configuration parameters) throws Exception { Class.forName("com.mysql.cj.jdbc.Driver"); diff --git a/ai-root-cloud-statistics/src/main/resources/config/redis.setting b/ai-root-cloud-statistics/src/main/resources/config/redis.setting new file mode 100644 index 0000000..253ae6e --- /dev/null +++ b/ai-root-cloud-statistics/src/main/resources/config/redis.setting @@ -0,0 +1,59 @@ +#------------------------------------------------------------------------------- +# Redis客户端配置样例 +# 每一个分组代表一个Redis实例 +# 无分组的Pool配置为所有分组的共用配置,如果分组自己定义Pool配置,则覆盖共用配置 +# 池配置来自于:https://www.cnblogs.com/jklk/p/7095067.html +#------------------------------------------------------------------------------- + +#----- 默认(公有)配置 +# 地址,默认localhost +host = localhost +# 端口,默认6379 +port = 6379 +# 超时,默认2000 +timeout = 2000 +# 连接超时,默认timeout +connectionTimeout = 2000 +# 读取超时,默认timeout +soTimeout = 2000 +# 密码,默认无 +password = +# 数据库序号,默认0 +database = 0 +# 客户端名称 +clientName = Qniao +# SSL连接,默认false +ssl = false; + +#----- 自定义分组的连接 +[custom] +# 地址,默认localhost +host = localhost +# 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true +BlockWhenExhausted = true; +# 设置的逐出策略类名, 默认DefaultEvictionPolicy(当连接超过最大空闲时间,或连接数超过最大空闲连接数) +evictionPolicyClassName = org.apache.commons.pool2.impl.DefaultEvictionPolicy +# 是否启用pool的jmx管理功能, 默认true +jmxEnabled = true; +# 是否启用后进先出, 默认true +lifo = true; +# 最大空闲连接数, 默认8个 +maxIdle = 8 +# 最小空闲连接数, 默认0 +minIdle = 0 +# 最大连接数, 默认8个 +maxTotal = 8 +# 获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间, 默认-1 +maxWaitMillis = -1 +# 逐出连接的最小空闲时间 默认1800000毫秒(30分钟) +minEvictableIdleTimeMillis = 1800000 +# 每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3 +numTestsPerEvictionRun = 3; +# 对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略) +SoftMinEvictableIdleTimeMillis = 1800000 +# 在获取连接的时候检查有效性, 默认false +testOnBorrow = false +# 在空闲时检查有效性, 默认false +testWhileIdle = false +# 逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1 +timeBetweenEvictionRunsMillis = -1