CocoIndex数据索引框架Skill cocoindex

CocoIndex 是一个用于 AI 数据处理的实时数据转换框架,支持构建 ETL 工作流,包括文档嵌入到向量数据库、知识图谱构建、搜索索引创建等。关键功能包括增量处理、实时更新、多数据源和目标支持,适用于数据工程师和 AI 开发者进行高效数据转换和索引。关键词:数据转换、ETL、AI 数据处理、文档嵌入、向量数据库、知识图谱、实时索引。

ETL开发 0 次安装 0 次浏览 更新于 3/21/2026

name: cocoindex description: 用于 CocoIndex 库开发的全面工具包。当用户需要创建数据转换管道(流)、编写自定义函数或通过 CLI 或 API 操作流时使用。涵盖为 AI 数据处理构建 ETL 工作流,包括将文档嵌入向量数据库、构建知识图谱、创建搜索索引或使用增量更新处理数据流。

CocoIndex

概述

CocoIndex 是一个超高性能的实时数据转换框架,专为 AI 设计,支持增量处理。此技能允许构建索引流,从源提取数据,应用转换(分块、嵌入、LLM 提取),并导出到目标(向量数据库、图数据库、关系数据库)。

核心功能:

  1. 编写索引流 - 使用 Python 定义 ETL 管道
  2. 创建自定义函数 - 构建可重用的转换逻辑
  3. 操作流 - 使用 CLI 或 Python API 运行和管理流

关键特性:

  • 增量处理(仅处理更改的数据)
  • 实时更新(持续同步源更改到目标)
  • 内置函数(文本分块、嵌入、LLM 提取)
  • 多数据源(本地文件、S3、Azure Blob、Google Drive、Postgres)
  • 多目标(Postgres+pgvector、Qdrant、LanceDB、Neo4j、Kuzu)

详细文档: https://cocoindex.io/docs/ 搜索文档: https://cocoindex.io/docs/search?q=url encoded keyword

何时使用此技能

在用户请求时使用:

  • “为我的文档构建向量搜索索引”
  • “为代码/PDFs/图像创建嵌入管道”
  • “使用 LLMs 提取结构化信息”
  • “从文档构建知识图谱”
  • “设置实时文档索引”
  • “创建自定义转换函数”
  • “运行/更新我的 CocoIndex 流”

流编写工作流

步骤 1:理解需求

询问澄清性问题以理解:

数据源:

  • 数据在哪里?(本地文件、S3、数据库等)
  • 文件类型是什么?(文本、PDF、JSON、图像、代码等)
  • 更改频率如何?(一次性、周期性、连续)

转换:

  • 需要什么处理?(分块、嵌入、提取等)
  • 使用哪个嵌入模型?(SentenceTransformer、OpenAI、自定义)
  • 有任何自定义逻辑吗?(过滤、解析、丰富)

目标:

  • 结果应该去哪里?(Postgres、Qdrant、Neo4j等)
  • 什么模式?(字段、主键、索引)
  • 需要向量搜索吗?(指定相似度度量)

步骤 2:设置依赖

根据用户需求,指导添加 CocoIndex 及相应扩展:

必需依赖:

  • cocoindex - 核心功能、CLI 和大多数内置函数

可选扩展(根据需要添加):

  • cocoindex[embeddings] - 用于 SentenceTransformer 嵌入(当使用 SentenceTransformerEmbed 时)
  • cocoindex[colpali] - 用于 ColPali 图像/文档嵌入(当使用 ColPaliEmbedImageColPaliEmbedQuery 时)
  • cocoindex[lancedb] - 用于 LanceDB 目标(当导出到 LanceDB 时)
  • cocoindex[embeddings,lancedb] - 多个扩展可以组合

包含内容:

  • 基础包:核心功能、CLI、大多数内置函数、Postgres/Qdrant/Neo4j/Kuzu 目标
  • embeddings 扩展:SentenceTransformers 库用于本地嵌入模型
  • colpali 扩展:ColPali 引擎用于多模态文档/图像嵌入
  • lancedb 扩展:LanceDB 客户端库用于 LanceDB 向量数据库支持

用户可以使用他们喜欢的包管理器(pip、uv、poetry 等)安装或添加到 pyproject.toml

