Browse Source

更新

feature_hph_新增rabbitmq_sink
hupenghui@qniao.cn 3 years ago
parent
commit
0bb99d87f4
3 changed files with 138 additions and 137 deletions
  1. 8
      src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java
  2. 261
      src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java
  3. 6
      src/test/java/Demo2.java

8
src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java

@ -9,9 +9,9 @@ import java.time.LocalDateTime;
public class DeviceTotalData {
/**
* 上次开机时间
* 上次开机时间时间戳
*/
private LocalDateTime lastBootTime;
private Long lastBootTime;
/**
* 当天作业计数
@ -39,9 +39,9 @@ public class DeviceTotalData {
private Long jobDurationTotal;
/**
* 当前日期
* 当前日期格式yyyy-MM-dd
*/
private LocalDate currLocalDate;
private String currLocalDate;
/**
* 消息时间

261
src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java

@ -102,6 +102,8 @@ public class IotMonitoringDataJob {
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> value.getReportTime() != null
&& value.getDataSource() != null && value.getMachinePwrStat() != null);
streamOperator.print().name("kafka 数据源:");
// mac分组并进行工作时长的集合操作
DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = streamOperator
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
@ -139,138 +141,142 @@ public class IotMonitoringDataJob {
@Override
public void processElement(MachineIotDataReceivedEvent receivedEvent,
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>.Context ctx,
Collector<DeviceMonitoringData> out) throws Exception {
DeviceTotalData onData = onDataState.value();
Integer lastWorkingStat = lastWorkingStatState.value();
Integer lastPwStat = lastPwStatState.value();
DeviceTotalData lastedDeviceState = getDeviceTotalData(receivedEvent);
// 如果当前消息的时间大于等于上次消息的时间才进行处理
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
lastWorkingStatState.update(machineWorkingStat);
lastPwStatState.update(machinePwrStat);
Long reportTime = receivedEvent.getReportTime();
Long accJobCount = receivedEvent.getAccJobCount();
// 1树根 0机智云
Integer dataSource = receivedEvent.getDataSource();
// 当前数据
DeviceTotalData nowDeviceState = new DeviceTotalData();
if (lastedDeviceState.getReportTime() <= receivedEvent.getReportTime()) {
if (lastWorkingStat == null) {
DeviceState deviceState = getDeviceStateListJson(receivedEvent.getMachineIotMac());
Integer status = deviceState == null ? null : deviceState.getStatus();
lastWorkingStat = status == null ? 0 : status;
lastPwStat = lastWorkingStat == 0 ? 0 : 1;
}
if (onData == null) {
onData = lastedDeviceState;
onDataState.update(onData);
}
LocalDate localDate = new Date(reportTime).toLocalDate();
Long lastReportTime = lastedDeviceState.getReportTime();
if (lastReportTime == null) {
// 如果上次的消息时间为空那么不进行计算
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration());
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal());
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal());
}
// 直接通过两个消息的时间差进行计算毫秒
Long workingDuration = reportTime - lastedDeviceState.getReportTime();
// 转为秒
workingDuration = workingDuration / 1000;
if (machinePwrStat.equals(0)) {
if (lastPwStat != 0) {
if (lastWorkingStat == 1) {
// 如果上次是工作中那就进行累加
if (lastReportTime != null) {
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration);
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
if (dataSource == 1) {
// 树根
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
} else {
// 机智云
Long jobTotal = lastedDeviceState.getJobTotal();
Long workingJon;
if (accJobCount > jobTotal) {
workingJon = accJobCount - lastedDeviceState.getJobTotal();
Collector<DeviceMonitoringData> out) {
try {
DeviceTotalData onData = onDataState.value();
Integer lastWorkingStat = lastWorkingStatState.value();
Integer lastPwStat = lastPwStatState.value();
DeviceTotalData lastedDeviceState = getDeviceTotalData(receivedEvent);
// 如果当前消息的时间大于等于上次消息的时间才进行处理
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
lastWorkingStatState.update(machineWorkingStat);
lastPwStatState.update(machinePwrStat);
Long reportTime = receivedEvent.getReportTime();
Long accJobCount = receivedEvent.getAccJobCount();
// 1树根 0机智云
Integer dataSource = receivedEvent.getDataSource();
// 当前数据
DeviceTotalData nowDeviceState = new DeviceTotalData();
if (lastedDeviceState.getReportTime() <= receivedEvent.getReportTime()) {
if (lastWorkingStat == null) {
DeviceState deviceState = getDeviceStateListJson(receivedEvent.getMachineIotMac());
Integer status = deviceState == null ? null : deviceState.getStatus();
lastWorkingStat = status == null ? 0 : status;
lastPwStat = lastWorkingStat == 0 ? 0 : 1;
}
if (onData == null) {
onData = lastedDeviceState;
onDataState.update(onData);
}
LocalDate localDate = new Date(reportTime).toLocalDate();
Long lastReportTime = lastedDeviceState.getReportTime();
if (lastReportTime == null) {
// 如果上次的消息时间为空那么不进行计算
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration());
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal());
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal());
}
// 直接通过两个消息的时间差进行计算毫秒
Long workingDuration = reportTime - lastedDeviceState.getReportTime();
// 转为秒
workingDuration = workingDuration / 1000;
if (machinePwrStat.equals(0)) {
if (lastPwStat != 0) {
if (lastWorkingStat == 1) {
// 如果上次是工作中那就进行累加
if (lastReportTime != null) {
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration);
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
if (dataSource == 1) {
// 树根
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
} else {
workingJon = 0L;
// 机智云
Long jobTotal = lastedDeviceState.getJobTotal();
Long workingJon;
if (accJobCount > jobTotal) {
workingJon = accJobCount - lastedDeviceState.getJobTotal();
} else {
workingJon = 0L;
}
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon);
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
}
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon);
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
}
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(onData.getReportTime());
nowDeviceState.setReportTime(reportTime);
} else {
nowDeviceState = lastedDeviceState;
}
nowDeviceState.setCurrLocalDate(localDate);
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getReportTime()), ZoneId.systemDefault()));
nowDeviceState.setReportTime(reportTime);
deviceTotalDataStat.update(nowDeviceState);
} else {
nowDeviceState = lastedDeviceState;
deviceTotalDataStat.update(nowDeviceState);
}
} else {
nowDeviceState = lastedDeviceState;
deviceTotalDataStat.update(nowDeviceState);
}
} else {
if (machineWorkingStat.equals(1)) {
if (dataSource == 1) {
// 树根今日当前数 + 这次信息点距离上次信息点生产的数量
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
} else {
// 机智云
Long jobTotal = lastedDeviceState.getJobTotal();
Long workingJon;
if (accJobCount > jobTotal) {
workingJon = accJobCount - lastedDeviceState.getJobTotal();
if (machineWorkingStat.equals(1)) {
if (dataSource == 1) {
// 树根今日当前数 + 这次信息点距离上次信息点生产的数量
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
} else {
workingJon = 0L;
// 机智云
Long jobTotal = lastedDeviceState.getJobTotal();
Long workingJon;
if (accJobCount > jobTotal) {
workingJon = accJobCount - lastedDeviceState.getJobTotal();
} else {
workingJon = 0L;
}
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon);
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
}
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon);
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
nowDeviceState.setLastBootTime(onData.getReportTime());
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
nowDeviceState.setReportTime(reportTime);
} else {
nowDeviceState = lastedDeviceState;
}
deviceTotalDataStat.update(nowDeviceState);
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration);
if (lastPwStat == 0) {
// 如果上次是关机消息那么这次就是开机消息
// 记录本次开机作为上次开机时间
nowDeviceState.setReportTime(reportTime);
// 记录一个周期的开机时间
onDataState.update(nowDeviceState);
onData = nowDeviceState;
}
nowDeviceState.setCurrLocalDate(localDate);
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getReportTime()), ZoneId.systemDefault()));
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
nowDeviceState.setReportTime(reportTime);
} else {
nowDeviceState = lastedDeviceState;
}
deviceTotalDataStat.update(nowDeviceState);
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration);
if (lastPwStat == 0) {
// 如果上次是关机消息那么这次就是开机消息
// 记录本次开机作为上次开机时间
nowDeviceState.setReportTime(reportTime);
// 记录一个周期的开机时间
onDataState.update(nowDeviceState);
onData = nowDeviceState;
// 如果上次是待机并且这次也是待机那么就不需要发送了
if (!(lastWorkingStat == 2 && machineWorkingStat == 2)) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(nowDeviceState.getJobTotal());
data.setCurrJobCount(nowDeviceState.getTheDayJobCount());
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
data.setLastBootTime(onData.getReportTime());
data.setCurrDuration(nowDeviceState.getTheDayDuration());
out.collect(data);
}
}
// 如果上次是待机并且这次也是待机那么就不需要发送了
if (!(lastWorkingStat == 2 && machineWorkingStat == 2)) {
DeviceMonitoringData data = new DeviceMonitoringData();
data.setDataSource(receivedEvent.getDataSource());
data.setMachineIotMac(receivedEvent.getMachineIotMac());
data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
data.setAccJobCount(nowDeviceState.getJobTotal());
data.setCurrJobCount(nowDeviceState.getTheDayJobCount());
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
data.setLastBootTime(onData.getReportTime());
data.setCurrDuration(nowDeviceState.getTheDayDuration());
out.collect(data);
}
}catch (Exception e) {
log.info("导致异常的信息:" + JSONUtil.toJsonStr(receivedEvent));
log.error("处理异常", e);
}
}
@ -298,12 +304,10 @@ public class IotMonitoringDataJob {
data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
data.setJobTotal(deviceMonitoringData.getAccJobCount());
// 单位秒
data.setCurrLocalDate(localDate);
data.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
LocalDateTime ldt = new Date(deviceMonitoringData.getLastBootTime())
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime();
data.setLastBootTime(ldt);
data.setLastBootTime(deviceMonitoringData.getLastBootTime());
data.setReportTime(deviceMonitoringData.getReportTime());
} else {
// es中也没有直接从老接口拿
@ -312,7 +316,7 @@ public class IotMonitoringDataJob {
value = data;
}
// 是否日期是当天的否则需要更新当天工作时长和当天工作量
if (value.getCurrLocalDate().isBefore(localDate)) {
if (LocalDate.parse(value.getCurrLocalDate(), DateTimeFormatter.ofPattern("yyyy-MM-dd")).isBefore(localDate)) {
// 先从es中拿昨天最新的
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(),
LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond());
@ -321,9 +325,8 @@ public class IotMonitoringDataJob {
value.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
value.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
value.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
value.setCurrLocalDate(localDate);
value.setLastBootTime(LocalDateTime.ofInstant(Instant
.ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault()));
value.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
value.setLastBootTime(deviceMonitoringData.getLastBootTime());
value.setReportTime(deviceMonitoringData.getReportTime());
value.setTheDayDuration(0L);
} else {
@ -332,7 +335,7 @@ public class IotMonitoringDataJob {
value.setJobDurationTotal(value.getJobDurationTotal());
value.setTheDayJobDuration(0L);
value.setTheDayJobCount(0L);
value.setCurrLocalDate(localDate);
value.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
value.setLastBootTime(value.getLastBootTime());
value.setTheDayDuration(0L);
value.setReportTime(reportTime);
@ -377,10 +380,10 @@ public class IotMonitoringDataJob {
Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime");
LocalDateTime lastBootTime = LocalDateTime
.parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
deviceTotalData.setLastBootTime(lastBootTime);
deviceTotalData.setLastBootTime(lastBootTime.toInstant(ZoneOffset.of("+8")).toEpochMilli());
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
deviceTotalData.setCurrLocalDate(LocalDate.now());
deviceTotalData.setCurrLocalDate(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
deviceTotalData.setReportTime(reportTime);
stop = true;
break;
@ -391,10 +394,10 @@ public class IotMonitoringDataJob {
deviceTotalData = new DeviceTotalData();
deviceTotalData.setJobTotal(0L);
deviceTotalData.setJobDurationTotal(0L);
deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime), ZoneId.systemDefault()));
deviceTotalData.setLastBootTime(reportTime);
deviceTotalData.setTheDayJobDuration(0L);
deviceTotalData.setTheDayJobCount(0L);
deviceTotalData.setCurrLocalDate(LocalDate.now());
deviceTotalData.setCurrLocalDate(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
deviceTotalData.setReportTime(reportTime);
}
deviceTotalData.setTheDayDuration(0L);

6
src/test/java/Demo2.java

@ -2,6 +2,8 @@ import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import java.time.LocalDate;
public class Demo2 {
public static void main(String[] args) {
@ -14,9 +16,5 @@ public class Demo2 {
System.out.println(data);*/
Long a = 2314L;
a = a/1000;
System.out.println(a);
}
}
Loading…
Cancel
Save