diff --git a/pom.xml b/pom.xml index 900a0f6..d0a433e 100644 --- a/pom.xml +++ b/pom.xml @@ -4,8 +4,8 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - org.example - iot-gizwits-monitoring-data + com.qniao + iot-device-monitoring-data 1.0-SNAPSHOT @@ -117,6 +117,11 @@ 0.0.1-SNAPSHOT + + com.zaxxer + HikariCP + 5.0.0 + @@ -172,7 +177,7 @@ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> - com.qniao.iot.gizwits.IotMonitoringDataJob + com.qniao.iot.IotMonitoringDataJob diff --git a/src/main/java/com/qniao/iot/gizwits/CloudBoxData.java b/src/main/java/com/qniao/iot/CloudBoxData.java similarity index 91% rename from src/main/java/com/qniao/iot/gizwits/CloudBoxData.java rename to src/main/java/com/qniao/iot/CloudBoxData.java index 1ab8952..046d1b5 100644 --- a/src/main/java/com/qniao/iot/gizwits/CloudBoxData.java +++ b/src/main/java/com/qniao/iot/CloudBoxData.java @@ -1,4 +1,4 @@ -package com.qniao.iot.gizwits; +package com.qniao.iot; import lombok.Data; diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java b/src/main/java/com/qniao/iot/DeviceMonitoringData.java similarity index 97% rename from src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java rename to src/main/java/com/qniao/iot/DeviceMonitoringData.java index 9b6a475..4981549 100644 --- a/src/main/java/com/qniao/iot/gizwits/DeviceMonitoringData.java +++ b/src/main/java/com/qniao/iot/DeviceMonitoringData.java @@ -1,4 +1,4 @@ -package com.qniao.iot.gizwits; +package com.qniao.iot; import lombok.Data; diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceState.java b/src/main/java/com/qniao/iot/DeviceState.java similarity index 96% rename from src/main/java/com/qniao/iot/gizwits/DeviceState.java rename to src/main/java/com/qniao/iot/DeviceState.java index f9a6c10..115ac60 100644 --- a/src/main/java/com/qniao/iot/gizwits/DeviceState.java +++ b/src/main/java/com/qniao/iot/DeviceState.java @@ -1,4 +1,4 @@ -package com.qniao.iot.gizwits; +package com.qniao.iot; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java b/src/main/java/com/qniao/iot/DeviceTotalData.java similarity index 96% rename from src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java rename to src/main/java/com/qniao/iot/DeviceTotalData.java index 8c92d84..d913ada 100644 --- a/src/main/java/com/qniao/iot/gizwits/DeviceTotalData.java +++ b/src/main/java/com/qniao/iot/DeviceTotalData.java @@ -1,4 +1,4 @@ -package com.qniao.iot.gizwits; +package com.qniao.iot; import lombok.Data; diff --git a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java similarity index 99% rename from src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java rename to src/main/java/com/qniao/iot/IotMonitoringDataJob.java index 1a14645..bf0576e 100644 --- a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/IotMonitoringDataJob.java @@ -1,4 +1,4 @@ -package com.qniao.iot.gizwits; +package com.qniao.iot; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; @@ -6,8 +6,8 @@ import cn.hutool.db.Db; import cn.hutool.http.HttpUtil; import cn.hutool.json.JSONArray; import cn.hutool.json.JSONUtil; -import com.qniao.iot.gizwits.config.ApolloConfig; -import com.qniao.iot.gizwits.constant.ConfigConstant; +import com.qniao.iot.config.ApolloConfig; +import com.qniao.iot.constant.ConfigConstant; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; import lombok.extern.slf4j.Slf4j; @@ -81,6 +81,8 @@ public class IotMonitoringDataJob { public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); // 获取设备数据源 @@ -102,8 +104,6 @@ public class IotMonitoringDataJob { .filter((FilterFunction) value -> value.getReportTime() != null && value.getDataSource() != null && value.getMachinePwrStat() != null); - streamOperator.print().name("数据源:"); - // mac分组并进行工作时长的集合操作 DataStream machineIotDataReceivedEventDataStream = streamOperator .keyBy(MachineIotDataReceivedEvent::getMachineIotMac) @@ -351,7 +351,7 @@ public class IotMonitoringDataJob { } value = data; } - deviceTotalDataStat.update(data); + deviceTotalDataStat.update(value); return value; } diff --git a/src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java b/src/main/java/com/qniao/iot/config/ApolloConfig.java similarity index 93% rename from src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java rename to src/main/java/com/qniao/iot/config/ApolloConfig.java index a80db53..63e914d 100644 --- a/src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java +++ b/src/main/java/com/qniao/iot/config/ApolloConfig.java @@ -1,4 +1,4 @@ -package com.qniao.iot.gizwits.config; +package com.qniao.iot.config; import com.ctrip.framework.apollo.Config; import com.ctrip.framework.apollo.ConfigService; diff --git a/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java b/src/main/java/com/qniao/iot/constant/ConfigConstant.java similarity index 96% rename from src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java rename to src/main/java/com/qniao/iot/constant/ConfigConstant.java index 2605119..ea5cf40 100644 --- a/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java +++ b/src/main/java/com/qniao/iot/constant/ConfigConstant.java @@ -1,4 +1,4 @@ -package com.qniao.iot.gizwits.constant; +package com.qniao.iot.constant; public interface ConfigConstant { diff --git a/src/main/java/com/qniao/iot/gizwits/utils/EsRestClientUtil.java b/src/main/java/com/qniao/iot/utils/EsRestClientUtil.java similarity index 99% rename from src/main/java/com/qniao/iot/gizwits/utils/EsRestClientUtil.java rename to src/main/java/com/qniao/iot/utils/EsRestClientUtil.java index 4bae186..5e56ce3 100644 --- a/src/main/java/com/qniao/iot/gizwits/utils/EsRestClientUtil.java +++ b/src/main/java/com/qniao/iot/utils/EsRestClientUtil.java @@ -1,4 +1,4 @@ -package com.qniao.iot.gizwits.utils; +package com.qniao.iot.utils; import cn.hutool.json.JSONUtil; import org.apache.http.HttpHost; diff --git a/src/main/resources/db.setting b/src/main/resources/db.setting index 3e3aa65..7f6dd45 100644 --- a/src/main/resources/db.setting +++ b/src/main/resources/db.setting @@ -23,4 +23,22 @@ initialSize = 0 maxActive = 20 # 最小连接池数量 -minIdle = 0 \ No newline at end of file +minIdle = 0 + +## 连接池配置项 +# 自动提交 +autoCommit = true +# 等待连接池分配连接的最大时长(毫秒),超过这个时长还没可用的连接则发生SQLException, 缺省:30秒 +connectionTimeout = 30000 +# 一个连接idle状态的最大时长(毫秒),超时则被释放(retired),缺省:10分钟 +idleTimeout = 600000 +# 一个连接的生命时长(毫秒),超时而且没被使用则被释放(retired),缺省:30分钟,建议设置比数据库超时时长少30秒,参考MySQL wait_timeout参数(show variables like '%timeout%';) +maxLifetime = 1800000 +# 获取连接前的测试SQL +connectionTestQuery = SELECT 1 +# 最小闲置连接数 +minimumIdle = 10 +# 连接池中允许的最大连接数。缺省值:10;推荐的公式:((core_count * 2) + effective_spindle_count) +maximumPoolSize = 20 +# 连接只读数据库时配置为true, 保证安全 +readOnly = false \ No newline at end of file diff --git a/src/test/java/Demo1.java b/src/test/java/Demo1.java index 8215244..5e8811d 100644 --- a/src/test/java/Demo1.java +++ b/src/test/java/Demo1.java @@ -1,6 +1,3 @@ -import com.qniao.iot.gizwits.CloudBoxData; -import com.qniao.iot.gizwits.utils.EsRestClientUtil; -import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; @@ -12,29 +9,18 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.ParsedStats; import org.elasticsearch.search.aggregations.metrics.ParsedTopHits; import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; -import java.sql.Date; -import java.time.LocalDate; -import java.time.ZoneOffset; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; public class Demo1 {