From 96a3bd7f80a26bfd09e9e078a918879d0de6191f Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Mon, 5 Sep 2022 17:56:05 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/rc/RootCloudIotDataFormatterJob.java | 161 ------------------ 1 file changed, 161 deletions(-) diff --git a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java index 6b2ca94..0332d6f 100644 --- a/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java +++ b/root-cloud-statistics/src/main/java/com/qniao/iot/rc/RootCloudIotDataFormatterJob.java @@ -115,9 +115,6 @@ public class RootCloudIotDataFormatterJob { .build(); - // 把树根的数据转成我们自己的格式 - //DataStreamSource streamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "RootCloudIotDataReceiptedEvent Source"); - // 把树根的数据转成我们自己的格式 SingleOutputStreamOperator transformDs = env .fromSource(source, WatermarkStrategy.noWatermarks(), "root cloud iot data receipted event Source") @@ -125,164 +122,6 @@ public class RootCloudIotDataFormatterJob { .name("Transform MachineIotDataReceivedEvent"); - // 数据过滤 - /*SingleOutputStreamOperator streamOperator = streamSource - .filter(new RichFilterFunction() { - @Override - public boolean filter(RootCloudIotDataReceiptedEvent value) { - - Long reportTime = value.get__timestamp__(); - if (reportTime != null) { - String reportTimeStr = StrUtil.toString(reportTime); - if (reportTimeStr.length() == 10) { - value.set__timestamp__(reportTime * 1000); - } - } - if (value.getWorking_sta() != null - && value.get__assetId__() != null - && value.getPWR_sta() != null && reportTime != null && value.getACC_count() != null) { - long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); - // 晚30分钟的数据就不要了 - return nowTime - value.get__timestamp__() <= (30 * 60 * 1000); - } - return false; - } - }).name("machine iot data received event filter operator");*/ - - // 分组操作 - /*SingleOutputStreamOperator outputStreamOperator = streamOperator - .keyBy(RootCloudIotDataReceiptedEvent::get__assetId__) - .process(new KeyedProcessFunction() { - - private final RestHighLevelClient restHighLevelClient = new RestHighLevelClient( - RestClient.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_HOST), - ApolloConfig.getInt(ConfigConstant.ELASTICSEARCH_POST), - ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_SCHEME))) - .setHttpClientConfigCallback(httpAsyncClientBuilder -> { - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_USER_NAME), - ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_PASSWORD))); - return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - }).setRequestConfigCallback(requestConfigBuilder -> { - // 设置es连接超时时间 - requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ELASTICSEARCH_CONNECT_TIMEOUT)); - return requestConfigBuilder; - })); - - private ValueState eventValueState; - - @Override - public void open(Configuration parameters) { - - eventValueState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("machineIotDataReceivedEventState", - TypeInformation.of(MachineIotDataReceivedEvent.class))); - } - - @Override - public void processElement(RootCloudIotDataReceiptedEvent value, - KeyedProcessFunction.Context ctx, - Collector out) throws IOException { - // 数据清洗 - Long machineIotMac = Long.valueOf(value.get__assetId__()); - MachineIotDataReceivedEvent lastReceivedEvent = eventValueState.value(); - if (lastReceivedEvent == null) { - lastReceivedEvent = getMachineIotDataReceivedEvent(machineIotMac, value); - } - Integer pwrSta = value.getPWR_sta(); - Integer workingSta = value.getWorking_sta(); - Long accCount = value.getACC_count(); - Integer lastPwrStat = lastReceivedEvent.getMachinePwrStat(); - Integer lastWorkingStat = lastReceivedEvent.getMachineWorkingStat(); - Long lastReportTime = lastReceivedEvent.getReportTime(); - Long reportTime = value.get__timestamp__(); - // 只有当前消息的时间大于等于上一次消息的时间才要,否则丢弃 - if (reportTime >= lastReportTime) { - MachineIotDataReceivedEvent receivedEvent = new MachineIotDataReceivedEvent(); - receivedEvent.setId(snowflake.nextId()); - receivedEvent.setDataSource(DataSource.ROOT_CLOUD); - receivedEvent.setMachineIotMac(machineIotMac); - receivedEvent.setMachinePwrStat(pwrSta); - receivedEvent.setMachineWorkingStat(workingSta); - receivedEvent.setAccJobCount(accCount); - if ((pwrSta == 1 && workingSta == 1) - || (lastPwrStat == 1 && lastWorkingStat == 1)) { - // 只有当前是工作中或上次是工作中才进行计算 - // 如果这次的消息和上次的消息相差半个小时,那么不进行计算 - if (reportTime - lastReportTime <= 30 * 60 * 1000) { - receivedEvent.setCurrJobCount(value.getACC_count() - lastReceivedEvent.getAccJobCount()); - // 单位是秒 - receivedEvent.setCurrJobDuration((reportTime - lastReportTime) / 3600); - } - } - receivedEvent.setCurrWaitingDuration(0L); - receivedEvent.setCurrStoppingDuration(0L); - receivedEvent.setIgStat(value.getIG_sta()); - receivedEvent.setReportTime(value.get__timestamp__()); - receivedEvent.setReceivedTime(System.currentTimeMillis()); - eventValueState.update(receivedEvent); - out.collect(receivedEvent); - } - } - - private MachineIotDataReceivedEvent getMachineIotDataReceivedEvent(Long machineIotMac, - RootCloudIotDataReceiptedEvent event) { - - MachineIotDataReceivedEvent receivedEvent = getFromEs(machineIotMac); - if (receivedEvent == null) { - // 在es中没有查到,说明是新机器 - receivedEvent = new MachineIotDataReceivedEvent(); - receivedEvent.setId(snowflake.nextId()); - receivedEvent.setDataSource(DataSource.ROOT_CLOUD); - receivedEvent.setMachineIotMac(machineIotMac); - receivedEvent.setMachinePwrStat(event.getPWR_sta()); - receivedEvent.setMachineWorkingStat(event.getWorking_sta()); - receivedEvent.setAccJobCount(event.getACC_count()); - receivedEvent.setCurrJobCount(0L); - receivedEvent.setCurrJobDuration(0L); - receivedEvent.setCurrWaitingDuration(0L); - receivedEvent.setCurrStoppingDuration(0L); - receivedEvent.setIgStat(event.getIG_sta()); - receivedEvent.setReportTime(event.get__timestamp__()); - receivedEvent.setReceivedTime(System.currentTimeMillis()); - } - return receivedEvent; - } - - private MachineIotDataReceivedEvent getFromEs(Long machineIotMac) { - - try { - // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac)); - searchSourceBuilder.sort("reportTime", SortOrder.DESC); - searchSourceBuilder.size(1); - // 创建查询请求对象,将查询对象配置到其中 - SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_INDEX)); - searchRequest.source(searchSourceBuilder); - String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM")); - GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_INDEX) + "_" + nowDate); - // 先判断索引是否存在 - boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); - if (exists) { - // 执行查询,然后处理响应结果 - SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); - // 根据状态和数据条数验证是否返回了数据 - if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { - SearchHits hits = searchResponse.getHits(); - SearchHit reqHit = hits.getHits()[0]; - return JSONUtil.toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class); - } - } - } catch (Exception e) { - log.error("获取es数据异常", e); - } - return null; - } - }).name("machine iot data received event keyBy");*/ - - Properties kafkaProducerConfig = new Properties(); kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");