diff --git a/src/main/java/com/qniao/iot/gizwits/DeviceState.java b/src/main/java/com/qniao/iot/gizwits/DeviceState.java new file mode 100644 index 0000000..f9a6c10 --- /dev/null +++ b/src/main/java/com/qniao/iot/gizwits/DeviceState.java @@ -0,0 +1,46 @@ +package com.qniao.iot.gizwits; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class DeviceState { + + /** + * 机器标识 + */ + private Long machineId; + + /** + * 设备物联地址(云盒物理标识) + */ + private Long machineIotMac; + + /** + * 状态: 0:关机 1:生产中 2:待机 + */ + private Integer status; + + /** + * 计算单位 + */ + private Integer countUnit; + + /** + * 发生时间 + */ + private Long updateTime; + + @Override + public String toString() { + return "设备状态:{" + + "machineId='" + machineId + '\'' + + ", status='" + status + + ", updateTime='" + updateTime + + '\'' + + '}'; + } +} diff --git a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java index 6bd5736..fe26d0a 100644 --- a/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java +++ b/src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java @@ -6,6 +6,8 @@ import cn.hutool.db.Db; import cn.hutool.http.HttpUtil; import cn.hutool.json.JSONArray; import cn.hutool.json.JSONUtil; +import com.qniao.iot.gizwits.config.ApolloConfig; +import com.qniao.iot.gizwits.constant.ConfigConstant; import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema; import lombok.extern.slf4j.Slf4j; @@ -32,6 +34,8 @@ import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.elasticsearch.action.AliasesRequest; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -54,20 +58,23 @@ import java.util.*; public class IotMonitoringDataJob { private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient - .builder(new HttpHost("120.79.137.137", 9200, "http")) + .builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ES_HOST_NAME), + ApolloConfig.getInt(ConfigConstant.ES_POST), + ApolloConfig.getStr(ConfigConstant.ES_SCHEME))) .setHttpClientConfigCallback(httpAsyncClientBuilder -> { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials("elastic", "qnol26215")); + new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ES_USER_NAME), + ApolloConfig.getStr(ConfigConstant.ES_PASSWORD))); return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); }) .setRequestConfigCallback(requestConfigBuilder -> { // 设置es连接超时时间 - requestConfigBuilder.setConnectTimeout(3000); + requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ES_CONNECT_TIMEOUT)); return requestConfigBuilder; })); - private final static String SQL = "select qmrs.status \n" + + private final static String SQL = "select qmrs.machine_id, qmrs.iot_mac as machine_iot_mac, qmrs.status, qml.count_unit\n" + "from qn_machine_realtime_state qmrs\n" + " LEFT JOIN (select example_id, count_unit from qn_machine_list where is_delete = 0) qml\n" + " ON qmrs.iot_mac = qml.example_id\n" + @@ -80,11 +87,10 @@ public class IotMonitoringDataJob { env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE); // 获取设备数据源 KafkaSource source = KafkaSource.builder() - .setBootstrapServers("120.25.199.30:19092") - .setTopics("test") - //.setTopics("machine_iot_data_received_event") - .setGroupId("1235") - .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) + .setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS)) + .setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS)) + .setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID)) + .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000") .setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema()) .build(); @@ -93,12 +99,10 @@ public class IotMonitoringDataJob { DataStreamSource dataStreamSource = env .fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source"); - // 数据过滤 SingleOutputStreamOperator streamOperator = dataStreamSource .filter((FilterFunction) value -> value.getReportTime() != null - && value.getDataSource() != null && value.getMachinePwrStat() != null && value.getMachineIotMac() == 861193040814171L); - + && value.getDataSource() != null && value.getMachinePwrStat() != null); // mac分组并进行工作时长的集合操作 DataStream machineIotDataReceivedEventDataStream = streamOperator @@ -156,7 +160,9 @@ public class IotMonitoringDataJob { DeviceTotalData nowDeviceState = new DeviceTotalData(); if (lastedDeviceState.getReportTime() <= receivedEvent.getReportTime()) { if (lastWorkingStat == null) { - lastWorkingStat = getDeviceStateListJson(receivedEvent.getMachineIotMac()); + DeviceState deviceState = getDeviceStateListJson(receivedEvent.getMachineIotMac()); + Integer status = deviceState == null ? null : deviceState.getStatus(); + lastWorkingStat = status == null ? 0 : status; lastPwStat = lastWorkingStat == 0 ? 0 : 1; } if (onData == null) { @@ -271,14 +277,14 @@ public class IotMonitoringDataJob { } - private Integer getDeviceStateListJson(Long machineIotMac) throws SQLException { + private DeviceState getDeviceStateListJson(Long machineIotMac) throws SQLException { // 查询数据库最新的设备状态 - List list = Db.use().query(SQL, Integer.class, machineIotMac); + List list = Db.use().query(SQL, DeviceState.class, machineIotMac); if (CollUtil.isNotEmpty(list)) { return list.get(0); } - return 0; + return null; } private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception { @@ -409,9 +415,10 @@ public class IotMonitoringDataJob { searchSourceBuilder.sort("reportTime", SortOrder.DESC); searchSourceBuilder.size(1); // 创建查询请求对象,将查询对象配置到其中 - SearchRequest searchRequest = new SearchRequest("device_monitoring_data"); + SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_*"); searchRequest.source(searchSourceBuilder); - GetIndexRequest exist = new GetIndexRequest("device_monitoring_data"); + String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM")); + GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate); // 先判断客户端是否存在 boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); if (exists) { @@ -431,8 +438,6 @@ public class IotMonitoringDataJob { } }).name("machineIotDataReceivedEventDataStream keyBy stream"); - machineIotDataReceivedEventDataStream.print(); - // 写入es sinkEs(machineIotDataReceivedEventDataStream); @@ -442,9 +447,9 @@ public class IotMonitoringDataJob { private static void sinkEs(DataStream dataStream) { List httpHosts = new ArrayList<>(); - httpHosts.add(new HttpHost("120.79.137.137", - 9200, - "http")); + httpHosts.add(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST), + ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST), + ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME))); ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, (ElasticsearchSinkFunction) (deviceMonitoringData, runtimeContext, requestIndexer) -> { @@ -453,7 +458,7 @@ public class IotMonitoringDataJob { String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM")); //创建es 请求 IndexRequest indexRequest = Requests.indexRequest() - .index("device_monitoring_data" + "_" + indexDateSuffix) + .index(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix) .source(BeanUtil.beanToMap(deviceMonitoringData)); requestIndexer.add(indexRequest); } @@ -470,8 +475,8 @@ public class IotMonitoringDataJob { restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials("elastic", - "qnol26215")); + new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME), + ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD))); return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); }); restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { diff --git a/src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java b/src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java index 001e902..a80db53 100644 --- a/src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java +++ b/src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java @@ -7,12 +7,12 @@ public class ApolloConfig { private static final Config config = ConfigService.getAppConfig(); - public static String get(String key, String defaultValue) { + public static String getStr(String key, String defaultValue) { return config.getProperty(key, defaultValue); } - public static String get(String key) { + public static String getStr(String key) { return config.getProperty(key, null); } diff --git a/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java b/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java index 470be1d..252ea91 100644 --- a/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java +++ b/src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java @@ -41,4 +41,16 @@ public interface ConfigConstant { String SINK_ELASTICSEARCH_SCHEME = "sink.elasticsearch.scheme"; String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index"; + + String ES_HOST_NAME = "es.host.name"; + + String ES_POST = "es.post"; + + String ES_SCHEME = "es.scheme"; + + String ES_USER_NAME = "es.user.name"; + + String ES_PASSWORD = "es.password"; + + String ES_CONNECT_TIMEOUT = "es.connect.timeout"; }