name: cocoindex description: 用于 CocoIndex 库开发的全面工具包。当用户需要创建数据转换管道(流)、编写自定义函数或通过 CLI 或 API 操作流时使用。涵盖为 AI 数据处理构建 ETL 工作流,包括将文档嵌入向量数据库、构建知识图谱、创建搜索索引或使用增量更新处理数据流。
CocoIndex
概述
CocoIndex 是一个超高性能的实时数据转换框架,专为 AI 设计,支持增量处理。此技能允许构建索引流,从源提取数据,应用转换(分块、嵌入、LLM 提取),并导出到目标(向量数据库、图数据库、关系数据库)。
核心功能:
- 编写索引流 - 使用 Python 定义 ETL 管道
- 创建自定义函数 - 构建可重用的转换逻辑
- 操作流 - 使用 CLI 或 Python API 运行和管理流
关键特性:
- 增量处理(仅处理更改的数据)
- 实时更新(持续同步源更改到目标)
- 内置函数(文本分块、嵌入、LLM 提取)
- 多数据源(本地文件、S3、Azure Blob、Google Drive、Postgres)
- 多目标(Postgres+pgvector、Qdrant、LanceDB、Neo4j、Kuzu)
详细文档: https://cocoindex.io/docs/ 搜索文档: https://cocoindex.io/docs/search?q=url encoded keyword
何时使用此技能
在用户请求时使用:
- “为我的文档构建向量搜索索引”
- “为代码/PDFs/图像创建嵌入管道”
- “使用 LLMs 提取结构化信息”
- “从文档构建知识图谱”
- “设置实时文档索引”
- “创建自定义转换函数”
- “运行/更新我的 CocoIndex 流”
流编写工作流
步骤 1:理解需求
询问澄清性问题以理解:
数据源:
- 数据在哪里?(本地文件、S3、数据库等)
- 文件类型是什么?(文本、PDF、JSON、图像、代码等)
- 更改频率如何?(一次性、周期性、连续)
转换:
- 需要什么处理?(分块、嵌入、提取等)
- 使用哪个嵌入模型?(SentenceTransformer、OpenAI、自定义)
- 有任何自定义逻辑吗?(过滤、解析、丰富)
目标:
- 结果应该去哪里?(Postgres、Qdrant、Neo4j等)
- 什么模式?(字段、主键、索引)
- 需要向量搜索吗?(指定相似度度量)
步骤 2:设置依赖
根据用户需求,指导添加 CocoIndex 及相应扩展:
必需依赖:
cocoindex- 核心功能、CLI 和大多数内置函数
可选扩展(根据需要添加):
cocoindex[embeddings]- 用于 SentenceTransformer 嵌入(当使用SentenceTransformerEmbed时)cocoindex[colpali]- 用于 ColPali 图像/文档嵌入(当使用ColPaliEmbedImage或ColPaliEmbedQuery时)cocoindex[lancedb]- 用于 LanceDB 目标(当导出到 LanceDB 时)cocoindex[embeddings,lancedb]- 多个扩展可以组合
包含内容:
- 基础包:核心功能、CLI、大多数内置函数、Postgres/Qdrant/Neo4j/Kuzu 目标
embeddings扩展:SentenceTransformers 库用于本地嵌入模型colpali扩展:ColPali 引擎用于多模态文档/图像嵌入lancedb扩展:LanceDB 客户端库用于 LanceDB 向量数据库支持
用户可以使用他们喜欢的包管理器(pip、uv、poetry 等)安装或添加到 pyproject.toml。
安装详情: https://cocoindex.io/docs/getting_started/installation
步骤 3:设置环境
首先检查现有环境:
-
检查
COCOINDEX_DATABASE_URL是否存在于环境变量中- 如果未找到,使用默认值:
postgres://cocoindex:cocoindex@localhost/cocoindex
- 如果未找到,使用默认值:
-
对于需要 LLM API 的流(嵌入、提取):
- 询问用户想要使用哪个 LLM 提供商:
- OpenAI - 生成和嵌入
- Anthropic - 仅生成
- Gemini - 生成和嵌入
- Voyage - 仅嵌入
- Ollama - 本地模型(生成和嵌入)
- 检查环境变量中是否存在相应的 API 密钥
- 如果未找到,要求用户提供 API 密钥值
- 切勿在没有 LLM 的情况下创建简化示例 - 始终获取正确的 API 密钥并使用真实的 LLM 函数
- 询问用户想要使用哪个 LLM 提供商:
指导用户创建 .env 文件:
# 数据库连接(必需 - 内部存储)
COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex
# LLM API 密钥(添加您需要的)
OPENAI_API_KEY=sk-... # 用于 OpenAI(生成 + 嵌入)
ANTHROPIC_API_KEY=sk-ant-... # 用于 Anthropic(仅生成)
GOOGLE_API_KEY=... # 用于 Gemini(生成 + 嵌入)
VOYAGE_API_KEY=pa-... # 用于 Voyage(仅嵌入)
# Ollama 不需要 API 密钥(本地)
更多 LLM 选项: https://cocoindex.io/docs/ai/llm
创建基本项目结构:
# main.py
from dotenv import load_dotenv
import cocoindex
@cocoindex.flow_def(name="FlowName")
def my_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
# 流定义在此
pass
if __name__ == "__main__":
load_dotenv()
cocoindex.init()
my_flow.update()
步骤 4:编写流
遵循此结构:
@cocoindex.flow_def(name="描述性名称")
def flow_name(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
# 1. 导入源数据
data_scope["source_name"] = flow_builder.add_source(
cocoindex.sources.SourceType(...)
)
# 2. 为输出创建收集器
collector = data_scope.add_collector()
# 3. 转换数据(遍历行)
with data_scope["source_name"].row() as item:
# 应用转换
item["new_field"] = item["existing_field"].transform(
cocoindex.functions.FunctionName(...)
)
...
# 嵌套迭代(例如,文档内的块)
with item["nested_table"].row() as nested_item:
# 更多转换
nested_item["embedding"] = nested_item["text"].transform(...)
# 收集数据用于导出
collector.collect(
field1=nested_item["field1"],
field2=item["field2"],
generated_id=cocoindex.GeneratedField.UUID
)
# 4. 导出到目标
collector.export(
"target_name",
cocoindex.targets.TargetType(...),
primary_key_fields=["field1"],
vector_indexes=[...] # 如果需要
)
关键原则:
- 每个源在顶层数据作用域中创建一个字段
- 使用
.row()遍历表数据 - 关键:始终将转换后的数据分配给行字段 - 使用
item["new_field"] = item["existing_field"].transform(...),而不是本地变量如new_field = item["existing_field"].transform(...) - 转换创建新字段而不改变现有数据
- 收集器从任何作用域级别收集数据
- 导出必须在顶层进行(不能在行迭代内)
常见错误避免:
❌ 错误: 使用本地变量进行转换
with data_scope["files"].row() as file:
summary = file["content"].transform(...) # ❌ 本地变量
summaries_collector.collect(filename=file["filename"], summary=summary)
✅ 正确: 分配给行字段
with data_scope["files"].row() as file:
file["summary"] = file["content"].transform(...) # ✅ 字段分配
summaries_collector.collect(filename=file["filename"], summary=file["summary"])
❌ 错误: 创建不必要的数据类以镜像流字段
from dataclasses import dataclass
@dataclass
class FileSummary: # ❌ 不必要 - CocoIndex 自动管理字段
filename: str
summary: str
embedding: list[float]
# 此数据类从未在流中使用!
步骤 5:设计流解决方案
重要: 以下列出的模式是常见的起点,但您不能详尽列举所有可能场景。当用户需求不匹配现有模式时:
- 从多个模式组合元素 - 创意地混合源、转换和目标
- 查看其他示例 - 参见 https://github.com/cocoindex-io/cocoindex?tab=readme-ov-file#-examples-and-demo 获取多样化的真实世界用例(人脸识别、多模态搜索、产品推荐、患者表单提取等)
- 从第一性原理思考 - 使用核心 API(源、转换、收集器、导出)并应用常识解决新问题
- 发挥创意 - CocoIndex 灵活;组件的独特组合可以解决独特问题
常见起点模式(参见详细示例的参考资料):
用于文本嵌入: 加载 references/flow_patterns.md 并参考“模式 1:简单文本嵌入”
用于代码嵌入: 加载 references/flow_patterns.md 并参考“模式 2:带语言检测的代码嵌入”
用于 LLM 提取 + 知识图谱: 加载 references/flow_patterns.md 并参考“模式 3:基于 LLM 的提取到知识图谱”
用于实时更新: 加载 references/flow_patterns.md 并参考“模式 4:带刷新间隔的实时更新”
用于自定义函数: 加载 references/flow_patterns.md 并参考“模式 5:自定义转换函数”
用于可重用查询逻辑: 加载 references/flow_patterns.md 并参考“模式 6:可重用逻辑的转换流”
用于并发控制: 加载 references/flow_patterns.md 并参考“模式 7:并发控制”
模式组合示例:
如果用户要求“从 S3 索引图像,使用视觉 API 生成标题,并存储在 Qdrant 中”,组合:
- AmazonS3 源(来自 S3 示例)
- 用于视觉 API 调用的自定义函数(来自自定义函数模式)
- EmbedText 嵌入标题(来自嵌入模式)
- Qdrant 目标(来自目标示例)
没有单一模式覆盖此精确场景,但构建块是可组合的。
步骤 6:测试和运行
指导用户测试:
# 1. 带设置运行
cocoindex update --setup -f main # -f 强制设置无需确认提示
# 2. 启动服务器并重定向用户到 CocoInsight
cocoindex server -ci main
# 然后在 https://cocoindex.io/cocoinsight 打开 CocoInsight
数据类型
CocoIndex 具有独立于编程语言的类型系统。所有数据类型在流定义时确定,使模式清晰可预测。
重要:何时定义类型:
- 自定义函数: 返回值的类型注释是必需的(这些是类型推断的真相来源)
- 流字段: 类型注释不需要 - CocoIndex 自动从源、函数和转换推断类型
- 数据类/Pydantic 模型: 仅当它们实际使用时创建(作为函数参数/返回或 ExtractByLlm 的 output_type),而不是镜像流字段模式
类型注释要求:
- 自定义函数的返回值: 必须使用特定类型注释 - 这些是类型推断的真相来源
- 自定义函数的参数: 宽松 - 可以使用
Any、dict[str, Any]或省略注释;引擎已经知道类型 - 流定义: 不需要显式类型注释 - CocoIndex 自动从源和函数推断类型
为什么特定返回类型重要: 自定义函数返回类型让 CocoIndex 推断流中的字段类型而无需处理真实数据。这支持创建适当的目标模式(例如,具有固定维度的向量索引)。
常见类型类别:
-
原始类型:
str、int、float、bool、bytes、datetime.date、datetime.datetime、uuid.UUID -
向量类型(嵌入): 如果计划作为向量导出到目标,请在返回类型中指定维度,因为大多数目标需要固定向量维度
cocoindex.Vector[cocoindex.Float32, typing.Literal[768]]- 768 维 float32 向量(推荐)list[float]无维度也有效
-
结构类型: 数据类、NamedTuple 或 Pydantic 模型
- 返回类型:必须使用特定类(例如,
Person) - 参数:可以使用
dict[str, Any]或Any
- 返回类型:必须使用特定类(例如,
-
表类型:
- KTable(键控):
dict[K, V]其中 K = 键类型(原始或冻结结构),V = 结构类型 - LTable(有序):
list[R]其中 R = 结构类型 - 参数:可以使用
dict[Any, Any]或list[Any]
- KTable(键控):
-
Json 类型:
cocoindex.Json用于非结构化/动态数据 -
可选类型:
T | None用于可空值
示例:
from dataclasses import dataclass
from typing import Literal
import cocoindex
@dataclass
class Person:
name: str
age: int
# ✅ 带维度的向量(推荐用于向量搜索)
@cocoindex.op.function(behavior_version=1)
def embed_text(text: str) -> cocoindex.Vector[cocoindex.Float32, Literal[768]]:
"""生成 768 维嵌入 - 向量索引需要维度。"""
# ... 嵌入逻辑 ...
return embedding # numpy 数组或 768 浮点数列表
# ✅ 结构返回类型,宽松参数
@cocoindex.op.function(behavior_version=1)
def process_person(person: dict[str, Any]) -> Person:
"""参数可以是 dict[str, Any],返回必须是特定结构。"""
return Person(name=person["name"], age=person["age"])
# ✅ LTable 返回类型
@cocoindex.op.function(behavior_version=1)
def filter_people(people: list[Any]) -> list[Person]:
"""返回类型指定特定结构列表。"""
return [p for p in people if p.age >= 18]
# ❌ 错误:dict[str, str] 不是有效的特定 CocoIndex 类型
# @cocoindex.op.function(...)
# def bad_example(person: Person) -> dict[str, str]:
# return {"name": person.name}
全面数据类型文档: https://cocoindex.io/docs/core/data_types
自定义函数
当用户需要自定义转换逻辑时,创建自定义函数。
决策:独立函数 vs 规范+执行器
使用独立函数当:
- 简单转换
- 不需要配置
- 不需要设置/初始化
使用规范+执行器当:
- 需要配置(模型名称、API 端点、参数)
- 需要设置(加载模型、建立连接)
- 复杂多步处理
创建独立函数
@cocoindex.op.function(behavior_version=1)
def my_function(input_arg: str, optional_arg: int | None = None) -> dict:
"""
函数描述。
参数:
input_arg: 描述
optional_arg: 可选描述
"""
# 转换逻辑
return {"result": f"processed-{input_arg}"}
要求:
- 装饰器:
@cocoindex.op.function() - 所有参数和返回值的类型注释
- 可选参数:
cache=True用于昂贵操作,behavior_version(必需与缓存)
创建规范+执行器函数
# 1. 定义配置规范
class MyFunction(cocoindex.op.FunctionSpec):
"""MyFunction 的配置。"""
model_name: str
threshold: float = 0.5
# 2. 定义执行器
@cocoindex.op.executor_class(cache=True, behavior_version=1)
class MyFunctionExecutor:
spec: MyFunction # 必需:链接到规范
model = None # 实例变量用于状态
def prepare(self) -> None:
"""可选:执行前运行一次。"""
# 加载模型、设置连接等
self.model = load_model(self.spec.model_name)
def __call__(self, text: str) -> dict:
"""必需:为每个数据行执行。"""
# 使用 self.spec 进行配置
# 使用 self.model 用于加载的资源
result = self.model.process(text)
return {"result": result}
何时启用缓存:
- LLM API 调用
- 模型推理
- 外部 API 调用
- 计算昂贵的操作
重要: 当函数逻辑更改时,递增 behavior_version 以使缓存失效。
详细示例和模式,加载 references/custom_functions.md。
更多自定义函数: https://cocoindex.io/docs/custom_ops/custom_functions
操作流
CLI 操作
设置流(创建资源):
cocoindex setup main
一次性更新:
cocoindex update main
# 带自动设置
cocoindex update --setup main
# 强制重置所有内容,然后设置和更新
cocoindex update --reset main
实时更新(连续监控):
cocoindex update main.py -L
# 需要源上的 refresh_interval 或源特定更改捕获
删除流(移除所有资源):
cocoindex drop main.py
检查流:
cocoindex show main.py:FlowName
无副作用测试:
cocoindex evaluate main.py:FlowName --output-dir ./test_output
完整 CLI 参考,加载 references/cli_operations.md。
CLI 文档: https://cocoindex.io/docs/core/cli
API 操作
基本设置:
from dotenv import load_dotenv
import cocoindex
load_dotenv()
cocoindex.init()
@cocoindex.flow_def(name="MyFlow")
def my_flow(flow_builder, data_scope):
# ... 流定义 ...
pass
一次性更新:
stats = my_flow.update()
print(f"处理了 {stats.total_rows} 行")
# 异步
stats = await my_flow.update_async()
实时更新:
# 作为上下文管理器
with cocoindex.FlowLiveUpdater(my_flow) as updater:
# 更新器在后台运行
# 您的应用程序逻辑在此
pass
# 手动控制
updater = cocoindex.FlowLiveUpdater(
my_flow,
cocoindex.FlowLiveUpdaterOptions(
live_mode=True,
print_stats=True
)
)
updater.start()
# ... 应用程序逻辑 ...
updater.wait()
设置/删除:
my_flow.setup(report_to_stdout=True)
my_flow.drop(report_to_stdout=True)
cocoindex.setup_all_flows()
cocoindex.drop_all_flows()
使用转换流查询:
@cocoindex.transform_flow()
def text_to_embedding(text: cocoindex.DataSlice[str]) -> cocoindex.DataSlice[list[float]]:
return text.transform(
cocoindex.functions.SentenceTransformerEmbed(model="...")
)
# 在流中用于索引
doc["embedding"] = text_to_embedding(doc["content"])
# 用于查询
query_embedding = text_to_embedding.eval("搜索查询")
完整 API 参考和模式,加载 references/api_operations.md。
API 文档: https://cocoindex.io/docs/core/flow_methods
内置函数
文本处理
SplitRecursively - 智能分块文本
doc["chunks"] = doc["content"].transform(
cocoindex.functions.SplitRecursively(),
language="markdown", # 或 "python"、"javascript" 等
chunk_size=2000,
chunk_overlap=500
)
ParseJson - 解析 JSON 字符串
data = json_string.transform(cocoindex.functions.ParseJson())
DetectProgrammingLanguage - 从文件名检测语言
file["language"] = file["filename"].transform(
cocoindex.functions.DetectProgrammingLanguage()
)
嵌入
SentenceTransformerEmbed - 本地嵌入模型
# 要求:cocoindex[embeddings]
chunk["embedding"] = chunk["text"].transform(
cocoindex.functions.SentenceTransformerEmbed(
model="sentence-transformers/all-MiniLM-L6-v2"
)
)
EmbedText - LLM API 嵌入
这是使用 LLM API(OpenAI、Voyage 等)生成嵌入的推荐方式。
chunk["embedding"] = chunk["text"].transform(
cocoindex.functions.EmbedText(
api_type=cocoindex.LlmApiType.OPENAI,
model="text-embedding-3-small",
)
)
ColPaliEmbedImage - 多模态图像嵌入
# 要求:cocoindex[colpali]
image["embedding"] = image["img_bytes"].transform(
cocoindex.functions.ColPaliEmbedImage(model="vidore/colpali-v1.2")
)
LLM 提取
ExtractByLlm - 使用 LLM 提取结构化数据
这是使用 LLMs 进行提取和摘要任务的推荐方式。它支持结构化输出(数据类、Pydantic 模型)和简单文本输出(str)。
import dataclasses
# 用于结构化提取
@dataclasses.dataclass
class ProductInfo:
name: str
price: float
category: str
item["product_info"] = item["text"].transform(
cocoindex.functions.ExtractByLlm(
llm_spec=cocoindex.LlmSpec(
api_type=cocoindex.LlmApiType.OPENAI,
model="gpt-4o-mini"
),
output_type=ProductInfo,
instruction="提取产品信息"
)
)
# 用于文本摘要/生成
file["summary"] = file["content"].transform(
cocoindex.functions.ExtractByLlm(
llm_spec=cocoindex.LlmSpec(
api_type=cocoindex.LlmApiType.OPENAI,
model="gpt-4o-mini"
),
output_type=str,
instruction="用一段话总结此文档"
)
)
常见源和目标
浏览所有源: https://cocoindex.io/docs/sources/ 浏览所有目标: https://cocoindex.io/docs/targets/
源
LocalFile:
cocoindex.sources.LocalFile(
path="documents",
included_patterns=["*.md", "*.txt"],
excluded_patterns=["**/.*", "node_modules"]
)
AmazonS3:
cocoindex.sources.AmazonS3(
bucket="my-bucket",
prefix="documents/",
aws_access_key_id=cocoindex.add_transient_auth_entry("..."),
aws_secret_access_key=cocoindex.add_transient_auth_entry("...")
)
Postgres:
cocoindex.sources.Postgres(
connection=cocoindex.add_auth_entry("conn", cocoindex.sources.PostgresConnection(...)),
query="SELECT id, content FROM documents"
)
目标
Postgres(带向量支持):
collector.export(
"target_name",
cocoindex.targets.Postgres(),
primary_key_fields=["id"],
vector_indexes=[
cocoindex.VectorIndexDef(
field_name="embedding",
metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY
)
]
)
Qdrant:
collector.export(
"target_name",
cocoindex.targets.Qdrant(collection_name="my_collection"),
primary_key_fields=["id"]
)
LanceDB:
# 要求:cocoindex[lancedb]
collector.export(
"target_name",
cocoindex.targets.LanceDB(uri="lancedb_data", table_name="my_table"),
primary_key_fields=["id"]
)
Neo4j(节点):
collector.export(
"nodes",
cocoindex.targets.Neo4j(
connection=neo4j_conn,
mapping=cocoindex.targets.Nodes(label="Entity")
),
primary_key_fields=["id"]
)
Neo4j(关系):
collector.export(
"relationships",
cocoindex.targets.Neo4j(
connection=neo4j_conn,
mapping=cocoindex.targets.Relationships(
rel_type="RELATES_TO",
source=cocoindex.targets.NodeFromFields(
label="Entity",
fields=[cocoindex.targets.TargetFieldMapping(source="source_id", target="id")]
),
target=cocoindex.targets.NodeFromFields(
label="Entity",
fields=[cocoindex.targets.TargetFieldMapping(source="target_id", target="id")]
)
)
),
primary_key_fields=["id"]
)
常见问题和解决方案
“流未找到”
- 检查 APP_TARGET 格式:
cocoindex show main.py - 使用
--app-dir如果不在项目根目录 - 验证流名称与装饰器匹配
“数据库连接失败”
- 检查
.env是否有COCOINDEX_DATABASE_URL - 测试连接:
psql $COCOINDEX_DATABASE_URL - 使用
--env-file指定自定义位置
“模式不匹配”
- 重新运行设置:
cocoindex setup main.py - 删除并重新创建:
cocoindex drop main.py && cocoindex setup main.py
“实时更新立即退出”
- 添加
refresh_interval到源 - 或使用源特定更改捕获(Postgres 通知、S3 事件)
“内存不足”
- 在源上添加并发限制:
max_inflight_rows、max_inflight_bytes - 在
.env中设置全局限制:COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS
参考文档
此技能包括常见模式和操作的全面参考文档:
- references/flow_patterns.md - 常见流模式的完整示例(文本嵌入、代码嵌入、知识图谱、实时更新、并发控制等)
- references/custom_functions.md - 创建自定义函数的详细指南,带示例(独立函数、规范+执行器模式、LLM 调用、外部 API、缓存)
- references/cli_operations.md - 完整 CLI 参考,包含所有命令、选项和工作流
- references/api_operations.md - Python API 参考,带程序化流控制、实时更新、查询和应用程序集成模式的示例
当用户需要时加载这些参考:
- 特定模式的详细示例
- 完整 API 文档
- 高级使用场景
- 故障排除指导
全面文档: https://cocoindex.io/docs/ 搜索特定主题: https://cocoindex.io/docs/search?q=url encoded keyword