Embedding Models
Overview
A comprehensive guide to text embedding models and their use. This skill covers embedding concepts (dense vectors, semantic similarity, distance metrics), popular models (OpenAI, Sentence Transformers, Cohere, BGE), model selection criteria, embedding generation (batch processing, caching), dimensionality reduction (PCA, t-SNE), fine-tuning, evaluation metrics (precision, recall, F1), storage strategies (NumPy, pickle, HDF5), production optimization, and use cases (semantic search, clustering, classification).
Prerequisites
- Understanding of vector spaces and linear algebra
- Knowledge of machine learning fundamentals
- Familiarity with Python and NumPy
- Understanding of cosine similarity and distance metrics
- Basic knowledge of transformers and deep learning
- Familiarity with scikit-learn for evaluation
Core Concepts
Embedding Fundamentals
- Dense vectors: fixed-size numerical representations of text
- Semantic similarity: similar meanings produce similar vectors
- Dimensionality: the number of dimensions in the vector space
- Distance metrics: cosine similarity, Euclidean distance, Manhattan distance
Popular Embedding Models
- OpenAI embeddings: text-embedding-3-small (1536 dims), text-embedding-3-large (3072 dims)
- Sentence Transformers: all-MiniLM-L6-v2 (384 dims), all-mpnet-base-v2 (768 dims)
- Cohere embeddings: embed-english-v3.0 (1024 dims)
- BGE models: BAAI/bge-small-en-v1.5 (384 dims), BAAI/bge-large-en-v1.5 (1024 dims) — a quick way to verify any local model's output dimension is shown below
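Output dimensionality varies across these models, so it is worth confirming at runtime before sizing downstream indexes. A minimal sketch, assuming the sentence-transformers package is installed (models download on first use):

from sentence_transformers import SentenceTransformer

# Check the output dimension of a local model before building downstream indexes
for name in ["all-MiniLM-L6-v2", "BAAI/bge-small-en-v1.5"]:
    model = SentenceTransformer(name)
    print(name, model.get_sentence_embedding_dimension())  # 384 for both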
Model Selection Criteria
- Use case: semantic search, classification, clustering
- Budget: free (open source) vs. paid (API)
- Performance: speed vs. accuracy trade-off
- Dimensionality: lower dimensions are typically faster; higher dimensions tend to be more accurate (some APIs let you request reduced dimensions directly — see the sketch below)
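As one concrete handle on this trade-off, OpenAI's text-embedding-3 models accept a dimensions parameter that returns a shortened embedding. A minimal sketch, assuming a valid API key in the OPENAI_API_KEY environment variable:

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment
# Request a shortened 256-dim embedding instead of the default 1536
response = client.embeddings.create(
    input="Hello, world!",
    model="text-embedding-3-small",
    dimensions=256,
)
print(len(response.data[0].embedding))  # 256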
Evaluation Metrics
- Retrieval: Precision@K, Recall@K, MRR (Mean Reciprocal Rank — a minimal implementation follows this list)
- Classification: accuracy, F1, precision, recall
- Clustering: silhouette coefficient, Davies-Bouldin index
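Precision and recall implementations appear later in this guide; MRR is only named here, so the following is a minimal sketch of it (the function name and ranking format are our own):

import numpy as np
from typing import List

def mean_reciprocal_rank(ranked_doc_ids: List[List[int]], relevant_ids: List[set]) -> float:
    """MRR: average over queries of 1/rank of the first relevant document."""
    reciprocal_ranks = []
    for ranking, relevant in zip(ranked_doc_ids, relevant_ids):
        rr = 0.0
        for rank, doc_id in enumerate(ranking, start=1):
            if doc_id in relevant:
                rr = 1.0 / rank
                break
        reciprocal_ranks.append(rr)
    return float(np.mean(reciprocal_ranks))

# Example: first query hits at rank 2, second at rank 1 -> MRR = (0.5 + 1.0) / 2
print(mean_reciprocal_rank([[3, 7, 1], [4, 2]], [{7}, {4}]))  # 0.75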
Storage Formats
- NumPy: fast loading, simple format
- Pickle: Python-native, supports metadata
- HDF5: suited to large datasets, supports compression
Implementation Guide
Embedding Concepts
Understanding Embeddings
"""
Embeddings are dense vector representations of text that capture semantic meaning.
Key concepts:
- Dense vectors: fixed-size numerical representations
- Semantic similarity: similar meanings produce similar vectors
- Dimensionality: the number of dimensions in the vector
- Distance metrics: cosine similarity, Euclidean distance
Example:
"cat" -> [0.2, -0.5, 0.8, ...] # 384-dim vector
"dog" -> [0.3, -0.4, 0.7, ...] # 384-dim vector
The vectors for "cat" and "dog" are similar because both are animals.
"""
class EmbeddingConcepts:
    """Understand embedding concepts."""
    @staticmethod
    def explain_embeddings():
        """Explain embedding concepts."""
        return {
            "dense_vectors": "Fixed-size numerical representations of text",
            "semantic_similarity": "Similar meanings produce similar vectors",
            "dimensionality": "Number of dimensions in the vector space",
            "distance_metrics": "Cosine similarity, Euclidean distance"
        }
    @staticmethod
    def compare_distance_metrics():
        """Compare different distance metrics."""
        import numpy as np
        # Example vectors
        vec1 = np.array([1, 0, 0])
        vec2 = np.array([0, 1, 0])
        # Cosine similarity
        dot_product = np.dot(vec1, vec2)
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        cosine_sim = dot_product / (norm1 * norm2)
        # Euclidean distance
        euclidean_dist = np.linalg.norm(vec1 - vec2)
        # Manhattan distance
        manhattan_dist = np.sum(np.abs(vec1 - vec2))
        return {
            "cosine_similarity": cosine_sim,
            "euclidean_distance": euclidean_dist,
            "manhattan_distance": manhattan_dist
        }

# Usage
concepts = EmbeddingConcepts()
print(concepts.explain_embeddings())
metrics = concepts.compare_distance_metrics()
print(f"Cosine similarity: {metrics['cosine_similarity']:.3f}")
Popular Models
OpenAI Embeddings
from openai import OpenAI
import numpy as np

class OpenAIEmbeddings:
    """OpenAI embeddings API."""
    def __init__(self, api_key: str, model: str = "text-embedding-3-small"):
        self.client = OpenAI(api_key=api_key)
        self.model = model

    def embed_text(self, text: str) -> np.ndarray:
        """Generate an embedding for a single text."""
        response = self.client.embeddings.create(
            input=text,
            model=self.model
        )
        return np.array(response.data[0].embedding)

    def embed_texts(self, texts: list) -> list[np.ndarray]:
        """Generate embeddings for multiple texts."""
        response = self.client.embeddings.create(
            input=texts,
            model=self.model
        )
        return [np.array(data.embedding) for data in response.data]

    def get_embedding_dimension(self) -> int:
        """Get the embedding dimension."""
        sample = self.embed_text("sample")
        return len(sample)

# Usage
embeddings = OpenAIEmbeddings(api_key="your-api-key")
# Single embedding
embedding = embeddings.embed_text("Hello, world!")
print(f"Embedding shape: {embedding.shape}")
# Batch embeddings
texts = ["Hello", "World", "How are you?"]
batch_embeddings = embeddings.embed_texts(texts)
print(f"Batch embeddings: {len(batch_embeddings)} vectors")
# Get the dimension
dim = embeddings.get_embedding_dimension()
print(f"Embedding dimension: {dim}")
Sentence Transformers
from sentence_transformers import SentenceTransformer
import numpy as np

class SentenceTransformerEmbeddings:
    """Sentence Transformers (Hugging Face) embeddings."""
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def embed_text(self, text: str) -> np.ndarray:
        """Generate an embedding for a single text."""
        return self.model.encode(text)

    def embed_texts(self, texts: list) -> np.ndarray:
        """Generate embeddings for multiple texts."""
        return self.model.encode(texts)

    def embed_documents(self, documents: list) -> np.ndarray:
        """Generate embeddings for documents."""
        return self.model.encode(documents)

    def compute_similarity(self, text1: str, text2: str) -> float:
        """Compute similarity between two texts."""
        emb1 = self.embed_text(text1)
        emb2 = self.embed_text(text2)
        # Cosine similarity
        similarity = np.dot(emb1, emb2) / (
            np.linalg.norm(emb1) * np.linalg.norm(emb2)
        )
        return float(similarity)

# Usage
embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
# Single embedding
embedding = embeddings.embed_text("Hello, world!")
print(f"Embedding shape: {embedding.shape}")
# Batch embeddings
texts = ["Hello", "World", "How are you?"]
batch_embeddings = embeddings.embed_texts(texts)
print(f"Batch embedding shape: {batch_embeddings.shape}")
# Compute similarity
similarity = embeddings.compute_similarity("cat", "dog")
print(f"Similarity between 'cat' and 'dog': {similarity:.3f}")
Cohere Embeddings
import cohere
import numpy as np

class CohereEmbeddings:
    """Cohere embeddings API."""
    def __init__(self, api_key: str, model: str = "embed-english-v3.0"):
        self.client = cohere.Client(api_key=api_key)
        self.model = model

    def embed_text(self, text: str, input_type: str = "search_document") -> np.ndarray:
        """Generate an embedding for a single text (v3 models require input_type)."""
        response = self.client.embed(
            texts=[text],
            input_type=input_type,
            model=self.model
        )
        return np.array(response.embeddings[0])

    def embed_texts(self, texts: list, input_type: str = "search_document") -> np.ndarray:
        """Generate embeddings for multiple texts."""
        response = self.client.embed(
            texts=texts,
            input_type=input_type,
            model=self.model
        )
        return np.array(response.embeddings)

    def embed_documents(self, documents: list) -> np.ndarray:
        """Generate embeddings for documents (input_type tuned for retrieval indexing)."""
        response = self.client.embed(
            texts=documents,
            input_type="search_document",
            model=self.model
        )
        return np.array(response.embeddings)

# Usage
embeddings = CohereEmbeddings(api_key="your-api-key")
# Single embedding
embedding = embeddings.embed_text("Hello, world!")
print(f"Embedding shape: {embedding.shape}")
# Batch embeddings
texts = ["Hello", "World", "How are you?"]
batch_embeddings = embeddings.embed_texts(texts)
print(f"Batch embedding shape: {batch_embeddings.shape}")
BGE Models
from sentence_transformers import SentenceTransformer
import numpy as np

class BGEEmbeddings:
    """BGE (BAAI General Embedding) models."""
    def __init__(self, model_name: str = "BAAI/bge-small-en-v1.5"):
        self.model = SentenceTransformer(model_name)

    def embed_text(self, text: str) -> np.ndarray:
        """Generate an embedding for a single text."""
        return self.model.encode(text)

    def embed_query(self, query: str) -> np.ndarray:
        """Generate an embedding for a query.
        BGE v1.5 recommends an instruction prefix on the query side for retrieval."""
        prompt = "Represent this sentence for searching relevant passages: "
        return self.model.encode(prompt + query)

    def embed_documents(self, documents: list) -> np.ndarray:
        """Generate embeddings for documents (no prefix needed on the document side)."""
        return self.model.encode(documents)

    def compute_scores(self, query: str, documents: list) -> np.ndarray:
        """Compute relevance scores for documents."""
        query_emb = self.embed_query(query)
        doc_embeddings = self.embed_documents(documents)
        # Cosine similarity
        scores = np.dot(doc_embeddings, query_emb) / (
            np.linalg.norm(doc_embeddings, axis=1) * np.linalg.norm(query_emb)
        )
        return scores

# Usage
embeddings = BGEEmbeddings(model_name="BAAI/bge-small-en-v1.5")
# Query embedding
query = "What is the capital of France?"
query_emb = embeddings.embed_query(query)
# Document embeddings
documents = [
    "Paris is the capital of France.",
    "London is the capital of the UK.",
    "Berlin is the capital of Germany."
]
doc_embeddings = embeddings.embed_documents(documents)
# Compute scores
scores = embeddings.compute_scores(query, documents)
print(f"Scores: {scores}")
Model Selection Criteria
Model Comparison
import pandas as pd

class EmbeddingModelComparison:
    """Compare different embedding models."""
    def __init__(self):
        self.models = {
            "openai_small": {
                "name": "text-embedding-3-small",
                "dimensions": 1536,
                "cost_per_1k_tokens": 0.00002,
                "speed": "fast"
            },
            "openai_large": {
                "name": "text-embedding-3-large",
                "dimensions": 3072,
                "cost_per_1k_tokens": 0.00013,
                "speed": "medium"
            },
            "sentence_transformers": {
                "name": "all-MiniLM-L6-v2",
                "dimensions": 384,
                "cost_per_1k_tokens": 0,
                "speed": "fast"
            },
            "bge_small": {
                "name": "BAAI/bge-small-en-v1.5",
                "dimensions": 384,
                "cost_per_1k_tokens": 0,
                "speed": "medium"
            },
            "bge_large": {
                "name": "BAAI/bge-large-en-v1.5",
                "dimensions": 1024,
                "cost_per_1k_tokens": 0,
                "speed": "slow"
            }
        }
    def get_model_recommendation(
        self,
        use_case: str = "general",
        budget: str = "free",
        performance_priority: str = "speed"
    ) -> str:
        """Recommend a model based on the given criteria."""
        # Filter by budget
        if budget == "free":
            available_models = {
                k: v for k, v in self.models.items()
                if v["cost_per_1k_tokens"] == 0
            }
        else:
            available_models = self.models
        # Filter by use case
        if use_case == "semantic_search":
            # Prefer models optimized for search
            search_optimized = ["bge_small", "bge_large"]
            available_models = {
                k: v for k, v in available_models.items()
                if k in search_optimized
            }
        elif use_case == "classification":
            # Prefer larger models
            available_models = {
                k: v for k, v in available_models.items()
                if v["dimensions"] >= 768
            }
        elif use_case == "clustering":
            # Prefer balanced models
            available_models = {
                k: v for k, v in available_models.items()
                if 384 <= v["dimensions"] <= 768
            }
        # Sort by performance priority
        if performance_priority == "speed":
            speed_order = {"fast": 0, "medium": 1, "slow": 2}
            sorted_models = sorted(
                available_models.items(),
                key=lambda x: speed_order.get(x[1]["speed"], 3)
            )
        elif performance_priority == "accuracy":
            sorted_models = sorted(
                available_models.items(),
                key=lambda x: -x[1]["dimensions"]
            )
        else:
            sorted_models = list(available_models.items())
        if not sorted_models:
            raise ValueError("No model satisfies the given criteria")
        return sorted_models[0][0]
    def compare_models(self, model_names: list) -> pd.DataFrame:
        """Compare models in a table."""
        comparison_data = []
        for name in model_names:
            if name in self.models:
                model_info = self.models[name]
                comparison_data.append({
                    "Model": name,
                    "Dimensions": model_info["dimensions"],
                    "Cost/1K tokens": model_info["cost_per_1k_tokens"],
                    "Speed": model_info["speed"]
                })
        return pd.DataFrame(comparison_data)

# Usage
comparator = EmbeddingModelComparison()
# Get a recommendation
recommendation = comparator.get_model_recommendation(
    use_case="semantic_search",
    budget="free",
    performance_priority="speed"
)
print(f"Recommended model: {recommendation}")
# Compare models
comparison = comparator.compare_models([
    "openai_small", "sentence_transformers", "bge_small"
])
print(comparison)
Selection Decision Tree
class EmbeddingModelSelector:
    """Decision tree for model selection."""
    @staticmethod
    def select_model(
        data_size: str,
        latency_requirement: str,
        accuracy_requirement: str,
        budget: str
    ) -> str:
        """Select a model based on requirements."""
        # Small data, low latency, low accuracy, free budget
        if (data_size == "small" and latency_requirement == "low" and
                accuracy_requirement == "low" and budget == "free"):
            return "sentence_transformers"
        # Large data, relaxed latency, high accuracy, paid budget
        if (data_size == "large" and latency_requirement == "high" and
                accuracy_requirement == "high" and budget == "paid"):
            return "openai_large"
        # Medium data, medium latency, medium accuracy, free budget
        if (data_size == "medium" and latency_requirement == "medium" and
                accuracy_requirement == "medium" and budget == "free"):
            return "bge_small"
        # Semantic search use case
        if latency_requirement == "low" and accuracy_requirement == "high":
            return "bge_large"
        # Default
        return "sentence_transformers"

    @staticmethod
    def get_model_config(model_name: str) -> dict:
        """Get the configuration for the selected model."""
        configs = {
            "sentence_transformers": {
                "model_name": "all-MiniLM-L6-v2",
                "dimensions": 384,
                "batch_size": 32,
                "normalize": True
            },
            "bge_small": {
                "model_name": "BAAI/bge-small-en-v1.5",
                "dimensions": 384,
                "batch_size": 32,
                "normalize": True,
                "query_prompt": "Represent this sentence for searching relevant passages:"
            },
            "bge_large": {
                "model_name": "BAAI/bge-large-en-v1.5",
                "dimensions": 1024,
                "batch_size": 16,
                "normalize": True,
                "query_prompt": "Represent this sentence for searching relevant passages:"
            },
            "openai_small": {
                "model_name": "text-embedding-3-small",
                "dimensions": 1536,
                "batch_size": 100,
                "normalize": False
            },
            "openai_large": {
                "model_name": "text-embedding-3-large",
                "dimensions": 3072,
                "batch_size": 100,
                "normalize": False
            }
        }
        return configs.get(model_name, configs["sentence_transformers"])

# Usage
selector = EmbeddingModelSelector()
model_name = selector.select_model(
    data_size="medium",
    latency_requirement="low",
    accuracy_requirement="medium",
    budget="free"
)
config = selector.get_model_config(model_name)
print(f"Selected model: {model_name}")
print(f"Configuration: {config}")
Embedding Generation
Batch Processing
import numpy as np
from typing import List
from tqdm import tqdm

class BatchEmbeddingGenerator:
    """Generate embeddings in batches."""
    def __init__(self, embedding_model, batch_size: int = 32):
        self.embedding_model = embedding_model
        self.batch_size = batch_size

    def generate_embeddings(
        self,
        texts: List[str],
        show_progress: bool = True
    ) -> np.ndarray:
        """Generate embeddings for texts in batches."""
        all_embeddings = []
        # Split into batches
        batches = [
            texts[i:i + self.batch_size]
            for i in range(0, len(texts), self.batch_size)
        ]
        iterator = tqdm(batches) if show_progress else batches
        for batch in iterator:
            if hasattr(self.embedding_model, 'encode'):
                embeddings = self.embedding_model.encode(batch)
            elif hasattr(self.embedding_model, 'embed_texts'):
                embeddings = self.embedding_model.embed_texts(batch)
            else:
                embeddings = [self.embedding_model.embed_text(t) for t in batch]
            all_embeddings.extend(embeddings)
        return np.array(all_embeddings)
    def generate_embeddings_async(
        self,
        texts: List[str]
    ) -> np.ndarray:
        """Generate embeddings concurrently (Python 3.9+).
        encode()/embed_text() are blocking, so they are offloaded to worker
        threads; otherwise the coroutines would just run sequentially."""
        import asyncio

        async def process_batch(batch):
            if hasattr(self.embedding_model, 'encode'):
                return await asyncio.to_thread(self.embedding_model.encode, batch)
            return await asyncio.to_thread(
                lambda: np.array([self.embedding_model.embed_text(t) for t in batch])
            )

        async def process_all():
            batches = [
                texts[i:i + self.batch_size]
                for i in range(0, len(texts), self.batch_size)
            ]
            results = await asyncio.gather(*(process_batch(b) for b in batches))
            return np.concatenate(results)

        return asyncio.run(process_all())
# Usage
# With Sentence Transformers
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')
generator = BatchEmbeddingGenerator(model, batch_size=64)
texts = ["Text 1", "Text 2", "Text 3"]  # ... many more texts
# Generate embeddings
embeddings = generator.generate_embeddings(texts)
print(f"Generated {len(embeddings)} embeddings with shape {embeddings.shape}")
Caching
import numpy as np
import hashlib
import pickle
from pathlib import Path
from typing import Optional

class EmbeddingCache:
    """Cache embeddings to avoid recomputation."""
    def __init__(self, cache_dir: str = "./embedding_cache"):
        # Note: keys are derived from the text alone, so use a separate
        # cache directory per embedding model to avoid collisions.
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _get_cache_key(self, text: str) -> str:
        """Generate a cache key for a text (MD5 is fine here; it is not used for security)."""
        return hashlib.md5(text.encode()).hexdigest()

    def get(self, text: str) -> Optional[np.ndarray]:
        """Get a cached embedding."""
        cache_key = self._get_cache_key(text)
        cache_file = self.cache_dir / f"{cache_key}.pkl"
        if cache_file.exists():
            with open(cache_file, 'rb') as f:
                return pickle.load(f)
        return None

    def set(self, text: str, embedding: np.ndarray):
        """Cache an embedding."""
        cache_key = self._get_cache_key(text)
        cache_file = self.cache_dir / f"{cache_key}.pkl"
        with open(cache_file, 'wb') as f:
            pickle.dump(embedding, f)

    def get_or_generate(
        self,
        text: str,
        embedding_model
    ) -> np.ndarray:
        """Get a cached embedding or generate a new one."""
        cached = self.get(text)
        if cached is not None:
            return cached
        # Generate a new embedding
        if hasattr(embedding_model, 'encode'):
            embedding = embedding_model.encode(text)
        else:
            embedding = embedding_model.embed_text(text)
        # Cache it
        self.set(text, embedding)
        return embedding

    def clear(self):
        """Clear the entire cache."""
        for file in self.cache_dir.glob("*.pkl"):
            file.unlink()

# Usage
from sentence_transformers import SentenceTransformer
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
cache = EmbeddingCache()
# First call - generates and caches
embedding1 = cache.get_or_generate("Hello, world!", embedding_model)
# Second call - retrieved from cache
embedding2 = cache.get_or_generate("Hello, world!", embedding_model)
print(f"Embeddings equal: {np.allclose(embedding1, embedding2)}")
Dimensionality Reduction
import numpy as np
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE, MDS
import matplotlib.pyplot as plt

class DimensionalityReducer:
    """Reduce embedding dimensionality."""
    def __init__(self, method: str = "pca"):
        self.method = method
        self.reducer = None

    def _build_reducer(self, n_components: int):
        """Construct the underlying scikit-learn reducer."""
        if self.method == "pca":
            return PCA(n_components=n_components)
        elif self.method == "tsne":
            return TSNE(n_components=n_components)
        elif self.method == "mds":
            return MDS(n_components=n_components)
        raise ValueError(f"Unknown method: {self.method}")

    def fit_transform(self, embeddings: np.ndarray, n_components: int = 2) -> np.ndarray:
        """Fit and transform in one step (the only mode t-SNE and MDS support)."""
        self.reducer = self._build_reducer(n_components)
        return self.reducer.fit_transform(embeddings)

    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        """Project new embeddings into the fitted space.
        Only PCA supports this; t-SNE and MDS cannot embed unseen data."""
        if self.method != "pca":
            raise ValueError(f"{self.method} does not support transforming unseen data")
        return self.reducer.transform(embeddings)

    def visualize(self, embeddings: np.ndarray, labels: list = None):
        """Visualize reduced embeddings."""
        reduced = self.fit_transform(embeddings)
        plt.figure(figsize=(10, 8))
        scatter = plt.scatter(reduced[:, 0], reduced[:, 1], c=labels, cmap='viridis', alpha=0.6)
        plt.colorbar(scatter)
        plt.xlabel('Component 1')
        plt.ylabel('Component 2')
        plt.title(f'Embedding Visualization ({self.method.upper()})')
        plt.show()

# Usage
# Generate sample embeddings
np.random.seed(42)
embeddings = np.random.randn(100, 384)  # 100 samples, 384 dims
labels = np.random.randint(0, 3, 100)  # 3 clusters
# PCA reduction
pca_reducer = DimensionalityReducer(method="pca")
pca_reduced = pca_reducer.fit_transform(embeddings, n_components=2)
print(f"PCA-reduced shape: {pca_reduced.shape}")
# t-SNE visualization
tsne_reducer = DimensionalityReducer(method="tsne")
tsne_reducer.visualize(embeddings, labels)
Fine-Tuning Embeddings
from sentence_transformers import SentenceTransformer, InputExample, losses
from torch.utils.data import DataLoader
import torch
from typing import List

class EmbeddingFineTuner:
    """Fine-tune an embedding model on custom data."""
    def __init__(
        self,
        model_name: str = "all-MiniLM-L6-v2",
        max_length: int = 512,
        batch_size: int = 16,
        epochs: int = 3
    ):
        self.model = SentenceTransformer(model_name)
        self.max_length = max_length
        self.batch_size = batch_size
        self.epochs = epochs

    def prepare_data(
        self,
        texts: List[str],
        labels: List[int]
    ) -> List[InputExample]:
        """Prepare fine-tuning data as text pairs.
        CosineSimilarityLoss expects pairs of texts with a float similarity
        target, so same-label pairs get 1.0 and different-label pairs get 0.0."""
        examples = []
        for i in range(len(texts)):
            for j in range(i + 1, len(texts)):
                target = 1.0 if labels[i] == labels[j] else 0.0
                examples.append(InputExample(texts=[texts[i], texts[j]], label=target))
        return examples
    def fine_tune(
        self,
        train_texts: List[str],
        train_labels: List[int],
        val_texts: List[str] = None,
        val_labels: List[int] = None
    ):
        """Fine-tune the model."""
        # Prepare data
        train_examples = self.prepare_data(train_texts, train_labels)
        # Create the data loader
        train_dataloader = DataLoader(
            train_examples,
            shuffle=True,
            batch_size=self.batch_size
        )
        # Define the loss function
        train_loss = losses.CosineSimilarityLoss(model=self.model)
        # Fine-tune
        self.model.fit(
            train_objectives=[(train_dataloader, train_loss)],
            epochs=self.epochs,
            warmup_steps=100,
            use_amp=True
        )
        # Save the fine-tuned model
        self.model.save("./fine_tuned_model")
        return self.model
    def evaluate(
        self,
        texts: List[str],
        labels: List[int]
    ) -> float:
        """Evaluate the fine-tuned model: the gap between mean same-label and
        mean different-label cosine similarity (higher is better)."""
        import numpy as np
        embeddings = self.model.encode(texts)
        normed = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
        sims = normed @ normed.T
        same, diff = [], []
        for i in range(len(texts)):
            for j in range(i + 1, len(texts)):
                (same if labels[i] == labels[j] else diff).append(sims[i, j])
        return float(np.mean(same) - np.mean(diff))
# Usage
finetuner = EmbeddingFineTuner(
    model_name="all-MiniLM-L6-v2",
    epochs=3
)
# Fine-tuning data
train_texts = [
    "This is a positive sentence.",
    "This is another positive sentence.",
    "This is a negative sentence.",
    "This is another negative sentence."
]
train_labels = [1, 1, 0, 0]
# Fine-tune
fine_tuned_model = finetuner.fine_tune(train_texts, train_labels)
# Evaluate
test_texts = [
    "This is a positive test sentence.",
    "Another positive test sentence.",
    "This is a negative test sentence.",
    "Another negative test sentence."
]
test_labels = [1, 1, 0, 0]
metric = finetuner.evaluate(test_texts, test_labels)
print(f"Similarity gap (same-label minus different-label): {metric:.3f}")
Evaluation Metrics
import numpy as np
from typing import List
from sklearn.metrics import (
    adjusted_rand_score,
    silhouette_score,
    davies_bouldin_score
)
class EmbeddingEvaluator:
    """Evaluate embedding model performance."""
    @staticmethod
    def cosine_similarity(emb1: np.ndarray, emb2: np.ndarray) -> float:
        """Compute cosine similarity between embeddings."""
        dot_product = np.dot(emb1, emb2)
        norm1 = np.linalg.norm(emb1)
        norm2 = np.linalg.norm(emb2)
        return dot_product / (norm1 * norm2)

    @staticmethod
    def evaluate_retrieval(
        query_embeddings: np.ndarray,
        document_embeddings: np.ndarray,
        relevant_docs: List[List[int]],
        k: int = 10
    ) -> dict:
        """Evaluate retrieval performance (Precision@K, Recall@K, F1)."""
        results = {
            "precision": [],
            "recall": [],
            "f1": []
        }
        for query_idx, relevant in enumerate(relevant_docs):
            # Compute similarities
            similarities = []
            for doc_idx in range(len(document_embeddings)):
                sim = EmbeddingEvaluator.cosine_similarity(
                    query_embeddings[query_idx],
                    document_embeddings[doc_idx]
                )
                similarities.append(sim)
            # Get the top-k documents
            top_k_indices = np.argsort(similarities)[-k:]
            # Compute metrics
            retrieved_set = set(top_k_indices)
            relevant_set = set(relevant)
            true_positives = len(retrieved_set & relevant_set)
            precision = true_positives / k
            recall = true_positives / len(relevant) if relevant else 0
            f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
            results["precision"].append(precision)
            results["recall"].append(recall)
            results["f1"].append(f1)
        return {
            "avg_precision": np.mean(results["precision"]),
            "avg_recall": np.mean(results["recall"]),
            "avg_f1": np.mean(results["f1"])
        }
    @staticmethod
    def evaluate_clustering(
        embeddings: np.ndarray,
        true_labels: np.ndarray,
        predicted_labels: np.ndarray
    ) -> dict:
        """Evaluate clustering performance.
        Cluster labels are arbitrary permutations, so use permutation-invariant
        metrics rather than classification accuracy/precision/recall."""
        return {
            "adjusted_rand_index": adjusted_rand_score(true_labels, predicted_labels),
            "silhouette": silhouette_score(embeddings, predicted_labels),
            "davies_bouldin": davies_bouldin_score(embeddings, predicted_labels)
        }
# Usage
evaluator = EmbeddingEvaluator()
# Generate sample data
np.random.seed(42)
query_embeddings = np.random.randn(10, 384)
document_embeddings = np.random.randn(100, 384)
relevant_docs = [
    [5, 10, 15],  # relevant documents for query 0
    [2, 8, 12],   # relevant documents for query 1
    # ... more queries
]
# Evaluate retrieval
retrieval_metrics = evaluator.evaluate_retrieval(
    query_embeddings,
    document_embeddings,
    relevant_docs,
    k=10
)
print(f"Retrieval metrics: {retrieval_metrics}")
Storage Strategies
import numpy as np
import pickle
from pathlib import Path
from typing import List
import h5py

class EmbeddingStorage:
    """Store embeddings efficiently."""
    def __init__(self, storage_type: str = "numpy"):
        self.storage_type = storage_type
        self.storage_path = Path("./embeddings")
        self.storage_path.mkdir(parents=True, exist_ok=True)

    def save_numpy(self, embeddings: np.ndarray, filename: str):
        """Save embeddings as a NumPy file."""
        file_path = self.storage_path / f"{filename}.npy"
        np.save(file_path, embeddings)

    def load_numpy(self, filename: str) -> np.ndarray:
        """Load embeddings from a NumPy file."""
        file_path = self.storage_path / f"{filename}.npy"
        return np.load(file_path)

    def save_pickle(self, embeddings: np.ndarray, metadata: dict, filename: str):
        """Save embeddings plus metadata as a pickle."""
        file_path = self.storage_path / f"{filename}.pkl"
        data = {
            "embeddings": embeddings,
            "metadata": metadata
        }
        with open(file_path, 'wb') as f:
            pickle.dump(data, f)

    def load_pickle(self, filename: str) -> tuple:
        """Load embeddings and metadata."""
        file_path = self.storage_path / f"{filename}.pkl"
        with open(file_path, 'rb') as f:
            data = pickle.load(f)
        return data["embeddings"], data["metadata"]

    def save_hdf5(self, embeddings: np.ndarray, texts: List[str], filename: str):
        """Save embeddings in HDF5 format with gzip compression."""
        file_path = self.storage_path / f"{filename}.h5"
        with h5py.File(file_path, 'w') as f:
            f.create_dataset('embeddings', data=embeddings, compression='gzip')
            # HDF5 cannot store NumPy unicode strings directly; store UTF-8 bytes
            f.create_dataset('texts', data=[t.encode('utf-8') for t in texts])

    def load_hdf5(self, filename: str) -> tuple:
        """Load embeddings from HDF5 format."""
        file_path = self.storage_path / f"{filename}.h5"
        with h5py.File(file_path, 'r') as f:
            embeddings = f['embeddings'][:]
            texts = [t.decode('utf-8') for t in f['texts'][:]]
        return embeddings, texts

# Usage
storage = EmbeddingStorage(storage_type="numpy")
# Generate sample embeddings
embeddings = np.random.randn(100, 384)
texts = [f"Text {i}" for i in range(100)]
metadata = {"model": "all-MiniLM-L6-v2", "dimension": 384}
# Save
storage.save_numpy(embeddings, "embeddings")
storage.save_pickle(embeddings, metadata, "embeddings_with_meta")
storage.save_hdf5(embeddings, texts, "embeddings_hdf5")
# Load
loaded_embeddings = storage.load_numpy("embeddings")
loaded_with_meta, meta = storage.load_pickle("embeddings_with_meta")
loaded_embeddings, loaded_texts = storage.load_hdf5("embeddings_hdf5")
Production Optimization
import numpy as np
from typing import List
from concurrent.futures import ThreadPoolExecutor
import time

class ProductionEmbeddingOptimizer:
    """Optimize embedding generation for production."""
    def __init__(self, embedding_model, max_workers: int = 4):
        self.embedding_model = embedding_model
        self.max_workers = max_workers

    def batch_generate(
        self,
        texts: List[str],
        batch_size: int = 32
    ) -> np.ndarray:
        """Generate embeddings in parallel batches."""
        all_embeddings = []
        batches = [
            texts[i:i + batch_size]
            for i in range(0, len(texts), batch_size)
        ]
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self._process_batch, batch) for batch in batches]
            # Collect results in submission order to preserve text order
            for future in futures:
                embeddings = future.result()
                all_embeddings.extend(embeddings)
        return np.array(all_embeddings)

    def _process_batch(self, batch: List[str]) -> np.ndarray:
        """Process a single batch."""
        if hasattr(self.embedding_model, 'encode'):
            return self.embedding_model.encode(batch)
        elif hasattr(self.embedding_model, 'embed_texts'):
            embeddings = self.embedding_model.embed_texts(batch)
            return np.array(embeddings)
        else:
            return np.array([self.embedding_model.embed_text(t) for t in batch])

    def benchmark(self, texts: List[str], num_runs: int = 5) -> dict:
        """Benchmark embedding generation."""
        results = {
            "latency_ms": [],
            "throughput_per_sec": []
        }
        for _ in range(num_runs):
            start_time = time.time()
            embeddings = self.batch_generate(texts)
            end_time = time.time()
            latency = (end_time - start_time) * 1000
            throughput = len(texts) / (end_time - start_time)
            results["latency_ms"].append(latency)
            results["throughput_per_sec"].append(throughput)
        return {
            "avg_latency_ms": np.mean(results["latency_ms"]),
            "avg_throughput_per_sec": np.mean(results["throughput_per_sec"]),
            "min_latency_ms": np.min(results["latency_ms"]),
            "max_latency_ms": np.max(results["latency_ms"])
        }

# Usage
from sentence_transformers import SentenceTransformer
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
optimizer = ProductionEmbeddingOptimizer(embedding_model, max_workers=4)
# Benchmark
texts = ["Text " + str(i) for i in range(100)]
benchmark_results = optimizer.benchmark(texts, num_runs=5)
print(f"Average latency: {benchmark_results['avg_latency_ms']:.2f} ms")
print(f"Average throughput: {benchmark_results['avg_throughput_per_sec']:.2f} texts/sec")
Use Cases
Semantic Search
import numpy as np
from typing import List, Tuple

class SemanticSearch:
    """Semantic search with embeddings."""
    def __init__(self, embedding_model):
        self.embedding_model = embedding_model
        self.document_embeddings = None
        self.documents = None

    def index_documents(self, documents: List[str]):
        """Index documents by generating embeddings."""
        self.documents = documents
        self.document_embeddings = self.embedding_model.encode(documents)

    def search(
        self,
        query: str,
        top_k: int = 5
    ) -> List[Tuple[str, float]]:
        """Search for similar documents."""
        # Generate the query embedding
        query_embedding = self.embedding_model.encode(query)
        # Compute similarities
        similarities = []
        for doc_embedding in self.document_embeddings:
            sim = np.dot(query_embedding, doc_embedding) / (
                np.linalg.norm(query_embedding) * np.linalg.norm(doc_embedding)
            )
            similarities.append(sim)
        # Get the top-k
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        results = []
        for idx in top_indices:
            results.append((self.documents[idx], similarities[idx]))
        return results

# Usage
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')
search = SemanticSearch(model)
# Index documents
documents = [
    "Paris is the capital of France.",
    "London is the capital of the UK.",
    "Berlin is the capital of Germany.",
    "Madrid is the capital of Spain."
]
search.index_documents(documents)
# Search
results = search.search("capital of France", top_k=2)
for doc, score in results:
    print(f"Score: {score:.3f} | {doc}")
Clustering
import numpy as np
from sklearn.cluster import KMeans, DBSCAN
from sklearn.metrics import silhouette_score
import matplotlib.pyplot as plt

class EmbeddingClustering:
    """Cluster documents with embeddings."""
    def __init__(self, n_clusters: int = 3):
        self.n_clusters = n_clusters

    def kmeans_cluster(self, embeddings: np.ndarray) -> np.ndarray:
        """K-means clustering."""
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
        return kmeans.fit_predict(embeddings)

    def dbscan_cluster(self, embeddings: np.ndarray) -> np.ndarray:
        """DBSCAN clustering (density-based)."""
        dbscan = DBSCAN(eps=0.5, min_samples=5)
        return dbscan.fit_predict(embeddings)

    def visualize_clusters(self, embeddings: np.ndarray, labels: np.ndarray):
        """Visualize clusters in 2D."""
        from sklearn.decomposition import PCA
        # Reduce to 2D for visualization
        pca = PCA(n_components=2)
        embeddings_2d = pca.fit_transform(embeddings)
        plt.figure(figsize=(10, 8))
        scatter = plt.scatter(
            embeddings_2d[:, 0],
            embeddings_2d[:, 1],
            c=labels,
            cmap='viridis',
            alpha=0.6
        )
        plt.colorbar(scatter)
        plt.xlabel('Component 1')
        plt.ylabel('Component 2')
        plt.title('Document Clusters')
        plt.show()

    def evaluate_clustering(self, embeddings: np.ndarray, labels: np.ndarray) -> float:
        """Evaluate clustering quality."""
        if len(set(labels)) > 1:
            return silhouette_score(embeddings, labels)
        return 0.0

# Usage
clusterer = EmbeddingClustering(n_clusters=3)
# Generate sample embeddings
np.random.seed(42)
embeddings = np.random.randn(100, 384)
# K-means clustering
kmeans_labels = clusterer.kmeans_cluster(embeddings)
silhouette = clusterer.evaluate_clustering(embeddings, kmeans_labels)
print(f"K-means silhouette score: {silhouette:.3f}")
# Visualize
clusterer.visualize_clusters(embeddings, kmeans_labels)
Classification
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

class EmbeddingClassifier:
    """Classify documents with embeddings."""
    def __init__(self):
        self.classifier = LogisticRegression(random_state=42)

    def train(
        self,
        embeddings: np.ndarray,
        labels: np.ndarray,
        test_size: float = 0.2
    ) -> dict:
        """Train a classifier on embeddings."""
        X_train, X_test, y_train, y_test = train_test_split(
            embeddings, labels, test_size=test_size, random_state=42
        )
        self.classifier.fit(X_train, y_train)
        # Predict on the test set
        y_pred = self.classifier.predict(X_test)
        # Evaluate
        accuracy = accuracy_score(y_test, y_pred)
        report = classification_report(y_test, y_pred)
        return {
            "accuracy": accuracy,
            "report": report
        }

    def predict(self, embeddings: np.ndarray) -> np.ndarray:
        """Predict labels for embeddings."""
        return self.classifier.predict(embeddings)

    def predict_proba(self, embeddings: np.ndarray) -> np.ndarray:
        """Predict class probabilities for embeddings."""
        return self.classifier.predict_proba(embeddings)

# Usage
classifier = EmbeddingClassifier()
# Generate sample data
np.random.seed(42)
embeddings = np.random.randn(100, 384)
labels = np.random.randint(0, 3, 100)  # 3 classes
# Train
results = classifier.train(embeddings, labels)
print(f"Accuracy: {results['accuracy']:.3f}")
print(f"Classification report:\n{results['report']}")
Best Practices
Embedding Generation
- Use batch processing for efficiency
- Implement caching to avoid recomputation
- Normalize embeddings for consistent similarity computation (a minimal sketch follows this list)
- Choose the batch size based on memory constraints
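A minimal normalization sketch: once vectors are unit-length, cosine similarity reduces to a plain dot product.

import numpy as np

def normalize_embeddings(embeddings: np.ndarray) -> np.ndarray:
    """Scale each row to unit L2 norm so cosine similarity becomes a dot product."""
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    return embeddings / np.clip(norms, 1e-12, None)  # avoid division by zero

embeddings = np.random.randn(5, 384)
unit = normalize_embeddings(embeddings)
print(np.allclose(np.linalg.norm(unit, axis=1), 1.0))  # True

With sentence-transformers, the same effect is available directly via model.encode(texts, normalize_embeddings=True).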
Model Selection
- Consider the use case: semantic search, classification, clustering
- Evaluate the trade-offs: speed vs. accuracy vs. cost
- Test multiple models on your own data (see the sketch after this list)
- Monitor performance in production
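A minimal sketch of testing several models on your own data; the tiny labeled set and top-1 ranking check are our own illustration, not a standard benchmark:

import numpy as np
from sentence_transformers import SentenceTransformer

# Tiny labeled sanity set: each query should rank its paired document first
queries = ["capital of France", "largest ocean"]
docs = ["Paris is the capital of France.", "The Pacific is the largest ocean."]

for name in ["all-MiniLM-L6-v2", "BAAI/bge-small-en-v1.5"]:
    model = SentenceTransformer(name)
    q = model.encode(queries, normalize_embeddings=True)
    d = model.encode(docs, normalize_embeddings=True)
    # With unit vectors, the similarity matrix is just q @ d.T
    hits = (np.argmax(q @ d.T, axis=1) == np.arange(len(queries))).mean()
    print(f"{name}: top-1 accuracy {hits:.2f}")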
Storage
- Use efficient formats: HDF5 for large datasets
- Include metadata: model name, dimensions, timestamp (see the versioned-save sketch below)
- Implement versioning: track model changes
- Consider compression for storage efficiency
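A minimal sketch of a versioned save that bundles the metadata this list recommends; the directory layout and field names are our own convention:

import json
import time
import numpy as np
from pathlib import Path

def save_versioned(embeddings: np.ndarray, model_name: str, version: str, root: str = "./embeddings"):
    """Save embeddings alongside a JSON sidecar recording model, dimension, and timestamp."""
    out_dir = Path(root) / version
    out_dir.mkdir(parents=True, exist_ok=True)
    np.save(out_dir / "embeddings.npy", embeddings)
    metadata = {
        "model": model_name,
        "dimension": int(embeddings.shape[1]),
        "count": int(embeddings.shape[0]),
        "created_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
    }
    (out_dir / "metadata.json").write_text(json.dumps(metadata, indent=2))

save_versioned(np.random.randn(100, 384), "all-MiniLM-L6-v2", version="v1")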
Production
- Implement retry logic for API calls (a sketch follows this list)
- Use connection pooling for performance
- Monitor costs for paid APIs
- Set appropriate timeouts for reliability
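A minimal retry sketch with exponential backoff for embedding API calls. It is deliberately generic: it catches broad exceptions, which in real code you should narrow to your client's rate-limit and timeout errors.

import time

def embed_with_retry(embed_fn, text: str, max_retries: int = 3, base_delay: float = 1.0):
    """Call embed_fn(text), retrying with exponential backoff on failure."""
    for attempt in range(max_retries):
        try:
            return embed_fn(text)
        except Exception as exc:  # narrow to rate-limit/timeout errors in real code
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay:.1f}s")
            time.sleep(delay)

# Usage with any of the wrappers above, e.g.:
# embedding = embed_with_retry(embeddings.embed_text, "Hello, world!")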
Evaluation
- Use appropriate metrics: precision, recall, F1 for retrieval
- Establish a baseline before optimizing
- Monitor drift over time (a minimal sketch follows this list)
- A/B test different models
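One simple way to watch for drift, sketched below under our own assumptions: keep a baseline centroid of production embeddings and alert when new batches move away from it. The threshold is illustrative and should be tuned on your own data.

import numpy as np

def embedding_drift(baseline: np.ndarray, current: np.ndarray) -> float:
    """Cosine distance between the mean embedding of a baseline batch and a current batch."""
    b, c = baseline.mean(axis=0), current.mean(axis=0)
    cos = np.dot(b, c) / (np.linalg.norm(b) * np.linalg.norm(c))
    return 1.0 - float(cos)

baseline = np.random.randn(1000, 384)
current = baseline[:200] + 0.5 * np.random.randn(200, 384)  # simulated shift
drift = embedding_drift(baseline, current)
print(f"Drift: {drift:.4f}")
if drift > 0.05:  # threshold is illustrative; tune on your own data
    print("Warning: embedding distribution may have drifted")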