Browse Source

备份代码

master
1049970895@qniao.cn 3 years ago
parent
commit
99007d2cb1
3 changed files with 155 additions and 5 deletions
  1. 10
      iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java
  2. 76
      iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java
  3. 74
      iot-machine-state-event-generator-job/src/test/java/DemoTes.java

10
iot-machine-data-event/src/main/java/com/qniao/iot/machine/event/MachineIotDataReceivedEvent.java

@ -76,4 +76,14 @@ public class MachineIotDataReceivedEvent implements Serializable {
* 实际接收到数据的时间
*/
private Long receivedTime;
/*
*//**
* 计算单位
*//*
private Integer countUnit;
*//**
* 机器标识
*//*
private Long machineId;*/
}

76
iot-machine-state-event-generator-job/src/main/java/com/qniao/iot/machine/event/generator/job/IotMachineEventGeneratorJob.java

@ -2,8 +2,10 @@ package com.qniao.iot.machine.event.generator.job;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.db.Db;
import cn.hutool.json.JSONUtil;
import com.qniao.domain.BaseCommand;
import com.qniao.iot.machine.command.PowerOffMachineCommand;
import com.qniao.iot.machine.command.PowerOnMachineCommand;
@ -15,6 +17,7 @@ import com.qniao.iot.machine.event.MachineIotDataReceivedEventRabbitMqSerializat
import com.qniao.iot.machine.event.generator.config.ApolloConfig;
import com.qniao.iot.machine.event.generator.constant.ConfigConstant;
import com.rabbitmq.client.AMQP;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
@ -41,19 +44,29 @@ import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.*;
@Slf4j
public class IotMachineEventGeneratorJob {
private final static String SQL = "select qmrs.machine_id, qmrs.iot_mac as machine_iot_mac, qmrs.status, qml.count_unit\n" +
@ -63,6 +76,9 @@ public class IotMachineEventGeneratorJob {
"where qmrs.iot_mac = ?\n" +
" and qmrs.is_delete = 0";
//private static RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost("")));
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@ -88,6 +104,7 @@ public class IotMachineEventGeneratorJob {
@Override
public void open(Configuration parameters) {
// 必须在 open 生命周期初始化
deviceState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("deviceState1", TypeInformation.of(DeviceState.class)));
@ -162,7 +179,12 @@ public class IotMachineEventGeneratorJob {
// 获取最新设备状态
DeviceState deviceStateListJson = deviceState.value();
if (deviceStateListJson == null) {
// 查询数据库最新的设备状态
List<DeviceState> list = Db.use().query(SQL, DeviceState.class, machineIotMac);
// 查询es最新的设备状态
//deviceStateListJson = queryLatestDeviceStatus(machineIotMac);
if (CollUtil.isNotEmpty(list)) {
deviceStateListJson = list.get(0);
}
@ -174,6 +196,50 @@ public class IotMachineEventGeneratorJob {
return deviceStateListJson;
}
/*private static DeviceState queryLatestDeviceStatus(Long machineIotMac) {
try {
// 构建查询条件注意termQuery 支持多种格式查询 booleanintdoublestring 这里使用的是 string 的查询
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac));
searchSourceBuilder.sort("reportTime", SortOrder.DESC);
searchSourceBuilder.size(1);
// 创建查询请求对象将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest(getLatestIndices());
searchRequest.source(searchSourceBuilder);
// 执行查询然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
SearchHits hits = searchResponse.getHits();
SearchHit reqHit = hits.getHits()[0];
MachineIotDataReceivedEvent receivedEvent = JSONUtil
.toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class);
DeviceState deviceState = new DeviceState();
deviceState.setMachineId(machineIotMac);
deviceState.setMachineId();
}
} catch (IOException e) {
log.error("", e);
}
return null;
}*/
/*private static String[] getLatestIndices() throws IOException {
GetAliasesRequest request = new GetAliasesRequest();
GetAliasesResponse getAliasesResponse = restHighLevelClient.indices().getAlias(request, RequestOptions.DEFAULT);
Map<String, Set<AliasMetaData>> map = getAliasesResponse.getAliases();
Set<String> indices = map.keySet();
List<String> indicesList = new ArrayList<>();
for (String key : indices) {
if(key.contains(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX))) {
indicesList.add(key);
}
}
return ArrayUtil.toArray(indicesList, String.class);
}*/
private static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) {
// rabbitmq配置

74
iot-machine-state-event-generator-job/src/test/java/DemoTes.java

@ -0,0 +1,74 @@
import cn.hutool.json.JSONUtil;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.generator.config.ApolloConfig;
import com.qniao.iot.machine.event.generator.constant.ConfigConstant;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
public class DemoTes {
public static void main(String[] args) throws IOException {
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient
.builder(new HttpHost("120.79.137.137", 9200, "http"))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("elastic", "qnol26215"));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
})
.setRequestConfigCallback(requestConfigBuilder -> {
// 设置es连接超时时间
requestConfigBuilder.setConnectTimeout(3000);
return requestConfigBuilder;
}));
try {
// 构建查询条件注意termQuery 支持多种格式查询 booleanintdoublestring 这里使用的是 string 的查询
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", "102104060102"));
searchSourceBuilder.sort("reportTime", SortOrder.DESC);
searchSourceBuilder.size(1);
// 创建查询请求对象将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest("machine_iot_data_received_event_202208",
"machine_iot_data_received_event_202207", "machine_iot_data_received_event_197001");
searchRequest.source(searchSourceBuilder);
// 执行查询然后处理响应结果
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 根据状态和数据条数验证是否返回了数据
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) {
SearchHits hits = searchResponse.getHits();
SearchHit reqHit = hits.getHits()[0];
MachineIotDataReceivedEvent receivedEvent = JSONUtil
.toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class);
System.out.println(receivedEvent);
}
} catch (IOException e) {
}
restHighLevelClient.close();
}
}
Loading…
Cancel
Save