From 35aee3da3f1a680813f796ab56f329b3c213d208 Mon Sep 17 00:00:00 2001 From: Derran Date: Tue, 7 Nov 2023 16:09:22 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=82=E5=B8=B8=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../power/IotDevicePowerOnAndOffDataJob.java | 254 +++++++++--------- 1 file changed, 129 insertions(+), 125 deletions(-) diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java index e3d1ce6..1f7fa37 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java @@ -107,37 +107,37 @@ public class IotDevicePowerOnAndOffDataJob { final DataStream dataStreamSource = env .addSource(new RMQSource<>(connectionConfig, ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_QUEUE), false, new MachineOutputCommandDeserializationSchema())).setParallelism(1); - try { - SingleOutputStreamOperator outputStreamOperator = dataStreamSource - .keyBy(MachineOutputCommand::getMac) - .process(new KeyedProcessFunction() { - private ValueState powerOnAndOffDataEventValueState; + SingleOutputStreamOperator outputStreamOperator = dataStreamSource + .keyBy(MachineOutputCommand::getMac) + .process(new KeyedProcessFunction() { - @Override - public void open(Configuration parameters) { + private ValueState powerOnAndOffDataEventValueState; - StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10)) - .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) - .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) - .build(); + @Override + public void open(Configuration parameters) { - ValueStateDescriptor powerOnAndOffDataEventValue - = new ValueStateDescriptor<>("powerOnAndOffDataEventValue", - TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class)); - // 设置状态值的过期时间,为了解决手动修改数据没有同步的问题 - powerOnAndOffDataEventValue.enableTimeToLive(ttlConfig); + StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10)) + .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) + .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) + .build(); - // 必须在 open 生命周期初始化 - powerOnAndOffDataEventValueState = getRuntimeContext() - .getState(powerOnAndOffDataEventValue); - } + ValueStateDescriptor powerOnAndOffDataEventValue + = new ValueStateDescriptor<>("powerOnAndOffDataEventValue", + TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class)); + // 设置状态值的过期时间,为了解决手动修改数据没有同步的问题 + powerOnAndOffDataEventValue.enableTimeToLive(ttlConfig); - @Override - public void processElement(MachineOutputCommand command, - KeyedProcessFunction.Context ctx, - Collector out) throws Exception { + // 必须在 open 生命周期初始化 + powerOnAndOffDataEventValueState = getRuntimeContext() + .getState(powerOnAndOffDataEventValue); + } + @Override + public void processElement(MachineOutputCommand command, + KeyedProcessFunction.Context ctx, + Collector out) { + try { IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(command); Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime(); Long reportTime = command.getTimestamp(); @@ -195,121 +195,125 @@ public class IotDevicePowerOnAndOffDataJob { } } } + } catch (Exception e) { + log.error("iot_device_power_on_and_off_data processElement 执行异常", e); } + } - private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineOutputCommand command) throws IOException { + private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineOutputCommand command) throws IOException { - IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value(); - if (iotDevicePowerOnAndOffDataEvent == null) { - iotDevicePowerOnAndOffDataEvent = getByEs(command); - } - return iotDevicePowerOnAndOffDataEvent; + IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value(); + if (iotDevicePowerOnAndOffDataEvent == null) { + iotDevicePowerOnAndOffDataEvent = getByEs(command); } + return iotDevicePowerOnAndOffDataEvent; + } - private IotDevicePowerOnAndOffDataEvent getByEs(MachineOutputCommand command) throws IOException { + private IotDevicePowerOnAndOffDataEvent getByEs(MachineOutputCommand command) throws IOException { - // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", command.getMac())); - searchSourceBuilder.sort("receivedTime", SortOrder.DESC); - searchSourceBuilder.size(1); - // 创建查询请求对象,将查询对象配置到其中 - SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)); - searchRequest.source(searchSourceBuilder); - String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM")); - GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate); - // 先判断索引是否存在 - boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); - if (exists) { - // 执行查询,然后处理响应结果 - SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); - // 根据状态和数据条数验证是否返回了数据 - if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { - SearchHits hits = searchResponse.getHits(); - SearchHit reqHit = hits.getHits()[0]; - return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class); - } + // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", command.getMac())); + searchSourceBuilder.sort("receivedTime", SortOrder.DESC); + searchSourceBuilder.size(1); + // 创建查询请求对象,将查询对象配置到其中 + SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)); + searchRequest.source(searchSourceBuilder); + String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM")); + GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate); + // 先判断索引是否存在 + boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); + if (exists) { + // 执行查询,然后处理响应结果 + SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + // 根据状态和数据条数验证是否返回了数据 + if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { + SearchHits hits = searchResponse.getHits(); + SearchHit reqHit = hits.getHits()[0]; + return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class); } - IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); - powerOnAndOffDataEvent.setId(snowflake.nextId()); - powerOnAndOffDataEvent.setDataSource(command.getDataSource()); - powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); - powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); - powerOnAndOffDataEvent.setCurrJobCount(command.getCurrJobCount()); - powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrJobDuration()); - Integer machinePwrStat = command.getMachinePwrStat(); - powerOnAndOffDataEvent.setMachinePwrStat(machinePwrStat); - powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); - Long reportTime = command.getTimestamp(); - // 开机 - powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); - if (machinePwrStat == 0) { - // 关机 - powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); - } - powerOnAndOffDataEvent.setReportTime(reportTime); - powerOnAndOffDataEvent.setReceivedTime(LocalDateTime - .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); - return powerOnAndOffDataEvent; } - }).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac"); + IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); + powerOnAndOffDataEvent.setId(snowflake.nextId()); + powerOnAndOffDataEvent.setDataSource(command.getDataSource()); + powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); + powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); + powerOnAndOffDataEvent.setCurrJobCount(command.getCurrJobCount()); + powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrJobDuration()); + Integer machinePwrStat = command.getMachinePwrStat(); + powerOnAndOffDataEvent.setMachinePwrStat(machinePwrStat); + powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); + Long reportTime = command.getTimestamp(); + // 开机 + powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); + if (machinePwrStat == 0) { + // 关机 + powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); + } + powerOnAndOffDataEvent.setReportTime(reportTime); + powerOnAndOffDataEvent.setReceivedTime(LocalDateTime + .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); + return powerOnAndOffDataEvent; + } + }).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac"); + + sinkEs(outputStreamOperator); - sinkEs(outputStreamOperator); - } catch (Exception e) { - log.error("iot_device_power_on_and_off_data 执行异常", e); - } env.execute("iot_device_power_on_and_off_data"); } private static void sinkEs(SingleOutputStreamOperator dataStream) { + try { + List httpHosts = new ArrayList<>(); + httpHosts.add(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST), + ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), + ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); + ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, + (ElasticsearchSinkFunction) (deviceMonitoringData, runtimeContext, requestIndexer) -> { - List httpHosts = new ArrayList<>(); - httpHosts.add(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST), - ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), - ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); - ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, - (ElasticsearchSinkFunction) (deviceMonitoringData, runtimeContext, requestIndexer) -> { - - LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime()) - .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); - String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM")); - // 索引名称 - String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix; - // 校验索引是否存在 - checkIndicesIsExists(indexDateSuffix, indicesName); - //创建es 请求 - IndexRequest indexRequest = Requests.indexRequest() - .index(indicesName) - .source(BeanUtil.beanToMap(deviceMonitoringData)); - requestIndexer.add(indexRequest); - } - ); - //刷新前缓冲的最大动作量 - esSinkBuilder.setBulkFlushMaxActions(10); - //刷新前缓冲区的最大数据大小(以MB为单位) - esSinkBuilder.setBulkFlushMaxSizeMb(5); - //无论缓冲操作的数量或大小如何,都要刷新的时间间隔 - esSinkBuilder.setBulkFlushInterval(5000L); - // 客户端创建配置回调,配置账号密码 - esSinkBuilder.setRestClientFactory( - restClientBuilder -> { - restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), - ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); - return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - }); - restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { - // 设置es连接超时时间 - requestConfigBuilder - .setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT)); - return requestConfigBuilder; - }); - } - ); - //数据流添加sink - dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink"); + LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime()) + .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); + String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM")); + // 索引名称 + String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix; + // 校验索引是否存在 + checkIndicesIsExists(indexDateSuffix, indicesName); + //创建es 请求 + IndexRequest indexRequest = Requests.indexRequest() + .index(indicesName) + .source(BeanUtil.beanToMap(deviceMonitoringData)); + requestIndexer.add(indexRequest); + } + ); + //刷新前缓冲的最大动作量 + esSinkBuilder.setBulkFlushMaxActions(10); + //刷新前缓冲区的最大数据大小(以MB为单位) + esSinkBuilder.setBulkFlushMaxSizeMb(5); + //无论缓冲操作的数量或大小如何,都要刷新的时间间隔 + esSinkBuilder.setBulkFlushInterval(5000L); + // 客户端创建配置回调,配置账号密码 + esSinkBuilder.setRestClientFactory( + restClientBuilder -> { + restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), + ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); + return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + }); + restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { + // 设置es连接超时时间 + requestConfigBuilder + .setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT)); + return requestConfigBuilder; + }); + } + ); + //数据流添加sink + dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink"); + } catch (Exception e) { + log.error("iot_device_power_on_and_off_data sinkEs 执行异常", e); + } } private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) {