安装详情: https://cocoindex.io/docs/getting_started/installation

步骤 3:设置环境

首先检查现有环境:

  1. 检查 COCOINDEX_DATABASE_URL 是否存在于环境变量中

    • 如果未找到,使用默认值:postgres://cocoindex:cocoindex@localhost/cocoindex
  2. 对于需要 LLM API 的流(嵌入、提取):

    • 询问用户想要使用哪个 LLM 提供商:
      • OpenAI - 生成和嵌入
      • Anthropic - 仅生成
      • Gemini - 生成和嵌入
      • Voyage - 仅嵌入
      • Ollama - 本地模型(生成和嵌入)
    • 检查环境变量中是否存在相应的 API 密钥
    • 如果未找到,要求用户提供 API 密钥值
    • 切勿在没有 LLM 的情况下创建简化示例 - 始终获取正确的 API 密钥并使用真实的 LLM 函数

指导用户创建 .env 文件:

# 数据库连接(必需 - 内部存储)
COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex

# LLM API 密钥(添加您需要的)
OPENAI_API_KEY=sk-...          # 用于 OpenAI(生成 + 嵌入)
ANTHROPIC_API_KEY=sk-ant-...   # 用于 Anthropic(仅生成)
GOOGLE_API_KEY=...             # 用于 Gemini(生成 + 嵌入)
VOYAGE_API_KEY=pa-...          # 用于 Voyage(仅嵌入)
# Ollama 不需要 API 密钥(本地)

更多 LLM 选项: https://cocoindex.io/docs/ai/llm

创建基本项目结构:

# main.py
from dotenv import load_dotenv
import cocoindex

@cocoindex.flow_def(name="FlowName")
def my_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    # 流定义在此
    pass

if __name__ == "__main__":
    load_dotenv()
    cocoindex.init()
    my_flow.update()

步骤 4:编写流

遵循此结构:

