MatrixSDK集成技能Skill robius-matrix-integration

这个技能用于将Matrix SDK集成到Makepad应用程序中,实现聊天客户端的实时更新、时间线管理、房间操作等功能,基于Robrix和Moly代码库的最佳实践。关键词:Matrix SDK, Makepad, 集成, 聊天客户端, 滑动同步, 时间线, 实时更新, 异步操作, 架构设计

架构设计 0 次安装 0 次浏览 更新于 3/13/2026

name: robius-matrix-integration description: | 关键:用于Matrix SDK与Makepad的集成。触发条件: Matrix SDK,滑动同步,MatrixRequest,时间线, matrix-sdk,matrix client,robrix,matrix room, Matrix 集成,聊天客户端

Robius Matrix SDK 集成技能

基于Robrix和Moly代码库,将外部API集成到Makepad应用程序中的最佳实践。

源代码库:

  • Robrix:Matrix SDK 集成 - 滑动同步,时间线订阅,实时更新
  • Moly:OpenAI/LLM API 集成 - SSE 流,MCP 协议,多供应商支持

触发条件

在以下情况下使用此技能:

  • 将Matrix SDK集成到Makepad中
  • 使用Makepad构建Matrix客户端
  • 实现Matrix功能(房间,时间线,消息)
  • 在UI中处理Matrix SDK异步操作
  • 关键词:matrix-sdk,matrix client,robrix,matrix timeline,matrix room,滑动同步

概述

Robrix使用matrix-sdkmatrix-sdk-uicrate连接到Matrix homeserver。关键架构决策:

  1. 滑动同步:使用原生滑动同步进行高效房间列表更新
  2. 分离运行时:Tokio运行时运行Matrix操作,Makepad处理UI
  3. 请求/响应模式:UI发送请求,接收动作/更新
  4. 每个房间的后台任务:每个房间有专用的时间线订阅者任务

MatrixRequest 模式

请求枚举定义

/// 所有可以发送到Matrix工作任务的异步请求
pub enum MatrixRequest {
    /// 登录请求
    Login(LoginRequest),
    Logout { is_desktop: bool },

    /// 时间线操作
    PaginateRoomTimeline {
        room_id: OwnedRoomId,
        num_events: u16,
        direction: PaginationDirection,
    },
    SendMessage {
        room_id: OwnedRoomId,
        message: RoomMessageEventContent,
        replied_to: Option<Reply>,
    },
    EditMessage {
        room_id: OwnedRoomId,
        timeline_event_item_id: TimelineEventItemId,
        edited_content: EditedContent,
    },
    RedactMessage {
        room_id: OwnedRoomId,
        timeline_event_id: TimelineEventItemId,
        reason: Option<String>,
    },

    /// 房间操作
    JoinRoom { room_id: OwnedRoomId },
    LeaveRoom { room_id: OwnedRoomId },
    GetRoomMembers {
        room_id: OwnedRoomId,
        memberships: RoomMemberships,
        local_only: bool,
    },

    /// 用户操作
    GetUserProfile {
        user_id: OwnedUserId,
        room_id: Option<OwnedRoomId>,
        local_only: bool,
    },
    IgnoreUser {
        ignore: bool,
        room_member: RoomMember,
        room_id: OwnedRoomId,
    },

    /// 媒体操作
    FetchAvatar {
        mxc_uri: OwnedMxcUri,
        on_fetched: fn(AvatarUpdate),
    },
    FetchMedia {
        media_request: MediaRequestParameters,
        on_fetched: OnMediaFetchedFn,
        destination: MediaCacheEntryRef,
        update_sender: Option<crossbeam_channel::Sender<TimelineUpdate>>,
    },

    /// 打字/已读指示器
    SendTypingNotice { room_id: OwnedRoomId, typing: bool },
    ReadReceipt { room_id: OwnedRoomId, event_id: OwnedEventId },
    FullyReadReceipt { room_id: OwnedRoomId, event_id: OwnedEventId },

    /// 反应
    ToggleReaction {
        room_id: OwnedRoomId,
        timeline_event_id: TimelineEventItemId,
        reaction: String,
    },

    /// 订阅
    SubscribeToTypingNotices { room_id: OwnedRoomId, subscribe: bool },
    SubscribeToPinnedEvents { room_id: OwnedRoomId, subscribe: bool },
}

提交模式

static REQUEST_SENDER: Mutex<Option<UnboundedSender<MatrixRequest>>> = Mutex::new(None);

/// 从UI线程提交请求到异步运行时
pub fn submit_async_request(req: MatrixRequest) {
    if let Some(sender) = REQUEST_SENDER.lock().unwrap().as_ref() {
        sender.send(req).expect("BUG: matrix worker task receiver died!");
    }
}

