From 1f243b521535bccfe6112c4dfe83f9ac6a1583a0 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Mon, 5 Sep 2022 15:44:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...ineOutputCommandDeserializationSchema.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 iot-machine-data-command/src/main/java/com/qniao/iot/machine/schema/MachineOutputCommandDeserializationSchema.java diff --git a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/schema/MachineOutputCommandDeserializationSchema.java b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/schema/MachineOutputCommandDeserializationSchema.java new file mode 100644 index 0000000..e1c284e --- /dev/null +++ b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/schema/MachineOutputCommandDeserializationSchema.java @@ -0,0 +1,35 @@ +package com.qniao.iot.machine.schema; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.qniao.iot.machine.command.MachineOutputCommand; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; + +/** + * 机器物联数据已接收事件反序列化概要 + */ +public class MachineOutputCommandDeserializationSchema implements DeserializationSchema { + /** + * 注册JavaTimeModule,支持LocalDateTime字段的解析 + */ + final private ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public MachineOutputCommand deserialize(byte[] message) throws IOException { + + return objectMapper.readValue(message, MachineOutputCommand.class); + } + + @Override + public boolean isEndOfStream(MachineOutputCommand nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + + return TypeInformation.of(MachineOutputCommand.class); + } +}