Browse Source

first commit

hph-优化版本
1049970895@qniao.cn 3 years ago
commit
c9055d1be5
12 changed files with 1079 additions and 0 deletions
  1. 51
      iot-device-power-on-and-off-data-event/pom.xml
  2. 70
      iot-device-power-on-and-off-data-event/src/main/java/com/qniao/iot/device/power/event/IotDevicePowerOnAndOffDataEvent.java
  3. 224
      iot-device-power-on-and-off-data-job/pom.xml
  4. 359
      iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java
  5. 29
      iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/config/ApolloConfig.java
  6. 40
      iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java
  7. 87
      iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/utils/EsRestClientUtil.java
  8. 103
      iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/utils/SnowFlake.java
  9. 5
      iot-device-power-on-and-off-data-job/src/main/resources/META-INF/app.properties
  10. 53
      iot-device-power-on-and-off-data-job/src/main/resources/db.setting
  11. 25
      iot-device-power-on-and-off-data-job/src/main/resources/log4j2.properties
  12. 33
      pom.xml

51
iot-device-power-on-and-off-data-event/pom.xml

@ -0,0 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.qniao</groupId>
<artifactId>java-dependency</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>iot-device-power-on-and-off-data-event</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<lombok.version>1.18.24</lombok.version>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<distributionManagement>
<repository>
<!-- id/name corrected to match the URL: this repo deploys SNAPSHOT artifacts (project version is 0.0.1-SNAPSHOT) -->
<id>maven-snapshots</id>
<name>Nexus snapshots Repository</name>
<url>http://120.78.76.88:8081/repository/maven-snapshots/</url>
</repository>
</distributionManagement>
</project>

70
iot-device-power-on-and-off-data-event/src/main/java/com/qniao/iot/device/power/event/IotDevicePowerOnAndOffDataEvent.java

@ -0,0 +1,70 @@
package com.qniao.iot.device.power.event;
import lombok.Data;
import java.io.Serializable;
/**
 * Machine IoT power-on / power-off data event.
 * <p>
 * One record describes a single power report from a device: its power and
 * working state at report time, plus (at most one of) the power-on or
 * power-off timestamp for the current on/off cycle. All timestamps are
 * epoch milliseconds.
 **/
@Data
public class IotDevicePowerOnAndOffDataEvent implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Unique identifier (snowflake-generated).
     */
    private Long id;

    /**
     * Data source.
     */
    private Integer dataSource;

    /**
     * Device IoT address (physical identifier of the cloud box).
     */
    private Long machineIotMac;

    /**
     * Machine power state: 0 = power off, 1 = power on.
     */
    private Integer machinePwrStat;

    /**
     * Machine working state: 0 = idle, 1 = working, 2 = standby.
     */
    private Integer machineWorkingStat;

    /**
     * Current job count.
     */
    private Long currJobCount;

    /**
     * Current job duration.
     */
    private Long currJobDuration;

    /**
     * Device power-on time (epoch ms); null when this record marks a power-off.
     */
    private Long machinePowerOnTime;

    /**
     * Device power-off time (epoch ms); null when this record marks a power-on.
     */
    private Long machinePowerOffTime;

    /**
     * Actual sampling time of the data on the device (epoch ms).
     */
    private Long reportTime;

    /**
     * Time the data was actually received by this job (epoch ms).
     */
    private Long receivedTime;
}

224
iot-device-power-on-and-off-data-job/pom.xml

@ -0,0 +1,224 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>iot-device-power-on-and-off-data</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>iot-device-power-on-and-off-data-job</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.15.0</flink.version>
<target.java.version>1.8</target.java.version>
<!-- deduplicated: compiler source/target were declared twice (literal 8 and ${target.java.version}) -->
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_2.12</artifactId>
<version>1.14.5</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-event</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-machine-data-constant</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.14.5</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.4</version>
</dependency>
<!-- apollo -->
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-core</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.31</version>
</dependency>
<!--mysql数据库驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>ddd-event</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.6</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<!-- pinned instead of floating RELEASE for reproducible builds; matches the event module's 1.18.24 -->
<version>1.18.24</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.qniao</groupId>
<artifactId>iot-device-power-on-and-off-data-event</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.qniao.iot.device.power.IotDevicePowerOnAndOffDataJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<!-- 配置远程仓库 -->
<repositories>
<repository>
<id>nexus</id>
<name>qniao</name>
<url>http://120.78.76.88:8081/repository/maven-public/</url>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</snapshots>
</repository>
</repositories>
</project>

