|
|
|
@ -18,43 +18,25 @@ |
|
|
|
|
|
|
|
package com.qniao.iot.rc; |
|
|
|
|
|
|
|
import cn.hutool.db.Db; |
|
|
|
import cn.hutool.db.Entity; |
|
|
|
import com.fasterxml.jackson.databind.util.JSONPObject; |
|
|
|
import com.qniao.domain.BaseCommand; |
|
|
|
import com.qniao.iot.machine.command.PowerOffMachineCommand; |
|
|
|
import com.qniao.iot.machine.command.PowerOnMachineCommand; |
|
|
|
import com.qniao.iot.machine.command.StartMachineWorkingCommand; |
|
|
|
import com.qniao.iot.machine.command.StopMachineWorkingCommand; |
|
|
|
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; |
|
|
|
import com.qniao.iot.machine.event.generator.job.IotMachineEventGeneratorJob; |
|
|
|
import com.qniao.iot.rc.command.BaseCommandSerializationSchema; |
|
|
|
import com.qniao.iot.rc.constant.DataSource; |
|
|
|
import com.qniao.iot.rc.event.MachineIotDataReceivedEventSerializationSchema; |
|
|
|
import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema; |
|
|
|
import com.rabbitmq.client.AMQP; |
|
|
|
import com.rabbitmq.tools.json.JSONUtil; |
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|
|
|
import org.apache.flink.api.common.functions.FilterFunction; |
|
|
|
import org.apache.flink.api.common.functions.MapFunction; |
|
|
|
import org.apache.flink.api.common.functions.RichFilterFunction; |
|
|
|
import org.apache.flink.api.common.functions.RuntimeContext; |
|
|
|
import org.apache.flink.api.common.state.ValueState; |
|
|
|
import org.apache.flink.api.common.state.ValueStateDescriptor; |
|
|
|
import org.apache.flink.api.java.utils.ParameterTool; |
|
|
|
import org.apache.flink.configuration.Configuration; |
|
|
|
import org.apache.flink.connector.base.DeliveryGuarantee; |
|
|
|
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; |
|
|
|
import org.apache.flink.connector.kafka.sink.KafkaSink; |
|
|
|
import org.apache.flink.connector.kafka.source.KafkaSource; |
|
|
|
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; |
|
|
|
import org.apache.flink.streaming.api.CheckpointingMode; |
|
|
|
import org.apache.flink.streaming.api.datastream.DataStream; |
|
|
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; |
|
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|
|
|
import org.apache.flink.streaming.api.functions.KeyedProcessFunction; |
|
|
|
import org.apache.flink.util.Collector; |
|
|
|
|
|
|
|
import java.math.BigDecimal; |
|
|
|
import java.sql.SQLException; |
|
|
|
import java.util.*; |
|
|
|
import java.util.Objects; |
|
|
|
|
|
|
|
/** |
|
|
|
* Skeleton for a Flink DataStream Job. |
|
|
|
@ -92,59 +74,23 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
.map((MapFunction<RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>) RootCloudIotDataFormatterJob::transform) |
|
|
|
.name("Transform MachineIotDataReceivedEvent"); |
|
|
|
|
|
|
|
// 设备数据分组 |
|
|
|
DataStream<BaseCommand> commandDataStream = transformDs.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) |
|
|
|
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, BaseCommand>() { |
|
|
|
|
|
|
|
private ValueState<DeviceState> deviceState; |
|
|
|
|
|
|
|
@Override |
|
|
|
public void open(Configuration parameters) throws SQLException { |
|
|
|
// 必须在 open 生命周期初始化 |
|
|
|
// TODO 获取所有设备的最新状态 |
|
|
|
//Db.use().findAll(new Entity(), Object.class); |
|
|
|
deviceState = getRuntimeContext() |
|
|
|
.getState(new ValueStateDescriptor<>("deviceState", DeviceState.class)); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
public void processElement(MachineIotDataReceivedEvent event, KeyedProcessFunction<Long, |
|
|
|
MachineIotDataReceivedEvent, BaseCommand>.Context ctx, Collector<BaseCommand> out) throws Exception { |
|
|
|
|
|
|
|
// 获取最新设备状态 |
|
|
|
DeviceState lastedDeviceState = deviceState.value(); |
|
|
|
Integer deviceStatus = getDeviceStatus(event); |
|
|
|
if (deviceStatus == null) { |
|
|
|
out.collect(null); |
|
|
|
} else { |
|
|
|
if (lastedDeviceState == null) { |
|
|
|
deviceState.update(new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); |
|
|
|
} else { |
|
|
|
DeviceState newState = new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); |
|
|
|
DeviceState oldState = deviceState.value(); |
|
|
|
collDeviceStatusChange(out, newState, oldState, event); |
|
|
|
deviceState.update(newState); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
}).filter((FilterFunction<BaseCommand>) Objects::nonNull).name("keyBy stream"); |
|
|
|
|
|
|
|
|
|
|
|
// 写入rabbitmq |
|
|
|
IotMachineEventGeneratorJob.sinkRabbitMq(commandDataStream); |
|
|
|
|
|
|
|
// 写入es |
|
|
|
IotMachineEventGeneratorJob.sinkEs(commandDataStream); |
|
|
|
// 写入kafka |
|
|
|
transformDs.sinkTo( |
|
|
|
KafkaSink.<MachineIotDataReceivedEvent>builder() |
|
|
|
.setBootstrapServers(params.get("sink.bootstrap.servers")) |
|
|
|
.setRecordSerializer( |
|
|
|
KafkaRecordSerializationSchema.builder() |
|
|
|
.setTopic("machine_iot_data_received_event") |
|
|
|
.setValueSerializationSchema(new MachineIotDataReceivedEventSerializationSchema()) |
|
|
|
.build() |
|
|
|
).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) |
|
|
|
.build() |
|
|
|
).name("MachineIotDataReceivedEvent Sink"); |
|
|
|
|
|
|
|
env.execute("Kafka Job"); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) { |
|
|
|
|
|
|
|
MachineIotDataReceivedEvent machineIotDataReceivedEvent = new MachineIotDataReceivedEvent(); |
|
|
|
@ -153,7 +99,8 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
machineIotDataReceivedEvent.setMachineIotMac(Long.valueOf(event.get__assetId__())); |
|
|
|
machineIotDataReceivedEvent.setDataSource(DataSource.ROOT_CLOUD); |
|
|
|
machineIotDataReceivedEvent.setMachinePwrStat(event.getPWR_sta()); |
|
|
|
machineIotDataReceivedEvent.setMachineWorkingStat(event.getWorking_sta()); |
|
|
|
// 工作状态 |
|
|
|
machineIotDataReceivedEvent.setMachineWorkingStat(event.getACC_sta()); |
|
|
|
machineIotDataReceivedEvent.setIgStat(event.getIG_sta()); |
|
|
|
machineIotDataReceivedEvent.setAccJobCount(event.getACC_count_total()); |
|
|
|
machineIotDataReceivedEvent.setCurrJobCount(event.getACC_count()); |
|
|
|
@ -167,28 +114,4 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
} |
|
|
|
return machineIotDataReceivedEvent; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private static Integer getDeviceStatus(MachineIotDataReceivedEvent event) { |
|
|
|
|
|
|
|
// 设备状态 |
|
|
|
return event.getMachineWorkingStat(); |
|
|
|
} |
|
|
|
|
|
|
|
private static void collDeviceStatusChange(Collector<BaseCommand> out, DeviceState newState, DeviceState oldState, MachineIotDataReceivedEvent event) { |
|
|
|
|
|
|
|
if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { |
|
|
|
// 设备开机 |
|
|
|
out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { |
|
|
|
// 设备关机 |
|
|
|
out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { |
|
|
|
// 设备开始待机 |
|
|
|
out.collect(new StopMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} else if (oldState.getStatus() == 2 && newState.getStatus() == 1) { |
|
|
|
// 设备开始工作 |
|
|
|
out.collect(new StartMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} |
|
|
|
} |
|
|
|
} |