diff --git a/README.md b/README.md index 43b552f..4d88a66 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ -# iot-machine-status-event-generator +# iot-machine-state-event-generator diff --git a/iot-machine-data-event/pom.xml b/iot-machine-data-event/pom.xml new file mode 100644 index 0000000..0c2587c --- /dev/null +++ b/iot-machine-data-event/pom.xml @@ -0,0 +1,61 @@ + + + + iot-machine-state-event-generator + com.qniao + 0.0.1-SNAPSHOT + + 4.0.0 + + iot-machine-data-event + + + UTF-8 + 1.15.0 + 1.8 + ${target.java.version} + ${target.java.version} + 2.17.2 + 1.18.24 + 2.13.3 + + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + + org.apache.flink + flink-connector-kafka + ${flink.version} + + + + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${jackson.version} + + + org.projectlombok + lombok + ${lombok.version} + + + + \ No newline at end of file diff --git a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java new file mode 100644 index 0000000..266c049 --- /dev/null +++ b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java @@ -0,0 +1,73 @@ +package com.qniao.iot.machine.event; + +import lombok.Data; + +import java.io.Serializable; + +/** + * 机器物联数据已接收事件 + **/ +@Data +public class MachineIotDataReceivedEvent implements Serializable { + + private static final long serialVersionUID = 1L; + /** + * 唯一标识 + */ + private Long id; + + /** + * 数据来源 + */ + private Integer dataSource; + + /** + * 设备物联地址(云盒物理标识) + */ + private Long machineIotMac; + + /** + * 机器电源状态 + */ + private Integer machinePwrStat; + + /** + * 机器工作状态 + */ + private Integer machineWorkingStat; + + /** + * 累加作业总数 + */ + private Long accJobCount; + + /** + * 当前作业计数 + */ + private Long currJobCount; + + /** + * 当前作业时长 + */ + private Long currJobDuration; + + /** + * 当前待机时长 + */ + private Long currWaitingDuration; + + /** + * 当前停机时长 + */ + private Long currStoppingDuration; + + /** + * 计数开关状态 + */ + private Integer igStat; + + /** + * 数据采样时间 + */ + private Long reportTime; +} diff --git a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventKafkaDeserializationSchema.java b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventKafkaDeserializationSchema.java new file mode 100644 index 0000000..279e5ee --- /dev/null +++ b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventKafkaDeserializationSchema.java @@ -0,0 +1,32 @@ +package com.qniao.iot.machine.event; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; + +/** + * 机器物联数据已接收事件反序列化概要 + */ +public class MachineIotDataReceivedEventKafkaDeserializationSchema implements DeserializationSchema { + /** + * 注册JavaTimeModule,支持LocalDateTime字段的解析 + */ + final private ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public MachineIotDataReceivedEvent deserialize(byte[] message) throws IOException { + return objectMapper.readValue(message, MachineIotDataReceivedEvent.class); + } + + @Override + public boolean isEndOfStream(MachineIotDataReceivedEvent nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(MachineIotDataReceivedEvent.class); + } +} diff --git a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventKafkaSerializationSchema.java b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventKafkaSerializationSchema.java new file mode 100644 index 0000000..1fce9b2 --- /dev/null +++ b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventKafkaSerializationSchema.java @@ -0,0 +1,22 @@ +package com.qniao.iot.machine.event; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + + +/** + * 机器物联数据已接收事件序列化概要 + */ +public class MachineIotDataReceivedEventKafkaSerializationSchema implements SerializationSchema { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public byte[] serialize(MachineIotDataReceivedEvent event) { + try { + return OBJECT_MAPPER.writeValueAsBytes(event); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Could not serialize record: " + event, e); + } + } +} diff --git a/iot-machine-state-event-generator-job/pom.xml b/iot-machine-state-event-generator-job/pom.xml new file mode 100644 index 0000000..00fe76c --- /dev/null +++ b/iot-machine-state-event-generator-job/pom.xml @@ -0,0 +1,143 @@ + + + + iot-machine-state-event-generator + com.qniao + 0.0.1-SNAPSHOT + + 4.0.0 + + iot-machine-state-evnet-generator-job + + + UTF-8 + 1.15.0 + 1.8 + ${target.java.version} + ${target.java.version} + 2.17.2 + + + + + com.qniao + ddd-event + 0.0.1-SNAPSHOT + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + + + org.apache.flink + flink-clients + ${flink.version} + + + + org.apache.flink + flink-connector-kafka + ${flink.version} + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + runtime + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + runtime + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + runtime + + + com.fasterxml.jackson.core + jackson-databind + 2.13.3 + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + 2.13.3 + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${target.java.version} + ${target.java.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + + package + + shade + + + + + org.apache.flink:flink-shaded-force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.qniao.iot.rc.RootCloudIotDataFormatterJob + + + + + + + + + \ No newline at end of file diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java new file mode 100644 index 0000000..75a4cde --- /dev/null +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -0,0 +1,23 @@ +package com.qniao.iot.machine.event.generator.job; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +public class IotMachineEventGeneratorJob { + public static void main(String[] args) throws Exception { + final ParameterTool params = ParameterTool.fromArgs(args); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); + KafkaSource source = KafkaSource.builder() + .setBootstrapServers(params.get("source.bootstrap.servers")) + .setTopics("root_cloud_iot_report_data_event") + .setGroupId("root_cloud_iot_data_etl") + .setStartingOffsets(OffsetsInitializer.earliest()) + .setValueOnlyDeserializer(new RootCloudIotDataReceiptedEventDeserializationSchema()) + .build(); + } +} diff --git a/pom.xml b/pom.xml index 9181b87..cefc649 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,12 @@ under the License. 4.0.0 com.qniao - iot-machine-status-event-generator + iot-machine-state-event-generator 0.0.1-SNAPSHOT + + iot-machine-data-event + iot-machine-state-event-generator-job + iot-machine-data-event + pom