diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index de26cf2..d241048 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -4,6 +4,7 @@ import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.db.Db; +import cn.hutool.json.JSONUtil; import com.qniao.domain.BaseCommand; import com.qniao.iot.machine.command.PowerOffMachineCommand; import com.qniao.iot.machine.command.PowerOnMachineCommand; @@ -17,6 +18,7 @@ import com.qniao.iot.machine.event.generator.constant.ConfigConstant; import com.rabbitmq.client.AMQP; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -26,6 +28,7 @@ import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsIni import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; @@ -43,6 +46,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Requests; import org.elasticsearch.client.RestClient; @@ -51,6 +56,12 @@ import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.sql.SQLException; @@ -110,7 +121,20 @@ public class IotMachineEventGeneratorJob { DataStreamSource dataStreamSource = env .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); - DataStream machineIotDataReceivedEventDataStream = dataStreamSource + + // 过滤掉工作状态但是产能为0的信息 + SingleOutputStreamOperator streamOperator = dataStreamSource + .filter(new RichFilterFunction() { + @Override + public boolean filter(MachineIotDataReceivedEvent value) { + + Integer machineWorkingStat = value.getMachineWorkingStat(); + Long currJobCount = value.getCurrJobCount(); + return !(machineWorkingStat == 2 && currJobCount == 0); + } + }).name("machine iot data received event filter"); + + DataStream machineIotDataReceivedEventDataStream = streamOperator .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { @@ -121,7 +145,7 @@ public class IotMachineEventGeneratorJob { // 必须在 open 生命周期初始化 deviceState = getRuntimeContext() - .getState(new ValueStateDescriptor<>("deviceState1", TypeInformation.of(DeviceState.class))); + .getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(DeviceState.class))); } @Override @@ -130,28 +154,26 @@ public class IotMachineEventGeneratorJob { Collector out) throws Exception { DeviceState lastedDeviceState = getDeviceStateListJson(deviceState, event.getMachineIotMac()); - Integer deviceStatus = getDeviceStatus(event); - if (deviceStatus != null) { - // 更新状态 - if (lastedDeviceState != null) { - DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), - deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime()); - collDeviceStatusChange1(out, newState, lastedDeviceState, event); - this.deviceState.update(newState); - } + Integer deviceStatus = event.getMachineWorkingStat(); + // 更新状态 + if (lastedDeviceState != null) { + DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), + deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime()); + collDeviceStatusChange1(out, newState, lastedDeviceState, event); + this.deviceState.update(newState); } } }).name("machineIotDataReceivedEventDataStream keyBy stream"); - DataStream commandDataStream = dataStreamSource - .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) + DataStream commandDataStream = dataStreamSource.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) .process(new KeyedProcessFunction() { private ValueState deviceState; @Override public void open(Configuration parameters) { + // 必须在 open 生命周期初始化 deviceState = getRuntimeContext() .getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(DeviceState.class))); @@ -164,20 +186,17 @@ public class IotMachineEventGeneratorJob { // 获取最新设备状态 DeviceState lastedDeviceState = getDeviceStateListJson(deviceState, event.getMachineIotMac()); - Integer deviceStatus = getDeviceStatus(event); - if (deviceStatus != null) { - // 更新状态 - if (lastedDeviceState != null) { - DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), - deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime()); - collDeviceStatusChange(out, newState, lastedDeviceState, event); - this.deviceState.update(newState); - } + Integer deviceStatus = event.getMachineWorkingStat(); + // 更新状态 + if (lastedDeviceState != null) { + DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), + deviceStatus, lastedDeviceState.getCountUnit(), event.getReportTime()); + collDeviceStatusChange(out, newState, lastedDeviceState, event); + this.deviceState.update(newState); } } }).name("keyBy stream"); - // 写入rabbitmq sinkRabbitMq(commandDataStream); @@ -193,23 +212,22 @@ public class IotMachineEventGeneratorJob { // 获取最新设备状态 DeviceState deviceStateListJson = deviceState.value(); if (deviceStateListJson == null) { + int countUnit = 1; // 查询数据库最新的设备状态 List list = Db.use().query(SQL, DeviceState.class, machineIotMac); - // 查询es最新的设备状态 勿删 - //DeviceState deviceState1 = queryLatestDeviceState(machineIotMac); - if (CollUtil.isNotEmpty(list)) { deviceStateListJson = list.get(0); } - if (deviceStateListJson != null) { - // 如果是空的,并且在表中都没找到,说明是没有被记录的设备,不用管 - deviceState.update(deviceStateListJson); + DeviceState latestDeviceState = queryLatestDeviceState(machineIotMac); + if (latestDeviceState != null && deviceStateListJson != null) { + latestDeviceState.setCountUnit(countUnit); + latestDeviceState.setMachineId(deviceStateListJson.getMachineId()); } + deviceStateListJson = latestDeviceState; } return deviceStateListJson; } - /* 勿删 private static DeviceState queryLatestDeviceState(Long machineIotMac) { try { @@ -235,14 +253,17 @@ public class IotMachineEventGeneratorJob { MachineIotDataReceivedEvent receivedEvent = JSONUtil .toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class); DeviceState deviceState = new DeviceState(); - + deviceState.setMachineIotMac(machineIotMac); + deviceState.setStatus(receivedEvent.getMachineWorkingStat()); + deviceState.setUpdateTime(receivedEvent.getReportTime()); + return deviceState; } } } catch (Exception e) { log.error("获取es数据异常", e); } return null; - }*/ + } private static void sinkRabbitMq(DataStream commandDataStream) { @@ -433,21 +454,6 @@ public class IotMachineEventGeneratorJob { } } - private static Integer getDeviceStatus(MachineIotDataReceivedEvent event) { - - // 设备状态 - if (Integer.valueOf("0").equals(event.getMachinePwrStat())) { - return 0; - } else if (Integer.valueOf("1").equals(event.getMachinePwrStat()) - && Integer.valueOf("1").equals(event.getMachineWorkingStat())) { - return 1; - } else if (Integer.valueOf("1").equals(event.getMachinePwrStat()) - && Integer.valueOf("2").equals(event.getMachineWorkingStat())) { - return 2; - } - return null; - } - private static void collDeviceStatusChange(Collector out, DeviceState newState, DeviceState oldState, MachineIotDataReceivedEvent event) {