Browse Source

更新

feature_hph_新增rabbitmq_sink
hupenghui@qniao.cn 3 years ago
parent
commit
395e122c4b
2 changed files with 20 additions and 46 deletions
  1. 56
      src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java
  2. 10
      src/test/java/SourceMockerDemo.java

56
src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java

@ -106,7 +106,7 @@ public class IotMonitoringDataJob {
// 数据过滤 // 数据过滤
SingleOutputStreamOperator<MachineIotDataReceivedEvent> streamOperator = dataStreamSource SingleOutputStreamOperator<MachineIotDataReceivedEvent> streamOperator = dataStreamSource
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> value.getReportTime() != null .filter((FilterFunction<MachineIotDataReceivedEvent>) value -> value.getReportTime() != null
&& value.getDataSource() != null && value.getMachinePwrStat() != null && value.getMachineIotMac() == 102104060102L);
&& value.getDataSource() != null && value.getMachinePwrStat() != null && value.getMachineIotMac() == 861193040814171L);
// mac分组并进行工作时长的集合操作 // mac分组并进行工作时长的集合操作
@ -120,15 +120,6 @@ public class IotMonitoringDataJob {
// 开机数据 // 开机数据
private ValueState<DeviceTotalData> onDataState; private ValueState<DeviceTotalData> onDataState;
// 上次的关机数据
private ValueState<MachineIotDataReceivedEvent> lastOffDataState;
// 上次的开机数据
private ValueState<MachineIotDataReceivedEvent> lastOnDataState;
// 当前周期的待机数据
private ValueState<MachineIotDataReceivedEvent> lastWaitJobDataState;
// 上次的工作状态 // 上次的工作状态
private ValueState<Integer> lastWorkingStatState; private ValueState<Integer> lastWorkingStatState;
@ -145,15 +136,6 @@ public class IotMonitoringDataJob {
onDataState = getRuntimeContext() onDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class))); .getState(new ValueStateDescriptor<>("onData", TypeInformation.of(DeviceTotalData.class)));
lastOffDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastOffData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
lastOnDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastOnData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
lastWaitJobDataState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastWaitJobData", TypeInformation.of(MachineIotDataReceivedEvent.class)));
lastWorkingStatState = getRuntimeContext() lastWorkingStatState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class))); .getState(new ValueStateDescriptor<>("lastWorkingStat", TypeInformation.of(Integer.class)));
@ -167,8 +149,6 @@ public class IotMonitoringDataJob {
Collector<DeviceMonitoringData> out) throws Exception { Collector<DeviceMonitoringData> out) throws Exception {
DeviceTotalData onData = onDataState.value(); DeviceTotalData onData = onDataState.value();
MachineIotDataReceivedEvent lastOffData = lastOffDataState.value();
MachineIotDataReceivedEvent lastWaitJobData = lastWaitJobDataState.value();
Integer lastWorkingStat = lastWorkingStatState.value(); Integer lastWorkingStat = lastWorkingStatState.value();
Integer lastPwStat = lastPwStatState.value(); Integer lastPwStat = lastPwStatState.value();
DeviceTotalData lastedDeviceState = getDeviceTotalData(receivedEvent); DeviceTotalData lastedDeviceState = getDeviceTotalData(receivedEvent);
@ -193,7 +173,6 @@ public class IotMonitoringDataJob {
onDataState.update(onData); onDataState.update(onData);
} }
LocalDate localDate = new Date(reportTime).toLocalDate(); LocalDate localDate = new Date(reportTime).toLocalDate();
Long a;
Long lastReportTime = lastedDeviceState.getLastReportTime(); Long lastReportTime = lastedDeviceState.getLastReportTime();
if (lastReportTime == null) { if (lastReportTime == null) {
// 如果上次的消息时间为空那么不进行计算 // 如果上次的消息时间为空那么不进行计算
@ -235,8 +214,6 @@ public class IotMonitoringDataJob {
nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault())); nowDeviceState.setLastBootTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(onData.getLastReportTime()), ZoneId.systemDefault()));
nowDeviceState.setLastReportTime(reportTime); nowDeviceState.setLastReportTime(reportTime);
deviceTotalDataStat.update(nowDeviceState); deviceTotalDataStat.update(nowDeviceState);
// 关机后将待机数据清除
lastWaitJobDataState.update(null);
} else { } else {
nowDeviceState = lastedDeviceState; nowDeviceState = lastedDeviceState;
deviceTotalDataStat.update(nowDeviceState); deviceTotalDataStat.update(nowDeviceState);
@ -268,13 +245,10 @@ public class IotMonitoringDataJob {
nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration); nowDeviceState.setTheDayJobDuration(lastedDeviceState.getTheDayJobDuration() + workingDuration);
nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration); nowDeviceState.setJobDurationTotal(lastedDeviceState.getJobDurationTotal() + workingDuration);
nowDeviceState.setLastReportTime(reportTime); nowDeviceState.setLastReportTime(reportTime);
deviceTotalDataStat.update(nowDeviceState);
} else { } else {
// 待机
lastWaitJobDataState.update(receivedEvent);
nowDeviceState = lastedDeviceState; nowDeviceState = lastedDeviceState;
deviceTotalDataStat.update(nowDeviceState);
} }
deviceTotalDataStat.update(nowDeviceState);
nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration); nowDeviceState.setTheDayDuration(lastedDeviceState.getTheDayDuration() + workingDuration);
if (lastPwStat == 0) { if (lastPwStat == 0) {
// 如果上次是关机消息那么这次就是开机消息 // 如果上次是关机消息那么这次就是开机消息
@ -366,7 +340,7 @@ public class IotMonitoringDataJob {
value.setTheDayJobCount(0L); value.setTheDayJobCount(0L);
value.setCurrLocalDate(localDate); value.setCurrLocalDate(localDate);
value.setLastBootTime(value.getLastBootTime()); value.setLastBootTime(value.getLastBootTime());
value.setTheDayDuration(value.getTheDayDuration());
value.setTheDayDuration(0L);
value.setLastReportTime(reportTime); value.setLastReportTime(reportTime);
} }
} }
@ -445,9 +419,9 @@ public class IotMonitoringDataJob {
searchSourceBuilder.sort("reportTime", SortOrder.DESC); searchSourceBuilder.sort("reportTime", SortOrder.DESC);
searchSourceBuilder.size(1); searchSourceBuilder.size(1);
// 创建查询请求对象将查询对象配置到其中 // 创建查询请求对象将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest("DeviceMonitoringData");
SearchRequest searchRequest = new SearchRequest("device_monitoring_data");
searchRequest.source(searchSourceBuilder); searchRequest.source(searchSourceBuilder);
GetIndexRequest exist = new GetIndexRequest("DeviceMonitoringData");
GetIndexRequest exist = new GetIndexRequest("device_monitoring_data");
// 先判断客户端是否存在 // 先判断客户端是否存在
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT); boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if (exists) { if (exists) {
@ -470,17 +444,17 @@ public class IotMonitoringDataJob {
machineIotDataReceivedEventDataStream.print(); machineIotDataReceivedEventDataStream.print();
// 写入es // 写入es
//sinkEs(machineIotDataReceivedEventDataStream);
sinkEs(machineIotDataReceivedEventDataStream);
env.execute("iot_monitoring_data_job");
env.execute("device_monitoring_data");
} }
private static void sinkEs(DataStream<DeviceMonitoringData> dataStream) { private static void sinkEs(DataStream<DeviceMonitoringData> dataStream) {
List<HttpHost> httpHosts = new ArrayList<>(); List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_HOST),
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)));
httpHosts.add(new HttpHost("120.79.137.137",
9200,
"http"));
ElasticsearchSink.Builder<DeviceMonitoringData> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, ElasticsearchSink.Builder<DeviceMonitoringData> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
(ElasticsearchSinkFunction<DeviceMonitoringData>) (deviceMonitoringData, runtimeContext, requestIndexer) -> { (ElasticsearchSinkFunction<DeviceMonitoringData>) (deviceMonitoringData, runtimeContext, requestIndexer) -> {
@ -489,7 +463,7 @@ public class IotMonitoringDataJob {
String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM")); String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
//创建es 请求 //创建es 请求
IndexRequest indexRequest = Requests.indexRequest() IndexRequest indexRequest = Requests.indexRequest()
.index(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix)
.index("device_monitoring_data" + "_" + indexDateSuffix)
.source(BeanUtil.beanToMap(deviceMonitoringData)); .source(BeanUtil.beanToMap(deviceMonitoringData));
requestIndexer.add(indexRequest); requestIndexer.add(indexRequest);
} }
@ -506,18 +480,18 @@ public class IotMonitoringDataJob {
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> { restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
ApolloConfig.get(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
new UsernamePasswordCredentials("elastic",
"qnol26215"));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}); });
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
// 设置es连接超时时间 // 设置es连接超时时间
requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_CONNECT_TIMEOUT));
requestConfigBuilder.setConnectTimeout(30000);
return requestConfigBuilder; return requestConfigBuilder;
}); });
} }
); );
//数据流添加sink //数据流添加sink
dataStream.addSink(esSinkBuilder.build()).name("commandDataStream to es sink");
dataStream.addSink(esSinkBuilder.build()).name("deviceMonitoringData to es sink");
} }
} }