// UI中的使用示例
submit_async_request(MatrixRequest::SendMessage {
    room_id: room_id.clone(),
    message: RoomMessageEventContent::text_plain(&text),
    replied_to: self.reply_to.take(),
});

工作任务处理器

async fn matrix_worker_task(
    mut request_receiver: UnboundedReceiver<MatrixRequest>,
    login_sender: Sender<LoginRequest>,
) -> Result<()> {
    while let Some(request) = request_receiver.recv().await {
        match request {
            MatrixRequest::PaginateRoomTimeline { room_id, num_events, direction } => {
                let (timeline, sender) = {
                    let rooms = ALL_JOINED_ROOMS.lock().unwrap();
                    let Some(room_info) = rooms.get(&room_id) else {
                        continue;  // 房间尚未准备就绪
                    };
                    (room_info.timeline.clone(), room_info.update_sender.clone())
                };

                // 为此操作生成专用任务
                Handle::current().spawn(async move {
                    // 通知UI分页正在开始
                    sender.send(TimelineUpdate::PaginationRunning(direction)).unwrap();
                    SignalToUI::set_ui_signal();

                    // 执行分页
                    let res = if direction == PaginationDirection::Forwards {
                        timeline.paginate_forwards(num_events).await
                    } else {
                        timeline.paginate_backwards(num_events).await
                    };

                    // 发送结果到UI
                    match res {
                        Ok(fully_paginated) => {
                            sender.send(TimelineUpdate::PaginationIdle {
                                fully_paginated,
                                direction,
                            }).unwrap();
                        }
                        Err(error) => {
                            sender.send(TimelineUpdate::PaginationError {
                                error,
                                direction,
                            }).unwrap();
                        }
                    }
                    SignalToUI::set_ui_signal();
                });
            }

            MatrixRequest::JoinRoom { room_id } => {
                let Some(client) = get_client() else { continue };

                Handle::current().spawn(async move {
                    let result_action = if let Some(room) = client.get_room(&room_id) {
                        match room.join().await {
                            Ok(()) => JoinRoomResultAction::Joined { room_id },
                            Err(e) => JoinRoomResultAction::Failed { room_id, error: e },
                        }
                    } else {
                        match client.join_room_by_id(&room_id).await {
                            Ok(_) => JoinRoomResultAction::Joined { room_id },
                            Err(e) => JoinRoomResultAction::Failed { room_id, error: e },
                        }
                    };
                    Cx::post_action(result_action);
                });
            }
            // ... 处理其他请求
        }
    }
    Ok(())
}

时间线更新

TimelineUpdate 枚举

pub enum TimelineUpdate {
    /// 新项添加到时间线
    NewItems {
        new_items: Vector<Arc<TimelineItem>>,
        changed_indices: BTreeSet<usize>,
        is_append: bool,
    },
    /// 分页状态变化
    PaginationRunning(PaginationDirection),
    PaginationIdle {
        fully_paginated: bool,
        direction: PaginationDirection,
    },
    PaginationError {
        error: Error,
        direction: PaginationDirection,
    },
    /// 消息编辑结果
    MessageEdited {
        timeline_event_id: TimelineEventItemId,
        result: Result<(), Error>,
    },
    /// 房间成员已获取
    RoomMembersListFetched {
        members: Vec<RoomMember>,
        sort: PrecomputedMemberSort,
        is_local_fetch: bool,
    },
    /// 未读计数已更新
    NewUnreadMessagesCount(UnreadMessageCount),
    /// 用户权限级别已获取
    UserPowerLevels(UserPowerLevels),
}

每个房间的更新流

struct JoinedRoomDetails {
    room_id: OwnedRoomId,
    timeline: Arc<Timeline>,
    timeline_update_sender: crossbeam_channel::Sender<TimelineUpdate>,
    timeline_subscriber_handler_task: JoinHandle<()>,
    typing_notice_subscriber: Option<EventHandlerDropGuard>,
}

impl Drop for JoinedRoomDetails {
    fn drop(&mut self) {
        // 清理房间关闭时的后台任务
        self.timeline_subscriber_handler_task.abort();
        drop(self.typing_notice_subscriber.take());
    }
}

// 为房间生成订阅者
async fn spawn_timeline_subscriber(
    room_id: OwnedRoomId,
    timeline: Arc<Timeline>,
    sender: crossbeam_channel::Sender<TimelineUpdate>,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let (items, mut stream) = timeline.subscribe().await;

        // 发送初始项
        sender.send(TimelineUpdate::NewItems {
            new_items: items,
            changed_indices: BTreeSet::new(),
            is_append: false,
        }).unwrap();
        SignalToUI::set_ui_signal();

        // 监听更新
        while let Some(diff) = stream.next().await {
            let update = process_timeline_diff(diff);
            sender.send(update).unwrap();
            SignalToUI::set_ui_signal();
        }
    })
}

