向量索引调优Skill vector-index-tuning

向量索引调优技能用于优化向量索引的性能,包括调整HNSW参数、实施量化策略、平衡召回率和速度、管理内存使用和扩展搜索基础设施。适用于AI应用、数据科学和云基础设施中的高效向量搜索,关键词包括向量索引、调优、HNSW、量化、性能优化、内存、延迟、召回率。

AI应用 0 次安装 0 次浏览 更新于 3/22/2026

name: vector-index-tuning description: 为延迟、召回率和内存优化向量索引性能。在调整HNSW参数、选择量化策略或扩展向量搜索基础设施时使用。

向量索引调优

优化向量索引以获得生产性能的指南。

何时使用此技能

  • 调整HNSW参数
  • 实施量化
  • 优化内存使用
  • 减少搜索延迟
  • 平衡召回率与速度
  • 扩展到数十亿向量

核心概念

1. 索引类型选择

数据大小           推荐索引
────────────────────────────────────────
< 10K向量  →    平面索引(精确搜索)
10K - 1M   →    HNSW
1M - 100M  →    HNSW + 量化
> 100M     →    IVF + PQ 或 DiskANN

2. HNSW参数

参数 默认值 影响
M 16 每个节点的连接数,↑ = 更好召回率,更多内存
efConstruction 100 构建质量,↑ = 更好索引,更慢构建
efSearch 50 搜索质量,↑ = 更好召回率,更慢搜索

3. 量化类型

全精度(FP32): 4字节 × 维度数
半精度(FP16): 2字节 × 维度数
INT8标量:           1字节 × 维度数
乘积量化:  ~32-64字节总
二进制:                维度数/8字节

模板

模板1:HNSW参数调优

import numpy as np
from typing import List, Tuple
import time

def benchmark_hnsw_parameters(
    vectors: np.ndarray,
    queries: np.ndarray,
    ground_truth: np.ndarray,
    m_values: List[int] = [8, 16, 32, 64],
    ef_construction_values: List[int] = [64, 128, 256],
    ef_search_values: List[int] = [32, 64, 128, 256]
) -> List[dict]:
    """基准测试不同的HNSW配置。"""
    import hnswlib

    results = []
    dim = vectors.shape[1]
    n = vectors.shape[0]

    for m in m_values:
        for ef_construction in ef_construction_values:
            # 构建索引
            index = hnswlib.Index(space='cosine', dim=dim)
            index.init_index(max_elements=n, M=m, ef_construction=ef_construction)

            build_start = time.time()
            index.add_items(vectors)
            build_time = time.time() - build_start

            # 获取内存使用
            memory_bytes = index.element_count * (
                dim * 4 +  # 向量存储
                m * 2 * 4  # 图边(近似)
            )

            for ef_search in ef_search_values:
                index.set_ef(ef_search)

                # 测量搜索
                search_start = time.time()
                labels, distances = index.knn_query(queries, k=10)
                search_time = time.time() - search_start

                # 计算召回率
                recall = calculate_recall(labels, ground_truth, k=10)

                results.append({
                    "M": m,
                    "ef_construction": ef_construction,
                    "ef_search": ef_search,
                    "build_time_s": build_time,
                    "search_time_ms": search_time * 1000 / len(queries),
                    "recall@10": recall,
                    "memory_mb": memory_bytes / 1024 / 1024
                })

    return results


def calculate_recall(predictions: np.ndarray, ground_truth: np.ndarray, k: int) -> float:
    """计算召回率@k。"""
    correct = 0
    for pred, truth in zip(predictions, ground_truth):
        correct += len(set(pred[:k]) & set(truth[:k]))
    return correct / (len(predictions) * k)


def recommend_hnsw_params(
    num_vectors: int,
    target_recall: float = 0.95,
    max_latency_ms: float = 10,
    available_memory_gb: float = 8
) -> dict:
    """基于需求推荐HNSW参数。"""

    # 基础推荐
    if num_vectors < 100_000:
        m = 16
        ef_construction = 100
    elif num_vectors < 1_000_000:
        m = 32
        ef_construction = 200
    else:
        m = 48
        ef_construction = 256

    # 根据召回率目标调整ef_search
    if target_recall >= 0.99:
        ef_search = 256
    elif target_recall >= 0.95:
        ef_search = 128
    else:
        ef_search = 64

    return {
        "M": m,
        "ef_construction": ef_construction,
        "ef_search": ef_search,
        "notes": f"预估用于 {num_vectors:,} 向量, {target_recall:.0%} 召回率"
    }

模板2:量化策略

import numpy as np
from typing import Optional

