name: robius-matrix-integration description: | 关键:用于Matrix SDK与Makepad的集成。触发条件: Matrix SDK,滑动同步,MatrixRequest,时间线, matrix-sdk,matrix client,robrix,matrix room, Matrix 集成,聊天客户端
Robius Matrix SDK 集成技能
基于Robrix和Moly代码库,将外部API集成到Makepad应用程序中的最佳实践。
源代码库:
- Robrix:Matrix SDK 集成 - 滑动同步,时间线订阅,实时更新
- Moly:OpenAI/LLM API 集成 - SSE 流,MCP 协议,多供应商支持
触发条件
在以下情况下使用此技能:
- 将Matrix SDK集成到Makepad中
- 使用Makepad构建Matrix客户端
- 实现Matrix功能(房间,时间线,消息)
- 在UI中处理Matrix SDK异步操作
- 关键词:matrix-sdk,matrix client,robrix,matrix timeline,matrix room,滑动同步
概述
Robrix使用matrix-sdk和matrix-sdk-uicrate连接到Matrix homeserver。关键架构决策:
- 滑动同步:使用原生滑动同步进行高效房间列表更新
- 分离运行时:Tokio运行时运行Matrix操作,Makepad处理UI
- 请求/响应模式:UI发送请求,接收动作/更新
- 每个房间的后台任务:每个房间有专用的时间线订阅者任务
MatrixRequest 模式
请求枚举定义
/// 所有可以发送到Matrix工作任务的异步请求
pub enum MatrixRequest {
/// 登录请求
Login(LoginRequest),
Logout { is_desktop: bool },
/// 时间线操作
PaginateRoomTimeline {
room_id: OwnedRoomId,
num_events: u16,
direction: PaginationDirection,
},
SendMessage {
room_id: OwnedRoomId,
message: RoomMessageEventContent,
replied_to: Option<Reply>,
},
EditMessage {
room_id: OwnedRoomId,
timeline_event_item_id: TimelineEventItemId,
edited_content: EditedContent,
},
RedactMessage {
room_id: OwnedRoomId,
timeline_event_id: TimelineEventItemId,
reason: Option<String>,
},
/// 房间操作
JoinRoom { room_id: OwnedRoomId },
LeaveRoom { room_id: OwnedRoomId },
GetRoomMembers {
room_id: OwnedRoomId,
memberships: RoomMemberships,
local_only: bool,
},
/// 用户操作
GetUserProfile {
user_id: OwnedUserId,
room_id: Option<OwnedRoomId>,
local_only: bool,
},
IgnoreUser {
ignore: bool,
room_member: RoomMember,
room_id: OwnedRoomId,
},
/// 媒体操作
FetchAvatar {
mxc_uri: OwnedMxcUri,
on_fetched: fn(AvatarUpdate),
},
FetchMedia {
media_request: MediaRequestParameters,
on_fetched: OnMediaFetchedFn,
destination: MediaCacheEntryRef,
update_sender: Option<crossbeam_channel::Sender<TimelineUpdate>>,
},
/// 打字/已读指示器
SendTypingNotice { room_id: OwnedRoomId, typing: bool },
ReadReceipt { room_id: OwnedRoomId, event_id: OwnedEventId },
FullyReadReceipt { room_id: OwnedRoomId, event_id: OwnedEventId },
/// 反应
ToggleReaction {
room_id: OwnedRoomId,
timeline_event_id: TimelineEventItemId,
reaction: String,
},
/// 订阅
SubscribeToTypingNotices { room_id: OwnedRoomId, subscribe: bool },
SubscribeToPinnedEvents { room_id: OwnedRoomId, subscribe: bool },
}
提交模式
static REQUEST_SENDER: Mutex<Option<UnboundedSender<MatrixRequest>>> = Mutex::new(None);
/// 从UI线程提交请求到异步运行时
pub fn submit_async_request(req: MatrixRequest) {
if let Some(sender) = REQUEST_SENDER.lock().unwrap().as_ref() {
sender.send(req).expect("BUG: matrix worker task receiver died!");
}
}
// UI中的使用示例
submit_async_request(MatrixRequest::SendMessage {
room_id: room_id.clone(),
message: RoomMessageEventContent::text_plain(&text),
replied_to: self.reply_to.take(),
});
工作任务处理器
async fn matrix_worker_task(
mut request_receiver: UnboundedReceiver<MatrixRequest>,
login_sender: Sender<LoginRequest>,
) -> Result<()> {
while let Some(request) = request_receiver.recv().await {
match request {
MatrixRequest::PaginateRoomTimeline { room_id, num_events, direction } => {
let (timeline, sender) = {
let rooms = ALL_JOINED_ROOMS.lock().unwrap();
let Some(room_info) = rooms.get(&room_id) else {
continue; // 房间尚未准备就绪
};
(room_info.timeline.clone(), room_info.update_sender.clone())
};
// 为此操作生成专用任务
Handle::current().spawn(async move {
// 通知UI分页正在开始
sender.send(TimelineUpdate::PaginationRunning(direction)).unwrap();
SignalToUI::set_ui_signal();
// 执行分页
let res = if direction == PaginationDirection::Forwards {
timeline.paginate_forwards(num_events).await
} else {
timeline.paginate_backwards(num_events).await
};
// 发送结果到UI
match res {
Ok(fully_paginated) => {
sender.send(TimelineUpdate::PaginationIdle {
fully_paginated,
direction,
}).unwrap();
}
Err(error) => {
sender.send(TimelineUpdate::PaginationError {
error,
direction,
}).unwrap();
}
}
SignalToUI::set_ui_signal();
});
}
MatrixRequest::JoinRoom { room_id } => {
let Some(client) = get_client() else { continue };
Handle::current().spawn(async move {
let result_action = if let Some(room) = client.get_room(&room_id) {
match room.join().await {
Ok(()) => JoinRoomResultAction::Joined { room_id },
Err(e) => JoinRoomResultAction::Failed { room_id, error: e },
}
} else {
match client.join_room_by_id(&room_id).await {
Ok(_) => JoinRoomResultAction::Joined { room_id },
Err(e) => JoinRoomResultAction::Failed { room_id, error: e },
}
};
Cx::post_action(result_action);
});
}
// ... 处理其他请求
}
}
Ok(())
}
时间线更新
TimelineUpdate 枚举
pub enum TimelineUpdate {
/// 新项添加到时间线
NewItems {
new_items: Vector<Arc<TimelineItem>>,
changed_indices: BTreeSet<usize>,
is_append: bool,
},
/// 分页状态变化
PaginationRunning(PaginationDirection),
PaginationIdle {
fully_paginated: bool,
direction: PaginationDirection,
},
PaginationError {
error: Error,
direction: PaginationDirection,
},
/// 消息编辑结果
MessageEdited {
timeline_event_id: TimelineEventItemId,
result: Result<(), Error>,
},
/// 房间成员已获取
RoomMembersListFetched {
members: Vec<RoomMember>,
sort: PrecomputedMemberSort,
is_local_fetch: bool,
},
/// 未读计数已更新
NewUnreadMessagesCount(UnreadMessageCount),
/// 用户权限级别已获取
UserPowerLevels(UserPowerLevels),
}
每个房间的更新流
struct JoinedRoomDetails {
room_id: OwnedRoomId,
timeline: Arc<Timeline>,
timeline_update_sender: crossbeam_channel::Sender<TimelineUpdate>,
timeline_subscriber_handler_task: JoinHandle<()>,
typing_notice_subscriber: Option<EventHandlerDropGuard>,
}
impl Drop for JoinedRoomDetails {
fn drop(&mut self) {
// 清理房间关闭时的后台任务
self.timeline_subscriber_handler_task.abort();
drop(self.typing_notice_subscriber.take());
}
}
// 为房间生成订阅者
async fn spawn_timeline_subscriber(
room_id: OwnedRoomId,
timeline: Arc<Timeline>,
sender: crossbeam_channel::Sender<TimelineUpdate>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let (items, mut stream) = timeline.subscribe().await;
// 发送初始项
sender.send(TimelineUpdate::NewItems {
new_items: items,
changed_indices: BTreeSet::new(),
is_append: false,
}).unwrap();
SignalToUI::set_ui_signal();
// 监听更新
while let Some(diff) = stream.next().await {
let update = process_timeline_diff(diff);
sender.send(update).unwrap();
SignalToUI::set_ui_signal();
}
})
}
在UI中处理更新
impl Widget for RoomScreen {
fn handle_event(&mut self, cx: &mut Cx, event: &Event, scope: &mut Scope) {
// 在Signal事件上轮询时间线更新
if let Event::Signal = event {
while let Ok(update) = self.timeline_state.update_receiver.try_recv() {
match update {
TimelineUpdate::NewItems { new_items, changed_indices, is_append } => {
self.apply_new_items(cx, new_items, changed_indices, is_append);
}
TimelineUpdate::PaginationIdle { fully_paginated, direction } => {
self.set_pagination_idle(cx, direction, fully_paginated);
}
TimelineUpdate::PaginationError { error, direction } => {
self.show_pagination_error(cx, direction, &error);
}
// ... 处理其他更新
}
}
}
self.view.handle_event(cx, event, scope);
}
}
房间列表更新
RoomsListUpdate 枚举
pub enum RoomsListUpdate {
NotLoaded,
LoadedRooms { max_rooms: Option<u32> },
AddInvitedRoom(InvitedRoomInfo),
AddJoinedRoom(JoinedRoomInfo),
ClearRooms,
UpdateLatestEvent {
room_id: OwnedRoomId,
timestamp: MilliSecondsSinceUnixEpoch,
latest_message_text: String,
},
UpdateNumUnreadMessages {
room_id: OwnedRoomId,
unread_messages: UnreadMessageCount,
unread_mentions: u64,
},
UpdateRoomName { new_room_name: RoomNameId },
UpdateRoomAvatar { room_id: OwnedRoomId, avatar: FetchedRoomAvatar },
RemoveRoom { room_id: OwnedRoomId, new_state: RoomState },
Status { status: String },
ScrollToRoom(OwnedRoomId),
}
static PENDING_ROOM_UPDATES: SegQueue<RoomsListUpdate> = SegQueue::new();
pub fn enqueue_rooms_list_update(update: RoomsListUpdate) {
PENDING_ROOM_UPDATES.push(update);
SignalToUI::set_ui_signal();
}
客户端构建模式
async fn build_client(
homeserver_url: &str,
data_dir: &Path,
) -> Result<(Client, ClientSessionPersisted)> {
// 为此会话生成唯一子文件夹
let db_subfolder = format!("db_{}", chrono::Local::now().format("%F_%H_%M_%S_%f"));
let db_path = data_dir.join(db_subfolder);
// 为加密生成随机密码
let passphrase: String = {
use rand::{Rng, thread_rng};
thread_rng()
.sample_iter(rand::distributions::Alphanumeric)
.take(32)
.map(char::from)
.collect()
};
let client = Client::builder()
.server_name_or_homeserver_url(homeserver_url)
.sqlite_store(&db_path, Some(&passphrase))
.sliding_sync_version_builder(VersionBuilder::DiscoverNative)
.with_decryption_settings(DecryptionSettings {
sender_device_trust_requirement: TrustRequirement::Untrusted,
})
.with_encryption_settings(EncryptionSettings {
auto_enable_cross_signing: true,
backup_download_strategy: BackupDownloadStrategy::OneShot,
auto_enable_backups: true,
})
.request_config(
RequestConfig::new().timeout(Duration::from_secs(60))
)
.build()
.await?;
Ok((client, ClientSessionPersisted { homeserver: homeserver_url.to_string(), db_path, passphrase }))
}
最佳实践
- 始终生成任务:不要阻塞工作任务接收循环
- 使用crossbeam通道进行每个房间的更新:比全局队列更高效
- 始终调用SignalToUI::set_ui_signal():在排队任何更新后
- 处理房间未就绪:跳过尚未在
ALL_JOINED_ROOMS中的房间请求 - 在drop时清理:当房间关闭时中止后台任务
- 使用Cx::post_action获取结果:发布动作在App::handle_actions中处理
- 对高频率更新使用SegQueue:无锁用于房间列表更新
参考文件
references/matrix-client.md- Matrix客户端设置和登录模式(Robrix)references/timeline-handling.md- Matrix时间线订阅模式(Robrix)references/moly-api-integration.md- Moly API集成模式- OpenAI客户端与SSE流
- 平台无关异步流
- MCP(模型上下文协议)集成
- 工具批准流程
- 本地服务器的MolyClient
- 多供应商支持的BotContext