Browse Source

异常处理

master
张彭杰 2 years ago
parent
commit
5e26064b38
1 changed files with 127 additions and 125 deletions
  1. 252
      iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java

252
iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java

@ -107,155 +107,157 @@ public class IotDevicePowerOnAndOffDataJob {
final DataStream<MachineOutputCommand> dataStreamSource = env
.addSource(new RMQSource<>(connectionConfig, ApolloConfig.getStr(ConfigConstant.SOURCE_RABBITMQ_QUEUE),
false, new MachineOutputCommandDeserializationSchema())).setParallelism(1);
try {
SingleOutputStreamOperator<IotDevicePowerOnAndOffDataEvent> outputStreamOperator = dataStreamSource
.keyBy(MachineOutputCommand::getMac)
.process(new KeyedProcessFunction<Long, MachineOutputCommand, IotDevicePowerOnAndOffDataEvent>() {
SingleOutputStreamOperator<IotDevicePowerOnAndOffDataEvent> outputStreamOperator = dataStreamSource
.keyBy(MachineOutputCommand::getMac)
.process(new KeyedProcessFunction<Long, MachineOutputCommand, IotDevicePowerOnAndOffDataEvent>() {
private ValueState<IotDevicePowerOnAndOffDataEvent> powerOnAndOffDataEventValueState;
private ValueState<IotDevicePowerOnAndOffDataEvent> powerOnAndOffDataEventValueState;
@Override
public void open(Configuration parameters) {
@Override
public void open(Configuration parameters) {
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<IotDevicePowerOnAndOffDataEvent> powerOnAndOffDataEventValue
= new ValueStateDescriptor<>("powerOnAndOffDataEventValue",
TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class));
// 设置状态值的过期时间为了解决手动修改数据没有同步的问题
powerOnAndOffDataEventValue.enableTimeToLive(ttlConfig);
ValueStateDescriptor<IotDevicePowerOnAndOffDataEvent> powerOnAndOffDataEventValue
= new ValueStateDescriptor<>("powerOnAndOffDataEventValue",
TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class));
// 设置状态值的过期时间为了解决手动修改数据没有同步的问题
powerOnAndOffDataEventValue.enableTimeToLive(ttlConfig);
// 必须在 open 生命周期初始化
powerOnAndOffDataEventValueState = getRuntimeContext()
.getState(powerOnAndOffDataEventValue);
}
// 必须在 open 生命周期初始化
powerOnAndOffDataEventValueState = getRuntimeContext()
.getState(powerOnAndOffDataEventValue);
}
@Override
public void processElement(MachineOutputCommand command,
KeyedProcessFunction<Long, MachineOutputCommand, IotDevicePowerOnAndOffDataEvent>.Context ctx,
Collector<IotDevicePowerOnAndOffDataEvent> out) throws Exception {
@Override
public void processElement(MachineOutputCommand command,
KeyedProcessFunction<Long, MachineOutputCommand, IotDevicePowerOnAndOffDataEvent>.Context ctx,
Collector<IotDevicePowerOnAndOffDataEvent> out) throws Exception {
IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(command);
Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime();
Long reportTime = command.getTimestamp();
if (reportTime >= lastReportTime) {
Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat();
Integer machinePwrStat = command.getMachinePwrStat();
Integer lastMachineWorkingStat = lastPowerOnAndOffDataEvent.getMachineWorkingStat();
Integer machineWorkingStat = command.getMachineWorkingStat();
if (!((lastMachineWorkingStat == 0 && machineWorkingStat == 0)
|| (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) {
Long currJobCount = command.getCurrJobCount();
Long currJobDuration = command.getCurrJobDuration();
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
powerOnAndOffDataEvent.setId(snowflake.nextId());
powerOnAndOffDataEvent.setDataSource(command.getDataSource());
powerOnAndOffDataEvent.setMachineIotMac(command.getMac());
powerOnAndOffDataEvent.setMachinePwrStat(command.getMachinePwrStat());
powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat());
powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput());
powerOnAndOffDataEvent.setReportTime(command.getTimestamp());
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) {
if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
// 上次是关机但是这次是开机说明周期产能从新开始
powerOnAndOffDataEvent.setCurrJobCount(currJobCount);
powerOnAndOffDataEvent.setCurrJobDuration(currJobDuration);
}
} else {
Long lastCurrJobCount = lastPowerOnAndOffDataEvent.getCurrJobCount();
Long lastCurrJobDuration = lastPowerOnAndOffDataEvent.getCurrJobDuration();
// 直接累加
powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount);
powerOnAndOffDataEvent.setCurrJobDuration(lastCurrJobDuration + currJobDuration);
}
// 上次的状态只有两种要么只有开机时间不为空要么是开机和关机时间都不为空否则不处理
if (lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) {
if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) {
// 只有开机时间不为空
if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(machinePwrStat)) {
powerOnAndOffDataEvent.setMachinePowerOffTime(command.getTimestamp());
IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(command);
Long lastReportTime = lastPowerOnAndOffDataEvent.getReportTime();
Long reportTime = command.getTimestamp();
if (reportTime >= lastReportTime) {
Integer lastMachinePwrStat = lastPowerOnAndOffDataEvent.getMachinePwrStat();
Integer machinePwrStat = command.getMachinePwrStat();
Integer lastMachineWorkingStat = lastPowerOnAndOffDataEvent.getMachineWorkingStat();
Integer machineWorkingStat = command.getMachineWorkingStat();
if (!((lastMachineWorkingStat == 0 && machineWorkingStat == 0)
|| (lastMachineWorkingStat == 2 && machineWorkingStat == 2))) {
Long currJobCount = command.getCurrJobCount();
Long currJobDuration = command.getCurrJobDuration();
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
powerOnAndOffDataEvent.setId(snowflake.nextId());
powerOnAndOffDataEvent.setDataSource(command.getDataSource());
powerOnAndOffDataEvent.setMachineIotMac(command.getMac());
powerOnAndOffDataEvent.setMachinePwrStat(command.getMachinePwrStat());
powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat());
powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput());
powerOnAndOffDataEvent.setReportTime(command.getTimestamp());
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(lastMachinePwrStat)) {
if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
// 上次是关机但是这次是开机说明周期产能从新开始
powerOnAndOffDataEvent.setCurrJobCount(currJobCount);
powerOnAndOffDataEvent.setCurrJobDuration(currJobDuration);
}
powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime());
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
out.collect(powerOnAndOffDataEvent);
} else {
// // 开机和关机时间都不为空说明上一个生产周期已经过了那么当前设备的电源状态必须要是开机状态
if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
// 开机
powerOnAndOffDataEvent.setMachinePowerOnTime(command.getTimestamp());
Long lastCurrJobCount = lastPowerOnAndOffDataEvent.getCurrJobCount();
Long lastCurrJobDuration = lastPowerOnAndOffDataEvent.getCurrJobDuration();
// 直接累加
powerOnAndOffDataEvent.setCurrJobCount(lastCurrJobCount + currJobCount);
powerOnAndOffDataEvent.setCurrJobDuration(lastCurrJobDuration + currJobDuration);
}
// 上次的状态只有两种要么只有开机时间不为空要么是开机和关机时间都不为空否则不处理
if (lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) {
if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) {
// 只有开机时间不为空
if (MachinePwrStatusEnum.MACHINE_OFF.getValue().equals(machinePwrStat)) {
powerOnAndOffDataEvent.setMachinePowerOffTime(command.getTimestamp());
}
powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime());
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
out.collect(powerOnAndOffDataEvent);
} else {
// // 开机和关机时间都不为空说明上一个生产周期已经过了那么当前设备的电源状态必须要是开机状态
if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
// 开机
powerOnAndOffDataEvent.setMachinePowerOnTime(command.getTimestamp());
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
out.collect(powerOnAndOffDataEvent);
}
}
}
}
}
}
}
private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineOutputCommand command) throws IOException {
private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineOutputCommand command) throws IOException {
IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value();
if (iotDevicePowerOnAndOffDataEvent == null) {
iotDevicePowerOnAndOffDataEvent = getByEs(command);
IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value();
if (iotDevicePowerOnAndOffDataEvent == null) {
iotDevicePowerOnAndOffDataEvent = getByEs(command);
}
return iotDevicePowerOnAndOffDataEvent;
}
return iotDevicePowerOnAndOffDataEvent;
}
private IotDevicePowerOnAndOffDataEvent getByEs(MachineOutputCommand command) throws IOException {
private IotDevicePowerOnAndOffDataEvent getByEs(MachineOutputCommand command) throws IOException {
// 构建查询条件注意termQuery 支持多种格式查询 booleanintdoublestring 这里使用的是 string 的查询
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", command.getMac()));
searchSourceBuilder.sort("receivedTime", SortOrder.DESC);
searchSourceBuilder.size(1);
// 创建查询请求对象将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX));
searchRequest.source(searchSourceBuilder);
String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate);
// 先判断索引是否存在
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if (exists) {
// 执行查询然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
SearchHits hits = searchResponse.getHits();
SearchHit reqHit = hits.getHits()[0];
return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class);
// 构建查询条件注意termQuery 支持多种格式查询 booleanintdoublestring 这里使用的是 string 的查询
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", command.getMac()));
searchSourceBuilder.sort("receivedTime", SortOrder.DESC);
searchSourceBuilder.size(1);
// 创建查询请求对象将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX));
searchRequest.source(searchSourceBuilder);
String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate);
// 先判断索引是否存在
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if (exists) {
// 执行查询然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
SearchHits hits = searchResponse.getHits();
SearchHit reqHit = hits.getHits()[0];
return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class);
}
}
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
powerOnAndOffDataEvent.setId(snowflake.nextId());
powerOnAndOffDataEvent.setDataSource(command.getDataSource());
powerOnAndOffDataEvent.setMachineIotMac(command.getMac());
powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput());
powerOnAndOffDataEvent.setCurrJobCount(command.getCurrJobCount());
powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrJobDuration());
Integer machinePwrStat = command.getMachinePwrStat();
powerOnAndOffDataEvent.setMachinePwrStat(machinePwrStat);
powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat());
Long reportTime = command.getTimestamp();
// 开机
powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime);
if (machinePwrStat == 0) {
// 关机
powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime);
}
powerOnAndOffDataEvent.setReportTime(reportTime);
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
return powerOnAndOffDataEvent;
}
IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
powerOnAndOffDataEvent.setId(snowflake.nextId());
powerOnAndOffDataEvent.setDataSource(command.getDataSource());
powerOnAndOffDataEvent.setMachineIotMac(command.getMac());
powerOnAndOffDataEvent.setAccJobCount(command.getCurrTotalOutput());
powerOnAndOffDataEvent.setCurrJobCount(command.getCurrJobCount());
powerOnAndOffDataEvent.setCurrJobDuration(command.getCurrJobDuration());
Integer machinePwrStat = command.getMachinePwrStat();
powerOnAndOffDataEvent.setMachinePwrStat(machinePwrStat);
powerOnAndOffDataEvent.setMachineWorkingStat(command.getMachineWorkingStat());
Long reportTime = command.getTimestamp();
// 开机
powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime);
if (machinePwrStat == 0) {
// 关机
powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime);
}
powerOnAndOffDataEvent.setReportTime(reportTime);
powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
return powerOnAndOffDataEvent;
}
}).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac");
sinkEs(outputStreamOperator);
}).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac");
sinkEs(outputStreamOperator);
} catch (Exception e) {
log.error("iot_device_power_on_and_off_data 执行异常", e);
}
env.execute("iot_device_power_on_and_off_data");
}

Loading…
Cancel
Save