From f15c65b67868ea71fa318e365075f858f4fcd2d9 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Fri, 12 Aug 2022 18:04:50 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../qniao/iot/gizwits/DeviceTotalData.java | 2 +- .../iot/gizwits/IotMonitoringDataJob.java | 83 +++++++++++++++---- src/main/resources/db.setting | 26 ++++++ 3 files changed, 92 insertions(+), 19 deletions(-) create mode 100644 src/main/resources/db.setting diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java index 6186837..de8d1e9 100644 --- a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java +++ b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java @@ -39,7 +39,7 @@ public class DeviceTotalData { private LocalDate currLocalDate; /** - * 上次消息事件 + * 上次消息时间 */ private Long lastReportTime; } diff --git a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java index fc037ca..7a03c9f 100644 --- a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java @@ -1,7 +1,9 @@ package com.qniao.iot.gizwits; import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ArrayUtil; +import cn.hutool.db.Db; import cn.hutool.http.HttpUtil; import cn.hutool.json.JSONArray; import cn.hutool.json.JSONUtil; @@ -11,6 +13,7 @@ import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; @@ -19,6 +22,7 @@ import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsIni import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; @@ -36,6 +40,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.*; +import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; @@ -46,6 +51,7 @@ import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.sql.Date; +import java.sql.SQLException; import java.time.*; import java.time.format.DateTimeFormatter; import java.util.*; @@ -69,6 +75,13 @@ public class IotMonitoringDataJob { return requestConfigBuilder; })); + private final static String SQL = "select qmrs.machine_id, qmrs.iot_mac as machine_iot_mac, qmrs.status, qml.count_unit\n" + + "from qn_machine_realtime_state qmrs\n" + + " LEFT JOIN (select example_id, count_unit from qn_machine_list where is_delete = 0) qml\n" + + " ON qmrs.iot_mac = qml.example_id\n" + + "where qmrs.iot_mac = ?\n" + + " and qmrs.is_delete = 0"; + public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -87,8 +100,14 @@ public class IotMonitoringDataJob { DataStreamSource dataStreamSource = env .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); + + // 数据过滤 + SingleOutputStreamOperator streamOperator = dataStreamSource + .filter((FilterFunction) value -> value.getReportTime() != null && value.getDataSource() != null); + + // mac分组并进行工作时长的集合操作 - DataStream machineIotDataReceivedEventDataStream = dataStreamSource + DataStream machineIotDataReceivedEventDataStream = streamOperator .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { @@ -154,10 +173,11 @@ public class IotMonitoringDataJob { // 当前数据 DeviceTotalData nowDeviceState = getDeviceTotalData(receivedEvent); if (lastedDeviceState == null) { + lastedDeviceState = deviceTotalDataStat.value(); lastOnData = receivedEvent; } if (lastWorkingStat == null) { - lastWorkingStat = 0; + lastWorkingStat = getDeviceStateListJson(receivedEvent.getMachineIotMac()); } if (onData == null) { onData = lastedDeviceState; @@ -165,7 +185,6 @@ public class IotMonitoringDataJob { } LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); Long a; - if (machinePwrStat.equals(0)) { if (lastWorkingStat == 1) { // 如果上次是工作中,那就进行累加 @@ -199,6 +218,7 @@ public class IotMonitoringDataJob { } else { nowDeviceState.setLastBootTime(onData.getLastBootTime()); } + nowDeviceState.setLastReportTime(reportTime); deviceTotalDataStat.update(nowDeviceState); // 如果关机 onDataState.update(nowDeviceState); @@ -210,7 +230,6 @@ public class IotMonitoringDataJob { if (lastOffData != null) { lastOnData = receivedEvent; } - assert lastedDeviceState != null; if (machineWorkingStat.equals(1)) { // 工作中 Long workingDuration = reportTime - lastedDeviceState.getLastReportTime(); @@ -240,6 +259,7 @@ public class IotMonitoringDataJob { nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); } + nowDeviceState.setLastReportTime(reportTime); deviceTotalDataStat.update(nowDeviceState); } if (machineWorkingStat.equals(2)) { @@ -268,6 +288,17 @@ public class IotMonitoringDataJob { } } + + private Integer getDeviceStateListJson(Long machineIotMac) throws SQLException { + + // 查询数据库最新的设备状态 + List list = Db.use().query(SQL, Integer.class, machineIotMac); + if (CollUtil.isNotEmpty(list)) { + return list.get(0); + } + return 0; + } + private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception { DeviceTotalData value = deviceTotalDataStat.value(); @@ -291,6 +322,7 @@ public class IotMonitoringDataJob { // es中也没有,直接从老接口拿 data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000); } + value = data; } // 是否日期是当天的,否则需要更新当天工作时长和当天工作量 if (value.getCurrLocalDate().isBefore(localDate)) { @@ -305,13 +337,16 @@ public class IotMonitoringDataJob { data.setCurrLocalDate(localDate); data.setLastBootTime(LocalDateTime.ofInstant(Instant .ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault())); - deviceTotalDataStat.update(data); } else { // value有值,但是日期不对,说明到了第二天,那么,只需要对当天数据清零即可 - value.setTheDayJobDuration(0L); - value.setTheDayJobCount(0L); - data = value; + data.setJobTotal(value.getJobTotal()); + data.setJobDurationTotal(value.getJobDurationTotal()); + data.setTheDayJobDuration(0L); + data.setTheDayJobCount(0L); + data.setCurrLocalDate(localDate); + data.setLastBootTime(value.getLastBootTime()); } + deviceTotalDataStat.update(data); } return data; } @@ -351,6 +386,8 @@ public class IotMonitoringDataJob { deviceTotalData.setLastBootTime(lastBootTime); deviceTotalData.setTheDayJobDuration(0L); deviceTotalData.setTheDayJobCount(0L); + deviceTotalData.setCurrLocalDate(LocalDate.now()); + deviceTotalData.setLastReportTime(reportTime/1000); break; } } @@ -359,9 +396,11 @@ public class IotMonitoringDataJob { deviceTotalData = new DeviceTotalData(); deviceTotalData.setJobTotal(0L); deviceTotalData.setJobDurationTotal(0L); - deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault())); + deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime), ZoneId.systemDefault())); deviceTotalData.setTheDayJobDuration(0L); deviceTotalData.setTheDayJobCount(0L); + deviceTotalData.setCurrLocalDate(LocalDate.now()); + deviceTotalData.setLastReportTime(reportTime/1000); } return deviceTotalData; } @@ -396,23 +435,31 @@ public class IotMonitoringDataJob { // 创建查询请求对象,将查询对象配置到其中 SearchRequest searchRequest = new SearchRequest("DeviceMonitoringData"); searchRequest.source(searchSourceBuilder); - // 执行查询,然后处理响应结果 - SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); - // 根据状态和数据条数验证是否返回了数据 - if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { - SearchHits hits = searchResponse.getHits(); - SearchHit reqHit = hits.getHits()[0]; - return JSONUtil.toBean(reqHit.getSourceAsString(), DeviceMonitoringData.class); + GetIndexRequest exist=new GetIndexRequest("DeviceMonitoringData"); + // 先判断客户端是否存在 + boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); + if(exists) { + // 执行查询,然后处理响应结果 + SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + // 根据状态和数据条数验证是否返回了数据 + if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { + SearchHits hits = searchResponse.getHits(); + SearchHit reqHit = hits.getHits()[0]; + return JSONUtil.toBean(reqHit.getSourceAsString(), DeviceMonitoringData.class); + } } - } catch (IOException e) { + } catch (Exception e) { log.error("获取es数据异常", e); } return null; } }).name("machineIotDataReceivedEventDataStream keyBy stream"); + + machineIotDataReceivedEventDataStream.print(); + // 写入es - sinkEs(machineIotDataReceivedEventDataStream); + //sinkEs(machineIotDataReceivedEventDataStream); env.execute("iot_monitoring_data_job"); } diff --git a/src/main/resources/db.setting b/src/main/resources/db.setting new file mode 100644 index 0000000..3e3aa65 --- /dev/null +++ b/src/main/resources/db.setting @@ -0,0 +1,26 @@ +## db.setting文件 + +url = jdbc:mysql://rm-wz9it4fs5tk7n4tm1zo.mysql.rds.aliyuncs.com:3306/cloud_print_cloud_factory?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useTimezone=true&serverTimezone=GMT%2B8&useSSL=false +user = qn_cloudprint +pass = qncloudprint5682 + +# 是否在日志中显示执行的SQL +showSql = true + +# 是否格式化显示的SQL +formatSql = false + +# 是否显示SQL参数 +showParams = true + +# 打印SQL的日志等级,默认debug,可以是info、warn、error +sqlLevel = debug + +# 初始化时建立物理连接的个数 +initialSize = 0 + +# 最大连接池数量 +maxActive = 20 + +# 最小连接池数量 +minIdle = 0 \ No newline at end of file