Browse Source

feat: 增加mocker

hph_优化版本
lizhongkang@qniao.cn 3 years ago
parent
commit
b215df739c
8 changed files with 104 additions and 23 deletions
  1. 1
      pom.xml
  2. 20
      root-cloud-event/pom.xml
  3. 27
      root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSerialization.java
  4. 56
      root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java
  5. 14
      root-cloud-statistics/pom.xml
  6. 4
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java
  7. 3
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java
  8. 2
      root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java

1
pom.xml

@ -26,6 +26,7 @@ under the License.
<modules> <modules>
<module>root-cloud-source</module> <module>root-cloud-source</module>
<module>root-cloud-event</module> <module>root-cloud-event</module>
<module>root-cloud-mocker</module>
<module>root-cloud-statistics</module> <module>root-cloud-statistics</module>
</modules> </modules>
<packaging>pom</packaging> <packaging>pom</packaging>

20
root-cloud-event/pom.xml

@ -16,6 +16,7 @@
<maven.compiler.target>${target.java.version}</maven.compiler.target> <maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.2</log4j.version> <log4j.version>2.17.2</log4j.version>
<lombok.version>1.18.24</lombok.version> <lombok.version>1.18.24</lombok.version>
<jackson.version>2.13.3</jackson.version>
</properties> </properties>
<dependencies> <dependencies>
@ -43,22 +44,9 @@
<!-- Add logging framework, to produce console output when running in the IDE. --> <!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. --> <!-- These dependencies are excluded from the application JAR by default. -->
<dependency> <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>

27
root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSerialization.java

@ -0,0 +1,27 @@
package com.qniao.iot.rc;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
/**
 * Serializes {@link RootCloudIotDataReceiptedEvent} instances into Kafka
 * {@link ProducerRecord}s as JSON bytes, using a shared Jackson
 * {@link ObjectMapper}.
 */
public class RootCloudIotDataEventSerialization {

    // ObjectMapper is thread-safe once configured; share a single instance.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    private final String topic;

    public RootCloudIotDataEventSerialization(String topic) {
        this.topic = topic;
    }

    /**
     * Serializes the given event into a producer record for {@code topic}.
     *
     * @param message   event to serialize; must be Jackson-serializable
     * @param timestamp optional record timestamp (epoch millis); when {@code null}
     *                  the producer/broker assigns the timestamp
     * @return producer record carrying the JSON-encoded event (no key, no
     *         explicit partition)
     * @throws IllegalArgumentException if the event cannot be serialized to JSON
     */
    public ProducerRecord<String, byte[]> serialize(
            final RootCloudIotDataReceiptedEvent message, @Nullable final Long timestamp) {
        try {
            // Fix: the timestamp parameter was previously ignored. Pass it through
            // via the (topic, partition, timestamp, key, value) constructor so
            // callers can control the record timestamp; partition and key stay null.
            return new ProducerRecord<>(
                    topic, null, timestamp, null, objectMapper.writeValueAsBytes(message));
        } catch (JsonProcessingException e) {
            // Preserve the cause so the original Jackson error is not lost.
            throw new IllegalArgumentException("Could not serialize record: " + message, e);
        }
    }
}

56
root-cloud-mocker/src/main/java/com/qniao/iot/rc/RootCloudIotDataEventSourceMocker.java

@ -0,0 +1,56 @@
package com.qniao.iot.rc;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.math.BigDecimal;
import java.util.Properties;
/**
 * Standalone mock producer that publishes a synthetic
 * {@link RootCloudIotDataReceiptedEvent} to Kafka at a fixed interval,
 * for exercising the downstream statistics job without real devices.
 */
public class RootCloudIotDataEventSourceMocker {

    /** Delay between two mock events, in milliseconds. */
    public static final long DELAY = 3000;

