diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java index f288b5f..e3d1ce6 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java @@ -107,155 +107,157 @@ public class IotDevicePowerOnAndOffDataJob { final DataStream dataStreamSource = env .addSource(new RMQSource<>(connectionConfig, ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_QUEUE), false, new MachineOutputCommandDeserializationSchema())).setParallelism(1); + try { + SingleOutputStreamOperator outputStreamOperator = dataStreamSource + .keyBy(MachineOutputCommand::getMac) + .process(new KeyedProcessFunction() { - SingleOutputStreamOperator outputStreamOperator = dataStreamSource - .keyBy(MachineOutputCommand::getMac) - .process(new KeyedProcessFunction() { - - private ValueState powerOnAndOffDataEventValueState; + private ValueState powerOnAndOffDataEventValueState; - @Override - public void open(Configuration parameters) { + @Override + public void open(Configuration parameters) { - StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10)) - .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) - .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) - .build(); + StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10)) + .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) + .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) + .build(); - ValueStateDescriptor powerOnAndOffDataEventValue - = new ValueStateDescriptor<>("powerOnAndOffDataEventValue", - TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class)); - // 设置状态值的过期时间,为了解决手动修改数据没有同步的问题 - powerOnAndOffDataEventValue.enableTimeToLive(ttlConfig); + ValueStateDescriptor powerOnAndOffDataEventValue + = new ValueStateDescriptor<>("powerOnAndOffDataEventValue", + TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class)); + // 设置状态值的过期时间,为了解决手动修改数据没有同步的问题 + powerOnAndOffDataEventValue.enableTimeToLive(ttlConfig); - // 必须在 open 生命周期初始化 - powerOnAndOffDataEventValueState = getRuntimeContext() - .getState(powerOnAndOffDataEventValue); - } + // 必须在 open 生命周期初始化 + powerOnAndOffDataEventValueState = getRuntimeContext() + .getState(powerOnAndOffDataEventValue); + } - @Override - public void processElement(MachineOutputCommand command, - KeyedProcessFunction.Context ctx, - Collector out) throws Exception { + @Override + public void processElement(MachineOutputCommand command, + KeyedProcessFunction.Context ctx, + Collector out) throws Exception { - IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(command); - Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime(); - Long reportTime = command.getTimestamp(); - if (reportTime >= lastReportTime) { - Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat(); - Integer machinePwrStat = command.getMachinePwrStat(); - Integer lastMachineWorkingStat = lastPowerOnAndOffDataEvent.getMachineWorkingStat(); - Integer machineWorkingStat = command.getMachineWorkingStat(); - if (!((lastMachineWorkingStat == 0 && machineWorkingStat == 0) - || (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) { - Long currJobCount = command.getCurrJobCount(); - Long currJobDuration = command.getCurrJobDuration(); - IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); - powerOnAndOffDataEvent.setId(snowflake.nextId()); - powerOnAndOffDataEvent.setDataSource(command.getDataSource()); - powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); - powerOnAndOffDataEvent.setMachinePwrStat(command.getMachinePwrStat()); - powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); - powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); - powerOnAndOffDataEvent.setReportTime(command.getTimestamp()); - powerOnAndOffDataEvent.setReceivedTime(LocalDateTime - .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); - if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) { - if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { - // 上次是关机,但是这次是开机,说明周期产能从新开始 - powerOnAndOffDataEvent.setCurrJobCount(currJobCount); - powerOnAndOffDataEvent.setCurrJobDuration(currJobDuration); - } - } else { - Long lastCurrJobCount = lastPowerOnAndOffDataEvent.getCurrJobCount(); - Long lastCurrJobDuration = lastPowerOnAndOffDataEvent.getCurrJobDuration(); - // 直接累加 - powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount); - powerOnAndOffDataEvent.setCurrJobDuration(lastCurrJobDuration + currJobDuration); - } - // 上次的状态只有两种,要么只有开机时间不为空,要么是开机和关机时间都不为空,否则不处理 - if (lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) { - if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) { - // 只有开机时间不为空 - if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(machinePwrStat)) { - powerOnAndOffDataEvent.setMachinePowerOffTime(command.getTimestamp()); + IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(command); + Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime(); + Long reportTime = command.getTimestamp(); + if (reportTime >= lastReportTime) { + Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat(); + Integer machinePwrStat = command.getMachinePwrStat(); + Integer lastMachineWorkingStat = lastPowerOnAndOffDataEvent.getMachineWorkingStat(); + Integer machineWorkingStat = command.getMachineWorkingStat(); + if (!((lastMachineWorkingStat == 0 && machineWorkingStat == 0) + || (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) { + Long currJobCount = command.getCurrJobCount(); + Long currJobDuration = command.getCurrJobDuration(); + IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); + powerOnAndOffDataEvent.setId(snowflake.nextId()); + powerOnAndOffDataEvent.setDataSource(command.getDataSource()); + powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); + powerOnAndOffDataEvent.setMachinePwrStat(command.getMachinePwrStat()); + powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); + powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); + powerOnAndOffDataEvent.setReportTime(command.getTimestamp()); + powerOnAndOffDataEvent.setReceivedTime(LocalDateTime + .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); + if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) { + if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { + // 上次是关机,但是这次是开机,说明周期产能从新开始 + powerOnAndOffDataEvent.setCurrJobCount(currJobCount); + powerOnAndOffDataEvent.setCurrJobDuration(currJobDuration); } - powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); - powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); - out.collect(powerOnAndOffDataEvent); } else { - // // 开机和关机时间都不为空,说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态 - if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { - // 开机 - powerOnAndOffDataEvent.setMachinePowerOnTime(command.getTimestamp()); + Long lastCurrJobCount = lastPowerOnAndOffDataEvent.getCurrJobCount(); + Long lastCurrJobDuration = lastPowerOnAndOffDataEvent.getCurrJobDuration(); + // 直接累加 + powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount); + powerOnAndOffDataEvent.setCurrJobDuration(lastCurrJobDuration + currJobDuration); + } + // 上次的状态只有两种,要么只有开机时间不为空,要么是开机和关机时间都不为空,否则不处理 + if (lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) { + if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) { + // 只有开机时间不为空 + if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(machinePwrStat)) { + powerOnAndOffDataEvent.setMachinePowerOffTime(command.getTimestamp()); + } + powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); out.collect(powerOnAndOffDataEvent); + } else { + // // 开机和关机时间都不为空,说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态 + if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { + // 开机 + powerOnAndOffDataEvent.setMachinePowerOnTime(command.getTimestamp()); + powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); + out.collect(powerOnAndOffDataEvent); + } } } } } } - } - private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineOutputCommand command) throws IOException { + private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineOutputCommand command) throws IOException { - IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value(); - if (iotDevicePowerOnAndOffDataEvent == null) { - iotDevicePowerOnAndOffDataEvent = getByEs(command); + IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value(); + if (iotDevicePowerOnAndOffDataEvent == null) { + iotDevicePowerOnAndOffDataEvent = getByEs(command); + } + return iotDevicePowerOnAndOffDataEvent; } - return iotDevicePowerOnAndOffDataEvent; - } - private IotDevicePowerOnAndOffDataEvent getByEs(MachineOutputCommand command) throws IOException { + private IotDevicePowerOnAndOffDataEvent getByEs(MachineOutputCommand command) throws IOException { - // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", command.getMac())); - searchSourceBuilder.sort("receivedTime", SortOrder.DESC); - searchSourceBuilder.size(1); - // 创建查询请求对象,将查询对象配置到其中 - SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)); - searchRequest.source(searchSourceBuilder); - String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM")); - GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate); - // 先判断索引是否存在 - boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); - if (exists) { - // 执行查询,然后处理响应结果 - SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); - // 根据状态和数据条数验证是否返回了数据 - if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { - SearchHits hits = searchResponse.getHits(); - SearchHit reqHit = hits.getHits()[0]; - return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class); + // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", command.getMac())); + searchSourceBuilder.sort("receivedTime", SortOrder.DESC); + searchSourceBuilder.size(1); + // 创建查询请求对象,将查询对象配置到其中 + SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)); + searchRequest.source(searchSourceBuilder); + String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM")); + GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate); + // 先判断索引是否存在 + boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); + if (exists) { + // 执行查询,然后处理响应结果 + SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + // 根据状态和数据条数验证是否返回了数据 + if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { + SearchHits hits = searchResponse.getHits(); + SearchHit reqHit = hits.getHits()[0]; + return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class); + } } + IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); + powerOnAndOffDataEvent.setId(snowflake.nextId()); + powerOnAndOffDataEvent.setDataSource(command.getDataSource()); + powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); + powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); + powerOnAndOffDataEvent.setCurrJobCount(command.getCurrJobCount()); + powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrJobDuration()); + Integer machinePwrStat = command.getMachinePwrStat(); + powerOnAndOffDataEvent.setMachinePwrStat(machinePwrStat); + powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); + Long reportTime = command.getTimestamp(); + // 开机 + powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); + if (machinePwrStat == 0) { + // 关机 + powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); + } + powerOnAndOffDataEvent.setReportTime(reportTime); + powerOnAndOffDataEvent.setReceivedTime(LocalDateTime + .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); + return powerOnAndOffDataEvent; } - IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent(); - powerOnAndOffDataEvent.setId(snowflake.nextId()); - powerOnAndOffDataEvent.setDataSource(command.getDataSource()); - powerOnAndOffDataEvent.setMachineIotMac(command.getMac()); - powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput()); - powerOnAndOffDataEvent.setCurrJobCount(command.getCurrJobCount()); - powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrJobDuration()); - Integer machinePwrStat = command.getMachinePwrStat(); - powerOnAndOffDataEvent.setMachinePwrStat(machinePwrStat); - powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat()); - Long reportTime = command.getTimestamp(); - // 开机 - powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime); - if (machinePwrStat == 0) { - // 关机 - powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime); - } - powerOnAndOffDataEvent.setReportTime(reportTime); - powerOnAndOffDataEvent.setReceivedTime(LocalDateTime - .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); - return powerOnAndOffDataEvent; - } - }).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac"); - - sinkEs(outputStreamOperator); + }).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac"); + sinkEs(outputStreamOperator); + } catch (Exception e) { + log.error("iot_device_power_on_and_off_data 执行异常", e); + } env.execute("iot_device_power_on_and_off_data"); }