diff --git a/iot-machine-data-command/pom.xml b/iot-machine-data-command/pom.xml
index 263af16..21bbb40 100644
--- a/iot-machine-data-command/pom.xml
+++ b/iot-machine-data-command/pom.xml
@@ -3,8 +3,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- iot-machine-state-event-generator
com.qniao
+ java-dependency
0.0.1-SNAPSHOT
4.0.0
diff --git a/iot-machine-data-constant/pom.xml b/iot-machine-data-constant/pom.xml
index 3d22f37..28280f4 100644
--- a/iot-machine-data-constant/pom.xml
+++ b/iot-machine-data-constant/pom.xml
@@ -3,8 +3,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- iot-machine-state-event-generator
com.qniao
+ java-dependency
0.0.1-SNAPSHOT
4.0.0
diff --git a/iot-machine-data-event/pom.xml b/iot-machine-data-event/pom.xml
index 78a4d0f..1f59073 100644
--- a/iot-machine-data-event/pom.xml
+++ b/iot-machine-data-event/pom.xml
@@ -3,8 +3,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- iot-machine-state-event-generator
com.qniao
+ java-dependency
0.0.1-SNAPSHOT
4.0.0
diff --git a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java
index e8fc9eb..b6e68a0 100644
--- a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java
+++ b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java
@@ -28,12 +28,12 @@ public class MachineIotDataReceivedEvent implements Serializable {
private Long machineIotMac;
/**
- * 机器电源状态
+ * 机器电源状态(0断电 1供电)
*/
private Integer machinePwrStat;
/**
- * 机器工作状态
+ * 机器工作状态(0停机状态 1工作状态 2待机状态)
*/
private Integer machineWorkingStat;
diff --git a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventRabbitMqSerializationSchema.java b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventRabbitMqSerializationSchema.java
new file mode 100644
index 0000000..ad53212
--- /dev/null
+++ b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEventRabbitMqSerializationSchema.java
@@ -0,0 +1,21 @@
+package com.qniao.iot.machine.event;
+
+import com.qniao.domain.BaseCommand;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+
+public class MachineIotDataReceivedEventRabbitMqSerializationSchema implements SerializationSchema {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @Override
+ public byte[] serialize(BaseCommand command) {
+ try {
+ return OBJECT_MAPPER.writeValueAsBytes(command);
+ } catch (JsonProcessingException e) {
+ throw new IllegalArgumentException("Could not serialize record: " + command, e);
+ }
+ }
+}
diff --git a/iot-machine-state-event-generator-job/dependency-reduced-pom.xml b/iot-machine-state-event-generator-job/dependency-reduced-pom.xml
index 6eb25e6..3440136 100644
--- a/iot-machine-state-event-generator-job/dependency-reduced-pom.xml
+++ b/iot-machine-state-event-generator-job/dependency-reduced-pom.xml
@@ -1,7 +1,7 @@
- iot-machine-state-event-generator
+ java-dependency
com.qniao
0.0.1-SNAPSHOT
@@ -45,12 +45,6 @@
-
-
-
- com.qniao.iot.machine.event.generator.job.IotMachineEventGeneratorJob
-
-
@@ -58,6 +52,18 @@
+
+ com.qniao
+ iot-machine-data-event
+ 0.0.1-SNAPSHOT
+ provided
+
+
+ org.apache.flink
+ flink-streaming-java
+ 1.15.0
+ provided
+
org.apache.logging.log4j
log4j-slf4j-impl
@@ -76,7 +82,56 @@
2.17.2
runtime
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.13.3
+ provided
+
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+ 2.13.3
+ provided
+
+
+ com.qniao
+ iot-machine-data-command
+ 0.0.1-SNAPSHOT
+ provided
+
+
+ com.qniao
+ iot-machine-data-constant
+ 0.0.1-SNAPSHOT
+ provided
+
+
+ org.apache.flink
+ flink-connector-rabbitmq_2.12
+ 1.14.5
+ provided
+
+
+ org.apache.flink
+ flink-connector-elasticsearch7_2.11
+ 1.14.5
+ provided
+
+
+ commons-logging
+ commons-logging
+ 1.2
+ provided
+
+
+
+ maven-releases
+ Nexus releases Repository
+ http://120.78.76.88:8081/repository/maven-snapshots/
+
+
1.8
2.17.2
diff --git a/iot-machine-state-event-generator-job/pom.xml b/iot-machine-state-event-generator-job/pom.xml
index e1fab28..b18756e 100644
--- a/iot-machine-state-event-generator-job/pom.xml
+++ b/iot-machine-state-event-generator-job/pom.xml
@@ -3,8 +3,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- iot-machine-state-event-generator
com.qniao
+ java-dependency
0.0.1-SNAPSHOT
4.0.0
@@ -34,17 +34,17 @@
flink-streaming-java
${flink.version}
-
+
-
+
@@ -76,6 +76,36 @@
jackson-datatype-jsr310
2.13.3
+
+
+ com.qniao
+ iot-machine-data-command
+ 0.0.1-SNAPSHOT
+
+
+
+ com.qniao
+ iot-machine-data-constant
+ 0.0.1-SNAPSHOT
+
+
+
+ org.apache.flink
+ flink-connector-rabbitmq_2.12
+ 1.14.5
+
+
+
+ org.apache.flink
+ flink-connector-elasticsearch7_2.11
+ 1.14.5
+
+
+
+ commons-logging
+ commons-logging
+ 1.2
+
@@ -94,12 +124,12 @@
-
+
+ <!– Run shade goal on package phase –>
package
@@ -116,8 +146,8 @@
-
+ <!– Do not copy the signatures in the META-INF folder.
+ Otherwise, this might cause SecurityExceptions when using the JAR. –>
*:*
META-INF/*.SF
@@ -126,19 +156,18 @@
-
-
-
- com.qniao.iot.machine.event.generator.job.IotMachineEventGeneratorJob
-
-
-
-
+ -->
+
+
+
+ maven-releases
+ Nexus releases Repository
+ http://120.78.76.88:8081/repository/maven-snapshots/
+
+
\ No newline at end of file
diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java
index 9671ba4..72ce839 100644
--- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java
+++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java
@@ -1,27 +1,126 @@
package com.qniao.iot.machine.event.generator.job;
-import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
-import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.connector.kafka.source.KafkaSource;
-import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import com.qniao.domain.BaseCommand;
+import com.qniao.iot.machine.command.PowerOffMachineCommand;
+import com.qniao.iot.machine.command.PowerOnMachineCommand;
+import com.qniao.iot.machine.command.StartMachineWorkingCommand;
+import com.qniao.iot.machine.command.StopMachineWorkingCommand;
+import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializationSchema;
+import com.rabbitmq.client.AMQP;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSinkPublishOptions;
+import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
public class IotMachineEventGeneratorJob {
- public static void main(String[] args) throws Exception {
- final ParameterTool params = ParameterTool.fromArgs(args);
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
-
- // 定义Kafka数据源
- KafkaSource source = KafkaSource.builder()
- .setBootstrapServers(params.get("source.bootstrap.servers"))
- .setTopics("root_cloud_iot_report_data_event")
- .setGroupId("iot.machine.generator")
- .setStartingOffsets(OffsetsInitializer.earliest())
- .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
- .build();
+
+ public static void sinkRabbitMq(DataStream commandDataStream) {
+
+ // rabbitmq配置(测试环境)
+ RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+ .setHost("8.135.8.221")
+ .setVirtualHost("/")
+ .setUserName("qniao")
+ .setPassword("Qianniao2020")
+ .setPort(5672).build();
+
+ // 发送相应的指令到rabbitmq的交换机
+ commandDataStream.addSink(new RMQSink<>(connectionConfig, new MachineIotDataReceivedEventRabbitMqSerializationSchema(), new RMQSinkPublishOptions() {
+
+ @Override
+ public String computeRoutingKey(BaseCommand command) {
+ return "machine-iot-data-received-event";
+ }
+
+ @Override
+ public AMQP.BasicProperties computeProperties(BaseCommand command) {
+ return null;
+ }
+
+ @Override
+ public String computeExchange(BaseCommand command) {
+
+ // 交换机名称
+ return "flink_test_exchange";
+ }
+ })).name("commandDataStream to rabbitmq Sink");
+
+ // 直接发队列
+ // commandDataStream.addSink(new RMQSink<>(connectionConfig, "flink_test_queue", new BaseCommandSerializationSchema())).name("12");
+ }
+
+ public static void sinkEs(DataStream commandDataStream) {
+
+ List httpHosts = new ArrayList<>();
+ httpHosts.add(new HttpHost("119.23.41.137", 9200, "http"));
+ ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
+ (ElasticsearchSinkFunction) (command, runtimeContext, requestIndexer) -> {
+
+ HashMap map = new HashMap<>();
+ if(command instanceof PowerOnMachineCommand) {
+ // 设备开机数据
+ PowerOnMachineCommand powerOnMachineCommand = (PowerOnMachineCommand)command;
+ map.put("id", powerOnMachineCommand.getId().toString());
+ map.put("currTotalOutput", powerOnMachineCommand.getCurrTotalOutput().toString());
+ }
+ if(command instanceof PowerOffMachineCommand) {
+ // 设备关机数据
+ PowerOffMachineCommand powerOffMachineCommand = (PowerOffMachineCommand)command;
+ map.put("id", powerOffMachineCommand.getId().toString());
+ map.put("currTotalOutput", powerOffMachineCommand.getCurrTotalOutput().toString());
+ }
+ if(command instanceof StopMachineWorkingCommand) {
+ // 设备待机数据
+ StopMachineWorkingCommand stopMachineWorkingCommand = (StopMachineWorkingCommand)command;
+ map.put("id", stopMachineWorkingCommand.getId().toString());
+ map.put("currTotalOutput", stopMachineWorkingCommand.getCurrTotalOutput().toString());
+ }
+ if(command instanceof StartMachineWorkingCommand) {
+ // 设备工作数据
+ StartMachineWorkingCommand startMachineWorkingCommand = (StartMachineWorkingCommand)command;
+ map.put("id", startMachineWorkingCommand.getId().toString());
+ map.put("currTotalOutput", startMachineWorkingCommand.getCurrTotalOutput().toString());
+ }
+ //创建es 请求
+ IndexRequest indexRequest = Requests.indexRequest().index("machine-iot-data-received-event").source(map);
+ requestIndexer.add(indexRequest);
+ }
+ );
+ //刷新前缓冲的最大动作量
+ esSinkBuilder.setBulkFlushMaxActions(10);
+ //刷新前缓冲区的最大数据大小(以MB为单位)
+ esSinkBuilder.setBulkFlushMaxSizeMb(5);
+ //论缓冲操作的数量或大小如何都要刷新的时间间隔
+ esSinkBuilder.setBulkFlushInterval(5000L);
+ // 客户端创建配置回调,配置账号密码
+ esSinkBuilder.setRestClientFactory(
+ restClientBuilder -> {
+ restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
+ CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic","qn56521"));
+ return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+ });
+ restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
+ // 设置es连接超时时间
+ requestConfigBuilder.setConnectTimeout(3000);
+ return requestConfigBuilder;
+ });
+ }
+ );
+ //数据流添加sink
+ commandDataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink");
}
}
diff --git a/pom.xml b/pom.xml
index fcd5af0..b92b0fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,14 +46,14 @@ under the License.
-
+
nexus
qniao