|
|
|
@ -18,7 +18,10 @@ |
|
|
|
|
|
|
|
package com.qniao.iot.rc; |
|
|
|
|
|
|
|
import com.ctrip.framework.apollo.ConfigService; |
|
|
|
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; |
|
|
|
import com.qniao.iot.rc.config.ApolloConfig; |
|
|
|
import com.qniao.iot.rc.constant.ConfigConstant; |
|
|
|
import com.qniao.iot.rc.constant.DataSource; |
|
|
|
import com.qniao.iot.rc.event.MachineIotDataReceivedEventSerializationSchema; |
|
|
|
import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema; |
|
|
|
@ -61,9 +64,9 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
|
|
|
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); |
|
|
|
KafkaSource<RootCloudIotDataReceiptedEvent> source = KafkaSource.<RootCloudIotDataReceiptedEvent>builder() |
|
|
|
.setBootstrapServers(params.get("source.bootstrap.servers")) |
|
|
|
.setTopics("root_cloud_iot_report_data_event") |
|
|
|
.setGroupId("root_cloud_iot_data_etl") |
|
|
|
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) |
|
|
|
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) |
|
|
|
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID)) |
|
|
|
.setStartingOffsets(OffsetsInitializer.earliest()) |
|
|
|
.setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) |
|
|
|
.build(); |
|
|
|
@ -77,10 +80,10 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
// 写入kafka |
|
|
|
transformDs.sinkTo( |
|
|
|
KafkaSink.<MachineIotDataReceivedEvent>builder() |
|
|
|
.setBootstrapServers(params.get("sink.bootstrap.servers")) |
|
|
|
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) |
|
|
|
.setRecordSerializer( |
|
|
|
KafkaRecordSerializationSchema.builder() |
|
|
|
.setTopic("machine_iot_data_received_event") |
|
|
|
.setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS)) |
|
|
|
.setValueSerializationSchema(new MachineIotDataReceivedEventSerializationSchema()) |
|
|
|
.build() |
|
|
|
).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) |
|
|
|
|