VectorDatabasePatterns

本技能涉及向量数据库的使用,包括Pinecone、Qdrant、Weaviate等,以及嵌入策略、相似性搜索、性能优化和生产环境部署。关键词包括向量数据库、嵌入、相似性搜索、性能优化。

机器学习 0 次安装 0 次浏览 更新于 3/5/2026

向量数据库模式

概览

向量数据库是专门设计的数据库,用于高效存储、索引和查询高维向量。它们通过使用各种距离度量来寻找与查询向量“最接近”的向量,实现相似性搜索。这项技能涵盖了Pinecone、Qdrant、Weaviate、嵌入策略、相似性搜索、性能优化和生产考虑。

前提条件

  • 理解向量和嵌入
  • 了解机器学习概念
  • 熟悉Python或TypeScript
  • 理解相似性度量(余弦、欧几里得、点积)
  • 基本的数据库概念

核心概念

向量数据库基础

  • 向量:数据(文本、图像、音频)在高维空间的数值表示
  • 嵌入:由机器学习模型生成的向量,捕获语义含义
  • 距离度量:向量之间相似性的度量(余弦、欧几里得、点积)
  • 索引:使快速相似性搜索成为可能的数据结构
  • 元数据:与向量关联的附加信息,用于过滤

向量数据库类型

  • Pinecone:托管服务,易于设置,适合生产环境
  • Qdrant:开源,自托管选项,灵活
  • Weaviate:开源,GraphQL API,适合多模态

使用案例

  • 语义搜索(查找相似的文档、产品、图像)
  • 推荐系统
  • 异常检测
  • 自然语言处理任务
  • 计算机视觉应用
  • 个性化引擎
  • RAG(检索增强生成)的知识检索

实施指南

Pinecone

设置和索引

# 安装Pinecone客户端
# pip install pinecone-client

import pinecone
from pinecone import Pinecone, ServerlessSpec

# 初始化Pinecone
pc = Pinecone(api_key="your-api-key")

# 创建索引
pc.create_index(
    name="my-index",
    dimension=1536,  # OpenAI嵌入维度
    metric="cosine",  # 或 "euclidean", "dotproduct"
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1"
    )
)

# 连接到索引
index = pc.Index("my-index")

# 检查索引统计
stats = index.describe_index_stats()
print(f"总向量数: {stats['total_vector_count']}")
print(f"维度: {stats['dimension']}")

插入向量

# 插入单个向量
index.upsert(
    vectors=[
        {
            "id": "doc1",
            "values": [0.1, 0.2, 0.3, ...],  # 1536维向量
            "metadata": {
                "title": "文档1",
                "category": "技术",
                "date": "2024-01-01"
            }
        }
    ]
)

# 插入多个向量
index.upsert(
    vectors=[
        {
            "id": "doc1",
            "values": vector1,
            "metadata": {"title": "文档1", "category": "技术"}
        },
        {
            "id": "doc2",
            "values": vector2,
            "metadata": {"title": "文档2", "category": "科学"}
        },
        {
            "id": "doc3",
            "values": vector3,
            "metadata": {"title": "文档3", "category": "技术"}
        }
    ],
    namespace="documents"
)

# 批量插入
from tqdm import tqdm
def upsert_in_batches(vectors, batch_size=100):
    """Upsert vectors into the Pinecone index in fixed-size batches, showing tqdm progress."""
    batch_starts = range(0, len(vectors), batch_size)
    for start in tqdm(batch_starts):
        index.upsert(vectors=vectors[start:start + batch_size])

查询

# 基本相似性搜索
results = index.query(
    vector=query_vector,
    top_k=10,
    include_metadata=True,
    include_values=False
)

for match in results['matches']:
    print(f"ID: {match['id']}, 得分: {match['score']}")
    print(f"元数据: {match['metadata']}")

# 查询带命名空间
results = index.query(
    vector=query_vector,
    top_k=10,
    namespace="documents",
    include_metadata=True
)