class VectorQuantizer:
    """向量压缩的量化策略。"""

    @staticmethod
    def scalar_quantize_int8(
        vectors: np.ndarray,
        min_val: Optional[float] = None,
        max_val: Optional[float] = None
    ) -> Tuple[np.ndarray, dict]:
        """标量量化为INT8。"""
        if min_val is None:
            min_val = vectors.min()
        if max_val is None:
            max_val = vectors.max()

        # 缩放到0-255范围
        scale = 255.0 / (max_val - min_val)
        quantized = np.clip(
            np.round((vectors - min_val) * scale),
            0, 255
        ).astype(np.uint8)

        params = {"min_val": min_val, "max_val": max_val, "scale": scale}
        return quantized, params

    @staticmethod
    def dequantize_int8(
        quantized: np.ndarray,
        params: dict
    ) -> np.ndarray:
        """反量化INT8向量。"""
        return quantized.astype(np.float32) / params["scale"] + params["min_val"]

    @staticmethod
    def product_quantize(
        vectors: np.ndarray,
        n_subvectors: int = 8,
        n_centroids: int = 256
    ) -> Tuple[np.ndarray, dict]:
        """用于激进压缩的乘积量化。"""
        from sklearn.cluster import KMeans

        n, dim = vectors.shape
        assert dim % n_subvectors == 0
        subvector_dim = dim // n_subvectors

        codebooks = []
        codes = np.zeros((n, n_subvectors), dtype=np.uint8)

        for i in range(n_subvectors):
            start = i * subvector_dim
            end = (i + 1) * subvector_dim
            subvectors = vectors[:, start:end]

            kmeans = KMeans(n_clusters=n_centroids, random_state=42)
            codes[:, i] = kmeans.fit_predict(subvectors)
            codebooks.append(kmeans.cluster_centers_)

        params = {
            "codebooks": codebooks,
            "n_subvectors": n_subvectors,
            "subvector_dim": subvector_dim
        }
        return codes, params

    @staticmethod
    def binary_quantize(vectors: np.ndarray) -> np.ndarray:
        """二进制量化(每个维度的符号)。"""
        # 转换为二进制:正数 = 1,负数 = 0
        binary = (vectors > 0).astype(np.uint8)

        # 将比特打包成字节
        n, dim = vectors.shape
        packed_dim = (dim + 7) // 8

        packed = np.zeros((n, packed_dim), dtype=np.uint8)
        for i in range(dim):
            byte_idx = i // 8
            bit_idx = i % 8
            packed[:, byte_idx] |= (binary[:, i] << bit_idx)

        return packed


def estimate_memory_usage(
    num_vectors: int,
    dimensions: int,
    quantization: str = "fp32",
    index_type: str = "hnsw",
    hnsw_m: int = 16
) -> dict:
    """估计不同配置的内存使用。"""

    # 向量存储
    bytes_per_dimension = {
        "fp32": 4,
        "fp16": 2,
        "int8": 1,
        "pq": 0.05,  # 近似
        "binary": 0.125
    }

    vector_bytes = num_vectors * dimensions * bytes_per_dimension[quantization]

    # 索引开销
    if index_type == "hnsw":
        # 每个节点有约M*2条边,每条边4字节(int32)
        index_bytes = num_vectors * hnsw_m * 2 * 4
    elif index_type == "ivf":
        # 倒排列表 + 中心点
        index_bytes = num_vectors * 8 + 65536 * dimensions * 4
    else:
        index_bytes = 0

    total_bytes = vector_bytes + index_bytes

    return {
        "vector_storage_mb": vector_bytes / 1024 / 1024,
        "index_overhead_mb": index_bytes / 1024 / 1024,
        "total_mb": total_bytes / 1024 / 1024,
        "total_gb": total_bytes / 1024 / 1024 / 1024
    }

模板3:Qdrant索引配置

from qdrant_client import QdrantClient
from qdrant_client.http import models

def create_optimized_collection(
    client: QdrantClient,
    collection_name: str,
    vector_size: int,
    num_vectors: int,
    optimize_for: str = "balanced"  # "recall", "speed", "memory"
) -> None:
    """创建具有优化设置的集合。"""

    # 基于优化目标的HNSW配置
    hnsw_configs = {
        "recall": models.HnswConfigDiff(m=32, ef_construct=256),
        "speed": models.HnswConfigDiff(m=16, ef_construct=64),
        "balanced": models.HnswConfigDiff(m=16, ef_construct=128),
        "memory": models.HnswConfigDiff(m=8, ef_construct=64)
    }

    # 量化配置
    quantization_configs = {
        "recall": None,  # 最大召回率时不使用量化
        "speed": models.ScalarQuantization(
            scalar=models.ScalarQuantizationConfig(
                type=models.ScalarType.INT8,
                quantile=0.99,
                always_ram=True
            )
        ),
        "balanced": models.ScalarQuantization(
            scalar=models.ScalarQuantizationConfig(
                type=models.ScalarType.INT8,
                quantile=0.99,
                always_ram=False
            )
        ),
        "memory": models.ProductQuantization(
            product=models.ProductQuantizationConfig(
                compression=models.CompressionRatio.X16,
                always_ram=False
            )
        )
    }

    # 优化器配置
    optimizer_configs = {
        "recall": models.OptimizersConfigDiff(
            indexing_threshold=10000,
            memmap_threshold=50000
        ),
        "speed": models.OptimizersConfigDiff(
            indexing_threshold=5000,
            memmap_threshold=20000
        ),
        "balanced": models.OptimizersConfigDiff(
            indexing_threshold=20000,
            memmap_threshold=50000
        ),
        "memory": models.OptimizersConfigDiff(
            indexing_threshold=50000,
            memmap_threshold=10000  # 更早使用磁盘
        )
    }

    client.create_collection(
        collection_name=collection_name,
        vectors_config=models.VectorParams(
            size=vector_size,
            distance=models.Distance.COSINE
        ),
        hnsw_config=hnsw_configs[optimize_for],
        quantization_config=quantization_configs[optimize_for],
        optimizers_config=optimizer_configs[optimize_for]
    )


