|
|
|
@ -63,9 +63,6 @@ public class IotMachineEventGeneratorJob { |
|
|
|
"where qmrs.iot_mac = ?\n" + |
|
|
|
" and qmrs.is_delete = 0"; |
|
|
|
|
|
|
|
|
|
|
|
//private static RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(new HttpHost(""))); |
|
|
|
|
|
|
|
public static void main(String[] args) throws Exception { |
|
|
|
|
|
|
|
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
|
|
|
@ -168,9 +165,6 @@ public class IotMachineEventGeneratorJob { |
|
|
|
if (deviceStateListJson == null) { |
|
|
|
// 查询数据库最新的设备状态 |
|
|
|
List<DeviceState> list = Db.use().query(SQL, DeviceState.class, machineIotMac); |
|
|
|
// 查询es最新的设备状态 |
|
|
|
//deviceStateListJson = queryLatestDeviceStatus(machineIotMac); |
|
|
|
|
|
|
|
|
|
|
|
if (CollUtil.isNotEmpty(list)) { |
|
|
|
deviceStateListJson = list.get(0); |
|
|
|
@ -183,50 +177,6 @@ public class IotMachineEventGeneratorJob { |
|
|
|
return deviceStateListJson; |
|
|
|
} |
|
|
|
|
|
|
|
/*private static DeviceState queryLatestDeviceStatus(Long machineIotMac) { |
|
|
|
|
|
|
|
try { |
|
|
|
// 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) |
|
|
|
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); |
|
|
|
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac)); |
|
|
|
searchSourceBuilder.sort("reportTime", SortOrder.DESC); |
|
|
|
searchSourceBuilder.size(1); |
|
|
|
// 创建查询请求对象,将查询对象配置到其中 |
|
|
|
SearchRequest searchRequest = new SearchRequest(getLatestIndices()); |
|
|
|
searchRequest.source(searchSourceBuilder); |
|
|
|
// 执行查询,然后处理响应结果 |
|
|
|
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); |
|
|
|
// 根据状态和数据条数验证是否返回了数据 |
|
|
|
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { |
|
|
|
SearchHits hits = searchResponse.getHits(); |
|
|
|
SearchHit reqHit = hits.getHits()[0]; |
|
|
|
MachineIotDataReceivedEvent receivedEvent = JSONUtil |
|
|
|
.toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class); |
|
|
|
DeviceState deviceState = new DeviceState(); |
|
|
|
deviceState.setMachineId(machineIotMac); |
|
|
|
deviceState.setMachineId(); |
|
|
|
} |
|
|
|
} catch (IOException e) { |
|
|
|
log.error("", e); |
|
|
|
} |
|
|
|
return null; |
|
|
|
}*/ |
|
|
|
|
|
|
|
/*private static String[] getLatestIndices() throws IOException { |
|
|
|
|
|
|
|
GetAliasesRequest request = new GetAliasesRequest(); |
|
|
|
GetAliasesResponse getAliasesResponse = restHighLevelClient.indices().getAlias(request, RequestOptions.DEFAULT); |
|
|
|
Map<String, Set<AliasMetaData>> map = getAliasesResponse.getAliases(); |
|
|
|
Set<String> indices = map.keySet(); |
|
|
|
List<String> indicesList = new ArrayList<>(); |
|
|
|
for (String key : indices) { |
|
|
|
if(key.contains(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX))) { |
|
|
|
indicesList.add(key); |
|
|
|
} |
|
|
|
} |
|
|
|
return ArrayUtil.toArray(indicesList, String.class); |
|
|
|
}*/ |
|
|
|
|
|
|
|
private static void sinkRabbitMq(DataStream<BaseCommand> commandDataStream) { |
|
|
|
|
|
|
|
// rabbitmq配置 |
|
|
|
|