# 查询带过滤器
results = index.query(
    vector=query_vector,
    top_k=10,
    filter={
        "category": {"$eq": "技术"},
        "date": {"$gte": "2024-01-01"}
    },
    include_metadata=True
)

# 查询带复杂过滤器
results = index.query(
    vector=query_vector,
    top_k=10,
    filter={
        "$or": [
            {"category": {"$eq": "技术"}},
            {"category": {"$eq": "科学"}}
        ],
        "date": {"$gte": "2024-01-01"}
    },
    include_metadata=True
)

删除向量

# 删除单个向量
index.delete(ids=["doc1"])

# 删除多个向量
index.delete(ids=["doc1", "doc2", "doc3"])

# 删除命名空间中的所有向量
index.delete(delete_all=True, namespace="documents")

# 按过滤器删除
index.delete(
    filter={
        "category": {"$eq": "旧"},
        "date": {"$lt": "2023-01-01"}
    },
    namespace="documents"
)

Qdrant

集合和点

# 安装Qdrant客户端
# pip install qdrant-client

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

# 初始化Qdrant客户端
client = QdrantClient(url="http://localhost:6333")

# 创建集合
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(
        size=1536,
        distance=Distance.COSINE  # 或Distance.EUCLID, Distance.DOT
    )
)

# 创建具有多个向量的集合
client.create_collection(
    collection_name="multimodal",
    vectors_config={
        "text": VectorParams(size=1536, distance=Distance.COSINE),
        "image": VectorParams(size=512, distance=Distance.EUCLID)
    }
)

# 列出集合
collections = client.get_collections()
for collection in collections.collections:
    print(f"集合: {collection.name}")

# 获取集合信息
info = client.get_collection("documents")
print(f"向量计数: {info.vectors_count}")
print(f"点计数: {info.points_count}")

插入点

# 插入单个点
client.upsert(
    collection_name="documents",
    points=[
        PointStruct(
            id=1,
            vector=[0.1, 0.2, 0.3, ...],
            payload={
                "title": "文档1",
                "category": "技术",
                "date": "2024-01-01"
            }
        )
    ]
)

# 插入多个点
client.upsert(
    collection_name="documents",
    points=[
        PointStruct(id=1, vector=vector1, payload={"title": "Doc 1", "category": "技术"}),
        PointStruct(id=2, vector=vector2, payload={"title": "Doc 2", "category": "科学"}),
        PointStruct(id=3, vector=vector3, payload={"title": "Doc 3", "category": "技术"}),
    ]
)

# 批量插入
from qdrant_client.models import Batch
def insert_in_batches(points, batch_size=100):
    """Upsert Qdrant points into the "documents" collection in fixed-size batches.

    Converts each slice of PointStructs into the column-oriented Batch payload
    expected by client.upsert.
    """
    for start in range(0, len(points), batch_size):
        chunk = points[start:start + batch_size]
        client.upsert(
            collection_name="documents",
            points=Batch(
                ids=[point.id for point in chunk],
                vectors=[point.vector for point in chunk],
                payloads=[point.payload for point in chunk],
            ),
        )

查询

# 基本搜索
results = client.search(
    collection_name="documents",
    query_vector=query_vector,
    limit=10,
    with_payload=True
)

for result in results:
    print(f"ID: {result.id}, 得分: {result.score}")
    print(f"有效载荷: {result.payload}")

# Search with a payload filter (metadata conditions narrow the vector search).
# NOTE(review): Filter, FieldCondition, MatchValue and Range are used here but
# only Distance, VectorParams and PointStruct were imported above — this
# snippet also needs
# `from qdrant_client.models import Filter, FieldCondition, MatchValue, Range`.
results = client.search(
    collection_name="documents",
    query_vector=query_vector,
    query_filter=Filter(
        must=[  # every condition in `must` has to hold (logical AND)
            FieldCondition(
                key="category",
                match=MatchValue(value="技术")
            ),
            FieldCondition(
                key="date",
                range=Range(
                    gte="2024-01-01"
                )
            )
        ]
    ),
    limit=10,
    with_payload=True
)

