From ff41ffe45d46ef6aaf4a949372972d21b7d0f42e Mon Sep 17 00:00:00 2001 From: zangkun Date: Mon, 4 Jul 2022 17:07:54 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86kafka=E7=A1=AC=E7=BC=96=E7=A0=81?= =?UTF-8?q?=E7=9A=84=E5=9C=B0=E5=9D=80=E6=94=B9=E4=B8=BA=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E8=BF=90=E8=A1=8C=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 5 ++++- .../com/qniao/iot/rc/RootCloudIotDataFormatterJob.java | 9 ++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 0664c55..00971d2 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,6 @@ # comm-root-cloud-iot-formatter -根云IOT数据规格化程序 \ No newline at end of file +根云IOT数据规格化程序 + +运行程序时,配置程序执行参数: +-source.bootstrap.servers 120.25.199.30:9092 -sink.bootstrap.servers 120.25.199.30:9092 \ No newline at end of file diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index cc44bd9..c6d1eff 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -24,6 +24,7 @@ import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchem import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; @@ -52,12 +53,14 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin public class RootCloudIotDataFormatterJob { public static void main(String[] args) throws Exception { + final ParameterTool params = ParameterTool.fromArgs(args); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); KafkaSource source = KafkaSource.builder() - .setBootstrapServers("120.25.199.30:9092") + .setBootstrapServers(params.get("source.bootstrap.servers")) .setTopics("root_cloud_iot_report_data_event") - .setGroupId("flink-kafka-demo") + .setGroupId("root_cloud_iot_data_etl") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .build(); @@ -79,7 +82,7 @@ public class RootCloudIotDataFormatterJob { // 再发送到kafka队列中 transformDs.sinkTo( KafkaSink.builder() - .setBootstrapServers("120.25.199.30:9092") + .setBootstrapServers(params.get("sink.bootstrap.servers")) .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic("machine_iot_data_received_event")