5 changed files with 4 additions and 468 deletions
Split View
Diff Options
-
2pom.xml
-
5src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java
-
443src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob_bak.java
-
20src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java
-
2src/main/resources/META-INF/app.properties
@ -1,443 +0,0 @@ |
|||
//package com.qniao.iot.gizwits; |
|||
// |
|||
//import cn.hutool.core.bean.BeanUtil; |
|||
//import cn.hutool.core.util.ArrayUtil; |
|||
//import cn.hutool.http.HttpUtil; |
|||
//import cn.hutool.json.JSONArray; |
|||
//import cn.hutool.json.JSONUtil; |
|||
//import com.qniao.iot.gizwits.config.ApolloConfig; |
|||
//import com.qniao.iot.gizwits.constant.ConfigConstant; |
|||
//import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; |
|||
//import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; |
|||
//import lombok.extern.slf4j.Slf4j; |
|||
//import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
|||
//import org.apache.flink.api.common.state.ValueState; |
|||
//import org.apache.flink.api.common.state.ValueStateDescriptor; |
|||
//import org.apache.flink.api.common.typeinfo.TypeInformation; |
|||
//import org.apache.flink.configuration.Configuration; |
|||
//import org.apache.flink.connector.kafka.source.KafkaSource; |
|||
//import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; |
|||
//import org.apache.flink.streaming.api.CheckpointingMode; |
|||
//import org.apache.flink.streaming.api.datastream.DataStream; |
|||
//import org.apache.flink.streaming.api.datastream.DataStreamSource; |
|||
//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|||
//import org.apache.flink.streaming.api.functions.KeyedProcessFunction; |
|||
//import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; |
|||
//import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; |
|||
//import org.apache.flink.util.Collector; |
|||
//import org.apache.http.HttpHost; |
|||
//import org.apache.http.auth.AuthScope; |
|||
//import org.apache.http.auth.UsernamePasswordCredentials; |
|||
//import org.apache.http.client.CredentialsProvider; |
|||
//import org.apache.http.impl.client.BasicCredentialsProvider; |
|||
//import org.apache.kafka.clients.consumer.ConsumerConfig; |
|||
//import org.apache.kafka.clients.consumer.OffsetResetStrategy; |
|||
//import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; |
|||
//import org.elasticsearch.action.index.IndexRequest; |
|||
//import org.elasticsearch.action.search.SearchRequest; |
|||
//import org.elasticsearch.action.search.SearchResponse; |
|||
//import org.elasticsearch.client.*; |
|||
//import org.elasticsearch.cluster.metadata.AliasMetaData; |
|||
//import org.elasticsearch.index.query.QueryBuilders; |
|||
//import org.elasticsearch.rest.RestStatus; |
|||
//import org.elasticsearch.search.SearchHit; |
|||
//import org.elasticsearch.search.SearchHits; |
|||
//import org.elasticsearch.search.builder.SearchSourceBuilder; |
|||
//import org.elasticsearch.search.sort.SortOrder; |
|||
// |
|||
//import java.io.IOException; |
|||
//import java.sql.Date; |
|||
//import java.time.*; |
|||
//import java.time.format.DateTimeFormatter; |
|||
//import java.util.ArrayList; |
|||
//import java.util.List; |
|||
//import java.util.Map; |
|||
//import java.util.Set; |
|||
// |
|||
//import static java.time.temporal.ChronoUnit.SECONDS; |
|||
// |
|||
///** |
|||
// * 备份代码 |
|||
// */ |
|||
//@Slf4j |
|||
//public class IotMonitoringDataJob_bak { |
|||
// |
|||
// private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient |
|||
// .builder(new HttpHost("120.79.137.137", 9200, "http")) |
|||
// .setHttpClientConfigCallback(httpAsyncClientBuilder -> { |
|||
// CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
|||
// credentialsProvider.setCredentials(AuthScope.ANY, |
|||
// new UsernamePasswordCredentials("elastic", "qnol26215")); |
|||
// return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); |
|||
// }) |
|||
// .setRequestConfigCallback(requestConfigBuilder -> { |
|||
// // 设置es连接超时时间 |
|||
// requestConfigBuilder.setConnectTimeout(3000); |
|||
// return requestConfigBuilder; |
|||
// })); |
|||
// |
|||
// public static void main(String[] args) throws Exception { |
|||
// |
|||
// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
|||
// env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); |
|||
// // 获取设备数据源 |
|||
// KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder() |
|||
// .setBootstrapServers("120.25.199.30:19092") |
|||
// .setTopics("machine_iot_data_received_event") |
|||
// .setGroupId("123") |
|||
// .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) |
|||
// .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") |
|||
// .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) |
|||
// .build(); |
|||
// |
|||
// // 设备数据源转换 |
|||
// DataStreamSource<MachineIotDataReceivedEvent> dataStreamSource = env |
|||
// .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); |
|||
// |
|||
// // mac分组并进行工作时长的集合操作 |
|||
// DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = dataStreamSource |
|||
// .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) |
|||
// .process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>() { |
|||
// |
|||
// // 最新的设备数据 |
|||
// private ValueState<DeviceTotalData> deviceTotalDataStat; |
|||
// |
|||
// // 开机数据 |
|||
// private ValueState<DeviceTotalData> onDataState; |
|||
// |
|||
// // 上次的关机数据 |
|||
// private ValueState<MachineIotDataReceivedEvent> lastOffDataState; |
|||
// |
|||
// // 上次的开机数据 |
|||
// private ValueState<MachineIotDataReceivedEvent> lastOnDataState; |
|||
// |
|||
// // 当前周期的待机数据 |
|||
// private ValueState<MachineIotDataReceivedEvent> lastWaitJobDataState; |
|||
// |
|||
// // 上次的状态 |
|||
// private ValueState<Integer> lastWorkingStatState; |
|||
// |
|||
// @Override |
|||
// public void open(Configuration parameters) { |
|||
// |
|||
// // 必须在 open 生命周期初始化 |
|||
// deviceTotalDataStat = getRuntimeContext() |
|||
// .getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class))); |
|||
// |
|||
// onDataState = getRuntimeContext() |
|||
// .getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class))); |
|||
// |
|||
// lastOffDataState = getRuntimeContext() |
|||
// .getState(new ValueStateDescriptor<>("lastOffData", TypeInformation.of(MachineIotDataReceivedEvent.class))); |
|||
// |
|||
// lastOnDataState = getRuntimeContext() |
|||
// .getState(new ValueStateDescriptor<>("lastOnData", TypeInformation.of(MachineIotDataReceivedEvent.class))); |
|||
// |
|||
// lastWaitJobDataState = getRuntimeContext() |
|||
// .getState(new ValueStateDescriptor<>("lastWaitJobData", TypeInformation.of(MachineIotDataReceivedEvent.class))); |
|||
// |
|||
// lastWorkingStatState = getRuntimeContext() |
|||
// .getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class))); |
|||
// } |
|||
// |
|||
// @Override |
|||
// public void processElement(MachineIotDataReceivedEvent receivedEvent, |
|||
// KeyedProcessFunction<Long, MachineIotDataReceivedEvent, DeviceMonitoringData>.Context ctx, |
|||
// Collector<DeviceMonitoringData> out) throws Exception { |
|||
// |
|||
// DeviceTotalData onData = onDataState.value(); |
|||
// MachineIotDataReceivedEvent lastOffData = lastOffDataState.value(); |
|||
// MachineIotDataReceivedEvent lastOnData = lastOnDataState.value(); |
|||
// MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value(); |
|||
// Integer lastWorkingStat = lastWorkingStatState.value(); |
|||
// DeviceTotalData lastedDeviceState = deviceTotalDataStat.value(); |
|||
// Integer machinePwrStat = receivedEvent.getMachinePwrStat(); |
|||
// Integer machineWorkingStat = receivedEvent.getMachineWorkingStat(); |
|||
// lastWorkingStatState.update(machineWorkingStat); |
|||
// Long reportTime = receivedEvent.getReportTime(); |
|||
// if (lastedDeviceState == null) { |
|||
// lastedDeviceState = getDeviceTotalData(receivedEvent); |
|||
// lastOnData = receivedEvent; |
|||
// } |
|||
// if (lastWorkingStat == null) { |
|||
// lastWorkingStat = 0; |
|||
// } |
|||
// if (onData == null) { |
|||
// onData = lastedDeviceState; |
|||
// onDataState.update(onData); |
|||
// } |
|||
// LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); |
|||
// Long a; |
|||
// if (machinePwrStat.equals(0)) { |
|||
// assert lastedDeviceState != null; |
|||
// lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); |
|||
// lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); |
|||
// lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); |
|||
// lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); |
|||
// lastedDeviceState.setCurrLocalDate(localDate); |
|||
// if (lastOnData != null) { |
|||
// lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault())); |
|||
// } else { |
|||
// lastedDeviceState.setLastBootTime(onData.getLastBootTime()); |
|||
// } |
|||
// deviceTotalDataStat.update(lastedDeviceState); |
|||
// // 如果关机 |
|||
// onDataState.update(lastedDeviceState); |
|||
// lastOffDataState.update(receivedEvent); |
|||
// // 关机后将待机数据清除 |
|||
// lastWaitJobDataState.update(null); |
|||
// } else { |
|||
// if (lastOffData != null) { |
|||
// lastOnData = receivedEvent; |
|||
// } |
|||
// assert lastedDeviceState != null; |
|||
// if (machineWorkingStat.equals(1)) { |
|||
// // 工作中 |
|||
// lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount()); |
|||
// lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount()); |
|||
// lastedDeviceState.setCurrLocalDate(localDate); |
|||
// lastedDeviceState.setLastBootTime(onData.getLastBootTime()); |
|||
// if (lastWaitJobData != null) { |
|||
// LocalDateTime localDateTime = LocalDateTime |
|||
// .ofInstant(Instant.ofEpochMilli(reportTime * 1000), |
|||
// ZoneId.systemDefault()); |
|||
// LocalDateTime lastWaitJobTime = LocalDateTime |
|||
// .ofInstant(Instant.ofEpochMilli(reportTime * 1000), |
|||
// ZoneId.systemDefault()); |
|||
// a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS); |
|||
// lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a); |
|||
// lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a); |
|||
// } else { |
|||
// if(receivedEvent.getDataSource() == 0) { |
|||
// // 机智云 |
|||
// lastedDeviceState.setTheDayJobDuration(receivedEvent.getCurrJobDuration()); |
|||
// }else { |
|||
// // 树根 |
|||
// lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration()); |
|||
// } |
|||
// lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration()); |
|||
// } |
|||
// deviceTotalDataStat.update(lastedDeviceState); |
|||
// } |
|||
// if (machineWorkingStat.equals(2)) { |
|||
// // 待机 |
|||
// lastWaitJobDataState.update(receivedEvent); |
|||
// } |
|||
// } |
|||
// if (lastWorkingStat != 1) { |
|||
// DeviceMonitoringData data = new DeviceMonitoringData(); |
|||
// data.setDataSource(receivedEvent.getDataSource()); |
|||
// data.setMachineIotMac(receivedEvent.getMachineIotMac()); |
|||
// data.setMachinePwrStat(receivedEvent.getMachinePwrStat()); |
|||
// data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat()); |
|||
// data.setAccJobCount(lastedDeviceState.getJobTotal()); |
|||
// data.setCurrJobCount(lastedDeviceState.getTheDayJobCount()); |
|||
// data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration()); |
|||
// data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal()); |
|||
// data.setReportTime(reportTime); |
|||
// if (lastOnData == null) { |
|||
// data.setLastBootTime(reportTime * 1000); |
|||
// } else { |
|||
// data.setLastBootTime(lastOnData.getReportTime() * 1000); |
|||
// } |
|||
// out.collect(data); |
|||
// } |
|||
// } |
|||
// |
|||
// private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception { |
|||
// |
|||
// DeviceTotalData value = deviceTotalDataStat.value(); |
|||
// Long reportTime = event.getReportTime(); |
|||
// LocalDate localDate = new Date(reportTime * 1000).toLocalDate(); |
|||
// if (value == null) { |
|||
// // 从es中获取 |
|||
// DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), null); |
|||
// DeviceTotalData data = new DeviceTotalData(); |
|||
// if (deviceMonitoringData != null) { |
|||
// data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); |
|||
// data.setJobTotal(deviceMonitoringData.getAccJobCount()); |
|||
// // 单位秒 |
|||
// data.setCurrLocalDate(localDate); |
|||
// data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration()); |
|||
// data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount()); |
|||
// LocalDateTime ldt = new Date(deviceMonitoringData.getLastBootTime()) |
|||
// .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime(); |
|||
// data.setLastBootTime(ldt); |
|||
// } else { |
|||
// // es中也没有,从“qn_cloud_box_data”索引中拿 |
|||
// data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000); |
|||
// } |
|||
// deviceTotalDataStat.update(data); |
|||
// } |
|||
// // 是否日期是当天的,否则需要更新当天工作时长和当天工作量 |
|||
// if (value.getCurrLocalDate().isBefore(localDate)) { |
|||
// // 先从es中拿昨天最新的 |
|||
// DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), |
|||
// LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond()); |
|||
// if (deviceMonitoringData != null) { |
|||
// DeviceTotalData data = new DeviceTotalData(); |
|||
// data.setJobTotal(deviceMonitoringData.getAccJobCount()); |
|||
// data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration()); |
|||
// data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration()); |
|||
// data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount()); |
|||
// data.setCurrLocalDate(localDate); |
|||
// data.setLastBootTime(LocalDateTime.ofInstant(Instant |
|||
// .ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault())); |
|||
// deviceTotalDataStat.update(data); |
|||
// } else { |
|||
// // value有值,但是日期不对,说明到了第二天,那么,只需要对当天数据清零即可 |
|||
// value.setTheDayJobDuration(0L); |
|||
// value.setTheDayJobCount(0L); |
|||
// } |
|||
// } |
|||
// return null; |
|||
// } |
|||
// |
|||
// private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac, Long reportTime) { |
|||
// |
|||
// DeviceTotalData deviceTotalData = null; |
|||
// |
|||
// // 通过http去请求之前的接口拿数据 |
|||
// for (int i = 1; i <= 20; i++) { |
|||
// String result = HttpUtil |
|||
// .get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:" + i + "%7D"); |
|||
// Object data = JSONUtil.getByPath(JSONUtil.parse(result), "data"); |
|||
// if(data == null) { |
|||
// break; |
|||
// } |
|||
// Object records = JSONUtil.getByPath(JSONUtil.parse(result), "records"); |
|||
// if(records == null) { |
|||
// break; |
|||
// } |
|||
// JSONArray objects = JSONUtil.parseArray(records); |
|||
// if(objects.isEmpty()) { |
|||
// break; |
|||
// } |
|||
// for (Object o : objects.toArray()) { |
|||
// Object mac = JSONUtil.getByPath(JSONUtil.parse(o), "mac"); |
|||
// Long iotMac = (Long)mac; |
|||
// if(iotMac.equals(machineIotMac)) { |
|||
// deviceTotalData = new DeviceTotalData(); |
|||
// Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal"); |
|||
// deviceTotalData.setJobTotal((Long)productionTotal); |
|||
// Object workTotalTotal = JSONUtil.getByPath(JSONUtil.parse(o), "workTotalTotal"); |
|||
// deviceTotalData.setJobDurationTotal(((Long)workTotalTotal) * 3600); |
|||
// Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime"); |
|||
// LocalDateTime lastBootTime = LocalDateTime |
|||
// .parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:hh:ss")); |
|||
// deviceTotalData.setLastBootTime(lastBootTime); |
|||
// deviceTotalData.setTheDayJobDuration(0L); |
|||
// deviceTotalData.setTheDayJobCount(0L); |
|||
// break; |
|||
// } |
|||
// } |
|||
// } |
|||
// if(deviceTotalData == null) { |
|||
// deviceTotalData = new DeviceTotalData(); |
|||
// deviceTotalData.setJobTotal(0L); |
|||
// deviceTotalData.setJobDurationTotal(0L); |
|||
// deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault())); |
|||
// deviceTotalData.setTheDayJobDuration(0L); |
|||
// deviceTotalData.setTheDayJobCount(0L); |
|||
// } |
|||
// return deviceTotalData; |
|||
// } |
|||
// |
|||
// |
|||
// |
|||
// private String[] getIndicesList() throws IOException { |
|||
// |
|||
// GetAliasesRequest request = new GetAliasesRequest(); |
|||
// GetAliasesResponse getAliasesResponse = restHighLevelClient.indices().getAlias(request, RequestOptions.DEFAULT); |
|||
// Map<String, Set<AliasMetaData>> map = getAliasesResponse.getAliases(); |
|||
// Set<String> indices = map.keySet(); |
|||
// List<String> indicesList = new ArrayList<>(); |
|||
// for (String key : indices) { |
|||
// if (key.contains(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX))) { |
|||
// indicesList.add(key); |
|||
// } |
|||
// } |
|||
// return ArrayUtil.toArray(indicesList, String.class); |
|||
// } |
|||
// |
|||
// private DeviceMonitoringData queryLatestDeviceMonitoringData(Long machineIotMac, Long reportTime) { |
|||
// |
|||
// try { |
|||
// // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) |
|||
// SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); |
|||
// searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac)); |
|||
// if (reportTime != null) { |
|||
// searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime").lt(reportTime)); |
|||
// } |
|||
// searchSourceBuilder.sort("reportTime", SortOrder.DESC); |
|||
// searchSourceBuilder.size(1); |
|||
// // 创建查询请求对象,将查询对象配置到其中 |
|||
// SearchRequest searchRequest = new SearchRequest("DeviceMonitoringData"); |
|||
// searchRequest.source(searchSourceBuilder); |
|||
// // 执行查询,然后处理响应结果 |
|||
// SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); |
|||
// // 根据状态和数据条数验证是否返回了数据 |
|||
// if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { |
|||
// SearchHits hits = searchResponse.getHits(); |
|||
// SearchHit reqHit = hits.getHits()[0]; |
|||
// return JSONUtil.toBean(reqHit.getSourceAsString(), DeviceMonitoringData.class); |
|||
// } |
|||
// } catch (IOException e) { |
|||
// log.error("获取es数据异常", e); |
|||
// } |
|||
// return null; |
|||
// } |
|||
// }).name("machineIotDataReceivedEventDataStream keyBy stream"); |
|||
// |
|||
// // 写入es |
|||
// sinkEs(machineIotDataReceivedEventDataStream); |
|||
// |
|||
// env.execute("iot_monitoring_data_job"); |
|||
// } |
|||
// |
|||
// private static void sinkEs(DataStream<DeviceMonitoringData> dataStream) { |
|||
// |
|||
// List<HttpHost> httpHosts = new ArrayList<>(); |
|||
// httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST), |
|||
// ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), |
|||
// ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); |
|||
// ElasticsearchSink.Builder<DeviceMonitoringData> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, |
|||
// (ElasticsearchSinkFunction<DeviceMonitoringData>) (deviceMonitoringData, runtimeContext, requestIndexer) -> { |
|||
// |
|||
// LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime()) |
|||
// .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); |
|||
// String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM")); |
|||
// //创建es 请求 |
|||
// IndexRequest indexRequest = Requests.indexRequest() |
|||
// .index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix) |
|||
// .source(BeanUtil.beanToMap(deviceMonitoringData)); |
|||
// requestIndexer.add(indexRequest); |
|||
// } |
|||
// ); |
|||
// //刷新前缓冲的最大动作量 |
|||
// esSinkBuilder.setBulkFlushMaxActions(10); |
|||
// //刷新前缓冲区的最大数据大小(以MB为单位) |
|||
// esSinkBuilder.setBulkFlushMaxSizeMb(5); |
|||
// //无论缓冲操作的数量或大小如何,都要刷新的时间间隔 |
|||
// esSinkBuilder.setBulkFlushInterval(5000L); |
|||
// // 客户端创建配置回调,配置账号密码 |
|||
// esSinkBuilder.setRestClientFactory( |
|||
// restClientBuilder -> { |
|||
// restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { |
|||
// CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
|||
// credentialsProvider.setCredentials(AuthScope.ANY, |
|||
// new UsernamePasswordCredentials(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), |
|||
// ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); |
|||
// return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); |
|||
// }); |
|||
// restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { |
|||
// // 设置es连接超时时间 |
|||
// requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT)); |
|||
// return requestConfigBuilder; |
|||
// }); |
|||
// } |
|||
// ); |
|||
// //数据流添加sink |
|||
// dataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink"); |
|||
// } |
|||
//} |
|||
@ -1,4 +1,4 @@ |
|||
app.id=iot-gizwits-model-formatter |
|||
app.id=iot-device-monitoring-data |
|||
|
|||
# test 8.135.8.221 |
|||
# prod 47.112.164.224 |
|||
Write
Preview
Loading…
Cancel
Save