diff --git a/iot-machine-state-event-generator-job/pom.xml b/iot-machine-state-event-generator-job/pom.xml index e776894..b7b021b 100644 --- a/iot-machine-state-event-generator-job/pom.xml +++ b/iot-machine-state-event-generator-job/pom.xml @@ -26,6 +26,11 @@ printing-packaging-factory-service-event 0.0.1-SNAPSHOT + + com.qniao + iot-machine-data-event + 0.0.1-SNAPSHOT + @@ -131,7 +136,8 @@ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> - com.qniao.iot.machine.event.generator.job.IotMachineEventGeneratorJob + com.qniao.iot.machine.event.generator.job.IotMachineEventGeneratorJob + diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 8b4587b..9671ba4 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -14,10 +14,12 @@ public class IotMachineEventGeneratorJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); + + // 定义Kafka数据源 KafkaSource source = KafkaSource.builder() .setBootstrapServers(params.get("source.bootstrap.servers")) .setTopics("root_cloud_iot_report_data_event") - .setGroupId("root_cloud_iot_data_etl") + .setGroupId("iot.machine.generator") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) .build();