Browse Source

将kafka硬编码的地址改为执行运行参数

hph_优化版本
zangkun 3 years ago
parent
commit
ff41ffe45d
2 changed files with 10 additions and 4 deletions
  1. 5
      README.md
  2. 9
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

5
README.md

@ -1,3 +1,6 @@
# comm-root-cloud-iot-formatter
根云IOT数据规格化程序
根云IOT数据规格化程序
运行程序时,配置程序执行参数:
-source.bootstrap.servers 120.25.199.30:9092 -sink.bootstrap.servers 120.25.199.30:9092

9
root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@ -24,6 +24,7 @@ import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchem
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
@ -52,12 +53,14 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin
public class RootCloudIotDataFormatterJob {
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
KafkaSource<RootCloudIotDataReceiptedEvent> source = KafkaSource.<RootCloudIotDataReceiptedEvent>builder()
.setBootstrapServers("120.25.199.30:9092")
.setBootstrapServers(params.get("source.bootstrap.servers"))
.setTopics("root_cloud_iot_report_data_event")
.setGroupId("flink-kafka-demo")
.setGroupId("root_cloud_iot_data_etl")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema())
.build();
@ -79,7 +82,7 @@ public class RootCloudIotDataFormatterJob {
// 再发送到kafka队列中
transformDs.sinkTo(
KafkaSink.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers("120.25.199.30:9092")
.setBootstrapServers(params.get("sink.bootstrap.servers"))
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("machine_iot_data_received_event")

Loading…
Cancel
Save