|
|
|
@ -1,5 +1,6 @@ |
|
|
|
package com.qniao.iot.device.power; |
|
|
|
|
|
|
|
import cn.hutool.core.bean.BeanUtil; |
|
|
|
import cn.hutool.json.JSONUtil; |
|
|
|
import com.qniao.iot.device.power.config.ApolloConfig; |
|
|
|
import com.qniao.iot.device.power.constant.ConfigConstant; |
|
|
|
@ -17,6 +18,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; |
|
|
|
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; |
|
|
|
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
|
|
|
import org.apache.flink.streaming.api.functions.KeyedProcessFunction; |
|
|
|
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; |
|
|
|
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; |
|
|
|
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; |
|
|
|
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; |
|
|
|
import org.apache.flink.util.Collector; |
|
|
|
@ -26,6 +29,7 @@ import org.apache.http.auth.UsernamePasswordCredentials; |
|
|
|
import org.apache.http.client.CredentialsProvider; |
|
|
|
import org.apache.http.impl.client.BasicCredentialsProvider; |
|
|
|
import org.elasticsearch.action.admin.indices.alias.Alias; |
|
|
|
import org.elasticsearch.action.index.IndexRequest; |
|
|
|
import org.elasticsearch.action.search.SearchRequest; |
|
|
|
import org.elasticsearch.action.search.SearchResponse; |
|
|
|
import org.elasticsearch.client.*; |
|
|
|
@ -44,7 +48,10 @@ import java.io.IOException; |
|
|
|
import java.time.LocalDate; |
|
|
|
import java.time.LocalDateTime; |
|
|
|
import java.time.ZoneId; |
|
|
|
import java.time.ZoneOffset; |
|
|
|
import java.time.format.DateTimeFormatter; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.List; |
|
|
|
|
|
|
|
@Slf4j |
|
|
|
public class IotDevicePowerOnAndOffDataJob { |
|
|
|
@ -53,14 +60,14 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
|
ApolloConfig.getLong(ConfigConstant.SNOW_FLAKE_MACHINE_ID)); |
|
|
|
|
|
|
|
private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient |
|
|
|
.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ES_HOST_NAME), |
|
|
|
ApolloConfig.getInt(ConfigConstant.ES_POST), |
|
|
|
ApolloConfig.getStr(ConfigConstant.ES_SCHEME))) |
|
|
|
.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST), |
|
|
|
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), |
|
|
|
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))) |
|
|
|
.setHttpClientConfigCallback(httpAsyncClientBuilder -> { |
|
|
|
CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
|
|
|
credentialsProvider.setCredentials(AuthScope.ANY, |
|
|
|
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ES_USER_NAME), |
|
|
|
ApolloConfig.getStr(ConfigConstant.ES_PASSWORD))); |
|
|
|
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), |
|
|
|
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); |
|
|
|
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); |
|
|
|
}) |
|
|
|
.setRequestConfigCallback(requestConfigBuilder -> { |
|
|
|
@ -133,18 +140,19 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
|
out.collect(powerOnAndOffDataEvent); |
|
|
|
} |
|
|
|
}else { |
|
|
|
// 上次的状态只有两种,要么是开机时间不为空,要么是开机和关机时间都不为空,否则不处理 |
|
|
|
// 上次的状态只有两种,要么只有开机时间不为空,要么是开机和关机时间都不为空,否则不处理 |
|
|
|
if(lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) { |
|
|
|
if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) { |
|
|
|
// 只有开机时间不为空 |
|
|
|
if(MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { |
|
|
|
powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime()); |
|
|
|
powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); |
|
|
|
}else { |
|
|
|
powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime()); |
|
|
|
} |
|
|
|
powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); |
|
|
|
out.collect(powerOnAndOffDataEvent); |
|
|
|
} else { |
|
|
|
// 说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态 |
|
|
|
// // 开机和关机时间都不为空,说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态 |
|
|
|
if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { |
|
|
|
// 开机 |
|
|
|
powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime()); |
|
|
|
@ -220,18 +228,19 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
|
} |
|
|
|
}).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac"); |
|
|
|
|
|
|
|
sinkEs(streamOperator); |
|
|
|
|
|
|
|
env.execute("device_monitoring_data"); |
|
|
|
} |
|
|
|
|
|
|
|
/*private static void sinkEs(DataStream<DeviceMonitoringData> dataStream) { |
|
|
|
private static void sinkEs(SingleOutputStreamOperator<IotDevicePowerOnAndOffDataEvent> dataStream) { |
|
|
|
|
|
|
|
List<HttpHost> httpHosts = new ArrayList<>(); |
|
|
|
httpHosts.add(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST), |
|
|
|
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), |
|
|
|
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); |
|
|
|
ElasticsearchSink.Builder<DeviceMonitoringData> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, |
|
|
|
(ElasticsearchSinkFunction<DeviceMonitoringData>) (deviceMonitoringData, runtimeContext, requestIndexer) -> { |
|
|
|
ElasticsearchSink.Builder<IotDevicePowerOnAndOffDataEvent> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, |
|
|
|
(ElasticsearchSinkFunction<IotDevicePowerOnAndOffDataEvent>) (deviceMonitoringData, runtimeContext, requestIndexer) -> { |
|
|
|
|
|
|
|
LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime()) |
|
|
|
.toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); |
|
|
|
@ -273,7 +282,7 @@ public class IotDevicePowerOnAndOffDataJob { |
|
|
|
); |
|
|
|
//数据流添加sink |
|
|
|
dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink"); |
|
|
|
}*/ |
|
|
|
} |
|
|
|
|
|
|
|
private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) { |
|
|
|
|
|
|
|
|