Browse Source

新增rabbitmq反序列化策略

master
1049970895@qniao.cn 3 years ago
parent
commit
4705924b1e
2 changed files with 38 additions and 6 deletions
  1. 32
      iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventRabbitMqDeserializationSchema.java
  2. 12
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

32
iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventRabbitMqDeserializationSchema.java

@ -0,0 +1,32 @@
package com.qniao.iot.machine.event;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
/**
* 机器物联数据已接收事件反序列化概要
*/
public class MachineIotDataReceivedEventRabbitMqDeserializationSchema implements DeserializationSchema<MachineIotDataReceivedEvent> {
/**
* 注册JavaTimeModule支持LocalDateTime字段的解析
*/
final private ObjectMapper objectMapper = new ObjectMapper();
@Override
public MachineIotDataReceivedEvent deserialize(byte[] message) throws IOException {
return objectMapper.readValue(message, MachineIotDataReceivedEvent.class);
}
@Override
public boolean isEndOfStream(MachineIotDataReceivedEvent nextElement) {
return false;
}
@Override
public TypeInformation<MachineIotDataReceivedEvent> getProducedType() {
return TypeInformation.of(MachineIotDataReceivedEvent.class);
}
}

12
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -65,19 +65,19 @@ import java.util.List;
public class IotMachineEventGeneratorJob {
private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(
RestClient.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ES_HOST_NAME),
ApolloConfig.getInt(ConfigConstant.ES_POST),
ApolloConfig.getStr(ConfigConstant.ES_SCHEME)))
RestClient.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST),
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ES_USER_NAME),
ApolloConfig.getStr(ConfigConstant.ES_PASSWORD)));
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
})
.setRequestConfigCallback(requestConfigBuilder -> {
// 设置es连接超时时间
requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ES_CONNECT_TIMEOUT));
requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT));
return requestConfigBuilder;
}));

Loading…
Cancel
Save