Browse Source

更新

master
1049970895@qniao.cn 3 years ago
parent
commit
aab058eb5d
1 changed files with 53 additions and 47 deletions
  1. 100
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

100
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -4,6 +4,7 @@ import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.hutool.db.Db; import cn.hutool.db.Db;
import cn.hutool.json.JSONUtil;
import com.qniao.domain.BaseCommand; import com.qniao.domain.BaseCommand;
import com.qniao.iot.machine.command.PowerOffMachineCommand; import com.qniao.iot.machine.command.PowerOffMachineCommand;
import com.qniao.iot.machine.command.PowerOnMachineCommand; import com.qniao.iot.machine.command.PowerOnMachineCommand;
@ -17,6 +18,7 @@ import com.qniao.iot.machine.event.generator.constant.ConfigConstant;
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
@ -26,6 +28,7 @@ import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsIni
import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
@ -43,6 +46,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClient;
@ -51,6 +56,12 @@ import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException; import java.io.IOException;
import java.sql.SQLException; import java.sql.SQLException;
@ -110,7 +121,20 @@ public class IotMachineEventGeneratorJob {
DataStreamSource<MachineIotDataReceivedEvent> dataStreamSource = env DataStreamSource<MachineIotDataReceivedEvent> dataStreamSource = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
DataStream<MachineIotDataReceivedEvent> machineIotDataReceivedEventDataStream = dataStreamSource
// 过滤掉工作状态但是产能为0的信息
SingleOutputStreamOperator<MachineIotDataReceivedEvent> streamOperator = dataStreamSource
.filter(new RichFilterFunction<MachineIotDataReceivedEvent>() {
@Override
public boolean filter(MachineIotDataReceivedEvent value) {
Integer machineWorkingStat = value.getMachineWorkingStat();
Long currJobCount = value.getCurrJobCount();
return !(machineWorkingStat == 2 && currJobCount == 0);
}
}).name("machine iot data received event filter");
DataStream<MachineIotDataReceivedEvent> machineIotDataReceivedEventDataStream = streamOperator
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>() { .process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, MachineIotDataReceivedEvent>() {
@ -121,7 +145,7 @@ public class IotMachineEventGeneratorJob {
// 必须在 open 生命周期初始化 // 必须在 open 生命周期初始化
deviceState = getRuntimeContext() deviceState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("deviceState1", TypeInformation.of(DeviceState.class)));
.getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(DeviceState.class)));
} }
@Override @Override
@ -130,28 +154,26 @@ public class IotMachineEventGeneratorJob {
Collector<MachineIotDataReceivedEvent> out) throws Exception { Collector<MachineIotDataReceivedEvent> out) throws Exception {
DeviceState lastedDeviceState = getDeviceStateListJson(deviceState, event.getMachineIotMac()); DeviceState lastedDeviceState = getDeviceStateListJson(deviceState, event.getMachineIotMac());
Integer deviceStatus = getDeviceStatus(event);
if (deviceStatus != null) {
// 更新状态
if (lastedDeviceState != null) {
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(),
deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime());
collDeviceStatusChange1(out, newState, lastedDeviceState, event);
this.deviceState.update(newState);
}
Integer deviceStatus = event.getMachineWorkingStat();
// 更新状态
if (lastedDeviceState != null) {
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(),
deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime());
collDeviceStatusChange1(out, newState, lastedDeviceState, event);
this.deviceState.update(newState);
} }
} }
}).name("machineIotDataReceivedEventDataStream keyBy stream"); }).name("machineIotDataReceivedEventDataStream keyBy stream");
DataStream<BaseCommand> commandDataStream = dataStreamSource
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
DataStream<BaseCommand> commandDataStream = dataStreamSource.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, BaseCommand>() { .process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, BaseCommand>() {
private ValueState<DeviceState> deviceState; private ValueState<DeviceState> deviceState;
@Override @Override
public void open(Configuration parameters) { public void open(Configuration parameters) {
// 必须在 open 生命周期初始化 // 必须在 open 生命周期初始化
deviceState = getRuntimeContext() deviceState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(DeviceState.class))); .getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(DeviceState.class)));
@ -164,20 +186,17 @@ public class IotMachineEventGeneratorJob {
// 获取最新设备状态 // 获取最新设备状态
DeviceState lastedDeviceState = getDeviceStateListJson(deviceState, event.getMachineIotMac()); DeviceState lastedDeviceState = getDeviceStateListJson(deviceState, event.getMachineIotMac());
Integer deviceStatus = getDeviceStatus(event);
if (deviceStatus != null) {
// 更新状态
if (lastedDeviceState != null) {
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(),
deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime());
collDeviceStatusChange(out, newState, lastedDeviceState, event);
this.deviceState.update(newState);
}
Integer deviceStatus = event.getMachineWorkingStat();
// 更新状态
if (lastedDeviceState != null) {
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(),
deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime());
collDeviceStatusChange(out, newState, lastedDeviceState, event);
this.deviceState.update(newState);
} }
} }
}).name("keyBy stream"); }).name("keyBy stream");
// 写入rabbitmq // 写入rabbitmq
sinkRabbitMq(commandDataStream); sinkRabbitMq(commandDataStream);
@ -193,23 +212,22 @@ public class IotMachineEventGeneratorJob {
// 获取最新设备状态 // 获取最新设备状态
DeviceState deviceStateListJson = deviceState.value(); DeviceState deviceStateListJson = deviceState.value();
if (deviceStateListJson == null) { if (deviceStateListJson == null) {
int countUnit = 1;
// 查询数据库最新的设备状态 // 查询数据库最新的设备状态
List<DeviceState> list = Db.use().query(SQL, DeviceState.class, machineIotMac); List<DeviceState> list = Db.use().query(SQL, DeviceState.class, machineIotMac);
// 查询es最新的设备状态 勿删
//DeviceState deviceState1 = queryLatestDeviceState(machineIotMac);
if (CollUtil.isNotEmpty(list)) { if (CollUtil.isNotEmpty(list)) {
deviceStateListJson = list.get(0); deviceStateListJson = list.get(0);
} }
if (deviceStateListJson != null) {
// 如果是空的并且在表中都没找到说明是没有被记录的设备不用管
deviceState.update(deviceStateListJson);
DeviceState latestDeviceState = queryLatestDeviceState(machineIotMac);
if (latestDeviceState != null && deviceStateListJson != null) {
latestDeviceState.setCountUnit(countUnit);
latestDeviceState.setMachineId(deviceStateListJson.getMachineId());
} }
deviceStateListJson = latestDeviceState;
} }
return deviceStateListJson; return deviceStateListJson;
} }
/* 勿删
private static DeviceState queryLatestDeviceState(Long machineIotMac) { private static DeviceState queryLatestDeviceState(Long machineIotMac) {
try { try {
@ -235,14 +253,17 @@ public class IotMachineEventGeneratorJob {
MachineIotDataReceivedEvent receivedEvent = JSONUtil MachineIotDataReceivedEvent receivedEvent = JSONUtil
.toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class); .toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class);
DeviceState deviceState = new DeviceState(); DeviceState deviceState = new DeviceState();
deviceState.setMachineIotMac(machineIotMac);
deviceState.setStatus(receivedEvent.getMachineWorkingStat());
deviceState.setUpdateTime(receivedEvent.getReportTime());
return deviceState;
} }
} }
} catch (Exception e) { } catch (Exception e) {
log.error("获取es数据异常", e); log.error("获取es数据异常", e);
} }
return null; return null;
}*/
}
private static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) { private static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) {
@ -433,21 +454,6 @@ public class IotMachineEventGeneratorJob {
} }
} }
private static Integer getDeviceStatus(MachineIotDataReceivedEvent event) {
// 设备状态
if (Integer.valueOf("0").equals(event.getMachinePwrStat())) {
return 0;
} else if (Integer.valueOf("1").equals(event.getMachinePwrStat())
&& Integer.valueOf("1").equals(event.getMachineWorkingStat())) {
return 1;
} else if (Integer.valueOf("1").equals(event.getMachinePwrStat())
&& Integer.valueOf("2").equals(event.getMachineWorkingStat())) {
return 2;
}
return null;
}
private static void collDeviceStatusChange(Collector<BaseCommand> out, private static void collDeviceStatusChange(Collector<BaseCommand> out,
DeviceState newState, DeviceState newState,
DeviceState oldState, MachineIotDataReceivedEvent event) { DeviceState oldState, MachineIotDataReceivedEvent event) {

Loading…
Cancel
Save