diff --git a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java index 8510df8..af34ec4 100644 --- a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java @@ -92,7 +92,7 @@ public class IotMonitoringDataJob { .setBootstrapServers("120.25.199.30:19092") .setTopics("test") //.setTopics("machine_iot_data_received_event") - .setGroupId("123") + .setGroupId("1235") .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) @@ -106,7 +106,7 @@ public class IotMonitoringDataJob { // 数据过滤 SingleOutputStreamOperator streamOperator = dataStreamSource .filter((FilterFunction) value -> value.getReportTime() != null - && value.getDataSource() != null && value.getMachinePwrStat() != null && value.getMachineIotMac() == 861193040814171L); + && value.getDataSource() != null && value.getMachinePwrStat() != null && value.getMachineIotMac() == 102104060102L); // mac分组并进行工作时长的集合操作 @@ -162,7 +162,7 @@ public class IotMonitoringDataJob { DeviceTotalData onData = onDataState.value(); MachineIotDataReceivedEvent lastOffData = lastOffDataState.value(); - MachineIotDataReceivedEvent lastOnData = lastOnDataState.value(); + //MachineIotDataReceivedEvent lastOnData = lastOnDataState.value(); MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value(); Integer lastWorkingStat = lastWorkingStatState.value(); DeviceTotalData lastedDeviceState = deviceTotalDataStat.value(); @@ -178,7 +178,8 @@ public class IotMonitoringDataJob { DeviceTotalData nowDeviceState = new DeviceTotalData(); if (lastedDeviceState == null) { lastedDeviceState = getDeviceTotalData(receivedEvent); - lastOnData = receivedEvent; + lastedDeviceState.setJobTotal(6218646L); + //lastOnData = receivedEvent; } if(lastedDeviceState.getLastReportTime() <= receivedEvent.getReportTime()) { if (lastWorkingStat == null) { @@ -213,25 +214,32 @@ public class IotMonitoringDataJob { nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); }else { // 机智云 - Long workingJon = accJobCount - lastedDeviceState.getJobTotal(); + Long jobTotal = lastedDeviceState.getJobTotal(); + Long workingJon; + if(accJobCount > jobTotal) { + workingJon = accJobCount - lastedDeviceState.getJobTotal(); + }else { + workingJon = 0L; + } nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon); nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); } } nowDeviceState.setCurrLocalDate(localDate); - if (lastOnData != null) { + /*if (lastOnData != null) { nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime()), ZoneId.systemDefault())); } else { nowDeviceState.setLastBootTime(onData.getLastBootTime()); - } + }*/ + nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault())); nowDeviceState.setLastReportTime(reportTime); deviceTotalDataStat.update(nowDeviceState); - lastOffDataState.update(receivedEvent); // 关机后将待机数据清除 lastWaitJobDataState.update(null); }else { nowDeviceState = lastedDeviceState; } + lastOffDataState.update(receivedEvent); } else { if (machineWorkingStat.equals(1)) { // 工作中 @@ -239,17 +247,24 @@ public class IotMonitoringDataJob { // 转为秒 workingDuration = workingDuration /1000; if(dataSource == 1) { - // 树根 + // 树根(今日当前数 + 这次信息点距离上次信息点生产的数量) nowDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); nowDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); }else { // 机智云 - Long workingJon = accJobCount - lastedDeviceState.getJobTotal(); + Long jobTotal = lastedDeviceState.getJobTotal(); + Long workingJon; + if(accJobCount > jobTotal) { + workingJon = accJobCount - lastedDeviceState.getJobTotal(); + }else { + workingJon = 0L; + } nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + workingJon); nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); } nowDeviceState.setCurrLocalDate(localDate); - nowDeviceState.setLastBootTime(onData.getLastBootTime()); + //nowDeviceState.setLastBootTime(onData.getLastBootTime()); + nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault())); if (lastWaitJobData != null) { LocalDateTime lastWaitJobTime = LocalDateTime .ofInstant(Instant.ofEpochMilli(lastWaitJobData.getReportTime()), @@ -274,9 +289,12 @@ public class IotMonitoringDataJob { if (lastOffData != null) { // 如果上次是关机消息,那么这次就是开机消息 // 记录本次开机作为上次开机时间 - lastOnDataState.update(receivedEvent); + //lastOnDataState.update(receivedEvent); + nowDeviceState.setLastReportTime(reportTime); // 记录一个周期的开机时间 onDataState.update(nowDeviceState); + onData = nowDeviceState; + lastOffDataState.update(null); } } // 如果上次是待机,并且这次也是待机,那么就不需要发送了 @@ -291,7 +309,7 @@ public class IotMonitoringDataJob { data.setCurrJobDuration(nowDeviceState.getTheDayJobDuration()); data.setAccJobCountDuration(nowDeviceState.getJobDurationTotal()); data.setReportTime(reportTime); - data.setLastBootTime(onData.getLastBootTime().atZone(ZoneOffset.systemDefault()).toEpochSecond() * 1000); + data.setLastBootTime(onData.getLastReportTime()); out.collect(data); } } @@ -368,7 +386,10 @@ public class IotMonitoringDataJob { DeviceTotalData deviceTotalData = null; // 通过http去请求之前的接口拿数据 - for (int i = 1; i <= 20; i++) { + int i = 0; + boolean stop = false; + while (i <= 20 && !stop) { + i++; String result = HttpUtil .get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:" + i + "%7D"); Object data = JSONUtil.getByPath(JSONUtil.parse(result), "data"); @@ -400,6 +421,7 @@ public class IotMonitoringDataJob { deviceTotalData.setTheDayJobCount(0L); deviceTotalData.setCurrLocalDate(LocalDate.now()); deviceTotalData.setLastReportTime(reportTime); + stop = true; break; } } diff --git a/src/test/java/SourceMockerDemo.java b/src/test/java/SourceMockerDemo.java index 8f00dde..01d13f5 100644 --- a/src/test/java/SourceMockerDemo.java +++ b/src/test/java/SourceMockerDemo.java @@ -13,6 +13,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import java.math.BigDecimal; import java.time.LocalDateTime; import java.time.ZoneOffset; +import java.time.temporal.TemporalUnit; import java.util.Arrays; import java.util.List; import java.util.Properties; @@ -41,14 +42,20 @@ public class SourceMockerDemo { List accStaList = Arrays.asList(0, 1, 2); Long currJobDuration = 231L; - long accJobCount = 2314234L; + long accJobCount = 6218646L; + + int f = 0; // 循环发送事件 while (true) { - + f++; MachineIotDataReceivedEvent event = new MachineIotDataReceivedEvent(); event.setId(RandomUtil.randomLong(999999999999999L)); - event.setMachineIotMac(861193040814171L); + // 机智云 + //event.setMachineIotMac(861193040814171L); + + // 树根 + event.setMachineIotMac(102104060102L); event.setMachinePwrStat(RandomUtil.randomEles(pwrStaList, 1).get(0)); event.setMachineWorkingStat(RandomUtil.randomEles(accStaList, 1).get(0)); // 递增每次加一个随机数 @@ -56,13 +63,17 @@ public class SourceMockerDemo { // 递增每次加一个随机数 event.setCurrWaitingDuration(0L); event.setIgStat(0); - event.setReceivedTime(LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8")) * 1000); - event.setReportTime(LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8")) * 1000); + event.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + if(f%5 == 0) { + event.setReportTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); + }else { + event.setReportTime(LocalDateTime.now().plusDays(-1).toInstant(ZoneOffset.of("+8")).toEpochMilli()); + } // 递增加一个随机数 - event.setCurrJobCount(currJobDuration = currJobDuration + RandomUtil.randomLong(99L)); + event.setCurrJobCount(RandomUtil.randomLong(1, 5)); // 基础值加CurrJobCount - event.setAccJobCount(accJobCount = accJobCount + RandomUtil.randomLong(99L)); + event.setAccJobCount(accJobCount = accJobCount + RandomUtil.randomLong(10, 99)); event.setDataSource(1); // 递增随机加一个数 event.setCurrStoppingDuration(0L);