Browse Source

更新

feature_hph_新增rabbitmq_sink
hupenghui@qniao.cn 3 years ago
parent
commit
0d84b81c49
4 changed files with 92 additions and 89 deletions
  1. 5
      src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java
  2. 5
      src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java
  3. 162
      src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java
  4. 9
      src/test/java/SourceMockerDemo.java

5
src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java

@ -42,6 +42,11 @@ public class DeviceMonitoringData {
*/
private Long currJobDuration;
/**
* 当前开机时长
*/
private Long currDuration;
/**
* 数据实际采样时间单位豪秒
*/

5
src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java

@ -23,6 +23,11 @@ public class DeviceTotalData {
*/
private Long theDayJobDuration;
/**
* 当天开机时长单位秒
*/
private Long theDayDuration;
/**
* 累计作业计数
*/

162
src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java

@ -76,7 +76,7 @@ public class IotMonitoringDataJob {
return requestConfigBuilder;
}));
private final static String SQL = "select qmrs.machine_id, qmrs.iot_mac as machine_iot_mac, qmrs.status, qml.count_unit\n" +
private final static String SQL = "select qmrs.status \n" +
"from qn_machine_realtime_state qmrs\n" +
" LEFT JOIN (select example_id, count_unit from qn_machine_list where is_delete = 0) qml\n" +
" ON qmrs.iot_mac = qml.example_id\n" +
@ -129,9 +129,12 @@ public class IotMonitoringDataJob {
// 当前周期的待机数据
private ValueState<MachineIotDataReceivedEvent> lastWaitJobDataState;
// 上次的状态
// 上次的工作状态
private ValueState<Integer> lastWorkingStatState;
// 上次的开机状态
private ValueState<Integer> lastPwStatState;
@Override
public void open(Configuration parameters) {
@ -153,6 +156,9 @@ public class IotMonitoringDataJob {
lastWorkingStatState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class)));
lastPwStatState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastPwStat", TypeInformation.of(Integer.class)));
}
@Override
@ -162,28 +168,25 @@ public class IotMonitoringDataJob {
DeviceTotalData onData = onDataState.value();
MachineIotDataReceivedEvent lastOffData = lastOffDataState.value();
//MachineIotDataReceivedEvent lastOnData = lastOnDataState.value();
MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value();
Integer lastWorkingStat = lastWorkingStatState.value();
DeviceTotalData lastedDeviceState = deviceTotalDataStat.value();
Integer lastPwStat = lastPwStatState.value();
DeviceTotalData lastedDeviceState = getDeviceTotalData(receivedEvent);
// 如果当前消息的时间大于等于上次消息的时间才进行处理
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
lastWorkingStatState.update(machineWorkingStat);
lastPwStatState.update(machinePwrStat);
Long reportTime = receivedEvent.getReportTime();
Long accJobCount = receivedEvent.getAccJobCount();
// 1树根 0机智云
Integer dataSource = receivedEvent.getDataSource();
// 当前数据
DeviceTotalData nowDeviceState = new DeviceTotalData();
if (lastedDeviceState == null) {
lastedDeviceState = getDeviceTotalData(receivedEvent);
lastedDeviceState.setJobTotal(6218646L);
//lastOnData = receivedEvent;
}
if(lastedDeviceState.getLastReportTime() <= receivedEvent.getReportTime()) {
if (lastedDeviceState.getLastReportTime() <= receivedEvent.getReportTime()) {
if (lastWorkingStat == null) {
lastWorkingStat = getDeviceStateListJson(receivedEvent.getMachineIotMac());
lastPwStat = lastWorkingStat == 0 ? 0 : 1;
}
if (onData == null) {
onData = lastedDeviceState;
@ -191,110 +194,95 @@ public class IotMonitoringDataJob {
}
LocalDate localDate = new Date(reportTime).toLocalDate();
Long a;
Long lastReportTime = lastedDeviceState.getLastReportTime();
if (lastReportTime == null) {
// 如果上次的消息时间为空那么不进行计算
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration());
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal());
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal());
}
// 直接通过两个消息的时间差进行计算毫秒
Long workingDuration = reportTime - lastedDeviceState.getLastReportTime();
// 转为秒
workingDuration = workingDuration / 1000;
if (machinePwrStat.equals(0)) {
if (lastWorkingStat == 1) {
// 如果上次是工作中那就进行累加
Long lastReportTime = lastedDeviceState.getLastReportTime();
if (lastReportTime == null) {
// 如果上次的消息时间为空那么不进行计算
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration());
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal());
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal());
} else {
// 直接通过两个消息的时间差进行计算毫秒
Long workingDuration = reportTime - lastedDeviceState.getLastReportTime();
// 转为秒
workingDuration = workingDuration / 1000;
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
if(dataSource == 1) {
// 树根
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
}else {
// 机智云
Long jobTotal = lastedDeviceState.getJobTotal();
Long workingJon;
if(accJobCount > jobTotal) {
workingJon = accJobCount - lastedDeviceState.getJobTotal();
}else {
workingJon = 0L;
if (lastPwStat != 0) {
if (lastWorkingStat == 1) {
// 如果上次是工作中那就进行累加
if (lastReportTime != null) {
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration);
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
if (dataSource == 1) {
// 树根
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
} else {
// 机智云
Long jobTotal = lastedDeviceState.getJobTotal();
Long workingJon;
if (accJobCount > jobTotal) {
workingJon = accJobCount - lastedDeviceState.getJobTotal();
} else {
workingJon = 0L;
}
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon);
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
}
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon);
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
}
}
nowDeviceState.setCurrLocalDate(localDate);
/*if (lastOnData != null) {
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime()), ZoneId.systemDefault()));
nowDeviceState.setCurrLocalDate(localDate);
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault()));
nowDeviceState.setLastReportTime(reportTime);
deviceTotalDataStat.update(nowDeviceState);
// 关机后将待机数据清除
lastWaitJobDataState.update(null);
} else {
nowDeviceState.setLastBootTime(onData.getLastBootTime());
}*/
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault()));
nowDeviceState.setLastReportTime(reportTime);
deviceTotalDataStat.update(nowDeviceState);
// 关机后将待机数据清除
lastWaitJobDataState.update(null);
}else {
nowDeviceState = lastedDeviceState;
deviceTotalDataStat.update(nowDeviceState);
}
} else {
nowDeviceState = lastedDeviceState;
deviceTotalDataStat.update(nowDeviceState);
}
lastOffDataState.update(receivedEvent);
} else {
if (machineWorkingStat.equals(1)) {
// 工作中
Long workingDuration = reportTime - lastedDeviceState.getLastReportTime();
// 转为秒
workingDuration = workingDuration /1000;
if(dataSource == 1) {
if (dataSource == 1) {
// 树根今日当前数 + 这次信息点距离上次信息点生产的数量
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
}else {
} else {
// 机智云
Long jobTotal = lastedDeviceState.getJobTotal();
Long workingJon;
if(accJobCount > jobTotal) {
if (accJobCount > jobTotal) {
workingJon = accJobCount - lastedDeviceState.getJobTotal();
}else {
} else {
workingJon = 0L;
}
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon);
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
}
nowDeviceState.setCurrLocalDate(localDate);
//nowDeviceState.setLastBootTime(onData.getLastBootTime());
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault()));
if (lastWaitJobData != null) {
LocalDateTime lastWaitJobTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(lastWaitJobData.getReportTime()),
ZoneId.systemDefault());
LocalDateTime localDateTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(reportTime),
ZoneId.systemDefault());
a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS);
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + a);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + a);
} else {
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
}
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
nowDeviceState.setLastReportTime(reportTime);
deviceTotalDataStat.update(nowDeviceState);
}else {
} else {
// 待机
lastWaitJobDataState.update(receivedEvent);
nowDeviceState = lastedDeviceState;
deviceTotalDataStat.update(nowDeviceState);
}
if (lastOffData != null) {
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration);
if (lastPwStat == 0) {
// 如果上次是关机消息那么这次就是开机消息
// 记录本次开机作为上次开机时间
//lastOnDataState.update(receivedEvent);
nowDeviceState.setLastReportTime(reportTime);
// 记录一个周期的开机时间
onDataState.update(nowDeviceState);
onData = nowDeviceState;
lastOffDataState.update(null);
}
}
// 如果上次是待机并且这次也是待机那么就不需要发送了
@ -310,6 +298,7 @@ public class IotMonitoringDataJob {
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setLastBootTime(onData.getLastReportTime());
data.setCurrDuration(nowDeviceState.getTheDayDuration());
out.collect(data);
}
}
@ -350,6 +339,8 @@ public class IotMonitoringDataJob {
// es中也没有直接从老接口拿
data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime);
}
// TODO 测试数据记得删除
data.setJobTotal(6218646L);
value = data;
}
// 是否日期是当天的否则需要更新当天工作时长和当天工作量
@ -366,6 +357,7 @@ public class IotMonitoringDataJob {
value.setLastBootTime(LocalDateTime.ofInstant(Instant
.ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault()));
value.setLastReportTime(deviceMonitoringData.getReportTime());
value.setTheDayDuration(0L);
} else {
// value有值但是日期不对说明到了第二天那么只需要对当天数据清零即可
value.setJobTotal(value.getJobTotal());
@ -374,7 +366,8 @@ public class IotMonitoringDataJob {
value.setTheDayJobCount(0L);
value.setCurrLocalDate(localDate);
value.setLastBootTime(value.getLastBootTime());
value.setLastReportTime(value.getLastReportTime());
value.setTheDayDuration(value.getTheDayDuration());
value.setLastReportTime(reportTime);
}
}
deviceTotalDataStat.update(value);
@ -406,7 +399,7 @@ public class IotMonitoringDataJob {
}
for (Object o : objects.toArray()) {
Object mac = JSONUtil.getByPath(JSONUtil.parse(o), "mac");
Long iotMac = Long.parseLong((String)mac);
Long iotMac = Long.parseLong((String) mac);
if (iotMac.equals(machineIotMac)) {
deviceTotalData = new DeviceTotalData();
Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal");
@ -436,6 +429,7 @@ public class IotMonitoringDataJob {
deviceTotalData.setCurrLocalDate(LocalDate.now());
deviceTotalData.setLastReportTime(reportTime);
}
deviceTotalData.setTheDayDuration(0L);
return deviceTotalData;
}
@ -453,10 +447,10 @@ public class IotMonitoringDataJob {
// 创建查询请求对象将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest("DeviceMonitoringData");
searchRequest.source(searchSourceBuilder);
GetIndexRequest exist=new GetIndexRequest("DeviceMonitoringData");
GetIndexRequest exist = new GetIndexRequest("DeviceMonitoringData");
// 先判断客户端是否存在
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if(exists) {
if (exists) {
// 执行查询然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据

9
src/test/java/SourceMockerDemo.java

@ -21,7 +21,6 @@ import java.util.concurrent.Future;
public class SourceMockerDemo {
// 延迟毫秒
public static final long DELAY = 1000;
public static void main(String[] args) throws Exception {
// 创建kafka配置属性
@ -64,11 +63,11 @@ public class SourceMockerDemo {
event.setCurrWaitingDuration(0L);
event.setIgStat(0);
event.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
if(f%5 == 0) {
event.setReportTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
event.setReportTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
/*if(f > 20) {
}else {
event.setReportTime(LocalDateTime.now().plusDays(-1).toInstant(ZoneOffset.of("+8")).toEpochMilli());
}
}*/
// 递增加一个随机数
event.setCurrJobCount(RandomUtil.randomLong(1, 5));
@ -90,7 +89,7 @@ public class SourceMockerDemo {
Future<RecordMetadata> send = producer.send(record);
System.out.println(send.get());
Thread.sleep(DELAY);
Thread.sleep(RandomUtil.randomLong(1, 5) * 1000);
}
}

Loading…
Cancel
Save