在UI中处理更新

impl Widget for RoomScreen {
    fn handle_event(&mut self, cx: &mut Cx, event: &Event, scope: &mut Scope) {
        // 在Signal事件上轮询时间线更新
        if let Event::Signal = event {
            while let Ok(update) = self.timeline_state.update_receiver.try_recv() {
                match update {
                    TimelineUpdate::NewItems { new_items, changed_indices, is_append } => {
                        self.apply_new_items(cx, new_items, changed_indices, is_append);
                    }
                    TimelineUpdate::PaginationIdle { fully_paginated, direction } => {
                        self.set_pagination_idle(cx, direction, fully_paginated);
                    }
                    TimelineUpdate::PaginationError { error, direction } => {
                        self.show_pagination_error(cx, direction, &error);
                    }
                    // ... 处理其他更新
                }
            }
        }

        self.view.handle_event(cx, event, scope);
    }
}

房间列表更新

RoomsListUpdate 枚举

pub enum RoomsListUpdate {
    NotLoaded,
    LoadedRooms { max_rooms: Option<u32> },
    AddInvitedRoom(InvitedRoomInfo),
    AddJoinedRoom(JoinedRoomInfo),
    ClearRooms,
    UpdateLatestEvent {
        room_id: OwnedRoomId,
        timestamp: MilliSecondsSinceUnixEpoch,
        latest_message_text: String,
    },
    UpdateNumUnreadMessages {
        room_id: OwnedRoomId,
        unread_messages: UnreadMessageCount,
        unread_mentions: u64,
    },
    UpdateRoomName { new_room_name: RoomNameId },
    UpdateRoomAvatar { room_id: OwnedRoomId, avatar: FetchedRoomAvatar },
    RemoveRoom { room_id: OwnedRoomId, new_state: RoomState },
    Status { status: String },
    ScrollToRoom(OwnedRoomId),
}

static PENDING_ROOM_UPDATES: SegQueue<RoomsListUpdate> = SegQueue::new();

pub fn enqueue_rooms_list_update(update: RoomsListUpdate) {
    PENDING_ROOM_UPDATES.push(update);
    SignalToUI::set_ui_signal();
}

客户端构建模式

async fn build_client(
    homeserver_url: &str,
    data_dir: &Path,
) -> Result<(Client, ClientSessionPersisted)> {
    // 为此会话生成唯一子文件夹
    let db_subfolder = format!("db_{}", chrono::Local::now().format("%F_%H_%M_%S_%f"));
    let db_path = data_dir.join(db_subfolder);

    // 为加密生成随机密码
    let passphrase: String = {
        use rand::{Rng, thread_rng};
        thread_rng()
            .sample_iter(rand::distributions::Alphanumeric)
            .take(32)
            .map(char::from)
            .collect()
    };

    let client = Client::builder()
        .server_name_or_homeserver_url(homeserver_url)
        .sqlite_store(&db_path, Some(&passphrase))
        .sliding_sync_version_builder(VersionBuilder::DiscoverNative)
        .with_decryption_settings(DecryptionSettings {
            sender_device_trust_requirement: TrustRequirement::Untrusted,
        })
        .with_encryption_settings(EncryptionSettings {
            auto_enable_cross_signing: true,
            backup_download_strategy: BackupDownloadStrategy::OneShot,
            auto_enable_backups: true,
        })
        .request_config(
            RequestConfig::new().timeout(Duration::from_secs(60))
        )
        .build()
        .await?;

    Ok((client, ClientSessionPersisted { homeserver: homeserver_url.to_string(), db_path, passphrase }))
}

最佳实践

  1. 始终生成任务:不要阻塞工作任务接收循环
  2. 使用crossbeam通道进行每个房间的更新:比全局队列更高效
  3. 始终调用SignalToUI::set_ui_signal():在排队任何更新后
  4. 处理房间未就绪:跳过尚未在ALL_JOINED_ROOMS中的房间请求
  5. 在drop时清理:当房间关闭时中止后台任务
  6. 使用Cx::post_action获取结果:发布动作在App::handle_actions中处理
  7. 对高频率更新使用SegQueue:无锁用于房间列表更新

参考文件

  • references/matrix-client.md - Matrix客户端设置和登录模式(Robrix)
  • references/timeline-handling.md - Matrix时间线订阅模式(Robrix)
  • references/moly-api-integration.md - Moly API集成模式
    • OpenAI客户端与SSE流
    • 平台无关异步流
    • MCP(模型上下文协议)集成
    • 工具批准流程
    • 本地服务器的MolyClient
    • 多供应商支持的BotContext