    /**
     * Entry point: connects to Kafka and sends one mock event every
     * {@link #DELAY} milliseconds until interrupted.
     */
    public static void main(String[] args) throws Exception {
        Properties kafkaProps = createKafkaProperties();
        String topic = "root_cloud_iot_report_data_event";
        // Hoisted out of the loop: the serializer is stateless apart from the
        // topic, so one instance suffices (the original allocated one per send).
        RootCloudIotDataEventSerialization serialization =
                new RootCloudIotDataEventSerialization(topic);

        // try-with-resources guarantees the producer is flushed and closed if
        // the loop exits via interruption or an exception (it was never closed
        // before).
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(kafkaProps)) {
            while (true) {
                ProducerRecord<String, byte[]> record =
                        serialization.serialize(buildMockEvent(), null);
                producer.send(record);
                Thread.sleep(DELAY);
            }
        }
    }

    /** Builds one synthetic IoT report event populated with fixed demo values. */
    private static RootCloudIotDataReceiptedEvent buildMockEvent() {
        RootCloudIotDataReceiptedEvent event = new RootCloudIotDataReceiptedEvent();
        event.set__assetId__("10000");
        event.setACC_count(50L);
        event.setACC_count_total(500L);
        event.setPWR_sta(1);
        event.setWorking_sta(1);
        event.setStoping_duration("100");
        event.setRunning_duration(new BigDecimal(1250));
        event.setIG_sta(1);
        event.setWaiting_duration(new BigDecimal(500));
        return event;
    }

    /**
     * Kafka producer configuration: String keys, raw byte[] values.
     * NOTE(review): the broker address is hard-coded to a public IP; consider
     * reading it from {@code args} or an environment variable before deploying.
     */
    private static Properties createKafkaProperties() {
        Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "120.25.199.30:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getCanonicalName());
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                ByteArraySerializer.class.getCanonicalName());
        return kafkaProps;
    }
}

14
root-cloud-statistics/pom.xml

@ -25,7 +25,7 @@ under the License.
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>IOT Statistics</name>
<name>root-cloud-statistics</name>
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@ -50,13 +50,11 @@ under the License.
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId> <artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId> <artifactId>flink-clients</artifactId>
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
@ -85,6 +83,16 @@ under the License.
<version>${log4j.version}</version> <version>${log4j.version}</version>
<scope>runtime</scope> <scope>runtime</scope>
</dependency> </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.13.3</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

4
root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java

@ -55,7 +55,7 @@ public class RootCloudIotDataFormatterJob {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
KafkaSource<RootCloudIotDataReceiptedEvent> source = KafkaSource.<RootCloudIotDataReceiptedEvent>builder() KafkaSource<RootCloudIotDataReceiptedEvent> source = KafkaSource.<RootCloudIotDataReceiptedEvent>builder()
.setBootstrapServers("kafka:9092")
.setBootstrapServers("120.25.199.30:9092")
.setTopics("root_cloud_iot_report_data_event") .setTopics("root_cloud_iot_report_data_event")
.setGroupId("flink-kafka-demo") .setGroupId("flink-kafka-demo")
.setStartingOffsets(OffsetsInitializer.earliest()) .setStartingOffsets(OffsetsInitializer.earliest())
@ -79,7 +79,7 @@ public class RootCloudIotDataFormatterJob {
// 再发送到kafka队列中 // 再发送到kafka队列中
transformDs.sinkTo( transformDs.sinkTo(
KafkaSink.<MachineIotDataReceivedEvent>builder() KafkaSink.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers("kafka:9092")
.setBootstrapServers("120.25.199.30:9092")
.setRecordSerializer( .setRecordSerializer(
KafkaRecordSerializationSchema.builder() KafkaRecordSerializationSchema.builder()
.setTopic("machine_iot_data_received_event") .setTopic("machine_iot_data_received_event")

3
root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/MachineIotDataReceivedEventSerializationSchema.java

@ -1,8 +1,9 @@
package com.qniao.iot.rc.event; package com.qniao.iot.rc.event;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SerializationSchema;
/** /**
* @author Lzk * @author Lzk

2
root-cloud-statistics/src/main/java/com/qniao/iot/rc/event/RootCloudIotDataReceiptedEventDeserializationSchema.java

@ -1,9 +1,9 @@
package com.qniao.iot.rc.event; package com.qniao.iot.rc.event;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import com.qniao.iot.rc.RootCloudIotDataReceiptedEvent; import com.qniao.iot.rc.RootCloudIotDataReceiptedEvent;
import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException; import java.io.IOException;

Loading…
Cancel
Save