diff --git a/pom.xml b/pom.xml
index 900a0f6..8230e4e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
4.0.0
org.example
- iot-gizwits-monitoring-data
+ iot-device-monitoring-data
1.0-SNAPSHOT
diff --git a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java
index fe26d0a..5305e53 100644
--- a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java
+++ b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java
@@ -34,8 +34,6 @@ import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.elasticsearch.action.AliasesRequest;
-import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
@@ -481,7 +479,8 @@ public class IotMonitoringDataJob {
});
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
// 设置es连接超时时间
- requestConfigBuilder.setConnectTimeout(30000);
+ requestConfigBuilder
+ .setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT));
return requestConfigBuilder;
});
}
diff --git a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob_bak.java b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob_bak.java
deleted file mode 100644
index 9c31ad4..0000000
--- a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob_bak.java
+++ /dev/null
@@ -1,443 +0,0 @@
-//package com.qniao.iot.gizwits;
-//
-//import cn.hutool.core.bean.BeanUtil;
-//import cn.hutool.core.util.ArrayUtil;
-//import cn.hutool.http.HttpUtil;
-//import cn.hutool.json.JSONArray;
-//import cn.hutool.json.JSONUtil;
-//import com.qniao.iot.gizwits.config.ApolloConfig;
-//import com.qniao.iot.gizwits.constant.ConfigConstant;
-//import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
-//import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
-//import lombok.extern.slf4j.Slf4j;
-//import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-//import org.apache.flink.api.common.state.ValueState;
-//import org.apache.flink.api.common.state.ValueStateDescriptor;
-//import org.apache.flink.api.common.typeinfo.TypeInformation;
-//import org.apache.flink.configuration.Configuration;
-//import org.apache.flink.connector.kafka.source.KafkaSource;
-//import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
-//import org.apache.flink.streaming.api.CheckpointingMode;
-//import org.apache.flink.streaming.api.datastream.DataStream;
-//import org.apache.flink.streaming.api.datastream.DataStreamSource;
-//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-//import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-//import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
-//import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
-//import org.apache.flink.util.Collector;
-//import org.apache.http.HttpHost;
-//import org.apache.http.auth.AuthScope;
-//import org.apache.http.auth.UsernamePasswordCredentials;
-//import org.apache.http.client.CredentialsProvider;
-//import org.apache.http.impl.client.BasicCredentialsProvider;
-//import org.apache.kafka.clients.consumer.ConsumerConfig;
-//import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-//import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
-//import org.elasticsearch.action.index.IndexRequest;
-//import org.elasticsearch.action.search.SearchRequest;
-//import org.elasticsearch.action.search.SearchResponse;
-//import org.elasticsearch.client.*;
-//import org.elasticsearch.cluster.metadata.AliasMetaData;
-//import org.elasticsearch.index.query.QueryBuilders;
-//import org.elasticsearch.rest.RestStatus;
-//import org.elasticsearch.search.SearchHit;
-//import org.elasticsearch.search.SearchHits;
-//import org.elasticsearch.search.builder.SearchSourceBuilder;
-//import org.elasticsearch.search.sort.SortOrder;
-//
-//import java.io.IOException;
-//import java.sql.Date;
-//import java.time.*;
-//import java.time.format.DateTimeFormatter;
-//import java.util.ArrayList;
-//import java.util.List;
-//import java.util.Map;
-//import java.util.Set;
-//
-//import static java.time.temporal.ChronoUnit.SECONDS;
-//
-///**
-// * 备份代码
-// */
-//@Slf4j
-//public class IotMonitoringDataJob_bak {
-//
-// private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient
-// .builder(new HttpHost("120.79.137.137", 9200, "http"))
-// .setHttpClientConfigCallback(httpAsyncClientBuilder -> {
-// CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-// credentialsProvider.setCredentials(AuthScope.ANY,
-// new UsernamePasswordCredentials("elastic", "qnol26215"));
-// return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
-// })
-// .setRequestConfigCallback(requestConfigBuilder -> {
-// // 设置es连接超时时间
-// requestConfigBuilder.setConnectTimeout(3000);
-// return requestConfigBuilder;
-// }));
-//
-// public static void main(String[] args) throws Exception {
-//
-// final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-// env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
-// // 获取设备数据源
-// KafkaSource source = KafkaSource.builder()
-// .setBootstrapServers("120.25.199.30:19092")
-// .setTopics("machine_iot_data_received_event")
-// .setGroupId("123")
-// .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
-// .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
-// .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
-// .build();
-//
-// // 设备数据源转换
-// DataStreamSource dataStreamSource = env
-// .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
-//
-// // mac分组并进行工作时长的集合操作
-// DataStream machineIotDataReceivedEventDataStream = dataStreamSource
-// .keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
-// .process(new KeyedProcessFunction() {
-//
-// // 最新的设备数据
-// private ValueState deviceTotalDataStat;
-//
-// // 开机数据
-// private ValueState onDataState;
-//
-// // 上次的关机数据
-// private ValueState lastOffDataState;
-//
-// // 上次的开机数据
-// private ValueState lastOnDataState;
-//
-// // 当前周期的待机数据
-// private ValueState lastWaitJobDataState;
-//
-// // 上次的状态
-// private ValueState lastWorkingStatState;
-//
-// @Override
-// public void open(Configuration parameters) {
-//
-// // 必须在 open 生命周期初始化
-// deviceTotalDataStat = getRuntimeContext()
-// .getState(new ValueStateDescriptor<>("accJobCountDuration", TypeInformation.of(DeviceTotalData.class)));
-//
-// onDataState = getRuntimeContext()
-// .getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class)));
-//
-// lastOffDataState = getRuntimeContext()
-// .getState(new ValueStateDescriptor<>("lastOffData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
-//
-// lastOnDataState = getRuntimeContext()
-// .getState(new ValueStateDescriptor<>("lastOnData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
-//
-// lastWaitJobDataState = getRuntimeContext()
-// .getState(new ValueStateDescriptor<>("lastWaitJobData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
-//
-// lastWorkingStatState = getRuntimeContext()
-// .getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class)));
-// }
-//
-// @Override
-// public void processElement(MachineIotDataReceivedEvent receivedEvent,
-// KeyedProcessFunction.Context ctx,
-// Collector out) throws Exception {
-//
-// DeviceTotalData onData = onDataState.value();
-// MachineIotDataReceivedEvent lastOffData = lastOffDataState.value();
-// MachineIotDataReceivedEvent lastOnData = lastOnDataState.value();
-// MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value();
-// Integer lastWorkingStat = lastWorkingStatState.value();
-// DeviceTotalData lastedDeviceState = deviceTotalDataStat.value();
-// Integer machinePwrStat = receivedEvent.getMachinePwrStat();
-// Integer machineWorkingStat = receivedEvent.getMachineWorkingStat();
-// lastWorkingStatState.update(machineWorkingStat);
-// Long reportTime = receivedEvent.getReportTime();
-// if (lastedDeviceState == null) {
-// lastedDeviceState = getDeviceTotalData(receivedEvent);
-// lastOnData = receivedEvent;
-// }
-// if (lastWorkingStat == null) {
-// lastWorkingStat = 0;
-// }
-// if (onData == null) {
-// onData = lastedDeviceState;
-// onDataState.update(onData);
-// }
-// LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
-// Long a;
-// if (machinePwrStat.equals(0)) {
-// assert lastedDeviceState != null;
-// lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
-// lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
-// lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
-// lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
-// lastedDeviceState.setCurrLocalDate(localDate);
-// if (lastOnData != null) {
-// lastedDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(lastOnData.getReportTime() * 1000), ZoneId.systemDefault()));
-// } else {
-// lastedDeviceState.setLastBootTime(onData.getLastBootTime());
-// }
-// deviceTotalDataStat.update(lastedDeviceState);
-// // 如果关机
-// onDataState.update(lastedDeviceState);
-// lastOffDataState.update(receivedEvent);
-// // 关机后将待机数据清除
-// lastWaitJobDataState.update(null);
-// } else {
-// if (lastOffData != null) {
-// lastOnData = receivedEvent;
-// }
-// assert lastedDeviceState != null;
-// if (machineWorkingStat.equals(1)) {
-// // 工作中
-// lastedDeviceState.setTheDayJobCount(onData.getTheDayJobCount() + receivedEvent.getCurrJobCount());
-// lastedDeviceState.setJobTotal(onData.getJobTotal() + receivedEvent.getCurrJobCount());
-// lastedDeviceState.setCurrLocalDate(localDate);
-// lastedDeviceState.setLastBootTime(onData.getLastBootTime());
-// if (lastWaitJobData != null) {
-// LocalDateTime localDateTime = LocalDateTime
-// .ofInstant(Instant.ofEpochMilli(reportTime * 1000),
-// ZoneId.systemDefault());
-// LocalDateTime lastWaitJobTime = LocalDateTime
-// .ofInstant(Instant.ofEpochMilli(reportTime * 1000),
-// ZoneId.systemDefault());
-// a = Duration.between(lastWaitJobTime, localDateTime).get(SECONDS);
-// lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + a);
-// lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + a);
-// } else {
-// if(receivedEvent.getDataSource() == 0) {
-// // 机智云
-// lastedDeviceState.setTheDayJobDuration(receivedEvent.getCurrJobDuration());
-// }else {
-// // 树根
-// lastedDeviceState.setTheDayJobDuration(onData.getTheDayJobDuration() + receivedEvent.getCurrJobDuration());
-// }
-// lastedDeviceState.setJobDurationTotal(onData.getJobDurationTotal() + receivedEvent.getCurrJobDuration());
-// }
-// deviceTotalDataStat.update(lastedDeviceState);
-// }
-// if (machineWorkingStat.equals(2)) {
-// // 待机
-// lastWaitJobDataState.update(receivedEvent);
-// }
-// }
-// if (lastWorkingStat != 1) {
-// DeviceMonitoringData data = new DeviceMonitoringData();
-// data.setDataSource(receivedEvent.getDataSource());
-// data.setMachineIotMac(receivedEvent.getMachineIotMac());
-// data.setMachinePwrStat(receivedEvent.getMachinePwrStat());
-// data.setMachineWorkingStat(receivedEvent.getMachineWorkingStat());
-// data.setAccJobCount(lastedDeviceState.getJobTotal());
-// data.setCurrJobCount(lastedDeviceState.getTheDayJobCount());
-// data.setCurrJobDuration(lastedDeviceState.getTheDayJobDuration());
-// data.setAccJobCountDuration(lastedDeviceState.getJobDurationTotal());
-// data.setReportTime(reportTime);
-// if (lastOnData == null) {
-// data.setLastBootTime(reportTime * 1000);
-// } else {
-// data.setLastBootTime(lastOnData.getReportTime() * 1000);
-// }
-// out.collect(data);
-// }
-// }
-//
-// private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception {
-//
-// DeviceTotalData value = deviceTotalDataStat.value();
-// Long reportTime = event.getReportTime();
-// LocalDate localDate = new Date(reportTime * 1000).toLocalDate();
-// if (value == null) {
-// // 从es中获取
-// DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(), null);
-// DeviceTotalData data = new DeviceTotalData();
-// if (deviceMonitoringData != null) {
-// data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
-// data.setJobTotal(deviceMonitoringData.getAccJobCount());
-// // 单位秒
-// data.setCurrLocalDate(localDate);
-// data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
-// data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
-// LocalDateTime ldt = new Date(deviceMonitoringData.getLastBootTime())
-// .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDateTime();
-// data.setLastBootTime(ldt);
-// } else {
-// // es中也没有,从“qn_cloud_box_data”索引中拿
-// data = queryDeviceMonitoringData(event.getMachineIotMac(), reportTime * 1000);
-// }
-// deviceTotalDataStat.update(data);
-// }
-// // 是否日期是当天的,否则需要更新当天工作时长和当天工作量
-// if (value.getCurrLocalDate().isBefore(localDate)) {
-// // 先从es中拿昨天最新的
-// DeviceMonitoringData deviceMonitoringData = queryLatestDeviceMonitoringData(event.getMachineIotMac(),
-// LocalDateTime.of(localDate, LocalTime.MIN).atZone(ZoneOffset.of("+8")).toEpochSecond());
-// if (deviceMonitoringData != null) {
-// DeviceTotalData data = new DeviceTotalData();
-// data.setJobTotal(deviceMonitoringData.getAccJobCount());
-// data.setJobDurationTotal(deviceMonitoringData.getAccJobCountDuration());
-// data.setTheDayJobDuration(deviceMonitoringData.getCurrJobDuration());
-// data.setTheDayJobCount(deviceMonitoringData.getCurrJobCount());
-// data.setCurrLocalDate(localDate);
-// data.setLastBootTime(LocalDateTime.ofInstant(Instant
-// .ofEpochMilli(deviceMonitoringData.getLastBootTime()), ZoneId.systemDefault()));
-// deviceTotalDataStat.update(data);
-// } else {
-// // value有值,但是日期不对,说明到了第二天,那么,只需要对当天数据清零即可
-// value.setTheDayJobDuration(0L);
-// value.setTheDayJobCount(0L);
-// }
-// }
-// return null;
-// }
-//
-// private DeviceTotalData queryDeviceMonitoringData(Long machineIotMac, Long reportTime) {
-//
-// DeviceTotalData deviceTotalData = null;
-//
-// // 通过http去请求之前的接口拿数据
-// for (int i = 1; i <= 20; i++) {
-// String result = HttpUtil
-// .get("http://api-ops-yyt.qniao.cn/show-billboard/get/billboard/module/data-script?id=3&metaData=%7B%22pageNum%22:" + i + "%7D");
-// Object data = JSONUtil.getByPath(JSONUtil.parse(result), "data");
-// if(data == null) {
-// break;
-// }
-// Object records = JSONUtil.getByPath(JSONUtil.parse(result), "records");
-// if(records == null) {
-// break;
-// }
-// JSONArray objects = JSONUtil.parseArray(records);
-// if(objects.isEmpty()) {
-// break;
-// }
-// for (Object o : objects.toArray()) {
-// Object mac = JSONUtil.getByPath(JSONUtil.parse(o), "mac");
-// Long iotMac = (Long)mac;
-// if(iotMac.equals(machineIotMac)) {
-// deviceTotalData = new DeviceTotalData();
-// Object productionTotal = JSONUtil.getByPath(JSONUtil.parse(o), "productionTotal");
-// deviceTotalData.setJobTotal((Long)productionTotal);
-// Object workTotalTotal = JSONUtil.getByPath(JSONUtil.parse(o), "workTotalTotal");
-// deviceTotalData.setJobDurationTotal(((Long)workTotalTotal) * 3600);
-// Object startingUpTime = JSONUtil.getByPath(JSONUtil.parse(o), "startingUpTime");
-// LocalDateTime lastBootTime = LocalDateTime
-// .parse((String) startingUpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:hh:ss"));
-// deviceTotalData.setLastBootTime(lastBootTime);
-// deviceTotalData.setTheDayJobDuration(0L);
-// deviceTotalData.setTheDayJobCount(0L);
-// break;
-// }
-// }
-// }
-// if(deviceTotalData == null) {
-// deviceTotalData = new DeviceTotalData();
-// deviceTotalData.setJobTotal(0L);
-// deviceTotalData.setJobDurationTotal(0L);
-// deviceTotalData.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(reportTime * 1000), ZoneId.systemDefault()));
-// deviceTotalData.setTheDayJobDuration(0L);
-// deviceTotalData.setTheDayJobCount(0L);
-// }
-// return deviceTotalData;
-// }
-//
-//
-//
-// private String[] getIndicesList() throws IOException {
-//
-// GetAliasesRequest request = new GetAliasesRequest();
-// GetAliasesResponse getAliasesResponse = restHighLevelClient.indices().getAlias(request, RequestOptions.DEFAULT);
-// Map> map = getAliasesResponse.getAliases();
-// Set indices = map.keySet();
-// List indicesList = new ArrayList<>();
-// for (String key : indices) {
-// if (key.contains(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX))) {
-// indicesList.add(key);
-// }
-// }
-// return ArrayUtil.toArray(indicesList, String.class);
-// }
-//
-// private DeviceMonitoringData queryLatestDeviceMonitoringData(Long machineIotMac, Long reportTime) {
-//
-// try {
-// // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询)
-// SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
-// searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
-// if (reportTime != null) {
-// searchSourceBuilder.query(QueryBuilders.rangeQuery("reportTime").lt(reportTime));
-// }
-// searchSourceBuilder.sort("reportTime", SortOrder.DESC);
-// searchSourceBuilder.size(1);
-// // 创建查询请求对象,将查询对象配置到其中
-// SearchRequest searchRequest = new SearchRequest("DeviceMonitoringData");
-// searchRequest.source(searchSourceBuilder);
-// // 执行查询,然后处理响应结果
-// SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
-// // 根据状态和数据条数验证是否返回了数据
-// if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
-// SearchHits hits = searchResponse.getHits();
-// SearchHit reqHit = hits.getHits()[0];
-// return JSONUtil.toBean(reqHit.getSourceAsString(), DeviceMonitoringData.class);
-// }
-// } catch (IOException e) {
-// log.error("获取es数据异常", e);
-// }
-// return null;
-// }
-// }).name("machineIotDataReceivedEventDataStream keyBy stream");
-//
-// // 写入es
-// sinkEs(machineIotDataReceivedEventDataStream);
-//
-// env.execute("iot_monitoring_data_job");
-// }
-//
-// private static void sinkEs(DataStream dataStream) {
-//
-// List httpHosts = new ArrayList<>();
-// httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST),
-// ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
-// ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)));
-// ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
-// (ElasticsearchSinkFunction) (deviceMonitoringData, runtimeContext, requestIndexer) -> {
-//
-// LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime())
-// .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate();
-// String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
-// //创建es 请求
-// IndexRequest indexRequest = Requests.indexRequest()
-// .index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix)
-// .source(BeanUtil.beanToMap(deviceMonitoringData));
-// requestIndexer.add(indexRequest);
-// }
-// );
-// //刷新前缓冲的最大动作量
-// esSinkBuilder.setBulkFlushMaxActions(10);
-// //刷新前缓冲区的最大数据大小(以MB为单位)
-// esSinkBuilder.setBulkFlushMaxSizeMb(5);
-// //无论缓冲操作的数量或大小如何,都要刷新的时间间隔
-// esSinkBuilder.setBulkFlushInterval(5000L);
-// // 客户端创建配置回调,配置账号密码
-// esSinkBuilder.setRestClientFactory(
-// restClientBuilder -> {
-// restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
-// CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-// credentialsProvider.setCredentials(AuthScope.ANY,
-// new UsernamePasswordCredentials(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
-// ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
-// return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
-// });
-// restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
-// // 设置es连接超时时间
-// requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT));
-// return requestConfigBuilder;
-// });
-// }
-// );
-// //数据流添加sink
-// dataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink");
-// }
-//}
diff --git a/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java b/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java
index 252ea91..2605119 100644
--- a/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java
+++ b/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java
@@ -8,26 +8,6 @@ public interface ConfigConstant {
String SOURCE_KAFKA_GROUP_ID = "source.kafka.groupId";
- String SINK_RABBITMQ_HOST = "sink.rabbitmq.host";
-
- String SINK_RABBITMQ_PORT = "sink.rabbitmq.port";
-
- String SINK_RABBITMQ_VIRTUAL_HOST = "sink.rabbitmq.virtualHost";
-
- String SINK_RABBITMQ_USER_NAME = "sink.rabbitmq.userName";
-
- String SINK_RABBITMQ_PASSWORD = "sink.rabbitmq.password";
-
- String SINK_RABBITMQ_POWER_ON_MACHINE_COMMAND_ROUTING_KEY = "sink.rabbitmq.powerOnMachineCommand.routingKey";
-
- String SINK_RABBITMQ_POWER_OFF_MACHINE_COMMAND_ROUTING_KEY = "sink.rabbitmq.powerOffMachineCommand.routingKey";
-
- String SINK_RABBITMQ_STOP_MACHINE_WORKING_COMMAND_ROUTING_KEY = "sink.rabbitmq.stopMachineWorkingCommand.routingKey";
-
- String SINK_RABBITMQ_START_MACHINE_WORKING_COMMAND_ROUTING_KEY = "sink.rabbitmq.startMachineWorkingCommand.routingKey";
-
- String SINK_RABBITMQ_EXCHANGE = "sink.rabbitmq.exchange";
-
String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host";
String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post";
diff --git a/src/main/resources/META-INF/app.properties b/src/main/resources/META-INF/app.properties
index 94b0185..d10864b 100644
--- a/src/main/resources/META-INF/app.properties
+++ b/src/main/resources/META-INF/app.properties
@@ -1,4 +1,4 @@
-app.id=iot-gizwits-model-formatter
+app.id=iot-device-monitoring-data
# test 8.135.8.221
# prod 47.112.164.224