From 4705924b1e05ff752869588b2e386c6928d9843f Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Sat, 20 Aug 2022 14:48:37 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Erabbitmq=E5=8F=8D=E5=BA=8F?= =?UTF-8?q?=E5=88=97=E5=8C=96=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...vedEventRabbitMqDeserializationSchema.java | 32 +++++++++++++++++++ .../job/IotMachineEventGeneratorJob.java | 12 +++---- 2 files changed, 38 insertions(+), 6 deletions(-) create mode 100644 iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventRabbitMqDeserializationSchema.java diff --git a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventRabbitMqDeserializationSchema.java b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventRabbitMqDeserializationSchema.java new file mode 100644 index 0000000..99a2a63 --- /dev/null +++ b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventRabbitMqDeserializationSchema.java @@ -0,0 +1,32 @@ +package com.qniao.iot.machine.event; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; + +/** + * 机器物联数据已接收事件反序列化概要 + */ +public class MachineIotDataReceivedEventRabbitMqDeserializationSchema implements DeserializationSchema { + /** + * 注册JavaTimeModule,支持LocalDateTime字段的解析 + */ + final private ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public MachineIotDataReceivedEvent deserialize(byte[] message) throws IOException { + return objectMapper.readValue(message, MachineIotDataReceivedEvent.class); + } + + @Override + public boolean isEndOfStream(MachineIotDataReceivedEvent nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return TypeInformation.of(MachineIotDataReceivedEvent.class); + } +} diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 929447a..de26cf2 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -65,19 +65,19 @@ import java.util.List; public class IotMachineEventGeneratorJob { private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient( - RestClient.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ES_HOST_NAME), - ApolloConfig.getInt(ConfigConstant.ES_POST), - ApolloConfig.getStr(ConfigConstant.ES_SCHEME))) + RestClient.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST), + ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), + ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))) .setHttpClientConfigCallback(httpAsyncClientBuilder -> { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ES_USER_NAME), - ApolloConfig.getStr(ConfigConstant.ES_PASSWORD))); + new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), + ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); }) .setRequestConfigCallback(requestConfigBuilder -> { // 设置es连接超时时间 - requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ES_CONNECT_TIMEOUT)); + requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT)); return requestConfigBuilder; }));