Browse Source

first commit

feature_hph_新增rabbitmq_sink
1049970895@qniao.cn 3 years ago
commit
5753af8760
10 changed files with 743 additions and 0 deletions
  1. 186
      pom.xml
  2. 69
      src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java
  3. 41
      src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java
  4. 247
      src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java
  5. 24
      src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java
  6. 44
      src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java
  7. 91
      src/main/java/com/qniao/iot/gizwits/utils/EsRestClientUtil.java
  8. 5
      src/main/resources/META-INF/app.properties
  9. 25
      src/main/resources/log4j2.properties
  10. 11
      src/test/java/Demo1.java

186
pom.xml

@ -0,0 +1,186 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>iot-gizwits-monitoring-data</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.0</flink.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-event</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.14.5</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.4</version>
</dependency>
<!-- apollo -->
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-core</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.31</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>ddd-event</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.qniao.iot.gizwits.GizWitsIotMonitoringDataJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<distributionManagement>
<repository>
<id>maven-releases</id>
<name>Nexus releases Repository</name>
<url>http://120.78.76.88:8081/repository/maven-snapshots/</url>
</repository>
</distributionManagement>
</project>

69
src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java

@ -0,0 +1,69 @@
package com.qniao.iot.gizwits;
import lombok.Data;
import java.time.LocalDateTime;
@Data
public class DeviceMonitoringData {
/**
* 数据来源
*/
private Integer dataSource;
/**
* 设备物联地址(云盒物理标识)
*/
private Long machineIotMac;
/**
* 机器电源状态0断电 1供电
*/
private Integer machinePwrStat;
/**
* 机器工作状态0未工作 1工作中 2待机中
*/
private Integer machineWorkingStat;
/**
* 累加作业总数
*/
private Long accJobCount;
/**
* 当前作业计数
*/
private Long currJobCount;
/**
* 当前作业时长
*/
private Long currJobDuration;
/**
* 数据实际采样时间
*/
private Long reportTime;
/**
* 实际接收到数据的时间
*/
private Long receivedTime;
/**
* 机器标识
*/
private Long machineId;
/**
* 累计工作时长
*/
private Long accJobCountDuration;
/**
* 上次开机时间(毫秒)
*/
private Long lastBootTime;
}

41
src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java

@ -0,0 +1,41 @@
package com.qniao.iot.gizwits;
import lombok.Data;
import java.time.LocalDate;
import java.time.LocalDateTime;
@Data
public class DeviceTotalData {
/**
* 上次开机时间
*/
private LocalDateTime lastBootTime;
/**
* 当天作业计数
*/
private Long theDayJobCount;
/**
* 当天作业时长
*/
private Long theDayJobDuration;
/**
* 累计作业计数
*/
private Long JobTotal;
/**
* 累计作业时长
*/
private Long JobDurationTotal;
/**
* 当前日期
*/
private LocalDate currLocalDate;
}

247
src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java

