diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8452323 --- /dev/null +++ b/.gitignore @@ -0,0 +1,39 @@ +### Java template +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +### Maven template +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties +# https://github.com/takari/maven-wrapper#usage-without-binary-jar +.mvn/wrapper/maven-wrapper.jar + +/.idea diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java index c12af7f..af8df5e 100644 --- a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java +++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java @@ -1,5 +1,6 @@ package com.qniao.iot.device.power; +import cn.hutool.core.bean.BeanUtil; import cn.hutool.json.JSONUtil; import com.qniao.iot.device.power.config.ApolloConfig; import com.qniao.iot.device.power.constant.ConfigConstant; @@ -17,6 +18,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.flink.util.Collector; @@ -26,6 +29,7 @@ import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.*; @@ -44,7 +48,10 @@ import java.io.IOException; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; @Slf4j public class IotDevicePowerOnAndOffDataJob { @@ -53,14 +60,14 @@ public class IotDevicePowerOnAndOffDataJob { ApolloConfig.getLong(ConfigConstant.SNOW_FLAKE_MACHINE_ID)); private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient - .builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ES_HOST_NAME), - ApolloConfig.getInt(ConfigConstant.ES_POST), - ApolloConfig.getStr(ConfigConstant.ES_SCHEME))) + .builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST), + ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), + ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))) .setHttpClientConfigCallback(httpAsyncClientBuilder -> { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ES_USER_NAME), - ApolloConfig.getStr(ConfigConstant.ES_PASSWORD))); + new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), + ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); }) .setRequestConfigCallback(requestConfigBuilder -> { @@ -133,18 +140,19 @@ public class IotDevicePowerOnAndOffDataJob { out.collect(powerOnAndOffDataEvent); } }else { - // 上次的状态只有两种,要么是开机时间不为空,要么是开机和关机时间都不为空,否则不处理 + // 上次的状态只有两种,要么只有开机时间不为空,要么是开机和关机时间都不为空,否则不处理 if(lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) { if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) { + // 只有开机时间不为空 if(MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { - powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime()); + powerOnAndOffDataEvent.setMachinePowerOnTime(lastPowerOnAndOffDataEvent.getMachinePowerOnTime()); }else { powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime()); } powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent); out.collect(powerOnAndOffDataEvent); } else { - // 说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态 + // // 开机和关机时间都不为空,说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态 if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) { // 开机 powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime()); @@ -220,18 +228,19 @@ public class IotDevicePowerOnAndOffDataJob { } }).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac"); + sinkEs(streamOperator); env.execute("device_monitoring_data"); } - /*private static void sinkEs(DataStream dataStream) { + private static void sinkEs(SingleOutputStreamOperator dataStream) { List httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST), ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); - ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, - (ElasticsearchSinkFunction) (deviceMonitoringData, runtimeContext, requestIndexer) -> { + ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, + (ElasticsearchSinkFunction) (deviceMonitoringData, runtimeContext, requestIndexer) -> { LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime()) .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate(); @@ -273,7 +282,7 @@ public class IotDevicePowerOnAndOffDataJob { ); //数据流添加sink dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink"); - }*/ + } private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) {