Vector Search Patterns (VectorSearchPatterns)

Vector search patterns use vector embeddings and a vector database to perform semantic search: text or images are converted into vectors, and similarity is measured between those vectors in a high-dimensional space. The technique suits scenarios where search should be based on meaning rather than keywords, such as semantic document search, product recommendations, and question answering. Keywords: vector embeddings, semantic search, similarity comparison, machine learning models.

Overview

Vector search (also called semantic search or similarity search) finds similar items based on their meaning rather than exact keyword matches. It represents data as vector embeddings, points in a high-dimensional space, where similarity is measured as the distance between points.

Prerequisites

  • Understanding of vector math and linear algebra
  • Familiarity with machine learning and embeddings
  • Familiarity with Python and numerical computing (numpy, scikit-learn)
  • Understanding of database concepts and indexing
  • Basic knowledge of cloud services (AWS, GCP, Azure)
  • Experience with vector databases or similarity search

Key Concepts

  • Vector/semantic search: finding similar items by meaning rather than exact keyword matches
  • Embeddings: converting text/images into vectors for similarity comparison
  • Vector database: a database specialized for efficient vector storage and retrieval
  • Distance metrics: cosine similarity, Euclidean distance, and dot product for measuring vector similarity
  • Indexing algorithms: HNSW (graph-based), IVF (clustering), PQ (quantization) for efficient search
  • Hybrid search: combining vector search with keyword search for better results
  • RAG (Retrieval Augmented Generation): using retrieved context to improve LLM responses
  • Chunking: splitting documents into smaller pieces for better indexing and retrieval
  • Query optimization: caching, batching, and scaling techniques for better performance
  • Scaling strategies: sharding, replication, and horizontal scaling for large datasets
  • Evaluation metrics: Recall@K, Precision@K, MAP, MRR, NDCG for measuring search quality

What Is Vector/Semantic Search

Traditional vs. Vector Search

Traditional keyword search:

  • Matches exact words or phrases
  • Requires correct spelling
  • Limited understanding of context
  • Example: "car" will not match "automobile"

Vector/semantic search:

  • Matches on meaning
  • Understands context and synonyms
  • Tolerates typos and variations
  • Example: "car" will match "automobile", "vehicle", "sedan"
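A tiny illustration of the difference, using hand-assigned 3-dimensional vectors in place of a real embedding model (the numbers below are made up for the example):

```python
import numpy as np

# Toy 3-dimensional "meaning" vectors, hand-assigned for illustration.
# A real system would get these from a trained embedding model.
vectors = {
    "car":        np.array([0.90, 0.10, 0.00]),
    "automobile": np.array([0.88, 0.12, 0.05]),
    "banana":     np.array([0.00, 0.10, 0.95]),
}

def cosine(a, b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Keyword search: an exact string match misses the synonym
keyword_hit = "automobile" == "car"          # False

# Semantic search: cosine similarity finds it
sim_synonym = cosine(vectors["car"], vectors["automobile"])
sim_unrelated = cosine(vectors["car"], vectors["banana"])
print(f"synonym: {sim_synonym:.3f}, unrelated: {sim_unrelated:.3f}")
```

The synonym pair scores near 1.0 while the unrelated pair scores near 0, even though no string matching is involved.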

How It Works

  1. Embed: convert text/images into vectors with an ML model
  2. Index: store the vectors in a vector database with an efficient index
  3. Query: convert the query into a vector and find its nearest neighbors
  4. Rank: return results ordered by similarity

Query: "Find similar products"

Text → Embedding model → Vector: [0.1, -0.2, 0.8, ...]
                                    ↓
                             Vector database
                                    ↓
                    [0.2, -0.1, 0.7, ...]  ← Product A (0.92 similarity)
                    [0.3, -0.3, 0.6, ...]  ← Product B (0.87 similarity)
                    [0.4, -0.4, 0.5, ...]  ← Product C (0.81 similarity)
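The four-step workflow can be sketched end to end with a brute-force in-memory index; the random vectors below stand in for real model embeddings, and names like `prod_A` are illustrative:

```python
import numpy as np

rng = np.random.default_rng(0)

# Stand-in "embeddings": one fixed random vector per catalog item
# (a real pipeline would produce these with an embedding model).
catalog = {
    "prod_A": rng.normal(size=8),
    "prod_B": rng.normal(size=8),
    "prod_C": rng.normal(size=8),
}

def cosine(a, b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def search(query_vec, top_k=2):
    # steps 3-4: score every stored vector, then rank by similarity
    scored = [(pid, cosine(query_vec, v)) for pid, v in catalog.items()]
    return sorted(scored, key=lambda x: x[1], reverse=True)[:top_k]

# A query vector very close to prod_A should rank prod_A first
query = catalog["prod_A"] + 0.01 * rng.normal(size=8)
results = search(query)
print(results)
```

Vector databases replace the linear scan in `search` with an approximate index (HNSW, IVF, PQ), but the embed-index-query-rank shape stays the same.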

Embedding Basics

Text Embeddings

Text embeddings represent words, sentences, or documents as dense vectors.

from sentence_transformers import SentenceTransformer

# Load a pre-trained model
model = SentenceTransformer('all-MiniLM-L6-v2')

# Generate an embedding
text = "The quick brown fox jumps over the lazy dog"
embedding = model.encode(text)

print(f"Embedding shape: {embedding.shape}")  # (384,)
print(f"Embedding: {embedding}")

Popular text embedding models:

| Model | Dimensions | Use case | Provider |
|-------|------------|----------|----------|
| text-embedding-ada-002 | 1536 | General purpose | OpenAI |
| text-embedding-3-small | 1536 | Fast, cost-effective | OpenAI |
| all-MiniLM-L6-v2 | 384 | Lightweight, English-focused | Hugging Face |
| e5-large-v2 | 1024 | High quality | Hugging Face |
| Cohere embed-v3 | 1024 | Multilingual | Cohere |

Image Embeddings

from PIL import Image
import clip
import torch

# Load the CLIP model
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)

# Generate an image embedding
image = preprocess(Image.open("product.jpg")).unsqueeze(0).to(device)
with torch.no_grad():
    image_features = model.encode_image(image).float()

print(f"Image embedding shape: {image_features.shape}")

Popular image embedding models:

| Model | Dimensions | Use case | Provider |
|-------|------------|----------|----------|
| CLIP ViT-B/32 | 512 | Image-text retrieval | OpenAI |
| CLIP ViT-L/14 | 768 | High quality | OpenAI |
| ResNet-50 | 2048 | Image similarity | PyTorch |

Multimodal Embeddings

# CLIP maps text and images into the same vector space,
# so a text query can retrieve images and vice versa
import clip
import torch
from PIL import Image

device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)

# Text embedding
text_tokens = clip.tokenize(["A red sports car"]).to(device)
with torch.no_grad():
    text_features = model.encode_text(text_tokens).float()

# Image embedding in the same space
image = preprocess(Image.open("car.jpg")).unsqueeze(0).to(device)
with torch.no_grad():
    image_features = model.encode_image(image).float()

向量数据库

Pinecone

Setup:

import pinecone

# Initialize Pinecone
pc = pinecone.Pinecone(
    api_key="your-api-key"
)

# Create the index
index_name = "products"
if index_name not in [index.name for index in pc.list_indexes()]:
    pc.create_index(
        name=index_name,
        dimension=1536,  # OpenAI embedding dimension
        metric="cosine",  # similarity metric
        spec=pinecone.ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )

# Get the index
index = pc.Index(index_name)

Upserting vectors:

# Upsert (update or insert) vectors
vectors = [
    {
        "id": "prod_1",
        "values": [0.1, -0.2, 0.8, ...],  # 1536 dimensions
        "metadata": {
            "name": "Running Shoes",
            "category": "Sports",
            "price": 99.99
        }
    },
    {
        "id": "prod_2",
        "values": [0.2, -0.1, 0.7, ...],
        "metadata": {
            "name": "Basketball",
            "category": "Sports",
            "price": 29.99
        }
    }
]

index.upsert(vectors=vectors)

Query:

# Query for similar items
query_embedding = model.encode("athletic footwear")

results = index.query(
    vector=query_embedding.tolist(),
    top_k=5,
    include_metadata=True,
    filter={
        "category": {"$eq": "Sports"},
        "price": {"$lte": 100}
    }
)

for match in results['matches']:
    print(f"Product: {match['metadata']['name']}")
    print(f"Score: {match['score']}")

Weaviate

Setup:

import weaviate

# Connect to Weaviate
client = weaviate.Client(
    url="http://localhost:8080"
)

# Create a class (schema)
client.schema.create_class({
    "class": "Product",
    "properties": [
        {
            "name": "name",
            "dataType": ["text"]
        },
        {
            "name": "description",
            "dataType": ["text"]
        },
        {
            "name": "price",
            "dataType": ["number"]
        },
        {
            "name": "category",
            "dataType": ["string"]
        }
    ],
    "vectorizer": "text2vec-openai",  # use OpenAI embeddings
    "moduleConfig": {
        "type": "text",
        "model": "ada",
        "version": "002"
    }
})

Adding objects:

# Add an object; Weaviate vectorizes it automatically
product_obj = {
    "name": "Running Shoes",
    "description": "Comfortable running shoes for daily training",
    "price": 99.99,
    "category": "Sports"
}

client.data_object.create(
    class_name="Product",
    data_object=product_obj
)

Query:

# Semantic search
query_text = "athletic footwear"

results = (
    client.query
    .get("Product", ["name", "description", "price", "category"])
    .with_near_text({
        "concepts": [query_text],
        "certainty": 0.7
    })
    .with_additional(["certainty"])
    .with_limit(5)
    .do()
)

for result in results["data"]["Get"]["Product"]:
    print(f"Product: {result['name']}")
    print(f"Certainty: {result['_additional']['certainty']}")

Qdrant

Setup:

from qdrant_client import QdrantClient

# Initialize Qdrant
client = QdrantClient(url="http://localhost:6333")

# Create a collection
client.recreate_collection(
    collection_name="products",
    vectors_config={
        "size": 1536,  # embedding dimension
        "distance": "Cosine"
    }
)

Upserting points:

# Insert vectors
points = [
    {
        "id": 1,
        "vector": [0.1, -0.2, 0.8, ...],
        "payload": {
            "name": "Running Shoes",
            "category": "Sports",
            "price": 99.99
        }
    },
    {
        "id": 2,
        "vector": [0.2, -0.1, 0.7, ...],
        "payload": {
            "name": "Basketball",
            "category": "Sports",
            "price": 29.99
        }
    }
]

client.upsert(
    collection_name="products",
    points=points
)

查询:

# Search for similar items
query_vector = model.encode("athletic footwear")

results = client.search(
    collection_name="products",
    query_vector=query_vector.tolist(),
    limit=5,
    with_payload=True,
    query_filter={
        "must": [
            {
                "key": "category",
                "match": {"value": "Sports"}
            },
            {
                "key": "price",
                "range": {"lte": 100}
            }
        ]
    }
)

for result in results:
    print(f"Product: {result.payload['name']}")
    print(f"Score: {result.score}")

Milvus

Setup:

from pymilvus import (
    connections, utility, Collection, CollectionSchema, FieldSchema, DataType
)

# Connect to Milvus
connections.connect(host="localhost", port="19530")

# Define the collection schema (each field needs its own name)
fields = [
    FieldSchema(name="product_id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="product_vector", dtype=DataType.FLOAT_VECTOR, dim=1536),
    FieldSchema(name="product_name", dtype=DataType.VARCHAR, max_length=256),
    FieldSchema(name="price", dtype=DataType.DOUBLE),
    FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64),
]
schema = CollectionSchema(fields=fields)

# Create the collection
collection_name = "products"
if utility.has_collection(collection_name):
    utility.drop_collection(collection_name)

collection = Collection(name=collection_name, schema=schema)

# Create the index
index_params = {
    "metric_type": "COSINE",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 128}
}
collection.create_index(
    field_name="product_vector",
    index_params=index_params
)
collection.load()

Inserting vectors:

# Insert data column by column, in schema field order
entities = [
    [1, 2],                                          # product_id
    [[0.1, -0.2, 0.8, ...], [0.2, -0.1, 0.7, ...]],  # product_vector
    ["Running Shoes", "Basketball"],                 # product_name
    [99.99, 29.99],                                  # price
    ["Sports", "Sports"],                            # category
]

collection.insert(entities)

Query:

# Search for similar items
query_vector = model.encode("athletic footwear")

search_params = {
    "metric_type": "COSINE",
    "params": {"nprobe": 16}
}

results = collection.search(
    data=[query_vector.tolist()],
    anns_field="product_vector",
    param=search_params,
    limit=5,
    expr="category == 'Sports' && price <= 100",
    output_fields=["product_name"]
)

for hit in results[0]:
    print(f"Product: {hit.entity.get('product_name')}")
    print(f"Distance: {hit.distance}")

Chroma

Setup:

import chromadb

# Initialize Chroma
chroma_client = chromadb.Client()

# Create a collection
collection = chroma_client.create_collection(
    name="products",
    metadata={"hnsw:space": "cosine"}
)

Adding documents:

# Add documents; Chroma computes the embeddings automatically
documents = [
    "Comfortable running shoes for daily training",
    "Professional basketball for competitive play"
]

metadatas = [
    {"name": "Running Shoes", "category": "Sports", "price": 99.99},
    {"name": "Basketball", "category": "Sports", "price": 29.99}
]

ids = ["prod_1", "prod_2"]

collection.add(
    documents=documents,
    metadatas=metadatas,
    ids=ids
)

Query:

# Semantic search
query_text = "athletic footwear"

results = collection.query(
    query_texts=[query_text],
    n_results=5,
    where={"$and": [{"category": "Sports"}, {"price": {"$lte": 100}}]}
)

for doc_id, doc, dist in zip(
    results['ids'][0], results['documents'][0], results['distances'][0]
):
    print(f"ID: {doc_id}")
    print(f"Document: {doc}")
    print(f"Distance: {dist}")

pgvector (PostgreSQL)

Setup:

-- Enable the pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Create a table with a vector column
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(256),
    description TEXT,
    price DECIMAL(10,2),
    category VARCHAR(64),
    embedding vector(1536)  -- 1536-dimensional vector
);

-- Create an HNSW index for fast search
CREATE INDEX ON products USING hnsw (embedding vector_cosine_ops);

Inserting vectors:

-- Insert an embedding
INSERT INTO products (name, description, price, category, embedding)
VALUES (
    'Running Shoes',
    'Comfortable running shoes for daily training',
    99.99,
    'Sports',
    '[0.1, -0.2, 0.8, ...]'::vector
);

Query:

-- Semantic search
SELECT 
    id,
    name,
    description,
    price,
    category,
    1 - (embedding <=> '[0.1, -0.2, 0.8, ...]'::vector) AS similarity
FROM products
WHERE category = 'Sports'
  AND price <= 100
ORDER BY embedding <=> '[0.1, -0.2, 0.8, ...]'::vector
LIMIT 5;

Redis Vector Search

Setup:

import numpy as np
import redis
from redis.commands.search.field import (
    NumericField, TagField, TextField, VectorField
)
from redis.commands.search.indexDefinition import IndexDefinition, IndexType

# Connect to Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# Create the index
schema = (
    TextField("name"),
    TagField("category"),
    NumericField("price"),
    VectorField("embedding", "HNSW", {
        "TYPE": "FLOAT32",
        "DIM": 1536,
        "DISTANCE_METRIC": "COSINE",
        "INITIAL_CAP": 1000
    })
)
r.ft("products_idx").create_index(
    schema,
    definition=IndexDefinition(prefix=["prod:"], index_type=IndexType.HASH)
)

Adding documents:

# Store documents together with their embeddings
r.hset(
    "prod:1",
    mapping={
        "name": "Running Shoes",
        "category": "Sports",
        "price": 99.99,
        "embedding": np.array([0.1, -0.2, 0.8, ...]).astype(np.float32).tobytes()
    }
)

r.hset(
    "prod:2",
    mapping={
        "name": "Basketball",
        "category": "Sports",
        "price": 29.99,
        "embedding": np.array([0.2, -0.1, 0.7, ...]).astype(np.float32).tobytes()
    }
)

Query:

# Semantic search
from redis.commands.search.query import Query

query_vector = model.encode("athletic footwear").astype(np.float32).tobytes()

q = (
    Query(
        "(@category:{Sports} @price:[-inf 100])=>[KNN 5 @embedding $vector AS score]"
    )
    .sort_by("score")
    .return_fields("name", "score")
    .dialect(2)
)
results = r.ft("products_idx").search(q, query_params={"vector": query_vector})

for doc in results.docs:
    print(f"Product: {doc.name}")
    print(f"Score: {doc.score}")

Distance Metrics

Cosine Similarity

Measures the cosine of the angle between two vectors. Range: [-1, 1].

import numpy as np

def cosine_similarity(v1, v2):
    dot_product = np.dot(v1, v2)
    norm_v1 = np.linalg.norm(v1)
    norm_v2 = np.linalg.norm(v2)
    return dot_product / (norm_v1 * norm_v2)

# Example vectors (illustrative)
v1 = np.array([1.0, 2.0, 3.0])
v2 = np.array([2.0, 3.0, 4.0])

# Distance = 1 - similarity
cosine_distance = 1 - cosine_similarity(v1, v2)

When to use:

  • Text embeddings (OpenAI, Cohere)
  • When direction, not magnitude, carries the meaning
  • When vector magnitudes vary and should be ignored

Euclidean Distance

Measures the straight-line distance between two vectors.

def euclidean_distance(v1, v2):
    return np.linalg.norm(v1 - v2)

When to use:

  • When magnitude matters
  • Spatial data
  • Some image embeddings

Dot Product

The raw dot product of two vectors; unlike cosine similarity, it is not normalized.

def dot_product(v1, v2):
    return np.dot(v1, v2)

When to use:

  • Normalized vectors
  • When speed matters (no norms to compute)
  • Identical to cosine similarity on unit vectors
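A quick numeric check of the last point: on unit-normalized vectors, the plain dot product equals cosine similarity.

```python
import numpy as np

v1 = np.array([3.0, 4.0, 0.0])
v2 = np.array([1.0, 2.0, 2.0])

# Unit-normalize both vectors
u1 = v1 / np.linalg.norm(v1)
u2 = v2 / np.linalg.norm(v2)

cosine_sim = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
dot_sim = np.dot(u1, u2)  # plain dot product on the unit vectors

print(np.isclose(cosine_sim, dot_sim))  # True
```

This is why many systems normalize embeddings once at write time and then use the cheaper dot product (or an inner-product index) at query time.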

Comparison:

| Metric | Range | Use case | Pros | Cons |
|--------|-------|----------|------|------|
| Cosine similarity | [-1, 1] | Text embeddings | Magnitude-independent | Slower than a plain dot product |
| Euclidean | [0, ∞) | Spatial data | Intuitive | Magnitude-dependent |
| Dot product | (-∞, ∞) | Normalized vectors | Fastest | Needs normalization for comparable scores |

Indexing Algorithms

HNSW (Hierarchical Navigable Small World)

A graph-based approximate nearest neighbor algorithm.

# Managed services such as Pinecone tune and run HNSW internally.
# The hnswlib library exposes the same knobs directly (illustrative):
import hnswlib
import numpy as np

index = hnswlib.Index(space="cosine", dim=1536)
index.init_index(
    max_elements=10000,
    M=16,                 # graph connectivity: higher = better recall, more memory
    ef_construction=200   # build-time candidate list: higher = better graph, slower build
)
index.add_items(np.random.rand(1000, 1536).astype(np.float32))
index.set_ef(50)          # query-time candidate list size
labels, distances = index.knn_query(np.random.rand(1, 1536).astype(np.float32), k=5)

Pros:

  • Fast queries
  • Works well on large datasets
  • Scalable

Cons:

  • Approximate (not exact)
  • Needs tuning (M, efConstruction)

IVF (Inverted File)

Partitions the vector space into Voronoi cells and searches only the most promising cells.

# Milvus IVF configuration
index_params = {
    "metric_type": "COSINE",
    "index_type": "IVF_FLAT",
    "params": {
        "nlist": 128  # number of clusters
    }
}

collection.create_index(
    field_name="embedding",
    index_params=index_params
)

Pros:

  • Good for medium-sized datasets
  • Faster than brute force
  • Exact distances within the probed clusters

Cons:

  • Needs tuning (nlist, nprobe)
  • Slower than HNSW on large datasets
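The partition-and-probe idea behind IVF can be sketched in a few lines of numpy; this toy version uses a naive k-means loop and a tiny dataset, where a real implementation such as FAISS trains the cells far more carefully:

```python
import numpy as np

rng = np.random.default_rng(42)
data = rng.normal(size=(200, 16)).astype(np.float32)

# --- Build: partition vectors into nlist cells with a few k-means steps ---
nlist = 8
centroids = data[rng.choice(len(data), nlist, replace=False)].copy()
for _ in range(5):
    # assign each vector to its nearest centroid
    dists = np.linalg.norm(data[:, None, :] - centroids[None, :, :], axis=2)
    assign = dists.argmin(axis=1)
    for c in range(nlist):
        members = data[assign == c]
        if len(members):
            centroids[c] = members.mean(axis=0)

# the final assignment defines the inverted lists
assign = np.linalg.norm(
    data[:, None, :] - centroids[None, :, :], axis=2
).argmin(axis=1)
inverted_lists = {c: np.where(assign == c)[0] for c in range(nlist)}

# --- Search: probe only the nprobe cells closest to the query ---
def ivf_search(query, nprobe=3, k=5):
    cell_dists = np.linalg.norm(centroids - query, axis=1)
    probed = cell_dists.argsort()[:nprobe]
    candidates = np.concatenate([inverted_lists[c] for c in probed])
    cand_dists = np.linalg.norm(data[candidates] - query, axis=1)
    return candidates[cand_dists.argsort()[:k]]

# a query very close to data[0] should come back first
query = data[0] + np.float32(0.01) * rng.normal(size=16).astype(np.float32)
print(ivf_search(query))
```

Raising `nprobe` trades speed for recall: with `nprobe=nlist` the search degenerates to an exact scan.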

PQ (Product Quantization)

Compresses vectors into short codes for faster search and lower memory use.

# Milvus PQ configuration
index_params = {
    "metric_type": "COSINE",
    "index_type": "IVF_PQ",
    "params": {
        "nlist": 128,
        "m": 8  # number of subvectors
    }
}

collection.create_index(
    field_name="embedding",
    index_params=index_params
)

Pros:

  • Very fast queries
  • Low memory use
  • Suits very large datasets

Cons:

  • Loses accuracy
  • Needs tuning
  • More complex
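A minimal numpy sketch of the PQ idea: split each vector into m subvectors, learn a small codebook per subspace, and store one byte per subvector (a production implementation such as FAISS's IVF_PQ trains the codebooks far more carefully):

```python
import numpy as np

rng = np.random.default_rng(7)
vectors = rng.normal(size=(500, 32)).astype(np.float32)

m = 4                          # number of subvectors per vector
ksub = 16                      # codebook entries per subspace
dsub = vectors.shape[1] // m   # dimensions per subvector

codebooks = []
codes = np.empty((len(vectors), m), dtype=np.uint8)
for i in range(m):
    sub = vectors[:, i * dsub:(i + 1) * dsub]
    # naive codebook: sampled points refined by a few k-means steps
    cb = sub[rng.choice(len(sub), ksub, replace=False)].copy()
    for _ in range(5):
        a = np.linalg.norm(sub[:, None] - cb[None], axis=2).argmin(axis=1)
        for j in range(ksub):
            if (a == j).any():
                cb[j] = sub[a == j].mean(axis=0)
    codes[:, i] = np.linalg.norm(sub[:, None] - cb[None], axis=2).argmin(axis=1)
    codebooks.append(cb)

# Each 32-float vector (128 bytes) is now stored as m = 4 one-byte codes
reconstructed = np.concatenate(
    [codebooks[i][codes[:, i]] for i in range(m)], axis=1
)
err = np.linalg.norm(vectors - reconstructed, axis=1).mean()
print(f"mean reconstruction error: {err:.2f}")
```

Distances are then computed against the reconstructed (or table-looked-up) codes, which is where both the speed and the accuracy loss come from.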

Comparison:

| Algorithm | Speed | Memory | Accuracy | Use case |
|-----------|-------|--------|----------|----------|
| HNSW | Fast | Medium | High | Large datasets |
| IVF | Medium | Low | High | Medium datasets |
| PQ | Very fast | Very low | Medium | Very large datasets |
| Flat | Slow | High | Exact | Small datasets |

Hybrid Search (Vector + Keyword)

Combining Semantic and Keyword Search

from qdrant_client import QdrantClient

client = QdrantClient(url="http://localhost:6333")

# Hybrid search combining vector similarity with keyword conditions
results = client.search(
    collection_name="products",
    query_vector=query_embedding.tolist(),
    query_filter={
        "must": [
            {
                "key": "category",
                "match": {"value": "Sports"}
            },
            {
                "key": "name",
                "match": {"text": "running"}  # full-text keyword match
            }
            }
        ]
    },
    limit=10
)

Reciprocal Rank Fusion (RRF)

Combines ranked result lists from multiple sources.

def reciprocal_rank_fusion(vector_results, keyword_results, k=60):
    scores = {}
    
    # Score vector results
    for i, result in enumerate(vector_results):
        doc_id = result['id']
        rank = i + 1
        scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank)
    
    # Score keyword results
    for i, result in enumerate(keyword_results):
        doc_id = result['id']
        rank = i + 1
        scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank)
    
    # Sort by combined score
    sorted_results = sorted(
        scores.items(),
        key=lambda x: x[1],
        reverse=True
    )
    
    return sorted_results[:10]

Weighted Hybrid Search

def weighted_hybrid_search(vector_results, keyword_results, alpha=0.5):
    combined = {}
    
    # Weighted combination of scores
    for result in vector_results:
        doc_id = result['id']
        combined[doc_id] = combined.get(doc_id, 0) + alpha * result['score']
    
    for result in keyword_results:
        doc_id = result['id']
        combined[doc_id] = combined.get(doc_id, 0) + (1 - alpha) * result['score']
    
    # Sort by combined score
    sorted_results = sorted(
        combined.items(),
        key=lambda x: x[1],
        reverse=True
    )
    
    return sorted_results[:10]

Filtering and Metadata

Pre-filtering

Apply filters as part of the vector search itself.

# Pinecone pre-filtering
results = index.query(
    vector=query_embedding.tolist(),
    top_k=10,
    filter={
        "category": {"$eq": "Sports"},
        "price": {"$lte": 100},
        "in_stock": {"$eq": True}
    }
)

Post-filtering

Filter results after the vector search.

# Fetch results first
results = index.query(
    vector=query_embedding.tolist(),
    top_k=100  # over-fetch, since filtering will discard some results
)

# Then filter
filtered_results = [
    r for r in results['matches']
    if r['metadata']['price'] <= 100
    and r['metadata']['category'] == 'Sports'
    and r['metadata']['in_stock'] == True
][:10]  # keep the top 10

Metadata Schema Design

# A good metadata schema
metadata = {
    "name": "Running Shoes",          # string for filtering
    "category": "Sports",             # string for filtering
    "price": 99.99,                   # number for range filters
    "in_stock": True,                 # boolean for filtering
    "brand": "Nike",                  # string for filtering
    "color": ["red", "blue"],         # array for filtering
    "rating": 4.5,                    # number for sorting
    "created_at": "2024-01-01"        # date for filtering
}

RAG (Retrieval Augmented Generation) Patterns

Basic RAG Pipeline

from openai import OpenAI

client = OpenAI()

# 1. Retrieve relevant documents
query_embedding = model.encode("What are the benefits of running?")
results = index.query(
    vector=query_embedding.tolist(),
    top_k=5
)

# 2. Build a prompt from the retrieved context
context = "\n\n".join([
    f"{r['metadata']['name']}: {r['metadata']['description']}"
    for r in results['matches']
])

prompt = f"""
Context:
{context}

Question: What are the benefits of running?

Answer the question based on the context above.
"""

# 3. Generate the response
response = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompt}
    ]
)

print(response.choices[0].message.content)

Re-ranking Results

from sentence_transformers import CrossEncoder

# Load the re-ranker (cross-encoder)
reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

# 1. Fetch initial candidates
initial_results = index.query(
    vector=query_embedding.tolist(),
    top_k=20
)

# 2. Re-rank with the cross-encoder
query = "What are the benefits of running?"
documents = [r['metadata']['description'] for r in initial_results['matches']]

reranked_scores = reranker.predict(
    [(query, doc) for doc in documents]
)

# 3. Attach scores and sort
for i, result in enumerate(initial_results['matches']):
    result['rerank_score'] = reranked_scores[i]

final_results = sorted(
    initial_results['matches'],
    key=lambda x: x['rerank_score'],
    reverse=True
)[:10]  # top 10

Hybrid RAG

def hybrid_rag(query):
    # Vector search
    query_embedding = model.encode(query)
    vector_results = index.query(
        vector=query_embedding.tolist(),
        top_k=10
    )
    
    # Keyword search (keyword_search is assumed to be defined elsewhere)
    keyword_results = keyword_search(query)
    
    # Fuse the two lists with RRF
    combined = reciprocal_rank_fusion(
        vector_results['matches'],
        keyword_results
    )
    
    # Use the top fused results for RAG
    # (RRF returns (doc_id, score) pairs, so look the matches back up by id)
    by_id = {r['id']: r for r in vector_results['matches']}
    context = "\n\n".join([
        f"{by_id[doc_id]['metadata']['name']}: {by_id[doc_id]['metadata']['description']}"
        for doc_id, _ in combined[:5]
        if doc_id in by_id
    ])
    
    # Generate the response
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}
        ]
    )
    
    return response.choices[0].message.content

Document Chunking Strategies

Fixed-size Chunking

def fixed_size_chunking(text, chunk_size=500):
    chunks = []
    for i in range(0, len(text), chunk_size):
        chunk = text[i:i + chunk_size]
        chunks.append(chunk)
    return chunks

text = "This is a long document that needs to be chunked..."
chunks = fixed_size_chunking(text, chunk_size=500)

Semantic Chunking

from sentence_transformers import SentenceTransformer

model = SentenceTransformer('all-MiniLM-L6-v2')

def semantic_chunking(text):
    sentences = text.split('. ')  # naive sentence splitting
    chunks = []
    current_chunk = []
    current_embedding = None
    
    for sentence in sentences:
        sentence_embedding = model.encode(sentence)
        
        if current_embedding is None:
            current_chunk.append(sentence)
            current_embedding = sentence_embedding
        else:
            # Check similarity with the current chunk
            similarity = cosine_similarity(current_embedding, sentence_embedding)
            
            if similarity < 0.7:  # low similarity: start a new chunk
                chunks.append('. '.join(current_chunk))
                current_chunk = [sentence]
                current_embedding = sentence_embedding
            else:
                current_chunk.append(sentence)
    
    if current_chunk:
        chunks.append('. '.join(current_chunk))
    
    return chunks

Sliding-window Chunking

def sliding_window_chunking(text, window_size=500, stride=250):
    chunks = []
    # max(..., 0) + 1 ensures texts shorter than the window still yield one chunk
    for i in range(0, max(len(text) - window_size, 0) + 1, stride):
        chunk = text[i:i + window_size]
        chunks.append(chunk)
    return chunks

text = "This is a long document that needs to be chunked..."
chunks = sliding_window_chunking(text, window_size=500, stride=250)

Query Optimization

Query Caching

from functools import lru_cache

@lru_cache(maxsize=100)
def cached_query(query_text):
    query_embedding = model.encode(query_text)
    return index.query(
        vector=query_embedding.tolist(),
        top_k=10
    )

Batch Queries

def batch_query(queries):
    query_embeddings = model.encode(queries)
    
    results = []
    for embedding in query_embeddings:
        result = index.query(
            vector=embedding.tolist(),
            top_k=10
        )
        results.append(result)
    
    return results

Query Expansion

def query_expansion(query):
    # Generate query variations
    variations = [
        query,
        query.replace("shoes", "footwear"),
        query.replace("running", "athletic"),
    ]
    
    # Run each variation
    all_results = []
    for variation in variations:
        embedding = model.encode(variation)
        results = index.query(
            vector=embedding.tolist(),
            top_k=10
        )
        all_results.extend(results['matches'])
    
    # Deduplicate, keeping the best-ranked occurrence
    seen = set()
    unique_results = []
    for result in all_results:
        if result['id'] not in seen:
            seen.add(result['id'])
            unique_results.append(result)
    
    return unique_results[:10]

Scaling Vector Search

Horizontal Scaling

# Multiple Pinecone indexes acting as shards
indexes = [
    pc.Index("products_shard_1"),
    pc.Index("products_shard_2"),
    pc.Index("products_shard_3"),
]

def query_all_shards(query_embedding):
    results = []
    for index in indexes:
        result = index.query(
            vector=query_embedding.tolist(),
            top_k=10
        )
        results.extend(result['matches'])
    
    # Merge and re-rank (merge_results is assumed to be defined elsewhere)
    merged = merge_results(results)
    return merged[:10]

Sharding Strategy

def get_shard_index(product_id, num_shards=3):
    # simple modulo sharding (consistent hashing would reduce re-shuffling
    # when shards are added or removed)
    shard_id = product_id % num_shards
    return f"products_shard_{shard_id}"

# Insert into the correct shard
product_id = 123
shard_index_name = get_shard_index(product_id)
shard_index = pc.Index(shard_index_name)

Replication

# Replicate data to scale reads
primary_index = pc.Index("products_primary")
replica_index = pc.Index("products_replica")

# Query the nearest replica
results = replica_index.query(
    vector=query_embedding.tolist(),
    top_k=10
)

Cost Optimization

Dimensionality Reduction

from sklearn.decomposition import PCA

# Original embeddings (1536 dimensions)
original_embeddings = model.encode(texts)

# Reduce to 256 dimensions
pca = PCA(n_components=256)
reduced_embeddings = pca.fit_transform(original_embeddings)

print(f"Original: {original_embeddings.shape}")
print(f"Reduced: {reduced_embeddings.shape}")

Quantization

import numpy as np

# Quantize float32 values to uint8, keeping the scale for dequantization
def quantize_vector(vector, bits=8):
    # Find the value range
    min_val = np.min(vector)
    max_val = np.max(vector)

    # Quantize
    quantized = np.round(
        (vector - min_val) / (max_val - min_val) * (2**bits - 1)
    ).astype(np.uint8)

    # Keep min/max so the vector can be approximately reconstructed
    return quantized, min_val, max_val

# Store quantized vectors
quantized_embeddings = [quantize_vector(v) for v in embeddings]

Caching Strategy

import hashlib

# A simple application-level cache keyed by a hash of the query text
search_cache = {}

def get_cache_key(text):
    return hashlib.md5(text.encode()).hexdigest()

def cached_search(query_text):
    cache_key = get_cache_key(query_text)

    # Check the cache
    if cache_key in search_cache:
        return search_cache[cache_key]

    # Run the search
    query_embedding = model.encode(query_text)
    results = index.query(
        vector=query_embedding.tolist(),
        top_k=10
    )

    # Cache the result
    search_cache[cache_key] = results

    return results

Evaluation Metrics

Recall@K

def recall_at_k(relevant_ids, retrieved_ids, k):
    retrieved_at_k = set(retrieved_ids[:k])
    relevant_set = set(relevant_ids)
    
    recall = len(retrieved_at_k & relevant_set) / len(relevant_set)
    return recall

# Example
relevant_ids = [1, 5, 8, 12]  # ground truth
retrieved_ids = [1, 3, 5, 8, 10, 12]  # retrieved results

recall_5 = recall_at_k(relevant_ids, retrieved_ids, k=5)
print(f"Recall@5: {recall_5}")

Precision@K

def precision_at_k(relevant_ids, retrieved_ids, k):
    retrieved_at_k = set(retrieved_ids[:k])
    relevant_set = set(relevant_ids)
    
    precision = len(retrieved_at_k & relevant_set) / k
    return precision

precision_5 = precision_at_k(relevant_ids, retrieved_ids, k=5)
print(f"Precision@5: {precision_5}")

Mean Reciprocal Rank (MRR)

def reciprocal_rank(relevant_ids, retrieved_ids):
    # Reciprocal rank of the first relevant result for one query;
    # MRR is the mean of this value across a set of queries.
    for rank, doc_id in enumerate(retrieved_ids, start=1):
        if doc_id in relevant_ids:
            return 1 / rank
    return 0.0

rr = reciprocal_rank(relevant_ids, retrieved_ids)
print(f"Reciprocal rank: {rr}")

Normalized Discounted Cumulative Gain (NDCG)

def dcg(relevance_scores, k):
    dcg = relevance_scores[0]
    for i in range(1, min(k, len(relevance_scores))):
        dcg += relevance_scores[i] / np.log2(i + 2)
    return dcg

def ndcg(relevance_scores, k):
    # Ideal DCG
    ideal_relevance = sorted(relevance_scores, reverse=True)
    idcg = dcg(ideal_relevance, k)
    
    # Actual DCG
    actual_dcg = dcg(relevance_scores, k)
    
    return actual_dcg / idcg if idcg > 0 else 0

relevance_scores = [1, 0, 1, 0, 1]  # binary relevance
ndcg_5 = ndcg(relevance_scores, k=5)
print(f"NDCG@5: {ndcg_5}")

Common Use Cases

Semantic Document Search

def search_documents(query, top_k=10):
    query_embedding = model.encode(query)
    
    results = index.query(
        vector=query_embedding.tolist(),
        top_k=top_k,
        include_metadata=True
    )
    
    return [
        {
            "id": r['id'],
            "title": r['metadata']['title'],
            "content": r['metadata']['content'],
            "score": r['score']
        }
        for r in results['matches']
    ]

Similar Product Recommendations

def similar_products(product_id, top_k=5):
    # Fetch the product's embedding (get_product is assumed elsewhere)
    product = get_product(product_id)
    product_embedding = product['embedding']
    
    # Find similar products
    results = index.query(
        vector=product_embedding.tolist(),
        top_k=top_k + 1,  # fetch one extra to drop the item itself
        filter={"category": {"$eq": product['category']}}
    )
    
    # Exclude the product itself
    similar = [
        r for r in results['matches']
        if r['id'] != product_id
    ]
    
    return similar[:top_k]

Question Answering

def answer_question(question):
    # Retrieve relevant documents
    query_embedding = model.encode(question)
    results = index.query(
        vector=query_embedding.tolist(),
        top_k=5
    )
    
    # Build the context
    context = "\n\n".join([
        f"Document: {r['metadata']['text']}"
        for r in results['matches']
    ])
    
    # Generate the answer
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {
                "role": "system",
                "content": "Answer the question based on the provided context."
            },
            {
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {question}"
            }
        ]
    )
    
    return response.choices[0].message.content

Duplicate Detection

def find_duplicates(text, threshold=0.95):
    text_embedding = model.encode(text)
    
    # Find similar documents
    results = index.query(
        vector=text_embedding.tolist(),
        top_k=10
    )
    
    # Filter by the similarity threshold
    duplicates = [
        r for r in results['matches']
        if r['score'] >= threshold
    ]
    
    return duplicates

Best Practices and Pitfalls

Best Practices

  1. Embedding choice

    • Pick a model suited to the use case
    • Weigh embedding dimension against performance
    • Test several models before committing
  2. Index configuration

    • Tune HNSW parameters (M, efConstruction)
    • Choose an appropriate distance metric
    • Consider the speed/accuracy trade-off
  3. Query optimization

    • Use pre-filtering whenever possible
    • Implement query caching
    • Consider re-ranking for better results
  4. Metadata design

    • Store the metadata needed for filtering
    • Use appropriate data types
    • Index metadata fields
  5. Monitoring

    • Track query latency
    • Monitor cache hit rates
    • Set alerts for anomalies
Common Pitfalls

  1. Embedding dimension mismatch

    # Wrong: mixing models with different output dimensions
    model1 = SentenceTransformer('all-MiniLM-L6-v2')   # 384 dims
    model2 = SentenceTransformer('all-mpnet-base-v2')  # 768 dims
    
    # Right: use the same model for every embedding
    model = SentenceTransformer('all-MiniLM-L6-v2')
    
  2. Not normalizing vectors

    # Wrong: no normalization
    vector = model.encode(text)
    
    # Right: normalize for cosine similarity
    vector = model.encode(text)
    vector = vector / np.linalg.norm(vector)  # normalize
    
  3. Ignoring metadata filters

    # Wrong: no filtering
    results = index.query(vector=query, top_k=10)
    
    # Right: apply filters
    results = index.query(
        vector=query,
        top_k=10,
        filter={"category": {"$eq": "Sports"}}
    )
    
  4. Not handling empty results

    # Wrong: assuming results exist
    results = index.query(vector=query, top_k=10)
    for r in results['matches']:
        print(r)
    
    # Right: handle empty results
    results = index.query(vector=query, top_k=10)
    if not results['matches']:
        return []
    

Related Skills

  • 04-database/vector-database
  • 06-ai-ml-production/rag-patterns
  • 06-ai-ml-production/embeddings