Browse Source

代码优化

master
hupenghui@qniao.cn 3 years ago
parent
commit
6cfae494e3
1 changed files with 48 additions and 41 deletions
  1. 89
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

89
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -2,7 +2,6 @@ package com.qniao.iot.machine.event.generator.job;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.Db;
import cn.hutool.json.JSON;
@ -43,14 +42,13 @@ import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.sql.SQLException;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
public class IotMachineEventGeneratorJob {
private final static String sql = "select qm.id as machine_id, qei.mac as machine_iot_mac, qm.status\n" +
private final static String SQL = "select qm.id as machine_id, qei.mac as machine_iot_mac, qm.status\n" +
"from (select id, status from qn_machine where is_delete = 0) qm\n" +
" left join (select machine_id, equipment_information_id\n" +
" from qn_machine_binding_cloud_box\n" +
@ -77,33 +75,28 @@ public class IotMachineEventGeneratorJob {
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
DataStream<MachineIotDataReceivedEvent> commandDataStream1 = dataStreamSource.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
List<DeviceState> deviceStateList = Db.use().query(SQL, DeviceState.class);
DataStream<MachineIotDataReceivedEvent> machineIotDataReceivedEventDataStream = dataStreamSource
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>() {
private ValueState<JSON> deviceState;
@Override
public void open(Configuration parameters) throws SQLException {
public void open(Configuration parameters) {
// 必须在 open 生命周期初始化
// 获取所有设备的最新状态
List<DeviceState> deviceStateList = Db.use().query(sql, DeviceState.class);
Map<Long, DeviceState> allMachineMap = new HashMap<>();
if (CollUtil.isNotEmpty(deviceStateList)) {
allMachineMap = deviceStateList.stream()
.collect(Collectors.toMap(DeviceState::getMachineIotMac,
deviceState -> deviceState, (deviceState1, deviceState2) -> deviceState1));
}
deviceState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("deviceState1",
TypeInformation.of(JSON.class), JSONUtil.parse(allMachineMap)));
.getState(new ValueStateDescriptor<>("deviceState1", TypeInformation.of(JSON.class)));
}
@Override
public void processElement(MachineIotDataReceivedEvent event, KeyedProcessFunction<Long,
MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>.Context ctx, Collector<MachineIotDataReceivedEvent> out) throws Exception {
public void processElement(MachineIotDataReceivedEvent event,
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>.Context ctx,
Collector<MachineIotDataReceivedEvent> out) throws Exception {
// 获取最新设备状态
JSON deviceStateListJson = deviceState.value();
JSON deviceStateListJson = getDeviceStateListJson(deviceState, deviceStateList);
assert deviceStateListJson != null;
DeviceState lastedDeviceState = deviceStateListJson
.getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class);
Integer deviceStatus = getDeviceStatus(event);
@ -118,36 +111,30 @@ public class IotMachineEventGeneratorJob {
deviceState.update(deviceStateListJson);
}
}
}).name("commandDataStream1 keyBy stream");
}).name("machineIotDataReceivedEventDataStream keyBy stream");
DataStream<BaseCommand> commandDataStream = dataStreamSource.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
DataStream<BaseCommand> commandDataStream = dataStreamSource
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, BaseCommand>() {
private ValueState<JSON> deviceState;
@Override
public void open(Configuration parameters) throws SQLException {
public void open(Configuration parameters) {
// 必须在 open 生命周期初始化
// 获取所有设备的最新状态
List<DeviceState> deviceStateList = Db.use().query(sql, DeviceState.class);
Map<Long, DeviceState> allMachineMap = new HashMap<>();
if (CollUtil.isNotEmpty(deviceStateList)) {
allMachineMap = deviceStateList.stream()
.collect(Collectors.toMap(DeviceState::getMachineIotMac,
deviceState -> deviceState, (deviceState1, deviceState2) -> deviceState1));
}
deviceState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("deviceState",
TypeInformation.of(JSON.class), JSONUtil.parse(allMachineMap)));
.getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(JSON.class)));
}
@Override
public void processElement(MachineIotDataReceivedEvent event, KeyedProcessFunction<Long,
MachineIotDataReceivedEvent, BaseCommand>.Context ctx, Collector<BaseCommand> out) throws Exception {
public void processElement(MachineIotDataReceivedEvent event,
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, BaseCommand>.Context ctx,
Collector<BaseCommand> out) throws Exception {
// 获取最新设备状态
JSON deviceStateListJson = deviceState.value();
JSON deviceStateListJson = getDeviceStateListJson(deviceState, deviceStateList);
assert deviceStateListJson != null;
DeviceState lastedDeviceState = deviceStateListJson
.getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class);
Integer deviceStatus = getDeviceStatus(event);
@ -169,11 +156,27 @@ public class IotMachineEventGeneratorJob {
sinkRabbitMq(commandDataStream);
// 写入es
sinkEs(commandDataStream1);
sinkEs(machineIotDataReceivedEventDataStream);
env.execute("Kafka Job");
}
private static JSON getDeviceStateListJson(ValueState<JSON> deviceState,
List<DeviceState> deviceStateList) throws IOException {
// 获取最新设备状态
JSON deviceStateListJson = deviceState.value();
if(deviceStateListJson == null) {
Map<Long, DeviceState> allMachineMap = new HashMap<>();
if (CollUtil.isNotEmpty(deviceStateList)) {
allMachineMap = deviceStateList.stream()
.collect(Collectors.toMap(DeviceState::getMachineIotMac,
state -> state, (state1, state2) -> state1));
}
deviceState.update(JSONUtil.parse(allMachineMap));
}
return deviceStateListJson;
}
private static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) {
@ -186,7 +189,8 @@ public class IotMachineEventGeneratorJob {
.setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT)).build();
// 发送相应的指令到rabbitmq的交换机
commandDataStream.addSink(new RMQSink<>(connectionConfig, new MachineIotDataReceivedEventRabbitMqSerializationSchema(),
commandDataStream
.addSink(new RMQSink<>(connectionConfig, new MachineIotDataReceivedEventRabbitMqSerializationSchema(),
new RMQSinkPublishOptions<BaseCommand>() {
@Override
@ -267,7 +271,9 @@ public class IotMachineEventGeneratorJob {
return null;
}
private static void collDeviceStatusChange(Collector<BaseCommand> out, DeviceState newState, DeviceState oldState, MachineIotDataReceivedEvent event) {
private static void collDeviceStatusChange(Collector<BaseCommand> out,
DeviceState newState,
DeviceState oldState, MachineIotDataReceivedEvent event) {
if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) {
// 设备开机
@ -284,8 +290,9 @@ public class IotMachineEventGeneratorJob {
}
}
private static void collDeviceStatusChange1(Collector<MachineIotDataReceivedEvent> out, DeviceState newState, DeviceState oldState,
MachineIotDataReceivedEvent event) {
private static void collDeviceStatusChange1(Collector<MachineIotDataReceivedEvent> out,
DeviceState newState,
DeviceState oldState, MachineIotDataReceivedEvent event) {
if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) {
// 设备开机

Loading…
Cancel
Save