Browse Source

更新

feature_hph_新增rabbitmq_sink
1049970895@qniao.cn 3 years ago
parent
commit
f15c65b678
3 changed files with 92 additions and 19 deletions
  1. 2
      src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java
  2. 83
      src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java
  3. 26
      src/main/resources/db.setting

2
src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java

@ -39,7 +39,7 @@ public class DeviceTotalData {
private LocalDate currLocalDate;
/**
* 上次消息事件
* 上次消息时间
*/
private Long lastReportTime;
}

83
src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java

@ -1,7 +1,9 @@
package com.qniao.iot.gizwits;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.db.Db;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
@ -11,6 +13,7 @@ import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
@ -19,6 +22,7 @@ import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsIni
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
@ -36,6 +40,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
@ -46,6 +51,7 @@ import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.sql.Date;
import java.sql.SQLException;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.*;
@ -69,6 +75,13 @@ public class IotMonitoringDataJob {
return requestConfigBuilder;
}));
private final static String SQL = "select qmrs.machine_id, qmrs.iot_mac as machine_iot_mac, qmrs.status, qml.count_unit\n" +
"from qn_machine_realtime_state qmrs\n" +
" LEFT JOIN (select example_id, count_unit from qn_machine_list where is_delete = 0) qml\n" +
" ON qmrs.iot_mac = qml.example_id\n" +
"where qmrs.iot_mac = ?\n" +
" and qmrs.is_delete = 0";
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@ -87,8 +100,14 @@ public class IotMonitoringDataJob {
DataStreamSource<MachineIotDataReceivedEvent> dataStreamSource = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
// 数据过滤
SingleOutputStreamOperator<MachineIotDataReceivedEvent> streamOperator = dataStreamSource
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> value.getReportTime() != null && value.getDataSource() != null);
// mac分组并进行工作时长的集合操作
DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = dataStreamSource
DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = streamOperator
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>() {
@ -154,10 +173,11 @@ public class IotMonitoringDataJob {
// 当前数据
DeviceTotalData nowDeviceState = getDeviceTotalData(receivedEvent);
if (lastedDeviceState == null) {
lastedDeviceState = deviceTotalDataStat.value();
lastOnData = receivedEvent;
}
if (lastWorkingStat == null) {
lastWorkingStat = 0;
lastWorkingStat = getDeviceStateListJson(receivedEvent.getMachineIotMac());
}
if (onData == null) {
onData = lastedDeviceState;
@ -165,7 +185,6 @@ public class IotMonitoringDataJob {
}
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
Long a;
if (machinePwrStat.equals(0)) {
if (lastWorkingStat == 1) {
// 如果上次是工作中那就进行累加
@ -199,6 +218,7 @@ public class IotMonitoringDataJob {
} else {
nowDeviceState.setLastBootTime(onData.getLastBootTime());
}
nowDeviceState.setLastReportTime(reportTime);
deviceTotalDataStat.update(nowDeviceState);
// 如果关机
onDataState.update(nowDeviceState);
@ -210,7 +230,6 @@ public class IotMonitoringDataJob {
if (lastOffData != null) {
lastOnData = receivedEvent;
}
assert lastedDeviceState != null;
if (machineWorkingStat.equals(1)) {
// 工作中
Long workingDuration = reportTime - lastedDeviceState.getLastReportTime();
@ -240,6 +259,7 @@ public class IotMonitoringDataJob {
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
}
nowDeviceState.setLastReportTime(reportTime);
deviceTotalDataStat.update(nowDeviceState);
}
if (machineWorkingStat.equals(2)) {
@ -268,6 +288,17 @@ public class IotMonitoringDataJob {
}
}
private Integer getDeviceStateListJson(Long machineIotMac) throws SQLException {
// 查询数据库最新的设备状态
List<Integer> list = Db.use().query(SQL, Integer.class, machineIotMac);
if (CollUtil.isNotEmpty(list)) {
return list.get(0);
}
return 0;
}
private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception {
DeviceTotalData value = deviceTotalDataStat.value();
@ -291,6 +322,7 @@ public class IotMonitoringDataJob {
// es中也没有直接从老接口拿
data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000);
}
value = data;
}
// 是否日期是当天的否则需要更新当天工作时长和当天工作量
if (value.getCurrLocalDate().isBefore(localDate)) {
@ -305,13 +337,16 @@ public class IotMonitoringDataJob {
data.setCurrLocalDate(localDate);
data.setLastBootTime(LocalDateTime.ofInstant(Instant
.ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault()));
deviceTotalDataStat.update(data);
} else {
// value有值但是日期不对说明到了第二天那么只需要对当天数据清零即可
value.setTheDayJobDuration(0L);
value.setTheDayJobCount(0L);
data = value;
data.setJobTotal(value.getJobTotal());
data.setJobDurationTotal(value.getJobDurationTotal());
data.setTheDayJobDuration(0L);
data.setTheDayJobCount(0L);
data.setCurrLocalDate(localDate);
data.setLastBootTime(value.getLastBootTime());
}
deviceTotalDataStat.update(data);
}
return data;
}
@ -351,6 +386,8 @@ public class IotMonitoringDataJob {
deviceTotalData.setLastBootTime(lastBootTime);
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
deviceTotalData.setCurrLocalDate(LocalDate.now());
deviceTotalData.setLastReportTime(reportTime/1000);
break;
}
}
@ -359,9 +396,11 @@ public class IotMonitoringDataJob {
deviceTotalData = new DeviceTotalData();
deviceTotalData.setJobTotal(0L);
deviceTotalData.setJobDurationTotal(0L);
deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault()));
deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime), ZoneId.systemDefault()));
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
deviceTotalData.setCurrLocalDate(LocalDate.now());
deviceTotalData.setLastReportTime(reportTime/1000);
}
return deviceTotalData;
}
@ -396,23 +435,31 @@ public class IotMonitoringDataJob {
// 创建查询请求对象将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest("DeviceMonitoringData");
searchRequest.source(searchSourceBuilder);
// 执行查询然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
SearchHits hits = searchResponse.getHits();
SearchHit reqHit = hits.getHits()[0];
return JSONUtil.toBean(reqHit.getSourceAsString(), DeviceMonitoringData.class);
GetIndexRequest exist=new GetIndexRequest("DeviceMonitoringData");
// 先判断客户端是否存在
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if(exists) {
// 执行查询然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
SearchHits hits = searchResponse.getHits();
SearchHit reqHit = hits.getHits()[0];
return JSONUtil.toBean(reqHit.getSourceAsString(), DeviceMonitoringData.class);
}
}
} catch (IOException e) {
} catch (Exception e) {
log.error("获取es数据异常", e);
}
return null;
}
}).name("machineIotDataReceivedEventDataStream keyBy stream");
machineIotDataReceivedEventDataStream.print();
// 写入es
sinkEs(machineIotDataReceivedEventDataStream);
//sinkEs(machineIotDataReceivedEventDataStream);
env.execute("iot_monitoring_data_job");
}

26
src/main/resources/db.setting

@ -0,0 +1,26 @@
## db.setting文件
url = jdbc:mysql://rm-wz9it4fs5tk7n4tm1zo.mysql.rds.aliyuncs.com:3306/cloud_print_cloud_factory?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useTimezone=true&serverTimezone=GMT%2B8&useSSL=false
user = qn_cloudprint
pass = qncloudprint5682
# 是否在日志中显示执行的SQL
showSql = true
# 是否格式化显示的SQL
formatSql = false
# 是否显示SQL参数
showParams = true
# 打印SQL的日志等级,默认debug,可以是info、warn、error
sqlLevel = debug
# 初始化时建立物理连接的个数
initialSize = 0
# 最大连接池数量
maxActive = 20
# 最小连接池数量
minIdle = 0
Loading…
Cancel
Save