EmbeddingModels

This skill provides a comprehensive guide to text embedding models and their usage, covering embedding concepts, popular models, model selection criteria, embedding generation, dimensionality reduction, fine-tuning, evaluation metrics, storage strategies, production optimization, and use cases such as semantic search, clustering, and classification.


Embedding Models

Overview

A comprehensive guide to text embedding models and their usage. This skill covers embedding concepts (dense vectors, semantic similarity, distance metrics), popular models (OpenAI, Sentence Transformers, Cohere, BGE), model selection criteria, embedding generation (batch processing, caching), dimensionality reduction (PCA, t-SNE), fine-tuning, evaluation metrics (precision, recall, F1), storage strategies (numpy, pickle, HDF5), production optimization, and use cases (semantic search, clustering, classification).

Prerequisites

  • Understanding of vector spaces and linear algebra
  • Knowledge of machine learning fundamentals
  • Familiarity with Python and NumPy
  • Understanding of cosine similarity and distance metrics
  • Basic knowledge of transformers and deep learning
  • Familiarity with scikit-learn for evaluation

Core Concepts

Embedding Fundamentals

  • Dense vectors: fixed-size numerical representations of text
  • Semantic similarity: similar meanings have similar vectors
  • Dimensionality: the number of dimensions in the vector space
  • Distance metrics: cosine similarity, Euclidean distance, Manhattan distance

Popular Embedding Models

  • OpenAI embeddings: text-embedding-3-small (1536 dims), text-embedding-3-large (3072 dims)
  • Sentence Transformers: all-MiniLM-L6-v2 (384 dims), all-mpnet-base-v2 (768 dims)
  • Cohere embeddings: embed-english-v3.0 (1024 dims)
  • BGE models: BAAI/bge-small-en-v1.5 (384 dims), BAAI/bge-large-en-v1.5 (1024 dims)

Model Selection Criteria

  • Use case: semantic search, classification, clustering
  • Budget: free (open source) vs. paid (API)
  • Performance: speed vs. accuracy trade-off
  • Dimensionality: lower dimensions = faster, higher dimensions = more accurate

Evaluation Metrics

  • Retrieval: Precision@K, Recall@K, MRR (mean reciprocal rank)
  • Classification: accuracy, F1, precision, recall
  • Clustering: silhouette score, Davies-Bouldin index

Storage Formats

  • NumPy: fast loading, simple format
  • Pickle: Python-native, supports metadata
  • HDF5: suited to large datasets, supports compression

Implementation Guide

Embedding Concepts

Understanding Embeddings

"""
嵌入是文本的密集向量表示,能够捕捉语义含义。

关键概念:
- 密集向量:固定大小的数值表示
- 语义相似性:相似含义具有相似向量
- 维度:向量中的维度数
- 距离度量:余弦相似度、欧几里得距离

示例:
"cat" -> [0.2, -0.5, 0.8, ...]  # 384维向量
"dog" -> [0.3, -0.4, 0.7, ...]  # 384维向量

"cat"和"dog"的向量相似,因为它们都是动物。
"""
class EmbeddingConcepts:
    """理解嵌入概念。"""

    @staticmethod
    def explain_embeddings():
        """解释嵌入概念。"""
        return {
            "dense_vectors": "固定大小的文本数值表示",
            "semantic_similarity": "相似含义具有相似向量",
            "dimensionality": "向量空间中的维度数",
            "distance_metrics": "余弦相似度、欧几里得距离"
        }

    @staticmethod
    def compare_distance_metrics():
        """比较不同的距离度量。"""
        import numpy as np

        # Example vectors
        vec1 = np.array([1, 0, 0])
        vec2 = np.array([0, 1, 0])

        # Cosine similarity
        dot_product = np.dot(vec1, vec2)
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        cosine_sim = dot_product / (norm1 * norm2)

        # Euclidean distance
        euclidean_dist = np.linalg.norm(vec1 - vec2)

        # Manhattan distance
        manhattan_dist = np.sum(np.abs(vec1 - vec2))

        return {
            "cosine_similarity": cosine_sim,
            "euclidean_distance": euclidean_dist,
            "manhattan_distance": manhattan_dist
        }

# Usage
concepts = EmbeddingConcepts()
print(concepts.explain_embeddings())

metrics = concepts.compare_distance_metrics()
print(f"余弦相似度: {metrics['cosine_similarity']:.3f}")

Popular Models

OpenAI Embeddings

from openai import OpenAI
import numpy as np

class OpenAIEmbeddings:
    """OpenAI embeddings API."""

    def __init__(self, api_key: str, model: str = "text-embedding-3-small"):
        self.client = OpenAI(api_key=api_key)
        self.model = model

    def embed_text(self, text: str) -> np.ndarray:
        """Generate an embedding for a single text."""
        response = self.client.embeddings.create(
            input=text,
            model=self.model
        )
        return np.array(response.data[0].embedding)

    def embed_texts(self, texts: list) -> list[np.ndarray]:
        """Generate embeddings for multiple texts."""
        response = self.client.embeddings.create(
            input=texts,
            model=self.model
        )
        return [np.array(data.embedding) for data in response.data]

    def get_embedding_dimension(self) -> int:
        """Get the embedding dimensionality."""
        sample = self.embed_text("sample")
        return len(sample)

# Usage
embeddings = OpenAIEmbeddings(api_key="your-api-key")

# Single embedding
embedding = embeddings.embed_text("Hello, world!")
print(f"Embedding shape: {embedding.shape}")

# Batch embeddings
texts = ["Hello", "World", "How are you?"]
batch_embeddings = embeddings.embed_texts(texts)
print(f"Batch embeddings: {len(batch_embeddings)} vectors")

# Get the dimensionality
dim = embeddings.get_embedding_dimension()
print(f"Embedding dimension: {dim}")

Sentence Transformers

from sentence_transformers import SentenceTransformer
import numpy as np

class SentenceTransformerEmbeddings:
    """Sentence Transformers (Hugging Face) embeddings."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def embed_text(self, text: str) -> np.ndarray:
        """Generate an embedding for a single text."""
        embedding = self.model.encode(text)
        return embedding

    def embed_texts(self, texts: list) -> np.ndarray:
        """Generate embeddings for multiple texts."""
        embeddings = self.model.encode(texts)
        return embeddings

    def embed_documents(self, documents: list) -> np.ndarray:
        """Generate embeddings for documents."""
        embeddings = self.model.encode(documents)
        return embeddings

    def compute_similarity(self, text1: str, text2: str) -> float:
        """Compute the similarity between two texts."""
        emb1 = self.embed_text(text1)
        emb2 = self.embed_text(text2)

        # Cosine similarity
        similarity = np.dot(emb1, emb2) / (
            np.linalg.norm(emb1) * np.linalg.norm(emb2)
        )

        return similarity

# Usage
embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")

# Single embedding
embedding = embeddings.embed_text("Hello, world!")
print(f"Embedding shape: {embedding.shape}")

# Batch embeddings
texts = ["Hello", "World", "How are you?"]
batch_embeddings = embeddings.embed_texts(texts)
print(f"Batch embeddings shape: {batch_embeddings.shape}")

# Compute similarity
similarity = embeddings.compute_similarity("cat", "dog")
print(f"Similarity between 'cat' and 'dog': {similarity:.3f}")

Cohere Embeddings

import cohere
import numpy as np

class CohereEmbeddings:
    """Cohere embeddings API."""

    def __init__(self, api_key: str, model: str = "embed-english-v3.0"):
        self.client = cohere.Client(api_key=api_key)
        self.model = model

    def embed_text(self, text: str, input_type: str = "search_document") -> np.ndarray:
        """Generate an embedding for a single text (v3 models expect an input_type)."""
        response = self.client.embed(
            texts=[text],
            input_type=input_type,
            model=self.model
        )
        return np.array(response.embeddings[0])

    def embed_texts(self, texts: list, input_type: str = "search_document") -> np.ndarray:
        """Generate embeddings for multiple texts."""
        response = self.client.embed(
            texts=texts,
            input_type=input_type,
            model=self.model
        )
        return np.array(response.embeddings)

    def embed_documents(self, documents: list) -> np.ndarray:
        """Generate embeddings for documents."""
        response = self.client.embed(
            texts=documents,
            input_type="search_document",
            model=self.model
        )
        return np.array(response.embeddings)

# Usage
embeddings = CohereEmbeddings(api_key="your-api-key")

# Single embedding
embedding = embeddings.embed_text("Hello, world!")
print(f"Embedding shape: {embedding.shape}")

# Batch embeddings
texts = ["Hello", "World", "How are you?"]
batch_embeddings = embeddings.embed_texts(texts)
print(f"Batch embeddings shape: {batch_embeddings.shape}")

BGE Models

from sentence_transformers import SentenceTransformer
import numpy as np

class BGEEmbeddings:
    """BGE (BAAI General Embedding) models."""

    # Instruction prefix recommended for BGE v1.5 retrieval queries
    QUERY_INSTRUCTION = "Represent this sentence for searching relevant passages: "

    def __init__(self, model_name: str = "BAAI/bge-small-en-v1.5"):
        self.model = SentenceTransformer(model_name)

    def embed_text(self, text: str) -> np.ndarray:
        """Generate an embedding for a single text."""
        embedding = self.model.encode(text)
        return embedding

    def embed_query(self, query: str) -> np.ndarray:
        """Generate an embedding for a query (optimized for search)."""
        # BGE models are trained to see short retrieval queries with this prefix
        embedding = self.model.encode(self.QUERY_INSTRUCTION + query)
        return embedding

    def embed_documents(self, documents: list) -> np.ndarray:
        """Generate embeddings for documents."""
        embeddings = self.model.encode(documents)
        return embeddings

    def compute_scores(self, query: str, documents: list) -> np.ndarray:
        """Compute relevance scores for documents."""
        query_emb = self.embed_query(query)
        doc_embeddings = self.embed_documents(documents)

        # Cosine similarity
        scores = np.dot(doc_embeddings, query_emb) / (
            np.linalg.norm(doc_embeddings, axis=1) * np.linalg.norm(query_emb)
        )

        return scores

# Usage
embeddings = BGEEmbeddings(model_name="BAAI/bge-small-en-v1.5")

# Query embedding
query = "What is the capital of France?"
query_emb = embeddings.embed_query(query)

# Document embeddings
documents = [
    "Paris is the capital of France.",
    "London is the capital of the UK.",
    "Berlin is the capital of Germany."
]
doc_embeddings = embeddings.embed_documents(documents)

# Compute scores
scores = embeddings.compute_scores(query, documents)
print(f"Scores: {scores}")

Model Selection Criteria

Model Comparison

import pandas as pd

class EmbeddingModelComparison:
    """Compare different embedding models."""

    def __init__(self):
        self.models = {
            "openai_small": {
                "name": "text-embedding-3-small",
                "dimensions": 1536,
                "cost_per_1k_tokens": 0.00002,
                "speed": "fast"
            },
            "openai_large": {
                "name": "text-embedding-3-large",
                "dimensions": 3072,
                "cost_per_1k_tokens": 0.00013,
                "speed": "medium"
            },
            "sentence_transformers": {
                "name": "all-MiniLM-L6-v2",
                "dimensions": 384,
                "cost_per_1k_tokens": 0,
                "speed": "fast"
            },
            "bge_small": {
                "name": "BAAI/bge-small-en-v1.5",
                "dimensions": 384,
                "cost_per_1k_tokens": 0,
                "speed": "medium"
            },
            "bge_large": {
                "name": "BAAI/bge-large-en-v1.5",
                "dimensions": 1024,
                "cost_per_1k_tokens": 0,
                "speed": "slow"
            }
        }

    def get_model_recommendation(
        self,
        use_case: str = "general",
        budget: str = "free",
        performance_priority: str = "speed"
    ) -> str:
        """根据标准推荐模型。"""

        # Filter by budget
        if budget == "free":
            available_models = {
                k: v for k, v in self.models.items()
                if v["cost_per_1k_tokens"] == 0
            }
        else:
            available_models = self.models

        # Filter by use case
        if use_case == "semantic_search":
            # Prefer search-optimized models
            search_optimized = ["bge_small", "bge_large"]
            available_models = {
                k: v for k, v in available_models.items()
                if k in search_optimized
            }
        elif use_case == "classification":
            # Prefer larger models
            available_models = {
                k: v for k, v in available_models.items()
                if v["dimensions"] >= 768
            }
        elif use_case == "clustering":
            # Prefer balanced models
            available_models = {
                k: v for k, v in available_models.items()
                if 384 <= v["dimensions"] <= 768
            }

        # Sort by performance priority
        if performance_priority == "speed":
            speed_order = {"fast": 0, "medium": 1, "slow": 2}
            sorted_models = sorted(
                available_models.items(),
                key=lambda x: speed_order.get(x[1]["speed"], 3)
            )
        elif performance_priority == "accuracy":
            sorted_models = sorted(
                available_models.items(),
                key=lambda x: -x[1]["dimensions"]
            )
        else:
            sorted_models = list(available_models.items())

        return sorted_models[0][0]

    def compare_models(self, model_names: list) -> pd.DataFrame:
        """在表格中比较模型。"""
        comparison_data = []

        for name in model_names:
            if name in self.models:
                model_info = self.models[name]
                comparison_data.append({
                    "Model": name,
                    "Dimensions": model_info["dimensions"],
                    "Cost/1K tokens": model_info["cost_per_1k_tokens"],
                    "Speed": model_info["speed"]
                })

        return pd.DataFrame(comparison_data)

# Usage
comparator = EmbeddingModelComparison()

# Get a recommendation
recommendation = comparator.get_model_recommendation(
    use_case="semantic_search",
    budget="free",
    performance_priority="speed"
)
print(f"推荐模型: {recommendation}")

# Compare models
comparison = comparator.compare_models([
    "openai_small", "sentence_transformers", "bge_small"
])
print(comparison)

Selection Decision Tree

class EmbeddingModelSelector:
    """A decision tree for model selection."""

    @staticmethod
    def select_model(
        data_size: str,
        latency_requirement: str,
        accuracy_requirement: str,
        budget: str
    ) -> str:
        """根据要求选择模型。"""

        # 小数据,低延迟,低准确度,免费预算
        if (data_size == "small" and latency_requirement == "low" and
            accuracy_requirement == "low" and budget == "free"):
            return "sentence_transformers"

        # Large data, high latency tolerance, high accuracy, paid budget
        if (data_size == "large" and latency_requirement == "high" and
            accuracy_requirement == "high" and budget == "paid"):
            return "openai_large"

        # Medium data, medium latency, medium accuracy, free budget
        if (data_size == "medium" and latency_requirement == "medium" and
            accuracy_requirement == "medium" and budget == "free"):
            return "bge_small"

        # Semantic search use case: low latency, high accuracy
        if latency_requirement == "low" and accuracy_requirement == "high":
            return "bge_large"

        # Default
        return "sentence_transformers"

    @staticmethod
    def get_model_config(model_name: str) -> dict:
        """获取选定模型的配置。"""
        configs = {
            "sentence_transformers": {
                "model_name": "all-MiniLM-L6-v2",
                "dimensions": 384,
                "batch_size": 32,
                "normalize": True
            },
            "bge_small": {
                "model_name": "BAAI/bge-small-en-v1.5",
                "dimensions": 384,
                "batch_size": 32,
                "normalize": True,
                "query_prompt": "Represent this sentence for searching relevant passages:"
            },
            "bge_large": {
                "model_name": "BAAI/bge-large-en-v1.5",
                "dimensions": 1024,
                "batch_size": 16,
                "normalize": True,
                "query_prompt": "Represent this sentence for searching relevant passages:"
            },
            "openai_small": {
                "model_name": "text-embedding-3-small",
                "dimensions": 1536,
                "batch_size": 100,
                "normalize": False
            },
            "openai_large": {
                "model_name": "text-embedding-3-large",
                "dimensions": 3072,
                "batch_size": 100,
                "normalize": False
            }
        }

        return configs.get(model_name, configs["sentence_transformers"])

# Usage
selector = EmbeddingModelSelector()

model_name = selector.select_model(
    data_size="medium",
    latency_requirement="low",
    accuracy_requirement="medium",
    budget="free"
)

config = selector.get_model_config(model_name)
print(f"选定模型: {model_name}")
print(f"配置: {config}")

Embedding Generation

Batch Processing

import numpy as np
from typing import List
from tqdm import tqdm

class BatchEmbeddingGenerator:
    """Generate embeddings in batches."""

    def __init__(self, embedding_model, batch_size: int = 32):
        self.embedding_model = embedding_model
        self.batch_size = batch_size

    def generate_embeddings(
        self,
        texts: List[str],
        show_progress: bool = True
    ) -> np.ndarray:
        """Generate embeddings for texts in batches."""
        all_embeddings = []

        # Split into batches
        batches = [
            texts[i:i + self.batch_size]
            for i in range(0, len(texts), self.batch_size)
        ]

        iterator = tqdm(batches) if show_progress else batches

        for batch in iterator:
            if hasattr(self.embedding_model, 'encode'):
                embeddings = self.embedding_model.encode(batch)
            elif hasattr(self.embedding_model, 'embed_texts'):
                embeddings = self.embedding_model.embed_texts(batch)
            else:
                embeddings = [self.embedding_model.embed_text(t) for t in batch]

            all_embeddings.extend(embeddings)

        return np.array(all_embeddings)

    def generate_embeddings_async(
        self,
        texts: List[str]
    ) -> np.ndarray:
        """Generate embeddings concurrently (Python 3.9+)."""
        import asyncio

        async def process_batch(batch):
            # The model calls are synchronous, so run them in worker threads;
            # awaiting them directly would just serialize the batches.
            if hasattr(self.embedding_model, 'encode'):
                return await asyncio.to_thread(self.embedding_model.encode, batch)
            else:
                return await asyncio.to_thread(
                    lambda: np.array([self.embedding_model.embed_text(t) for t in batch])
                )

        async def process_all():
            batches = [
                texts[i:i + self.batch_size]
                for i in range(0, len(texts), self.batch_size)
            ]

            tasks = [process_batch(batch) for batch in batches]
            results = await asyncio.gather(*tasks)

            return np.concatenate(results)

        return asyncio.run(process_all())

# Usage
# With Sentence Transformers
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('all-MiniLM-L6-v2')
generator = BatchEmbeddingGenerator(model, batch_size=64)

texts = ["Text 1", "Text 2", "Text 3", ...]  # many texts

# Generate embeddings
embeddings = generator.generate_embeddings(texts)
print(f"Generated {len(embeddings)} embeddings with shape {embeddings.shape}")

Caching

import numpy as np
import hashlib
import pickle
from pathlib import Path
from typing import Optional

class EmbeddingCache:
    """Cache embeddings to avoid recomputation."""

    def __init__(self, cache_dir: str = "./embedding_cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _get_cache_key(self, text: str) -> str:
        """Generate a cache key for a text."""
        return hashlib.md5(text.encode()).hexdigest()

    def get(self, text: str) -> Optional[np.ndarray]:
        """Retrieve a cached embedding."""
        cache_key = self._get_cache_key(text)
        cache_file = self.cache_dir / f"{cache_key}.pkl"

        if cache_file.exists():
            with open(cache_file, 'rb') as f:
                return pickle.load(f)

        return None

    def set(self, text: str, embedding: np.ndarray):
        """Cache an embedding."""
        cache_key = self._get_cache_key(text)
        cache_file = self.cache_dir / f"{cache_key}.pkl"

        with open(cache_file, 'wb') as f:
            pickle.dump(embedding, f)

    def get_or_generate(
        self,
        text: str,
        embedding_model
    ) -> np.ndarray:
        """Return the cached embedding, or generate and cache a new one."""
        cached = self.get(text)
        if cached is not None:
            return cached

        # Generate a new embedding
        if hasattr(embedding_model, 'encode'):
            embedding = embedding_model.encode(text)
        else:
            embedding = embedding_model.embed_text(text)

        # Cache it
        self.set(text, embedding)

        return embedding

    def clear(self):
        """Clear the entire cache."""
        for file in self.cache_dir.glob("*.pkl"):
            file.unlink()

# Usage (embedding_model is any of the embedding models defined above)
cache = EmbeddingCache()

# First call - generates and caches
embedding1 = cache.get_or_generate("Hello, world!", embedding_model)

# Second call - served from the cache
embedding2 = cache.get_or_generate("Hello, world!", embedding_model)

print(f"Embeddings are equal: {np.allclose(embedding1, embedding2)}")

Dimensionality Reduction

import numpy as np
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE, MDS  # TSNE lives in sklearn.manifold, not sklearn.decomposition
import matplotlib.pyplot as plt

class DimensionalityReducer:
    """Reduce embedding dimensionality."""

    def __init__(self, method: str = "pca"):
        self.method = method
        self.reducer = None

    def fit(self, embeddings: np.ndarray, n_components: int = 2):
        """Create (and, for PCA, fit) the underlying reducer."""
        if self.method == "pca":
            self.reducer = PCA(n_components=n_components)
            self.reducer.fit(embeddings)
        elif self.method == "tsne":
            # t-SNE has no separate fit/transform split; it is fitted in transform()
            self.reducer = TSNE(n_components=n_components)
        elif self.method == "mds":
            self.reducer = MDS(n_components=n_components)
        else:
            raise ValueError(f"Unknown method: {self.method}")

        return self.reducer

    def transform(self, embeddings: np.ndarray) -> np.ndarray:
        """Project embeddings into the reduced space.

        Only PCA can project new, unseen data; t-SNE and MDS re-fit on
        whatever data they are given.
        """
        if self.method == "pca":
            return self.reducer.transform(embeddings)
        return self.reducer.fit_transform(embeddings)

    def fit_transform(self, embeddings: np.ndarray, n_components: int = 2) -> np.ndarray:
        """Fit and transform in one step."""
        self.fit(embeddings, n_components)
        return self.transform(embeddings)

    def visualize(self, embeddings: np.ndarray, labels: list = None):
        """Visualize reduced embeddings."""
        reduced = self.fit_transform(embeddings)

        plt.figure(figsize=(10, 8))
        scatter = plt.scatter(reduced[:, 0], reduced[:, 1], c=labels, cmap='viridis', alpha=0.6)
        plt.colorbar(scatter)
        plt.xlabel('Component 1')
        plt.ylabel('Component 2')
        plt.title(f'Embedding Visualization ({self.method.upper()})')
        plt.show()

# Usage
# Generate sample embeddings
np.random.seed(42)
embeddings = np.random.randn(100, 384)  # 100 samples, 384 dimensions
labels = np.random.randint(0, 3, 100)  # 3 clusters

# PCA reduction
pca_reducer = DimensionalityReducer(method="pca")
pca_reduced = pca_reducer.fit_transform(embeddings, n_components=2)
print(f"PCA-reduced shape: {pca_reduced.shape}")

# t-SNE visualization
tsne_reducer = DimensionalityReducer(method="tsne")
tsne_reducer.visualize(embeddings, labels)
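
Before committing to a target dimensionality, it helps to check how much variance PCA keeps. A minimal sketch with the sample embeddings above:

# Inspect the variance retained per component to choose n_components
pca = PCA(n_components=10)
pca.fit(embeddings)
print(f"Variance kept by 10 components: {pca.explained_variance_ratio_.sum():.2%}")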

Fine-Tuning Embeddings

from sentence_transformers import SentenceTransformer, InputExample, losses
from torch.utils.data import DataLoader
import torch
from typing import List

class EmbeddingFineTuner:
    """Fine-tune an embedding model on custom data."""

    def __init__(
        self,
        model_name: str = "all-MiniLM-L6-v2",
        max_length: int = 512,
        batch_size: int = 16,
        epochs: int = 3
    ):
        self.model = SentenceTransformer(model_name)
        self.max_length = max_length
        self.batch_size = batch_size
        self.epochs = epochs

    def prepare_data(
        self,
        texts: List[str],
        labels: List[int] = None
    ) -> List[InputExample]:
        """Prepare fine-tuning data."""
        examples = []

        for i, text in enumerate(texts):
            label = labels[i] if labels else 0
            examples.append(InputExample(texts=[text], label=label))

        return examples

    def fine_tune(
        self,
        train_texts: List[str],
        train_labels: List[int],
        val_texts: List[str] = None,
        val_labels: List[int] = None
    ):
        """Fine-tune the model."""
        # Prepare the data
        train_examples = self.prepare_data(train_texts, train_labels)

        # Create the data loader
        train_dataloader = DataLoader(
            train_examples,
            shuffle=True,
            batch_size=self.batch_size
        )

        # Define the loss. BatchAllTripletLoss works with single sentences
        # carrying integer class labels; CosineSimilarityLoss would instead
        # require sentence pairs with float similarity labels.
        train_loss = losses.BatchAllTripletLoss(model=self.model)

        # Fine-tune
        self.model.fit(
            train_objectives=[(train_dataloader, train_loss)],
            epochs=self.epochs,
            warmup_steps=100,
            use_amp=True
        )

        # Save the fine-tuned model
        self.model.save("./fine_tuned_model")

        return self.model

    def evaluate(
        self,
        texts: List[str],
        labels: List[int]
    ) -> dict:
        """Evaluate the fine-tuned model: mean cosine similarity within
        and across classes (higher intra-class, lower inter-class is better)."""
        import numpy as np

        embeddings = self.model.encode(texts, normalize_embeddings=True)
        sims = embeddings @ embeddings.T
        labels = np.array(labels)
        same = labels[:, None] == labels[None, :]
        off_diag = ~np.eye(len(labels), dtype=bool)

        intra = sims[same & off_diag]
        inter = sims[~same]

        return {
            "intra_class_similarity": float(intra.mean()) if intra.size else 0.0,
            "inter_class_similarity": float(inter.mean()) if inter.size else 0.0
        }

# Usage
finetuner = EmbeddingFineTuner(
    model_name="all-MiniLM-L6-v2",
    epochs=3
)

# Fine-tuning data
train_texts = [
    "This is a positive sentence.",
    "This is another positive sentence.",
    "This is a negative sentence.",
    "This is another negative sentence."
]
train_labels = [1, 1, 0, 0]

# Fine-tune
fine_tuned_model = finetuner.fine_tune(train_texts, train_labels)

# Evaluate
test_texts = ["This is a test sentence.", "Another test sentence."]
test_labels = [1, 0]
metrics = finetuner.evaluate(test_texts, test_labels)
print(f"Evaluation metrics: {metrics}")

Evaluation Metrics

import numpy as np
from typing import List, Tuple
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    precision_score,
    recall_score
)

class EmbeddingEvaluator:
    """评估嵌入模型性能。"""

    @staticmethod
    def cosine_similarity(emb1: np.ndarray, emb2: np.ndarray) -> float:
        """计算嵌入之间的余弦相似度。"""
        dot_product = np.dot(emb1, emb2)
        norm1 = np.linalg.norm(emb1)
        norm2 = np.linalg.norm(emb2)
        return dot_product / (norm1 * norm2)

    @staticmethod
    def evaluate_retrieval(
        query_embeddings: np.ndarray,
        document_embeddings: np.ndarray,
        relevant_docs: List[List[int]],
        k: int = 10
    ) -> dict:
        """评估检索性能。"""
        results = {
            "precision": [],
            "recall": [],
            "f1": []
        }

        for query_idx, relevant in enumerate(relevant_docs):
            # Compute similarities
            similarities = []
            for doc_idx in range(len(document_embeddings)):
                sim = EmbeddingEvaluator.cosine_similarity(
                    query_embeddings[query_idx],
                    document_embeddings[doc_idx]
                )
                similarities.append(sim)

            # Get the top-k documents
            top_k_indices = np.argsort(similarities)[-k:]

            # Compute the metrics
            retrieved_set = set(top_k_indices)
            relevant_set = set(relevant)

            true_positives = len(retrieved_set & relevant_set)
            precision = true_positives / k
            recall = true_positives / len(relevant) if relevant else 0
            f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

            results["precision"].append(precision)
            results["recall"].append(recall)
            results["f1"].append(f1)

        return {
            "avg_precision": np.mean(results["precision"]),
            "avg_recall": np.mean(results["recall"]),
            "avg_f1": np.mean(results["f1"])
        }

    @staticmethod
    def evaluate_clustering(
        embeddings: np.ndarray,
        true_labels: np.ndarray,
        predicted_labels: np.ndarray
    ) -> dict:
        """评估聚类性能。"""
        return {
            "accuracy": accuracy_score(true_labels, predicted_labels),
            "precision": precision_score(true_labels, predicted_labels, average='weighted'),
            "recall": recall_score(true_labels, predicted_labels, average='weighted'),
            "f1": f1_score(true_labels, predicted_labels, average='weighted')
        }

# Usage
evaluator = EmbeddingEvaluator()

# Generate sample data
np.random.seed(42)
query_embeddings = np.random.randn(10, 384)
document_embeddings = np.random.randn(100, 384)
relevant_docs = [
    [5, 10, 15],  # relevant documents for query 0
    [2, 8, 12],  # relevant documents for query 1
    # ...more queries
]

# Evaluate retrieval
retrieval_metrics = evaluator.evaluate_retrieval(
    query_embeddings,
    document_embeddings,
    relevant_docs,
    k=10
)

print(f"检索指标: {retrieval_metrics}")

Storage Strategies

import numpy as np
import pickle
from pathlib import Path
from typing import List
import h5py

class EmbeddingStorage:
    """Store embeddings efficiently."""

    def __init__(self, storage_type: str = "numpy"):
        self.storage_type = storage_type
        self.storage_path = Path("./embeddings")
        self.storage_path.mkdir(parents=True, exist_ok=True)

    def save_numpy(self, embeddings: np.ndarray, filename: str):
        """Save embeddings to a numpy file."""
        file_path = self.storage_path / f"{filename}.npy"
        np.save(file_path, embeddings)

    def load_numpy(self, filename: str) -> np.ndarray:
        """Load embeddings from a numpy file."""
        file_path = self.storage_path / f"{filename}.npy"
        return np.load(file_path)

    def save_pickle(self, embeddings: np.ndarray, metadata: dict, filename: str):
        """Save embeddings plus metadata as a pickle."""
        file_path = self.storage_path / f"{filename}.pkl"

        data = {
            "embeddings": embeddings,
            "metadata": metadata
        }

        with open(file_path, 'wb') as f:
            pickle.dump(data, f)

    def load_pickle(self, filename: str) -> tuple:
        """Load embeddings and metadata."""
        file_path = self.storage_path / f"{filename}.pkl"

        with open(file_path, 'rb') as f:
            data = pickle.load(f)

        return data["embeddings"], data["metadata"]

    def save_hdf5(self, embeddings: np.ndarray, texts: List[str], filename: str):
        """Save embeddings in HDF5 format (with gzip compression)."""
        file_path = self.storage_path / f"{filename}.h5"

        with h5py.File(file_path, 'w') as f:
            f.create_dataset('embeddings', data=embeddings, compression='gzip')
            # Python strings need an explicit variable-length string dtype
            f.create_dataset('texts', data=texts, dtype=h5py.string_dtype(encoding='utf-8'))

    def load_hdf5(self, filename: str) -> tuple:
        """Load embeddings from HDF5 format."""
        file_path = self.storage_path / f"{filename}.h5"

        with h5py.File(file_path, 'r') as f:
            embeddings = f['embeddings'][:]
            # h5py returns bytes for variable-length strings; decode them
            texts = [t.decode('utf-8') for t in f['texts'][:]]

        return embeddings, texts

# Usage
storage = EmbeddingStorage(storage_type="numpy")

# Generate sample embeddings
embeddings = np.random.randn(100, 384)
texts = [f"Text {i}" for i in range(100)]
metadata = {"model": "all-MiniLM-L6-v2", "dimension": 384}

# Save
storage.save_numpy(embeddings, "embeddings")
storage.save_pickle(embeddings, metadata, "embeddings_with_meta")
storage.save_hdf5(embeddings, texts, "embeddings_hdf5")

# Load
loaded_embeddings = storage.load_numpy("embeddings")
loaded_with_meta, meta = storage.load_pickle("embeddings_with_meta")
loaded_embeddings, loaded_texts = storage.load_hdf5("embeddings_hdf5")
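
For large .npy files, numpy can memory-map the array instead of loading it all into RAM; rows are read from disk on access. A small sketch against the file saved above:

# Memory-mapped load: useful when the embedding matrix exceeds RAM
mmapped = np.load(storage.storage_path / "embeddings.npy", mmap_mode='r')
print(f"First 5 values of row 0: {mmapped[0][:5]}")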

Production Optimization

import numpy as np
from typing import List
from concurrent.futures import ThreadPoolExecutor
import time

class ProductionEmbeddingOptimizer:
    """为生产优化嵌入生成。"""

    def __init__(self, embedding_model, max_workers: int = 4):
        self.embedding_model = embedding_model
        self.max_workers = max_workers

    def batch_generate(
        self,
        texts: List[str],
        batch_size: int = 32
    ) -> np.ndarray:
        """批量生成嵌入。"""
        all_embeddings = []

        batches = [
            texts[i:i + batch_size]
            for i in range(0, len(texts), batch_size)
        ]

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for batch in batches:
                future = executor.submit(self._process_batch, batch)
                futures.append(future)

            for future in futures:
                embeddings = future.result()
                all_embeddings.extend(embeddings)

        return np.array(all_embeddings)

    def _process_batch(self, batch: List[str]) -> np.ndarray:
        """处理单个批次。"""
        if hasattr(self.embedding_model, 'encode'):
            return self.embedding_model.encode(batch)
        elif hasattr(self.embedding_model, 'embed_texts'):
            embeddings = self.embedding_model.embed_texts(batch)
            return np.array(embeddings)
        else:
            return np.array([self.embedding_model.embed_text(t) for t in batch])

    def benchmark(self, texts: List[str], num_runs: int = 5) -> dict:
        """基准测试嵌入生成。"""
        results = {
            "latency_ms": [],
            "throughput_per_sec": []
        }

        for _ in range(num_runs):
            start_time = time.time()

            embeddings = self.batch_generate(texts)

            end_time = time.time()
            latency = (end_time - start_time) * 1000
            throughput = len(texts) / (end_time - start_time)

            results["latency_ms"].append(latency)
            results["throughput_per_sec"].append(throughput)

        return {
            "avg_latency_ms": np.mean(results["latency_ms"]),
            "avg_throughput_per_sec": np.mean(results["throughput_per_sec"]),
            "min_latency_ms": np.min(results["latency_ms"]),
            "max_latency_ms": np.max(results["latency_ms"])
        }

# Usage (embedding_model is any of the embedding models defined above)
optimizer = ProductionEmbeddingOptimizer(embedding_model, max_workers=4)

# Benchmark
texts = ["Text " + str(i) for i in range(100)]
benchmark_results = optimizer.benchmark(texts, num_runs=5)

print(f"平均延迟: {benchmark_results['avg_latency_ms']:.2f}ms")
print(f"平均吞吐量: {benchmark_results['avg_throughput_per_sec']:.2f} 个文本/秒")

Use Cases

Semantic Search

import numpy as np
from typing import List, Tuple

class SemanticSearch:
    """使用嵌入进行语义搜索。"""

    def __init__(self, embedding_model):
        self.embedding_model = embedding_model
        self.document_embeddings = None
        self.documents = None

    def index_documents(self, documents: List[str]):
        """通过生成嵌入索引文档。"""
        self.documents = documents
        self.document_embeddings = self.embedding_model.encode(documents)

    def search(
        self,
        query: str,
        top_k: int = 5
    ) -> List[Tuple[str, float]]:
        """搜索相似文档。"""
        # 生成查询嵌入
        query_embedding = self.embedding_model.encode(query)

        # Compute similarities
        similarities = []
        for doc_embedding in self.document_embeddings:
            sim = np.dot(query_embedding, doc_embedding) / (
                np.linalg.norm(query_embedding) * np.linalg.norm(doc_embedding)
            )
            similarities.append(sim)

        # Get the top-k
        top_indices = np.argsort(similarities)[-top_k:][::-1]

        results = []
        for idx in top_indices:
            results.append((self.documents[idx], similarities[idx]))

        return results

# Usage
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('all-MiniLM-L6-v2')
search = SemanticSearch(model)

# Index documents
documents = [
    "Paris is the capital of France.",
    "London is the capital of the UK.",
    "Berlin is the capital of Germany.",
    "Madrid is the capital of Spain."
]
search.index_documents(documents)

# Search
results = search.search("capital of France", top_k=2)
for doc, score in results:
    print(f"Score: {score:.3f} | {doc}")

Clustering

import numpy as np
from sklearn.cluster import KMeans, DBSCAN
from sklearn.metrics import silhouette_score
import matplotlib.pyplot as plt

class EmbeddingClustering:
    """使用嵌入进行文档聚类。"""

    def __init__(self, n_clusters: int = 3):
        self.n_clusters = n_clusters

    def kmeans_cluster(self, embeddings: np.ndarray) -> np.ndarray:
        """K-means聚类。"""
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
        labels = kmeans.fit_predict(embeddings)
        return labels

    def dbscan_cluster(self, embeddings: np.ndarray) -> np.ndarray:
        """DBSCAN聚类(基于密度)。"""
        dbscan = DBSCAN(eps=0.5, min_samples=5)
        labels = dbscan.fit_predict(embeddings)
        return labels

    def visualize_clusters(self, embeddings: np.ndarray, labels: np.ndarray):
        """在2D中可视化聚类。"""
        from sklearn.decomposition import PCA

        # Reduce to 2D for visualization
        pca = PCA(n_components=2)
        embeddings_2d = pca.fit_transform(embeddings)

        plt.figure(figsize=(10, 8))
        scatter = plt.scatter(
            embeddings_2d[:, 0],
            embeddings_2d[:, 1],
            c=labels,
            cmap='viridis',
            alpha=0.6
        )
        plt.colorbar(scatter)
        plt.xlabel('Component 1')
        plt.ylabel('Component 2')
        plt.title('Document Clusters')
        plt.show()

    def evaluate_clustering(self, embeddings: np.ndarray, labels: np.ndarray) -> float:
        """评估聚类质量。"""
        if len(set(labels)) > 1:
            return silhouette_score(embeddings, labels)
        return 0.0

# Usage
clusterer = EmbeddingClustering(n_clusters=3)

# Generate sample embeddings
np.random.seed(42)
embeddings = np.random.randn(100, 384)

# K-means clustering
kmeans_labels = clusterer.kmeans_cluster(embeddings)
silhouette = clusterer.evaluate_clustering(embeddings, kmeans_labels)
print(f"K-means轮廓系数: {silhouette:.3f}")

# Visualize
clusterer.visualize_clusters(embeddings, kmeans_labels)
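
The Davies-Bouldin index listed under the evaluation metrics is also available directly from scikit-learn; unlike the silhouette score, lower values are better:

from sklearn.metrics import davies_bouldin_score

db_index = davies_bouldin_score(embeddings, kmeans_labels)
print(f"Davies-Bouldin index: {db_index:.3f}")  # lower is better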

Classification

import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

class EmbeddingClassifier:
    """使用嵌入对文档进行分类。"""

    def __init__(self):
        self.classifier = LogisticRegression(random_state=42)

    def train(
        self,
        embeddings: np.ndarray,
        labels: np.ndarray,
        test_size: float = 0.2
    ) -> dict:
        """在嵌入上训练分类器。"""
        X_train, X_test, y_train, y_test = train_test_split(
            embeddings, labels, test_size=test_size, random_state=42
        )

        self.classifier.fit(X_train, y_train)

        # Predict on the test set
        y_pred = self.classifier.predict(X_test)

        # Evaluate
        accuracy = accuracy_score(y_test, y_pred)
        report = classification_report(y_test, y_pred)

        return {
            "accuracy": accuracy,
            "report": report
        }

    def predict(self, embeddings: np.ndarray) -> np.ndarray:
        """对嵌入进行标签预测。"""
        return self.classifier.predict(embeddings)

    def predict_proba(self, embeddings: np.ndarray) -> np.ndarray:
        """对嵌入进行概率预测。"""
        return self.classifier.predict_proba(embeddings)

# Usage
classifier = EmbeddingClassifier()

# Generate sample data
np.random.seed(42)
embeddings = np.random.randn(100, 384)
labels = np.random.randint(0, 3, 100)  # 3 classes

# Train
results = classifier.train(embeddings, labels)
print(f"Accuracy: {results['accuracy']:.3f}")
print(f"Classification report:\n{results['report']}")

Best Practices

Embedding Generation

  • Use batch processing for efficiency
  • Implement caching to avoid recomputation
  • Normalize embeddings for consistent similarity computation (see the sketch below)
  • Choose the batch size based on memory constraints
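
A minimal normalization sketch: unit-length vectors make the dot product and cosine similarity interchangeable, so scores stay comparable across models and batches.

import numpy as np

def normalize_embeddings(embeddings: np.ndarray) -> np.ndarray:
    """Scale each row to unit L2 norm (guarding against zero vectors)."""
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    return embeddings / np.clip(norms, 1e-12, None)

vectors = np.random.randn(10, 384)
unit_vectors = normalize_embeddings(vectors)
print(np.linalg.norm(unit_vectors, axis=1))  # all ~1.0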

Model Selection

  • Consider the use case: semantic search, classification, clustering
  • Evaluate the trade-offs: speed vs. accuracy vs. cost
  • Test multiple models on your own data
  • Monitor performance in production

Storage

  • Use efficient formats: HDF5 for large datasets
  • Include metadata: model name, dimensionality, timestamp
  • Implement versioning: track model changes
  • Consider compression for storage efficiency

Production

  • Implement retry logic for API calls (see the sketch below)
  • Use connection pooling for performance
  • Monitor costs for paid APIs
  • Set appropriate timeouts for reliability
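
A minimal retry sketch with exponential backoff and jitter; embed_fn and the caught exception type are placeholders for whichever API client you use:

import random
import time

def embed_with_retry(embed_fn, text: str, max_retries: int = 3):
    """Call embed_fn(text), retrying transient failures with backoff."""
    for attempt in range(max_retries):
        try:
            return embed_fn(text)
        except Exception:  # narrow this to your client's error types
            if attempt == max_retries - 1:
                raise
            time.sleep((2 ** attempt) + random.random())  # backoff + jitter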

Evaluation

  • Use appropriate metrics: precision, recall, F1 for retrieval
  • Establish a baseline before optimizing
  • Monitor drift over time
  • A/B test different models

Related Skills