From 395e122c4b85eaa5e0ee07ebb4ec16fa105c0310 Mon Sep 17 00:00:00 2001 From: "hupenghui@qniao.cn" <1049970895> Date: Sun, 14 Aug 2022 22:11:32 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/gizwits/IotMonitoringDataJob.java | 56 +++++-------------- src/test/java/SourceMockerDemo.java | 10 ++-- 2 files changed, 20 insertions(+), 46 deletions(-) diff --git a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java index 4f86356..38d1b20 100644 --- a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java @@ -106,7 +106,7 @@ public class IotMonitoringDataJob { // 数据过滤 SingleOutputStreamOperator streamOperator = dataStreamSource .filter((FilterFunction) value -> value.getReportTime() != null - && value.getDataSource() != null && value.getMachinePwrStat() != null && value.getMachineIotMac() == 102104060102L); + && value.getDataSource() != null && value.getMachinePwrStat() != null && value.getMachineIotMac() == 861193040814171L); // mac分组并进行工作时长的集合操作 @@ -120,15 +120,6 @@ public class IotMonitoringDataJob { // 开机数据 private ValueState onDataState; - // 上次的关机数据 - private ValueState lastOffDataState; - - // 上次的开机数据 - private ValueState lastOnDataState; - - // 当前周期的待机数据 - private ValueState lastWaitJobDataState; - // 上次的工作状态 private ValueState lastWorkingStatState; @@ -145,15 +136,6 @@ public class IotMonitoringDataJob { onDataState = getRuntimeContext() .getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class))); - lastOffDataState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("lastOffData", TypeInformation.of(MachineIotDataReceivedEvent.class))); - - lastOnDataState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("lastOnData", TypeInformation.of(MachineIotDataReceivedEvent.class))); - - lastWaitJobDataState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("lastWaitJobData", TypeInformation.of(MachineIotDataReceivedEvent.class))); - lastWorkingStatState = getRuntimeContext() .getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class))); @@ -167,8 +149,6 @@ public class IotMonitoringDataJob { Collector out) throws Exception { DeviceTotalData onData = onDataState.value(); - MachineIotDataReceivedEvent lastOffData = lastOffDataState.value(); - MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value(); Integer lastWorkingStat = lastWorkingStatState.value(); Integer lastPwStat = lastPwStatState.value(); DeviceTotalData lastedDeviceState = getDeviceTotalData(receivedEvent); @@ -193,7 +173,6 @@ public class IotMonitoringDataJob { onDataState.update(onData); } LocalDate localDate = new Date(reportTime).toLocalDate(); - Long a; Long lastReportTime = lastedDeviceState.getLastReportTime(); if (lastReportTime == null) { // 如果上次的消息时间为空,那么不进行计算 @@ -235,8 +214,6 @@ public class IotMonitoringDataJob { nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault())); nowDeviceState.setLastReportTime(reportTime); deviceTotalDataStat.update(nowDeviceState); - // 关机后将待机数据清除 - lastWaitJobDataState.update(null); } else { nowDeviceState = lastedDeviceState; deviceTotalDataStat.update(nowDeviceState); @@ -268,13 +245,10 @@ public class IotMonitoringDataJob { nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); nowDeviceState.setLastReportTime(reportTime); - deviceTotalDataStat.update(nowDeviceState); } else { - // 待机 - lastWaitJobDataState.update(receivedEvent); nowDeviceState = lastedDeviceState; - deviceTotalDataStat.update(nowDeviceState); } + deviceTotalDataStat.update(nowDeviceState); nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration); if (lastPwStat == 0) { // 如果上次是关机消息,那么这次就是开机消息 @@ -366,7 +340,7 @@ public class IotMonitoringDataJob { value.setTheDayJobCount(0L); value.setCurrLocalDate(localDate); value.setLastBootTime(value.getLastBootTime()); - value.setTheDayDuration(value.getTheDayDuration()); + value.setTheDayDuration(0L); value.setLastReportTime(reportTime); } } @@ -445,9 +419,9 @@ public class IotMonitoringDataJob { searchSourceBuilder.sort("reportTime", SortOrder.DESC); searchSourceBuilder.size(1); // 创建查询请求对象,将查询对象配置到其中 - SearchRequest searchRequest = new SearchRequest("DeviceMonitoringData"); + SearchRequest searchRequest = new SearchRequest("device_monitoring_data"); searchRequest.source(searchSourceBuilder); - GetIndexRequest exist = new GetIndexRequest("DeviceMonitoringData"); + GetIndexRequest exist = new GetIndexRequest("device_monitoring_data"); // 先判断客户端是否存在 boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); if (exists) { @@ -470,17 +444,17 @@ public class IotMonitoringDataJob { machineIotDataReceivedEventDataStream.print(); // 写入es - //sinkEs(machineIotDataReceivedEventDataStream); + sinkEs(machineIotDataReceivedEventDataStream); - env.execute("iot_monitoring_data_job"); + env.execute("device_monitoring_data"); } private static void sinkEs(DataStream dataStream) { List httpHosts = new ArrayList<>(); - httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST), - ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), - ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); + httpHosts.add(new HttpHost("120.79.137.137", + 9200, + "http")); ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, (ElasticsearchSinkFunction) (deviceMonitoringData, runtimeContext, requestIndexer) -> { @@ -489,7 +463,7 @@ public class IotMonitoringDataJob { String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM")); //创建es 请求 IndexRequest indexRequest = Requests.indexRequest() - .index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix) + .index("device_monitoring_data" + "_" + indexDateSuffix) .source(BeanUtil.beanToMap(deviceMonitoringData)); requestIndexer.add(indexRequest); } @@ -506,18 +480,18 @@ public class IotMonitoringDataJob { restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), - ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); + new UsernamePasswordCredentials("elastic", + "qnol26215")); return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); }); restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { // 设置es连接超时时间 - requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT)); + requestConfigBuilder.setConnectTimeout(30000); return requestConfigBuilder; }); } ); //数据流添加sink - dataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink"); + dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink"); } } diff --git a/src/test/java/SourceMockerDemo.java b/src/test/java/SourceMockerDemo.java index d753d93..d772c2c 100644 --- a/src/test/java/SourceMockerDemo.java +++ b/src/test/java/SourceMockerDemo.java @@ -51,10 +51,10 @@ public class SourceMockerDemo { MachineIotDataReceivedEvent event = new MachineIotDataReceivedEvent(); event.setId(RandomUtil.randomLong(999999999999999L)); // 机智云 - //event.setMachineIotMac(861193040814171L); + event.setMachineIotMac(861193040814171L); // 树根 - event.setMachineIotMac(102104060102L); + //event.setMachineIotMac(102104060102L); event.setMachinePwrStat(RandomUtil.randomEles(pwrStaList, 1).get(0)); event.setMachineWorkingStat(RandomUtil.randomEles(accStaList, 1).get(0)); // 递增每次加一个随机数 @@ -63,11 +63,11 @@ public class SourceMockerDemo { event.setCurrWaitingDuration(0L); event.setIgStat(0); event.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); - event.setReportTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); - /*if(f > 20) { + if(f > 20) { + event.setReportTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); }else { event.setReportTime(LocalDateTime.now().plusDays(-1).toInstant(ZoneOffset.of("+8")).toEpochMilli()); - }*/ + } // 递增加一个随机数 event.setCurrJobCount(RandomUtil.randomLong(1, 5));