10
src/test/java/SourceMockerDemo.java

@ -51,10 +51,10 @@ public class SourceMockerDemo {
MachineIotDataReceivedEvent event = new MachineIotDataReceivedEvent(); MachineIotDataReceivedEvent event = new MachineIotDataReceivedEvent();
event.setId(RandomUtil.randomLong(999999999999999L)); event.setId(RandomUtil.randomLong(999999999999999L));
// 机智云 // 机智云
//event.setMachineIotMac(861193040814171L);
event.setMachineIotMac(861193040814171L);
// 树根 // 树根
event.setMachineIotMac(102104060102L);
//event.setMachineIotMac(102104060102L);
event.setMachinePwrStat(RandomUtil.randomEles(pwrStaList, 1).get(0)); event.setMachinePwrStat(RandomUtil.randomEles(pwrStaList, 1).get(0));
event.setMachineWorkingStat(RandomUtil.randomEles(accStaList, 1).get(0)); event.setMachineWorkingStat(RandomUtil.randomEles(accStaList, 1).get(0));
// 递增每次加一个随机数 // 递增每次加一个随机数
@ -63,11 +63,11 @@ public class SourceMockerDemo {
event.setCurrWaitingDuration(0L); event.setCurrWaitingDuration(0L);
event.setIgStat(0); event.setIgStat(0);
event.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli()); event.setReceivedTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
event.setReportTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
/*if(f > 20) {
if(f > 20) {
event.setReportTime(LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli());
}else { }else {
event.setReportTime(LocalDateTime.now().plusDays(-1).toInstant(ZoneOffset.of("+8")).toEpochMilli()); event.setReportTime(LocalDateTime.now().plusDays(-1).toInstant(ZoneOffset.of("+8")).toEpochMilli());
}*/
}
// 递增加一个随机数 // 递增加一个随机数
event.setCurrJobCount(RandomUtil.randomLong(1, 5)); event.setCurrJobCount(RandomUtil.randomLong(1, 5));

Loading…
Cancel
Save