From c9055d1be5bb68c97ab91c16554e8207a38a444f Mon Sep 17 00:00:00 2001
From: "1049970895@qniao.cn" <1049970895>
Date: Sat, 20 Aug 2022 17:32:30 +0800
Subject: [PATCH] first commit
---
.../pom.xml | 51 +++
.../IotDevicePowerOnAndOffDataEvent.java | 70 ++++
iot-device-power-on-and-off-data-job/pom.xml | 224 +++++++++++
.../power/IotDevicePowerOnAndOffDataJob.java | 359 ++++++++++++++++++
.../iot/device/power/config/ApolloConfig.java | 29 ++
.../device/power/constant/ConfigConstant.java | 40 ++
.../device/power/utils/EsRestClientUtil.java | 87 +++++
.../iot/device/power/utils/SnowFlake.java | 103 +++++
.../main/resources/META-INF/app.properties | 5 +
.../src/main/resources/db.setting | 53 +++
.../src/main/resources/log4j2.properties | 25 ++
pom.xml | 33 ++
12 files changed, 1079 insertions(+)
create mode 100644 iot-device-power-on-and-off-data-event/pom.xml
create mode 100644 iot-device-power-on-and-off-data-event/src/main/java/com/qniao/iot/device/power/event/IotDevicePowerOnAndOffDataEvent.java
create mode 100644 iot-device-power-on-and-off-data-job/pom.xml
create mode 100644 iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java
create mode 100644 iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/config/ApolloConfig.java
create mode 100644 iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java
create mode 100644 iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/utils/EsRestClientUtil.java
create mode 100644 iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/utils/SnowFlake.java
create mode 100644 iot-device-power-on-and-off-data-job/src/main/resources/META-INF/app.properties
create mode 100644 iot-device-power-on-and-off-data-job/src/main/resources/db.setting
create mode 100644 iot-device-power-on-and-off-data-job/src/main/resources/log4j2.properties
create mode 100644 pom.xml
diff --git a/iot-device-power-on-and-off-data-event/pom.xml b/iot-device-power-on-and-off-data-event/pom.xml
new file mode 100644
index 0000000..97a1148
--- /dev/null
+++ b/iot-device-power-on-and-off-data-event/pom.xml
@@ -0,0 +1,51 @@
+
+
+
+ com.qniao
+ java-dependency
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+
+ iot-device-power-on-and-off-data-event
+
+
+ UTF-8
+ 1.8
+ ${target.java.version}
+ ${target.java.version}
+ 1.18.24
+
+
+
+
+ org.projectlombok
+ lombok
+ ${lombok.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ 8
+ 8
+
+
+
+
+
+
+
+ maven-releases
+ Nexus releases Repository
+ http://120.78.76.88:8081/repository/maven-snapshots/
+
+
+
+
\ No newline at end of file
diff --git a/iot-device-power-on-and-off-data-event/src/main/java/com/qniao/iot/device/power/event/IotDevicePowerOnAndOffDataEvent.java b/iot-device-power-on-and-off-data-event/src/main/java/com/qniao/iot/device/power/event/IotDevicePowerOnAndOffDataEvent.java
new file mode 100644
index 0000000..376614f
--- /dev/null
+++ b/iot-device-power-on-and-off-data-event/src/main/java/com/qniao/iot/device/power/event/IotDevicePowerOnAndOffDataEvent.java
@@ -0,0 +1,70 @@
+package com.qniao.iot.device.power.event;
+
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * 机器物联开机和关机数据
+ **/
+@Data
+public class IotDevicePowerOnAndOffDataEvent implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * 唯一标识
+ */
+ private Long id;
+
+ /**
+ * 数据来源
+ */
+ private Integer dataSource;
+
+ /**
+ * 设备物联地址(云盒物理标识)
+ */
+ private Long machineIotMac;
+
+ /**
+ * 机器电源状态(0断电 1供电)
+ */
+ private Integer machinePwrStat;
+
+ /**
+ * 机器工作状态(0未工作 1工作中 2待机中)
+ */
+ private Integer machineWorkingStat;
+
+ /**
+ * 当前作业计数
+ */
+ private Long currJobCount;
+
+ /**
+ * 当前作业时长
+ */
+ private Long currJobDuration;
+
+ /**
+ * 设备开机时间
+ */
+ private Long machinePowerOnTime;
+
+ /**
+ * 设备关机时间
+ */
+ private Long machinePowerOffTime;
+
+ /**
+ * 数据实际采样时间
+ */
+ private Long reportTime;
+
+ /**
+ * 实际接收到数据的时间
+ */
+ private Long receivedTime;
+}
diff --git a/iot-device-power-on-and-off-data-job/pom.xml b/iot-device-power-on-and-off-data-job/pom.xml
new file mode 100644
index 0000000..44b28dd
--- /dev/null
+++ b/iot-device-power-on-and-off-data-job/pom.xml
@@ -0,0 +1,224 @@
+
+
+
+ iot-device-power-on-and-off-data
+ org.example
+ 1.0-SNAPSHOT
+
+ 4.0.0
+
+ iot-device-power-on-and-off-data-job
+
+
+ 8
+ 8
+ UTF-8
+ 1.15.0
+ 1.8
+ ${target.java.version}
+ ${target.java.version}
+ 2.17.2
+
+
+
+
+
+
+
+ org.apache.flink
+ flink-streaming-java
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-clients
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-connector-rabbitmq_2.12
+ 1.14.5
+
+
+
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ ${log4j.version}
+ runtime
+
+
+ org.apache.logging.log4j
+ log4j-api
+ ${log4j.version}
+ runtime
+
+
+ org.apache.logging.log4j
+ log4j-core
+ ${log4j.version}
+ runtime
+
+
+
+ commons-logging
+ commons-logging
+ 1.2
+
+
+
+ com.qniao
+ iot-machine-data-event
+ 0.0.1-SNAPSHOT
+
+
+
+ com.qniao
+ iot-machine-data-constant
+ 0.0.1-SNAPSHOT
+
+
+
+ org.apache.flink
+ flink-connector-elasticsearch7_2.11
+ 1.14.5
+
+
+
+ cn.hutool
+ hutool-all
+ 5.8.4
+
+
+
+
+ com.ctrip.framework.apollo
+ apollo-client
+ 2.0.1
+
+
+
+ com.ctrip.framework.apollo
+ apollo-core
+ 2.0.1
+
+
+
+ com.alibaba
+ fastjson
+ 1.2.31
+
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.29
+
+
+
+ com.qniao
+ ddd-event
+ 0.0.1-SNAPSHOT
+
+
+
+ com.alibaba
+ druid
+ 1.2.6
+
+
+ org.projectlombok
+ lombok
+ RELEASE
+ compile
+
+
+ com.qniao
+ iot-device-power-on-and-off-data-event
+ 0.0.1-SNAPSHOT
+ compile
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.1
+
+ ${target.java.version}
+ ${target.java.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.1.1
+
+
+
+ package
+
+ shade
+
+
+
+
+ org.apache.flink:flink-shaded-force-shading
+ com.google.code.findbugs:jsr305
+ org.slf4j:*
+ org.apache.logging.log4j:*
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+ com.qniao.iot.device.power.IotDevicePowerOnAndOffDataJob
+
+
+
+
+
+
+
+
+
+
+
+
+ nexus
+ qniao
+ http://120.78.76.88:8081/repository/maven-public/
+
+ true
+ always
+
+
+
+
+
\ No newline at end of file
diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java
new file mode 100644
index 0000000..c12af7f
--- /dev/null
+++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java
@@ -0,0 +1,359 @@
+package com.qniao.iot.device.power;
+
+import cn.hutool.json.JSONUtil;
+import com.qniao.iot.device.power.config.ApolloConfig;
+import com.qniao.iot.device.power.constant.ConfigConstant;
+import com.qniao.iot.device.power.event.IotDevicePowerOnAndOffDataEvent;
+import com.qniao.iot.device.power.utils.SnowFlake;
+import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
+import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqDeserializationSchema;
+import com.qniao.iot.rc.constant.MachinePwrStatusEnum;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.state.*;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
+import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
+import org.apache.flink.util.Collector;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.action.admin.indices.alias.Alias;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.*;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
+@Slf4j
+public class IotDevicePowerOnAndOffDataJob {
+
+ static SnowFlake snowflake = new SnowFlake(ApolloConfig.getLong(ConfigConstant.SNOW_FLAKE_DATACENTER_ID),
+ ApolloConfig.getLong(ConfigConstant.SNOW_FLAKE_MACHINE_ID));
+
+ private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient
+ .builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ES_HOST_NAME),
+ ApolloConfig.getInt(ConfigConstant.ES_POST),
+ ApolloConfig.getStr(ConfigConstant.ES_SCHEME)))
+ .setHttpClientConfigCallback(httpAsyncClientBuilder -> {
+ CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY,
+ new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ES_USER_NAME),
+ ApolloConfig.getStr(ConfigConstant.ES_PASSWORD)));
+ return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+ })
+ .setRequestConfigCallback(requestConfigBuilder -> {
+ // 设置es连接超时时间
+ requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ES_CONNECT_TIMEOUT));
+ return requestConfigBuilder;
+ }));
+
+ /**
+ * 当前索引日期后缀
+ */
+ private static String currIndicesDateSuffix;
+
+ public static void main(String[] args) throws Exception {
+
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
+ // 获取设备数据源
+ RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+ .setHost("127.0.0.1")
+ .setPort(5672)
+ .setUserName("admin")
+ .setPassword("admin")
+ .setVirtualHost("datastream")
+ .build();
+ // 设备数据源转换
+ DataStreamSource streamSource = env.addSource(new RMQSource<>(connectionConfig,
+ "iotDevicePowerOnAndOffDataEvent", true, new MachineIotDataReceivedEventRabbitMqDeserializationSchema()));
+
+
+ SingleOutputStreamOperator streamOperator = streamSource
+ .keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
+ .process(new KeyedProcessFunction() {
+
+ private ValueState powerOnAndOffDataEventValueState;
+
+ @Override
+ public void open(Configuration parameters) {
+
+ // 必须在 open 生命周期初始化
+ powerOnAndOffDataEventValueState = getRuntimeContext()
+ .getState(new ValueStateDescriptor<>("powerOnAndOffDataEventValue",
+ TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class)));
+ }
+
+ @Override
+ public void processElement(MachineIotDataReceivedEvent event,
+ KeyedProcessFunction.Context ctx,
+ Collector out) throws Exception {
+
+ IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(event);
+ Integer machinePwrStat = event.getMachinePwrStat();
+ IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
+ powerOnAndOffDataEvent.setId(snowflake.nextId());
+ powerOnAndOffDataEvent.setDataSource(event.getDataSource());
+ powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac());
+ powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat());
+ powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat());
+ powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount());
+ powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration());
+ powerOnAndOffDataEvent.setReportTime(event.getReportTime());
+ powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
+ .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
+ if(lastPowerOnAndOffDataEvent == null) {
+ // 如果上一次是空的,那么只能处理开机数据
+ if(MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
+ powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime());
+ powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
+ out.collect(powerOnAndOffDataEvent);
+ }
+ }else {
+ // 上次的状态只有两种,要么是开机时间不为空,要么是开机和关机时间都不为空,否则不处理
+ if(lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) {
+ if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) {
+ if(MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
+ powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime());
+ }else {
+ powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime());
+ }
+ powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
+ out.collect(powerOnAndOffDataEvent);
+ } else {
+ // 说明上一个生产周期已经过了,那么当前设备的电源状态必须要是开机状态
+ if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
+ // 开机
+ powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime());
+ powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
+ out.collect(powerOnAndOffDataEvent);
+ }
+ }
+ }
+ }
+ }
+
+ private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineIotDataReceivedEvent event) throws IOException {
+
+ IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value();
+ if(iotDevicePowerOnAndOffDataEvent == null) {
+ iotDevicePowerOnAndOffDataEvent = getByEs(event);
+ }
+ return iotDevicePowerOnAndOffDataEvent;
+ }
+
+ private IotDevicePowerOnAndOffDataEvent getByEs(MachineIotDataReceivedEvent event) {
+
+ try {
+ // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询)
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", event.getMachineIotMac()));
+ searchSourceBuilder.sort("receivedTime", SortOrder.DESC);
+ searchSourceBuilder.size(1);
+ // 创建查询请求对象,将查询对象配置到其中
+ SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX));
+ searchRequest.source(searchSourceBuilder);
+ String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
+ GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate);
+ // 先判断索引是否存在
+ boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
+ if (exists) {
+ // 执行查询,然后处理响应结果
+ SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
+ // 根据状态和数据条数验证是否返回了数据
+ if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
+ SearchHits hits = searchResponse.getHits();
+ SearchHit reqHit = hits.getHits()[0];
+ return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class);
+ } else {
+ // 如果没有就自定义
+ IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
+ powerOnAndOffDataEvent.setId(snowflake.nextId());
+ powerOnAndOffDataEvent.setDataSource(event.getDataSource());
+ powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac());
+ powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount());
+ powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration());
+ Integer machinePwrStat = event.getMachinePwrStat();
+ powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat());
+ powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat());
+ Long reportTime = event.getReportTime();
+ if (machinePwrStat == 1) {
+ // 开机
+ powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime);
+ } else {
+ // 关机
+ powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime);
+ }
+ powerOnAndOffDataEvent.setReportTime(reportTime);
+ powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
+ .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
+ return powerOnAndOffDataEvent;
+ }
+ }
+ } catch (Exception e) {
+ log.error("获取es数据异常", e);
+ }
+ return null;
+ }
+ }).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac");
+
+
+ env.execute("device_monitoring_data");
+ }
+
+ /*private static void sinkEs(DataStream dataStream) {
+
+ List httpHosts = new ArrayList<>();
+ httpHosts.add(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST),
+ ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
+ ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)));
+ ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
+ (ElasticsearchSinkFunction) (deviceMonitoringData, runtimeContext, requestIndexer) -> {
+
+ LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime())
+ .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate();
+ String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
+ // 索引名称
+ String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix;
+ // 校验索引是否存在
+ checkIndicesIsExists(indexDateSuffix, indicesName);
+ //创建es 请求
+ IndexRequest indexRequest = Requests.indexRequest()
+ .index(indicesName)
+ .source(BeanUtil.beanToMap(deviceMonitoringData));
+ requestIndexer.add(indexRequest);
+ }
+ );
+ //刷新前缓冲的最大动作量
+ esSinkBuilder.setBulkFlushMaxActions(10);
+ //刷新前缓冲区的最大数据大小(以MB为单位)
+ esSinkBuilder.setBulkFlushMaxSizeMb(5);
+ //无论缓冲操作的数量或大小如何,都要刷新的时间间隔
+ esSinkBuilder.setBulkFlushInterval(5000L);
+ // 客户端创建配置回调,配置账号密码
+ esSinkBuilder.setRestClientFactory(
+ restClientBuilder -> {
+ restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
+ CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY,
+ new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
+ ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
+ return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+ });
+ restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
+ // 设置es连接超时时间
+ requestConfigBuilder
+ .setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT));
+ return requestConfigBuilder;
+ });
+ }
+ );
+ //数据流添加sink
+ dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink");
+ }*/
+
+ private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) {
+
+ if (currIndicesDateSuffix == null) {
+ // 当前月的索引为空
+ createIndices(indicesName, indexDateSuffix);
+ } else {
+ // 校验当前消息能否符合当前索引
+ if (!indexDateSuffix.equals(currIndicesDateSuffix)) {
+ // 如果不符合,需要重建索引
+ createIndices(indicesName, indexDateSuffix);
+ }
+ }
+ }
+
+ private static void createIndices(String indicesName, String indexDateSuffix) {
+
+ // 判断索引是否存在
+ GetIndexRequest exist = new GetIndexRequest(indicesName);
+ // 先判断客户端是否存在
+ try {
+ boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
+ if (!exists) {
+ // 创建索引
+ CreateIndexRequest request = new CreateIndexRequest(indicesName);
+ // 字段映射
+ String mappersStr = "{\n" +
+ " \"properties\": {\n" +
+ " \"accJobCount\": {\n" +
+ " \"type\": \"long\"\n" +
+ " },\n" +
+ " \"accJobCountDuration\": {\n" +
+ " \"type\": \"long\"\n" +
+ " },\n" +
+ " \"currDuration\": {\n" +
+ " \"type\": \"long\"\n" +
+ " },\n" +
+ " \"currJobCount\": {\n" +
+ " \"type\": \"long\"\n" +
+ " },\n" +
+ " \"currJobDuration\": {\n" +
+ " \"type\": \"long\"\n" +
+ " },\n" +
+ " \"dataSource\": {\n" +
+ " \"type\": \"integer\"\n" +
+ " },\n" +
+ " \"lastBootTime\": {\n" +
+ " \"type\": \"date\"\n" +
+ " },\n" +
+ " \"machineIotMac\": {\n" +
+ " \"type\": \"keyword\"\n" +
+ " },\n" +
+ " \"machinePwrStat\": {\n" +
+ " \"type\": \"integer\"\n" +
+ " },\n" +
+ " \"machineWorkingStat\": {\n" +
+ " \"type\": \"integer\"\n" +
+ " },\n" +
+ " \"receivedTime\": {\n" +
+ " \"type\": \"date\"\n" +
+ " },\n" +
+ " \"reportTime\": {\n" +
+ " \"type\": \"date\"\n" +
+ " }\n" +
+ " }\n" +
+ "}";
+ request.mapping(mappersStr, XContentType.JSON);
+ // 设置索引别名
+ request.alias(new Alias(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)));
+ // 暂时不管是否创建成功
+ CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
+ boolean acknowledged = createIndexResponse.isAcknowledged();
+ boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
+ if (!acknowledged || !shardsAcknowledged) {
+ throw new Exception("自定义索引创建失败!!!");
+ }
+ currIndicesDateSuffix = indexDateSuffix;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/config/ApolloConfig.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/config/ApolloConfig.java
new file mode 100644
index 0000000..df2f8e7
--- /dev/null
+++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/config/ApolloConfig.java
@@ -0,0 +1,29 @@
+package com.qniao.iot.device.power.config;
+
+import com.ctrip.framework.apollo.Config;
+import com.ctrip.framework.apollo.ConfigService;
+
+public class ApolloConfig {
+
+ private static final Config config = ConfigService.getAppConfig();
+
+ public static String getStr(String key, String defaultValue) {
+
+ return config.getProperty(key, defaultValue);
+ }
+
+ public static String getStr(String key) {
+
+ return config.getProperty(key, null);
+ }
+
+ public static Integer getInt(String key) {
+
+ return config.getIntProperty(key, null);
+ }
+
+ public static long getLong(String key) {
+
+ return config.getLongProperty(key, null);
+ }
+}
diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java
new file mode 100644
index 0000000..2679b21
--- /dev/null
+++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java
@@ -0,0 +1,40 @@
+package com.qniao.iot.device.power.constant;
+
+public interface ConfigConstant {
+
+ String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "source.kafka.bootstrap.servers";
+
+ String SOURCE_KAFKA_TOPICS = "source.kafka.topics";
+
+ String SOURCE_KAFKA_GROUP_ID = "source.kafka.groupId";
+
+ String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host";
+
+ String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post";
+
+ String SINK_ELASTICSEARCH_USER_NAME = "sink.elasticsearch.userName";
+
+ String SINK_ELASTICSEARCH_PASSWORD = "sink.elasticsearch.password";
+
+ String SINK_ELASTICSEARCH_CONNECT_TIMEOUT = "sink.elasticsearch.connectTimeout";
+
+ String SINK_ELASTICSEARCH_SCHEME = "sink.elasticsearch.scheme";
+
+ String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index";
+
+ String ES_HOST_NAME = "es.host.name";
+
+ String ES_POST = "es.post";
+
+ String ES_SCHEME = "es.scheme";
+
+ String ES_USER_NAME = "es.user.name";
+
+ String ES_PASSWORD = "es.password";
+
+ String ES_CONNECT_TIMEOUT = "es.connect.timeout";
+
+ String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id";
+
+ String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id";
+}
diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/utils/EsRestClientUtil.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/utils/EsRestClientUtil.java
new file mode 100644
index 0000000..4d95ca6
--- /dev/null
+++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/utils/EsRestClientUtil.java
@@ -0,0 +1,87 @@
+package com.qniao.iot.device.power.utils;
+
+import cn.hutool.json.JSONUtil;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+
+public class EsRestClientUtil {
+
+ private static String host = "120.79.137.137:9200";
+ private static String scheme = "http";
+ private static String index = "qn_cloud_box_data_history";
+ private static RestClientBuilder builder = null;
+ private static RestHighLevelClient client = null;
+
+ public static void init() {
+ String[] nodeIpInfos = host.split(":");
+ builder = RestClient.builder(new HttpHost(nodeIpInfos[0], Integer.parseInt(nodeIpInfos[1]), scheme))
+ .setRequestConfigCallback(requestConfigBuilder -> {
+ requestConfigBuilder.setConnectTimeout(10 * 60 * 1000);
+ requestConfigBuilder.setSocketTimeout(10 * 60 * 1000);
+ requestConfigBuilder.setConnectionRequestTimeout(10 * 60 * 1000);
+ return requestConfigBuilder;
+ });
+ CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "qnol26215"));
+ builder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider));
+ client = new RestHighLevelClient(builder);
+ }
+
+ public static void queryDeviceListPageResult(SearchSourceBuilder sourceBuilder, Consumer consumer,
+ Class classes, String... indices) {
+
+ SearchRequest searchRequest = new SearchRequest(indices).scroll("5m").source(sourceBuilder);
+ if (client == null) {
+ init();
+ }
+ try {
+ SearchResponse response;
+ String scrollId = null;
+ while (!"none".equals(scrollId)) {
+ if (scrollId != null) {
+ SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId).scroll("5m");
+ response = client.scroll(scrollRequest, RequestOptions.DEFAULT);
+ } else {
+ response = client.search(searchRequest, RequestOptions.DEFAULT);
+ }
+
+ int s = response.status().getStatus();
+ if (s == RestStatus.OK.getStatus()) {
+ SearchHit[] hits = response.getHits().getHits();
+ scrollId = response.getScrollId();
+ if (hits != null) {
+ System.out.println("*********************查询es结果数量 :" + hits.length);
+ for (SearchHit hit : hits) {
+ consumer.accept(JSONUtil.toBean(hit.getSourceAsString(), classes));
+ }
+ }
+ } else {
+ //清除滚屏
+ ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+ clearScrollRequest.addScrollId(scrollId);
+ client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/utils/SnowFlake.java b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/utils/SnowFlake.java
new file mode 100644
index 0000000..c097a41
--- /dev/null
+++ b/iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/utils/SnowFlake.java
@@ -0,0 +1,103 @@
+package com.qniao.iot.device.power.utils;
+
+/**
+ * @description: Twitter的分布式自增ID雪花算法snowflake
+ * @author: zp
+ * @date: 2019-10-29 10:05
+ */
+public class SnowFlake {
+
+ /**
+ * 起始的时间戳
+ */
+ private final static long START_STMP = 1480166465631L;
+
+ /**
+ * 每一部分占用的位数
+ */
+ private final static long SEQUENCE_BIT = 12; //序列号占用的位数
+ private final static long MACHINE_BIT = 5; //机器标识占用的位数
+ private final static long DATACENTER_BIT = 5;//数据中心占用的位数
+
+ /**
+ * 每一部分的最大值
+ */
+ private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT);
+ private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);
+ private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);
+
+ /**
+ * 每一部分向左的位移
+ */
+ private final static long MACHINE_LEFT = SEQUENCE_BIT;
+ private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
+ private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT;
+
+ private long datacenterId = 1L; //数据中心
+ private long machineId = 1L; //机器标识
+ private long sequence = 0L; //序列号
+ private long lastStmp = -1L;//上一次时间戳
+
+// public SnowFlake(){
+// }
+
+ public SnowFlake(long datacenterId,
+ long machineId) {
+ if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) {
+ throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0");
+ }
+ if (machineId > MAX_MACHINE_NUM || machineId < 0) {
+ throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
+ }
+ this.datacenterId = datacenterId;
+ this.machineId = machineId;
+ }
+
+ /**
+ * 产生下一个ID
+ *
+ * @return
+ */
+ public synchronized Long nextId() {
+ long currStmp = getNewstmp();
+ if (currStmp < lastStmp) {
+ throw new RuntimeException("Clock moved backwards. Refusing to generate id");
+ }
+
+ if (currStmp == lastStmp) {
+ //相同毫秒内,序列号自增
+ sequence = (sequence + 1) & MAX_SEQUENCE;
+ //同一毫秒的序列数已经达到最大
+ if (sequence == 0L) {
+ currStmp = getNextMill();
+ }
+ } else {
+ //不同毫秒内,序列号置为0
+ sequence = 0L;
+ }
+
+ lastStmp = currStmp;
+
+ return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分
+ | datacenterId << DATACENTER_LEFT //数据中心部分
+ | machineId << MACHINE_LEFT //机器标识部分
+ | sequence; //序列号部分
+ }
+
+ private long getNextMill() {
+ long mill = getNewstmp();
+ while (mill <= lastStmp) {
+ mill = getNewstmp();
+ }
+ return mill;
+ }
+
+ private long getNewstmp() {
+ return System.currentTimeMillis();
+ }
+
+ public static void main(String[] args) {
+ SnowFlake s = new SnowFlake(1, 1);
+ System.out.println(s.nextId());
+ }
+}
\ No newline at end of file
diff --git a/iot-device-power-on-and-off-data-job/src/main/resources/META-INF/app.properties b/iot-device-power-on-and-off-data-job/src/main/resources/META-INF/app.properties
new file mode 100644
index 0000000..d10864b
--- /dev/null
+++ b/iot-device-power-on-and-off-data-job/src/main/resources/META-INF/app.properties
@@ -0,0 +1,5 @@
+app.id=iot-device-monitoring-data
+
+# test 8.135.8.221
+# prod 47.112.164.224
+apollo.meta=http://47.112.164.224:5000
\ No newline at end of file
diff --git a/iot-device-power-on-and-off-data-job/src/main/resources/db.setting b/iot-device-power-on-and-off-data-job/src/main/resources/db.setting
new file mode 100644
index 0000000..3e3fba1
--- /dev/null
+++ b/iot-device-power-on-and-off-data-job/src/main/resources/db.setting
@@ -0,0 +1,53 @@
+## db.setting文件
+
+url = jdbc:mysql://rm-wz9it4fs5tk7n4tm1zo.mysql.rds.aliyuncs.com:3306/cloud_print_cloud_factory?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useTimezone=true&serverTimezone=GMT%2B8&useSSL=false
+user = qn_cloudprint
+pass = qncloudprint5682
+
+# 是否在日志中显示执行的SQL
+showSql = true
+
+# 是否格式化显示的SQL
+formatSql = false
+
+# 是否显示SQL参数
+showParams = true
+
+# 打印SQL的日志等级,默认debug,可以是info、warn、error
+sqlLevel = debug
+
+# 初始化时建立物理连接的个数
+initialSize = 0
+
+# 最大连接池数量
+maxActive = 20
+
+# 最小连接池数量
+minIdle = 0
+
+
+
+# 初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时
+initialSize = 10
+# 最大连接池数量
+maxActive = 20
+# 最小连接池数量
+minIdle = 10
+# 获取连接时最大等待时间,单位毫秒。配置了maxWait之后, 缺省启用公平锁,并发效率会有所下降, 如果需要可以通过配置useUnfairLock属性为true使用非公平锁。
+maxWait = 0
+# 是否缓存preparedStatement,也就是PSCache。 PSCache对支持游标的数据库性能提升巨大,比如说oracle。 在mysql5.5以下的版本中没有PSCache功能,建议关闭掉。作者在5.5版本中使用PSCache,通过监控界面发现PSCache有缓存命中率记录, 该应该是支持PSCache。
+poolPreparedStatements = false
+# 要启用PSCache,必须配置大于0,当大于0时, poolPreparedStatements自动触发修改为true。 在Druid中,不会存在Oracle下PSCache占用内存过多的问题, 可以把这个数值配置大一些,比如说100
+maxOpenPreparedStatements = -1
+# 用来检测连接是否有效的sql,要求是一个查询语句。 如果validationQuery为null,testOnBorrow、testOnReturn、 testWhileIdle都不会其作用。
+validationQuery = SELECT 1
+# 申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。
+testOnBorrow = true
+# 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能
+testOnReturn = false
+# 建议配置为true,不影响性能,并且保证安全性。 申请连接的时候检测,如果空闲时间大于 timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。
+testWhileIdle = false
+# 有两个含义: 1) Destroy线程会检测连接的间隔时间 2) testWhileIdle的判断依据,详细看testWhileIdle属性的说明
+timeBetweenEvictionRunsMillis = 60000
+# 物理连接初始化的时候执行的sql
+connectionInitSqls = SELECT 1
\ No newline at end of file
diff --git a/iot-device-power-on-and-off-data-job/src/main/resources/log4j2.properties b/iot-device-power-on-and-off-data-job/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..32c696e
--- /dev/null
+++ b/iot-device-power-on-and-off-data-job/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..0d03fec
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,33 @@
+
+
+ 4.0.0
+
+ org.example
+ iot-device-power-on-and-off-data
+ pom
+ 1.0-SNAPSHOT
+
+ iot-device-power-on-and-off-data-job
+ iot-device-power-on-and-off-data-event
+
+
+
+ 8
+ 8
+
+
+
+
+
+ nexus
+ qniao
+ http://120.78.76.88:8081/repository/maven-public/
+
+ true
+ always
+
+
+
+
\ No newline at end of file