9 changed files with 258 additions and 54 deletions
Split View
Diff Options
-
2iot-machine-data-command/pom.xml
-
2iot-machine-data-constant/pom.xml
-
2iot-machine-data-event/pom.xml
-
4iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java
-
21iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventRabbitMqSerializationSchema.java
-
69iot-machine-state-event-generator-job/dependency-reduced-pom.xml
-
67iot-machine-state-event-generator-job/pom.xml
-
141iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java
-
4pom.xml
@ -0,0 +1,21 @@ |
|||
package com.qniao.iot.machine.event; |
|||
|
|||
import com.qniao.domain.BaseCommand; |
|||
import org.apache.flink.api.common.serialization.SerializationSchema; |
|||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; |
|||
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; |
|||
|
|||
|
|||
public class MachineIotDataReceivedEventRabbitMqSerializationSchema implements SerializationSchema<BaseCommand> { |
|||
|
|||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); |
|||
|
|||
@Override |
|||
public byte[] serialize(BaseCommand command) { |
|||
try { |
|||
return OBJECT_MAPPER.writeValueAsBytes(command); |
|||
} catch (JsonProcessingException e) { |
|||
throw new IllegalArgumentException("Could not serialize record: " + command, e); |
|||
} |
|||
} |
|||
} |
|||
@ -1,27 +1,126 @@ |
|||
package com.qniao.iot.machine.event.generator.job; |
|||
|
|||
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; |
|||
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; |
|||
import org.apache.flink.api.java.utils.ParameterTool; |
|||
import org.apache.flink.connector.kafka.source.KafkaSource; |
|||
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; |
|||
import org.apache.flink.streaming.api.CheckpointingMode; |
|||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|||
import com.qniao.domain.BaseCommand; |
|||
import com.qniao.iot.machine.command.PowerOffMachineCommand; |
|||
import com.qniao.iot.machine.command.PowerOnMachineCommand; |
|||
import com.qniao.iot.machine.command.StartMachineWorkingCommand; |
|||
import com.qniao.iot.machine.command.StopMachineWorkingCommand; |
|||
import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializationSchema; |
|||
import com.rabbitmq.client.AMQP; |
|||
import org.apache.flink.streaming.api.datastream.DataStream; |
|||
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; |
|||
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; |
|||
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; |
|||
import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions; |
|||
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; |
|||
import org.apache.http.HttpHost; |
|||
import org.apache.http.auth.AuthScope; |
|||
import org.apache.http.auth.UsernamePasswordCredentials; |
|||
import org.apache.http.client.CredentialsProvider; |
|||
import org.apache.http.impl.client.BasicCredentialsProvider; |
|||
import org.elasticsearch.action.index.IndexRequest; |
|||
import org.elasticsearch.client.Requests; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.HashMap; |
|||
import java.util.List; |
|||
|
|||
public class IotMachineEventGeneratorJob { |
|||
public static void main(String[] args) throws Exception { |
|||
final ParameterTool params = ParameterTool.fromArgs(args); |
|||
|
|||
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
|||
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); |
|||
|
|||
// 定义Kafka数据源 |
|||
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder() |
|||
.setBootstrapServers(params.get("source.bootstrap.servers")) |
|||
.setTopics("root_cloud_iot_report_data_event") |
|||
.setGroupId("iot.machine.generator") |
|||
.setStartingOffsets(OffsetsInitializer.earliest()) |
|||
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) |
|||
.build(); |
|||
|
|||
public static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) { |
|||
|
|||
// rabbitmq配置(测试环境) |
|||
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() |
|||
.setHost("8.135.8.221") |
|||
.setVirtualHost("/") |
|||
.setUserName("qniao") |
|||
.setPassword("Qianniao2020") |
|||
.setPort(5672).build(); |
|||
|
|||
// 发送相应的指令到rabbitmq的交换机 |
|||
commandDataStream.addSink(new RMQSink<>(connectionConfig, new MachineIotDataReceivedEventRabbitMqSerializationSchema(), new RMQSinkPublishOptions<BaseCommand>() { |
|||
|
|||
@Override |
|||
public String computeRoutingKey(BaseCommand command) { |
|||
return "machine-iot-data-received-event"; |
|||
} |
|||
|
|||
@Override |
|||
public AMQP.BasicProperties computeProperties(BaseCommand command) { |
|||
return null; |
|||
} |
|||
|
|||
@Override |
|||
public String computeExchange(BaseCommand command) { |
|||
|
|||
// 交换机名称 |
|||
return "flink_test_exchange"; |
|||
} |
|||
})).name("commandDataStream to rabbitmq Sink"); |
|||
|
|||
// 直接发队列 |
|||
// commandDataStream.addSink(new RMQSink<>(connectionConfig, "flink_test_queue", new BaseCommandSerializationSchema())).name("12"); |
|||
} |
|||
|
|||
public static void sinkEs(DataStream<BaseCommand> commandDataStream) { |
|||
|
|||
List<HttpHost> httpHosts = new ArrayList<>(); |
|||
httpHosts.add(new HttpHost("119.23.41.137", 9200, "http")); |
|||
ElasticsearchSink.Builder<BaseCommand> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, |
|||
(ElasticsearchSinkFunction<BaseCommand>) (command, runtimeContext, requestIndexer) -> { |
|||
|
|||
HashMap<String, String> map = new HashMap<>(); |
|||
if(command instanceof PowerOnMachineCommand) { |
|||
// 设备开机数据 |
|||
PowerOnMachineCommand powerOnMachineCommand = (PowerOnMachineCommand)command; |
|||
map.put("id", powerOnMachineCommand.getId().toString()); |
|||
map.put("currTotalOutput", powerOnMachineCommand.getCurrTotalOutput().toString()); |
|||
} |
|||
if(command instanceof PowerOffMachineCommand) { |
|||
// 设备关机数据 |
|||
PowerOffMachineCommand powerOffMachineCommand = (PowerOffMachineCommand)command; |
|||
map.put("id", powerOffMachineCommand.getId().toString()); |
|||
map.put("currTotalOutput", powerOffMachineCommand.getCurrTotalOutput().toString()); |
|||
} |
|||
if(command instanceof StopMachineWorkingCommand) { |
|||
// 设备待机数据 |
|||
StopMachineWorkingCommand stopMachineWorkingCommand = (StopMachineWorkingCommand)command; |
|||
map.put("id", stopMachineWorkingCommand.getId().toString()); |
|||
map.put("currTotalOutput", stopMachineWorkingCommand.getCurrTotalOutput().toString()); |
|||
} |
|||
if(command instanceof StartMachineWorkingCommand) { |
|||
// 设备工作数据 |
|||
StartMachineWorkingCommand startMachineWorkingCommand = (StartMachineWorkingCommand)command; |
|||
map.put("id", startMachineWorkingCommand.getId().toString()); |
|||
map.put("currTotalOutput", startMachineWorkingCommand.getCurrTotalOutput().toString()); |
|||
} |
|||
//创建es 请求 |
|||
IndexRequest indexRequest = Requests.indexRequest().index("machine-iot-data-received-event").source(map); |
|||
requestIndexer.add(indexRequest); |
|||
} |
|||
); |
|||
//刷新前缓冲的最大动作量 |
|||
esSinkBuilder.setBulkFlushMaxActions(10); |
|||
//刷新前缓冲区的最大数据大小(以MB为单位) |
|||
esSinkBuilder.setBulkFlushMaxSizeMb(5); |
|||
//论缓冲操作的数量或大小如何都要刷新的时间间隔 |
|||
esSinkBuilder.setBulkFlushInterval(5000L); |
|||
// 客户端创建配置回调,配置账号密码 |
|||
esSinkBuilder.setRestClientFactory( |
|||
restClientBuilder -> { |
|||
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { |
|||
CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
|||
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic","qn56521")); |
|||
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); |
|||
}); |
|||
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { |
|||
// 设置es连接超时时间 |
|||
requestConfigBuilder.setConnectTimeout(3000); |
|||
return requestConfigBuilder; |
|||
}); |
|||
} |
|||
); |
|||
//数据流添加sink |
|||
commandDataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink"); |
|||
} |
|||
} |
|||
Write
Preview
Loading…
Cancel
Save