diff --git a/iot-machine-data-command/src/main/java/com/qniao/iot/machine/schema/MachineOutputCommandDeserializationSchema.java b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/schema/MachineOutputCommandDeserializationSchema.java new file mode 100644 index 0000000..e1c284e --- /dev/null +++ b/iot-machine-data-command/src/main/java/com/qniao/iot/machine/schema/MachineOutputCommandDeserializationSchema.java @@ -0,0 +1,35 @@ +package com.qniao.iot.machine.schema; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.qniao.iot.machine.command.MachineOutputCommand; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; + +/** + * 机器物联数据已接收事件反序列化概要 + */ +public class MachineOutputCommandDeserializationSchema implements DeserializationSchema { + /** + * 注册JavaTimeModule,支持LocalDateTime字段的解析 + */ + final private ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public MachineOutputCommand deserialize(byte[] message) throws IOException { + + return objectMapper.readValue(message, MachineOutputCommand.class); + } + + @Override + public boolean isEndOfStream(MachineOutputCommand nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + + return TypeInformation.of(MachineOutputCommand.class); + } +}