@cocoindex.flow_def(name="描述性名称")
def flow_name(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    # 1. 导入源数据
    data_scope["source_name"] = flow_builder.add_source(
        cocoindex.sources.SourceType(...)
    )

    # 2. 为输出创建收集器
    collector = data_scope.add_collector()

    # 3. 转换数据(遍历行)
    with data_scope["source_name"].row() as item:
        # 应用转换
        item["new_field"] = item["existing_field"].transform(
            cocoindex.functions.FunctionName(...)
        )

        ...

        # 嵌套迭代(例如,文档内的块)
        with item["nested_table"].row() as nested_item:
            # 更多转换
            nested_item["embedding"] = nested_item["text"].transform(...)

            # 收集数据用于导出
            collector.collect(
                field1=nested_item["field1"],
                field2=item["field2"],
                generated_id=cocoindex.GeneratedField.UUID
            )

    # 4. 导出到目标
    collector.export(
        "target_name",
        cocoindex.targets.TargetType(...),
        primary_key_fields=["field1"],
        vector_indexes=[...]  # 如果需要
    )

关键原则:

  • 每个源在顶层数据作用域中创建一个字段
  • 使用 .row() 遍历表数据
  • 关键:始终将转换后的数据分配给行字段 - 使用 item["new_field"] = item["existing_field"].transform(...),而不是本地变量如 new_field = item["existing_field"].transform(...)
  • 转换创建新字段而不改变现有数据
  • 收集器从任何作用域级别收集数据
  • 导出必须在顶层进行(不能在行迭代内)

常见错误避免:

错误: 使用本地变量进行转换

with data_scope["files"].row() as file:
    summary = file["content"].transform(...)  # ❌ 本地变量
    summaries_collector.collect(filename=file["filename"], summary=summary)

正确: 分配给行字段

with data_scope["files"].row() as file:
    file["summary"] = file["content"].transform(...)  # ✅ 字段分配
    summaries_collector.collect(filename=file["filename"], summary=file["summary"])

错误: 创建不必要的数据类以镜像流字段

from dataclasses import dataclass

@dataclass
class FileSummary:  # ❌ 不必要 - CocoIndex 自动管理字段
    filename: str
    summary: str
    embedding: list[float]

# 此数据类从未在流中使用!

步骤 5:设计流解决方案

重要: 以下列出的模式是常见的起点,但您不能详尽列举所有可能场景。当用户需求不匹配现有模式时:

  1. 从多个模式组合元素 - 创意地混合源、转换和目标
  2. 查看其他示例 - 参见 https://github.com/cocoindex-io/cocoindex?tab=readme-ov-file#-examples-and-demo 获取多样化的真实世界用例(人脸识别、多模态搜索、产品推荐、患者表单提取等)
  3. 从第一性原理思考 - 使用核心 API(源、转换、收集器、导出)并应用常识解决新问题
  4. 发挥创意 - CocoIndex 灵活;组件的独特组合可以解决独特问题

常见起点模式(参见详细示例的参考资料):

用于文本嵌入: 加载 references/flow_patterns.md 并参考“模式 1:简单文本嵌入”

用于代码嵌入: 加载 references/flow_patterns.md 并参考“模式 2:带语言检测的代码嵌入”

用于 LLM 提取 + 知识图谱: 加载 references/flow_patterns.md 并参考“模式 3:基于 LLM 的提取到知识图谱”

用于实时更新: 加载 references/flow_patterns.md 并参考“模式 4:带刷新间隔的实时更新”

用于自定义函数: 加载 references/flow_patterns.md 并参考“模式 5:自定义转换函数”

用于可重用查询逻辑: 加载 references/flow_patterns.md 并参考“模式 6:可重用逻辑的转换流”

用于并发控制: 加载 references/flow_patterns.md 并参考“模式 7:并发控制”

模式组合示例:

如果用户要求“从 S3 索引图像,使用视觉 API 生成标题,并存储在 Qdrant 中”,组合:

  • AmazonS3 源(来自 S3 示例)
  • 用于视觉 API 调用的自定义函数(来自自定义函数模式)
  • EmbedText 嵌入标题(来自嵌入模式)
  • Qdrant 目标(来自目标示例)

没有单一模式覆盖此精确场景,但构建块是可组合的。

步骤 6:测试和运行

指导用户测试:

# 1. 带设置运行
cocoindex update --setup -f main   # -f 强制设置无需确认提示

# 2. 启动服务器并重定向用户到 CocoInsight
cocoindex server -ci main
# 然后在 https://cocoindex.io/cocoinsight 打开 CocoInsight

数据类型

CocoIndex 具有独立于编程语言的类型系统。所有数据类型在流定义时确定,使模式清晰可预测。

重要:何时定义类型:

  • 自定义函数: 返回值的类型注释是必需的(这些是类型推断的真相来源)
  • 流字段: 类型注释不需要 - CocoIndex 自动从源、函数和转换推断类型
  • 数据类/Pydantic 模型: 仅当它们实际使用时创建(作为函数参数/返回或 ExtractByLlm 的 output_type),而不是镜像流字段模式

类型注释要求:

  • 自定义函数的返回值: 必须使用特定类型注释 - 这些是类型推断的真相来源
  • 自定义函数的参数: 宽松 - 可以使用 Anydict[str, Any] 或省略注释;引擎已经知道类型
  • 流定义: 不需要显式类型注释 - CocoIndex 自动从源和函数推断类型

为什么特定返回类型重要: 自定义函数返回类型让 CocoIndex 推断流中的字段类型而无需处理真实数据。这支持创建适当的目标模式(例如,具有固定维度的向量索引)。

常见类型类别:

  1. 原始类型: strintfloatboolbytesdatetime.datedatetime.datetimeuuid.UUID

  2. 向量类型(嵌入): 如果计划作为向量导出到目标,请在返回类型中指定维度,因为大多数目标需要固定向量维度

    • cocoindex.Vector[cocoindex.Float32, typing.Literal[768]] - 768 维 float32 向量(推荐)
    • list[float] 无维度也有效
  3. 结构类型: 数据类、NamedTuple 或 Pydantic 模型

    • 返回类型:必须使用特定类(例如,Person
    • 参数:可以使用 dict[str, Any]Any
  4. 表类型:

    • KTable(键控): dict[K, V] 其中 K = 键类型(原始或冻结结构),V = 结构类型
    • LTable(有序): list[R] 其中 R = 结构类型
    • 参数:可以使用 dict[Any, Any]list[Any]
  5. Json 类型: cocoindex.Json 用于非结构化/动态数据

  6. 可选类型: T | None 用于可空值

示例:

from dataclasses import dataclass
from typing import Literal
import cocoindex

@dataclass
class Person:
    name: str
    age: int

# ✅ 带维度的向量(推荐用于向量搜索)
@cocoindex.op.function(behavior_version=1)
def embed_text(text: str) -> cocoindex.Vector[cocoindex.Float32, Literal[768]]:
    """生成 768 维嵌入 - 向量索引需要维度。"""
    # ... 嵌入逻辑 ...
    return embedding  # numpy 数组或 768 浮点数列表

# ✅ 结构返回类型,宽松参数
@cocoindex.op.function(behavior_version=1)
def process_person(person: dict[str, Any]) -> Person:
    """参数可以是 dict[str, Any],返回必须是特定结构。"""
    return Person(name=person["name"], age=person["age"])

# ✅ LTable 返回类型
@cocoindex.op.function(behavior_version=1)
def filter_people(people: list[Any]) -> list[Person]:
    """返回类型指定特定结构列表。"""
    return [p for p in people if p.age >= 18]

# ❌ 错误:dict[str, str] 不是有效的特定 CocoIndex 类型
# @cocoindex.op.function(...)
# def bad_example(person: Person) -> dict[str, str]:
#     return {"name": person.name}

全面数据类型文档: https://cocoindex.io/docs/core/data_types

自定义函数

当用户需要自定义转换逻辑时,创建自定义函数。

决策:独立函数 vs 规范+执行器

使用独立函数当:

  • 简单转换
  • 不需要配置
  • 不需要设置/初始化

使用规范+执行器当:

  • 需要配置(模型名称、API 端点、参数)
  • 需要设置(加载模型、建立连接)
  • 复杂多步处理

创建独立函数

@cocoindex.op.function(behavior_version=1)
def my_function(input_arg: str, optional_arg: int | None = None) -> dict:
    """
    函数描述。

    参数:
        input_arg: 描述
        optional_arg: 可选描述
    """
    # 转换逻辑
    return {"result": f"processed-{input_arg}"}

要求:

  • 装饰器:@cocoindex.op.function()
  • 所有参数和返回值的类型注释
  • 可选参数:cache=True 用于昂贵操作,behavior_version(必需与缓存)

创建规范+执行器函数

# 1. 定义配置规范
class MyFunction(cocoindex.op.FunctionSpec):
    """MyFunction 的配置。"""
    model_name: str
    threshold: float = 0.5

# 2. 定义执行器
@cocoindex.op.executor_class(cache=True, behavior_version=1)
class MyFunctionExecutor:
    spec: MyFunction  # 必需:链接到规范
    model = None      # 实例变量用于状态

    def prepare(self) -> None:
        """可选:执行前运行一次。"""
        # 加载模型、设置连接等
        self.model = load_model(self.spec.model_name)

    def __call__(self, text: str) -> dict:
        """必需:为每个数据行执行。"""
        # 使用 self.spec 进行配置
        # 使用 self.model 用于加载的资源
        result = self.model.process(text)
        return {"result": result}

何时启用缓存:

  • LLM API 调用
  • 模型推理
  • 外部 API 调用
  • 计算昂贵的操作

重要: 当函数逻辑更改时,递增 behavior_version 以使缓存失效。

详细示例和模式,加载 references/custom_functions.md

更多自定义函数: https://cocoindex.io/docs/custom_ops/custom_functions

操作流

CLI 操作

设置流(创建资源):

cocoindex setup main

一次性更新:

cocoindex update main

# 带自动设置
cocoindex update --setup main

# 强制重置所有内容,然后设置和更新
cocoindex update --reset main

实时更新(连续监控):

cocoindex update main.py -L

# 需要源上的 refresh_interval 或源特定更改捕获

删除流(移除所有资源):

cocoindex drop main.py

检查流:

cocoindex show main.py:FlowName

无副作用测试:

cocoindex evaluate main.py:FlowName --output-dir ./test_output

完整 CLI 参考,加载 references/cli_operations.md

CLI 文档: https://cocoindex.io/docs/core/cli

API 操作

基本设置:

from dotenv import load_dotenv
import cocoindex

load_dotenv()
cocoindex.init()

@cocoindex.flow_def(name="MyFlow")
def my_flow(flow_builder, data_scope):
    # ... 流定义 ...
    pass

一次性更新:

stats = my_flow.update()
print(f"处理了 {stats.total_rows} 行")

# 异步
stats = await my_flow.update_async()

实时更新:

# 作为上下文管理器
with cocoindex.FlowLiveUpdater(my_flow) as updater:
    # 更新器在后台运行
    # 您的应用程序逻辑在此
    pass

# 手动控制
updater = cocoindex.FlowLiveUpdater(
    my_flow,
    cocoindex.FlowLiveUpdaterOptions(
        live_mode=True,
        print_stats=True
    )
)
updater.start()
# ... 应用程序逻辑 ...
updater.wait()

设置/删除:

my_flow.setup(report_to_stdout=True)
my_flow.drop(report_to_stdout=True)
cocoindex.setup_all_flows()
cocoindex.drop_all_flows()

使用转换流查询:

@cocoindex.transform_flow()
def text_to_embedding(text: cocoindex.DataSlice[str]) -> cocoindex.DataSlice[list[float]]:
    return text.transform(
        cocoindex.functions.SentenceTransformerEmbed(model="...")
    )

# 在流中用于索引
doc["embedding"] = text_to_embedding(doc["content"])

# 用于查询
query_embedding = text_to_embedding.eval("搜索查询")

完整 API 参考和模式,加载 references/api_operations.md

API 文档: https://cocoindex.io/docs/core/flow_methods

内置函数

文本处理

SplitRecursively - 智能分块文本

doc["chunks"] = doc["content"].transform(
    cocoindex.functions.SplitRecursively(),
    language="markdown",  # 或 "python"、"javascript" 等
    chunk_size=2000,
    chunk_overlap=500
)

ParseJson - 解析 JSON 字符串

data = json_string.transform(cocoindex.functions.ParseJson())

DetectProgrammingLanguage - 从文件名检测语言

file["language"] = file["filename"].transform(
    cocoindex.functions.DetectProgrammingLanguage()
)

嵌入

SentenceTransformerEmbed - 本地嵌入模型

# 要求:cocoindex[embeddings]
chunk["embedding"] = chunk["text"].transform(
    cocoindex.functions.SentenceTransformerEmbed(
        model="sentence-transformers/all-MiniLM-L6-v2"
    )
)

EmbedText - LLM API 嵌入

这是使用 LLM API(OpenAI、Voyage 等)生成嵌入的推荐方式

chunk["embedding"] = chunk["text"].transform(
    cocoindex.functions.EmbedText(
        api_type=cocoindex.LlmApiType.OPENAI,
        model="text-embedding-3-small",
    )
)

ColPaliEmbedImage - 多模态图像嵌入

# 要求:cocoindex[colpali]
image["embedding"] = image["img_bytes"].transform(
    cocoindex.functions.ColPaliEmbedImage(model="vidore/colpali-v1.2")
)

LLM 提取

ExtractByLlm - 使用 LLM 提取结构化数据

这是使用 LLMs 进行提取和摘要任务的推荐方式。它支持结构化输出(数据类、Pydantic 模型)和简单文本输出(str)。

import dataclasses

# 用于结构化提取
@dataclasses.dataclass
class ProductInfo:
    name: str
    price: float
    category: str

item["product_info"] = item["text"].transform(
    cocoindex.functions.ExtractByLlm(
        llm_spec=cocoindex.LlmSpec(
            api_type=cocoindex.LlmApiType.OPENAI,
            model="gpt-4o-mini"
        ),
        output_type=ProductInfo,
        instruction="提取产品信息"
    )
)

# 用于文本摘要/生成
file["summary"] = file["content"].transform(
    cocoindex.functions.ExtractByLlm(
        llm_spec=cocoindex.LlmSpec(
            api_type=cocoindex.LlmApiType.OPENAI,
            model="gpt-4o-mini"
        ),
        output_type=str,
        instruction="用一段话总结此文档"
    )
)

常见源和目标

浏览所有源: https://cocoindex.io/docs/sources/ 浏览所有目标: https://cocoindex.io/docs/targets/

LocalFile:

cocoindex.sources.LocalFile(
    path="documents",
    included_patterns=["*.md", "*.txt"],
    excluded_patterns=["**/.*", "node_modules"]
)

AmazonS3:

cocoindex.sources.AmazonS3(
    bucket="my-bucket",
    prefix="documents/",
    aws_access_key_id=cocoindex.add_transient_auth_entry("..."),
    aws_secret_access_key=cocoindex.add_transient_auth_entry("...")
)

Postgres:

cocoindex.sources.Postgres(
    connection=cocoindex.add_auth_entry("conn", cocoindex.sources.PostgresConnection(...)),
    query="SELECT id, content FROM documents"
)

目标

Postgres(带向量支持):

collector.export(
    "target_name",
    cocoindex.targets.Postgres(),
    primary_key_fields=["id"],
    vector_indexes=[
        cocoindex.VectorIndexDef(
            field_name="embedding",
            metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY
        )
    ]
)

Qdrant:

collector.export(
    "target_name",
    cocoindex.targets.Qdrant(collection_name="my_collection"),
    primary_key_fields=["id"]
)

LanceDB:

# 要求:cocoindex[lancedb]
collector.export(
    "target_name",
    cocoindex.targets.LanceDB(uri="lancedb_data", table_name="my_table"),
    primary_key_fields=["id"]
)

Neo4j(节点):

collector.export(
    "nodes",
    cocoindex.targets.Neo4j(
        connection=neo4j_conn,
        mapping=cocoindex.targets.Nodes(label="Entity")
    ),
    primary_key_fields=["id"]
)

Neo4j(关系):

collector.export(
    "relationships",
    cocoindex.targets.Neo4j(
        connection=neo4j_conn,
        mapping=cocoindex.targets.Relationships(
            rel_type="RELATES_TO",
            source=cocoindex.targets.NodeFromFields(
                label="Entity",
                fields=[cocoindex.targets.TargetFieldMapping(source="source_id", target="id")]
            ),
            target=cocoindex.targets.NodeFromFields(
                label="Entity",
                fields=[cocoindex.targets.TargetFieldMapping(source="target_id", target="id")]
            )
        )
    ),
    primary_key_fields=["id"]
)

常见问题和解决方案

“流未找到”

  • 检查 APP_TARGET 格式:cocoindex show main.py
  • 使用 --app-dir 如果不在项目根目录
  • 验证流名称与装饰器匹配

“数据库连接失败”

  • 检查 .env 是否有 COCOINDEX_DATABASE_URL
  • 测试连接:psql $COCOINDEX_DATABASE_URL
  • 使用 --env-file 指定自定义位置

“模式不匹配”

  • 重新运行设置:cocoindex setup main.py
  • 删除并重新创建:cocoindex drop main.py && cocoindex setup main.py

“实时更新立即退出”

  • 添加 refresh_interval 到源
  • 或使用源特定更改捕获(Postgres 通知、S3 事件)

“内存不足”

  • 在源上添加并发限制:max_inflight_rowsmax_inflight_bytes
  • .env 中设置全局限制:COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS

参考文档

此技能包括常见模式和操作的全面参考文档:

  • references/flow_patterns.md - 常见流模式的完整示例(文本嵌入、代码嵌入、知识图谱、实时更新、并发控制等)
  • references/custom_functions.md - 创建自定义函数的详细指南,带示例(独立函数、规范+执行器模式、LLM 调用、外部 API、缓存)
  • references/cli_operations.md - 完整 CLI 参考,包含所有命令、选项和工作流
  • references/api_operations.md - Python API 参考,带程序化流控制、实时更新、查询和应用程序集成模式的示例

当用户需要时加载这些参考:

  • 特定模式的详细示例
  • 完整 API 文档
  • 高级使用场景
  • 故障排除指导

全面文档: https://cocoindex.io/docs/ 搜索特定主题: https://cocoindex.io/docs/search?q=url encoded keyword