diff --git a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java index 9565b83..bc7e0fe 100644 --- a/src/main/java/com/qniao/iot/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java @@ -48,6 +48,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; + import java.sql.Date; import java.time.*; import java.time.format.DateTimeFormatter; @@ -81,7 +82,6 @@ public class IotMonitoringDataJob { public static void main(String[] args) throws Exception { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); // 获取设备数据源 @@ -103,16 +103,11 @@ public class IotMonitoringDataJob { .filter((FilterFunction) value -> { Long reportTime = value.getReportTime(); - if(reportTime != null + if (reportTime != null && value.getDataSource() != null && value.getMachinePwrStat() != null) { - String reportTimeStr = StrUtil.toString(reportTime); - if(reportTimeStr.length() == 10) { - // 机智云那边的设备可能是秒或毫秒 - reportTime = reportTime * 1000; - } long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); // 晚30分钟的数据就不要了 - return nowTime - reportTime <= (30*60*1000); + return nowTime - reportTime <= (30 * 60 * 1000); } return false; }); @@ -125,15 +120,6 @@ public class IotMonitoringDataJob { // 上次设备数据 private ValueState deviceTotalDataStat; - // 开机数据 - //private ValueState onDataState; - - // 上次的工作状态 - //private ValueState lastWorkingStatState; - - // 上次的开机状态 - //private ValueState lastPwStatState; - // 是否存在es中(假设都存在) private boolean isExistEs = true; @@ -144,14 +130,6 @@ public class IotMonitoringDataJob { deviceTotalDataStat = getRuntimeContext() .getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class))); - /*onDataState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class))); - - lastWorkingStatState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class))); - - lastPwStatState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("lastPwStat", TypeInformation.of(Integer.class)));*/ } @Override @@ -161,66 +139,50 @@ public class IotMonitoringDataJob { try { DeviceTotalData lastedDeviceState = getLastDeviceTotalData(receivedEvent); - Integer lastWorkingStat = lastedDeviceState.getMachineWorkingStat(); - Integer lastPwStat = lastedDeviceState.getMachinePwrStat(); - // 上次启动时间 - Long lastBootTime = lastedDeviceState.getLastBootTime(); - // 如果当前消息的时间大于等于上次消息的时间才进行处理 - Integer machinePwrStat = receivedEvent.getMachinePwrStat(); - Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); + Long lastReportTime = lastedDeviceState.getReportTime(); Long reportTime = receivedEvent.getReportTime(); - String reportTimeStr = StrUtil.toString(reportTime); - if(reportTimeStr.length() == 10) { - // 机智云那边的设备可能是秒或毫秒 - reportTime = reportTime * 1000; - } - // 1树根 0机智云 - Integer dataSource = receivedEvent.getDataSource(); - // 当前数据 - DeviceTotalData nowDeviceState = new DeviceTotalData(); - if (lastedDeviceState.getReportTime() <= receivedEvent.getReportTime()) { - if (lastWorkingStat == null || lastPwStat == null) { - lastWorkingStat = lastedDeviceState.getMachineWorkingStat(); - lastPwStat = lastedDeviceState.getMachinePwrStat(); - } + // 如果这次的消息事件小于上次消息的时间,那么就进行丢弃 + if (lastReportTime <= reportTime) { + Integer lastWorkingStat = lastedDeviceState.getMachineWorkingStat(); + Integer lastPwStat = lastedDeviceState.getMachinePwrStat(); + // 上次启动时间 + Long lastBootTime = lastedDeviceState.getLastBootTime(); + // 如果当前消息的时间大于等于上次消息的时间才进行处理 + Integer machinePwrStat = receivedEvent.getMachinePwrStat(); + Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); + // 1树根 0机智云 + Integer dataSource = receivedEvent.getDataSource(); + // 当前数据 + DeviceTotalData nowDeviceState = new DeviceTotalData(); nowDeviceState.setMachinePwrStat(machinePwrStat); nowDeviceState.setMachineWorkingStat(machineWorkingStat); LocalDate localDate = new Date(reportTime).toLocalDate(); - Long lastReportTime = lastedDeviceState.getReportTime(); - if (lastReportTime == null) { - // 如果上次的消息时间为空,那么不进行计算 - nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount()); - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration()); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal()); - nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal()); - } // 直接通过两个消息的时间差进行计算(毫秒) Long workingDuration = reportTime - lastedDeviceState.getReportTime(); // 转为秒 workingDuration = workingDuration / 1000; if (machinePwrStat.equals(0)) { + // 关机 if (lastPwStat != 0) { if (lastWorkingStat == 1) { - // 如果上次是工作中,那就进行累加 - if (lastReportTime != null) { - if (dataSource == 1) { - // 树根 - nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration); - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); - Long lastJobTotal = lastedDeviceState.getJobTotal(); - Long accJobCount = receivedEvent.getAccJobCount(); - // 直接往上类 - nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + (accJobCount - lastJobTotal)); - nowDeviceState.setJobTotal(accJobCount); - } else { - // 机智云 - nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrJobDuration()); - nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); - nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); - nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + receivedEvent.getCurrJobCount()); - nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); - } + // 如果上次是工作状态,那么需要记录产量和生产时间 + if (dataSource == 1) { + // 树根 + nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration); + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); + Long lastJobTotal = lastedDeviceState.getJobTotal(); + Long accJobCount = receivedEvent.getAccJobCount(); + // 直接往上累加 + nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + (accJobCount - lastJobTotal)); + nowDeviceState.setJobTotal(accJobCount); + } else { + // 机智云 + nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrJobDuration()); + nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); + nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); + nowDeviceState.setTheDayJobCount(lastedDeviceState.getTheDayJobCount() + receivedEvent.getCurrJobCount()); + nowDeviceState.setJobTotal(lastedDeviceState.getJobTotal() + receivedEvent.getCurrJobCount()); } } else { nowDeviceState = lastedDeviceState; @@ -234,6 +196,7 @@ public class IotMonitoringDataJob { nowDeviceState.setMachinePwrStat(machinePwrStat); nowDeviceState.setMachineWorkingStat(machineWorkingStat); } else { + // 开机 if (machineWorkingStat.equals(1)) { // 工作 if (dataSource == 1) { @@ -259,7 +222,7 @@ public class IotMonitoringDataJob { // 设置开机时长,待机也要进行累加,所以放这里 if (dataSource == 1) { nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration); - }else { + } else { nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + receivedEvent.getCurrJobDuration()); } nowDeviceState.setCurrLocalDate(localDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); @@ -290,13 +253,13 @@ public class IotMonitoringDataJob { data.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); data.setLastBootTime(nowDeviceState.getLastBootTime()); data.setCurrDuration(nowDeviceState.getTheDayDuration()); - if(!isExistEs) { + if (!isExistEs) { isExistEs = true; } out.collect(data); } } - }catch (Exception e) { + } catch (Exception e) { log.info("导致异常的信息:" + JSONUtil.toJsonStr(receivedEvent)); log.error("处理异常", e); } @@ -332,7 +295,8 @@ public class IotMonitoringDataJob { value.setReportTime(reportTime); } // 是否日期是当天的,否则需要更新当天工作时长和当天工作量 - if (LocalDate.parse(value.getCurrLocalDate(), DateTimeFormatter.ofPattern("yyyy-MM-dd")).isBefore(localDate)) { + if (LocalDate.parse(value.getCurrLocalDate(), DateTimeFormatter.ofPattern("yyyy-MM-dd")) + .isBefore(localDate)) { // value有值,但是日期不对,说明到了第二天,那么,只需要对当天数据清零即可 value.setTheDayJobDuration(0L); value.setTheDayJobCount(0L); @@ -385,18 +349,18 @@ public class IotMonitoringDataJob { deviceTotalData.setCurrLocalDate(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); // 是否在线 Object isOnlineObj = JSONUtil.getByPath(JSONUtil.parse(o), "isOnline"); - if(isOnlineObj != null) { + if (isOnlineObj != null) { int isOnline = Integer.parseInt(String.valueOf(isOnlineObj)); - if(isOnline == 0) { + if (isOnline == 0) { // 开机 deviceTotalData.setMachinePwrStat(1); deviceTotalData.setMachineWorkingStat(2); - }else { + } else { // 关机 deviceTotalData.setMachinePwrStat(0); deviceTotalData.setMachineWorkingStat(0); } - }else { + } else { deviceTotalData.setMachinePwrStat(0); deviceTotalData.setMachineWorkingStat(0); } @@ -512,12 +476,12 @@ public class IotMonitoringDataJob { private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) { - if(currIndicesDateSuffix == null) { + if (currIndicesDateSuffix == null) { // 当前月的索引为空 createIndices(indicesName, indexDateSuffix); - }else { + } else { // 校验当前消息能否符合当前索引 - if(!indexDateSuffix.equals(currIndicesDateSuffix)) { + if (!indexDateSuffix.equals(currIndicesDateSuffix)) { // 如果不符合,需要重建索引 createIndices(indicesName, indexDateSuffix); } @@ -531,7 +495,7 @@ public class IotMonitoringDataJob { // 先判断客户端是否存在 try { boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); - if(!exists) { + if (!exists) { // 创建索引 CreateIndexRequest request = new CreateIndexRequest(indicesName); // 字段映射 @@ -582,7 +546,7 @@ public class IotMonitoringDataJob { CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT); boolean acknowledged = createIndexResponse.isAcknowledged(); boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged(); - if(!acknowledged || !shardsAcknowledged) { + if (!acknowledged || !shardsAcknowledged) { throw new Exception("自定义索引创建失败!!!"); } currIndicesDateSuffix = indexDateSuffix;