Browse Source

更新

feature_hph_新增rabbitmq_sink
hupenghui@qniao.cn 3 years ago
parent
commit
15d0afeae9
2 changed files with 54 additions and 21 deletions
  1. 50
      src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java
  2. 25
      src/test/java/SourceMockerDemo.java

50
src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java

@ -92,7 +92,7 @@ public class IotMonitoringDataJob {
.setBootstrapServers("120.25.199.30:19092")
.setTopics("test")
//.setTopics("machine_iot_data_received_event")
.setGroupId("123")
.setGroupId("1235")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
@ -106,7 +106,7 @@ public class IotMonitoringDataJob {
// 数据过滤
SingleOutputStreamOperator<MachineIotDataReceivedEvent> streamOperator = dataStreamSource
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> value.getReportTime() != null
&& value.getDataSource() != null && value.getMachinePwrStat() != null && value.getMachineIotMac() == 861193040814171L);
&& value.getDataSource() != null && value.getMachinePwrStat() != null && value.getMachineIotMac() == 102104060102L);
// mac分组并进行工作时长的集合操作
@ -162,7 +162,7 @@ public class IotMonitoringDataJob {
DeviceTotalData onData = onDataState.value();
MachineIotDataReceivedEvent lastOffData = lastOffDataState.value();
MachineIotDataReceivedEvent lastOnData = lastOnDataState.value();
//MachineIotDataReceivedEvent lastOnData = lastOnDataState.value();
MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value();
Integer lastWorkingStat = lastWorkingStatState.value();
DeviceTotalData lastedDeviceState = deviceTotalDataStat.value();
@ -178,7 +178,8 @@ public class IotMonitoringDataJob {
DeviceTotalData nowDeviceState = new DeviceTotalData();
if (lastedDeviceState == null) {
lastedDeviceState = getDeviceTotalData(receivedEvent);
lastOnData = receivedEvent;
lastedDeviceState.setJobTotal(6218646L);
//lastOnData = receivedEvent;
}
if(lastedDeviceState.getLastReportTime() <= receivedEvent.getReportTime()) {
if (lastWorkingStat == null) {
@ -213,25 +214,32 @@ public class IotMonitoringDataJob {
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
}else {
// 机智云
Long workingJon = accJobCount - lastedDeviceState.getJobTotal();
Long jobTotal = lastedDeviceState.getJobTotal();
Long workingJon;
if(accJobCount > jobTotal) {
workingJon = accJobCount - lastedDeviceState.getJobTotal();
}else {
workingJon = 0L;
}
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon);
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
}
}
nowDeviceState.setCurrLocalDate(localDate);
if (lastOnData != null) {
/*if (lastOnData != null) {
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime()), ZoneId.systemDefault()));
} else {
nowDeviceState.setLastBootTime(onData.getLastBootTime());
}
}*/
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault()));
nowDeviceState.setLastReportTime(reportTime);
deviceTotalDataStat.update(nowDeviceState);
lastOffDataState.update(receivedEvent);
// 关机后将待机数据清除
lastWaitJobDataState.update(null);
}else {
nowDeviceState = lastedDeviceState;
}
lastOffDataState.update(receivedEvent);
} else {
if (machineWorkingStat.equals(1)) {
// 工作中
@ -239,17 +247,24 @@ public class IotMonitoringDataJob {
// 转为秒
workingDuration = workingDuration /1000;
if(dataSource == 1) {
// 树根
// 树根今日当前数 + 这次信息点距离上次信息点生产的数量
nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
}else {
// 机智云
Long workingJon = accJobCount - lastedDeviceState.getJobTotal();
Long jobTotal = lastedDeviceState.getJobTotal();
Long workingJon;
if(accJobCount > jobTotal) {
workingJon = accJobCount - lastedDeviceState.getJobTotal();
}else {
workingJon = 0L;
}
nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon);
nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount());
}
nowDeviceState.setCurrLocalDate(localDate);
nowDeviceState.setLastBootTime(onData.getLastBootTime());
//nowDeviceState.setLastBootTime(onData.getLastBootTime());
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault()));
if (lastWaitJobData != null) {
LocalDateTime lastWaitJobTime = LocalDateTime
.ofInstant(Instant.ofEpochMilli(lastWaitJobData.getReportTime()),
@ -274,9 +289,12 @@ public class IotMonitoringDataJob {
if (lastOffData != null) {
// 如果上次是关机消息那么这次就是开机消息
// 记录本次开机作为上次开机时间
lastOnDataState.update(receivedEvent);
//lastOnDataState.update(receivedEvent);
nowDeviceState.setLastReportTime(reportTime);
// 记录一个周期的开机时间
onDataState.update(nowDeviceState);
onData = nowDeviceState;
lastOffDataState.update(null);
}
}
// 如果上次是待机并且这次也是待机那么就不需要发送了
@ -291,7 +309,7 @@ public class IotMonitoringDataJob {
data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration());
data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal());
data.setReportTime(reportTime);
data.setLastBootTime(onData.getLastBootTime().atZone(ZoneOffset.systemDefault()).toEpochSecond() * 1000);
data.setLastBootTime(onData.getLastReportTime());
out.collect(data);
}
}
@ -368,7 +386,10 @@ public class IotMonitoringDataJob {
DeviceTotalData deviceTotalData = null;
// 通过http去请求之前的接口拿数据
for (int i = 1; i <= 20; i++) {
int i = 0;
boolean stop = false;
while (i <= 20 && !stop) {
i++;
String result = HttpUtil
.get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:" + i + "%7D");
Object data = JSONUtil.getByPath(JSONUtil.parse(result), "data");
@ -400,6 +421,7 @@ public class IotMonitoringDataJob {
deviceTotalData.setTheDayJobCount(0L);
deviceTotalData.setCurrLocalDate(LocalDate.now());
deviceTotalData.setLastReportTime(reportTime);
stop = true;
break;
}
}

25
src/test/java/SourceMockerDemo.java

@ -13,6 +13,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
@ -41,14 +42,20 @@ public class SourceMockerDemo {
List<Integer> accStaList = Arrays.asList(0, 1, 2);
Long currJobDuration = 231L;
long accJobCount = 2314234L;
long accJobCount = 6218646L;
int f = 0;
// 循环发送事件
while (true) {
f++;
MachineIotDataReceivedEvent event = new MachineIotDataReceivedEvent();
event.setId(RandomUtil.randomLong(999999999999999L));
event.setMachineIotMac(861193040814171L);
// 机智云
//event.setMachineIotMac(861193040814171L);
// 树根
event.setMachineIotMac(102104060102L);
event.setMachinePwrStat(RandomUtil.randomEles(pwrStaList, 1).get(0));
event.setMachineWorkingStat(RandomUtil.randomEles(accStaList, 1).get(0));
// 递增每次加一个随机数
@ -56,13 +63,17 @@ public class SourceMockerDemo {
// 递增每次加一个随机数
event.setCurrWaitingDuration(0L);
event.setIgStat(0);
event.setReceivedTime(LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8")) * 1000);
event.setReportTime(LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8")) * 1000);
event.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
if(f%5 == 0) {
event.setReportTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
}else {
event.setReportTime(LocalDateTime.now().plusDays(-1).toInstant(ZoneOffset.of("+8")).toEpochMilli());
}
// 递增加一个随机数
event.setCurrJobCount(currJobDuration = currJobDuration + RandomUtil.randomLong(99L));
event.setCurrJobCount(RandomUtil.randomLong(1, 5));
// 基础值加CurrJobCount
event.setAccJobCount(accJobCount = accJobCount + RandomUtil.randomLong(99L));
event.setAccJobCount(accJobCount = accJobCount + RandomUtil.randomLong(10, 99));
event.setDataSource(1);
// 递增随机加一个数
event.setCurrStoppingDuration(0L);

Loading…
Cancel
Save