359
iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/IotDevicePowerOnAndOffDataJob.java

@ -0,0 +1,359 @@
package com.qniao.iot.device.power;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.device.power.config.ApolloConfig;
import com.qniao.iot.device.power.constant.ConfigConstant;
import com.qniao.iot.device.power.event.IotDevicePowerOnAndOffDataEvent;
import com.qniao.iot.device.power.utils.SnowFlake;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqDeserializationSchema;
import com.qniao.iot.rc.constant.MachinePwrStatusEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
@Slf4j
public class IotDevicePowerOnAndOffDataJob {

    /** Snowflake ID generator; datacenter/machine ids are read from Apollo config. */
    static SnowFlake snowflake = new SnowFlake(ApolloConfig.getLong(ConfigConstant.SNOW_FLAKE_DATACENTER_ID),
            ApolloConfig.getLong(ConfigConstant.SNOW_FLAKE_MACHINE_ID));

    /**
     * Shared ES high-level client, built from Apollo config with basic-auth
     * credentials and a configurable connect timeout.
     * NOTE(review): the client is never closed; acceptable for a long-running
     * streaming job, but worth confirming.
     */
    private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient
            .builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ES_HOST_NAME),
                    ApolloConfig.getInt(ConfigConstant.ES_POST),
                    ApolloConfig.getStr(ConfigConstant.ES_SCHEME)))
            .setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY,
                        new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ES_USER_NAME),
                                ApolloConfig.getStr(ConfigConstant.ES_PASSWORD)));
                return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            })
            .setRequestConfigCallback(requestConfigBuilder -> {
                // set the ES connection timeout
                requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ES_CONNECT_TIMEOUT));
                return requestConfigBuilder;
            }));

    /**
     * Date suffix (yyyyMM) of the index most recently created by createIndices;
     * used to detect month rollover.
     */
    private static String currIndicesDateSuffix;

    /**
     * Job entry point: consumes raw device reports from RabbitMQ, keys them by
     * device MAC, and derives power-on/off cycle events via keyed state.
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 60s exactly-once checkpoints
        env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
        // Device data source (RabbitMQ).
        // NOTE(review): host/credentials are hardcoded to localhost/admin — presumably
        // dev-only values; they should come from config like the ES settings above.
        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("127.0.0.1")
                .setPort(5672)
                .setUserName("admin")
                .setPassword("admin")
                .setVirtualHost("datastream")
                .build();
        // Transform the raw device event stream
        DataStreamSource<MachineIotDataReceivedEvent> streamSource = env.addSource(new RMQSource<>(connectionConfig,
                "iotDevicePowerOnAndOffDataEvent", true, new MachineIotDataReceivedEventRabbitMqDeserializationSchema()));
        SingleOutputStreamOperator<IotDevicePowerOnAndOffDataEvent> streamOperator = streamSource
                .keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
                .process(new KeyedProcessFunction<Long, MachineIotDataReceivedEvent, IotDevicePowerOnAndOffDataEvent>() {

                    // Per-key (per-device) last emitted power-on/off event
                    private ValueState<IotDevicePowerOnAndOffDataEvent> powerOnAndOffDataEventValueState;

                    @Override
                    public void open(Configuration parameters) {
                        // State handles must be initialized in the open() lifecycle hook
                        powerOnAndOffDataEventValueState = getRuntimeContext()
                                .getState(new ValueStateDescriptor<>("powerOnAndOffDataEventValue",
                                        TypeInformation.of(IotDevicePowerOnAndOffDataEvent.class)));
                    }

                    /**
                     * Derives the next power-cycle event from the incoming report and
                     * the previous event (from state, or ES after a restart), updating
                     * the keyed state whenever an event is emitted.
                     */
                    @Override
                    public void processElement(MachineIotDataReceivedEvent event,
                            KeyedProcessFunction<Long, MachineIotDataReceivedEvent, IotDevicePowerOnAndOffDataEvent>.Context ctx,
                            Collector<IotDevicePowerOnAndOffDataEvent> out) throws Exception {
                        IotDevicePowerOnAndOffDataEvent lastPowerOnAndOffDataEvent = getLastPowerOnAndOffDataEvent(event);
                        Integer machinePwrStat = event.getMachinePwrStat();
                        // Copy the raw report into an outgoing event with a fresh snowflake id
                        IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
                        powerOnAndOffDataEvent.setId(snowflake.nextId());
                        powerOnAndOffDataEvent.setDataSource(event.getDataSource());
                        powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac());
                        powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat());
                        powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat());
                        powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount());
                        powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration());
                        powerOnAndOffDataEvent.setReportTime(event.getReportTime());
                        powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
                                .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
                        if (lastPowerOnAndOffDataEvent == null) {
                            // No previous record: only a power-ON report can start a cycle
                            if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
                                powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime());
                                powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
                                out.collect(powerOnAndOffDataEvent);
                            }
                        } else {
                            // The previous record is either "on-time set" (cycle open) or
                            // "on-time and off-time both set" (cycle closed); anything else is ignored
                            if (lastPowerOnAndOffDataEvent.getMachinePowerOnTime() != null) {
                                if (lastPowerOnAndOffDataEvent.getMachinePowerOffTime() == null) {
                                    // Cycle still open: an ON report refreshes the on-time, an OFF report closes it
                                    if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
                                        powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime());
                                    } else {
                                        powerOnAndOffDataEvent.setMachinePowerOffTime(event.getReportTime());
                                    }
                                    powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
                                    out.collect(powerOnAndOffDataEvent);
                                } else {
                                    // Previous cycle completed: the device must report power ON to start a new one
                                    if (MachinePwrStatusEnum.MACHINE_ON.getValue().equals(machinePwrStat)) {
                                        // power on
                                        powerOnAndOffDataEvent.setMachinePowerOnTime(event.getReportTime());
                                        powerOnAndOffDataEventValueState.update(powerOnAndOffDataEvent);
                                        out.collect(powerOnAndOffDataEvent);
                                    }
                                }
                            }
                        }
                    }

                    /**
                     * Returns the last power event for this device: from keyed state first,
                     * falling back to ES when state is empty (e.g. after a job restart).
                     */
                    private IotDevicePowerOnAndOffDataEvent getLastPowerOnAndOffDataEvent(MachineIotDataReceivedEvent event) throws IOException {
                        IotDevicePowerOnAndOffDataEvent iotDevicePowerOnAndOffDataEvent = powerOnAndOffDataEventValueState.value();
                        if (iotDevicePowerOnAndOffDataEvent == null) {
                            iotDevicePowerOnAndOffDataEvent = getByEs(event);
                        }
                        return iotDevicePowerOnAndOffDataEvent;
                    }

                    /**
                     * Fetches the most recent event for this device MAC from ES (newest
                     * receivedTime), or synthesizes an initial record from the incoming
                     * report when nothing is stored yet. Returns null when the current
                     * month's index does not exist or the lookup fails.
                     */
                    private IotDevicePowerOnAndOffDataEvent getByEs(MachineIotDataReceivedEvent event) {
                        try {
                            // Build the query; termQuery accepts several value kinds (boolean,
                            // int, double, string) — the MAC is matched as an exact term here
                            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
                            searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", event.getMachineIotMac()));
                            searchSourceBuilder.sort("receivedTime", SortOrder.DESC);
                            searchSourceBuilder.size(1);
                            // Create the search request against the stable index alias
                            SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX));
                            searchRequest.source(searchSourceBuilder);
                            String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
                            GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate);
                            // First check whether the current month's dated index exists
                            boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
                            if (exists) {
                                // Run the query and handle the response
                                SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
                                // Verify status and hit count before reading data
                                if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
                                    SearchHits hits = searchResponse.getHits();
                                    SearchHit reqHit = hits.getHits()[0];
                                    return JSONUtil.toBean(reqHit.getSourceAsString(), IotDevicePowerOnAndOffDataEvent.class);
                                } else {
                                    // Nothing stored yet — synthesize an initial record from the incoming report
                                    IotDevicePowerOnAndOffDataEvent powerOnAndOffDataEvent = new IotDevicePowerOnAndOffDataEvent();
                                    powerOnAndOffDataEvent.setId(snowflake.nextId());
                                    powerOnAndOffDataEvent.setDataSource(event.getDataSource());
                                    powerOnAndOffDataEvent.setMachineIotMac(event.getMachineIotMac());
                                    powerOnAndOffDataEvent.setCurrJobCount(event.getCurrJobCount());
                                    powerOnAndOffDataEvent.setCurrJobDuration(event.getCurrJobDuration());
                                    Integer machinePwrStat = event.getMachinePwrStat();
                                    powerOnAndOffDataEvent.setMachinePwrStat(event.getMachinePwrStat());
                                    powerOnAndOffDataEvent.setMachineWorkingStat(event.getMachineWorkingStat());
                                    Long reportTime = event.getReportTime();
                                    // NOTE(review): unboxes machinePwrStat — NPE if the report carries null;
                                    // confirm upstream guarantees non-null (processElement compares via equals instead)
                                    if (machinePwrStat == 1) {
                                        // power on
                                        powerOnAndOffDataEvent.setMachinePowerOnTime(reportTime);
                                    } else {
                                        // power off
                                        powerOnAndOffDataEvent.setMachinePowerOffTime(reportTime);
                                    }
                                    powerOnAndOffDataEvent.setReportTime(reportTime);
                                    powerOnAndOffDataEvent.setReceivedTime(LocalDateTime
                                            .now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
                                    return powerOnAndOffDataEvent;
                                }
                            }
                        } catch (Exception e) {
                            log.error("获取es数据异常", e);
                        }
                        return null;
                    }
                }).name("iotDevicePowerOnAndOffDataEvent keyBy machineIotMac");
        // NOTE(review): streamOperator has no sink attached (the ES sink below is commented
        // out), so the derived events currently go nowhere — confirm whether sinkEs should
        // be re-enabled before deploying.
        env.execute("device_monitoring_data");
    }

    // NOTE(review): dead code below — earlier ES sink kept verbatim for reference; consider deleting.
    /*private static void sinkEs(DataStream<DeviceMonitoringData> dataStream) {
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST),
                ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
                ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)));
        ElasticsearchSink.Builder<DeviceMonitoringData> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
                (ElasticsearchSinkFunction<DeviceMonitoringData>) (deviceMonitoringData, runtimeContext, requestIndexer) -> {
                    LocalDate reportDate = new java.util.Date(deviceMonitoringData.getReportTime())
                            .toInstant().atOffset(ZoneOffset.of("+8")).toLocalDate();
                    String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
                    // 索引名称
                    String indicesName = ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix;
                    // 校验索引是否存在
                    checkIndicesIsExists(indexDateSuffix, indicesName);
                    //创建es 请求
                    IndexRequest indexRequest = Requests.indexRequest()
                            .index(indicesName)
                            .source(BeanUtil.beanToMap(deviceMonitoringData));
                    requestIndexer.add(indexRequest);
                }
        );
        //刷新前缓冲的最大动作量
        esSinkBuilder.setBulkFlushMaxActions(10);
        //刷新前缓冲区的最大数据大小以MB为单位
        esSinkBuilder.setBulkFlushMaxSizeMb(5);
        //无论缓冲操作的数量或大小如何都要刷新的时间间隔
        esSinkBuilder.setBulkFlushInterval(5000L);
        // 客户端创建配置回调配置账号密码
        esSinkBuilder.setRestClientFactory(
                restClientBuilder -> {
                    restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                        credentialsProvider.setCredentials(AuthScope.ANY,
                                new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
                                        ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
                        return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    });
                    restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
                        // 设置es连接超时时间
                        requestConfigBuilder
                                .setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT));
                        return requestConfigBuilder;
                    });
                }
        );
        //数据流添加sink
        dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink");
    }*/

    /**
     * Ensures the monthly index matching indexDateSuffix exists, creating it on
     * first use or on month rollover.
     */
    private static void checkIndicesIsExists(String indexDateSuffix, String indicesName) {
        if (currIndicesDateSuffix == null) {
            // No index created yet in this process
            createIndices(indicesName, indexDateSuffix);
        } else {
            // Check whether the incoming record still belongs to the current index
            if (!indexDateSuffix.equals(currIndicesDateSuffix)) {
                // Month changed — create the new month's index
                createIndices(indicesName, indexDateSuffix);
            }
        }
    }

    /**
     * Creates the monthly index (with field mappings and the stable query alias)
     * if it does not exist, then records its date suffix in currIndicesDateSuffix.
     * Failures are logged to stderr and otherwise swallowed.
     */
    private static void createIndices(String indicesName, String indexDateSuffix) {
        // Check whether the index already exists
        GetIndexRequest exist = new GetIndexRequest(indicesName);
        try {
            boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
            if (!exists) {
                // Create the index
                CreateIndexRequest request = new CreateIndexRequest(indicesName);
                // Field mappings
                String mappersStr = "{\n" +
                        " \"properties\": {\n" +
                        " \"accJobCount\": {\n" +
                        " \"type\": \"long\"\n" +
                        " },\n" +
                        " \"accJobCountDuration\": {\n" +
                        " \"type\": \"long\"\n" +
                        " },\n" +
                        " \"currDuration\": {\n" +
                        " \"type\": \"long\"\n" +
                        " },\n" +
                        " \"currJobCount\": {\n" +
                        " \"type\": \"long\"\n" +
                        " },\n" +
                        " \"currJobDuration\": {\n" +
                        " \"type\": \"long\"\n" +
                        " },\n" +
                        " \"dataSource\": {\n" +
                        " \"type\": \"integer\"\n" +
                        " },\n" +
                        " \"lastBootTime\": {\n" +
                        " \"type\": \"date\"\n" +
                        " },\n" +
                        " \"machineIotMac\": {\n" +
                        " \"type\": \"keyword\"\n" +
                        " },\n" +
                        " \"machinePwrStat\": {\n" +
                        " \"type\": \"integer\"\n" +
                        " },\n" +
                        " \"machineWorkingStat\": {\n" +
                        " \"type\": \"integer\"\n" +
                        " },\n" +
                        " \"receivedTime\": {\n" +
                        " \"type\": \"date\"\n" +
                        " },\n" +
                        " \"reportTime\": {\n" +
                        " \"type\": \"date\"\n" +
                        " }\n" +
                        " }\n" +
                        "}";
                request.mapping(mappersStr, XContentType.JSON);
                // Set the index alias (the stable name queries go through)
                request.alias(new Alias(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX)));
                // NOTE(review): original comment said "ignore creation success for now",
                // but the acknowledgement IS checked below — the comment was stale
                CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
                boolean acknowledged = createIndexResponse.isAcknowledged();
                boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
                if (!acknowledged || !shardsAcknowledged) {
                    throw new Exception("自定义索引创建失败!!!");
                }
                currIndicesDateSuffix = indexDateSuffix;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

29
iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/config/ApolloConfig.java

@ -0,0 +1,29 @@
package com.qniao.iot.device.power.config;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
/**
 * Thin static facade over the Apollo application config.
 */
public class ApolloConfig {

    /** Shared handle to the application's Apollo namespace. */
    private static final Config config = ConfigService.getAppConfig();

    /**
     * Returns the string property for {@code key}, or {@code defaultValue} when absent.
     */
    public static String getStr(String key, String defaultValue) {
        return config.getProperty(key, defaultValue);
    }

    /**
     * Returns the string property for {@code key}, or {@code null} when absent.
     */
    public static String getStr(String key) {
        return config.getProperty(key, null);
    }

    /**
     * Returns the integer property for {@code key}, or {@code null} when absent
     * or not parseable as an integer.
     */
    public static Integer getInt(String key) {
        return config.getIntProperty(key, null);
    }

    /**
     * Returns the long property for {@code key}.
     * <p>
     * Fix: the previous implementation returned a primitive {@code long} from
     * {@code getLongProperty(key, null)}, so a missing key caused a bare,
     * context-free unboxing NPE. The exception type is unchanged (still NPE,
     * backward compatible), but it now names the missing key.
     *
     * @throws NullPointerException if the key is absent or not parseable as a long
     */
    public static long getLong(String key) {
        Long value = config.getLongProperty(key, null);
        if (value == null) {
            throw new NullPointerException("Missing or invalid required Apollo config key: " + key);
        }
        return value;
    }
}

40
iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/constant/ConfigConstant.java

@ -0,0 +1,40 @@
package com.qniao.iot.device.power.constant;
/**
 * Apollo configuration key names used by this job.
 * NOTE(review): constant interface is an anti-pattern (Effective Java item 22);
 * a final class with a private constructor would be preferable, but the
 * interface is kept in case other modules implement it.
 */
public interface ConfigConstant {

    // Kafka source settings (currently unused — the job reads from RabbitMQ;
    // presumably kept for a planned/previous source — TODO confirm)
    String SOURCE_KAFKA_BOOTSTRAP_SERVERS = "source.kafka.bootstrap.servers";
    String SOURCE_KAFKA_TOPICS = "source.kafka.topics";
    String SOURCE_KAFKA_GROUP_ID = "source.kafka.groupId";

    // Elasticsearch sink settings ("post" is presumably a typo for "port" — the
    // key string must stay as-is to match the values published in Apollo)
    String SINK_ELASTICSEARCH_HOST = "sink.elasticsearch.host";
    String SINK_ELASTICSEARCH_POST = "sink.elasticsearch.post";
    String SINK_ELASTICSEARCH_USER_NAME = "sink.elasticsearch.userName";
    String SINK_ELASTICSEARCH_PASSWORD = "sink.elasticsearch.password";
    String SINK_ELASTICSEARCH_CONNECT_TIMEOUT = "sink.elasticsearch.connectTimeout";
    String SINK_ELASTICSEARCH_SCHEME = "sink.elasticsearch.scheme";
    // Index alias name; monthly indices are "<index>_yyyyMM"
    String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index";

    // Elasticsearch lookup-client settings (same "post"/"port" caveat)
    String ES_HOST_NAME = "es.host.name";
    String ES_POST = "es.post";
    String ES_SCHEME = "es.scheme";
    String ES_USER_NAME = "es.user.name";
    String ES_PASSWORD = "es.password";
    String ES_CONNECT_TIMEOUT = "es.connect.timeout";

    // Snowflake ID generator identity (0..31 each)
    String SNOW_FLAKE_DATACENTER_ID = "snow.flake.datacenter.id";
    String SNOW_FLAKE_MACHINE_ID = "snow.flake.machine.id";
}

87
iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/utils/EsRestClientUtil.java

@ -0,0 +1,87 @@
package com.qniao.iot.device.power.utils;
import cn.hutool.json.JSONUtil;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.function.Consumer;
/**
 * Helper for scrolling through large Elasticsearch result sets.
 */
public class EsRestClientUtil {

    // NOTE(review): endpoint, index and credentials are hardcoded — these should
    // come from configuration (e.g. ApolloConfig) and the password must not live
    // in source control.
    private static String host = "120.79.137.137:9200";
    private static String scheme = "http";
    private static String index = "qn_cloud_box_data_history";
    private static RestClientBuilder builder = null;
    private static RestHighLevelClient client = null;

    /**
     * Builds the shared high-level REST client with 10-minute timeouts and
     * basic-auth credentials.
     * NOTE(review): not thread-safe — concurrent first callers may build two clients.
     */
    public static void init() {
        String[] nodeIpInfos = host.split(":");
        builder = RestClient.builder(new HttpHost(nodeIpInfos[0], Integer.parseInt(nodeIpInfos[1]), scheme))
                .setRequestConfigCallback(requestConfigBuilder -> {
                    requestConfigBuilder.setConnectTimeout(10 * 60 * 1000);
                    requestConfigBuilder.setSocketTimeout(10 * 60 * 1000);
                    requestConfigBuilder.setConnectionRequestTimeout(10 * 60 * 1000);
                    return requestConfigBuilder;
                });
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "qnol26215"));
        builder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider));
        client = new RestHighLevelClient(builder);
    }

    /**
     * Streams every hit matching {@code sourceBuilder} in the given indices to
     * {@code consumer}, page by page via the scroll API (5-minute windows).
     * <p>
     * Fixes over the previous version:
     * <ul>
     *   <li>The loop condition was {@code !"none".equals(scrollId)}, which can
     *       never become false — an exhausted scroll (empty page) spun forever.
     *       The loop now terminates when a page comes back empty or non-OK.</li>
     *   <li>The scroll context is now always cleared when finished, instead of
     *       only on a non-OK status.</li>
     * </ul>
     *
     * @param sourceBuilder query to execute
     * @param consumer      receives each deserialized hit
     * @param classes       target type each hit's source is bound to
     * @param indices       indices to search
     */
    public static <T> void queryDeviceListPageResult(SearchSourceBuilder sourceBuilder, Consumer<T> consumer,
                                                     Class<T> classes, String... indices) {
        SearchRequest searchRequest = new SearchRequest(indices).scroll("5m").source(sourceBuilder);
        if (client == null) {
            init();
        }
        String scrollId = null;
        try {
            // First page comes from a regular search; subsequent pages from scroll
            SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
            while (response.status().getStatus() == RestStatus.OK.getStatus()) {
                SearchHit[] hits = response.getHits().getHits();
                scrollId = response.getScrollId();
                if (hits == null || hits.length == 0) {
                    // Scroll exhausted — previously this case looped forever
                    break;
                }
                System.out.println("*********************查询es结果数量 :" + hits.length);
                for (SearchHit hit : hits) {
                    consumer.accept(JSONUtil.toBean(hit.getSourceAsString(), classes));
                }
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId).scroll("5m");
                response = client.scroll(scrollRequest, RequestOptions.DEFAULT);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Always release the server-side scroll context
            if (scrollId != null) {
                try {
                    ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                    clearScrollRequest.addScrollId(scrollId);
                    client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

103
iot-device-power-on-and-off-data-job/src/main/java/com/qniao/iot/device/power/utils/SnowFlake.java

@ -0,0 +1,103 @@
package com.qniao.iot.device.power.utils;
/**
 * Twitter-style snowflake distributed unique-ID generator.
 * <p>
 * Layout of the 64-bit ID, high to low:
 * 41-bit millisecond offset from a custom epoch | 5-bit datacenter id |
 * 5-bit machine id | 12-bit per-millisecond sequence.
 * Instances are thread-safe: {@link #nextId()} is synchronized.
 */
public class SnowFlake {

    /** Custom epoch (ms) the timestamp section is measured from. */
    private final static long START_STMP = 1480166465631L;

    /** Bit width of each ID section. */
    private final static long SEQUENCE_BIT = 12;
    private final static long MACHINE_BIT = 5;
    private final static long DATACENTER_BIT = 5;

    /** Largest value each section can hold (all-ones masks). */
    private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT);
    private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);
    private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);

    /** Left-shift distance of each section inside the assembled ID. */
    private final static long MACHINE_LEFT = SEQUENCE_BIT;
    private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
    private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT;

    // Generator identity and rolling state
    private long datacenterId = 1L;
    private long machineId = 1L;
    private long sequence = 0L;
    private long lastStmp = -1L;

    /**
     * @param datacenterId datacenter identifier, 0..31
     * @param machineId    machine identifier, 0..31
     * @throws IllegalArgumentException when either id is out of range
     */
    public SnowFlake(long datacenterId, long machineId) {
        if (datacenterId < 0 || datacenterId > MAX_DATACENTER_NUM) {
            throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0");
        }
        if (machineId < 0 || machineId > MAX_MACHINE_NUM) {
            throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
        }
        this.datacenterId = datacenterId;
        this.machineId = machineId;
    }

    /**
     * Produces the next unique, monotonically increasing ID.
     *
     * @throws RuntimeException when the system clock has moved backwards
     */
    public synchronized Long nextId() {
        long now = currentMillis();
        if (now < lastStmp) {
            throw new RuntimeException("Clock moved backwards. Refusing to generate id");
        }
        if (now == lastStmp) {
            // Same millisecond: bump the sequence within its 12-bit budget
            sequence = (sequence + 1) & MAX_SEQUENCE;
            if (sequence == 0L) {
                // Budget exhausted for this millisecond — spin until the next one
                now = waitNextMillis();
            }
        } else {
            // New millisecond: restart the sequence
            sequence = 0L;
        }
        lastStmp = now;
        return (now - START_STMP) << TIMESTMP_LEFT   // timestamp section
                | datacenterId << DATACENTER_LEFT    // datacenter section
                | machineId << MACHINE_LEFT          // machine section
                | sequence;                          // sequence section
    }

    /** Busy-waits until the clock advances past the last used millisecond. */
    private long waitNextMillis() {
        long candidate = currentMillis();
        while (candidate <= lastStmp) {
            candidate = currentMillis();
        }
        return candidate;
    }

    /** Current wall-clock time in epoch milliseconds. */
    private long currentMillis() {
        return System.currentTimeMillis();
    }

    /** Tiny smoke test: prints one generated ID. */
    public static void main(String[] args) {
        SnowFlake s = new SnowFlake(1, 1);
        System.out.println(s.nextId());
    }
}

5
iot-device-power-on-and-off-data-job/src/main/resources/META-INF/app.properties

@ -0,0 +1,5 @@
app.id=iot-device-monitoring-data
# test 8.135.8.221
# prod 47.112.164.224
apollo.meta=http://47.112.164.224:5000

53
iot-device-power-on-and-off-data-job/src/main/resources/db.setting

@ -0,0 +1,53 @@
## db.setting文件
url = jdbc:mysql://rm-wz9it4fs5tk7n4tm1zo.mysql.rds.aliyuncs.com:3306/cloud_print_cloud_factory?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useTimezone=true&serverTimezone=GMT%2B8&useSSL=false
user = qn_cloudprint
pass = qncloudprint5682
# 是否在日志中显示执行的SQL
showSql = true
# 是否格式化显示的SQL
formatSql = false
# 是否显示SQL参数
showParams = true
# 打印SQL的日志等级,默认debug,可以是info、warn、error
sqlLevel = debug
# Number of physical connections created at initialization (on explicit init() or first getConnection()).
# Deduplicated: this key (and maxActive/minIdle) was previously declared twice with conflicting values.
initialSize = 10
# Maximum pool size
maxActive = 20
# Minimum pool size
minIdle = 10
# 获取连接时最大等待时间,单位毫秒。配置了maxWait之后, 缺省启用公平锁,并发效率会有所下降, 如果需要可以通过配置useUnfairLock属性为true使用非公平锁。
maxWait = 0
# 是否缓存preparedStatement,也就是PSCache。 PSCache对支持游标的数据库性能提升巨大,比如说oracle。 在mysql5.5以下的版本中没有PSCache功能,建议关闭掉。作者在5.5版本中使用PSCache,通过监控界面发现PSCache有缓存命中率记录, 该应该是支持PSCache。
poolPreparedStatements = false
# 要启用PSCache,必须配置大于0,当大于0时, poolPreparedStatements自动触发修改为true。 在Druid中,不会存在Oracle下PSCache占用内存过多的问题, 可以把这个数值配置大一些,比如说100
maxOpenPreparedStatements = -1
# 用来检测连接是否有效的sql,要求是一个查询语句。 如果validationQuery为null,testOnBorrow、testOnReturn、 testWhileIdle都不会其作用。
validationQuery = SELECT 1
# 申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。
testOnBorrow = true
# 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能
testOnReturn = false
# 建议配置为true,不影响性能,并且保证安全性。 申请连接的时候检测,如果空闲时间大于 timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。
testWhileIdle = false
# 有两个含义: 1) Destroy线程会检测连接的间隔时间 2) testWhileIdle的判断依据,详细看testWhileIdle属性的说明
timeBetweenEvictionRunsMillis = 60000
# 物理连接初始化的时候执行的sql
connectionInitSqls = SELECT 1

25
iot-device-power-on-and-off-data-job/src/main/resources/log4j2.properties

@ -0,0 +1,25 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

33
pom.xml

@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>iot-device-power-on-and-off-data</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>iot-device-power-on-and-off-data-job</module>
<module>iot-device-power-on-and-off-data-event</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<!-- 配置远程仓库 -->
<repositories>
<repository>
<id>nexus</id>
<name>qniao</name>
<url>http://120.78.76.88:8081/repository/maven-public/</url>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</snapshots>
</repository>
</repositories>
</project>
Loading…
Cancel
Save