@ -0,0 +1,247 @@
package com.qniao.iot.gizwits;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.gizwits.config.ApolloConfig;
import com.qniao.iot.gizwits.constant.ConfigConstant;
import com.qniao.iot.gizwits.utils.EsRestClientUtil;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.sql.Date;
import java.time.*;
import java.time.temporal.TemporalUnit;
import java.util.*;
import java.util.function.Consumer;
import static java.time.temporal.ChronoUnit.SECONDS;
@Slf4j
public class GizWitsIotMonitoringDataJob {
private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient
.builder(new HttpHost("120.79.137.137", 9200, "http"))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("elastic", "qnol26215"));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
})
.setRequestConfigCallback(requestConfigBuilder -> {
// 设置es连接超时时间
requestConfigBuilder.setConnectTimeout(3000);
return requestConfigBuilder;
}));
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 获取设备数据源
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
.build();
// 设备数据源转换
DataStreamSource<MachineIotDataReceivedEvent> dataStreamSource = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
// mac分组并进行工作时长的集合操作
DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = dataStreamSource
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
.process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>() {
private ValueState<DeviceTotalData> deviceTotalData;
@Override
public void open(Configuration parameters) {
// 必须在 open 生命周期初始化
deviceTotalData = getRuntimeContext()
.getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class)));
}
@Override
public void processElement(MachineIotDataReceivedEvent event,
KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>.Context ctx,
Collector<DeviceMonitoringData> out) throws Exception {
DeviceTotalData lastedDeviceState = getDeviceTotalData(event);
}
private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws IOException {
DeviceTotalData value = deviceTotalData.value();
Long reportTime = event.getReportTime();
LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
if (value == null) {
// 从es中获取
DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac());
DeviceTotalData data = new DeviceTotalData();
if (deviceMonitoringData != null) {
data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
data.setJobTotal(deviceMonitoringData.getAccJobCount());
// 单位秒
data.setCurrLocalDate(localDate);
data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
LocalDateTime ldt = new Date(deviceMonitoringData.getLastBootTime())
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime();
data.setLastBootTime(ldt);
} else {
// es中也没有machine_iot_data_received_event索引中拿
queryDeviceMonitoringData(event.getMachineIotMac(), localDate, value);
}
deviceTotalData.update(data);
}
// 是否日期是当天的否则需要更新当天工作时长和当天工作量
if (!value.getCurrLocalDate().isEqual(localDate)) {
queryDeviceMonitoringData(event.getMachineIotMac(), localDate, value);
}
return null;
}
private Tuple2<Long, Long> queryDeviceMonitoringData(Long machineIotMac,
LocalDate localDate,
DeviceTotalData value) throws IOException {
LocalDateTime startTime = LocalDateTime.of(localDate, LocalTime.MIN);
LocalDateTime endTime = LocalDateTime.of(localDate, LocalTime.MAX);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
/*searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime")
.gte(startTime.atZone(ZoneOffset.of("+8")).toEpochSecond())
.lte(endTime.atZone(ZoneOffset.of("+8")).toEpochSecond()));*/
searchSourceBuilder.sort("reportTime");
searchSourceBuilder.size(500);
List<MachineIotDataReceivedEvent> receivedEventList = new ArrayList<>();
EsRestClientUtil.queryDeviceListPageResult(searchSourceBuilder,
receivedEventList::add, MachineIotDataReceivedEvent.class, getIndicesList());
List<Tuple2<Long, Long>> tuple3List = statistics(receivedEventList);
}
/**
*
* @param receivedEventList
* @return 时长数量
*/
private List<Tuple2<Long, Long>> statistics(List<MachineIotDataReceivedEvent> receivedEventList) {
Map<String, Long> map = new HashMap<>();
MachineIotDataReceivedEvent firstEvent;
Integer nextPwrStat;
ArrayList<Integer> nextWorkingStatList;
boolean isHasWaitingWork = false;
for (int i = 0; i < receivedEventList.size(); i++) {
MachineIotDataReceivedEvent receivedEvent = receivedEventList.get(i);
firstEvent = receivedEvent;
Integer machinePwrStat = receivedEvent.getMachinePwrStat();
Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
Long reportTime = receivedEvent.getReportTime();
if (i == 0) {
Instant instant = Instant.ofEpochMilli(reportTime * 1000);
ZoneId zone = ZoneId.systemDefault();
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, zone);
LocalDateTime startTime = LocalDateTime.of(localDateTime.toLocalDate(), LocalTime.MIN);
long l = Duration.between(startTime, localDateTime).get(SECONDS);
map.put("currJobDuration", l);
map.put("currJobCount", receivedEvent.getCurrJobCount());
nextPwrStat = 1;
}
}
}
private String[] getIndicesList() throws IOException {
GetAliasesRequest request = new GetAliasesRequest();
GetAliasesResponse getAliasesResponse = restHighLevelClient.indices().getAlias(request, RequestOptions.DEFAULT);
Map<String, Set<AliasMetaData>> map = getAliasesResponse.getAliases();
Set<String> indices = map.keySet();
List<String> indicesList = new ArrayList<>();
for (String key : indices) {
if (key.contains(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX))) {
indicesList.add(key);
}
}
return ArrayUtil.toArray(indicesList, String.class);
}
private DeviceMonitoringData queryLatestDeviceMonitoringData(Long machineIotMac) {
try {
// 构建查询条件注意termQuery 支持多种格式查询 booleanintdoublestring 这里使用的是 string 的查询
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
searchSourceBuilder.sort("reportTime", SortOrder.DESC);
searchSourceBuilder.size(1);
// 创建查询请求对象将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest("DeviceMonitoringData");
searchRequest.source(searchSourceBuilder);
// 执行查询然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
SearchHits hits = searchResponse.getHits();
SearchHit reqHit = hits.getHits()[0];
return JSONUtil.toBean(reqHit.getSourceAsString(), DeviceMonitoringData.class);
}
} catch (IOException e) {
log.error("获取es数据异常", e);
}
return null;
}
}).name("machineIotDataReceivedEventDataStream keyBy stream");
}
}

24
src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java

@ -0,0 +1,24 @@
package com.qniao.iot.gizwits.config;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
public class ApolloConfig {
private static final Config config = ConfigService.getAppConfig();
public static String get(String key, String defaultValue) {
return config.getProperty(key, defaultValue);
}
public static String get(String key) {
return config.getProperty(key, null);
}
public static Integer getInt(String key) {
return config.getIntProperty(key, null);
}
}

