From 82ad1e0ee3d893e60024107b65a7f829e44b53dc Mon Sep 17 00:00:00 2001 From: zangkun Date: Fri, 8 Jul 2022 11:25:59 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E6=9C=BA=E5=99=A8=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E7=9A=84=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- iot-machine-state-event-generator-job/pom.xml | 8 +++++++- .../event/generator/job/IotMachineEventGeneratorJob.java | 4 +++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/iot-machine-state-event-generator-job/pom.xml b/iot-machine-state-event-generator-job/pom.xml index e776894..b7b021b 100644 --- a/iot-machine-state-event-generator-job/pom.xml +++ b/iot-machine-state-event-generator-job/pom.xml @@ -26,6 +26,11 @@ printing-packaging-factory-service-event 0.0.1-SNAPSHOT + + com.qniao + iot-machine-data-event + 0.0.1-SNAPSHOT + @@ -131,7 +136,8 @@ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> - com.qniao.iot.machine.event.generator.job.IotMachineEventGeneratorJob + com.qniao.iot.machine.event.generator.job.IotMachineEventGeneratorJob + diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 8b4587b..9671ba4 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -14,10 +14,12 @@ public class IotMachineEventGeneratorJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); + + // 定义Kafka数据源 KafkaSource source = KafkaSource.builder() .setBootstrapServers(params.get("source.bootstrap.servers")) .setTopics("root_cloud_iot_report_data_event") - .setGroupId("root_cloud_iot_data_etl") + .setGroupId("iot.machine.generator") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) .build();