Vector Database Patterns
Overview
Vector databases are purpose-built for storing, indexing, and querying high-dimensional vectors efficiently. They enable similarity search by finding the vectors "closest" to a query vector under a chosen distance metric. This skill covers Pinecone, Qdrant, Weaviate, embedding strategies, similarity search, performance optimization, and production considerations.
Prerequisites
- An understanding of vectors and embeddings
- Familiarity with machine learning concepts
- Familiarity with Python or TypeScript
- An understanding of similarity metrics (cosine, Euclidean, dot product)
- Basic database concepts
Core Concepts
Vector Database Fundamentals
- Vectors: numerical representations of data (text, images, audio) in a high-dimensional space
- Embeddings: vectors produced by machine learning models that capture semantic meaning
- Distance metrics: measures of similarity between vectors (cosine, Euclidean, dot product)
- Indexes: data structures that make fast similarity search possible
- Metadata: additional information attached to vectors, used for filtering
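One relationship worth internalizing when choosing a metric: for unit-normalized embeddings, cosine similarity and dot product produce identical rankings, and squared Euclidean distance is a monotone function of both. A minimal NumPy sketch with made-up vectors illustrates this:

import numpy as np

# Two made-up, unit-length "embeddings"
a = np.array([0.6, 0.8])
b = np.array([0.8, 0.6])

cosine = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
dot = np.dot(a, b)
squared_euclidean = np.linalg.norm(a - b) ** 2

print(cosine, dot)                     # identical for unit vectors
print(squared_euclidean, 2 - 2 * dot)  # ||a - b||^2 == 2 - 2(a·b) for unit vectors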
Vector Database Types
- Pinecone: managed service, easy to set up, well suited to production
- Qdrant: open source, self-hosting options, flexible
- Weaviate: open source, GraphQL API, well suited to multimodal data
Use Cases
- Semantic search (finding similar documents, products, or images)
- Recommendation systems
- Anomaly detection
- Natural language processing tasks
- Computer vision applications
- Personalization engines
- Knowledge retrieval for RAG (retrieval-augmented generation)
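To make the RAG use case concrete, here is a minimal retrieval sketch. It assumes a Pinecone index like the one created below and a get_embedding helper like the one defined under Embedding Strategies; storing the chunk text in metadata under "text" is also an assumption of this sketch.

def retrieve_context(question: str, top_k: int = 5) -> str:
    # Embed the question with the same model used for the documents
    query_vector = get_embedding(question)
    results = index.query(vector=query_vector, top_k=top_k, include_metadata=True)
    # Join the retrieved snippets into a context block for the LLM prompt
    snippets = [match["metadata"].get("text", "") for match in results["matches"]]
    return "\n\n".join(snippets)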
Implementation Guide
Pinecone
Setup and Indexing
# Install the Pinecone client
# pip install pinecone-client  (newer releases are published as "pinecone")
from pinecone import Pinecone, ServerlessSpec

# Initialize Pinecone
pc = Pinecone(api_key="your-api-key")

# Create an index
pc.create_index(
    name="my-index",
    dimension=1536,  # OpenAI embedding dimension
    metric="cosine",  # or "euclidean", "dotproduct"
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1"
    )
)

# Connect to the index
index = pc.Index("my-index")

# Check index statistics
stats = index.describe_index_stats()
print(f"Total vectors: {stats['total_vector_count']}")
print(f"Dimension: {stats['dimension']}")
Upserting Vectors
# Upsert a single vector
index.upsert(
    vectors=[
        {
            "id": "doc1",
            "values": [0.1, 0.2, 0.3, ...],  # 1536-dimensional vector
            "metadata": {
                "title": "Document 1",
                "category": "technology",
                "date": "2024-01-01"
            }
        }
    ]
)

# Upsert multiple vectors
index.upsert(
    vectors=[
        {
            "id": "doc1",
            "values": vector1,
            "metadata": {"title": "Document 1", "category": "technology"}
        },
        {
            "id": "doc2",
            "values": vector2,
            "metadata": {"title": "Document 2", "category": "science"}
        },
        {
            "id": "doc3",
            "values": vector3,
            "metadata": {"title": "Document 3", "category": "technology"}
        }
    ],
    namespace="documents"
)

# Batched upserts
from tqdm import tqdm

def upsert_in_batches(vectors, batch_size=100):
    for i in tqdm(range(0, len(vectors), batch_size)):
        batch = vectors[i:i + batch_size]
        index.upsert(vectors=batch)
Querying
# Basic similarity search
results = index.query(
    vector=query_vector,
    top_k=10,
    include_metadata=True,
    include_values=False
)

for match in results['matches']:
    print(f"ID: {match['id']}, Score: {match['score']}")
    print(f"Metadata: {match['metadata']}")

# Query within a namespace
results = index.query(
    vector=query_vector,
    top_k=10,
    namespace="documents",
    include_metadata=True
)

# Query with a metadata filter
results = index.query(
    vector=query_vector,
    top_k=10,
    filter={
        "category": {"$eq": "technology"},
        "date": {"$gte": "2024-01-01"}
    },
    include_metadata=True
)

# Query with a compound filter (top-level keys combine with an implicit AND)
results = index.query(
    vector=query_vector,
    top_k=10,
    filter={
        "$or": [
            {"category": {"$eq": "technology"}},
            {"category": {"$eq": "science"}}
        ],
        "date": {"$gte": "2024-01-01"}
    },
    include_metadata=True
)
Deleting Vectors
# Delete a single vector
index.delete(ids=["doc1"])

# Delete multiple vectors
index.delete(ids=["doc1", "doc2", "doc3"])

# Delete every vector in a namespace
index.delete(delete_all=True, namespace="documents")

# Delete by metadata filter
# (filter-based deletes apply to pod-based indexes; serverless indexes delete by ID)
index.delete(
    filter={
        "category": {"$eq": "old"},
        "date": {"$lt": "2023-01-01"}
    },
    namespace="documents"
)
Qdrant
Collections and Points
# Install the Qdrant client
# pip install qdrant-client
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct

# Initialize the Qdrant client
client = QdrantClient(url="http://localhost:6333")

# Create a collection
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(
        size=1536,
        distance=Distance.COSINE  # or Distance.EUCLID, Distance.DOT
    )
)

# Create a collection with multiple named vectors
client.create_collection(
    collection_name="multimodal",
    vectors_config={
        "text": VectorParams(size=1536, distance=Distance.COSINE),
        "image": VectorParams(size=512, distance=Distance.EUCLID)
    }
)

# List collections
collections = client.get_collections()
for collection in collections.collections:
    print(f"Collection: {collection.name}")

# Get collection info
info = client.get_collection("documents")
print(f"Vector count: {info.vectors_count}")
print(f"Point count: {info.points_count}")
Upserting Points
# Upsert a single point
client.upsert(
    collection_name="documents",
    points=[
        PointStruct(
            id=1,
            vector=[0.1, 0.2, 0.3, ...],
            payload={
                "title": "Document 1",
                "category": "technology",
                "date": "2024-01-01"
            }
        )
    ]
)

# Upsert multiple points
client.upsert(
    collection_name="documents",
    points=[
        PointStruct(id=1, vector=vector1, payload={"title": "Doc 1", "category": "technology"}),
        PointStruct(id=2, vector=vector2, payload={"title": "Doc 2", "category": "science"}),
        PointStruct(id=3, vector=vector3, payload={"title": "Doc 3", "category": "technology"}),
    ]
)

# Batched inserts
from qdrant_client.models import Batch

def insert_in_batches(points, batch_size=100):
    for i in range(0, len(points), batch_size):
        batch = points[i:i + batch_size]
        client.upsert(
            collection_name="documents",
            points=Batch(
                ids=[p.id for p in batch],
                vectors=[p.vector for p in batch],
                payloads=[p.payload for p in batch]
            )
        )
Searching
# Basic search
results = client.search(
    collection_name="documents",
    query_vector=query_vector,
    limit=10,
    with_payload=True
)

for result in results:
    print(f"ID: {result.id}, Score: {result.score}")
    print(f"Payload: {result.payload}")

# Search with a filter
from qdrant_client.models import Filter, FieldCondition, MatchValue, DatetimeRange, NamedVector

results = client.search(
    collection_name="documents",
    query_vector=query_vector,
    query_filter=Filter(
        must=[
            FieldCondition(
                key="category",
                match=MatchValue(value="technology")
            ),
            FieldCondition(
                key="date",
                # Date comparisons use DatetimeRange (Qdrant >= 1.8 with a datetime payload index)
                range=DatetimeRange(gte="2024-01-01T00:00:00Z")
            )
        ]
    ),
    limit=10,
    with_payload=True
)

# Search with a named vector
results = client.search(
    collection_name="multimodal",
    query_vector=NamedVector(
        name="text",
        vector=query_vector
    ),
    limit=10
)

# Batch search across multiple named vectors (one request per vector)
from qdrant_client.models import SearchRequest

results = client.search_batch(
    collection_name="multimodal",
    requests=[
        SearchRequest(
            vector=NamedVector(name="text", vector=query_vector),
            limit=10,
            with_payload=True
        ),
        SearchRequest(
            vector=NamedVector(name="image", vector=image_query_vector),
            limit=10,
            with_payload=True
        )
    ]
)
Filtering
from qdrant_client.models import Filter, FieldCondition, MatchValue, Range, IsNullCondition, PayloadField

# Exact-match filter
exact_filter = Filter(
    must=[
        FieldCondition(
            key="category",
            match=MatchValue(value="technology")
        )
    ]
)

# Range filter
range_filter = Filter(
    must=[
        FieldCondition(
            key="price",
            range=Range(
                gte=100,
                lte=1000
            )
        )
    ]
)

# OR filter (with only `should` clauses, at least one condition must match)
or_filter = Filter(
    should=[
        FieldCondition(
            key="category",
            match=MatchValue(value="technology")
        ),
        FieldCondition(
            key="category",
            match=MatchValue(value="science")
        )
    ]
)

# Nested-field filter
nested_filter = Filter(
    must=[
        FieldCondition(
            key="metadata.category",
            match=MatchValue(value="technology")
        )
    ]
)

# IS NULL filter: match points whose deleted_at field is null
null_filter = Filter(
    must=[
        IsNullCondition(
            is_null=PayloadField(key="deleted_at")
        )
    ]
)
Weaviate
Schema Setup
# Install the Weaviate client
# pip install "weaviate-client<4"  (these examples use the v3 client API)
from weaviate import Client

# Initialize the Weaviate client
client = Client("http://localhost:8080")

# Define the schema
schema = {
    "classes": [
        {
            "class": "Document",
            "description": "A document",
            "vectorizer": "text2vec-openai",
            "properties": [
                {
                    "name": "title",
                    "dataType": ["string"],
                    "description": "The document's title"
                },
                {
                    "name": "content",
                    "dataType": ["text"],
                    "description": "The document's content"
                },
                {
                    "name": "category",
                    "dataType": ["string"],
                    "description": "The document's category"
                },
                {
                    "name": "date",
                    "dataType": ["date"],
                    "description": "The document's date"
                },
                {
                    "name": "metadata",
                    "dataType": ["object"],
                    "description": "Additional metadata"
                }
            ]
        }
    ]
}

# Create the schema
client.schema.create(schema)

# Retrieve the schema
schema = client.schema.get()
print(schema)
Inserting Data
# Insert a single object
client.data_object.create(
    class_name="Document",
    data_object={
        "title": "Document 1",
        "content": "This is the content of document 1",
        "category": "technology",
        "date": "2024-01-01T00:00:00Z",
        "metadata": {
            "author": "John Doe",
            "tags": ["technology", "AI"]
        }
    }
)

# Insert multiple objects
objects = [
    {
        "title": "Document 1",
        "content": "Content 1",
        "category": "technology"
    },
    {
        "title": "Document 2",
        "content": "Content 2",
        "category": "science"
    }
]

for obj in objects:
    client.data_object.create(
        class_name="Document",
        data_object=obj
    )

# Insert with a custom vector
client.data_object.create(
    class_name="Document",
    data_object={
        "title": "Document 1",
        "content": "Content 1"
    },
    vector=[0.1, 0.2, 0.3, ...]
)

# Batched inserts (the v3 client exposes batching as client.batch)
with client.batch as batch:
    for obj in objects:
        batch.add_data_object(
            data_object=obj,
            class_name="Document"
        )
Querying
# Semantic search (request the distance via with_additional so it appears in results)
results = client.query.get(
    class_name="Document",
    properties=["title", "content", "category"]
).with_near_text({
    "concepts": ["artificial intelligence"],
    "distance": 0.7
}).with_additional(["distance"]).with_limit(10).do()

for result in results["data"]["Get"]["Document"]:
    print(f"Title: {result['title']}")
    print(f"Distance: {result['_additional']['distance']}")

# Hybrid search (BM25 + vector)
results = client.query.get(
    class_name="Document",
    properties=["title", "content"]
).with_hybrid(
    query="artificial intelligence",
    alpha=0.7,  # 0 = pure BM25, 1 = pure vector
    vector=query_vector
).with_limit(10).do()

# Filtered search
results = client.query.get(
    class_name="Document",
    properties=["title", "content", "category"]
).with_where({
    "path": ["category"],
    "operator": "Equal",
    "valueString": "technology"
}).with_near_text({
    "concepts": ["AI"]
}).with_limit(10).do()

# Range filtering
results = client.query.get(
    class_name="Document",
    properties=["title", "date"]
).with_where({
    "operator": "And",
    "operands": [
        {
            "path": ["category"],
            "operator": "Equal",
            "valueString": "technology"
        },
        {
            "path": ["date"],
            "operator": "GreaterThan",
            "valueDate": "2024-01-01T00:00:00Z"
        }
    ]
}).with_near_text({
    "concepts": ["AI"]
}).do()
Embedding Strategies
Text Embeddings
# Embeddings with the OpenAI API
from openai import OpenAI

client = OpenAI(api_key="your-api-key")

def get_embedding(text: str) -> list:
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

# Batch embeddings
def get_embeddings(texts: list) -> list:
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return [item.embedding for item in response.data]

# Chunking long text (character-based, with overlap)
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list:
    chunks = []
    for i in range(0, len(text), chunk_size - overlap):
        chunks.append(text[i:i + chunk_size])
    return chunks
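Character-based chunking can split mid-word and ignores model token limits. Below is a token-aware variant, sketched under the assumption that the tiktoken package is installed; cl100k_base is the encoding used by the text-embedding-3 models.

import tiktoken

def chunk_text_by_tokens(text: str, chunk_size: int = 256, overlap: int = 32) -> list:
    # Encode once, then slide a window over tokens instead of characters
    enc = tiktoken.get_encoding("cl100k_base")
    tokens = enc.encode(text)
    chunks = []
    for i in range(0, len(tokens), chunk_size - overlap):
        chunks.append(enc.decode(tokens[i:i + chunk_size]))
    return chunks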
Image Embeddings
# Image embeddings with CLIP
# (the clip package is installed from https://github.com/openai/CLIP)
from PIL import Image
import clip
import torch

# Load the CLIP model
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)

def get_image_embedding(image_path: str) -> list:
    image = preprocess(Image.open(image_path)).unsqueeze(0).to(device)
    with torch.no_grad():
        image_features = model.encode_image(image)
    return image_features.cpu().numpy().tolist()[0]

# Batch image embeddings
def get_image_embeddings(image_paths: list) -> list:
    images = torch.stack([preprocess(Image.open(path)) for path in image_paths]).to(device)
    with torch.no_grad():
        image_features = model.encode_image(images)
    return image_features.cpu().numpy().tolist()
Multimodal Embeddings
# Text-image similarity with OpenAI CLIP
def get_text_embedding(text: str) -> list:
    text_tokens = clip.tokenize([text]).to(device)
    with torch.no_grad():
        text_features = model.encode_text(text_tokens)
    return text_features.cpu().numpy().tolist()[0]

def get_image_embedding(image_path: str) -> list:
    image = preprocess(Image.open(image_path)).unsqueeze(0).to(device)
    with torch.no_grad():
        image_features = model.encode_image(image)
    return image_features.cpu().numpy().tolist()[0]

# Compute similarity
import numpy as np

def cosine_similarity(vec1: list, vec2: list) -> float:
    v1 = np.array(vec1)
    v2 = np.array(vec2)
    return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
Similarity Search
Cosine Similarity
import numpy as np

def cosine_similarity(vec1: list, vec2: list) -> float:
    v1 = np.array(vec1)
    v2 = np.array(vec2)
    return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))

# Example
vector_a = [1, 2, 3]
vector_b = [2, 4, 6]
similarity = cosine_similarity(vector_a, vector_b)
print(f"Cosine similarity: {similarity}")
Euclidean Distance
import numpy as np

def euclidean_distance(vec1: list, vec2: list) -> float:
    v1 = np.array(vec1)
    v2 = np.array(vec2)
    return np.linalg.norm(v1 - v2)

# Example
vector_a = [1, 2, 3]
vector_b = [2, 4, 6]
distance = euclidean_distance(vector_a, vector_b)
print(f"Euclidean distance: {distance}")
Dot Product
import numpy as np

def dot_product(vec1: list, vec2: list) -> float:
    v1 = np.array(vec1)
    v2 = np.array(vec2)
    return np.dot(v1, v2)

# Example
vector_a = [1, 2, 3]
vector_b = [2, 4, 6]
product = dot_product(vector_a, vector_b)
print(f"Dot product: {product}")
Performance Optimization
Batch Operations
# Pinecone: batched upserts
def upsert_in_batches(vectors, batch_size=100):
    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i + batch_size]
        index.upsert(vectors=batch)

# Qdrant: batched inserts
from qdrant_client.models import Batch

def insert_in_batches(points, batch_size=100):
    for i in range(0, len(points), batch_size):
        batch = points[i:i + batch_size]
        client.upsert(
            collection_name="documents",
            points=Batch(
                ids=[p.id for p in batch],
                vectors=[p.vector for p in batch],
                payloads=[p.payload for p in batch]
            )
        )
Indexing Strategies
# Pinecone: choose an appropriate pod type (for pod-based indexes)
# p1 pods: performance-optimized, a good default for smaller workloads
# p2 pods: higher throughput and lower query latency
# s1 pods: storage-optimized, highest capacity per pod
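# A hedged sketch of creating a pod-based index; PodSpec ships in the same
# client as ServerlessSpec, but the environment and pod_type values here are illustrative.
from pinecone import PodSpec

pc.create_index(
    name="my-pod-index",
    dimension=1536,
    metric="cosine",
    spec=PodSpec(environment="us-east-1-aws", pod_type="p1.x1")
)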
# Qdrant: tune HNSW parameters
from qdrant_client.models import HnswConfigDiff

client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    hnsw_config=HnswConfigDiff(
        m=16,             # graph links per node (higher = better recall, more memory)
        ef_construct=100  # build-time search depth (higher = better index quality, slower builds)
    )
)
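ef_construct shapes index quality at build time; recall at query time is governed by the search-time ef parameter, which can be set per request. A sketch using qdrant-client's SearchParams:

from qdrant_client.models import SearchParams

results = client.search(
    collection_name="documents",
    query_vector=query_vector,
    search_params=SearchParams(hnsw_ef=128),  # higher = better recall, slower queries
    limit=10
)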
Caching
# Cache embeddings to avoid recomputing (and re-paying for) identical inputs
import hashlib
from functools import lru_cache

def get_embedding_cache_key(text: str) -> str:
    # Stable key for an external cache (e.g. Redis); lru_cache below doesn't need it
    return hashlib.md5(text.encode()).hexdigest()

@lru_cache(maxsize=1000)
def get_cached_embedding(text: str) -> tuple:
    # lru_cache keys on the text itself and keeps results in process memory;
    # returning a tuple keeps the cached value immutable
    return tuple(get_embedding(text))
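For a cache that survives restarts and is shared across processes, the same key function can back a Redis cache. A minimal sketch, assuming a local Redis instance and the redis package:

import json
import redis

r = redis.Redis(host="localhost", port=6379)

def get_embedding_redis(text: str) -> list:
    key = "emb:" + get_embedding_cache_key(text)
    cached = r.get(key)
    if cached is not None:
        return json.loads(cached)
    vector = get_embedding(text)
    r.set(key, json.dumps(vector))
    return vector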
Production Considerations
Scaling
# Pinecone: scaling an index
# Increase the number of replicas for higher throughput
# Use larger pod sizes for more storage

# Qdrant: sharding
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    shard_number=4  # number of shards
)

# Weaviate: multi-node setup
# Configure the replication factor (see the sketch below)
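For availability rather than capacity, both engines also support replication. A sketch, assuming a multi-node Qdrant cluster and a class schema like the one defined earlier for Weaviate:

# Qdrant: keep two copies of each shard (requires a multi-node cluster)
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    shard_number=4,
    replication_factor=2
)

# Weaviate: set the replication factor in the class definition
document_class = {
    "class": "Document",
    "replicationConfig": {"factor": 3},
    # ... properties as in the schema above ...
}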
Monitoring
# Pinecone: monitor index statistics
stats = index.describe_index_stats()
print(f"Total vectors: {stats['total_vector_count']}")
print(f"Dimension: {stats['dimension']}")

# Qdrant: monitor collection info
info = client.get_collection("documents")
print(f"Vector count: {info.vectors_count}")
print(f"Point count: {info.points_count}")

# Weaviate: monitor the cluster (v3 client)
cluster_status = client.cluster.get_nodes_status()
print(cluster_status)
Backup and Recovery
# Pinecone: exporting data
# Use Pinecone's export/collection features

# Qdrant: snapshots
client.create_snapshot(collection_name="documents")

# Weaviate: backups
# Use Weaviate's backup modules
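Restores deserve as much attention as backups. A hedged sketch of the corresponding restore paths, assuming a snapshot URL reachable by the Qdrant node and a filesystem backup backend enabled in Weaviate:

# Qdrant: list snapshots, then recover a collection from one
snapshots = client.list_snapshots(collection_name="documents")
client.recover_snapshot(
    collection_name="documents",
    location="http://localhost:6333/collections/documents/snapshots/<snapshot-name>"
)

# Weaviate (v3 client): create a backup and restore it
client.backup.create(backup_id="backup-1", backend="filesystem", wait_for_completion=True)
client.backup.restore(backup_id="backup-1", backend="filesystem", wait_for_completion=True)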
Cost Optimization
Choosing the Right Service
- Pinecone: managed service, easy to set up, well suited to production
- Qdrant: open source, self-hosting options, flexible
- Weaviate: open source, GraphQL API, well suited to multimodal data
Storage Optimization
# Use smaller embedding models
# text-embedding-3-small (1536 dims) vs. text-embedding-3-large (3072 dims)

# Compress vectors
# Use quantization or dimensionality reduction

# Delete old data
# Implement retention policies
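Two concrete levers, sketched below: the text-embedding-3 models accept a dimensions parameter that truncates embeddings server-side, and Qdrant can quantize stored vectors to int8. Here openai_client refers to the OpenAI client from the Embedding Strategies section, and the quantization settings are illustrative.

# Shorter embeddings straight from the API
response = openai_client.embeddings.create(
    model="text-embedding-3-small",
    input="some text",
    dimensions=512  # store 512 floats per vector instead of 1536
)

# Qdrant: scalar quantization cuts vector memory roughly 4x
from qdrant_client.models import ScalarQuantization, ScalarQuantizationConfig, ScalarType

client.create_collection(
    collection_name="documents-quantized",
    vectors_config=VectorParams(size=512, distance=Distance.COSINE),
    quantization_config=ScalarQuantization(
        scalar=ScalarQuantizationConfig(type=ScalarType.INT8, always_ram=True)
    )
)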
Query Optimization
# Use filters to shrink the search space
# Limit top_k results
# Use an appropriate distance metric
Best Practices
- Choose an appropriate embedding model
  - For text: OpenAI text-embedding-3-small or ada-002
  - For images: CLIP, DINO, or a domain-specific model
  - For multimodal data: CLIP or a similar model
- Preprocess your data
  - Clean text by removing special characters
  - Normalize whitespace
  - Lowercase for consistency
- Chunk appropriately
  - Split long documents into chunks
  - Use semantic chunking
  - Preserve context across chunks
- Implement caching
  - Cache embeddings to reduce API calls
  - Cache query results
  - Use Redis for caching
- Monitor performance
  - Track query latency
  - Monitor storage usage
  - Set alerts for anomalies
- Use filters effectively
  - Use metadata filters to shrink the search space
  - Combine vector search with keyword search
  - Use hybrid search where appropriate
- Handle errors gracefully (see the retry sketch after this list)
  - Implement retry logic
  - Handle rate limits
  - Log errors for debugging
- Test thoroughly
  - Test with real data
  - Evaluate search quality
  - Benchmark performance
- Security
  - Use authentication in production
  - Encrypt sensitive data
  - Follow the principle of least privilege
- Scalability
  - Design for horizontal scaling
  - Use an appropriate sharding strategy
  - Monitor resource usage
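As referenced in the error-handling item above, here is a minimal retry sketch with exponential backoff and jitter; the wrapped call and the exception handling are placeholders to adapt to your client library's transient error types:

import time
import random

def with_retries(fn, max_attempts: int = 5, base_delay: float = 0.5):
    # Retry transient failures (rate limits, timeouts) with exponential backoff
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as exc:  # narrow this to your client's retryable errors
            if attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            print(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay:.2f}s")
            time.sleep(delay)

# Usage: results = with_retries(lambda: index.query(vector=query_vector, top_k=10))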