# 搜索带命名向量
results = client.search(
    collection_name="multimodal",
    query_vector=NamedVector(
        name="text",
        vector=query_vector
    ),
    limit=10
)

# 混合搜索(向量+关键字)
from qdrant_client.models import SearchRequest

results = client.search_batch(
    collection_name="documents",
    requests=[
        SearchRequest(
            vector=NamedVector(name="text", vector=query_vector),
            limit=10,
            with_payload=True
        ),
        SearchRequest(
            vector=NamedVector(name="image", vector=image_query_vector),
            limit=10,
            with_payload=True
        )
    ]
)

过滤

# Exact-match filter.
# NOTE(review): the name `filter` shadows the Python builtin — a name like
# `category_filter` would be safer. Filter/FieldCondition/MatchValue must be
# imported from qdrant_client.models for this snippet to run.
filter = Filter(
    must=[
        FieldCondition(
            key="category",
            match=MatchValue(value="技术")
        )
    ]
)

# 范围过滤器
filter = Filter(
    must=[
        FieldCondition(
            key="price",
            range=Range(
                gte=100,
                lte=1000
            )
        )
    ]
)

# OR过滤器
filter = Filter(
    should=[
        FieldCondition(
            key="category",
            match=MatchValue(value="技术")
        ),
        FieldCondition(
            key="category",
            match=MatchValue(value="科学")
        )
    ],
    min_count=1
)

# 嵌套过滤器
filter = Filter(
    must=[
        FieldCondition(
            key="metadata.category",
            match=MatchValue(value="技术")
        )
    ]
)

# IS NULL过滤器
filter = Filter(
    must_not=[
        FieldCondition(
            key="deleted_at",
            is_null=True
        )
    ]
)

Weaviate

模式设置

# 安装Weaviate客户端
# pip install weaviate-client

import weaviate
from weaviate import Client

# 初始化Weaviate客户端
client = Client("http://localhost:8080")

# 定义模式
schema = {
    "classes": [
        {
            "class": "Document",
            "description": "一个文档",
            "vectorizer": "text2vec-openai",
            "properties": [
                {
                    "name": "title",
                    "dataType": ["string"],
                    "description": "文档的标题"
                },
                {
                    "name": "content",
                    "dataType": ["text"],
                    "description": "文档的内容"
                },
                {
                    "name": "category",
                    "dataType": ["string"],
                    "description": "文档的类别"
                },
                {
                    "name": "date",
                    "dataType": ["date"],
                    "description": "文档的日期"
                },
                {
                    "name": "metadata",
                    "dataType": ["object"],
                    "description": "附加元数据"
                }
            ]
        }
    ]
}

# 创建模式
client.schema.create(schema)

# 获取模式
schema = client.schema.get()
print(schema)

插入数据

# 插入单个对象
client.data_object.create(
    class_name="Document",
    data_object={
        "title": "文档1",
        "content": "这是文档1的内容",
        "category": "技术",
        "date": "2024-01-01T00:00:00Z",
        "metadata": {
            "author": "John Doe",
            "tags": ["技术", "人工智能"]
        }
    }
)

# 插入多个对象
objects = [
    {
        "title": "文档1",
        "content": "内容1",
        "category": "技术"
    },
    {
        "title": "文档2",
        "content": "内容2",
        "category": "科学"
    }
]

for obj in objects:
    client.data_object.create(
        class_name="Document",
        data_object=obj
    )

# 插入自定义向量
client.data_object.create(
    class_name="Document",
    data_object={
        "title": "文档1",
        "content": "内容1"
    },
    vector=[0.1, 0.2, 0.3, ...]
)

# 批量插入
from weaviate.batch import Batch

with Batch(client) as batch:
    for obj in objects:
        batch.add_data_object(
            data_object=obj,
            class_name="Document"
        )

查询

