|
|
@ -38,6 +38,7 @@ import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; |
|
|
import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; |
|
|
import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; |
|
|
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; |
|
|
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; |
|
|
import org.apache.flink.util.Collector; |
|
|
import org.apache.flink.util.Collector; |
|
|
|
|
|
import org.apache.flink.util.OutputTag; |
|
|
import org.apache.http.HttpHost; |
|
|
import org.apache.http.HttpHost; |
|
|
import org.apache.http.auth.AuthScope; |
|
|
import org.apache.http.auth.AuthScope; |
|
|
import org.apache.http.auth.UsernamePasswordCredentials; |
|
|
import org.apache.http.auth.UsernamePasswordCredentials; |
|
|
@ -149,49 +150,32 @@ public class IotMachineEventGeneratorJob { |
|
|
} |
|
|
} |
|
|
}).name("machine iot data received event filter"); |
|
|
}).name("machine iot data received event filter"); |
|
|
|
|
|
|
|
|
DataStream<MachineIotDataReceivedEvent> machineIotDataReceivedEventDataStream = streamOperator |
|
|
|
|
|
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) |
|
|
|
|
|
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>() { |
|
|
|
|
|
|
|
|
|
|
|
private ValueState<DeviceState> deviceState; |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
public void open(Configuration parameters) { |
|
|
|
|
|
|
|
|
|
|
|
// 必须在 open 生命周期初始化 |
|
|
|
|
|
deviceState = getRuntimeContext() |
|
|
|
|
|
.getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(DeviceState.class))); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
public void processElement(MachineIotDataReceivedEvent event, |
|
|
|
|
|
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>.Context ctx, |
|
|
|
|
|
Collector<MachineIotDataReceivedEvent> out) throws Exception { |
|
|
|
|
|
|
|
|
|
|
|
DeviceState lastedDeviceState = getDeviceStateListJson(deviceState, event.getMachineIotMac()); |
|
|
|
|
|
Integer deviceStatus = event.getMachineWorkingStat(); |
|
|
|
|
|
// 更新状态 |
|
|
|
|
|
if (lastedDeviceState != null) { |
|
|
|
|
|
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), |
|
|
|
|
|
deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime()); |
|
|
|
|
|
collDeviceStatusChange1(out, newState, lastedDeviceState, event); |
|
|
|
|
|
this.deviceState.update(newState); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
}).name("machineIotDataReceivedEventDataStream keyBy stream"); |
|
|
|
|
|
|
|
|
// 新增一个测输出,用于接收MachineIotDataReceivedEvent类型数据 |
|
|
|
|
|
OutputTag<MachineIotDataReceivedEvent> machineIotDataReceivedEventOutput |
|
|
|
|
|
= new OutputTag<MachineIotDataReceivedEvent>("machine-iot-data-received-event") { |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DataStream<BaseCommand> commandDataStream = dataStreamSource.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) |
|
|
|
|
|
|
|
|
SingleOutputStreamOperator<BaseCommand> commandDataStream = streamOperator |
|
|
|
|
|
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) |
|
|
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, BaseCommand>() { |
|
|
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, BaseCommand>() { |
|
|
|
|
|
|
|
|
private ValueState<DeviceState> deviceState; |
|
|
private ValueState<DeviceState> deviceState; |
|
|
|
|
|
|
|
|
|
|
|
private ValueState<MachineIotDataReceivedEvent> lastDataReceivedEventState; |
|
|
|
|
|
|
|
|
@Override |
|
|
@Override |
|
|
public void open(Configuration parameters) { |
|
|
public void open(Configuration parameters) { |
|
|
|
|
|
|
|
|
// 必须在 open 生命周期初始化 |
|
|
// 必须在 open 生命周期初始化 |
|
|
deviceState = getRuntimeContext() |
|
|
deviceState = getRuntimeContext() |
|
|
.getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(DeviceState.class))); |
|
|
|
|
|
|
|
|
.getState(new ValueStateDescriptor<>("deviceState", |
|
|
|
|
|
TypeInformation.of(DeviceState.class))); |
|
|
|
|
|
|
|
|
|
|
|
// 必须在 open 生命周期初始化 |
|
|
|
|
|
lastDataReceivedEventState = getRuntimeContext() |
|
|
|
|
|
.getState(new ValueStateDescriptor<>("lastDataReceivedEvent1", |
|
|
|
|
|
TypeInformation.of(MachineIotDataReceivedEvent.class))); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@Override |
|
|
@Override |
|
|
@ -199,51 +183,81 @@ public class IotMachineEventGeneratorJob { |
|
|
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, BaseCommand>.Context ctx, |
|
|
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, BaseCommand>.Context ctx, |
|
|
Collector<BaseCommand> out) throws Exception { |
|
|
Collector<BaseCommand> out) throws Exception { |
|
|
|
|
|
|
|
|
// 获取最新设备状态 |
|
|
|
|
|
DeviceState lastedDeviceState = getDeviceStateListJson(deviceState, event.getMachineIotMac()); |
|
|
|
|
|
Integer deviceStatus = event.getMachineWorkingStat(); |
|
|
|
|
|
// 更新状态 |
|
|
|
|
|
if (lastedDeviceState != null) { |
|
|
|
|
|
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), |
|
|
|
|
|
deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime()); |
|
|
|
|
|
collDeviceStatusChange(out, newState, lastedDeviceState, event); |
|
|
|
|
|
this.deviceState.update(newState); |
|
|
|
|
|
|
|
|
Long machineIotMac = event.getMachineIotMac(); |
|
|
|
|
|
DeviceState deviceState = getDeviceState(this.deviceState, machineIotMac); |
|
|
|
|
|
if (deviceState != null) { |
|
|
|
|
|
MachineIotDataReceivedEvent lastDataReceivedEvent = getLastDataReceivedEvent(lastDataReceivedEventState, machineIotMac); |
|
|
|
|
|
if (lastDataReceivedEvent == null) { |
|
|
|
|
|
lastDataReceivedEvent = event; |
|
|
|
|
|
} |
|
|
|
|
|
Integer pwrSta = event.getMachinePwrStat(); |
|
|
|
|
|
Integer workingSta = event.getMachineWorkingStat(); |
|
|
|
|
|
// 总产量 |
|
|
|
|
|
Long accCount = event.getAccJobCount(); |
|
|
|
|
|
Long reportTime = event.getReportTime(); |
|
|
|
|
|
Integer lastPwrStat = lastDataReceivedEvent.getMachinePwrStat(); |
|
|
|
|
|
Integer lastWorkingStat = lastDataReceivedEvent.getMachineWorkingStat(); |
|
|
|
|
|
Long lastReportTime = lastDataReceivedEvent.getReportTime(); |
|
|
|
|
|
Long lastAccJobCount = lastDataReceivedEvent.getAccJobCount(); |
|
|
|
|
|
// 只有当前消息的时间大于等于上一次消息的时间才要,否则丢弃 |
|
|
|
|
|
if (reportTime >= lastReportTime) { |
|
|
|
|
|
if ((pwrSta == 1 && workingSta == 1) |
|
|
|
|
|
|| (lastPwrStat == 1 && lastWorkingStat == 1)) { |
|
|
|
|
|
// 只有当前是工作中或上次是工作中才进行计算 |
|
|
|
|
|
// 如果这次的消息和上次的消息相差半个小时,那么不进行计算 |
|
|
|
|
|
if (reportTime - lastReportTime <= 30 * 60 * 1000) { |
|
|
|
|
|
event.setCurrCount(accCount - lastAccJobCount); |
|
|
|
|
|
// 单位是秒 |
|
|
|
|
|
event.setCurrDuration((reportTime - lastReportTime) / 3600); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
ctx.output(machineIotDataReceivedEventOutput, event); |
|
|
|
|
|
collDeviceStatusChange(out, deviceState, lastDataReceivedEvent, event); |
|
|
|
|
|
lastDataReceivedEventState.update(event); |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
}).name("keyBy stream"); |
|
|
}).name("keyBy stream"); |
|
|
|
|
|
|
|
|
|
|
|
// 获取测输出数据 |
|
|
|
|
|
DataStream<MachineIotDataReceivedEvent> sideOutput = commandDataStream.getSideOutput(machineIotDataReceivedEventOutput); |
|
|
|
|
|
|
|
|
// 写入rabbitmq |
|
|
// 写入rabbitmq |
|
|
sinkRabbitMq(commandDataStream); |
|
|
sinkRabbitMq(commandDataStream); |
|
|
|
|
|
|
|
|
// 写入es |
|
|
// 写入es |
|
|
sinkEs(machineIotDataReceivedEventDataStream); |
|
|
|
|
|
|
|
|
sinkEs(sideOutput); |
|
|
|
|
|
|
|
|
env.execute("iot machine event generator job"); |
|
|
env.execute("iot machine event generator job"); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private static DeviceState getDeviceStateListJson(ValueState<DeviceState> deviceState, |
|
|
|
|
|
Long machineIotMac) throws IOException, SQLException { |
|
|
|
|
|
|
|
|
private static MachineIotDataReceivedEvent getLastDataReceivedEvent(ValueState<MachineIotDataReceivedEvent> lastDataReceivedEventState, |
|
|
|
|
|
Long machineIotMac) throws IOException { |
|
|
|
|
|
|
|
|
|
|
|
MachineIotDataReceivedEvent value = lastDataReceivedEventState.value(); |
|
|
|
|
|
if (value == null) { |
|
|
|
|
|
value = queryLastDataReceivedEvent(machineIotMac); |
|
|
|
|
|
} |
|
|
|
|
|
return value; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static DeviceState getDeviceState(ValueState<DeviceState> deviceState, |
|
|
|
|
|
Long machineIotMac) throws SQLException, IOException { |
|
|
|
|
|
|
|
|
// 获取最新设备状态 |
|
|
|
|
|
DeviceState deviceStateListJson = deviceState.value(); |
|
|
|
|
|
if (deviceStateListJson == null) { |
|
|
|
|
|
int countUnit = 1; |
|
|
|
|
|
|
|
|
DeviceState value = deviceState.value(); |
|
|
|
|
|
if (value == null) { |
|
|
// 查询数据库最新的设备状态 |
|
|
// 查询数据库最新的设备状态 |
|
|
List<DeviceState> list = Db.use().query(SQL, DeviceState.class, machineIotMac); |
|
|
List<DeviceState> list = Db.use().query(SQL, DeviceState.class, machineIotMac); |
|
|
if (CollUtil.isNotEmpty(list)) { |
|
|
if (CollUtil.isNotEmpty(list)) { |
|
|
deviceStateListJson = list.get(0); |
|
|
|
|
|
|
|
|
value = list.get(0); |
|
|
} |
|
|
} |
|
|
DeviceState latestDeviceState = queryLatestDeviceState(machineIotMac); |
|
|
|
|
|
if (latestDeviceState != null && deviceStateListJson != null) { |
|
|
|
|
|
latestDeviceState.setCountUnit(countUnit); |
|
|
|
|
|
latestDeviceState.setMachineId(deviceStateListJson.getMachineId()); |
|
|
|
|
|
} |
|
|
|
|
|
deviceStateListJson = latestDeviceState; |
|
|
|
|
|
|
|
|
// 数据库找不到的话说明设备没记录 |
|
|
} |
|
|
} |
|
|
return deviceStateListJson; |
|
|
|
|
|
|
|
|
return value; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private static DeviceState queryLatestDeviceState(Long machineIotMac) { |
|
|
|
|
|
|
|
|
private static MachineIotDataReceivedEvent queryLastDataReceivedEvent(Long machineIotMac) { |
|
|
|
|
|
|
|
|
try { |
|
|
try { |
|
|
// 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) |
|
|
// 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) |
|
|
@ -265,13 +279,8 @@ public class IotMachineEventGeneratorJob { |
|
|
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { |
|
|
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { |
|
|
SearchHits hits = searchResponse.getHits(); |
|
|
SearchHits hits = searchResponse.getHits(); |
|
|
SearchHit reqHit = hits.getHits()[0]; |
|
|
SearchHit reqHit = hits.getHits()[0]; |
|
|
MachineIotDataReceivedEvent receivedEvent = JSONUtil |
|
|
|
|
|
|
|
|
return JSONUtil |
|
|
.toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class); |
|
|
.toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class); |
|
|
DeviceState deviceState = new DeviceState(); |
|
|
|
|
|
deviceState.setMachineIotMac(machineIotMac); |
|
|
|
|
|
deviceState.setStatus(receivedEvent.getMachineWorkingStat()); |
|
|
|
|
|
deviceState.setUpdateTime(receivedEvent.getReportTime()); |
|
|
|
|
|
return deviceState; |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} catch (Exception e) { |
|
|
} catch (Exception e) { |
|
|
@ -299,21 +308,7 @@ public class IotMachineEventGeneratorJob { |
|
|
@Override |
|
|
@Override |
|
|
public String computeRoutingKey(BaseCommand command) { |
|
|
public String computeRoutingKey(BaseCommand command) { |
|
|
|
|
|
|
|
|
if (command instanceof PowerOnMachineCommand) { |
|
|
|
|
|
// 机器通电 |
|
|
|
|
|
return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY); |
|
|
|
|
|
} |
|
|
|
|
|
if (command instanceof PowerOffMachineCommand) { |
|
|
|
|
|
// 机器断电 |
|
|
|
|
|
return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY); |
|
|
|
|
|
} |
|
|
|
|
|
if (command instanceof StopMachineWorkingCommand) { |
|
|
|
|
|
// 机器待机 |
|
|
|
|
|
return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY); |
|
|
|
|
|
} else { |
|
|
|
|
|
// 机器工作 |
|
|
|
|
|
return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_MACHINE_COMMAND_ROUTING_KEY); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@Override |
|
|
@Override |
|
|
@ -327,8 +322,6 @@ public class IotMachineEventGeneratorJob { |
|
|
// 交换机名称 |
|
|
// 交换机名称 |
|
|
return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_EXCHANGE); |
|
|
return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_EXCHANGE); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
})).name("commandDataStream to rabbitmq Sink"); |
|
|
})).name("commandDataStream to rabbitmq Sink"); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@ -470,66 +463,72 @@ public class IotMachineEventGeneratorJob { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private static void collDeviceStatusChange(Collector<BaseCommand> out, |
|
|
private static void collDeviceStatusChange(Collector<BaseCommand> out, |
|
|
DeviceState newState, |
|
|
|
|
|
DeviceState oldState, MachineIotDataReceivedEvent event) { |
|
|
|
|
|
|
|
|
|
|
|
Integer countUnit = newState.getCountUnit(); |
|
|
|
|
|
countUnit = countUnit == null ? 1 : countUnit; |
|
|
|
|
|
Long currJobCount = event.getCurrJobCount(); |
|
|
|
|
|
currJobCount = currJobCount == null ? 0 : currJobCount * countUnit; |
|
|
|
|
|
if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { |
|
|
|
|
|
|
|
|
DeviceState deviceState, |
|
|
|
|
|
MachineIotDataReceivedEvent lastDataReceivedEvent, |
|
|
|
|
|
MachineIotDataReceivedEvent event) { |
|
|
|
|
|
|
|
|
|
|
|
Long machineIotMac = event.getMachineIotMac(); |
|
|
|
|
|
Integer dataSource = event.getDataSource(); |
|
|
|
|
|
Long machineId = deviceState.getMachineId(); |
|
|
|
|
|
Long currCount = event.getCurrCount(); |
|
|
|
|
|
Long currDuration = event.getCurrDuration(); |
|
|
|
|
|
Long accJobCount = event.getAccJobCount(); |
|
|
|
|
|
Long reportTime = event.getReportTime(); |
|
|
|
|
|
Integer lastWorkingStat = lastDataReceivedEvent.getMachineWorkingStat(); |
|
|
|
|
|
Integer workingStat = event.getMachineWorkingStat(); |
|
|
|
|
|
Integer pwrStat = event.getMachinePwrStat(); |
|
|
|
|
|
if (lastWorkingStat == 0 && (workingStat == 1 || workingStat == 2)) { |
|
|
// 设备开机 |
|
|
// 设备开机 |
|
|
PowerOnMachineCommand powerOnMachineCommand = new PowerOnMachineCommand(newState.getMachineId(), |
|
|
|
|
|
newState.getMachineIotMac(), currJobCount); |
|
|
|
|
|
powerOnMachineCommand.setTimestamp(event.getReportTime()); |
|
|
|
|
|
out.collect(powerOnMachineCommand); |
|
|
|
|
|
} else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { |
|
|
|
|
|
|
|
|
PowerOnMachineCommand command = new PowerOnMachineCommand(); |
|
|
|
|
|
command.setId(machineId); |
|
|
|
|
|
command.setMac(machineIotMac); |
|
|
|
|
|
command.setMachinePwrStat(pwrStat); |
|
|
|
|
|
command.setMachineWorkingStat(workingStat); |
|
|
|
|
|
command.setDataSource(dataSource); |
|
|
|
|
|
command.setCurrCount(currCount); |
|
|
|
|
|
command.setCurrDuration(currDuration); |
|
|
|
|
|
command.setCurrTotalOutput(accJobCount); |
|
|
|
|
|
command.setTimestamp(reportTime); |
|
|
|
|
|
out.collect(command); |
|
|
|
|
|
} else if ((lastWorkingStat == 1 || lastWorkingStat == 2) && workingStat == 0) { |
|
|
// 设备关机 |
|
|
// 设备关机 |
|
|
PowerOffMachineCommand powerOffMachineCommand = new PowerOffMachineCommand(newState.getMachineId(), |
|
|
|
|
|
newState.getMachineIotMac(), currJobCount); |
|
|
|
|
|
powerOffMachineCommand.setTimestamp(event.getReportTime()); |
|
|
|
|
|
out.collect(powerOffMachineCommand); |
|
|
|
|
|
} else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { |
|
|
|
|
|
|
|
|
PowerOffMachineCommand command = new PowerOffMachineCommand(); |
|
|
|
|
|
command.setId(machineId); |
|
|
|
|
|
command.setMac(machineIotMac); |
|
|
|
|
|
command.setMachinePwrStat(pwrStat); |
|
|
|
|
|
command.setMachineWorkingStat(workingStat); |
|
|
|
|
|
command.setDataSource(dataSource); |
|
|
|
|
|
command.setCurrCount(currCount); |
|
|
|
|
|
command.setCurrDuration(currDuration); |
|
|
|
|
|
command.setCurrTotalOutput(accJobCount); |
|
|
|
|
|
command.setTimestamp(reportTime); |
|
|
|
|
|
out.collect(command); |
|
|
|
|
|
} else if (lastWorkingStat == 1 && workingStat == 2) { |
|
|
// 设备开始待机 |
|
|
// 设备开始待机 |
|
|
StopMachineWorkingCommand stopMachineWorkingCommand = new StopMachineWorkingCommand(newState.getMachineId(), |
|
|
|
|
|
newState.getMachineIotMac(), currJobCount); |
|
|
|
|
|
stopMachineWorkingCommand.setTimestamp(event.getReportTime()); |
|
|
|
|
|
out.collect(stopMachineWorkingCommand); |
|
|
|
|
|
} else if ((oldState.getStatus() == 2 || oldState.getStatus() == 1) && newState.getStatus() == 1) { |
|
|
|
|
|
|
|
|
StopMachineWorkingCommand command = new StopMachineWorkingCommand(); |
|
|
|
|
|
command.setId(machineId); |
|
|
|
|
|
command.setMac(machineIotMac); |
|
|
|
|
|
command.setMachinePwrStat(pwrStat); |
|
|
|
|
|
command.setMachineWorkingStat(workingStat); |
|
|
|
|
|
command.setDataSource(dataSource); |
|
|
|
|
|
command.setCurrCount(currCount); |
|
|
|
|
|
command.setCurrDuration(currDuration); |
|
|
|
|
|
command.setCurrTotalOutput(accJobCount); |
|
|
|
|
|
command.setTimestamp(reportTime); |
|
|
|
|
|
out.collect(command); |
|
|
|
|
|
} else if ((lastWorkingStat == 2 || lastWorkingStat == 1) && workingStat == 1) { |
|
|
// 设备开始工作 |
|
|
// 设备开始工作 |
|
|
StartMachineWorkingCommand startMachineWorkingCommand = new StartMachineWorkingCommand(newState.getMachineId(), |
|
|
|
|
|
newState.getMachineIotMac(), currJobCount); |
|
|
|
|
|
startMachineWorkingCommand.setTimestamp(event.getReportTime()); |
|
|
|
|
|
out.collect(startMachineWorkingCommand); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private static void collDeviceStatusChange1(Collector<MachineIotDataReceivedEvent> out, |
|
|
|
|
|
DeviceState newState, |
|
|
|
|
|
DeviceState oldState, MachineIotDataReceivedEvent event) { |
|
|
|
|
|
|
|
|
|
|
|
Integer countUnit = newState.getCountUnit(); |
|
|
|
|
|
countUnit = countUnit == null ? 1 : countUnit; |
|
|
|
|
|
Long currJobCount = event.getCurrJobCount(); |
|
|
|
|
|
if (currJobCount != null) { |
|
|
|
|
|
event.setCurrJobCount(currJobCount * countUnit); |
|
|
|
|
|
} |
|
|
|
|
|
Long accJobCount = event.getAccJobCount(); |
|
|
|
|
|
if (accJobCount != null) { |
|
|
|
|
|
event.setAccJobCount(accJobCount * countUnit); |
|
|
|
|
|
} |
|
|
|
|
|
if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { |
|
|
|
|
|
// 设备开机 |
|
|
|
|
|
out.collect(event); |
|
|
|
|
|
} else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { |
|
|
|
|
|
// 设备关机 |
|
|
|
|
|
out.collect(event); |
|
|
|
|
|
} else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { |
|
|
|
|
|
// 设备待机中 |
|
|
|
|
|
out.collect(event); |
|
|
|
|
|
} else if ((oldState.getStatus() == 2 || oldState.getStatus() == 1) && newState.getStatus() == 1) { |
|
|
|
|
|
// 设备工作中 |
|
|
|
|
|
out.collect(event); |
|
|
|
|
|
|
|
|
StartMachineWorkingCommand command = new StartMachineWorkingCommand(); |
|
|
|
|
|
command.setId(machineId); |
|
|
|
|
|
command.setMac(machineIotMac); |
|
|
|
|
|
command.setMachinePwrStat(pwrStat); |
|
|
|
|
|
command.setMachineWorkingStat(workingStat); |
|
|
|
|
|
command.setDataSource(dataSource); |
|
|
|
|
|
command.setCurrCount(currCount); |
|
|
|
|
|
command.setCurrDuration(currDuration); |
|
|
|
|
|
command.setCurrTotalOutput(accJobCount); |
|
|
|
|
|
command.setTimestamp(reportTime); |
|
|
|
|
|
out.collect(command); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |