name: p2p-networking description: 使用通用组件构建去中心化Guts网络的点对点网络模式
Guts的P2P网络技能
您正在为一个去中心化代码协作平台实现点对点网络。
Commonware P2P概述
commonware-p2p crate提供经过身份验证、加密的节点通信。
网络架构
┌─────────────────────────────────────────────────────────────┐
│ Guts P2P网络 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 节点A │────│ 节点B │────│ 节点C │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └───────────────┼───────────────┘ │
│ │ │
│ ┌────┴────┐ │
│ │ 节点D │ │
│ └─────────┘ │
│ │
│ 协议: Noise_XX + Ed25519 │
│ 传输: QUIC / TCP │
│ 发现: DHT + 引导节点 │
│ │
└─────────────────────────────────────────────────────────────┘
消息类型
use serde::{Deserialize, Serialize};
use commonware_codec::Codec;
#[derive(Debug, Clone, Serialize, Deserialize, Codec)]
pub enum Message {
// 握手
Hello { version: u32, capabilities: Vec<Capability> },
HelloAck { version: u32, capabilities: Vec<Capability> },
// 仓库同步
GetRefs { repository: RepositoryId },
Refs { repository: RepositoryId, refs: Vec<Ref> },
GetObjects { repository: RepositoryId, objects: Vec<ObjectId> },
Objects { repository: RepositoryId, objects: Vec<Object> },
// 公告
NewCommit { repository: RepositoryId, commit: CommitId },
NewRepository { repository: RepositoryInfo },
// 流言传播
Gossip { topic: Topic, data: Vec<u8> },
// 保活
Ping { nonce: u64 },
Pong { nonce: u64 },
}
节点管理
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct PeerManager {
peers: Arc<RwLock<HashMap<PeerId, PeerState>>>,
config: PeerConfig,
}
#[derive(Debug)]
pub struct PeerState {
pub id: PeerId,
pub address: SocketAddr,
pub connection: Connection,
pub last_seen: Instant,
pub repositories: HashSet<RepositoryId>,
pub capabilities: Vec<Capability>,
}
impl PeerManager {
pub async fn connect(&self, addr: SocketAddr) -> Result<PeerId> {
// 建立加密连接
let connection = Connection::connect(addr, &self.config.keypair).await?;
// 交换hello消息
let peer_info = self.handshake(&connection).await?;
// 存储节点状态
let peer_id = peer_info.id.clone();
self.peers.write().await.insert(peer_id.clone(), PeerState {
id: peer_id.clone(),
address: addr,
connection,
last_seen: Instant::now(),
repositories: HashSet::new(),
capabilities: peer_info.capabilities,
});
Ok(peer_id)
}
pub async fn broadcast(&self, message: Message) -> Result<()> {
let peers = self.peers.read().await;
let futures: Vec<_> = peers.values()
.map(|peer| peer.connection.send(message.clone()))
.collect();
futures::future::try_join_all(futures).await?;
Ok(())
}
}
流言传播协议
use std::collections::HashSet;
pub struct GossipProtocol {
seen_messages: HashSet<MessageId>,
fanout: usize,
peer_manager: Arc<PeerManager>,
}
impl GossipProtocol {
pub async fn broadcast(&mut self, topic: Topic, data: Vec<u8>) -> Result<()> {
let message_id = MessageId::from_content(&topic, &data);
// 避免重复广播
if !self.seen_messages.insert(message_id.clone()) {
return Ok(());
}
// 随机选择节点
let peers = self.peer_manager.random_peers(self.fanout).await;
// 发送给选中的节点
for peer in peers {
peer.send(Message::Gossip {
topic: topic.clone(),
data: data.clone(),
}).await?;
}
Ok(())
}
pub async fn handle_gossip(&mut self, peer: PeerId, message: Message) -> Result<()> {
if let Message::Gossip { topic, data } = message {
let message_id = MessageId::from_content(&topic, &data);
// 新消息,处理和重新广播
if self.seen_messages.insert(message_id) {
self.process_message(topic.clone(), data.clone()).await?;
self.broadcast(topic, data).await?;
}
}
Ok(())
}
}
仓库同步
pub struct RepoSync {
peer_manager: Arc<PeerManager>,
storage: Arc<Storage>,
}
impl RepoSync {
pub async fn sync_repository(&self, repo_id: RepositoryId) -> Result<()> {
// 查找拥有此仓库的节点
let peers = self.peer_manager
.peers_with_repository(&repo_id)
.await;
if peers.is_empty() {
return Err(SyncError::NoPeers);
}
// 从节点获取引用
let local_refs = self.storage.get_refs(&repo_id).await?;
for peer in peers {
let remote_refs = self.fetch_refs(&peer, &repo_id).await?;
// 查找缺失的对象
let missing = self.diff_refs(&local_refs, &remote_refs);
if !missing.is_empty() {
// 获取缺失的对象
let objects = self.fetch_objects(&peer, &repo_id, missing).await?;
// 存储对象
for object in objects {
self.storage.put_object(&repo_id, object).await?;
}
}
}
Ok(())
}
}
连接配置
pub struct NetworkConfig {
/// 监听地址用于传入连接
pub listen_addr: SocketAddr,
/// 引导节点用于初始节点发现
pub bootstrap_nodes: Vec<SocketAddr>,
/// 最大并发连接数
pub max_connections: usize,
/// 连接超时
pub connection_timeout: Duration,
/// 保活间隔
pub keepalive_interval: Duration,
/// 节点密钥对用于身份验证
pub keypair: Ed25519Keypair,
}
impl Default for NetworkConfig {
fn default() -> Self {
Self {
listen_addr: "0.0.0.0:9000".parse().unwrap(),
bootstrap_nodes: vec![],
max_connections: 50,
connection_timeout: Duration::from_secs(10),
keepalive_interval: Duration::from_secs(30),
keypair: Ed25519Keypair::generate(),
}
}
}
安全考虑
- 身份验证: 所有节点通过Ed25519进行身份验证
- 加密: 所有流量使用Noise协议加密
- 速率限制: 限制每个节点的消息数量以防止DoS攻击
- 节点评分: 跟踪节点行为,断开不良行为者
- 消息验证: 在处理前验证所有消息