Browse Source

更新

feature_hph_新增rabbitmq_sink
hupenghui@qniao.cn 3 years ago
parent
commit
710b8e550b
4 changed files with 91 additions and 28 deletions
  1. 46
      src/main/java/com/qniao/iot/gizwits/DeviceState.java
  2. 57
      src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java
  3. 4
      src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java
  4. 12
      src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java

46
src/main/java/com/qniao/iot/gizwits/DeviceState.java

@ -0,0 +1,46 @@
package com.qniao.iot.gizwits;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DeviceState {
/**
* 机器标识
*/
private Long machineId;
/**
* 设备物联地址(云盒物理标识)
*/
private Long machineIotMac;
/**
* 状态: 0:关机 1:生产中 2:待机
*/
private Integer status;
/**
* 计算单位
*/
private Integer countUnit;
/**
* 发生时间
*/
private Long updateTime;
@Override
public String toString() {
return "设备状态:{" +
"machineId='" + machineId + '\'' +
", status='" + status +
", updateTime='" + updateTime +
'\'' +
'}';
}
}

57
src/main/java/com/qniao/iot/gizwits/IotMonitoringDataJob.java

@ -6,6 +6,8 @@ import cn.hutool.db.Db;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONUtil;
import com.qniao.iot.gizwits.config.ApolloConfig;
import com.qniao.iot.gizwits.constant.ConfigConstant;
import com.qniao.iot.machine.event.MachineIotDataReceivedEvent;
import com.qniao.iot.machine.event.MachineIotDataReceivedEventKafkaDeserializationSchema;
import lombok.extern.slf4j.Slf4j;
@ -32,6 +34,8 @@ import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.elasticsearch.action.AliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
@ -54,20 +58,23 @@ import java.util.*;
public class IotMonitoringDataJob {
private static final RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient
.builder(new HttpHost("120.79.137.137", 9200, "http"))
.builder(new HttpHost(ApolloConfig.getStr(ConfigConstant.ES_HOST_NAME),
ApolloConfig.getInt(ConfigConstant.ES_POST),
ApolloConfig.getStr(ConfigConstant.ES_SCHEME)))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("elastic", "qnol26215"));
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.ES_USER_NAME),
ApolloConfig.getStr(ConfigConstant.ES_PASSWORD)));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
})
.setRequestConfigCallback(requestConfigBuilder -> {
// 设置es连接超时时间
requestConfigBuilder.setConnectTimeout(3000);
requestConfigBuilder.setConnectTimeout(ApolloConfig.getInt(ConfigConstant.ES_CONNECT_TIMEOUT));
return requestConfigBuilder;
}));
private final static String SQL = "select qmrs.status \n" +
private final static String SQL = "select qmrs.machine_id, qmrs.iot_mac as machine_iot_mac, qmrs.status, qml.count_unit\n" +
"from qn_machine_realtime_state qmrs\n" +
" LEFT JOIN (select example_id, count_unit from qn_machine_list where is_delete = 0) qml\n" +
" ON qmrs.iot_mac = qml.example_id\n" +
@ -80,11 +87,10 @@ public class IotMonitoringDataJob {
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);
// 获取设备数据源
KafkaSource<MachineIotDataReceivedEvent> source = KafkaSource.<MachineIotDataReceivedEvent>builder()
.setBootstrapServers("120.25.199.30:19092")
.setTopics("test")
//.setTopics("machine_iot_data_received_event")
.setGroupId("1235")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setBootstrapServers(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_BOOTSTRAP_SERVERS))
.setTopics(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_TOPICS))
.setGroupId(ApolloConfig.getStr(ConfigConstant.SOURCE_KAFKA_GROUP_ID))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "8000")
.setValueOnlyDeserializer(new MachineIotDataReceivedEventKafkaDeserializationSchema())
.build();
@ -93,12 +99,10 @@ public class IotMonitoringDataJob {
DataStreamSource<MachineIotDataReceivedEvent> dataStreamSource = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "machineIotDataReceivedEvent Kafka Source");
// 数据过滤
SingleOutputStreamOperator<MachineIotDataReceivedEvent> streamOperator = dataStreamSource
.filter((FilterFunction<MachineIotDataReceivedEvent>) value -> value.getReportTime() != null
&& value.getDataSource() != null && value.getMachinePwrStat() != null && value.getMachineIotMac() == 861193040814171L);
&& value.getDataSource() != null && value.getMachinePwrStat() != null);
// mac分组并进行工作时长的集合操作
DataStream<DeviceMonitoringData> machineIotDataReceivedEventDataStream = streamOperator
@ -156,7 +160,9 @@ public class IotMonitoringDataJob {
DeviceTotalData nowDeviceState = new DeviceTotalData();
if (lastedDeviceState.getReportTime() <= receivedEvent.getReportTime()) {
if (lastWorkingStat == null) {
lastWorkingStat = getDeviceStateListJson(receivedEvent.getMachineIotMac());
DeviceState deviceState = getDeviceStateListJson(receivedEvent.getMachineIotMac());
Integer status = deviceState == null ? null : deviceState.getStatus();
lastWorkingStat = status == null ? 0 : status;
lastPwStat = lastWorkingStat == 0 ? 0 : 1;
}
if (onData == null) {
@ -271,14 +277,14 @@ public class IotMonitoringDataJob {
}
private Integer getDeviceStateListJson(Long machineIotMac) throws SQLException {
private DeviceState getDeviceStateListJson(Long machineIotMac) throws SQLException {
// 查询数据库最新的设备状态
List<Integer> list = Db.use().query(SQL, Integer.class, machineIotMac);
List<DeviceState> list = Db.use().query(SQL, DeviceState.class, machineIotMac);
if (CollUtil.isNotEmpty(list)) {
return list.get(0);
}
return 0;
return null;
}
private DeviceTotalData getDeviceTotalData(MachineIotDataReceivedEvent event) throws Exception {
@ -409,9 +415,10 @@ public class IotMonitoringDataJob {
searchSourceBuilder.sort("reportTime", SortOrder.DESC);
searchSourceBuilder.size(1);
// 创建查询请求对象将查询对象配置到其中
SearchRequest searchRequest = new SearchRequest("device_monitoring_data");
SearchRequest searchRequest = new SearchRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_*");
searchRequest.source(searchSourceBuilder);
GetIndexRequest exist = new GetIndexRequest("device_monitoring_data");
String nowDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
GetIndexRequest exist = new GetIndexRequest(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + nowDate);
// 先判断客户端是否存在
boolean exists = restHighLevelClient.indices().exists(exist, RequestOptions.DEFAULT);
if (exists) {
@ -431,8 +438,6 @@ public class IotMonitoringDataJob {
}
}).name("machineIotDataReceivedEventDataStream keyBy stream");
machineIotDataReceivedEventDataStream.print();
// 写入es
sinkEs(machineIotDataReceivedEventDataStream);
@ -442,9 +447,9 @@ public class IotMonitoringDataJob {
private static void sinkEs(DataStream<DeviceMonitoringData> dataStream) {
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("120.79.137.137",
9200,
"http"));
httpHosts.add(new HttpHost(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_HOST),
ApolloConfig.getInt(ConfigConstant.SINK_ELASTICSEARCH_POST),
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_SCHEME)));
ElasticsearchSink.Builder<DeviceMonitoringData> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
(ElasticsearchSinkFunction<DeviceMonitoringData>) (deviceMonitoringData, runtimeContext, requestIndexer) -> {
@ -453,7 +458,7 @@ public class IotMonitoringDataJob {
String indexDateSuffix = reportDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
//创建es 请求
IndexRequest indexRequest = Requests.indexRequest()
.index("device_monitoring_data" + "_" + indexDateSuffix)
.index(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_INDEX) + "_" + indexDateSuffix)
.source(BeanUtil.beanToMap(deviceMonitoringData));
requestIndexer.add(indexRequest);
}
@ -470,8 +475,8 @@ public class IotMonitoringDataJob {
restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("elastic",
"qnol26215"));
new UsernamePasswordCredentials(ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_USER_NAME),
ApolloConfig.getStr(ConfigConstant.SINK_ELASTICSEARCH_PASSWORD)));
return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
});
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {

4
src/main/java/com/qniao/iot/gizwits/config/ApolloConfig.java

@ -7,12 +7,12 @@ public class ApolloConfig {
private static final Config config = ConfigService.getAppConfig();
public static String get(String key, String defaultValue) {
public static String getStr(String key, String defaultValue) {
return config.getProperty(key, defaultValue);
}
public static String get(String key) {
public static String getStr(String key) {
return config.getProperty(key, null);
}

12
src/main/java/com/qniao/iot/gizwits/constant/ConfigConstant.java

@ -41,4 +41,16 @@ public interface ConfigConstant {
String SINK_ELASTICSEARCH_SCHEME = "sink.elasticsearch.scheme";
String SINK_ELASTICSEARCH_INDEX = "sink.elasticsearch.index";
String ES_HOST_NAME = "es.host.name";
String ES_POST = "es.post";
String ES_SCHEME = "es.scheme";
String ES_USER_NAME = "es.user.name";
String ES_PASSWORD = "es.password";
String ES_CONNECT_TIMEOUT = "es.connect.timeout";
}
Loading…
Cancel
Save