diff --git a/pom.xml b/pom.xml index e3f3a5a..5ac04a0 100644 --- a/pom.xml +++ b/pom.xml @@ -26,6 +26,7 @@ under the License. root-cloud-source root-cloud-event + root-cloud-mocker root-cloud-statistics pom diff --git a/root-cloud-event/pom.xml b/root-cloud-event/pom.xml index 4d82721..e02fc93 100644 --- a/root-cloud-event/pom.xml +++ b/root-cloud-event/pom.xml @@ -16,6 +16,7 @@ ${target.java.version} 2.17.2 1.18.24 + 2.13.3 @@ -43,22 +44,9 @@ - org.apache.logging.log4j - log4j-slf4j-impl - ${log4j.version} - runtime - - - org.apache.logging.log4j - log4j-api - ${log4j.version} - runtime - - - org.apache.logging.log4j - log4j-core - ${log4j.version} - runtime + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} org.projectlombok diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSerialization.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSerialization.java new file mode 100644 index 0000000..de29d5f --- /dev/null +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSerialization.java @@ -0,0 +1,27 @@ +package com.qniao.iot.rc; + + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.producer.ProducerRecord; + +import javax.annotation.Nullable; + +public class RootCloudIotDataEventSerialization { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private final String topic; + + public RootCloudIotDataEventSerialization(String topic) { + this.topic = topic; + } + + public ProducerRecord serialize( + final RootCloudIotDataReceiptedEvent message, @Nullable final Long timestamp) { + try { + //if topic is null, default topic will be used + return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message)); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Could not serialize record: " + message, e); + } + } +} diff --git a/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java new file mode 100644 index 0000000..da71a91 --- /dev/null +++ b/root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java @@ -0,0 +1,56 @@ +package com.qniao.iot.rc; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.math.BigDecimal; +import java.util.Properties; + +public class RootCloudIotDataEventSourceMocker { + // 延迟:毫秒 + public static final long DELAY = 3000; + + public static void main(String[] args) throws Exception { + // 创建kafka配置属性 + Properties kafkaProps = createKafkaProperties(); + + // 创建Kafka消息的生产者 + KafkaProducer producer = new KafkaProducer<>(kafkaProps); + + String topic = "root_cloud_iot_report_data_event"; + + + // 循环发送事件 + while (true) { + + RootCloudIotDataReceiptedEvent event = new RootCloudIotDataReceiptedEvent(); + event.set__assetId__("10000"); + event.setACC_count(50L); + event.setACC_count_total(500L); + event.setPWR_sta(1); + event.setWorking_sta(1); + event.setStoping_duration("100"); + event.setRunning_duration(new BigDecimal(1250)); + event.setIG_sta(1); + event.setWaiting_duration(new BigDecimal(500)); + ProducerRecord record = new RootCloudIotDataEventSerialization(topic).serialize( + event, + null); + + producer.send(record); + + Thread.sleep(DELAY); + } + } + + private static Properties createKafkaProperties() { + Properties kafkaProps = new Properties(); + kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:9092"); + kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()); + kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + return kafkaProps; + } +} diff --git a/root-cloud-statistics/pom.xml b/root-cloud-statistics/pom.xml index dffdb35..a9569d8 100644 --- a/root-cloud-statistics/pom.xml +++ b/root-cloud-statistics/pom.xml @@ -25,7 +25,7 @@ under the License. 0.0.1-SNAPSHOT jar - IOT Statistics + root-cloud-statistics UTF-8 @@ -50,13 +50,11 @@ under the License. org.apache.flink flink-streaming-java ${flink.version} - provided org.apache.flink flink-clients ${flink.version} - provided @@ -85,6 +83,16 @@ under the License. ${log4j.version} runtime + + com.fasterxml.jackson.core + jackson-databind + 2.13.3 + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + 2.13.3 + diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 977f978..cc44bd9 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -55,7 +55,7 @@ public class RootCloudIotDataFormatterJob { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); KafkaSource source = KafkaSource.builder() - .setBootstrapServers("kafka:9092") + .setBootstrapServers("120.25.199.30:9092") .setTopics("root_cloud_iot_report_data_event") .setGroupId("flink-kafka-demo") .setStartingOffsets(OffsetsInitializer.earliest()) @@ -79,7 +79,7 @@ public class RootCloudIotDataFormatterJob { // 再发送到kafka队列中 transformDs.sinkTo( KafkaSink.builder() - .setBootstrapServers("kafka:9092") + .setBootstrapServers("120.25.199.30:9092") .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic("machine_iot_data_received_event") diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java index 46f9464..82971b5 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java @@ -1,8 +1,9 @@ package com.qniao.iot.rc.event; -import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.serialization.SerializationSchema; + /** * @author Lzk diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java index f41d6d7..13644d7 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java @@ -1,9 +1,9 @@ package com.qniao.iot.rc.event; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import com.qniao.iot.rc.RootCloudIotDataReceiptedEvent; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException;