From b85fae8d014855e643397aafff65a0fdf278da1d Mon Sep 17 00:00:00 2001
From: "1049970895@qniao.cn" <1049970895>
Date: Fri, 29 Jul 2022 18:08:48 +0800
Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../rc/RootCloudIotDataEventSourceMocker.java | 6 +-
.../dependency-reduced-pom.xml | 6 ++
root-cloud-statistics/pom.xml | 18 ++++
.../iot/rc/RootCloudIotDataFormatterJob.java | 18 +++-
.../com/qniao/iot/rc/CloudBoxEventJob.java | 90 -------------------
5 files changed, 44 insertions(+), 94 deletions(-)
delete mode 100644 root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob.java
diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java
index d23d198..d211998 100644
--- a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java
+++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java
@@ -81,11 +81,11 @@ public class RootCloudIotDataEventSourceMocker {
private static Properties createKafkaProperties() {
Properties kafkaProps = new Properties();
// 本地环境
- kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SASL_PLAINTEXT://localhost:9093");
+ // kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19093");
// 测试环境
//kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.29.115.145:9092");
// 正式环境
- //kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092");
+ kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:19092");
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
@@ -94,7 +94,7 @@ public class RootCloudIotDataEventSourceMocker {
kafkaProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
kafkaProps.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
kafkaProps.put("sasl.jaas.config",
- "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";");
+ "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"qnkafka\" password=\"qnkafkaonetwogo\";");
diff --git a/root-cloud-statistics/dependency-reduced-pom.xml b/root-cloud-statistics/dependency-reduced-pom.xml
index 1569da1..959d952 100644
--- a/root-cloud-statistics/dependency-reduced-pom.xml
+++ b/root-cloud-statistics/dependency-reduced-pom.xml
@@ -77,6 +77,12 @@
 <version>2.17.2</version>
 <scope>runtime</scope>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>druid</artifactId>
+ <version>1.1.12</version>
+ <scope>test</scope>
+ </dependency>
diff --git a/root-cloud-statistics/pom.xml b/root-cloud-statistics/pom.xml
index 1921f49..6077f77 100644
--- a/root-cloud-statistics/pom.xml
+++ b/root-cloud-statistics/pom.xml
@@ -159,6 +159,24 @@ under the License.
 <artifactId>mysql-connector-java</artifactId>
 <version>8.0.29</version>
+
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.31</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.qniao</groupId>
+ <artifactId>ddd-event</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>4.1.42.Final</version>
+ </dependency>
diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
index 78185ff..3f64b4f 100644
--- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
+++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
@@ -39,10 +39,15 @@ import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.SaslConfigs;
import java.math.BigDecimal;
-import java.util.Objects;
+import java.security.Provider;
+import java.util.*;
/**
* Skeleton for a Flink DataStream Job.
@@ -74,7 +79,12 @@ public class RootCloudIotDataFormatterJob {
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUPID))
+ /*.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
+ .setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN")
+ .setProperty("sasl.jaas.config",
+ "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";")*/
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
+ //.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema())
.build();
@@ -84,10 +94,16 @@ public class RootCloudIotDataFormatterJob {
.map((MapFunction) RootCloudIotDataFormatterJob::transform)
.name("Transform MachineIotDataReceivedEvent");
+ Properties kafkaProducerConfig = new Properties();
+ kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ kafkaProducerConfig.setProperty("sasl.jaas.config",
+ "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";");
// 写入kafka
transformDs.sinkTo(
KafkaSink.builder()
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SINK_KAFKA_BOOTSTRAP_SERVERS))
+ //.setKafkaProducerConfig(kafkaProducerConfig)
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic(ApolloConfig.get(ConfigConstant.SINK_KAFKA_TOPICS))
diff --git a/root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob.java b/root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob.java
deleted file mode 100644
index 3028283..0000000
--- a/root-cloud-statistics/src/test/java/com/qniao/iot/rc/CloudBoxEventJob.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package com.qniao.iot.rc;
-
-import cn.hutool.core.collection.ListUtil;
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.json.JSONUtil;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.connector.kafka.source.KafkaSource;
-import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-
-import java.time.Duration;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.util.*;
-
-
-public class CloudBoxEventJob {
-
- public static void main(String[] args) throws Exception {
-
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
- env.getConfig().setAutoWatermarkInterval(1000L);
- env.setParallelism(1);
- /*Map<TopicPartition, Long> offsets = new HashMap<>();
- TopicPartition topicPartition = new TopicPartition("data-message-channel-qn", 0);
- offsets.put(topicPartition, 5872534L);*/
- KafkaSource<CloudBoxDataHistoryEvent> source = KafkaSource.<CloudBoxDataHistoryEvent>builder()
- .setBootstrapServers("172.19.14.225:9092")
- .setTopics("data-message-channel-qn")
- .setGroupId("cloud_box_event_job")
- .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
- .setStartingOffsets(OffsetsInitializer.earliest())
- .setValueOnlyDeserializer(new CloudBoxDataHistoryEventDeserializationSchema())
- .build();
-
- SingleOutputStreamOperator<CloudBoxDataHistoryEvent> fromSource = env
- .fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)),
- "cloudBoxDataHistoryEvent fromSource")
- .filter((FilterFunction<CloudBoxDataHistoryEvent>) value -> {
- String eventKey = value.getEventKey();
- return StrUtil.isNotEmpty(eventKey) && eventKey.equals("qn_cloud_box_data_history");
- });
-
- fromSource.print();
-
- SingleOutputStreamOperator<Body> flatMap = fromSource
- .flatMap(new RichFlatMapFunction<CloudBoxDataHistoryEvent, Body>() {
- @Override
- public void flatMap(CloudBoxDataHistoryEvent event, Collector<Body> out) {
- String body = event.getBody();
- if (StrUtil.isNotEmpty(body)) {
- Body bean = JSONUtil.toBean(body, Body.class);
- bean.setCurrentTime(LocalDateTime.now().toInstant(ZoneOffset.ofHours(8)).toEpochMilli());
- out.collect(bean);
- }
- }
- }).name("cloudBoxDataHistoryEvent flatmap");
- SingleOutputStreamOperator<List<Body>> toMysql = flatMap
- .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1))
- .withTimestampAssigner(((body, recordTimestamp) -> body.getCurrentTime())))
- .keyBy(Body::getData_type)
- .window(TumblingEventTimeWindows.of(Time.seconds(2)))
- .process(new ProcessWindowFunction<Body, List<Body>, Integer, TimeWindow>() {
- @Override
- public void process(Integer aLong, ProcessWindowFunction<Body, List<Body>, Integer, TimeWindow>.Context context,
- Iterable<Body> elements, Collector<List<Body>> out) {
- List<Body> list = ListUtil.toList(elements);
- if (list.size() > 0) {
- out.collect(list);
- }
- }
- }).name("to mysql");
-
- toMysql.addSink(new SinkMysqlFunc()).name("sink to mysql");
-
- env.execute("cloud box event job");
- }
-}