diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 12cfe36..d9a5df6 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -41,8 +41,10 @@ import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.config.SaslConfigs; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; @@ -114,6 +116,10 @@ public class IotMachineEventGeneratorJob { .setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) + .setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") + .setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN") + .setProperty("sasl.jaas.config", + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";") .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())