|
|
|
@ -0,0 +1,35 @@ |
|
|
|
package com.qniao.iot.machine.schema; |
|
|
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper; |
|
|
|
import com.qniao.iot.machine.command.MachineOutputCommand; |
|
|
|
import org.apache.flink.api.common.serialization.DeserializationSchema; |
|
|
|
import org.apache.flink.api.common.typeinfo.TypeInformation; |
|
|
|
|
|
|
|
import java.io.IOException; |
|
|
|
|
|
|
|
/** |
|
|
|
* 机器物联数据已接收事件反序列化概要 |
|
|
|
*/ |
|
|
|
public class MachineOutputCommandDeserializationSchema implements DeserializationSchema<MachineOutputCommand> { |
|
|
|
/** |
|
|
|
* 注册JavaTimeModule,支持LocalDateTime字段的解析 |
|
|
|
*/ |
|
|
|
final private ObjectMapper objectMapper = new ObjectMapper(); |
|
|
|
|
|
|
|
@Override |
|
|
|
public MachineOutputCommand deserialize(byte[] message) throws IOException { |
|
|
|
|
|
|
|
return objectMapper.readValue(message, MachineOutputCommand.class); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public boolean isEndOfStream(MachineOutputCommand nextElement) { |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public TypeInformation<MachineOutputCommand> getProducedType() { |
|
|
|
|
|
|
|
return TypeInformation.of(MachineOutputCommand.class); |
|
|
|
} |
|
|
|
} |