|
|
|
@ -21,8 +21,9 @@ package com.qniao.rootcloudstatistics; |
|
|
|
|
|
|
|
import cn.hutool.core.collection.CollUtil; |
|
|
|
import cn.hutool.core.convert.Convert; |
|
|
|
import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializationSchema; |
|
|
|
import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedEvent; |
|
|
|
import cn.hutool.json.JSONObject; |
|
|
|
import cn.hutool.json.JSONUtil; |
|
|
|
import com.qniao.rootcloudevent.AIRootCloudWarningDataReceivedPayLoadEvent; |
|
|
|
import com.qniao.rootcloudevent.AIWarningDataReceivedEvent; |
|
|
|
import com.qniao.rootcloudstatistics.config.ApolloConfig; |
|
|
|
import com.qniao.rootcloudstatistics.constant.ConfigConstant; |
|
|
|
@ -35,9 +36,6 @@ import com.rabbitmq.client.AMQP; |
|
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|
|
|
import org.apache.flink.api.common.functions.MapFunction; |
|
|
|
import org.apache.flink.api.common.serialization.SimpleStringEncoder; |
|
|
|
import org.apache.flink.connector.base.DeliveryGuarantee; |
|
|
|
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; |
|
|
|
import org.apache.flink.connector.kafka.sink.KafkaSink; |
|
|
|
import org.apache.flink.connector.kafka.source.KafkaSource; |
|
|
|
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; |
|
|
|
import org.apache.flink.core.fs.Path; |
|
|
|
@ -55,7 +53,6 @@ import org.apache.kafka.common.config.SaslConfigs; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Objects; |
|
|
|
import java.util.Properties; |
|
|
|
|
|
|
|
/** |
|
|
|
* Skeleton for a Flink DataStream Job. |
|
|
|
@ -81,10 +78,10 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
|
|
|
|
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
|
|
|
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); |
|
|
|
KafkaSource<AIRootCloudWarningDataReceivedEvent> source = KafkaSource.<AIRootCloudWarningDataReceivedEvent>builder() |
|
|
|
KafkaSource<JSONObject> source = KafkaSource.<JSONObject>builder() |
|
|
|
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) |
|
|
|
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) |
|
|
|
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID)) |
|
|
|
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) |
|
|
|
.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") |
|
|
|
.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN") |
|
|
|
.setProperty("sasl.jaas.config", |
|
|
|
@ -96,7 +93,7 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
// 把树根的数据转成我们自己的格式 |
|
|
|
SingleOutputStreamOperator<AIWarningDataReceivedEvent> transformDs = env |
|
|
|
.fromSource(source, WatermarkStrategy.noWatermarks(), "AIRootCloudWarningDataReceivedEvent Source") |
|
|
|
.map((MapFunction<AIRootCloudWarningDataReceivedEvent, AIWarningDataReceivedEvent>) RootCloudIotDataFormatterJob::transform) |
|
|
|
.map((MapFunction<JSONObject, AIWarningDataReceivedEvent>) RootCloudIotDataFormatterJob::transform) |
|
|
|
.name("Transform AIWarningDataReceivedEvent"); |
|
|
|
|
|
|
|
// rabbitmq配置 |
|
|
|
@ -142,21 +139,23 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private static AIWarningDataReceivedEvent transform(AIRootCloudWarningDataReceivedEvent event) { |
|
|
|
private static AIWarningDataReceivedEvent transform(JSONObject event) { |
|
|
|
AIWarningDataReceivedEvent aiWaringDataReceivedEvent = new AIWarningDataReceivedEvent(); |
|
|
|
if (Objects.nonNull(event) && Objects.nonNull(event.getPayload())) { |
|
|
|
Object object = event.get("payload"); |
|
|
|
if (Objects.nonNull(object)) { |
|
|
|
AIRootCloudWarningDataReceivedPayLoadEvent payLoadEvent = JSONUtil.toBean(object.toString(), AIRootCloudWarningDataReceivedPayLoadEvent.class); |
|
|
|
aiWaringDataReceivedEvent.setId(snowflake.nextId()); |
|
|
|
aiWaringDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); |
|
|
|
aiWaringDataReceivedEvent.setMachineIotMac(event.getPayload().getDeviceId()); |
|
|
|
aiWaringDataReceivedEvent.setMachineIotMac(payLoadEvent.getDeviceId()); |
|
|
|
aiWaringDataReceivedEvent.setReceivedTime(System.currentTimeMillis()); |
|
|
|
|
|
|
|
if (Objects.nonNull(event.getPayload().getInfo()) && CollUtil.isNotEmpty(event.getPayload().getInfo().getAlarmList())) { |
|
|
|
AIRootCloudWarningDataReceivedEvent.Info info = event.getPayload().getInfo(); |
|
|
|
if (Objects.nonNull(payLoadEvent.getInfo()) && CollUtil.isNotEmpty(payLoadEvent.getInfo().getAlarmList())) { |
|
|
|
AIRootCloudWarningDataReceivedPayLoadEvent.Info info = payLoadEvent.getInfo(); |
|
|
|
aiWaringDataReceivedEvent.setCameraId(info.getCameraId()); |
|
|
|
aiWaringDataReceivedEvent.setCameraName(info.getCameraName()); |
|
|
|
aiWaringDataReceivedEvent.setCameraPosition(info.getCameraPosition()); |
|
|
|
List<AIWarningDataReceivedEvent.Alarm> alarmList = new ArrayList<>(); |
|
|
|
event.getPayload().getInfo().getAlarmList().forEach(a -> { |
|
|
|
payLoadEvent.getInfo().getAlarmList().forEach(a -> { |
|
|
|
AIWarningDataReceivedEvent.Alarm alarm = Convert.convert(AIWarningDataReceivedEvent.Alarm.class, a); |
|
|
|
alarm.setPicUrl(OSSUtils.getFileUrl(a.getPicUrl())); |
|
|
|
alarm.setThumbnail(OSSUtils.getFileUrl(a.getThumbnail())); |
|
|
|
|