44
src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java

@ -0,0 +1,44 @@
package com.qniao.iot.gizwits.constant;
public interface ConfigConstant {
String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "source.kafka.bootstrap.servers";
String SOURCE_KAFKA_TOPICS = "source.kafka.topics";
String SOURCE_KAFKA_GROUP_ID = "source.kafka.groupId";
String SINK_RABBITMQ_HOST = "sink.rabbitmq.host";
String SINK_RABBITMQ_PORT = "sink.rabbitmq.port";
String SINK_RABBITMQ_VIRTUAL_HOST = "sink.rabbitmq.virtualHost";
String SINK_RABBITMQ_USER_NAME = "sink.rabbitmq.userName";
String SINK_RABBITMQ_PASSWORD = "sink.rabbitmq.password";
String SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY = "sink.rabbitmq.powerOnMachineCommand.routingKey";
String SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY = "sink.rabbitmq.powerOffMachineCommand.routingKey";
String SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY = "sink.rabbitmq.stopMachineWorkingCommand.routingKey";
String SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY = "sink.rabbitmq.startMachineWorkingCommand.routingKey";
String SINK_RABBITMQ_EXCHANGE = "sink.rabbitmq.exchange";
String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host";
String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post";
String SINK_ELASTICSEARCH_USER_NAME = "sink.elasticsearch.userName";
String SINK_ELASTICSEARCH_PASSWORD = "sink.elasticsearch.password";
String SINK_ELASTICSEARCH_CONNECT_TIMEOUT = "sink.elasticsearch.connectTimeout";
String SINK_ELASTICSEARCH_SCHEME = "sink.elasticsearch.scheme";
String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index";
}

91
src/main/java/com/qniao/iot/gizwits/utils/EsRestClientUtil.java

@ -0,0 +1,91 @@
package com.qniao.iot.gizwits.utils;
import cn.hutool.json.JSONUtil;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
public class EsRestClientUtil {
private static String host = "120.79.137.137:9200";
private static String scheme = "http";
private static String index = "qn_cloud_box_data_history";
private static RestClientBuilder builder = null;
private static RestHighLevelClient client = null;
public static void init() {
String[] nodeIpInfos = host.split(":");
builder = RestClient.builder(new HttpHost(nodeIpInfos[0], Integer.parseInt(nodeIpInfos[1]), scheme))
.setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setConnectTimeout(10 * 60 * 1000);
requestConfigBuilder.setSocketTimeout(10 * 60 * 1000);
requestConfigBuilder.setConnectionRequestTimeout(10 * 60 * 1000);
return requestConfigBuilder;
});
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "qnol26215"));
builder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider));
client = new RestHighLevelClient(builder);
}
public static <T>void queryDeviceListPageResult(SearchSourceBuilder sourceBuilder, Consumer<T> consumer,
Class<T> classes, String... indices) {
SearchRequest searchRequest = new SearchRequest(indices).scroll("5m").source(sourceBuilder);
if (client == null) {
init();
}
try {
SearchResponse response;
String scrollId = null;
while (!"none".equals(scrollId)) {
if (scrollId != null) {
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId).scroll("5m");
response = client.scroll(scrollRequest, RequestOptions.DEFAULT);
} else {
response = client.search(searchRequest, RequestOptions.DEFAULT);
}
int s = response.status().getStatus();
if (s == RestStatus.OK.getStatus()) {
SearchHit[] hits = response.getHits().getHits();
scrollId = response.getScrollId();
if (hits != null) {
System.out.println("*********************查询es结果数量 :" + hits.length);
for (SearchHit hit : hits) {
consumer.accept(JSONUtil.toBean(hit.getSourceAsString(), classes));
}
}
} else {
//清除滚屏
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
}
}
} catch (IOException e) {
e.printStackTrace();
}
return tupleList;
}
}

5
src/main/resources/META-INF/app.properties

@ -0,0 +1,5 @@
app.id=iot-gizwits-model-formatter
# test 8.135.8.221
# prod 47.112.164.224
apollo.meta=http://47.112.164.224:5000

25
src/main/resources/log4j2.properties

@ -0,0 +1,25 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

11
src/test/java/Demo1.java

@ -0,0 +1,11 @@
import java.sql.Date;
import java.time.LocalDate;
public class Demo1 {
public static void main(String[] args) {
LocalDate localDate = new Date(1659088698L * 1000).toLocalDate();
System.out.println(localDate);
}
}
Loading…
Cancel
Save