You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

278 lines
7.3 KiB

import 'package:agora_rtm/agora_rtm.dart';
/// 声网 RTM 管理器,聚合常用的客户端与 StreamChannel 操作
class RTMManager {
RTMManager._internal();
static final RTMManager _instance = RTMManager._internal();
factory RTMManager() => _instance;
static RTMManager get instance => _instance;
RtmClient? _client;
bool _isInitialized = false;
bool _isLoggedIn = false;
String? _currentAppId;
String? _currentUserId;
final Map<String, StreamChannel> _streamChannels = {};
/// 监听 link state
void Function(LinkStateEvent event)? onLinkStateEvent;
/// 监听消息事件(频道 / 主题)
void Function(MessageEvent event)? onMessageEvent;
/// 监听 presence 事件
void Function(PresenceEvent event)? onPresenceEvent;
/// 监听 topic 事件
void Function(TopicEvent event)? onTopicEvent;
/// 监听 lock 事件
void Function(LockEvent event)? onLockEvent;
/// 监听 storage 事件
void Function(StorageEvent event)? onStorageEvent;
/// 监听 token 事件
void Function(TokenEvent event)? onTokenEvent;
/// 统一错误通知
void Function(RtmStatus status)? onOperationError;
/// 创建/初始化 RTM Client
Future<bool> initialize({
required String appId,
required String userId,
RtmConfig? config,
}) async {
if (_isInitialized &&
_client != null &&
_currentAppId == appId &&
_currentUserId == userId) {
return true;
}
await dispose();
final (status, client) = await RTM(appId, userId, config: config);
if (status.error) {
onOperationError?.call(status);
return false;
}
_client = client;
_currentAppId = appId;
_currentUserId = userId;
_isInitialized = true;
_registerClientListeners();
return true;
}
/// 登录 RTM
Future<bool> login(String token) async {
_ensureInitialized();
final (status, _) = await _client!.login(token);
final ok = _handleStatus(status);
if (ok) {
_isLoggedIn = true;
}
return ok;
}
/// 登出 RTM
Future<void> logout() async {
if (!_isInitialized || _client == null || !_isLoggedIn) return;
await leaveAllStreamChannels();
final (status, _) = await _client!.logout();
_handleStatus(status);
_isLoggedIn = false;
}
/// 刷新 Token
Future<bool> renewToken(String token) async {
_ensureInitialized();
final (status, _) = await _client!.renewToken(token);
return _handleStatus(status);
}
/// 设置底层参数(JSON)
Future<bool> setParameters(String paramsJson) async {
_ensureInitialized();
final status = await _client!.setParameters(paramsJson);
return _handleStatus(status);
}
/// 发布频道文本消息
Future<bool> publishChannelMessage({
required String channelName,
required String message,
RtmChannelType channelType = RtmChannelType.message,
String? customType,
bool storeInHistory = false,
}) async {
_ensureInitialized();
final (status, _) = await _client!.publish(
channelName,
message,
channelType: channelType,
customType: customType,
storeInHistory: storeInHistory,
);
return _handleStatus(status);
}
/// 创建/加入 StreamChannel
Future<bool> joinStreamChannel(
String channelName, {
String? token,
bool withMetadata = false,
bool withPresence = true,
bool withLock = false,
bool beQuiet = false,
}) async {
_ensureInitialized();
StreamChannel? channel = _streamChannels[channelName];
if (channel == null) {
final (createStatus, createdChannel) = await _client!.createStreamChannel(
channelName,
);
if (!_handleStatus(createStatus) || createdChannel == null) {
return false;
}
channel = createdChannel;
_streamChannels[channelName] = channel;
}
final (status, _) = await channel.join(
token: token,
withMetadata: withMetadata,
withPresence: withPresence,
withLock: withLock,
beQuiet: beQuiet,
);
return _handleStatus(status);
}
/// 离开 StreamChannel
Future<void> leaveStreamChannel(String channelName) async {
final channel = _streamChannels[channelName];
if (channel == null) return;
final (status, _) = await channel.leave();
_handleStatus(status);
await channel.release();
_streamChannels.remove(channelName);
}
/// 离开全部 StreamChannel
Future<void> leaveAllStreamChannels() async {
final names = _streamChannels.keys.toList();
for (final name in names) {
await leaveStreamChannel(name);
}
}
/// 加入主题
Future<bool> joinTopic({
required String channelName,
required String topic,
RtmMessageQos qos = RtmMessageQos.unordered,
RtmMessagePriority priority = RtmMessagePriority.normal,
String meta = '',
bool syncWithMedia = false,
}) async {
final channel = await _requireChannel(channelName);
final (status, _) = await channel.joinTopic(
topic,
qos: qos,
priority: priority,
meta: meta,
syncWithMedia: syncWithMedia,
);
return _handleStatus(status);
}
/// 发送主题消息
Future<bool> publishTopicMessage({
required String channelName,
required String topic,
required String message,
int sendTs = 0,
String? customType,
}) async {
final channel = await _requireChannel(channelName);
final (status, _) = await channel.publishTextMessage(
topic,
message,
sendTs: sendTs,
customType: customType,
);
return _handleStatus(status);
}
/// 释放 RTM Client
Future<void> dispose() async {
await leaveAllStreamChannels();
if (_client != null && _isLoggedIn) {
final (status, _) = await _client!.logout();
_handleStatus(status);
}
await _client?.release();
_client = null;
_isInitialized = false;
_isLoggedIn = false;
_currentAppId = null;
_currentUserId = null;
_streamChannels.clear();
}
bool get isInitialized => _isInitialized;
bool get isLoggedIn => _isLoggedIn;
String? get currentUserId => _currentUserId;
Iterable<String> get joinedStreamChannels => _streamChannels.keys;
void _registerClientListeners() {
if (_client == null) return;
_client!.addListener(
linkState: (event) => onLinkStateEvent?.call(event),
message: (event) => onMessageEvent?.call(event),
presence: (event) => onPresenceEvent?.call(event),
topic: (event) => onTopicEvent?.call(event),
lock: (event) => onLockEvent?.call(event),
storage: (event) => onStorageEvent?.call(event),
token: (event) => onTokenEvent?.call(event),
);
}
Future<StreamChannel> _requireChannel(String channelName) async {
if (!_streamChannels.containsKey(channelName)) {
final ok = await joinStreamChannel(channelName);
if (!ok) {
throw Exception('加入 StreamChannel 失败:$channelName');
}
}
final channel = _streamChannels[channelName];
if (channel == null) {
throw Exception('StreamChannel 不存在:$channelName');
}
return channel;
}
bool _handleStatus(RtmStatus status) {
if (status.error) {
onOperationError?.call(status);
return false;
}
return true;
}
void _ensureInitialized() {
if (!_isInitialized || _client == null) {
throw Exception('RTM Client 未初始化,请先调用 initialize');
}
}
}