Browse Source

集成apollo

master
1049970895@qniao.cn 3 years ago
parent
commit
70b04fac42
6 changed files with 134 additions and 85 deletions
  1. 2
      iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java
  2. 12
      iot-machine-state-event-generator-job/pom.xml
  3. 24
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/config/ApolloConfig.java
  4. 38
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java
  5. 140
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java
  6. 3
      iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties

2
iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java

@ -33,7 +33,7 @@ public class MachineIotDataReceivedEvent implements Serializable {
private Integer machinePwrStat;
/**
* 机器工作状态0未工作 1工作中
* 机器工作状态0未工作 1工作中 2待机中
*/
private Integer machineWorkingStat;

12
iot-machine-state-event-generator-job/pom.xml

@ -107,6 +107,18 @@
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- apollo -->
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-core</artifactId>
<version>2.0.1</version>
</dependency>
</dependencies>
<build>

24
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/config/ApolloConfig.java

@ -0,0 +1,24 @@
package com.qniao.iot.machine.event.generator.config;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
public class ApolloConfig {
private static final Config config = ConfigService.getAppConfig();
public static String get(String key, String defaultValue) {
return config.getProperty(key, defaultValue);
}
public static String get(String key) {
return config.getProperty(key, null);
}
public static Integer getInt(String key) {
return config.getIntProperty(key, null);
}
}

38
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java

@ -0,0 +1,38 @@
package com.qniao.iot.machine.event.generator.constant;
public interface ConfigConstant {
String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "source.kafka.bootstrap.servers";
String SOURCE_KAFKA_TOPICS = "source.kafka.topics";
String SOURCE_KAFKA_GROUP_ID = "source.kafka.groupId";
String SINK_RABBITMQ_HOST = "sink.rabbitmq.host";
String SINK_RABBITMQ_PORT = "sink.rabbitmq.port";
String SINK_RABBITMQ_VIRTUAL_HOST = "sink.rabbitmq.virtualHost";
String SINK_RABBITMQ_USER_NAME = "sink.rabbitmq.userName";
String SINK_RABBITMQ_PASSWORD = "sink.rabbitmq.password";
String SINK_RABBITMQ_ROUTING_KEY = "sink.rabbitmq.routingKey";
String SINK_RABBITMQ_EXCHANGE = "sink.rabbitmq.exchange";
String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host";
String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post";
String SINK_ELASTICSEARCH_USER_NAME = "sink.elasticsearch.userName";
String SINK_ELASTICSEARCH_PASSWORD = "sink.elasticsearch.password";
String SINK_ELASTICSEARCH_CONNECT_TIMEOUT = "sink.elasticsearch.connectTimeout";
String SINK_ELASTICSEARCH_SCHEME = "sink.elasticsearch.scheme";
String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index";
}

140
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -1,6 +1,8 @@
package com.qniao.iot.machine.event.generator.job;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.Db;
import cn.hutool.json.JSON;
@ -13,18 +15,19 @@ import com.qniao.iot.machine.command.StopMachineWorkingCommand;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializationSchema;
import com.qniao.iot.machine.event.generator.config.ApolloConfig;
import com.qniao.iot.machine.event.generator.constant.ConfigConstant;
import com.rabbitmq.client.AMQP;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
@ -41,8 +44,6 @@ import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import javax.sql.DataSource;
import java.io.IOException;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;
@ -59,23 +60,22 @@ public class IotMachineEventGeneratorJob {
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers(params.get("source.bootstrap.servers"))
.setTopics("machine_iot_data_received_event")
.setGroupId("root_cloud_iot_data_etl")
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID))
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
.build();
// 设备数据分组
DataStream<BaseCommand> commandDataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source")
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
DataStreamSource<MachineIotDataReceivedEvent> dataStreamSource = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
DataStream<BaseCommand> commandDataStream = dataStreamSource.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, BaseCommand>() {
private ValueState<JSON> deviceState;
@ -93,7 +93,8 @@ public class IotMachineEventGeneratorJob {
deviceState -> deviceState, (deviceState1, deviceState2) -> deviceState1));
}
deviceState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("deviceState", TypeInformation.of(JSON.class), JSONUtil.parse(allMachineMap)));
.getState(new ValueStateDescriptor<>("deviceState",
TypeInformation.of(JSON.class), JSONUtil.parse(allMachineMap)));
}
@Override
@ -102,13 +103,15 @@ public class IotMachineEventGeneratorJob {
// 获取最新设备状态
JSON deviceStateListJson = deviceState.value();
DeviceState lastedDeviceState = deviceStateListJson.getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class);
DeviceState lastedDeviceState = deviceStateListJson
.getByPath(StrUtil.toString(event.getMachineIotMac()), DeviceState.class);
Integer deviceStatus = getDeviceStatus(event);
if (deviceStatus != null) {
deviceStateListJson.putByPath(StrUtil.toString(event.getMachineIotMac()),
new DeviceState(event.getId(), event.getMachineIotMac(), deviceStatus, event.getReportTime()));
if (lastedDeviceState != null) {
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(), deviceStatus, event.getReportTime());
DeviceState newState = new DeviceState(lastedDeviceState.getMachineId(), event.getMachineIotMac(),
deviceStatus, event.getReportTime());
collDeviceStatusChange(out, newState, lastedDeviceState, event);
}
deviceState.update(deviceStateListJson);
@ -121,7 +124,7 @@ public class IotMachineEventGeneratorJob {
sinkRabbitMq(commandDataStream);
// 写入es
sinkEs(commandDataStream);
sinkEs(dataStreamSource);
env.execute("Kafka Job");
}
@ -131,72 +134,48 @@ public class IotMachineEventGeneratorJob {
// rabbitmq配置测试环境
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("8.135.8.221")
.setVirtualHost("/")
.setUserName("qniao")
.setPassword("Qianniao2020")
.setPort(5672).build();
.setHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_HOST))
.setVirtualHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST))
.setUserName(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_USER_NAME))
.setPassword(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_PASSWORD))
.setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT)).build();
// 发送相应的指令到rabbitmq的交换机
commandDataStream.addSink(new RMQSink<>(connectionConfig, new MachineIotDataReceivedEventRabbitMqSerializationSchema(),
new RMQSinkPublishOptions<BaseCommand>() {
@Override
public String computeRoutingKey(BaseCommand command) {
return "machine-iot-data-received-event";
}
@Override
public AMQP.BasicProperties computeProperties(BaseCommand command) {
return null;
}
@Override
public String computeRoutingKey(BaseCommand command) {
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_ROUTING_KEY);
}
@Override
public String computeExchange(BaseCommand command) {
@Override
public AMQP.BasicProperties computeProperties(BaseCommand command) {
return null;
}
// 交换机名称
return "flink_test_exchange";
}
})).name("commandDataStream to rabbitmq Sink");
@Override
public String computeExchange(BaseCommand command) {
// 直接发队列
// commandDataStream.addSink(new RMQSink<>(connectionConfig, "flink_test_queue", new BaseCommandSerializationSchema())).name("12");
// 交换机名称
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_EXCHANGE);
}
})).name("commandDataStream to rabbitmq Sink");
}
private static void sinkEs(DataStream<BaseCommand> commandDataStream) {
private static void sinkEs(DataStreamSource<MachineIotDataReceivedEvent> dataStream) {
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("119.23.41.137", 9200, "http"));
ElasticsearchSink.Builder<BaseCommand> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
(ElasticsearchSinkFunction<BaseCommand>) (command, runtimeContext, requestIndexer) -> {
HashMap<String, String> map = new HashMap<>();
if (command instanceof PowerOnMachineCommand) {
// 设备开机数据
PowerOnMachineCommand powerOnMachineCommand = (PowerOnMachineCommand) command;
map.put("id", powerOnMachineCommand.getId().toString());
map.put("currTotalOutput", powerOnMachineCommand.getCurrTotalOutput().toString());
}
if (command instanceof PowerOffMachineCommand) {
// 设备关机数据
PowerOffMachineCommand powerOffMachineCommand = (PowerOffMachineCommand) command;
map.put("id", powerOffMachineCommand.getId().toString());
map.put("currTotalOutput", powerOffMachineCommand.getCurrTotalOutput().toString());
}
if (command instanceof StopMachineWorkingCommand) {
// 设备待机数据
StopMachineWorkingCommand stopMachineWorkingCommand = (StopMachineWorkingCommand) command;
map.put("id", stopMachineWorkingCommand.getId().toString());
map.put("currTotalOutput", stopMachineWorkingCommand.getCurrTotalOutput().toString());
}
if (command instanceof StartMachineWorkingCommand) {
// 设备工作数据
StartMachineWorkingCommand startMachineWorkingCommand = (StartMachineWorkingCommand) command;
map.put("id", startMachineWorkingCommand.getId().toString());
map.put("currTotalOutput", startMachineWorkingCommand.getCurrTotalOutput().toString());
}
httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST),
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)));
ElasticsearchSink.Builder<MachineIotDataReceivedEvent> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
(ElasticsearchSinkFunction<MachineIotDataReceivedEvent>) (machineIotDataReceivedEvent, runtimeContext, requestIndexer) -> {
//创建es 请求
IndexRequest indexRequest = Requests.indexRequest().index("machine-iot-data-received-event").source(map);
IndexRequest indexRequest = Requests.indexRequest()
.index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX))
.source(BeanUtil.beanToMap(machineIotDataReceivedEvent));
requestIndexer.add(indexRequest);
}
);
@ -204,25 +183,27 @@ public class IotMachineEventGeneratorJob {
esSinkBuilder.setBulkFlushMaxActions(10);
//刷新前缓冲区的最大数据大小以MB为单位
esSinkBuilder.setBulkFlushMaxSizeMb(5);
//论缓冲操作的数量或大小如何都要刷新的时间间隔
//论缓冲操作的数量或大小如何都要刷新的时间间隔
esSinkBuilder.setBulkFlushInterval(5000L);
// 客户端创建配置回调配置账号密码
esSinkBuilder.setRestClientFactory(
restClientBuilder -> {
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "qn56521"));
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
});
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
// 设置es连接超时时间
requestConfigBuilder.setConnectTimeout(3000);
requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT));
return requestConfigBuilder;
});
}
);
//数据流添加sink
commandDataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink");
dataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink");
}
@ -235,7 +216,7 @@ public class IotMachineEventGeneratorJob {
&& Integer.valueOf("1").equals(event.getMachineWorkingStat())) {
return 1;
} else if (Integer.valueOf("1").equals(event.getMachinePwrStat())
&& Integer.valueOf("0").equals(event.getMachineWorkingStat())) {
&& Integer.valueOf("2").equals(event.getMachineWorkingStat())) {
return 2;
}
return null;
@ -256,14 +237,5 @@ public class IotMachineEventGeneratorJob {
// 设备开始工作
out.collect(new StartMachineWorkingCommand(newState.getMachineIotMac(), event.getCurrJobCount()));
}
/*if (newState.getStatus() == 1 || newState.getStatus() == 2) {
System.out.println("设备开机。相关事件:" + event.toString());
out.collect(new PowerOnMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount()));
} else if (newState.getStatus() == 0) {
System.out.println("设备关机。相关事件:" + event.toString());
out.collect(new PowerOffMachineCommand(newState.getMachineIotMac(), event.getCurrJobCount()));
}*/
}
}

3
iot-machine-state-event-generator-job/src/main/resources/META-INF/app.properties

@ -0,0 +1,3 @@
app.id=machine-state-event-generator
apollo.meta=http://8.135.8.221:5000
Loading…
Cancel
Save