|
|
|
@ -115,9 +115,6 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
.build(); |
|
|
|
|
|
|
|
|
|
|
|
// 把树根的数据转成我们自己的格式 |
|
|
|
//DataStreamSource<RootCloudIotDataReceiptedEvent> streamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "RootCloudIotDataReceiptedEvent Source"); |
|
|
|
|
|
|
|
// 把树根的数据转成我们自己的格式 |
|
|
|
SingleOutputStreamOperator<MachineIotDataReceivedEvent> transformDs = env |
|
|
|
.fromSource(source, WatermarkStrategy.noWatermarks(), "root cloud iot data receipted event Source") |
|
|
|
@ -125,164 +122,6 @@ public class RootCloudIotDataFormatterJob { |
|
|
|
.name("Transform MachineIotDataReceivedEvent"); |
|
|
|
|
|
|
|
|
|
|
|
// 数据过滤 |
|
|
|
/*SingleOutputStreamOperator<RootCloudIotDataReceiptedEvent> streamOperator = streamSource |
|
|
|
.filter(new RichFilterFunction<RootCloudIotDataReceiptedEvent>() { |
|
|
|
@Override |
|
|
|
public boolean filter(RootCloudIotDataReceiptedEvent value) { |
|
|
|
|
|
|
|
Long reportTime = value.get__timestamp__(); |
|
|
|
if (reportTime != null) { |
|
|
|
String reportTimeStr = StrUtil.toString(reportTime); |
|
|
|
if (reportTimeStr.length() == 10) { |
|
|
|
value.set__timestamp__(reportTime * 1000); |
|
|
|
} |
|
|
|
} |
|
|
|
if (value.getWorking_sta() != null |
|
|
|
&& value.get__assetId__() != null |
|
|
|
&& value.getPWR_sta() != null && reportTime != null && value.getACC_count() != null) { |
|
|
|
long nowTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); |
|
|
|
// 晚30分钟的数据就不要了 |
|
|
|
return nowTime - value.get__timestamp__() <= (30 * 60 * 1000); |
|
|
|
} |
|
|
|
return false; |
|
|
|
} |
|
|
|
}).name("machine iot data received event filter operator");*/ |
|
|
|
|
|
|
|
// 分组操作 |
|
|
|
/*SingleOutputStreamOperator<MachineIotDataReceivedEvent> outputStreamOperator = streamOperator |
|
|
|
.keyBy(RootCloudIotDataReceiptedEvent::get__assetId__) |
|
|
|
.process(new KeyedProcessFunction<String, RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>() { |
|
|
|
|
|
|
|
private final RestHighLevelClient restHighLevelClient = new RestHighLevelClient( |
|
|
|
RestClient.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_HOST), |
|
|
|
ApolloConfig.getInt(ConfigConstant.ELASTICSEARCH_POST), |
|
|
|
ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_SCHEME))) |
|
|
|
.setHttpClientConfigCallback(httpAsyncClientBuilder -> { |
|
|
|
CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); |
|
|
|
credentialsProvider.setCredentials(AuthScope.ANY, |
|
|
|
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_USER_NAME), |
|
|
|
ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_PASSWORD))); |
|
|
|
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); |
|
|
|
}).setRequestConfigCallback(requestConfigBuilder -> { |
|
|
|
// 设置es连接超时时间 |
|
|
|
requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ELASTICSEARCH_CONNECT_TIMEOUT)); |
|
|
|
return requestConfigBuilder; |
|
|
|
})); |
|
|
|
|
|
|
|
private ValueState<MachineIotDataReceivedEvent> eventValueState; |
|
|
|
|
|
|
|
@Override |
|
|
|
public void open(Configuration parameters) { |
|
|
|
|
|
|
|
eventValueState = getRuntimeContext() |
|
|
|
.getState(new ValueStateDescriptor<>("machineIotDataReceivedEventState", |
|
|
|
TypeInformation.of(MachineIotDataReceivedEvent.class))); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void processElement(RootCloudIotDataReceiptedEvent value, |
|
|
|
KeyedProcessFunction<String, RootCloudIotDataReceiptedEvent, MachineIotDataReceivedEvent>.Context ctx, |
|
|
|
Collector<MachineIotDataReceivedEvent> out) throws IOException { |
|
|
|
// 数据清洗 |
|
|
|
Long machineIotMac = Long.valueOf(value.get__assetId__()); |
|
|
|
MachineIotDataReceivedEvent lastReceivedEvent = eventValueState.value(); |
|
|
|
if (lastReceivedEvent == null) { |
|
|
|
lastReceivedEvent = getMachineIotDataReceivedEvent(machineIotMac, value); |
|
|
|
} |
|
|
|
Integer pwrSta = value.getPWR_sta(); |
|
|
|
Integer workingSta = value.getWorking_sta(); |
|
|
|
Long accCount = value.getACC_count(); |
|
|
|
Integer lastPwrStat = lastReceivedEvent.getMachinePwrStat(); |
|
|
|
Integer lastWorkingStat = lastReceivedEvent.getMachineWorkingStat(); |
|
|
|
Long lastReportTime = lastReceivedEvent.getReportTime(); |
|
|
|
Long reportTime = value.get__timestamp__(); |
|
|
|
// 只有当前消息的时间大于等于上一次消息的时间才要,否则丢弃 |
|
|
|
if (reportTime >= lastReportTime) { |
|
|
|
MachineIotDataReceivedEvent receivedEvent = new MachineIotDataReceivedEvent(); |
|
|
|
receivedEvent.setId(snowflake.nextId()); |
|
|
|
receivedEvent.setDataSource(DataSource.ROOT_CLOUD); |
|
|
|
receivedEvent.setMachineIotMac(machineIotMac); |
|
|
|
receivedEvent.setMachinePwrStat(pwrSta); |
|
|
|
receivedEvent.setMachineWorkingStat(workingSta); |
|
|
|
receivedEvent.setAccJobCount(accCount); |
|
|
|
if ((pwrSta == 1 && workingSta == 1) |
|
|
|
|| (lastPwrStat == 1 && lastWorkingStat == 1)) { |
|
|
|
// 只有当前是工作中或上次是工作中才进行计算 |
|
|
|
// 如果这次的消息和上次的消息相差半个小时,那么不进行计算 |
|
|
|
if (reportTime - lastReportTime <= 30 * 60 * 1000) { |
|
|
|
receivedEvent.setCurrJobCount(value.getACC_count() - lastReceivedEvent.getAccJobCount()); |
|
|
|
// 单位是秒 |
|
|
|
receivedEvent.setCurrJobDuration((reportTime - lastReportTime) / 3600); |
|
|
|
} |
|
|
|
} |
|
|
|
receivedEvent.setCurrWaitingDuration(0L); |
|
|
|
receivedEvent.setCurrStoppingDuration(0L); |
|
|
|
receivedEvent.setIgStat(value.getIG_sta()); |
|
|
|
receivedEvent.setReportTime(value.get__timestamp__()); |
|
|
|
receivedEvent.setReceivedTime(System.currentTimeMillis()); |
|
|
|
eventValueState.update(receivedEvent); |
|
|
|
out.collect(receivedEvent); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private MachineIotDataReceivedEvent getMachineIotDataReceivedEvent(Long machineIotMac, |
|
|
|
RootCloudIotDataReceiptedEvent event) { |
|
|
|
|
|
|
|
MachineIotDataReceivedEvent receivedEvent = getFromEs(machineIotMac); |
|
|
|
if (receivedEvent == null) { |
|
|
|
// 在es中没有查到,说明是新机器 |
|
|
|
receivedEvent = new MachineIotDataReceivedEvent(); |
|
|
|
receivedEvent.setId(snowflake.nextId()); |
|
|
|
receivedEvent.setDataSource(DataSource.ROOT_CLOUD); |
|
|
|
receivedEvent.setMachineIotMac(machineIotMac); |
|
|
|
receivedEvent.setMachinePwrStat(event.getPWR_sta()); |
|
|
|
receivedEvent.setMachineWorkingStat(event.getWorking_sta()); |
|
|
|
receivedEvent.setAccJobCount(event.getACC_count()); |
|
|
|
receivedEvent.setCurrJobCount(0L); |
|
|
|
receivedEvent.setCurrJobDuration(0L); |
|
|
|
receivedEvent.setCurrWaitingDuration(0L); |
|
|
|
receivedEvent.setCurrStoppingDuration(0L); |
|
|
|
receivedEvent.setIgStat(event.getIG_sta()); |
|
|
|
receivedEvent.setReportTime(event.get__timestamp__()); |
|
|
|
receivedEvent.setReceivedTime(System.currentTimeMillis()); |
|
|
|
} |
|
|
|
return receivedEvent; |
|
|
|
} |
|
|
|
|
|
|
|
private MachineIotDataReceivedEvent getFromEs(Long machineIotMac) { |
|
|
|
|
|
|
|
try { |
|
|
|
// 构建查询条件(注意:termQuery 支持多种格式查询,如 boolean、int、double、string 等,这里使用的是 string 的查询) |
|
|
|
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); |
|
|
|
searchSourceBuilder.query(QueryBuilders.termQuery("machineIotMac", machineIotMac)); |
|
|
|
searchSourceBuilder.sort("reportTime", SortOrder.DESC); |
|
|
|
searchSourceBuilder.size(1); |
|
|
|
// 创建查询请求对象,将查询对象配置到其中 |
|
|
|
SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_INDEX)); |
|
|
|
searchRequest.source(searchSourceBuilder); |
|
|
|
String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM")); |
|
|
|
GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.ELASTICSEARCH_INDEX) + "_" + nowDate); |
|
|
|
// 先判断索引是否存在 |
|
|
|
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); |
|
|
|
if (exists) { |
|
|
|
// 执行查询,然后处理响应结果 |
|
|
|
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); |
|
|
|
// 根据状态和数据条数验证是否返回了数据 |
|
|
|
if (RestStatus.OK.equals(searchResponse.status()) && searchResponse.getHits().getTotalHits().value > 0) { |
|
|
|
SearchHits hits = searchResponse.getHits(); |
|
|
|
SearchHit reqHit = hits.getHits()[0]; |
|
|
|
return JSONUtil.toBean(reqHit.getSourceAsString(), MachineIotDataReceivedEvent.class); |
|
|
|
} |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("获取es数据异常", e); |
|
|
|
} |
|
|
|
return null; |
|
|
|
} |
|
|
|
}).name("machine iot data received event keyBy");*/ |
|
|
|
|
|
|
|
|
|
|
|
Properties kafkaProducerConfig = new Properties(); |
|
|
|
kafkaProducerConfig.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); |
|
|
|
kafkaProducerConfig.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); |
|
|
|
|