# Semantic (nearText) search: Weaviate vectorizes the concepts with the
# class's configured vectorizer and returns the nearest Document objects.
results = client.query.get(
    class_name="Document",
    properties=["title", "content", "category"]
).with_near_text({
    "concepts": ["人工智能"],
    "distance": 0.7
}).with_limit(10).do()

for result in results["data"]["Get"]["Document"]:
    print(f"标题: {result['title']}")
    # NOTE(review): `_additional.distance` is read below but never requested in
    # the query — add `.with_additional(["distance"])` or this raises KeyError.
    print(f"距离: {result['_additional']['distance']}")

# 混合搜索(BM25 + 向量)
results = client.query.get(
    class_name="Document",
    properties=["title", "content"]
).with_hybrid(
    query="人工智能",
    alpha=0.7,  # 0 = 纯BM25, 1 = 纯向量
    vector=query_vector
).with_limit(10).do()

# 过滤搜索
results = client.query.get(
    class_name="Document",
    properties=["title", "content", "category"]
).with_where({
    "path": ["category"],
    "operator": "Equal",
    "valueString": "技术"
}).with_near_text({
    "concepts": ["AI"]
}).with_limit(10).do()

# 范围过滤
results = client.query.get(
    class_name="Document",
    properties=["title", "date"]
).with_where({
    "operator": "And",
    "operands": [
        {
            "path": ["category"],
            "operator": "Equal",
            "valueString": "技术"
        },
        {
            "path": ["date"],
            "operator": "GreaterThan",
            "valueDate": "2024-01-01T00:00:00Z"
        }
    ]
}).with_near_text({
    "concepts": ["AI"]
}).do()

嵌入策略

文本嵌入

# 使用OpenAI嵌入
from openai import OpenAI

client = OpenAI(api_key="your-api-key")
def get_embedding(text: str) -> list:
    """Return the OpenAI embedding vector for a single piece of text."""
    result = client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    return result.data[0].embedding

# 批量嵌入
def get_embeddings(texts: list) -> list:
    """Embed a batch of texts in one OpenAI API call; output order matches the input."""
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=texts,
    )
    embeddings = []
    for item in response.data:
        embeddings.append(item.embedding)
    return embeddings

