diff --git a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java index 9f37e5e..2d6bdd2 100644 --- a/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java +++ b/iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java @@ -76,4 +76,14 @@ public class MachineIotDataReceivedEvent implements Serializable { * 实际接收到数据的时间 */ private Long receivedTime; +/* + *//** + * 计算单位 + *//* + private Integer countUnit; + + *//** + * 机器标识 + *//* + private Long machineId;*/ } diff --git a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java index 3b402dc..98b1414 100644 --- a/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java +++ b/iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java @@ -2,8 +2,10 @@ package com.qniao.iot.machine.event.generator.job; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.db.Db; +import cn.hutool.json.JSONUtil; import com.qniao.domain.BaseCommand; import com.qniao.iot.machine.command.PowerOffMachineCommand; import com.qniao.iot.machine.command.PowerOnMachineCommand; @@ -15,6 +17,7 @@ import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializat import com.qniao.iot.machine.event.generator.config.ApolloConfig; import com.qniao.iot.machine.event.generator.constant.ConfigConstant; import com.rabbitmq.client.AMQP; +import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; @@ -41,19 +44,29 @@ import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; -import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.*; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.client.indices.GetIndexResponse; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.sql.SQLException; import java.time.LocalDate; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; +import java.util.*; +@Slf4j public class IotMachineEventGeneratorJob { private final static String SQL = "select qmrs.machine_id, qmrs.iot_mac as machine_iot_mac, qmrs.status, qml.count_unit\n" + @@ -63,6 +76,9 @@ public class IotMachineEventGeneratorJob { "where qmrs.iot_mac = ?\n" + " and qmrs.is_delete = 0"; + + //private static RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost(""))); + public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -88,6 +104,7 @@ public class IotMachineEventGeneratorJob { @Override public void open(Configuration parameters) { + // 必须在 open 生命周期初始化 deviceState = getRuntimeContext() .getState(new ValueStateDescriptor<>("deviceState1", TypeInformation.of(DeviceState.class))); @@ -162,7 +179,12 @@ public class IotMachineEventGeneratorJob { // 获取最新设备状态 DeviceState deviceStateListJson = deviceState.value(); if (deviceStateListJson == null) { + // 查询数据库最新的设备状态 List list = Db.use().query(SQL, DeviceState.class, machineIotMac); + // 查询es最新的设备状态 + //deviceStateListJson = queryLatestDeviceStatus(machineIotMac); + + if (CollUtil.isNotEmpty(list)) { deviceStateListJson = list.get(0); } @@ -174,6 +196,50 @@ public class IotMachineEventGeneratorJob { return deviceStateListJson; } + /*private static DeviceState queryLatestDeviceStatus(Long machineIotMac) { + + try { + // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac)); + searchSourceBuilder.sort("reportTime", SortOrder.DESC); + searchSourceBuilder.size(1); + // 创建查询请求对象,将查询对象配置到其中 + SearchRequest searchRequest = new SearchRequest(getLatestIndices()); + searchRequest.source(searchSourceBuilder); + // 执行查询,然后处理响应结果 + SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + // 根据状态和数据条数验证是否返回了数据 + if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { + SearchHits hits = searchResponse.getHits(); + SearchHit reqHit = hits.getHits()[0]; + MachineIotDataReceivedEvent receivedEvent = JSONUtil + .toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class); + DeviceState deviceState = new DeviceState(); + deviceState.setMachineId(machineIotMac); + deviceState.setMachineId(); + } + } catch (IOException e) { + log.error("", e); + } + return null; + }*/ + + /*private static String[] getLatestIndices() throws IOException { + + GetAliasesRequest request = new GetAliasesRequest(); + GetAliasesResponse getAliasesResponse = restHighLevelClient.indices().getAlias(request, RequestOptions.DEFAULT); + Map> map = getAliasesResponse.getAliases(); + Set indices = map.keySet(); + List indicesList = new ArrayList<>(); + for (String key : indices) { + if(key.contains(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX))) { + indicesList.add(key); + } + } + return ArrayUtil.toArray(indicesList, String.class); + }*/ + private static void sinkRabbitMq(DataStream commandDataStream) { // rabbitmq配置 diff --git a/iot-machine-state-event-generator-job/src/test/java/DemoTes.java b/iot-machine-state-event-generator-job/src/test/java/DemoTes.java new file mode 100644 index 0000000..51b7d68 --- /dev/null +++ b/iot-machine-state-event-generator-job/src/test/java/DemoTes.java @@ -0,0 +1,74 @@ +import cn.hutool.json.JSONUtil; +import com.qniao.iot.machine.event.MachineIotDataReceivedEvent; +import com.qniao.iot.machine.event.generator.config.ApolloConfig; +import com.qniao.iot.machine.event.generator.constant.ConfigConstant; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.*; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.client.indices.GetIndexResponse; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; + +public class DemoTes { + + + public static void main(String[] args) throws IOException { + + RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient + .builder(new HttpHost("120.79.137.137", 9200, "http")) + .setHttpClientConfigCallback(httpAsyncClientBuilder -> { + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials("elastic", "qnol26215")); + return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + }) + .setRequestConfigCallback(requestConfigBuilder -> { + // 设置es连接超时时间 + requestConfigBuilder.setConnectTimeout(3000); + return requestConfigBuilder; + })); + + try { + // 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", "102104060102")); + searchSourceBuilder.sort("reportTime", SortOrder.DESC); + searchSourceBuilder.size(1); + // 创建查询请求对象,将查询对象配置到其中 + SearchRequest searchRequest = new SearchRequest("machine_iot_data_received_event_202208", + "machine_iot_data_received_event_202207", "machine_iot_data_received_event_197001"); + searchRequest.source(searchSourceBuilder); + // 执行查询,然后处理响应结果 + SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + // 根据状态和数据条数验证是否返回了数据 + if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { + SearchHits hits = searchResponse.getHits(); + SearchHit reqHit = hits.getHits()[0]; + MachineIotDataReceivedEvent receivedEvent = JSONUtil + .toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class); + System.out.println(receivedEvent); + } + } catch (IOException e) { + + } + + restHighLevelClient.close(); + } +}