From 7ba5e8f433b2354ecc0f715b66667e68112351b7 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Sat, 20 Aug 2022 11:31:11 +0800 Subject: [PATCH] =?UTF-8?q?es=20sink=20=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../event/generator/config/ApolloConfig.java | 4 +- .../generator/constant/ConfigConstant.java | 12 ++ .../job/IotMachineEventGeneratorJob.java | 189 ++++++++++++++++-- 3 files changed, 186 insertions(+), 19 deletions(-) diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/config/ApolloConfig.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/config/ApolloConfig.java index 57ad6de..d382172 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/config/ApolloConfig.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/config/ApolloConfig.java @@ -7,12 +7,12 @@ public class ApolloConfig { private static final Config config = ConfigService.getAppConfig(); - public static String get(String key, String defaultValue) { + public static String getStr(String key, String defaultValue) { return config.getProperty(key, defaultValue); } - public static String get(String key) { + public static String getStr(String key) { return config.getProperty(key, null); } diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java index 2838478..a8a12ff 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java @@ -41,4 +41,16 @@ public interface ConfigConstant { String SINK_ELASTICSEARCH_SCHEME = "sink.elasticsearch.scheme"; String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index"; + + String ES_HOST_NAME = "es.host.name"; + + String ES_POST = "es.post"; + + String ES_SCHEME = "es.scheme"; + + String ES_USER_NAME = "es.user.name"; + + String ES_PASSWORD = "es.password"; + + String ES_CONNECT_TIMEOUT = "es.connect.timeout"; } diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 043da93..929447a 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -41,8 +41,16 @@ import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Requests; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.CreateIndexResponse; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; import java.sql.SQLException; @@ -56,6 +64,23 @@ import java.util.List; @Slf4j public class IotMachineEventGeneratorJob { + private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient( + RestClient.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ES_HOST_NAME), + ApolloConfig.getInt(ConfigConstant.ES_POST), + ApolloConfig.getStr(ConfigConstant.ES_SCHEME))) + .setHttpClientConfigCallback(httpAsyncClientBuilder -> { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ES_USER_NAME), + ApolloConfig.getStr(ConfigConstant.ES_PASSWORD))); + return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + }) + .setRequestConfigCallback(requestConfigBuilder -> { + // 设置es连接超时时间 + requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ES_CONNECT_TIMEOUT)); + return requestConfigBuilder; + })); + private final static String SQL = "select qmrs.machine_id, qmrs.iot_mac as machine_iot_mac, qmrs.status, qml.count_unit\n" + "from qn_machine_realtime_state qmrs\n" + " LEFT JOIN (select example_id, count_unit from qn_machine_list where is_delete = 0) qml\n" + @@ -63,14 +88,19 @@ public class IotMachineEventGeneratorJob { "where qmrs.iot_mac = ?\n" + " and qmrs.is_delete = 0"; + /** + * 当前索引日期后缀 + */ + private static String currIndicesDateSuffix; + public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); KafkaSource source = KafkaSource.builder() - .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) - .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) - .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) + .setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) + .setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS)) + .setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) @@ -165,6 +195,8 @@ public class IotMachineEventGeneratorJob { if (deviceStateListJson == null) { // 查询数据库最新的设备状态 List list = Db.use().query(SQL, DeviceState.class, machineIotMac); + // 查询es最新的设备状态 勿删 + //DeviceState deviceState1 = queryLatestDeviceState(machineIotMac); if (CollUtil.isNotEmpty(list)) { deviceStateListJson = list.get(0); @@ -177,14 +209,49 @@ public class IotMachineEventGeneratorJob { return deviceStateListJson; } + /* 勿删 + private static DeviceState queryLatestDeviceState(Long machineIotMac) { + + try { + // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac)); + searchSourceBuilder.sort("reportTime", SortOrder.DESC); + searchSourceBuilder.size(1); + // 创建查询请求对象,将查询对象配置到其中 + SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)); + searchRequest.source(searchSourceBuilder); + String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM")); + GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate); + // 先判断索引是否存在 + boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); + if (exists) { + // 执行查询,然后处理响应结果 + SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + // 根据状态和数据条数验证是否返回了数据 + if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { + SearchHits hits = searchResponse.getHits(); + SearchHit reqHit = hits.getHits()[0]; + MachineIotDataReceivedEvent receivedEvent = JSONUtil + .toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class); + DeviceState deviceState = new DeviceState(); + + } + } + } catch (Exception e) { + log.error("获取es数据异常", e); + } + return null; + }*/ + private static void sinkRabbitMq(DataStream commandDataStream) { // rabbitmq配置 RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() - .setHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_HOST)) - .setVirtualHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST)) - .setUserName(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_USER_NAME)) - .setPassword(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_PASSWORD)) + .setHost(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_HOST)) + .setVirtualHost(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST)) + .setUserName(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_USER_NAME)) + .setPassword(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_PASSWORD)) .setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT)) .build(); @@ -198,18 +265,18 @@ public class IotMachineEventGeneratorJob { if (command instanceof PowerOnMachineCommand) { // 机器通电 - return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY); + return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY); } if (command instanceof PowerOffMachineCommand) { // 机器断电 - return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY); + return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY); } if (command instanceof StopMachineWorkingCommand) { // 机器待机 - return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY); + return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY); } else { // 机器工作 - return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY); + return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY); } } @@ -222,7 +289,7 @@ public class IotMachineEventGeneratorJob { public String computeExchange(BaseCommand command) { // 交换机名称 - return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_EXCHANGE); + return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_EXCHANGE); } @@ -232,9 +299,9 @@ public class IotMachineEventGeneratorJob { private static void sinkEs(DataStream dataStream) { List httpHosts = new ArrayList<>(); - httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST), + httpHosts.add(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST), ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), - ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); + ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, (ElasticsearchSinkFunction) (machineIotDataReceivedEvent, runtimeContext, requestIndexer) -> { @@ -242,9 +309,13 @@ public class IotMachineEventGeneratorJob { LocalDate reportDate = new Date(machineIotDataReceivedEvent.getReportTime()) .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM")); + // 索引名称 + String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix; + // 校验索引是否存在 + checkIndicesIsExists(indexDateSuffix, indicesName); //创建es 请求 IndexRequest indexRequest = Requests.indexRequest() - .index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix) + .index(indicesName) .source(BeanUtil.beanToMap(machineIotDataReceivedEvent)) .id(StrUtil.toString(machineIotDataReceivedEvent.getId())); requestIndexer.add(indexRequest); @@ -262,8 +333,8 @@ public class IotMachineEventGeneratorJob { restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), - ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); + new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), + ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); }); restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { @@ -277,6 +348,90 @@ public class IotMachineEventGeneratorJob { dataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink"); } + private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) { + + if (currIndicesDateSuffix == null) { + // 当前月的索引为空 + createIndices(indicesName, indexDateSuffix); + } else { + // 校验当前消息能否符合当前索引 + if (!indexDateSuffix.equals(currIndicesDateSuffix)) { + // 如果不符合,需要重建索引 + createIndices(indicesName, indexDateSuffix); + } + } + } + + private static void createIndices(String indicesName, String indexDateSuffix) { + + // 判断索引是否存在 + GetIndexRequest exist = new GetIndexRequest(indicesName); + // 先判断客户端是否存在 + try { + boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); + if (!exists) { + // 创建索引 + CreateIndexRequest request = new CreateIndexRequest(indicesName); + // 字段映射 + String mappersStr = "{\n" + + " \"properties\": {\n" + + " \"machineWorkingStat\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"machineIotMac\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"machinePwrStat\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"currWaitingDuration\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"igStat\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"currJobDuration\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"currJobCount\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"receivedTime\": {\n" + + " \"type\": \"date\"\n" + + " },\n" + + " \"currStoppingDuration\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"id\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"accJobCount\": {\n" + + " \"type\": \"long\"\n" + + " },\n" + + " \"dataSource\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"reportTime\": {\n" + + " \"type\": \"date\"\n" + + " }\n" + + " }\n" + + " }"; + request.mapping(mappersStr, XContentType.JSON); + // 设置索引别名 + request.alias(new Alias(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX))); + // 暂时不管是否创建成功 + CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT); + boolean acknowledged = createIndexResponse.isAcknowledged(); + boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged(); + if (!acknowledged || !shardsAcknowledged) { + throw new Exception("自定义索引创建失败!!!"); + } + currIndicesDateSuffix = indexDateSuffix; + } + } catch (Exception e) { + e.printStackTrace(); + } + } private static Integer getDeviceStatus(MachineIotDataReceivedEvent event) {