Browse Source

es sink 优化

master
1049970895@qniao.cn 3 years ago
parent
commit
7ba5e8f433
3 changed files with 186 additions and 19 deletions
  1. 4
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/config/ApolloConfig.java
  2. 12
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java
  3. 189
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

4
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/config/ApolloConfig.java

@ -7,12 +7,12 @@ public class ApolloConfig {
private static final Config config = ConfigService.getAppConfig();
public static String get(String key, String defaultValue) {
public static String getStr(String key, String defaultValue) {
return config.getProperty(key, defaultValue);
}
public static String get(String key) {
public static String getStr(String key) {
return config.getProperty(key, null);
}

12
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/constant/ConfigConstant.java

@ -41,4 +41,16 @@ public interface ConfigConstant {
String SINK_ELASTICSEARCH_SCHEME = "sink.elasticsearch.scheme";
String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index";
String ES_HOST_NAME = "es.host.name";
String ES_POST = "es.post";
String ES_SCHEME = "es.scheme";
String ES_USER_NAME = "es.user.name";
String ES_PASSWORD = "es.password";
String ES_CONNECT_TIMEOUT = "es.connect.timeout";
}

189
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -41,8 +41,16 @@ import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.sql.SQLException;
@ -56,6 +64,23 @@ import java.util.List;
@Slf4j
public class IotMachineEventGeneratorJob {
private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(
RestClient.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ES_HOST_NAME),
ApolloConfig.getInt(ConfigConstant.ES_POST),
ApolloConfig.getStr(ConfigConstant.ES_SCHEME)))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ES_USER_NAME),
ApolloConfig.getStr(ConfigConstant.ES_PASSWORD)));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
})
.setRequestConfigCallback(requestConfigBuilder -> {
// 设置es连接超时时间
requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ES_CONNECT_TIMEOUT));
return requestConfigBuilder;
}));
private final static String SQL = "select qmrs.machine_id, qmrs.iot_mac as machine_iot_mac, qmrs.status, qml.count_unit\n" +
"from qn_machine_realtime_state qmrs\n" +
" LEFT JOIN (select example_id, count_unit from qn_machine_list where is_delete = 0) qml\n" +
@ -63,14 +88,19 @@ public class IotMachineEventGeneratorJob {
"where qmrs.iot_mac = ?\n" +
" and qmrs.is_delete = 0";
/**
* 当前索引日期后缀
*/
private static String currIndicesDateSuffix;
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID))
.setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
@ -165,6 +195,8 @@ public class IotMachineEventGeneratorJob {
if (deviceStateListJson == null) {
// 查询数据库最新的设备状态
List<DeviceState> list = Db.use().query(SQL, DeviceState.class, machineIotMac);
// 查询es最新的设备状态 勿删
//DeviceState deviceState1 = queryLatestDeviceState(machineIotMac);
if (CollUtil.isNotEmpty(list)) {
deviceStateListJson = list.get(0);
@ -177,14 +209,49 @@ public class IotMachineEventGeneratorJob {
return deviceStateListJson;
}
/* 勿删
private static DeviceState queryLatestDeviceState(Long machineIotMac) {
try {
// 构建查询条件注意termQuery 支持多种格式查询 booleanintdoublestring 这里使用的是 string 的查询
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
searchSourceBuilder.sort("reportTime", SortOrder.DESC);
searchSourceBuilder.size(1);
// 创建查询请求对象将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX));
searchRequest.source(searchSourceBuilder);
String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate);
// 先判断索引是否存在
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if (exists) {
// 执行查询然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
SearchHits hits = searchResponse.getHits();
SearchHit reqHit = hits.getHits()[0];
MachineIotDataReceivedEvent receivedEvent = JSONUtil
.toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class);
DeviceState deviceState = new DeviceState();
}
}
} catch (Exception e) {
log.error("获取es数据异常", e);
}
return null;
}*/
private static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) {
// rabbitmq配置
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_HOST))
.setVirtualHost(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST))
.setUserName(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_USER_NAME))
.setPassword(ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_PASSWORD))
.setHost(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_HOST))
.setVirtualHost(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_VIRTUAL_HOST))
.setUserName(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_USER_NAME))
.setPassword(ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_PASSWORD))
.setPort(ApolloConfig.getInt(ConfigConstant.SINK_RABBITMQ_PORT))
.build();
@ -198,18 +265,18 @@ public class IotMachineEventGeneratorJob {
if (command instanceof PowerOnMachineCommand) {
// 机器通电
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY);
return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY);
}
if (command instanceof PowerOffMachineCommand) {
// 机器断电
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY);
return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY);
}
if (command instanceof StopMachineWorkingCommand) {
// 机器待机
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY);
return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY);
} else {
// 机器工作
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY);
return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY);
}
}
@ -222,7 +289,7 @@ public class IotMachineEventGeneratorJob {
public String computeExchange(BaseCommand command) {
// 交换机名称
return ApolloConfig.get(ConfigConstant.SINK_RABBITMQ_EXCHANGE);
return ApolloConfig.getStr(ConfigConstant.SINK_RABBITMQ_EXCHANGE);
}
@ -232,9 +299,9 @@ public class IotMachineEventGeneratorJob {
private static void sinkEs(DataStream<MachineIotDataReceivedEvent> dataStream) {
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST),
httpHosts.add(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST),
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)));
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)));
ElasticsearchSink.Builder<MachineIotDataReceivedEvent> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
(ElasticsearchSinkFunction<MachineIotDataReceivedEvent>) (machineIotDataReceivedEvent, runtimeContext, requestIndexer) -> {
@ -242,9 +309,13 @@ public class IotMachineEventGeneratorJob {
LocalDate reportDate = new Date(machineIotDataReceivedEvent.getReportTime())
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate();
String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
// 索引名称
String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix;
// 校验索引是否存在
checkIndicesIsExists(indexDateSuffix, indicesName);
//创建es 请求
IndexRequest indexRequest = Requests.indexRequest()
.index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix)
.index(indicesName)
.source(BeanUtil.beanToMap(machineIotDataReceivedEvent))
.id(StrUtil.toString(machineIotDataReceivedEvent.getId()));
requestIndexer.add(indexRequest);
@ -262,8 +333,8 @@ public class IotMachineEventGeneratorJob {
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
});
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
@ -277,6 +348,90 @@ public class IotMachineEventGeneratorJob {
dataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink");
}
private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) {
if (currIndicesDateSuffix == null) {
// 当前月的索引为空
createIndices(indicesName, indexDateSuffix);
} else {
// 校验当前消息能否符合当前索引
if (!indexDateSuffix.equals(currIndicesDateSuffix)) {
// 如果不符合需要重建索引
createIndices(indicesName, indexDateSuffix);
}
}
}
private static void createIndices(String indicesName, String indexDateSuffix) {
// 判断索引是否存在
GetIndexRequest exist = new GetIndexRequest(indicesName);
// 先判断客户端是否存在
try {
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if (!exists) {
// 创建索引
CreateIndexRequest request = new CreateIndexRequest(indicesName);
// 字段映射
String mappersStr = "{\n" +
" \"properties\": {\n" +
" \"machineWorkingStat\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"machineIotMac\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"machinePwrStat\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"currWaitingDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"igStat\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"currJobDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"currJobCount\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"receivedTime\": {\n" +
" \"type\": \"date\"\n" +
" },\n" +
" \"currStoppingDuration\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"id\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"accJobCount\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"dataSource\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"reportTime\": {\n" +
" \"type\": \"date\"\n" +
" }\n" +
" }\n" +
" }";
request.mapping(mappersStr, XContentType.JSON);
// 设置索引别名
request.alias(new Alias(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)));
// 暂时不管是否创建成功
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
boolean acknowledged = createIndexResponse.isAcknowledged();
boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
if (!acknowledged || !shardsAcknowledged) {
throw new Exception("自定义索引创建失败!!!");
}
currIndicesDateSuffix = indexDateSuffix;
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static Integer getDeviceStatus(MachineIotDataReceivedEvent event) {

Loading…
Cancel
Save