Browse Source

代码优化

master
1049970895@qniao.cn 3 years ago
parent
commit
cca9463edb
1 changed files with 59 additions and 74 deletions
  1. 133
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

133
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -5,6 +5,7 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.Db;
import cn.hutool.db.handler.RsHandler;
import cn.hutool.json.JSON;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
@ -49,7 +50,9 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
@ -58,7 +61,8 @@ import java.util.stream.Collectors;
public class IotMachineEventGeneratorJob {
private final static String SQL = "select machine_id, iot_mac as machine_iot_mac, status from qn_machine_realtime_state where is_delete = 0";
private final static String SQL = "select machine_id, iot_mac as machine_iot_mac, status " +
"from qn_machine_realtime_state where iot_mac = ? and is_delete = 0";
public static void main(String[] args) throws Exception {
@ -84,13 +88,13 @@ public class IotMachineEventGeneratorJob {
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>() {
private ValueState<JSON> deviceState;
private ValueState<DeviceState> deviceState;
@Override
public void open(Configuration parameters) {
// 必须在 open 生命周期初始化
deviceState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("deviceState1", TypeInformation.of(JSON.class)));
.getState(new ValueStateDescriptor<>("deviceState1", TypeInformation.of(DeviceState.class)));
}
@Override
@ -98,23 +102,16 @@ public class IotMachineEventGeneratorJob {
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>.Context ctx,
Collector<MachineIotDataReceivedEvent> out) throws Exception {
Tuple2<JSON,Map<Long, DeviceState>> deviceStateListTuple = getDeviceStateListJson(deviceState);
JSON deviceStateListJson = deviceStateListTuple.f0;
assert deviceStateListJson != null;
DeviceState lastedDeviceState = deviceStateListJson
.getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class);
DeviceState lastedDeviceState = getDeviceStateListJson(deviceState, event.getMachineIotMac());
Integer deviceStatus = getDeviceStatus(event);
Map<Long, DeviceState> deviceStateMap = deviceStateListTuple.f1;
DeviceState ds = deviceStateMap.get(event.getMachineIotMac());
if (deviceStatus != null && ds != null) {
deviceStateListJson.putByPath(StrUtil.toString(event.getMachineIotMac()),
new DeviceState(ds.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()));
if (deviceStatus != null) {
// 更新状态
if (lastedDeviceState != null) {
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(),
deviceStatus, event.getReportTime());
collDeviceStatusChange1(out, newState, lastedDeviceState, event);
this.deviceState.update(newState);
}
this.deviceState.update(deviceStateListJson);
}
}
}).name("machineIotDataReceivedEventDataStream keyBy stream");
@ -124,13 +121,13 @@ public class IotMachineEventGeneratorJob {
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, BaseCommand>() {
private ValueState<JSON> deviceState;
private ValueState<DeviceState> deviceState;
@Override
public void open(Configuration parameters) {
// 必须在 open 生命周期初始化
deviceState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(JSON.class)));
.getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(DeviceState.class)));
}
@Override
@ -139,23 +136,16 @@ public class IotMachineEventGeneratorJob {
Collector<BaseCommand> out) throws Exception {
// 获取最新设备状态
Tuple2<JSON,Map<Long, DeviceState>> deviceStateListTuple = getDeviceStateListJson(deviceState);
JSON deviceStateListJson = deviceStateListTuple.f0;
assert deviceStateListJson != null;
DeviceState lastedDeviceState = deviceStateListJson
.getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class);
DeviceState lastedDeviceState = getDeviceStateListJson(deviceState, event.getMachineIotMac());
Integer deviceStatus = getDeviceStatus(event);
Map<Long, DeviceState> deviceStateMap = deviceStateListTuple.f1;
DeviceState ds = deviceStateMap.get(event.getMachineIotMac());
if (deviceStatus != null && ds != null) {
deviceStateListJson.putByPath(StrUtil.toString(event.getMachineIotMac()),
new DeviceState(ds.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()));
if (deviceStatus != null) {
// 更新状态
if (lastedDeviceState != null) {
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(),
deviceStatus, event.getReportTime());
collDeviceStatusChange(out, newState, lastedDeviceState, event);
this.deviceState.update(newState);
}
deviceState.update(deviceStateListJson);
}
}
}).name("keyBy stream");
@ -170,26 +160,21 @@ public class IotMachineEventGeneratorJob {
env.execute("iot machine event generator job");
}
private static Tuple2<JSON,Map<Long, DeviceState>> getDeviceStateListJson(ValueState<JSON> deviceState) throws IOException, SQLException {
private static DeviceState getDeviceStateListJson(ValueState<DeviceState> deviceState,
Long machineIotMac) throws IOException, SQLException {
// 获取最新设备状态
JSON deviceStateListJson = deviceState.value();
Map<Long, DeviceState> allMachineMap = new HashMap<>();
if(deviceStateListJson == null) {
List<DeviceState> deviceStateList = Db.use().query(SQL, DeviceState.class);
if (CollUtil.isNotEmpty(deviceStateList)) {
allMachineMap = deviceStateList.stream()
.collect(Collectors.toMap(DeviceState::getMachineIotMac,
state -> state, (state1, state2) -> state1));
DeviceState deviceStateListJson = deviceState.value();
if (deviceStateListJson == null) {
deviceStateListJson = Db.use().query(SQL, (RsHandler<DeviceState>) rs -> new DeviceState(rs.getLong(1),
rs.getLong(2),
rs.getInt(3), System.currentTimeMillis()), machineIotMac);
if (deviceStateListJson != null) {
// 如果是空的并且在表中都没找到说明是没有被记录的设备不用管
deviceState.update(deviceStateListJson);
}
deviceState.update(deviceStateListJson = JSONUtil.parse(allMachineMap));
}else {
Map<Long, DeviceState> finalAllMachineMap = allMachineMap;
deviceStateListJson.toBean(Map.class).forEach((a, b)
-> finalAllMachineMap.put(Long.parseLong(String.valueOf(a)),
BeanUtil.toBean(b, DeviceState.class)));
}
return new Tuple2<>(deviceStateListJson, allMachineMap);
return deviceStateListJson;
}
private static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) {
@ -206,40 +191,40 @@ public class IotMachineEventGeneratorJob {
// 发送相应的指令到rabbitmq的交换机
commandDataStream
.addSink(new RMQSink<>(connectionConfig, new MachineIotDataReceivedEventRabbitMqSerializationSchema(),
new RMQSinkPublishOptions<BaseCommand>() {
@Override
public String computeRoutingKey(BaseCommand command) {
if(command instanceof PowerOnMachineCommand) {
// 机器通电
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY);
}
if(command instanceof PowerOffMachineCommand) {
// 机器断电
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY);
}
if(command instanceof StopMachineWorkingCommand) {
// 机器待机
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY);
}else {
// 机器工作
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY);
}
}
new RMQSinkPublishOptions<BaseCommand>() {
@Override
public String computeRoutingKey(BaseCommand command) {
if (command instanceof PowerOnMachineCommand) {
// 机器通电
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY);
}
if (command instanceof PowerOffMachineCommand) {
// 机器断电
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY);
}
if (command instanceof StopMachineWorkingCommand) {
// 机器待机
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY);
} else {
// 机器工作
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY);
}
}
@Override
public AMQP.BasicProperties computeProperties(BaseCommand command) {
return null;
}
@Override
public AMQP.BasicProperties computeProperties(BaseCommand command) {
return null;
}
@Override
public String computeExchange(BaseCommand command) {
@Override
public String computeExchange(BaseCommand command) {
// 交换机名称
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_EXCHANGE);
}
})).name("commandDataStream to rabbitmq Sink");
// 交换机名称
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_EXCHANGE);
}
})).name("commandDataStream to rabbitmq Sink");
}
private static void sinkEs(DataStream<MachineIotDataReceivedEvent> dataStream) {

Loading…
Cancel
Save