Browse Source

更新

master
1049970895@qniao.cn 3 years ago
parent
commit
36a43ab514
2 changed files with 20 additions and 5 deletions
  1. 2
      iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java
  2. 23
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

2
iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java

@ -18,7 +18,7 @@ public class MachineIotDataReceivedEvent implements Serializable {
private Long id;
/**
* 数据来源
* 数据来源0机智云 1树根
*/
private Integer dataSource;

23
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -15,6 +15,7 @@ import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializati
import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializationSchema;
import com.qniao.iot.machine.event.generator.config.ApolloConfig;
import com.qniao.iot.machine.event.generator.constant.ConfigConstant;
import com.qniao.iot.rc.constant.DataSource;
import com.rabbitmq.client.AMQP;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@ -66,6 +67,8 @@ import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
@ -121,16 +124,28 @@ public class IotMachineEventGeneratorJob {
DataStreamSource<MachineIotDataReceivedEvent> dataStreamSource = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
// 过滤掉工作状态但是产能为0的信息
SingleOutputStreamOperator<MachineIotDataReceivedEvent> streamOperator = dataStreamSource
.filter(new RichFilterFunction<MachineIotDataReceivedEvent>() {
@Override
public boolean filter(MachineIotDataReceivedEvent value) {
Integer machineWorkingStat = value.getMachineWorkingStat();
Long currJobCount = value.getCurrJobCount();
return !(machineWorkingStat == 2 && currJobCount == 0);
Integer dataSource = value.getDataSource();
boolean bool = true;
if (DataSource.TACT_CLOUD.equals(dataSource)) {
// 机智云树根的校验不了
Long currCount = value.getCurrCount();
Integer machineWorkingStat = value.getMachineWorkingStat();
bool = !(machineWorkingStat == 2 && currCount == 0);
}
if (bool && value.getMachinePwrStat() != null
&& value.getMachineIotMac() != null
&& value.getMachineWorkingStat() != null && value.getReportTime() != null) {
long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
// 晚30分钟的数据就不要了
return nowTime - value.getReportTime() <= (30 * 60 * 1000);
}
return false;
}
}).name("machine iot data received event filter");

Loading…
Cancel
Save