diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java index d23d198..d211998 100644 --- a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java @@ -81,11 +81,11 @@ public class RootCloudIotDataEventSourceMocker { private static Properties createKafkaProperties() { Properties kafkaProps = new Properties(); // 本地环境 - kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SASL_PLAINTEXT://localhost:9093"); + // kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19093"); // 测试环境 //kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.29.115.145:9092"); // 正式环境 - //kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092"); + kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092"); kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); @@ -94,7 +94,7 @@ public class RootCloudIotDataEventSourceMocker { kafkaProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); kafkaProps.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); kafkaProps.put("sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"); + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";"); diff --git a/root-cloud-statistics/dependency-reduced-pom.xml b/root-cloud-statistics/dependency-reduced-pom.xml index 1569da1..959d952 100644 --- a/root-cloud-statistics/dependency-reduced-pom.xml +++ b/root-cloud-statistics/dependency-reduced-pom.xml @@ -77,6 +77,12 @@ 2.17.2 runtime + + com.alibaba + druid + 1.1.12 + test + diff --git a/root-cloud-statistics/pom.xml b/root-cloud-statistics/pom.xml index 1921f49..6077f77 100644 --- a/root-cloud-statistics/pom.xml +++ b/root-cloud-statistics/pom.xml @@ -159,6 +159,24 @@ under the License. mysql-connector-java 8.0.29 + + + com.alibaba + fastjson + 1.2.31 + + + + com.qniao + ddd-event + 0.0.1-SNAPSHOT + + + + io.netty + netty-all + 4.1.42.Final + diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 78185ff..3f64b4f 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -39,10 +39,15 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.SaslConfigs; import java.math.BigDecimal; -import java.util.Objects; +import java.security.Provider; +import java.util.*; /** * Skeleton for a Flink DataStream Job. @@ -74,7 +79,12 @@ public class RootCloudIotDataFormatterJob { .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID)) + /*.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") + .setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN") + .setProperty("sasl.jaas.config", + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";")*/ .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) + //.setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) .build(); @@ -84,10 +94,16 @@ public class RootCloudIotDataFormatterJob { .map((MapFunction) RootCloudIotDataFormatterJob::transform) .name("Transform MachineIotDataReceivedEvent"); + Properties kafkaProducerConfig = new Properties(); + kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); + kafkaProducerConfig.setProperty("sasl.jaas.config", + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"); // 写入kafka transformDs.sinkTo( KafkaSink.builder() .setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS)) + //.setKafkaProducerConfig(kafkaProducerConfig) .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS)) diff --git a/root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob.java b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob.java deleted file mode 100644 index 3028283..0000000 --- a/root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.qniao.iot.rc; - -import cn.hutool.core.collection.ListUtil; -import cn.hutool.core.util.StrUtil; -import cn.hutool.json.JSONUtil; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.connector.kafka.source.KafkaSource; -import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; -import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; -import org.apache.kafka.clients.consumer.ConsumerConfig; - -import java.time.Duration; -import java.time.LocalDateTime; -import java.time.ZoneOffset; -import java.util.*; - - -public class CloudBoxEventJob { - - public static void main(String[] args) throws Exception { - - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); - env.getConfig().setAutoWatermarkInterval(1000L); - env.setParallelism(1); - /*Map offsets = new HashMap<>(); - TopicPartition topicPartition = new TopicPartition("data-message-channel-qn", 0); - offsets.put(topicPartition, 5872534L);*/ - KafkaSource source = KafkaSource.builder() - .setBootstrapServers("172.19.14.225:9092") - .setTopics("data-message-channel-qn") - .setGroupId("cloud_box_event_job") - .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") - .setStartingOffsets(OffsetsInitializer.earliest()) - .setValueOnlyDeserializer(new CloudBoxDataHistoryEventDeserializationSchema()) - .build(); - - SingleOutputStreamOperator fromSource = env - .fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)), - "cloudBoxDataHistoryEvent fromSource") - .filter((FilterFunction) value -> { - String eventKey = value.getEventKey(); - return StrUtil.isNotEmpty(eventKey) && eventKey.equals("qn_cloud_box_data_history"); - }); - - fromSource.print(); - - SingleOutputStreamOperator flatMap = fromSource - .flatMap(new RichFlatMapFunction() { - @Override - public void flatMap(CloudBoxDataHistoryEvent event, Collector out) { - String body = event.getBody(); - if (StrUtil.isNotEmpty(body)) { - Body bean = JSONUtil.toBean(body, Body.class); - bean.setCurrentTime(LocalDateTime.now().toInstant(ZoneOffset.ofHours(8)).toEpochMilli()); - out.collect(bean); - } - } - }).name("cloudBoxDataHistoryEvent flatmap"); - SingleOutputStreamOperator> toMysql = flatMap - .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)) - .withTimestampAssigner(((body, recordTimestamp) -> body.getCurrentTime()))) - .keyBy(Body::getData_type) - .window(TumblingEventTimeWindows.of(Time.seconds(2))) - .process(new ProcessWindowFunction, Integer, TimeWindow>() { - @Override - public void process(Integer aLong, ProcessWindowFunction, Integer, TimeWindow>.Context context, - Iterable elements, Collector> out) { - List list = ListUtil.toList(elements); - if (list.size() > 0) { - out.collect(list); - } - } - }).name("to mysql"); - - toMysql.addSink(new SinkMysqlFunc()).name("sink to mysql"); - - env.execute("cloud box event job"); - } -}