From 5753af8760aaf493ee4ff2cbffe5ce6f121c42e1 Mon Sep 17 00:00:00 2001 From: "1049970895@qniao.cn" <1049970895> Date: Tue, 9 Aug 2022 17:51:36 +0800 Subject: [PATCH] first commit --- pom.xml | 186 +++++++++++++ .../iot/gizwits/DeviceMonitoringData.java | 69 +++++ .../qniao/iot/gizwits/DeviceTotalData.java | 41 +++ .../gizwits/GizWitsIotMonitoringDataJob.java | 247 ++++++++++++++++++ .../iot/gizwits/config/ApolloConfig.java | 24 ++ .../iot/gizwits/constant/ConfigConstant.java | 44 ++++ .../iot/gizwits/utils/EsRestClientUtil.java | 91 +++++++ src/main/resources/META-INF/app.properties | 5 + src/main/resources/log4j2.properties | 25 ++ src/test/java/Demo1.java | 11 + 10 files changed, 743 insertions(+) create mode 100644 pom.xml create mode 100644 src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java create mode 100644 src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java create mode 100644 src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java create mode 100644 src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java create mode 100644 src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java create mode 100644 src/main/java/com/qniao/iot/gizwits/utils/EsRestClientUtil.java create mode 100644 src/main/resources/META-INF/app.properties create mode 100644 src/main/resources/log4j2.properties create mode 100644 src/test/java/Demo1.java diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..f5178cd --- /dev/null +++ b/pom.xml @@ -0,0 +1,186 @@ + + + 4.0.0 + + org.example + iot-gizwits-monitoring-data + 1.0-SNAPSHOT + + + 8 + 8 + UTF-8 + 1.15.0 + 1.8 + ${target.java.version} + ${target.java.version} + 2.17.2 + + + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + + + org.apache.flink + flink-clients + ${flink.version} + + + + org.apache.flink + flink-connector-kafka + ${flink.version} + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + runtime + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + runtime + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + runtime + + + + commons-logging + commons-logging + 1.2 + + + + com.qniao + iot-machine-data-event + 0.0.1-SNAPSHOT + + + + org.apache.flink + flink-connector-elasticsearch7_2.11 + 1.14.5 + + + + cn.hutool + hutool-all + 5.8.4 + + + + + com.ctrip.framework.apollo + apollo-client + 2.0.1 + + + + com.ctrip.framework.apollo + apollo-core + 2.0.1 + + + + com.alibaba + fastjson + 1.2.31 + + + + com.qniao + ddd-event + 0.0.1-SNAPSHOT + + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + ${target.java.version} + ${target.java.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + + package + + shade + + + + + org.apache.flink:flink-shaded-force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.qniao.iot.gizwits.GizWitsIotMonitoringDataJob + + + + + + + + + + + + maven-releases + Nexus releases Repository + http://120.78.76.88:8081/repository/maven-snapshots/ + + + + \ No newline at end of file diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java b/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java new file mode 100644 index 0000000..a8f0283 --- /dev/null +++ b/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java @@ -0,0 +1,69 @@ +package com.qniao.iot.gizwits; + +import lombok.Data; + +import java.time.LocalDateTime; + +@Data +public class DeviceMonitoringData { + + /** + * 数据来源 + */ + private Integer dataSource; + + /** + * 设备物联地址(云盒物理标识) + */ + private Long machineIotMac; + + /** + * 机器电源状态(0断电 1供电) + */ + private Integer machinePwrStat; + + /** + * 机器工作状态(0未工作 1工作中 2待机中) + */ + private Integer machineWorkingStat; + + /** + * 累加作业总数 + */ + private Long accJobCount; + + /** + * 当前作业计数 + */ + private Long currJobCount; + + /** + * 当前作业时长 + */ + private Long currJobDuration; + + /** + * 数据实际采样时间 + */ + private Long reportTime; + + /** + * 实际接收到数据的时间 + */ + private Long receivedTime; + + /** + * 机器标识 + */ + private Long machineId; + + /** + * 累计工作时长 + */ + private Long accJobCountDuration; + + /** + * 上次开机时间(毫秒) + */ + private Long lastBootTime; +} diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java new file mode 100644 index 0000000..91d8105 --- /dev/null +++ b/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java @@ -0,0 +1,41 @@ +package com.qniao.iot.gizwits; + +import lombok.Data; + +import java.time.LocalDate; +import java.time.LocalDateTime; + +@Data +public class DeviceTotalData { + + /** + * 上次开机时间 + */ + private LocalDateTime lastBootTime; + + /** + * 当天作业计数 + */ + private Long theDayJobCount; + + /** + * 当天作业时长 + */ + private Long theDayJobDuration; + + /** + * 累计作业计数 + */ + private Long JobTotal; + + /** + * 累计作业时长 + */ + private Long JobDurationTotal; + + /** + * 当前日期 + */ + private LocalDate currLocalDate; + +} diff --git a/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java new file mode 100644 index 0000000..b1a02cf --- /dev/null +++ b/src/main/java/com/qniao/iot/gizwits/GizWitsIotMonitoringDataJob.java @@ -0,0 +1,247 @@ +package com.qniao.iot.gizwits; + +import cn.hutool.core.date.LocalDateTimeUtil; +import cn.hutool.core.util.ArrayUtil; +import cn.hutool.json.JSONUtil; +import com.qniao.iot.gizwits.config.ApolloConfig; +import com.qniao.iot.gizwits.constant.ConfigConstant; +import com.qniao.iot.gizwits.utils.EsRestClientUtil; +import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; +import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.GetAliasesResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; + +import java.io.IOException; +import java.sql.Date; +import java.time.*; +import java.time.temporal.TemporalUnit; +import java.util.*; +import java.util.function.Consumer; + +import static java.time.temporal.ChronoUnit.SECONDS; + +@Slf4j +public class GizWitsIotMonitoringDataJob { + + private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient + .builder(new HttpHost("120.79.137.137", 9200, "http")) + .setHttpClientConfigCallback(httpAsyncClientBuilder -> { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials("elastic", "qnol26215")); + return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + }) + .setRequestConfigCallback(requestConfigBuilder -> { + // 设置es连接超时时间 + requestConfigBuilder.setConnectTimeout(3000); + return requestConfigBuilder; + })); + + public static void main(String[] args) { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); + // 获取设备数据源 + KafkaSource source = KafkaSource.builder() + .setBootstrapServers(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) + .setTopics(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_TOPICS)) + .setGroupId(ApolloConfig.get(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) + .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) + .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") + .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) + .build(); + + // 设备数据源转换 + DataStreamSource dataStreamSource = env + .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); + + // mac分组并进行工作时长的集合操作 + DataStream machineIotDataReceivedEventDataStream = dataStreamSource + .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) + .process(new KeyedProcessFunction() { + + private ValueState deviceTotalData; + + @Override + public void open(Configuration parameters) { + + // 必须在 open 生命周期初始化 + deviceTotalData = getRuntimeContext() + .getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class))); + } + + @Override + public void processElement(MachineIotDataReceivedEvent event, + KeyedProcessFunction.Context ctx, + Collector out) throws Exception { + + DeviceTotalData lastedDeviceState = getDeviceTotalData(event); + } + + private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws IOException { + + DeviceTotalData value = deviceTotalData.value(); + Long reportTime = event.getReportTime(); + LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); + if (value == null) { + // 从es中获取 + DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac()); + DeviceTotalData data = new DeviceTotalData(); + if (deviceMonitoringData != null) { + data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); + data.setJobTotal(deviceMonitoringData.getAccJobCount()); + // 单位秒 + data.setCurrLocalDate(localDate); + data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration()); + data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount()); + LocalDateTime ldt = new Date(deviceMonitoringData.getLastBootTime()) + .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime(); + data.setLastBootTime(ldt); + } else { + // es中也没有,从“machine_iot_data_received_event”索引中拿 + queryDeviceMonitoringData(event.getMachineIotMac(), localDate, value); + } + deviceTotalData.update(data); + } + // 是否日期是当天的,否则需要更新当天工作时长和当天工作量 + if (!value.getCurrLocalDate().isEqual(localDate)) { + queryDeviceMonitoringData(event.getMachineIotMac(), localDate, value); + } + return null; + } + + private Tuple2 queryDeviceMonitoringData(Long machineIotMac, + LocalDate localDate, + DeviceTotalData value) throws IOException { + + LocalDateTime startTime = LocalDateTime.of(localDate, LocalTime.MIN); + LocalDateTime endTime = LocalDateTime.of(localDate, LocalTime.MAX); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac)); + /*searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime") + .gte(startTime.atZone(ZoneOffset.of("+8")).toEpochSecond()) + .lte(endTime.atZone(ZoneOffset.of("+8")).toEpochSecond()));*/ + searchSourceBuilder.sort("reportTime"); + searchSourceBuilder.size(500); + List receivedEventList = new ArrayList<>(); + EsRestClientUtil.queryDeviceListPageResult(searchSourceBuilder, + receivedEventList::add, MachineIotDataReceivedEvent.class, getIndicesList()); + List> tuple3List = statistics(receivedEventList); + + } + + /** + * + * @param receivedEventList + * @return 时长,数量 + */ + private List> statistics(List receivedEventList) { + + Map map = new HashMap<>(); + MachineIotDataReceivedEvent firstEvent; + Integer nextPwrStat; + ArrayList nextWorkingStatList; + boolean isHasWaitingWork = false; + + for (int i = 0; i < receivedEventList.size(); i++) { + + MachineIotDataReceivedEvent receivedEvent = receivedEventList.get(i); + firstEvent = receivedEvent; + Integer machinePwrStat = receivedEvent.getMachinePwrStat(); + Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); + Long reportTime = receivedEvent.getReportTime(); + + if (i == 0) { + Instant instant = Instant.ofEpochMilli(reportTime * 1000); + ZoneId zone = ZoneId.systemDefault(); + LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, zone); + LocalDateTime startTime = LocalDateTime.of(localDateTime.toLocalDate(), LocalTime.MIN); + long l = Duration.between(startTime, localDateTime).get(SECONDS); + map.put("currJobDuration", l); + map.put("currJobCount", receivedEvent.getCurrJobCount()); + nextPwrStat = 1; + } + + } + + } + + private String[] getIndicesList() throws IOException { + + GetAliasesRequest request = new GetAliasesRequest(); + GetAliasesResponse getAliasesResponse = restHighLevelClient.indices().getAlias(request, RequestOptions.DEFAULT); + Map> map = getAliasesResponse.getAliases(); + Set indices = map.keySet(); + List indicesList = new ArrayList<>(); + for (String key : indices) { + if (key.contains(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX))) { + indicesList.add(key); + } + } + return ArrayUtil.toArray(indicesList, String.class); + } + + private DeviceMonitoringData queryLatestDeviceMonitoringData(Long machineIotMac) { + + try { + // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac)); + searchSourceBuilder.sort("reportTime", SortOrder.DESC); + searchSourceBuilder.size(1); + // 创建查询请求对象,将查询对象配置到其中 + SearchRequest searchRequest = new SearchRequest("DeviceMonitoringData"); + searchRequest.source(searchSourceBuilder); + // 执行查询,然后处理响应结果 + SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + // 根据状态和数据条数验证是否返回了数据 + if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { + SearchHits hits = searchResponse.getHits(); + SearchHit reqHit = hits.getHits()[0]; + return JSONUtil.toBean(reqHit.getSourceAsString(), DeviceMonitoringData.class); + } + } catch (IOException e) { + log.error("获取es数据异常", e); + } + return null; + } + }).name("machineIotDataReceivedEventDataStream keyBy stream"); + } +} diff --git a/src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java b/src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java new file mode 100644 index 0000000..001e902 --- /dev/null +++ b/src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java @@ -0,0 +1,24 @@ +package com.qniao.iot.gizwits.config; + +import com.ctrip.framework.apollo.Config; +import com.ctrip.framework.apollo.ConfigService; + +public class ApolloConfig { + + private static final Config config = ConfigService.getAppConfig(); + + public static String get(String key, String defaultValue) { + + return config.getProperty(key, defaultValue); + } + + public static String get(String key) { + + return config.getProperty(key, null); + } + + public static Integer getInt(String key) { + + return config.getIntProperty(key, null); + } +} diff --git a/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java b/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java new file mode 100644 index 0000000..470be1d --- /dev/null +++ b/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java @@ -0,0 +1,44 @@ +package com.qniao.iot.gizwits.constant; + +public interface ConfigConstant { + + String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "source.kafka.bootstrap.servers"; + + String SOURCE_KAFKA_TOPICS = "source.kafka.topics"; + + String SOURCE_KAFKA_GROUP_ID = "source.kafka.groupId"; + + String SINK_RABBITMQ_HOST = "sink.rabbitmq.host"; + + String SINK_RABBITMQ_PORT = "sink.rabbitmq.port"; + + String SINK_RABBITMQ_VIRTUAL_HOST = "sink.rabbitmq.virtualHost"; + + String SINK_RABBITMQ_USER_NAME = "sink.rabbitmq.userName"; + + String SINK_RABBITMQ_PASSWORD = "sink.rabbitmq.password"; + + String SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY = "sink.rabbitmq.powerOnMachineCommand.routingKey"; + + String SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY = "sink.rabbitmq.powerOffMachineCommand.routingKey"; + + String SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY = "sink.rabbitmq.stopMachineWorkingCommand.routingKey"; + + String SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY = "sink.rabbitmq.startMachineWorkingCommand.routingKey"; + + String SINK_RABBITMQ_EXCHANGE = "sink.rabbitmq.exchange"; + + String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host"; + + String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post"; + + String SINK_ELASTICSEARCH_USER_NAME = "sink.elasticsearch.userName"; + + String SINK_ELASTICSEARCH_PASSWORD = "sink.elasticsearch.password"; + + String SINK_ELASTICSEARCH_CONNECT_TIMEOUT = "sink.elasticsearch.connectTimeout"; + + String SINK_ELASTICSEARCH_SCHEME = "sink.elasticsearch.scheme"; + + String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index"; +} diff --git a/src/main/java/com/qniao/iot/gizwits/utils/EsRestClientUtil.java b/src/main/java/com/qniao/iot/gizwits/utils/EsRestClientUtil.java new file mode 100644 index 0000000..4c3f963 --- /dev/null +++ b/src/main/java/com/qniao/iot/gizwits/utils/EsRestClientUtil.java @@ -0,0 +1,91 @@ +package com.qniao.iot.gizwits.utils; + +import cn.hutool.json.JSONUtil; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + + +public class EsRestClientUtil { + + private static String host = "120.79.137.137:9200"; + private static String scheme = "http"; + private static String index = "qn_cloud_box_data_history"; + private static RestClientBuilder builder = null; + private static RestHighLevelClient client = null; + + public static void init() { + String[] nodeIpInfos = host.split(":"); + builder = RestClient.builder(new HttpHost(nodeIpInfos[0], Integer.parseInt(nodeIpInfos[1]), scheme)) + .setRequestConfigCallback(requestConfigBuilder -> { + requestConfigBuilder.setConnectTimeout(10 * 60 * 1000); + requestConfigBuilder.setSocketTimeout(10 * 60 * 1000); + requestConfigBuilder.setConnectionRequestTimeout(10 * 60 * 1000); + return requestConfigBuilder; + }); + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "qnol26215")); + builder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider)); + client = new RestHighLevelClient(builder); + } + + public static void queryDeviceListPageResult(SearchSourceBuilder sourceBuilder, Consumer consumer, + Class classes, String... indices) { + + SearchRequest searchRequest = new SearchRequest(indices).scroll("5m").source(sourceBuilder); + if (client == null) { + init(); + } + try { + SearchResponse response; + String scrollId = null; + while (!"none".equals(scrollId)) { + if (scrollId != null) { + SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId).scroll("5m"); + response = client.scroll(scrollRequest, RequestOptions.DEFAULT); + } else { + response = client.search(searchRequest, RequestOptions.DEFAULT); + } + + int s = response.status().getStatus(); + if (s == RestStatus.OK.getStatus()) { + SearchHit[] hits = response.getHits().getHits(); + scrollId = response.getScrollId(); + if (hits != null) { + System.out.println("*********************查询es结果数量 :" + hits.length); + for (SearchHit hit : hits) { + consumer.accept(JSONUtil.toBean(hit.getSourceAsString(), classes)); + } + } + } else { + //清除滚屏 + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(scrollId); + client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + return tupleList; + } +} \ No newline at end of file diff --git a/src/main/resources/META-INF/app.properties b/src/main/resources/META-INF/app.properties new file mode 100644 index 0000000..94b0185 --- /dev/null +++ b/src/main/resources/META-INF/app.properties @@ -0,0 +1,5 @@ +app.id=iot-gizwits-model-formatter + +# test 8.135.8.221 +# prod 47.112.164.224 +apollo.meta=http://47.112.164.224:5000 \ No newline at end of file diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties new file mode 100644 index 0000000..32c696e --- /dev/null +++ b/src/main/resources/log4j2.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/src/test/java/Demo1.java b/src/test/java/Demo1.java new file mode 100644 index 0000000..8d76d0e --- /dev/null +++ b/src/test/java/Demo1.java @@ -0,0 +1,11 @@ +import java.sql.Date; +import java.time.LocalDate; + +public class Demo1 { + + public static void main(String[] args) { + + LocalDate localDate = new Date(1659088698L * 1000).toLocalDate(); + System.out.println(localDate); + } +}