def tune_search_parameters(
    client: QdrantClient,
    collection_name: str,
    target_recall: float = 0.95
) -> dict:
    """为目标召回率调优搜索参数。"""

    # 搜索参数推荐
    if target_recall >= 0.99:
        search_params = models.SearchParams(
            hnsw_ef=256,
            exact=False,
            quantization=models.QuantizationSearchParams(
                ignore=True,  # 搜索时不使用量化
                rescore=True
            )
        )
    elif target_recall >= 0.95:
        search_params = models.SearchParams(
            hnsw_ef=128,
            exact=False,
            quantization=models.QuantizationSearchParams(
                ignore=False,
                rescore=True,
                oversampling=2.0
            )
        )
    else:
        search_params = models.SearchParams(
            hnsw_ef=64,
            exact=False,
            quantization=models.QuantizationSearchParams(
                ignore=False,
                rescore=False
            )
        )

    return search_params

模板4:性能监控

import time
from dataclasses import dataclass
from typing import List
import numpy as np

@dataclass
class SearchMetrics:
    latency_p50_ms: float
    latency_p95_ms: float
    latency_p99_ms: float
    recall: float
    qps: float


class VectorSearchMonitor:
    """监控向量搜索性能。"""

    def __init__(self, ground_truth_fn=None):
        self.latencies = []
        self.recalls = []
        self.ground_truth_fn = ground_truth_fn

    def measure_search(
        self,
        search_fn,
        query_vectors: np.ndarray,
        k: int = 10,
        num_iterations: int = 100
    ) -> SearchMetrics:
        """基准测试搜索性能。"""
        latencies = []

        for _ in range(num_iterations):
            for query in query_vectors:
                start = time.perf_counter()
                results = search_fn(query, k=k)
                latency = (time.perf_counter() - start) * 1000
                latencies.append(latency)

        latencies = np.array(latencies)
        total_queries = num_iterations * len(query_vectors)
        total_time = sum(latencies) / 1000  # 秒

        return SearchMetrics(
            latency_p50_ms=np.percentile(latencies, 50),
            latency_p95_ms=np.percentile(latencies, 95),
            latency_p99_ms=np.percentile(latencies, 99),
            recall=self._calculate_recall(search_fn, query_vectors, k) if self.ground_truth_fn else 0,
            qps=total_queries / total_time
        )

    def _calculate_recall(self, search_fn, queries: np.ndarray, k: int) -> float:
        """计算相对于基准事实的召回率。"""
        if not self.ground_truth_fn:
            return 0

        correct = 0
        total = 0

        for query in queries:
            predicted = set(search_fn(query, k=k))
            actual = set(self.ground_truth_fn(query, k=k))
            correct += len(predicted & actual)
            total += k

        return correct / total


def profile_index_build(
    build_fn,
    vectors: np.ndarray,
    batch_sizes: List[int] = [1000, 10000, 50000]
) -> dict:
    """分析索引构建性能。"""
    results = {}

    for batch_size in batch_sizes:
        times = []
        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            start = time.perf_counter()
            build_fn(batch)
            times.append(time.perf_counter() - start)

        results[batch_size] = {
            "avg_batch_time_s": np.mean(times),
            "vectors_per_second": batch_size / np.mean(times)
        }

    return results

最佳实践

应该做的

  • 用真实查询基准测试 - 合成数据可能不代表生产环境
  • 持续监控召回率 - 可能因数据漂移而下降
  • 从默认值开始 - 仅需时调优
  • 使用量化 - 显著节省内存
  • 考虑分层存储 - 热/冷数据分离

不应该做的

  • 不要过早过度优化 - 先分析
  • 不要忽略构建时间 - 索引更新有成本
  • 不要忘记重新索引 - 规划维护
  • 不要跳过预热 - 冷索引慢

资源