# 长文本分块
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list:
    """Split *text* into overlapping chunks for embedding.

    Each chunk is at most ``chunk_size`` characters and consecutive chunks
    share ``overlap`` characters so context is preserved across boundaries.

    Args:
        text: The text to split; an empty string yields an empty list.
        chunk_size: Maximum characters per chunk; must be positive.
        overlap: Characters shared between consecutive chunks; must satisfy
            0 <= overlap < chunk_size.

    Returns:
        A list of chunk strings covering the whole input.

    Raises:
        ValueError: If chunk_size is not positive or overlap is not smaller
            than chunk_size. (Previously overlap >= chunk_size surfaced as an
            opaque "range() arg 3 must not be zero" error or silently produced
            gaps for negative overlap.)
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

图像嵌入

# 使用CLIP进行图像嵌入
from PIL import Image
import clip
import torch

# 加载CLIP模型
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)
def get_image_embedding(image_path: str) -> list:
    """Encode one image file into a CLIP embedding, returned as a plain Python list."""
    tensor = preprocess(Image.open(image_path)).unsqueeze(0).to(device)
    with torch.no_grad():
        features = model.encode_image(tensor)
    return features.cpu().numpy().tolist()[0]

# 批量图像嵌入
def get_image_embeddings(image_paths: list) -> list:
    """Encode several images in a single CLIP forward pass; one vector per image."""
    preprocessed = [preprocess(Image.open(path)) for path in image_paths]
    batch = torch.stack(preprocessed).to(device)
    with torch.no_grad():
        features = model.encode_image(batch)
    return features.cpu().numpy().tolist()

多模态嵌入

# 使用OpenAI CLIP进行文本-图像相似性
def get_text_embedding(text: str) -> list:
    """Embed a text string with CLIP so it can be compared against CLIP image vectors."""
    tokens = clip.tokenize([text]).to(device)
    with torch.no_grad():
        features = model.encode_text(tokens)
    return features.cpu().numpy().tolist()[0]

# NOTE(review): identical to get_image_embedding defined in the image-embedding
# section above; redefined here so this multimodal snippet is self-contained.
def get_image_embedding(image_path: str) -> list:
    """Encode one image into a CLIP embedding comparable with get_text_embedding output."""
    image = preprocess(Image.open(image_path)).unsqueeze(0).to(device)
    with torch.no_grad():
        image_features = model.encode_image(image)
    return image_features.cpu().numpy().tolist()[0]

# 计算相似性
import numpy as np
def cosine_similarity(vec1: list, vec2: list) -> float:
    """Return the cosine similarity of two vectors, in [-1, 1].

    Fix: the original divided by the product of the norms unconditionally,
    so a zero-magnitude vector produced NaN (0/0). We now return 0.0 in
    that case, and coerce the result to a plain Python float.

    Args:
        vec1: First vector (any sequence of numbers).
        vec2: Second vector of the same length.
    """
    v1 = np.asarray(vec1, dtype=float)
    v2 = np.asarray(vec2, dtype=float)
    denom = np.linalg.norm(v1) * np.linalg.norm(v2)
    if denom == 0.0:
        return 0.0
    return float(np.dot(v1, v2) / denom)

相似性搜索

余弦相似度

import numpy as np
def cosine_similarity(vec1: list, vec2: list) -> float:
    """Cosine of the angle between two vectors: dot(v1, v2) / (|v1| * |v2|)."""
    a, b = np.array(vec1), np.array(vec2)
    norm_product = np.linalg.norm(a) * np.linalg.norm(b)
    return np.dot(a, b) / norm_product

# 示例
vector_a = [1, 2, 3]
vector_b = [2, 4, 6]
similarity = cosine_similarity(vector_a, vector_b)
print(f"余弦相似度: {similarity}")

欧几里得距离

import numpy as np
def euclidean_distance(vec1: list, vec2: list) -> float:
    """L2 (straight-line) distance between two equal-length vectors."""
    diff = np.array(vec1) - np.array(vec2)
    return np.linalg.norm(diff)

# 示例
vector_a = [1, 2, 3]
vector_b = [2, 4, 6]
distance = euclidean_distance(vector_a, vector_b)
print(f"欧几里得距离: {distance}")

点积

import numpy as np
def dot_product(vec1: list, vec2: list) -> float:
    """Inner product of two vectors (larger means more similar for normalized inputs)."""
    return np.dot(np.array(vec1), np.array(vec2))

# 示例
vector_a = [1, 2, 3]
vector_b = [2, 4, 6]
product = dot_product(vector_a, vector_b)
print(f"点积: {product}")

性能优化

批量操作

# Pinecone批量插入
def upsert_in_batches(vectors, batch_size=100):
    """Send vectors to the Pinecone index in slices of batch_size to stay under request limits."""
    offset = 0
    while offset < len(vectors):
        index.upsert(vectors=vectors[offset:offset + batch_size])
        offset += batch_size

# Qdrant批量插入
from qdrant_client.models import Batch
def insert_in_batches(points, batch_size=100):
    """Upsert Qdrant points in batches, repacking each slice into a column-oriented Batch."""
    for offset in range(0, len(points), batch_size):
        window = points[offset:offset + batch_size]
        ids, vectors, payloads = [], [], []
        for point in window:
            ids.append(point.id)
            vectors.append(point.vector)
            payloads.append(point.payload)
        client.upsert(
            collection_name="documents",
            points=Batch(ids=ids, vectors=vectors, payloads=payloads),
        )

索引策略

# Pinecone:选择适当的索引类型
# 对于较小的数据集: p1 pods
# 对于较大的数据集: p2 pods
# 对于生产环境: s1 pods (SSD)

# Qdrant: tune the HNSW graph when creating the collection.
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(
        size=1536,
        distance=Distance.COSINE,
        hnsw_config={
            "m": 16,  # links per graph node; higher = better recall, more memory
            "ef_construct": 100  # build-time candidate list; higher = better index quality, slower build
        }
    )
)

缓存

# 缓存嵌入
import hashlib
import pickle
from functools import lru_cache
def get_embedding_cache_key(text: str) -> str:
    """Derive a stable cache key for *text* via its MD5 hex digest (not security-sensitive)."""
    digest = hashlib.md5(text.encode())
    return digest.hexdigest()

@lru_cache(maxsize=1000)
def get_cached_embedding(text: str) -> list:
    """Memoized wrapper around get_embedding to avoid repeated API calls.

    Fix: the original computed ``get_embedding_cache_key(text)`` and never
    used it — ``functools.lru_cache`` already keys on the ``text`` argument
    itself, so that call was dead code and has been removed. The hash-based
    key remains useful only for an external cache (e.g. Redis).

    NOTE(review): the cached value is a mutable list shared between callers;
    mutating it would corrupt the cache — treat the result as read-only.
    """
    return get_embedding(text)

生产考虑

扩展

# Pinecone:扩展索引
# 增加副本数量以提高吞吐量
# 使用更大的pod类型以获得更多存储

# Qdrant:分片
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    shard_number=4  # 分片数量
)

# Weaviate:多节点设置
# 配置复制因子

监控

# Pinecone:监控索引统计
stats = index.describe_index_stats()
print(f"总向量数: {stats['total_vector_count']}")
print(f"维度: {stats['dimension']}")

# Qdrant:监控集合信息
info = client.get_collection("documents")
print(f"向量计数: {info.vectors_count}")
print(f"点计数: {info.points_count}")

# Weaviate:监控集群
cluster_status = client.cluster.get_nodes()
print(cluster_status)

备份和恢复

# Pinecone:导出数据
# 使用Pinecone的导出功能

# Qdrant:快照
client.create_snapshot(collection_name="documents")

# Weaviate:备份
# 使用Weaviate的备份工具

成本优化

选择正确的服务

  • Pinecone:托管服务,易于设置,适合生产环境
  • Qdrant:开源,自托管选项,灵活
  • Weaviate:开源,GraphQL API,适合多模态

存储优化

# 使用较小的嵌入模型
# text-embedding-3-small (1536维) vs text-embedding-3-large (3072维)

# 压缩向量
# 使用量化或降维

# 删除旧数据
# 实施保留策略

查询优化

# 使用过滤器减少搜索空间
# 限制top_k结果
# 使用适当的距离度量

最佳实践

  1. 选择适当的嵌入模型

    • 对于文本:OpenAI text-embedding-3-small或ada-002
    • 对于图像:CLIP、DINO或特定领域的模型
    • 对于多模态:CLIP或类似模型
  2. 预处理数据

    • 通过删除特殊字符来清理文本
    • 规范化空白
    • 转换为小写以保持一致性
  3. 使用适当的分块

    • 分块长文档
    • 使用语义分块
    • 保持块之间的上下文
  4. 实现缓存

    • 缓存嵌入以减少API调用
    • 缓存查询结果
    • 使用Redis进行缓存
  5. 监控性能

    • 跟踪查询延迟
    • 监控存储使用情况
    • 为异常设置警报
  6. 有效使用过滤器

    • 使用元数据过滤器减少搜索空间
    • 结合向量搜索和关键字搜索
    • 适当时使用混合搜索
  7. 优雅地处理错误

    • 实施重试逻辑
    • 处理速率限制
    • 记录错误以供调试
  8. 彻底测试

    • 使用真实数据进行测试
    • 评估搜索质量
    • 基准性能
  9. 安全

    • 在生产中使用身份验证
    • 加密敏感数据
    • 遵循最小权限原则
  10. 可扩展性

    • 设计水平扩展
    • 使用适当的分片策略
    • 监控资源使用情况

相关技能