Vector Search Patterns
Overview
Vector search (also called semantic search or similarity search) finds similar items based on their meaning rather than exact keyword matches. It represents data as vector embeddings — points in a high-dimensional space — where similarity is measured as the distance between points.
Prerequisites
- Understanding of vector math and linear algebra
- Familiarity with machine learning and embeddings
- Familiarity with Python and numerical computing (numpy, scikit-learn)
- Understanding of database concepts and indexing
- Basic knowledge of cloud services (AWS, GCP, Azure)
- Experience with vector databases or similarity search
Key Concepts
- Vector/semantic search: finding similar items by meaning rather than exact keyword matching
- Embeddings: converting text/images into vectors for similarity comparison
- Vector databases: databases specialized for efficient vector storage and retrieval
- Distance metrics: cosine similarity, Euclidean distance, and dot product for measuring vector similarity
- Indexing algorithms: HNSW (graph-based), IVF (clustering), PQ (quantization) for efficient search
- Hybrid search: combining vector search with keyword search for better results
- RAG (Retrieval Augmented Generation): using retrieved context to improve LLM responses
- Chunking: splitting documents into smaller pieces for better indexing and retrieval
- Query optimization: caching, batching, and expansion techniques for better performance
- Scaling strategies: sharding, replication, and horizontal scaling for large datasets
- Evaluation metrics: Recall@K, Precision@K, MAP, MRR, NDCG for measuring search quality
What Is Vector/Semantic Search
Traditional vs. Vector Search
Traditional keyword search:
- Matches exact words or phrases
- Requires correct spelling
- Limited understanding of context
- Example: "car" will not match "automobile"
Vector/semantic search:
- Matches based on meaning
- Understands context and synonyms
- Handles typos and variants
- Example: "car" will match "automobile", "vehicle", "sedan"
How It Works
- Embedding: convert text/images into vectors with an ML model
- Indexing: store vectors in a vector database with an efficient index
- Querying: convert the query into a vector and find its nearest neighbors
- Ranking: return results sorted by similarity
Query: "Find similar products"
Text → Embedding model → Vector: [0.1, -0.2, 0.8, ...]
                ↓
         Vector database
                ↓
[0.2, -0.1, 0.7, ...] ← Product A (0.92 similarity)
[0.3, -0.3, 0.6, ...] ← Product B (0.87 similarity)
[0.4, -0.4, 0.5, ...] ← Product C (0.81 similarity)
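The pipeline above can be sketched end to end with plain numpy: embed, index, and query. The hand-written 4-dimensional vectors below are hypothetical stand-ins for a real embedding model's output; only the scoring logic is real.

```python
import numpy as np

# Hypothetical 4-dim "embeddings" standing in for a real model's output
catalog = {
    "Product A": np.array([0.2, -0.1, 0.7, 0.1]),
    "Product B": np.array([0.3, -0.3, 0.6, 0.0]),
    "Product C": np.array([-0.5, 0.4, -0.2, 0.3]),
}

def cosine_top_k(query_vec, items, k=2):
    # Normalize once, then similarity reduces to a plain dot product
    q = query_vec / np.linalg.norm(query_vec)
    scored = []
    for name, vec in items.items():
        v = vec / np.linalg.norm(vec)
        scored.append((name, float(q @ v)))
    # Sort by similarity, descending, and keep the top k
    scored.sort(key=lambda pair: -pair[1])
    return scored[:k]

query = np.array([0.25, -0.2, 0.65, 0.05])
results = cosine_top_k(query, catalog)
```

A real system replaces the dictionary with a vector database and the brute-force loop with an ANN index, but the ranking semantics are exactly this.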
Embedding Basics
Text Embeddings
Text embeddings represent words, sentences, or documents as dense vectors.
from sentence_transformers import SentenceTransformer
# Load a pretrained model
model = SentenceTransformer('all-MiniLM-L6-v2')
# Generate an embedding
text = "The quick brown fox jumps over the lazy dog"
embedding = model.encode(text)
print(f"Embedding shape: {embedding.shape}")  # (384,)
print(f"Embedding: {embedding}")
Popular text embedding models:
| Model | Dimensions | Use case | Provider |
|---|---|---|---|
| text-embedding-ada-002 | 1536 | General purpose | OpenAI |
| text-embedding-3-small | 1536 | Fast, cost-effective | OpenAI |
| all-MiniLM-L6-v2 | 384 | Lightweight, fast | Hugging Face |
| e5-large-v2 | 1024 | High quality | Hugging Face |
| Cohere embed-v3 | 1024 | Multilingual | Cohere |
Image Embeddings
from PIL import Image
import clip
import torch
# Load the CLIP model
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)
# Generate an image embedding
image = preprocess(Image.open("product.jpg")).unsqueeze(0).to(device)
with torch.no_grad():
    image_features = model.encode_image(image).float()
print(f"Image embedding shape: {image_features.shape}")
Popular image embedding models:
| Model | Dimensions | Use case | Provider |
|---|---|---|---|
| CLIP ViT-B/32 | 512 | Image-text retrieval | OpenAI |
| CLIP ViT-L/14 | 768 | High quality | OpenAI |
| ResNet-50 | 2048 | Image similarity | PyTorch |
Multimodal Embeddings
A multimodal model such as CLIP maps images and text into the same vector space, so a text query can retrieve images and vice versa. Note that OpenAI's hosted embeddings API covers text only; for image embeddings, use a CLIP checkpoint, for example via sentence-transformers:
from PIL import Image
from sentence_transformers import SentenceTransformer
# CLIP model that embeds both images and text into a shared space
model = SentenceTransformer('clip-ViT-B-32')
# Text embedding
text_embedding = model.encode("A red sports car")
# Image embedding (directly comparable to the text embedding)
image_embedding = model.encode(Image.open("car.jpg"))
Vector Databases
Pinecone
Setup:
import pinecone
# Initialize Pinecone
pc = pinecone.Pinecone(
    api_key="your-api-key"
)
# Create the index
index_name = "products"
if index_name not in [index.name for index in pc.list_indexes()]:
    pc.create_index(
        name=index_name,
        dimension=1536,  # OpenAI embedding dimension
        metric="cosine",  # similarity metric
        spec=pinecone.ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )
# Get the index
index = pc.Index(index_name)
Upserting vectors:
# Upsert (update or insert) vectors
vectors = [
    {
        "id": "prod_1",
        "values": [0.1, -0.2, 0.8, ...],  # 1536 dimensions
        "metadata": {
            "name": "Running Shoes",
            "category": "Sports",
            "price": 99.99
        }
    },
    {
        "id": "prod_2",
        "values": [0.2, -0.1, 0.7, ...],
        "metadata": {
            "name": "Basketball",
            "category": "Sports",
            "price": 29.99
        }
    }
]
index.upsert(vectors=vectors)
Querying:
# Query for similar items
query_embedding = model.encode("athletic footwear")
results = index.query(
    vector=query_embedding.tolist(),
    top_k=5,
    include_metadata=True,
    filter={
        "category": {"$eq": "Sports"},
        "price": {"$lte": 100}
    }
)
for match in results['matches']:
    print(f"Product: {match['metadata']['name']}")
    print(f"Score: {match['score']}")
Weaviate
Setup:
import weaviate
# Connect to Weaviate (v3 client)
client = weaviate.Client(
    url="http://localhost:8080"
)
# Create a class (schema)
client.schema.create_class({
    "class": "Product",
    "properties": [
        {"name": "name", "dataType": ["text"]},
        {"name": "description", "dataType": ["text"]},
        {"name": "price", "dataType": ["number"]},
        {"name": "category", "dataType": ["string"]}
    ],
    "vectorizer": "text2vec-openai",  # use OpenAI embeddings
    "moduleConfig": {
        "text2vec-openai": {
            "model": "ada",
            "modelVersion": "002",
            "type": "text"
        }
    }
})
Adding objects:
# Add an object; it is vectorized automatically
product_obj = {
    "name": "Running Shoes",
    "description": "Comfortable running shoes for daily training",
    "price": 99.99,
    "category": "Sports"
}
client.data_object.create(
    class_name="Product",
    data_object=product_obj
)
Querying:
# Semantic search (v3 query builder)
query_text = "athletic footwear"
response = (
    client.query
    .get("Product", ["name", "description", "price", "category"])
    .with_near_text({"concepts": [query_text], "certainty": 0.7})
    .with_additional(["certainty"])
    .with_limit(5)
    .do()
)
for result in response["data"]["Get"]["Product"]:
    print(f"Product: {result['name']}")
    print(f"Certainty: {result['_additional']['certainty']}")
Qdrant
Setup:
from qdrant_client import QdrantClient
# Initialize Qdrant
client = QdrantClient(url="http://localhost:6333")
# Create the collection
client.recreate_collection(
    collection_name="products",
    vectors_config={
        "size": 1536,  # embedding dimension
        "distance": "Cosine"
    }
)
Upserting points:
# Insert vectors
points = [
    {
        "id": 1,
        "vector": [0.1, -0.2, 0.8, ...],
        "payload": {
            "name": "Running Shoes",
            "category": "Sports",
            "price": 99.99
        }
    },
    {
        "id": 2,
        "vector": [0.2, -0.1, 0.7, ...],
        "payload": {
            "name": "Basketball",
            "category": "Sports",
            "price": 29.99
        }
    }
]
client.upsert(
    collection_name="products",
    points=points
)
Querying:
# Search for similar items
query_vector = model.encode("athletic footwear")
results = client.search(
    collection_name="products",
    query_vector=query_vector.tolist(),
    limit=5,
    with_payload=True,
    query_filter={
        "must": [
            {
                "key": "category",
                "match": {"value": "Sports"}
            },
            {
                "key": "price",
                "range": {"lte": 100}
            }
        ]
    }
)
for result in results:
    print(f"Product: {result.payload['name']}")
    print(f"Score: {result.score}")
Milvus
Setup:
from pymilvus import (
    connections, utility, Collection, CollectionSchema, FieldSchema, DataType
)
# Connect to Milvus
connections.connect(host="localhost", port="19530")
# Define the collection schema
fields = [
    FieldSchema(name="product_id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="product_vector", dtype=DataType.FLOAT_VECTOR, dim=1536),
    FieldSchema(name="product_name", dtype=DataType.VARCHAR, max_length=256),
    FieldSchema(name="price", dtype=DataType.DOUBLE),
    FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64),
]
schema = CollectionSchema(fields=fields)
# Create the collection
collection_name = "products"
if utility.has_collection(collection_name):
    utility.drop_collection(collection_name)
collection = Collection(
    name=collection_name,
    schema=schema
)
# Create the index
index_params = {
    "metric_type": "COSINE",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 128}
}
collection.create_index(
    field_name="product_vector",
    index_params=index_params
)
collection.load()
Inserting vectors:
# Insert vectors (column-oriented: one list per schema field)
entities = [
    [1, 2],                                          # product_id
    [[0.1, -0.2, 0.8, ...], [0.2, -0.1, 0.7, ...]],  # product_vector
    ["Running Shoes", "Basketball"],                 # product_name
    [99.99, 29.99],                                  # price
    ["Sports", "Sports"],                            # category
]
insert_result = collection.insert(entities)
Querying:
# Search for similar items
query_vector = model.encode("athletic footwear")
search_params = {
    "metric_type": "COSINE",
    "params": {"nprobe": 16}
}
results = collection.search(
    data=[query_vector.tolist()],
    anns_field="product_vector",
    param=search_params,
    limit=5,
    expr="category == 'Sports' && price <= 100",
    output_fields=["product_name"]
)
for hit in results[0]:
    print(f"Product: {hit.entity.get('product_name')}")
    print(f"Distance: {hit.distance}")
Chroma
Setup:
import chromadb
# Initialize Chroma
chroma_client = chromadb.Client()
# Create the collection
collection = chroma_client.create_collection(
    name="products",
    metadata={"hnsw:space": "cosine"}
)
Adding documents:
# Add documents; embeddings are generated automatically
documents = [
    "Comfortable running shoes for daily training",
    "Professional basketball for competitive play"
]
metadatas = [
    {"name": "Running Shoes", "category": "Sports", "price": 99.99},
    {"name": "Basketball", "category": "Sports", "price": 29.99}
]
ids = ["prod_1", "prod_2"]
collection.add(
    documents=documents,
    metadatas=metadatas,
    ids=ids
)
Querying:
# Semantic search (multiple where conditions must be combined with $and)
query_text = "athletic footwear"
results = collection.query(
    query_texts=[query_text],
    n_results=5,
    where={"$and": [{"category": "Sports"}, {"price": {"$lte": 100}}]}
)
for doc_id, document, distance in zip(
    results['ids'][0], results['documents'][0], results['distances'][0]
):
    print(f"ID: {doc_id}")
    print(f"Document: {document}")
    print(f"Distance: {distance}")
pgvector (PostgreSQL)
Setup:
-- Enable the pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;
-- Create a table with a vector column
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(256),
    description TEXT,
    price DECIMAL(10,2),
    category VARCHAR(64),
    embedding vector(1536) -- 1536-dimensional vector
);
-- Create an HNSW index for fast search
CREATE INDEX ON products USING hnsw (embedding vector_cosine_ops);
Inserting vectors:
-- Insert an embedding
INSERT INTO products (name, description, price, category, embedding)
VALUES (
    'Running Shoes',
    'Comfortable running shoes for daily training',
    99.99,
    'Sports',
    '[0.1, -0.2, 0.8, ...]'::vector
);
Querying:
-- Semantic search
SELECT
    id,
    name,
    description,
    price,
    category,
    1 - (embedding <=> '[0.1, -0.2, 0.8, ...]'::vector) AS similarity
FROM products
WHERE category = 'Sports'
    AND price <= 100
ORDER BY embedding <=> '[0.1, -0.2, 0.8, ...]'::vector
LIMIT 5;
Redis Vector Search
Setup:
import redis
from redis.commands.search.field import VectorField, TextField, TagField, NumericField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
# Connect to Redis
r = redis.Redis(host='localhost', port=6379, db=0)
# Create the index
r.ft("products_idx").create_index(
    fields=[
        VectorField("embedding", "HNSW", {
            "TYPE": "FLOAT32",
            "DIM": 1536,
            "DISTANCE_METRIC": "COSINE",
            "INITIAL_CAP": 1000
        }),
        TextField("name"),
        TagField("category"),
        NumericField("price")
    ],
    definition=IndexDefinition(prefix=["prod:"], index_type=IndexType.HASH)
)
Adding documents:
import numpy as np
# Add documents with embeddings (stored as raw float32 bytes)
r.hset(
    "prod:1",
    mapping={
        "name": "Running Shoes",
        "category": "Sports",
        "price": 99.99,
        "embedding": np.array([0.1, -0.2, 0.8, ...]).astype(np.float32).tobytes()
    }
)
r.hset(
    "prod:2",
    mapping={
        "name": "Basketball",
        "category": "Sports",
        "price": 29.99,
        "embedding": np.array([0.2, -0.1, 0.7, ...]).astype(np.float32).tobytes()
    }
)
Querying:
from redis.commands.search.query import Query
# Semantic search: KNN combined with tag/numeric filters
query_vector = model.encode("athletic footwear").astype(np.float32).tobytes()
q = (
    Query("(@category:{Sports} @price:[-inf 100])=>[KNN 5 @embedding $vector AS score]")
    .sort_by("score")
    .return_fields("name", "score")
    .dialect(2)
)
results = r.ft("products_idx").search(q, query_params={"vector": query_vector})
for doc in results.docs:
    print(f"Product: {doc.name}")
    print(f"Score: {doc.score}")
Distance Metrics
Cosine Similarity
Measures the cosine of the angle between two vectors. Range: [-1, 1].
import numpy as np
def cosine_similarity(v1, v2):
    dot_product = np.dot(v1, v2)
    norm_v1 = np.linalg.norm(v1)
    norm_v2 = np.linalg.norm(v2)
    return dot_product / (norm_v1 * norm_v2)
# Distance = 1 - similarity
v1, v2 = np.array([0.1, -0.2, 0.8]), np.array([0.2, -0.1, 0.7])
cosine_distance = 1 - cosine_similarity(v1, v2)
When to use:
- Text embeddings (OpenAI, Cohere)
- When only direction matters
- When magnitude should be ignored
Euclidean Distance
Measures the straight-line distance between two vectors.
def euclidean_distance(v1, v2):
    return np.linalg.norm(v1 - v2)
When to use:
- When magnitude matters
- Spatial data
- Some image embeddings
Dot Product
Projects one vector onto another; for unit-length vectors it equals cosine similarity.
def dot_product(v1, v2):
    return np.dot(v1, v2)
When to use:
- Normalized vectors
- Faster than cosine (no normalization at query time)
- Identical to cosine for normalized vectors
Comparison:
| Metric | Range | Use case | Pros | Cons |
|---|---|---|---|---|
| Cosine | [-1, 1] | Text embeddings | Magnitude-independent | Slower (normalization) |
| Euclidean | [0, ∞) | Spatial data | Intuitive | Magnitude-dependent |
| Dot product | (-∞, ∞) | Normalized vectors | Fastest | Requires normalized vectors |
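The "identical for normalized vectors" claim is worth verifying: for unit vectors, the dot product equals cosine similarity, and squared Euclidean distance is 2 − 2·cos, so all three metrics induce the same ranking. A quick standalone numpy check:

```python
import numpy as np

rng = np.random.default_rng(0)
a = rng.normal(size=8)
b = rng.normal(size=8)
# Unit-normalize both vectors
a /= np.linalg.norm(a)
b /= np.linalg.norm(b)

cos_sim = float(a @ b)                   # cosine similarity (norms are 1)
dot = float(np.dot(a, b))                # dot product
sq_euclid = float(np.sum((a - b) ** 2))  # squared Euclidean distance

# dot == cosine, and ||a - b||^2 = 2 - 2*cos for unit vectors
assert np.isclose(dot, cos_sim)
assert np.isclose(sq_euclid, 2 - 2 * cos_sim)
```

This is why databases often recommend the dot product metric with pre-normalized embeddings: the same results as cosine, without the per-query normalization.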
Indexing Algorithms
HNSW (Hierarchical Navigable Small World)
A graph-based approximate nearest neighbor algorithm.
# Managed services such as Pinecone serverless use a graph index internally;
# you choose only the distance metric at index creation time.
# Self-hosted engines (Qdrant, Weaviate, pgvector) expose the HNSW parameters
# (M, ef_construction) directly in their index configuration.
Pros:
- Fast queries
- Works well on large datasets
- Scalable
Cons:
- Approximate (not exact)
- Requires tuning (M, efConstruction)
IVF (Inverted File)
Partitions the space into Voronoi cells and searches only the closest cells.
# Milvus IVF configuration
index_params = {
    "metric_type": "COSINE",
    "index_type": "IVF_FLAT",
    "params": {
        "nlist": 128  # number of clusters
    }
}
collection.create_index(
    field_name="embedding",
    index_params=index_params
)
Pros:
- Good for medium-sized datasets
- Faster than brute-force search
- IVF_FLAT stores full vectors, so distances within probed cells are exact
Cons:
- Approximate overall: neighbors outside the probed cells are missed
- Requires tuning (nlist, nprobe)
- Slower than HNSW on large datasets
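The clustering idea behind IVF fits in a few lines of numpy: assign every vector to its nearest centroid, then at query time scan only the `nprobe` closest cells. This is an illustrative sketch on random data — the centroids are simply picked from the dataset rather than learned with k-means:

```python
import numpy as np

rng = np.random.default_rng(42)
data = rng.normal(size=(200, 8))  # 200 vectors, 8 dims
centroids = data[:4].copy()       # toy "training": 4 centroids taken from the data

# Build inverted lists: centroid id -> indices of vectors in that cell
assignments = np.argmin(
    np.linalg.norm(data[:, None, :] - centroids[None, :, :], axis=2), axis=1
)
inverted_lists = {c: np.where(assignments == c)[0] for c in range(len(centroids))}

def ivf_search(query, nprobe=2, k=5):
    # Probe only the nprobe nearest cells instead of scanning the whole dataset
    cell_dists = np.linalg.norm(centroids - query, axis=1)
    probed = np.argsort(cell_dists)[:nprobe]
    candidates = np.concatenate([inverted_lists[c] for c in probed])
    # Exact distances within the probed cells (the IVF_FLAT behavior)
    dists = np.linalg.norm(data[candidates] - query, axis=1)
    order = np.argsort(dists)[:k]
    return candidates[order], dists[order]

ids, dists = ivf_search(rng.normal(size=8))
```

With `nprobe=2` of 4 cells, roughly half the dataset is scanned; a true neighbor living in an unprobed cell is simply missed, which is exactly the recall/speed trade-off `nprobe` controls.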
PQ (Product Quantization)
Compresses vectors for faster search and lower memory usage.
# Milvus PQ configuration
index_params = {
    "metric_type": "COSINE",
    "index_type": "IVF_PQ",
    "params": {
        "nlist": 128,
        "m": 8  # number of sub-vectors
    }
}
collection.create_index(
    field_name="embedding",
    index_params=index_params
)
Pros:
- Very fast queries
- Low memory usage
- Suitable for very large datasets
Cons:
- Loss of accuracy
- Requires tuning
- More complex
Comparison:
| Algorithm | Speed | Memory | Accuracy | Use case |
|---|---|---|---|---|
| HNSW | Fast | Medium | High | Large datasets |
| IVF | Medium | Low | High (approximate) | Medium datasets |
| PQ | Very fast | Very low | Medium | Very large datasets |
| Flat | Slow | High | Exact | Small datasets |
Hybrid Search (Vector + Keyword)
Combining Semantic and Keyword Search
from qdrant_client import QdrantClient
client = QdrantClient(url="http://localhost:6333")
# Hybrid search: vector similarity plus payload conditions
results = client.search(
    collection_name="products",
    query_vector=query_embedding.tolist(),
    query_filter={
        "must": [
            {
                "key": "category",
                "match": {"value": "Sports"}
            },
            {
                "key": "name",
                # Full-text match (requires a text index on the field);
                # Qdrant match conditions do not support wildcards
                "match": {"text": "running"}
            }
        ]
    },
    limit=10
)
Reciprocal Rank Fusion (RRF)
Combines ranked results from multiple sources.
def reciprocal_rank_fusion(vector_results, keyword_results, k=60):
    scores = {}
    # Score the vector results
    for i, result in enumerate(vector_results):
        doc_id = result['id']
        rank = i + 1
        scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank)
    # Score the keyword results
    for i, result in enumerate(keyword_results):
        doc_id = result['id']
        rank = i + 1
        scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank)
    # Sort by combined score
    sorted_results = sorted(
        scores.items(),
        key=lambda x: x[1],
        reverse=True
    )
    return sorted_results[:10]
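A quick worked example of the fusion idea (using a compact, self-contained copy of the RRF scoring): a document that ranks mid-list in both sources overtakes documents that appear in only one. The id lists here are hypothetical rankings.

```python
def rrf(ranked_lists, k=60, top_n=3):
    # Reciprocal rank fusion over any number of ranked id lists
    scores = {}
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)[:top_n]

vector_hits = ["d1", "d3", "d5"]   # hypothetical vector-search ranking
keyword_hits = ["d2", "d3", "d4"]  # hypothetical keyword-search ranking
fused = rrf([vector_hits, keyword_hits])
# "d3" is rank 2 in both lists, so its two contributions fuse it to the top
```

The constant k=60 damps the influence of rank differences deep in the lists; it is the conventional default from the original RRF paper and rarely needs tuning.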
Weighted Hybrid Search
def weighted_hybrid_search(vector_results, keyword_results, alpha=0.5):
    combined = {}
    # Weighted combination: alpha for vector scores, (1 - alpha) for keyword scores
    for result in vector_results:
        doc_id = result['id']
        combined[doc_id] = combined.get(doc_id, 0) + alpha * result['score']
    for result in keyword_results:
        doc_id = result['id']
        combined[doc_id] = combined.get(doc_id, 0) + (1 - alpha) * result['score']
    # Sort by combined score
    sorted_results = sorted(
        combined.items(),
        key=lambda x: x[1],
        reverse=True
    )
    return sorted_results[:10]
Filtering and Metadata
Pre-filtering
Filters candidates before (or during) the vector search.
# Pinecone pre-filtering
results = index.query(
    vector=query_embedding.tolist(),
    top_k=10,
    filter={
        "category": {"$eq": "Sports"},
        "price": {"$lte": 100},
        "in_stock": {"$eq": True}
    }
)
Post-filtering
Filters results after the vector search.
# Fetch results first
results = index.query(
    vector=query_embedding.tolist(),
    top_k=100  # over-fetch, since filtering will discard some matches
)
# Then filter
filtered_results = [
    r for r in results['matches']
    if r['metadata']['price'] <= 100
    and r['metadata']['category'] == 'Sports'
    and r['metadata']['in_stock'] is True
][:10]  # keep the top 10
Metadata Schema Design
# A well-designed metadata schema
metadata = {
    "name": "Running Shoes",     # string, for filtering
    "category": "Sports",        # string, for filtering
    "price": 99.99,              # number, for range filters
    "in_stock": True,            # boolean, for filtering
    "brand": "Nike",             # string, for filtering
    "color": ["red", "blue"],    # array, for filtering
    "rating": 4.5,               # number, for sorting
    "created_at": "2024-01-01"   # date, for filtering
}
RAG (Retrieval Augmented Generation) Patterns
Basic RAG Flow
from openai import OpenAI
client = OpenAI()
# 1. Retrieve relevant documents
query_embedding = model.encode("What are the benefits of running?")
results = index.query(
    vector=query_embedding.tolist(),
    top_k=5
)
# 2. Build a prompt from the retrieved context
context = "\n".join([
    f"{r['metadata']['name']}: {r['metadata']['description']}"
    for r in results['matches']
])
prompt = f"""
Context:
{context}
Question: What are the benefits of running?
Answer the question based on the context above.
"""
# 3. Generate the response
response = client.chat.completions.create(
    model="gpt-4",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompt}
    ]
)
print(response.choices[0].message.content)
Reranking Results
from sentence_transformers import CrossEncoder
# Load the reranker
reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
# 1. Get the initial results
initial_results = index.query(
    vector=query_embedding.tolist(),
    top_k=20
)
# 2. Rerank with the cross-encoder
query = "What are the benefits of running?"
documents = [r['metadata']['description'] for r in initial_results['matches']]
reranked_scores = reranker.predict(
    [(query, doc) for doc in documents]
)
# 3. Combine and sort
for i, result in enumerate(initial_results['matches']):
    result['rerank_score'] = reranked_scores[i]
final_results = sorted(
    initial_results['matches'],
    key=lambda x: x['rerank_score'],
    reverse=True
)[:10]  # top 10
Hybrid RAG
def hybrid_rag(query):
    # Vector search
    query_embedding = model.encode(query)
    vector_results = index.query(
        vector=query_embedding.tolist(),
        top_k=10
    )
    # Keyword search
    keyword_results = keyword_search(query)
    # Combine with RRF
    combined = reciprocal_rank_fusion(
        vector_results['matches'],
        keyword_results
    )
    # RRF returns (doc_id, score) pairs; map the ids back to full match objects
    by_id = {r['id']: r for r in vector_results['matches']}
    context = "\n".join([
        f"{by_id[doc_id]['metadata']['name']}: {by_id[doc_id]['metadata']['description']}"
        for doc_id, _score in combined[:5]
        if doc_id in by_id
    ])
    # Generate the response
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": f"Context:\n{context}\nQuestion: {query}"}
        ]
    )
    return response.choices[0].message.content
Document Chunking Strategies
Fixed-Size Chunking
def fixed_size_chunking(text, chunk_size=500):
    chunks = []
    for i in range(0, len(text), chunk_size):
        chunk = text[i:i + chunk_size]
        chunks.append(chunk)
    return chunks
text = "This is a long document that needs to be chunked..."
chunks = fixed_size_chunking(text, chunk_size=500)
Semantic Chunking
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')
def semantic_chunking(text):
    sentences = text.split('. ')  # naive sentence splitting
    chunks = []
    current_chunk = []
    current_embedding = None
    for sentence in sentences:
        sentence_embedding = model.encode(sentence)
        if current_embedding is None:
            current_chunk.append(sentence)
            current_embedding = sentence_embedding
        else:
            # Compare with the running chunk (cosine_similarity helper from above)
            similarity = cosine_similarity(current_embedding, sentence_embedding)
            if similarity < 0.7:  # low similarity: start a new chunk
                chunks.append('. '.join(current_chunk))
                current_chunk = [sentence]
                current_embedding = sentence_embedding
            else:
                current_chunk.append(sentence)
    if current_chunk:
        chunks.append('. '.join(current_chunk))
    return chunks
Sliding Window Chunking
def sliding_window_chunking(text, window_size=500, stride=250):
    chunks = []
    # max(..., 0) ensures a text shorter than the window still yields one chunk
    for i in range(0, max(len(text) - window_size, 0) + 1, stride):
        chunks.append(text[i:i + window_size])
    return chunks
text = "This is a long document that needs to be chunked..."
chunks = sliding_window_chunking(text, window_size=500, stride=250)
Query Optimization
Query Caching
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_query(query_text):
    query_embedding = model.encode(query_text)
    return index.query(
        vector=query_embedding.tolist(),
        top_k=10
    )
Batch Queries
def batch_query(queries):
    # Encode all queries in one batched model call
    query_embeddings = model.encode(queries)
    results = []
    for embedding in query_embeddings:
        result = index.query(
            vector=embedding.tolist(),
            top_k=10
        )
        results.append(result)
    return results
Query Expansion
def query_expansion(query):
    # Generate variations (simple rule-based rewrites; an LLM can paraphrase better)
    variations = [
        query,
        query.replace("shoes", "footwear"),
        query.replace("running", "athletic"),
    ]
    # Query every variation
    all_results = []
    for variation in variations:
        embedding = model.encode(variation)
        results = index.query(
            vector=embedding.tolist(),
            top_k=10
        )
        all_results.extend(results['matches'])
    # Deduplicate, preserving first-seen order
    seen = set()
    unique_results = []
    for result in all_results:
        if result['id'] not in seen:
            seen.add(result['id'])
            unique_results.append(result)
    return unique_results[:10]
Scaling Vector Search
Horizontal Scaling
# Multiple Pinecone indexes
indexes = [
    pc.Index("products_shard_1"),
    pc.Index("products_shard_2"),
    pc.Index("products_shard_3"),
]
def query_all_shards(query_embedding):
    results = []
    for index in indexes:
        result = index.query(
            vector=query_embedding.tolist(),
            top_k=10
        )
        results.extend(result['matches'])
    # Merge and rerank
    merged = merge_results(results)
    return merged[:10]
Sharding Strategy
def get_shard_index(product_id, num_shards=3):
    # Simple modulo sharding (note: not consistent hashing —
    # changing num_shards remaps most keys)
    shard_id = product_id % num_shards
    return f"products_shard_{shard_id}"
# Insert into the correct shard
product_id = 123
shard_index_name = get_shard_index(product_id)
shard_index = pc.Index(shard_index_name)
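Modulo sharding only works for integer ids; for string ids, a stable hash gives the same property. A small sketch (the shard-name prefix is illustrative):

```python
import hashlib

def get_shard_for_key(key: str, num_shards: int = 3) -> str:
    # md5 is stable across processes, unlike Python's built-in hash(),
    # which is randomized per interpreter run for strings
    digest = hashlib.md5(key.encode()).hexdigest()
    shard_id = int(digest, 16) % num_shards
    return f"products_shard_{shard_id}"

shard = get_shard_for_key("prod_123")
```

The same function must be used at both write and query time so a given key always maps to the same shard.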
Replication
# Replicate data to scale reads
primary_index = pc.Index("products_primary")
replica_index = pc.Index("products_replica")
# Query the nearest replica
results = replica_index.query(
    vector=query_embedding.tolist(),
    top_k=10
)
Cost Optimization
Dimensionality Reduction
from sklearn.decomposition import PCA
# Original embeddings (1536 dimensions)
original_embeddings = model.encode(texts)
# Reduce to 256 dimensions
pca = PCA(n_components=256)
reduced_embeddings = pca.fit_transform(original_embeddings)
print(f"Original: {original_embeddings.shape}")
print(f"Reduced: {reduced_embeddings.shape}")
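One caveat worth making explicit: the PCA must be fit once on the corpus and then reused, so stored vectors and query vectors live in the same reduced space. A sketch with random stand-ins for real embeddings:

```python
import numpy as np
from sklearn.decomposition import PCA

rng = np.random.default_rng(0)
corpus_embeddings = rng.normal(size=(500, 64))  # stand-in for model.encode(texts)

# Fit once on the corpus; persist the fitted PCA alongside the index
pca = PCA(n_components=16)
reduced_corpus = pca.fit_transform(corpus_embeddings)

# Queries go through transform() with the SAME fitted object — never fit again
query_embedding = rng.normal(size=(1, 64))
reduced_query = pca.transform(query_embedding)
```

Re-fitting on queries (or on a new batch of documents) would produce a different projection, silently breaking all stored vectors.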
Quantization
import numpy as np
# Float32 to uint8
def quantize_vector(vector, bits=8):
    # Find the min and max (store these — they are needed to decode later)
    min_val = np.min(vector)
    max_val = np.max(vector)
    # Quantize
    quantized = np.round(
        (vector - min_val) / (max_val - min_val) * (2**bits - 1)
    ).astype(np.uint8)
    return quantized
# Store the quantized vectors
quantized_embeddings = [quantize_vector(v) for v in embeddings]
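Quantization is lossy, so the min/max used for encoding must be stored alongside the bytes in order to decode. A round-trip sketch showing that the reconstruction error is bounded by half a quantization step:

```python
import numpy as np

def quantize(vector, bits=8):
    lo, hi = float(vector.min()), float(vector.max())
    levels = 2 ** bits - 1
    q = np.round((vector - lo) / (hi - lo) * levels).astype(np.uint8)
    return q, lo, hi  # lo/hi are needed to decode

def dequantize(q, lo, hi, bits=8):
    levels = 2 ** bits - 1
    return q.astype(np.float32) / levels * (hi - lo) + lo

vec = np.random.default_rng(1).normal(size=128).astype(np.float32)
q, lo, hi = quantize(vec)
restored = dequantize(q, lo, hi)
# Rounding error is at most half a quantization step
step = (hi - lo) / 255
assert np.max(np.abs(restored - vec)) <= step / 2 + 1e-6
```

With 8 bits this is a 4× memory saving over float32; whether the added error is acceptable depends on the embedding model and should be checked with Recall@K before and after.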
Caching Strategy
import hashlib
# Simple manual cache keyed by a hash of the query text
search_cache = {}
def get_cache_key(text):
    return hashlib.md5(text.encode()).hexdigest()
def cached_search(query_text):
    cache_key = get_cache_key(query_text)
    # Check the cache
    if cache_key in search_cache:
        return search_cache[cache_key]
    # Run the search
    query_embedding = model.encode(query_text)
    results = index.query(
        vector=query_embedding.tolist(),
        top_k=10
    )
    # Cache the result
    search_cache[cache_key] = results
    return results
Evaluation Metrics
Recall@K
def recall_at_k(relevant_ids, retrieved_ids, k):
    retrieved_at_k = set(retrieved_ids[:k])
    relevant_set = set(relevant_ids)
    recall = len(retrieved_at_k & relevant_set) / len(relevant_set)
    return recall
# Example
relevant_ids = [1, 5, 8, 12]  # ground truth
retrieved_ids = [1, 3, 5, 8, 10, 12]  # retrieved results
recall_5 = recall_at_k(relevant_ids, retrieved_ids, k=5)
print(f"Recall@5: {recall_5}")
Precision@K
def precision_at_k(relevant_ids, retrieved_ids, k):
    retrieved_at_k = set(retrieved_ids[:k])
    relevant_set = set(relevant_ids)
    precision = len(retrieved_at_k & relevant_set) / k
    return precision
precision_5 = precision_at_k(relevant_ids, retrieved_ids, k=5)
print(f"Precision@5: {precision_5}")
Mean Reciprocal Rank (MRR)
MRR is the reciprocal rank of the first relevant result, averaged over queries.
def reciprocal_rank(relevant_ids, retrieved_ids):
    # 1 / rank of the first relevant item (0 if none was retrieved)
    for rank, doc_id in enumerate(retrieved_ids, start=1):
        if doc_id in relevant_ids:
            return 1 / rank
    return 0.0
def mean_reciprocal_rank(relevant_ids_per_query, retrieved_ids_per_query):
    rr = [
        reciprocal_rank(rel, ret)
        for rel, ret in zip(relevant_ids_per_query, retrieved_ids_per_query)
    ]
    return sum(rr) / len(rr)
mrr = mean_reciprocal_rank([relevant_ids], [retrieved_ids])
print(f"MRR: {mrr}")
Normalized Discounted Cumulative Gain (NDCG)
def dcg(relevance_scores, k):
    # DCG@k = sum of rel_i / log2(i + 1) over 1-indexed positions i
    return sum(
        rel / np.log2(i + 2)
        for i, rel in enumerate(relevance_scores[:k])
    )
def ndcg(relevance_scores, k):
    # Ideal DCG: scores sorted best-first
    ideal_relevance = sorted(relevance_scores, reverse=True)
    idcg = dcg(ideal_relevance, k)
    # Actual DCG
    actual_dcg = dcg(relevance_scores, k)
    return actual_dcg / idcg if idcg > 0 else 0
relevance_scores = [1, 0, 1, 0, 1]  # binary relevance labels
ndcg_5 = ndcg(relevance_scores, k=5)
print(f"NDCG@5: {ndcg_5}")
Common Use Cases
Semantic Document Search
def search_documents(query, top_k=10):
    query_embedding = model.encode(query)
    results = index.query(
        vector=query_embedding.tolist(),
        top_k=top_k,
        include_metadata=True
    )
    return [
        {
            "id": r['id'],
            "title": r['metadata']['title'],
            "content": r['metadata']['content'],
            "score": r['score']
        }
        for r in results['matches']
    ]
Similar Product Recommendations
def similar_products(product_id, top_k=5):
    # Get the product's embedding
    product = get_product(product_id)
    product_embedding = product['embedding']
    # Find similar products
    results = index.query(
        vector=product_embedding.tolist(),
        top_k=top_k + 1,  # +1 because the product matches itself
        filter={"category": {"$eq": product['category']}}
    )
    # Exclude the product itself
    similar = [
        r for r in results['matches']
        if r['id'] != product_id
    ]
    return similar[:top_k]
Question Answering
def answer_question(question):
    # Retrieve relevant documents
    query_embedding = model.encode(question)
    results = index.query(
        vector=query_embedding.tolist(),
        top_k=5
    )
    # Build the context
    context = "\n".join([
        f"Document: {r['metadata']['text']}"
        for r in results['matches']
    ])
    # Generate the answer
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {
                "role": "system",
                "content": "Answer the question based on the provided context."
            },
            {
                "role": "user",
                "content": f"Context:\n{context}\nQuestion: {question}"
            }
        ]
    )
    return response.choices[0].message.content
Duplicate Detection
def find_duplicates(text, threshold=0.95):
    text_embedding = model.encode(text)
    # Find similar documents
    results = index.query(
        vector=text_embedding.tolist(),
        top_k=10
    )
    # Filter by threshold
    duplicates = [
        r for r in results['matches']
        if r['score'] >= threshold
    ]
    return duplicates
Best Practices and Pitfalls
Best practices
- Embedding selection
  - Choose a model suited to the use case
  - Weigh embedding dimension against performance
  - Test several models before committing
- Index configuration
  - Tune HNSW parameters (M, efConstruction)
  - Pick an appropriate distance metric
  - Consider the speed/accuracy trade-off
- Query optimization
  - Use pre-filtering where possible
  - Implement query caching
  - Consider reranking for better results
- Metadata design
  - Store the metadata needed for filtering
  - Use appropriate data types
  - Index metadata fields
- Monitoring
  - Track query latency
  - Monitor cache hit rates
  - Alert on anomalies
Common pitfalls
- Embedding dimension mismatch
  # Wrong: mixing models with different dimensions
  model1 = SentenceTransformer('all-MiniLM-L6-v2')   # 384 dims
  model2 = SentenceTransformer('all-mpnet-base-v2')  # 768 dims
  # Right: use one model for all embeddings
  model = SentenceTransformer('all-MiniLM-L6-v2')
- Not normalizing vectors
  # Wrong: skipping normalization
  vector = model.encode(text)
  # Right: normalize for cosine similarity
  vector = model.encode(text)
  vector = vector / np.linalg.norm(vector)
- Ignoring metadata filtering
  # Wrong: no filter
  results = index.query(vector=query, top_k=10)
  # Right: apply a filter
  results = index.query(
      vector=query,
      top_k=10,
      filter={"category": {"$eq": "Sports"}}
  )
- Not handling empty results
  # Wrong: assuming matches exist
  results = index.query(vector=query, top_k=10)
  for r in results['matches']:
      print(r)
  # Right: handle the empty case
  results = index.query(vector=query, top_k=10)
  if not results['matches']:
      return []
Related Skills
- 04-database/vector-database
- 06-ai-ml-production/rag-patterns
- 06-ai-ml-production/embeddings