|
|
|
@ -1,18 +1,38 @@ |
|
|
|
package com.qniao.iot.machine.event.generator.job; |
|
|
|
|
|
|
|
import cn.hutool.core.collection.CollUtil; |
|
|
|
import cn.hutool.core.util.StrUtil; |
|
|
|
import cn.hutool.db.Db; |
|
|
|
import cn.hutool.json.JSON; |
|
|
|
import cn.hutool.json.JSONUtil; |
|
|
|
import com.qniao.domain.BaseCommand; |
|
|
|
import com.qniao.iot.machine.command.PowerOffMachineCommand; |
|
|
|
import com.qniao.iot.machine.command.PowerOnMachineCommand; |
|
|
|
import com.qniao.iot.machine.command.StartMachineWorkingCommand; |
|
|
|
import com.qniao.iot.machine.command.StopMachineWorkingCommand; |
|
|
|
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; |
|
|
|
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; |
|
|
|
import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializationSchema; |
|
|
|
import com.rabbitmq.client.AMQP; |
|
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|
|
|
import org.apache.flink.api.common.functions.FilterFunction; |
|
|
|
import org.apache.flink.api.common.state.ValueState; |
|
|
|
import org.apache.flink.api.common.state.ValueStateDescriptor; |
|
|
|
import org.apache.flink.api.common.typeinfo.TypeInformation; |
|
|
|
import org.apache.flink.api.java.utils.ParameterTool; |
|
|
|
import org.apache.flink.configuration.Configuration; |
|
|
|
import org.apache.flink.connector.kafka.source.KafkaSource; |
|
|
|
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; |
|
|
|
import org.apache.flink.streaming.api.CheckpointingMode; |
|
|
|
import org.apache.flink.streaming.api.datastream.DataStream; |
|
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|
|
|
import org.apache.flink.streaming.api.functions.KeyedProcessFunction; |
|
|
|
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; |
|
|
|
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; |
|
|
|
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; |
|
|
|
import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; |
|
|
|
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; |
|
|
|
import org.apache.flink.util.Collector; |
|
|
|
import org.apache.http.HttpHost; |
|
|
|
import org.apache.http.auth.AuthScope; |
|
|
|
import org.apache.http.auth.UsernamePasswordCredentials; |
|
|
|
@ -21,13 +41,94 @@ import org.apache.http.impl.client.BasicCredentialsProvider; |
|
|
|
import org.elasticsearch.action.index.IndexRequest; |
|
|
|
import org.elasticsearch.client.Requests; |
|
|
|
|
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.HashMap; |
|
|
|
import java.util.List; |
|
|
|
import javax.sql.DataSource; |
|
|
|
import java.io.IOException; |
|
|
|
import java.sql.SQLException; |
|
|
|
import java.util.*; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
public class IotMachineEventGeneratorJob { |
|
|
|
|
|
|
|
public static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) { |
|
|
|
private final static String sql = "select qm.id as machine_id, qei.mac as machine_iot_mac, qm.status\n" + |
|
|
|
"from (select id, status from qn_machine where is_delete = 0) qm\n" + |
|
|
|
" left join (select machine_id, equipment_information_id\n" + |
|
|
|
" from qn_machine_binding_cloud_box\n" + |
|
|
|
" where is_delete = 0) qmbcb ON qm.id = qmbcb.machine_id\n" + |
|
|
|
" left join (select id, mac from qn_equipment_information where is_delete = 0) qei\n" + |
|
|
|
" on qei.id = qmbcb.equipment_information_id"; |
|
|
|
|
|
|
|
public static void main(String[] args) throws Exception { |
|
|
|
|
|
|
|
|
|
|
|
final ParameterTool params = ParameterTool.fromArgs(args); |
|
|
|
|
|
|
|
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
|
|
|
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); |
|
|
|
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder() |
|
|
|
.setBootstrapServers(params.get("source.bootstrap.servers")) |
|
|
|
.setTopics("machine_iot_data_received_event") |
|
|
|
.setGroupId("root_cloud_iot_data_etl") |
|
|
|
.setStartingOffsets(OffsetsInitializer.earliest()) |
|
|
|
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) |
|
|
|
.build(); |
|
|
|
|
|
|
|
|
|
|
|
// 设备数据分组 |
|
|
|
DataStream<BaseCommand> commandDataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source") |
|
|
|
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac) |
|
|
|
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, BaseCommand>() { |
|
|
|
|
|
|
|
private ValueState<JSON> deviceState; |
|
|
|
|
|
|
|
@Override |
|
|
|
public void open(Configuration parameters) throws SQLException, IOException { |
|
|
|
// 必须在 open 生命周期初始化 |
|
|
|
// 获取所有设备的最新状态 |
|
|
|
List<DeviceState> deviceStateList = Db.use().query(sql, DeviceState.class); |
|
|
|
System.out.println("已获取到设备最新状态数据:" + deviceStateList.size()); |
|
|
|
Map<Long, DeviceState> allMachineMap = new HashMap<>(); |
|
|
|
if (CollUtil.isNotEmpty(deviceStateList)) { |
|
|
|
allMachineMap = deviceStateList.stream() |
|
|
|
.collect(Collectors.toMap(DeviceState::getMachineIotMac, |
|
|
|
deviceState -> deviceState, (deviceState1, deviceState2) -> deviceState1)); |
|
|
|
} |
|
|
|
deviceState = getRuntimeContext() |
|
|
|
.getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(JSON.class), JSONUtil.parse(allMachineMap))); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void processElement(MachineIotDataReceivedEvent event, KeyedProcessFunction<Long, |
|
|
|
MachineIotDataReceivedEvent, BaseCommand>.Context ctx, Collector<BaseCommand> out) throws Exception { |
|
|
|
|
|
|
|
// 获取最新设备状态 |
|
|
|
JSON deviceStateListJson = deviceState.value(); |
|
|
|
System.out.println("目前处理的事件:" + event); |
|
|
|
DeviceState lastedDeviceState = deviceStateListJson.getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class); |
|
|
|
Integer deviceStatus = getDeviceStatus(event); |
|
|
|
if (deviceStatus != null) { |
|
|
|
deviceStateListJson.putByPath(StrUtil.toString(event.getMachineIotMac()), |
|
|
|
new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); |
|
|
|
if (lastedDeviceState != null) { |
|
|
|
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); |
|
|
|
collDeviceStatusChange(out, newState, lastedDeviceState, event); |
|
|
|
} |
|
|
|
deviceState.update(deviceStateListJson); |
|
|
|
} |
|
|
|
} |
|
|
|
}).name("keyBy stream"); |
|
|
|
|
|
|
|
|
|
|
|
// 写入rabbitmq |
|
|
|
sinkRabbitMq(commandDataStream); |
|
|
|
|
|
|
|
// 写入es |
|
|
|
sinkEs(commandDataStream); |
|
|
|
|
|
|
|
env.execute("Kafka Job"); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) { |
|
|
|
|
|
|
|
// rabbitmq配置(测试环境) |
|
|
|
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() |
|
|
|
@ -38,7 +139,8 @@ public class IotMachineEventGeneratorJob { |
|
|
|
.setPort(5672).build(); |
|
|
|
|
|
|
|
// 发送相应的指令到rabbitmq的交换机 |
|
|
|
commandDataStream.addSink(new RMQSink<>(connectionConfig, new MachineIotDataReceivedEventRabbitMqSerializationSchema(), new RMQSinkPublishOptions<BaseCommand>() { |
|
|
|
commandDataStream.addSink(new RMQSink<>(connectionConfig, new MachineIotDataReceivedEventRabbitMqSerializationSchema(), |
|
|
|
new RMQSinkPublishOptions<BaseCommand>() { |
|
|
|
|
|
|
|
@Override |
|
|
|
public String computeRoutingKey(BaseCommand command) { |
|
|
|
@ -62,7 +164,7 @@ public class IotMachineEventGeneratorJob { |
|
|
|
// commandDataStream.addSink(new RMQSink<>(connectionConfig, "flink_test_queue", new BaseCommandSerializationSchema())).name("12"); |
|
|
|
} |
|
|
|
|
|
|
|
public static void sinkEs(DataStream<BaseCommand> commandDataStream) { |
|
|
|
private static void sinkEs(DataStream<BaseCommand> commandDataStream) { |
|
|
|
|
|
|
|
List<HttpHost> httpHosts = new ArrayList<>(); |
|
|
|
httpHosts.add(new HttpHost("119.23.41.137", 9200, "http")); |
|
|
|
@ -70,27 +172,27 @@ public class IotMachineEventGeneratorJob { |
|
|
|
(ElasticsearchSinkFunction<BaseCommand>) (command, runtimeContext, requestIndexer) -> { |
|
|
|
|
|
|
|
HashMap<String, String> map = new HashMap<>(); |
|
|
|
if(command instanceof PowerOnMachineCommand) { |
|
|
|
if (command instanceof PowerOnMachineCommand) { |
|
|
|
// 设备开机数据 |
|
|
|
PowerOnMachineCommand powerOnMachineCommand = (PowerOnMachineCommand)command; |
|
|
|
PowerOnMachineCommand powerOnMachineCommand = (PowerOnMachineCommand) command; |
|
|
|
map.put("id", powerOnMachineCommand.getId().toString()); |
|
|
|
map.put("currTotalOutput", powerOnMachineCommand.getCurrTotalOutput().toString()); |
|
|
|
} |
|
|
|
if(command instanceof PowerOffMachineCommand) { |
|
|
|
if (command instanceof PowerOffMachineCommand) { |
|
|
|
// 设备关机数据 |
|
|
|
PowerOffMachineCommand powerOffMachineCommand = (PowerOffMachineCommand)command; |
|
|
|
PowerOffMachineCommand powerOffMachineCommand = (PowerOffMachineCommand) command; |
|
|
|
map.put("id", powerOffMachineCommand.getId().toString()); |
|
|
|
map.put("currTotalOutput", powerOffMachineCommand.getCurrTotalOutput().toString()); |
|
|
|
} |
|
|
|
if(command instanceof StopMachineWorkingCommand) { |
|
|
|
if (command instanceof StopMachineWorkingCommand) { |
|
|
|
// 设备待机数据 |
|
|
|
StopMachineWorkingCommand stopMachineWorkingCommand = (StopMachineWorkingCommand)command; |
|
|
|
StopMachineWorkingCommand stopMachineWorkingCommand = (StopMachineWorkingCommand) command; |
|
|
|
map.put("id", stopMachineWorkingCommand.getId().toString()); |
|
|
|
map.put("currTotalOutput", stopMachineWorkingCommand.getCurrTotalOutput().toString()); |
|
|
|
} |
|
|
|
if(command instanceof StartMachineWorkingCommand) { |
|
|
|
if (command instanceof StartMachineWorkingCommand) { |
|
|
|
// 设备工作数据 |
|
|
|
StartMachineWorkingCommand startMachineWorkingCommand = (StartMachineWorkingCommand)command; |
|
|
|
StartMachineWorkingCommand startMachineWorkingCommand = (StartMachineWorkingCommand) command; |
|
|
|
map.put("id", startMachineWorkingCommand.getId().toString()); |
|
|
|
map.put("currTotalOutput", startMachineWorkingCommand.getCurrTotalOutput().toString()); |
|
|
|
} |
|
|
|
@ -110,7 +212,7 @@ public class IotMachineEventGeneratorJob { |
|
|
|
restClientBuilder -> { |
|
|
|
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { |
|
|
|
CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
|
|
|
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic","qn56521")); |
|
|
|
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "qn56521")); |
|
|
|
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); |
|
|
|
}); |
|
|
|
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { |
|
|
|
@ -123,4 +225,46 @@ public class IotMachineEventGeneratorJob { |
|
|
|
//数据流添加sink |
|
|
|
commandDataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink"); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private static Integer getDeviceStatus(MachineIotDataReceivedEvent event) { |
|
|
|
|
|
|
|
// 设备状态 |
|
|
|
if (Integer.valueOf("0").equals(event.getMachinePwrStat())) { |
|
|
|
return 0; |
|
|
|
} else if (Integer.valueOf("1").equals(event.getMachinePwrStat()) |
|
|
|
&& Integer.valueOf("1").equals(event.getMachineWorkingStat())) { |
|
|
|
return 1; |
|
|
|
} else if (Integer.valueOf("1").equals(event.getMachinePwrStat()) |
|
|
|
&& Integer.valueOf("0").equals(event.getMachineWorkingStat())) { |
|
|
|
return 2; |
|
|
|
} |
|
|
|
return null; |
|
|
|
} |
|
|
|
|
|
|
|
private static void collDeviceStatusChange(Collector<BaseCommand> out, DeviceState newState, DeviceState oldState, MachineIotDataReceivedEvent event) { |
|
|
|
|
|
|
|
if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { |
|
|
|
// 设备开机 |
|
|
|
out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { |
|
|
|
// 设备关机 |
|
|
|
out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { |
|
|
|
// 设备开始待机 |
|
|
|
out.collect(new StopMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} else if (oldState.getStatus() == 2 && newState.getStatus() == 1) { |
|
|
|
// 设备开始工作 |
|
|
|
out.collect(new StartMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/*if (newState.getStatus() == 1 || newState.getStatus() == 2) { |
|
|
|
System.out.println("设备开机。相关事件:" + event.toString()); |
|
|
|
out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} else if (newState.getStatus() == 0) { |
|
|
|
System.out.println("设备关机。相关事件:" + event.toString()); |
|
|
|
out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
}*/ |
|
|
|
} |
|
|
|
} |