|
|
|
@ -18,6 +18,8 @@ |
|
|
|
|
|
|
|
package com.qniao.iot.rc; |
|
|
|
|
|
|
|
import cn.hutool.db.Db; |
|
|
|
import cn.hutool.db.Entity; |
|
|
|
import com.fasterxml.jackson.databind.util.JSONPObject; |
|
|
|
import com.qniao.domain.BaseCommand; |
|
|
|
import com.qniao.iot.machine.command.PowerOffMachineCommand; |
|
|
|
@ -25,6 +27,7 @@ import com.qniao.iot.machine.command.PowerOnMachineCommand; |
|
|
|
import com.qniao.iot.machine.command.StartMachineWorkingCommand; |
|
|
|
import com.qniao.iot.machine.command.StopMachineWorkingCommand; |
|
|
|
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; |
|
|
|
import com.qniao.iot.machine.event.generator.job.IotMachineEventGeneratorJob; |
|
|
|
import com.qniao.iot.rc.command.BaseCommandSerializationSchema; |
|
|
|
import com.qniao.iot.rc.constant.DataSource; |
|
|
|
import com.qniao.iot.rc.event.RootCloudIotDataReceiptedEventDeserializationSchema; |
|
|
|
@ -32,7 +35,9 @@ import com.rabbitmq.client.AMQP; |
|
|
|
import com.rabbitmq.tools.json.JSONUtil; |
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|
|
|
import org.apache.flink.api.common.functions.FilterFunction; |
|
|
|
import org.apache.flink.api.common.functions.MapFunction; |
|
|
|
import org.apache.flink.api.common.functions.RichFilterFunction; |
|
|
|
import org.apache.flink.api.common.functions.RuntimeContext; |
|
|
|
import org.apache.flink.api.common.state.ValueState; |
|
|
|
import org.apache.flink.api.common.state.ValueStateDescriptor; |
|
|
|
@ -45,28 +50,10 @@ import org.apache.flink.streaming.api.datastream.DataStream; |
|
|
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; |
|
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|
|
|
import org.apache.flink.streaming.api.functions.KeyedProcessFunction; |
|
|
|
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; |
|
|
|
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; |
|
|
|
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; |
|
|
|
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory; |
|
|
|
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; |
|
|
|
import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; |
|
|
|
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; |
|
|
|
import org.apache.flink.util.Collector; |
|
|
|
import org.apache.http.HttpHost; |
|
|
|
import org.apache.http.auth.AuthScope; |
|
|
|
import org.apache.http.auth.UsernamePasswordCredentials; |
|
|
|
import org.apache.http.client.CredentialsProvider; |
|
|
|
import org.apache.http.client.config.RequestConfig; |
|
|
|
import org.apache.http.impl.client.BasicCredentialsProvider; |
|
|
|
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; |
|
|
|
import org.elasticsearch.action.index.IndexRequest; |
|
|
|
import org.elasticsearch.client.Requests; |
|
|
|
import org.elasticsearch.client.RestClient; |
|
|
|
import org.elasticsearch.client.RestClientBuilder; |
|
|
|
|
|
|
|
import java.math.BigDecimal; |
|
|
|
import java.net.InetSocketAddress; |
|
|
|
import java.sql.SQLException; |
|
|
|
import java.util.*; |
|
|
|
|
|
|
|
/** |
|
|
|
@ -112,28 +99,28 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
private ValueState<DeviceState> deviceState; |
|
|
|
|
|
|
|
@Override |
|
|
|
public void open(Configuration parameters) { |
|
|
|
public void open(Configuration parameters) throws SQLException { |
|
|
|
// 必须在 open 生命周期初始化 |
|
|
|
// TODO 获取所有设备的最新状态 |
|
|
|
//Db.use().findAll(new Entity(), Object.class); |
|
|
|
deviceState = getRuntimeContext() |
|
|
|
.getState(new ValueStateDescriptor<>("deviceState", DeviceState.class)); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
public void processElement(MachineIotDataReceivedEvent event, KeyedProcessFunction<Long, |
|
|
|
MachineIotDataReceivedEvent, BaseCommand>.Context ctx, Collector<BaseCommand> out) throws Exception { |
|
|
|
|
|
|
|
System.out.println("收到事件数据-------------------------:" + event); |
|
|
|
// 获取最新设备状态 |
|
|
|
DeviceState lastedDeviceState = deviceState.value(); |
|
|
|
Integer deviceStatus = getDeviceStatus(event); |
|
|
|
|
|
|
|
if (deviceStatus == null) { |
|
|
|
out.collect(null); |
|
|
|
} else { |
|
|
|
if (lastedDeviceState == null) { |
|
|
|
// TODO 后续优化,先从数据库中获取设备最新的状态 |
|
|
|
deviceState.update(new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime())); |
|
|
|
System.out.println("初始化设备状态" + deviceState.value().toString()); |
|
|
|
} else { |
|
|
|
DeviceState newState = new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()); |
|
|
|
DeviceState oldState = deviceState.value(); |
|
|
|
@ -142,95 +129,21 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
}).name("keyBy stream"); |
|
|
|
}).filter((FilterFunction<BaseCommand>) Objects::nonNull).name("keyBy stream"); |
|
|
|
|
|
|
|
|
|
|
|
// 写入rabbitmq |
|
|
|
sinkRabbitMq(commandDataStream); |
|
|
|
IotMachineEventGeneratorJob.sinkRabbitMq(commandDataStream); |
|
|
|
|
|
|
|
// 写入es |
|
|
|
sinkEs(commandDataStream); |
|
|
|
IotMachineEventGeneratorJob.sinkEs(commandDataStream); |
|
|
|
|
|
|
|
env.execute("Kafka Job"); |
|
|
|
} |
|
|
|
|
|
|
|
private static void sinkEs(DataStream<BaseCommand> commandDataStream) { |
|
|
|
|
|
|
|
List<HttpHost> httpHosts = new ArrayList<>(); |
|
|
|
httpHosts.add(new HttpHost("119.23.41.137", 9200, "http")); |
|
|
|
ElasticsearchSink.Builder<BaseCommand> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, |
|
|
|
(ElasticsearchSinkFunction<BaseCommand>) (command, runtimeContext, requestIndexer) -> { |
|
|
|
|
|
|
|
HashMap<String, String> map = new HashMap<>(); |
|
|
|
if(command instanceof PowerOnMachineCommand) { |
|
|
|
PowerOnMachineCommand powerOnMachineCommand = (PowerOnMachineCommand)command; |
|
|
|
map.put("id", powerOnMachineCommand.getId().toString()); |
|
|
|
map.put("currTotalOutput", powerOnMachineCommand.getCurrTotalOutput().toString()); |
|
|
|
} |
|
|
|
|
|
|
|
if(command instanceof PowerOffMachineCommand) { |
|
|
|
PowerOffMachineCommand powerOffMachineCommand = (PowerOffMachineCommand)command; |
|
|
|
map.put("id", powerOffMachineCommand.getId().toString()); |
|
|
|
map.put("currTotalOutput", powerOffMachineCommand.getCurrTotalOutput().toString()); |
|
|
|
} |
|
|
|
//创建es 请求 |
|
|
|
IndexRequest indexRequest = Requests.indexRequest().index("flink").source(map); |
|
|
|
requestIndexer.add(indexRequest); |
|
|
|
} |
|
|
|
); |
|
|
|
//刷新前缓冲的最大动作量 |
|
|
|
esSinkBuilder.setBulkFlushMaxActions(10); |
|
|
|
//刷新前缓冲区的最大数据大小(以MB为单位) |
|
|
|
esSinkBuilder.setBulkFlushMaxSizeMb(5); |
|
|
|
//论缓冲操作的数量或大小如何都要刷新的时间间隔 |
|
|
|
esSinkBuilder.setBulkFlushInterval(5000L); |
|
|
|
// 客户端创建配置回调,配置账号密码 |
|
|
|
esSinkBuilder.setRestClientFactory( |
|
|
|
restClientBuilder -> restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { |
|
|
|
CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
|
|
|
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic","qn56521")); |
|
|
|
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); |
|
|
|
}) |
|
|
|
); |
|
|
|
//数据流添加sink |
|
|
|
commandDataStream.addSink(esSinkBuilder.build()).name("BaseCommand sink"); |
|
|
|
} |
|
|
|
|
|
|
|
private static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) { |
|
|
|
|
|
|
|
// rabbitmq配置(测试环境) |
|
|
|
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() |
|
|
|
.setHost("8.135.8.221") |
|
|
|
.setVirtualHost("/") |
|
|
|
.setUserName("qniao") |
|
|
|
.setPassword("Qianniao2020") |
|
|
|
.setPort(5672).build(); |
|
|
|
|
|
|
|
// 发送相应的指令到rabbitmq的交换机 |
|
|
|
commandDataStream.addSink(new RMQSink<>(connectionConfig, new BaseCommandSerializationSchema(), new RMQSinkPublishOptions<BaseCommand>() { |
|
|
|
|
|
|
|
@Override |
|
|
|
public String computeRoutingKey(BaseCommand command) { |
|
|
|
return "flink"; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public AMQP.BasicProperties computeProperties(BaseCommand command) { |
|
|
|
return null; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public String computeExchange(BaseCommand command) { |
|
|
|
|
|
|
|
// 交换机名称 |
|
|
|
System.out.println("发送消息:------------------" + command.toString()); |
|
|
|
return "flink_test_exchange"; |
|
|
|
} |
|
|
|
})).name("PowerOffMachineCommand Sink"); |
|
|
|
|
|
|
|
// 直接发队列 |
|
|
|
// commandDataStream.addSink(new RMQSink<>(connectionConfig, "flink_test_queue", new BaseCommandSerializationSchema())).name("12"); |
|
|
|
} |
|
|
|
|
|
|
|
private static MachineIotDataReceivedEvent transform(RootCloudIotDataReceiptedEvent event) { |
|
|
|
|
|
|
|
@ -258,38 +171,24 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
|
|
|
|
private static Integer getDeviceStatus(MachineIotDataReceivedEvent event) { |
|
|
|
|
|
|
|
if (event.getMachinePwrStat() == 0) { |
|
|
|
return 0; |
|
|
|
} else if (event.getMachinePwrStat() == 1 && event.getMachineWorkingStat() == 1) { |
|
|
|
return 1; |
|
|
|
} else if (event.getMachinePwrStat() == 1 && event.getMachineWorkingStat() == 0) { |
|
|
|
return 2; |
|
|
|
} |
|
|
|
|
|
|
|
return null; |
|
|
|
// 设备状态 |
|
|
|
return event.getMachineWorkingStat(); |
|
|
|
} |
|
|
|
|
|
|
|
private static void collDeviceStatusChange(Collector<BaseCommand> out, DeviceState newState, DeviceState oldState, MachineIotDataReceivedEvent event) { |
|
|
|
|
|
|
|
/*if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { |
|
|
|
System.out.println("设备开机。相关事件:" + event.toString()); |
|
|
|
if (oldState.getStatus() == 0 && (newState.getStatus() == 1 || newState.getStatus() == 2)) { |
|
|
|
// 设备开机 |
|
|
|
out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} else if ((oldState.getStatus() == 1 || oldState.getStatus() == 2) && newState.getStatus() == 0) { |
|
|
|
System.out.println("设备关机。相关事件:" + event.toString()); |
|
|
|
// 设备关机 |
|
|
|
out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} else if (oldState.getStatus() == 1 && newState.getStatus() == 2) { |
|
|
|
System.out.println("设备开始待机。相关事件:" + event.toString()); |
|
|
|
// 设备开始待机 |
|
|
|
out.collect(new StopMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} else if (oldState.getStatus() == 2 && newState.getStatus() == 1) { |
|
|
|
System.out.println("设备开始工作。相关事件:" + event.toString()); |
|
|
|
// 设备开始工作 |
|
|
|
out.collect(new StartMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
}*/ |
|
|
|
if (newState.getStatus() == 1 || newState.getStatus() == 2) { |
|
|
|
System.out.println("设备开机。相关事件:" + event.toString()); |
|
|
|
out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} else if (newState.getStatus() == 0) { |
|
|
|
System.out.println("设备关机。相关事件:" + event.toString()); |
|
|
|
out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount())); |
|
|
|
} |
|
|
|
} |
|
|
|
} |