Browse Source

更新

feature_hph_新增rabbitmq_sink
1049970895@qniao.cn 3 years ago
parent
commit
e995c73393
11 changed files with 40 additions and 31 deletions
  1. 11
      pom.xml
  2. 2
      src/main/java/com/qniao/iot/CloudBoxData.java
  3. 2
      src/main/java/com/qniao/iot/DeviceMonitoringData.java
  4. 2
      src/main/java/com/qniao/iot/DeviceState.java
  5. 2
      src/main/java/com/qniao/iot/DeviceTotalData.java
  6. 12
      src/main/java/com/qniao/iot/IotMonitoringDataJob.java
  7. 2
      src/main/java/com/qniao/iot/config/ApolloConfig.java
  8. 2
      src/main/java/com/qniao/iot/constant/ConfigConstant.java
  9. 2
      src/main/java/com/qniao/iot/utils/EsRestClientUtil.java
  10. 20
      src/main/resources/db.setting
  11. 14
      src/test/java/Demo1.java

11
pom.xml

@ -4,8 +4,8 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>iot-gizwits-monitoring-data</artifactId>
<groupId>com.qniao</groupId>
<artifactId>iot-device-monitoring-data</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
@ -117,6 +117,11 @@
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>5.0.0</version>
</dependency>
</dependencies>
<build>
@ -172,7 +177,7 @@
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.qniao.iot.gizwits.IotMonitoringDataJob</mainClass>
<mainClass>com.qniao.iot.IotMonitoringDataJob</mainClass>
</transformer>
</transformers>
</configuration>

src/main/java/com/qniao/iot/gizwits/CloudBoxData.java → src/main/java/com/qniao/iot/CloudBoxData.java

@ -1,4 +1,4 @@
package com.qniao.iot.gizwits;
package com.qniao.iot;
import lombok.Data;

src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java → src/main/java/com/qniao/iot/DeviceMonitoringData.java

@ -1,4 +1,4 @@
package com.qniao.iot.gizwits;
package com.qniao.iot;
import lombok.Data;

src/main/java/com/qniao/iot/gizwits/DeviceState.java → src/main/java/com/qniao/iot/DeviceState.java

@ -1,4 +1,4 @@
package com.qniao.iot.gizwits;
package com.qniao.iot;
import lombok.AllArgsConstructor;
import lombok.Data;

src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java → src/main/java/com/qniao/iot/DeviceTotalData.java

@ -1,4 +1,4 @@
package com.qniao.iot.gizwits;
package com.qniao.iot;
import lombok.Data;

src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java → src/main/java/com/qniao/iot/IotMonitoringDataJob.java

@ -1,4 +1,4 @@
package com.qniao.iot.gizwits;
package com.qniao.iot;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
@ -6,8 +6,8 @@ import cn.hutool.db.Db;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.gizwits.config.ApolloConfig;
import com.qniao.iot.gizwits.constant.ConfigConstant;
import com.qniao.iot.config.ApolloConfig;
import com.qniao.iot.constant.ConfigConstant;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
@ -81,6 +81,8 @@ public class IotMonitoringDataJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 获取设备数据源
@ -102,8 +104,6 @@ public class IotMonitoringDataJob {
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> value.getReportTime() != null
&& value.getDataSource() != null && value.getMachinePwrStat() != null);
streamOperator.print().name("数据源:");
// mac分组并进行工作时长的集合操作
DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = streamOperator
.keyBy(MachineIotDataReceivedEvent::getMachineIotMac)
@ -351,7 +351,7 @@ public class IotMonitoringDataJob {
}
value = data;
}
deviceTotalDataStat.update(data);
deviceTotalDataStat.update(value);
return value;
}

src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java → src/main/java/com/qniao/iot/config/ApolloConfig.java

@ -1,4 +1,4 @@
package com.qniao.iot.gizwits.config;
package com.qniao.iot.config;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;

src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java → src/main/java/com/qniao/iot/constant/ConfigConstant.java

@ -1,4 +1,4 @@
package com.qniao.iot.gizwits.constant;
package com.qniao.iot.constant;
public interface ConfigConstant {

src/main/java/com/qniao/iot/gizwits/utils/EsRestClientUtil.java → src/main/java/com/qniao/iot/utils/EsRestClientUtil.java

@ -1,4 +1,4 @@
package com.qniao.iot.gizwits.utils;
package com.qniao.iot.utils;
import cn.hutool.json.JSONUtil;
import org.apache.http.HttpHost;

20
src/main/resources/db.setting

@ -23,4 +23,22 @@ initialSize = 0
maxActive = 20
# 最小连接池数量
minIdle = 0
minIdle = 0
## 连接池配置项
# 自动提交
autoCommit = true
# 等待连接池分配连接的最大时长(毫秒),超过这个时长还没可用的连接则发生SQLException, 缺省:30秒
connectionTimeout = 30000
# 一个连接idle状态的最大时长(毫秒),超时则被释放(retired),缺省:10分钟
idleTimeout = 600000
# 一个连接的生命时长(毫秒),超时而且没被使用则被释放(retired),缺省:30分钟,建议设置比数据库超时时长少30秒,参考MySQL wait_timeout参数(show variables like '%timeout%';)
maxLifetime = 1800000
# 获取连接前的测试SQL
connectionTestQuery = SELECT 1
# 最小闲置连接数
minimumIdle = 10
# 连接池中允许的最大连接数。缺省值:10;推荐的公式:((core_count * 2) + effective_spindle_count)
maximumPoolSize = 20
# 连接只读数据库时配置为true, 保证安全
readOnly = false

14
src/test/java/Demo1.java

@ -1,6 +1,3 @@
import com.qniao.iot.gizwits.CloudBoxData;
import com.qniao.iot.gizwits.utils.EsRestClientUtil;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
@ -12,29 +9,18 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedStats;
import org.elasticsearch.search.aggregations.metrics.ParsedTopHits;
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.sql.Date;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class Demo1 {

Loading…
Cancel
Save