|
|
@ -24,6 +24,7 @@ import cn.hutool.core.convert.Convert; |
|
|
import cn.hutool.core.util.StrUtil; |
|
|
import cn.hutool.core.util.StrUtil; |
|
|
import cn.hutool.db.Db; |
|
|
import cn.hutool.db.Db; |
|
|
import cn.hutool.db.Entity; |
|
|
import cn.hutool.db.Entity; |
|
|
|
|
|
import cn.hutool.db.nosql.redis.RedisDS; |
|
|
import cn.hutool.json.JSONObject; |
|
|
import cn.hutool.json.JSONObject; |
|
|
import cn.hutool.json.JSONUtil; |
|
|
import cn.hutool.json.JSONUtil; |
|
|
import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedPayLoadEvent; |
|
|
import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedPayLoadEvent; |
|
|
@ -36,6 +37,7 @@ import com.qniao.rootcloudstatistics.event.AIWarningDataReceivedEventSerializati |
|
|
import com.qniao.rootcloudstatistics.until.OSSUtils; |
|
|
import com.qniao.rootcloudstatistics.until.OSSUtils; |
|
|
import com.qniao.rootcloudstatistics.until.SnowFlake; |
|
|
import com.qniao.rootcloudstatistics.until.SnowFlake; |
|
|
import com.rabbitmq.client.AMQP; |
|
|
import com.rabbitmq.client.AMQP; |
|
|
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
import org.apache.commons.lang3.time.DateFormatUtils; |
|
|
import org.apache.commons.lang3.time.DateFormatUtils; |
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|
|
import org.apache.flink.api.common.functions.MapFunction; |
|
|
import org.apache.flink.api.common.functions.MapFunction; |
|
|
@ -55,6 +57,7 @@ import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig |
|
|
import org.apache.kafka.clients.CommonClientConfigs; |
|
|
import org.apache.kafka.clients.CommonClientConfigs; |
|
|
import org.apache.kafka.clients.consumer.OffsetResetStrategy; |
|
|
import org.apache.kafka.clients.consumer.OffsetResetStrategy; |
|
|
import org.apache.kafka.common.config.SaslConfigs; |
|
|
import org.apache.kafka.common.config.SaslConfigs; |
|
|
|
|
|
import redis.clients.jedis.Jedis; |
|
|
|
|
|
|
|
|
import java.sql.Connection; |
|
|
import java.sql.Connection; |
|
|
import java.sql.DriverManager; |
|
|
import java.sql.DriverManager; |
|
|
@ -82,6 +85,17 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
|
|
|
|
|
static SnowFlake snowflake = new SnowFlake(1L, 1L); |
|
|
static SnowFlake snowflake = new SnowFlake(1L, 1L); |
|
|
|
|
|
|
|
|
|
|
|
static Jedis jedis = RedisDS.create().getJedis(); |
|
|
|
|
|
|
|
|
|
|
|
static String redisKey = "ai_warning_data_received_event"; |
|
|
|
|
|
|
|
|
|
|
|
static String START_LOCAL_TIME = "startLocalTime"; |
|
|
|
|
|
|
|
|
|
|
|
static String END_LOCAL_TIME = "endLocalTime"; |
|
|
|
|
|
|
|
|
|
|
|
static String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; |
|
|
|
|
|
|
|
|
|
|
|
static ZoneId zoneId = ZoneId.systemDefault(); |
|
|
|
|
|
|
|
|
public static void main(String[] args) throws Exception { |
|
|
public static void main(String[] args) throws Exception { |
|
|
|
|
|
|
|
|
@ -214,6 +228,31 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
|
|
|
|
|
@Override |
|
|
@Override |
|
|
public void invoke(AIWarningDataReceivedEvent value, Context context) throws Exception { |
|
|
public void invoke(AIWarningDataReceivedEvent value, Context context) throws Exception { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 通过时间范围对数据进行过滤 |
|
|
|
|
|
// 查询redis缓存是否存在该摄像头的休眠时间范围 |
|
|
|
|
|
// todo 待放开 |
|
|
|
|
|
/*String redisValue = jedis.hget(redisKey, value.getCameraId().toString()); |
|
|
|
|
|
if (StringUtils.isNotBlank(redisValue)) { |
|
|
|
|
|
JSONObject jsonObject = JSONUtil.parseObj(redisValue); |
|
|
|
|
|
String startLocalTime = jsonObject.getStr(START_LOCAL_TIME); |
|
|
|
|
|
String[] startLocalTimeSplit = startLocalTime.split(":"); |
|
|
|
|
|
String endLocalTime = jsonObject.getStr(END_LOCAL_TIME); |
|
|
|
|
|
String[] endLocalTimeSplit = endLocalTime.split(":"); |
|
|
|
|
|
if (checkLocalTime(value, startLocalTime, startLocalTimeSplit, endLocalTime, endLocalTimeSplit)) return; |
|
|
|
|
|
} else { |
|
|
|
|
|
// 查询数据库 |
|
|
|
|
|
String query = "select * from qn_ai_alarm_sleep_time_config where camera_id = ?"; |
|
|
|
|
|
Entity alarmSleepConfig = Db.use().queryOne(query, value.getCameraId()); |
|
|
|
|
|
if (Objects.nonNull(alarmSleepConfig)) { |
|
|
|
|
|
String startLocalTime = alarmSleepConfig.getStr("start_local_time"); |
|
|
|
|
|
String[] startLocalTimeSplit = startLocalTime.split(":"); |
|
|
|
|
|
String endLocalTime = alarmSleepConfig.getStr("end_local_time"); |
|
|
|
|
|
String[] endLocalTimeSplit = endLocalTime.split(":"); |
|
|
|
|
|
if (checkLocalTime(value, startLocalTime, startLocalTimeSplit, endLocalTime, endLocalTimeSplit)) return; |
|
|
|
|
|
} |
|
|
|
|
|
}*/ |
|
|
String query = "select * from qn_ai_alarm where id = ?"; |
|
|
String query = "select * from qn_ai_alarm where id = ?"; |
|
|
AIWarningDataReceivedEvent.Alarm image = value.getAlarmList().get(0); |
|
|
AIWarningDataReceivedEvent.Alarm image = value.getAlarmList().get(0); |
|
|
Entity alarm = Db.use().queryOne(query, image.getAlarmId()); |
|
|
Entity alarm = Db.use().queryOne(query, image.getAlarmId()); |
|
|
@ -244,6 +283,25 @@ public class RootCloudIotDataFormatterJob { |
|
|
statement2.executeBatch(); |
|
|
statement2.executeBatch(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private boolean checkLocalTime(AIWarningDataReceivedEvent value, |
|
|
|
|
|
String startLocalTime, |
|
|
|
|
|
String[] startLocalTimeSplit, |
|
|
|
|
|
String endLocalTime, |
|
|
|
|
|
String[] endLocalTimeSplit) { |
|
|
|
|
|
|
|
|
|
|
|
if (StringUtils.isNotBlank(startLocalTime) && StringUtils.isNotBlank(endLocalTime)) { |
|
|
|
|
|
Long receivedTimeL = value.getReceivedTime(); |
|
|
|
|
|
LocalDateTime receivedTime = Instant.ofEpochMilli(receivedTimeL).atZone(zoneId).toLocalDateTime(); |
|
|
|
|
|
LocalDate localDate = receivedTime.toLocalDate(); |
|
|
|
|
|
LocalDateTime localStartDateTime = localDate.atTime(LocalTime.of(Integer.parseInt(startLocalTimeSplit[0]), Integer.parseInt(startLocalTimeSplit[1]), |
|
|
|
|
|
Integer.parseInt(startLocalTimeSplit[2]))); |
|
|
|
|
|
LocalDateTime localEndDateTime = localDate.atTime(LocalTime.of(Integer.parseInt(endLocalTimeSplit[0]), Integer.parseInt(endLocalTimeSplit[1]), |
|
|
|
|
|
Integer.parseInt(endLocalTimeSplit[2]))); |
|
|
|
|
|
return receivedTime.isBefore(localStartDateTime) || receivedTime.isAfter(localEndDateTime); |
|
|
|
|
|
} |
|
|
|
|
|
return false; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
@Override |
|
|
@Override |
|
|
public void open(Configuration parameters) throws Exception { |
|
|
public void open(Configuration parameters) throws Exception { |
|
|
Class.forName("com.mysql.cj.jdbc.Driver"); |
|
|
Class.forName("com.mysql.cj.jdbc.Driver"); |
|
|
|