|
|
@ -3,11 +3,10 @@ package com.qniao.iot.gizwits.source; |
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
import com.gizwits.noti.noticlient.OhMyNotiClient; |
|
|
import com.gizwits.noti.noticlient.OhMyNotiClient; |
|
|
import com.gizwits.noti.noticlient.OhMyNotiClientImpl; |
|
|
import com.gizwits.noti.noticlient.OhMyNotiClientImpl; |
|
|
|
|
|
import com.gizwits.noti.noticlient.bean.Credential; |
|
|
import com.gizwits.noti.noticlient.bean.req.NotiReqPushEvents; |
|
|
import com.gizwits.noti.noticlient.bean.req.NotiReqPushEvents; |
|
|
import com.gizwits.noti.noticlient.bean.req.body.AuthorizationData; |
|
|
|
|
|
import com.gizwits.noti.noticlient.config.SnotiCallback; |
|
|
import com.gizwits.noti.noticlient.config.SnotiCallback; |
|
|
import com.gizwits.noti.noticlient.config.SnotiConfig; |
|
|
import com.gizwits.noti.noticlient.config.SnotiConfig; |
|
|
import com.gizwits.noti.noticlient.enums.ProtocolType; |
|
|
|
|
|
import com.google.common.base.Preconditions; |
|
|
import com.google.common.base.Preconditions; |
|
|
import com.qniao.iot.gizwits.GizWitsProperties; |
|
|
import com.qniao.iot.gizwits.GizWitsProperties; |
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
@ -15,6 +14,7 @@ import org.apache.commons.collections.CollectionUtils; |
|
|
import org.apache.flink.configuration.Configuration; |
|
|
import org.apache.flink.configuration.Configuration; |
|
|
import org.apache.flink.streaming.api.functions.source.RichSourceFunction; |
|
|
import org.apache.flink.streaming.api.functions.source.RichSourceFunction; |
|
|
|
|
|
|
|
|
|
|
|
import java.util.Arrays; |
|
|
import java.util.List; |
|
|
import java.util.List; |
|
|
import java.util.stream.Collectors; |
|
|
import java.util.stream.Collectors; |
|
|
import java.util.stream.Stream; |
|
|
import java.util.stream.Stream; |
|
|
@ -33,10 +33,11 @@ public class GizWitsIotSource extends RichSourceFunction<JSONObject> { |
|
|
|
|
|
|
|
|
log.info("开始启动gizwits客户端..."); |
|
|
log.info("开始启动gizwits客户端..."); |
|
|
|
|
|
|
|
|
client = new OhMyNotiClientImpl() |
|
|
|
|
|
.addLoginAuthorizes(getSnotiLoginCredential()) |
|
|
|
|
|
.setCallback(getSnotiCallback()) |
|
|
|
|
|
.setSnotiConfig(getGizWitsConfig()); |
|
|
|
|
|
|
|
|
client = |
|
|
|
|
|
new OhMyNotiClientImpl() |
|
|
|
|
|
.setCredentials(getSnotiLoginCredential()) |
|
|
|
|
|
.setCallback(getSnotiCallback()) |
|
|
|
|
|
.setSnotiConfig(getSnotiConfig()); |
|
|
|
|
|
|
|
|
//启动client |
|
|
//启动client |
|
|
client.doStart(); |
|
|
client.doStart(); |
|
|
@ -57,19 +58,19 @@ public class GizWitsIotSource extends RichSourceFunction<JSONObject> { |
|
|
RUNNING = false; |
|
|
RUNNING = false; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private AuthorizationData[] getSnotiLoginCredential() { |
|
|
|
|
|
|
|
|
private List<Credential> getSnotiLoginCredential() { |
|
|
List<GizWitsProperties.Item> itemList = PROPERTIES.getItemList(); |
|
|
List<GizWitsProperties.Item> itemList = PROPERTIES.getItemList(); |
|
|
Preconditions.checkArgument(!CollectionUtils.isEmpty(itemList), "未配置gizwits登陆信息, gizwits初始化失败"); |
|
|
|
|
|
|
|
|
Preconditions.checkArgument(!CollectionUtils.isEmpty(itemList), "未配置snoti登陆信息, snoti初始化失败"); |
|
|
return itemList.stream() |
|
|
return itemList.stream() |
|
|
.map(it -> new AuthorizationData() |
|
|
|
|
|
//监听所有推送事件 |
|
|
|
|
|
.setProtocolType(ProtocolType.V2) |
|
|
|
|
|
.addEvents(NotiReqPushEvents.values()) |
|
|
|
|
|
.setSubkey(it.getSubKey()) |
|
|
|
|
|
.setAuth_id(it.getAuthId()) |
|
|
|
|
|
.setAuth_secret(it.getAuthSecret()) |
|
|
|
|
|
.setProduct_key(it.getProductKey())) |
|
|
|
|
|
.toArray(AuthorizationData[]::new); |
|
|
|
|
|
|
|
|
.map(it -> |
|
|
|
|
|
Credential.builder() |
|
|
|
|
|
.events(Arrays.asList(NotiReqPushEvents.values())) |
|
|
|
|
|
.subkey(it.getSubKey()) |
|
|
|
|
|
.authId(it.getAuthId()) |
|
|
|
|
|
.authSecret(it.getAuthSecret()) |
|
|
|
|
|
.productKey(it.getProductKey()) |
|
|
|
|
|
.build()) |
|
|
|
|
|
.collect(Collectors.toList()); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private SnotiCallback getSnotiCallback() { |
|
|
private SnotiCallback getSnotiCallback() { |
|
|
@ -84,19 +85,16 @@ public class GizWitsIotSource extends RichSourceFunction<JSONObject> { |
|
|
public void disconnected() { |
|
|
public void disconnected() { |
|
|
log.warn("gizwits客户端连接断开, 即将尝试重连..."); |
|
|
log.warn("gizwits客户端连接断开, 即将尝试重连..."); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
public void reload(AuthorizationData... authorizationData) { |
|
|
|
|
|
log.info("gizwits重载登录信息[{}]...", Stream.of(authorizationData).map(AuthorizationData::toString) |
|
|
|
|
|
.collect(Collectors.joining(","))); |
|
|
|
|
|
} |
|
|
|
|
|
}; |
|
|
}; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private SnotiConfig getGizWitsConfig() { |
|
|
|
|
|
|
|
|
private SnotiConfig getSnotiConfig() { |
|
|
return new SnotiConfig() |
|
|
return new SnotiConfig() |
|
|
.setAutomaticConfirmation(PROPERTIES.getAutomaticConfirmation()) |
|
|
.setAutomaticConfirmation(PROPERTIES.getAutomaticConfirmation()) |
|
|
|
|
|
.setWithMetrics(true) |
|
|
|
|
|
.setEnableCheckNoData(false) |
|
|
.setHost(PROPERTIES.getHost()) |
|
|
.setHost(PROPERTIES.getHost()) |
|
|
.setPort(PROPERTIES.getPort()); |
|
|
.setPort(PROPERTIES.getPort()); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
} |
|
|
} |