name: AI数据工程 description: 为AI/ML系统构建数据管道、特征存储和嵌入生成。用于构建RAG管道、ML特征服务或数据转换。覆盖特征存储(Feast、Tecton)、嵌入管道、分块策略、编排(Dagster、Prefect、Airflow)、dbt转换、数据版本化(LakeFS)和实验跟踪(MLflow、W&B)。
AI数据工程
目的
构建AI/ML系统的数据基础设施,包括RAG管道、特征存储和嵌入生成。提供生产AI应用的架构模式、编排工作流和评估指标。
使用时机
使用此技能时:
- 构建RAG(检索增强生成)管道
- 实现语义搜索或向量数据库
- 设置ML特征存储以进行实时服务
- 创建嵌入生成管道
- 使用RAGAS指标评估RAG质量
- 编排AI系统的数据工作流
- 与前端技能集成(ai-chat、search-filter)
跳过此技能如果:
- 构建传统CRUD应用程序(使用数据库-关系型)
- 简单键值存储(使用数据库-非关系型)
- 应用程序中没有AI/ML组件
RAG管道架构
RAG管道有5个独立阶段。理解此架构对生产实现至关重要。
┌─────────────────────────────────────────────────────────────┐
│ RAG Pipeline (5 Stages) │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 数据摄入 → 加载文档(PDF、DOCX、Markdown) │
│ 2. 索引 → 分块(512个令牌)+ 嵌入 + 存储 │
│ 3. 检索 → 查询嵌入 + 向量搜索 + 过滤器 │
│ 4. 生成 → 上下文注入 + LLM流式响应 │
│ 5. 评估 → RAGAS指标(忠实度、相关性) │
│ │
└─────────────────────────────────────────────────────────────┘
完整RAG架构与实现模式,请参见:
references/rag-architecture.md- 详细的5阶段分解examples/langchain-rag/basic_rag.py- 工作实现
分块策略
分块是RAG质量最关键的决策。分块不当会破坏检索。
默认推荐:
- 大小: 512个令牌
- 重叠: 50-100个令牌
- 方法: 基于固定令牌
为何选择这些值:
- 太小(<256个令牌):丢失上下文,需要多次检索
- 太大(>1024个令牌):包含不相关内容,触及令牌限制
- 重叠防止块边界的信息丢失
特殊情况的替代策略:
# 代码感知分块(保留函数/类)
from langchain.text_splitter import RecursiveCharacterTextSplitter
code_splitter = RecursiveCharacterTextSplitter.from_language(
language="python",
chunk_size=512,
chunk_overlap=50
)
# 语义分块(基于意义拆分,而非令牌)
from langchain.text_splitter import SemanticChunker
semantic_splitter = SemanticChunker(
embeddings=embeddings,
breakpoint_threshold_type="percentile" # 在语义边界拆分
)
参见: references/chunking-strategies.md 获取完整决策框架
嵌入生成
嵌入质量直接影响检索准确性。Voyage AI目前是业界最佳。
主要推荐:Voyage AI voyage-3
- 维度:1024
- MTEB分数:69.0(截至2025年12月最高)
- 成本:$$$ 但比OpenAI好9.74%
- 用途:需要最佳检索质量的生产系统
成本效益替代:OpenAI text-embedding-3-small
- 维度:1536
- MTEB分数:62.3
- 成本:$(比voyage-3便宜5倍)
- 用途:开发、原型设计、成本敏感应用
实现:
from langchain_voyageai import VoyageAIEmbeddings
from langchain_openai import OpenAIEmbeddings
# 生产(最佳质量)
embeddings = VoyageAIEmbeddings(
model="voyage-3",
voyage_api_key="your-api-key"
)
# 开发(成本效益)
embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
openai_api_key="your-api-key"
)
参见: references/embedding-strategies.md 获取完整提供商比较
RAGAS评估指标
传统指标(BLEU、ROUGE)不衡量RAG质量。RAGAS提供LLM作为评判的评估。
4个核心指标:
| 指标 | 衡量内容 | 良好分数 |
|---|---|---|
| 忠实度 | 与检索上下文的事实一致性 | > 0.8 |
| 答案相关性 | 答案是否解决用户问题? | > 0.7 |
| 上下文精度 | 检索块是否实际相关? | > 0.6 |
| 上下文召回率 | 是否检索到所有必要块? | > 0.7 |
快速评估脚本:
# 运行RAGAS评估(TOKEN-FREE脚本执行)
python scripts/evaluate_rag.py --dataset eval_data.json --output results.json
手动实现:
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy
dataset = {
"question": ["What is the capital of France?"],
"answer": ["Paris is the capital of France."],
"contexts": [["France's capital is Paris."]],
"ground_truth": ["Paris"]
}
result = evaluate(dataset, metrics=[faithfulness, answer_relevancy])
print(f"Faithfulness: {result['faithfulness']}")
print(f"Answer Relevancy: {result['answer_relevancy']}")
参见: references/evaluation-metrics.md 获取完整RAGAS实现指南
特征存储
特征存储通过提供一致的特征计算解决“训练服务偏差”问题。
主要推荐:Feast - 开源,适用于任何后端(PostgreSQL、Redis、DynamoDB、S3、BigQuery、Snowflake)
基本用法:
from feast import FeatureStore
store = FeatureStore(repo_path="feature_repo/")
# 在线服务(低延迟)
features = store.get_online_features(
features=["user_features:total_orders"],
entity_rows=[{"user_id": 1001}]
).to_dict()
参见: references/feature-stores.md 获取完整Feast设置和替代方案(Tecton、Hopsworks)
LangChain编排
LangChain是LLM编排的主要框架,拥有最大生态系统(24,215+ API参考片段)。
Context7库ID: /websites/langchain_oss_python_langchain(可信度:高,片段:435)
基本RAG链:
from langchain_core.prompts import ChatPromptTemplate
from langchain_qdrant import QdrantVectorStore
from langchain_voyageai import VoyageAIEmbeddings
# 设置检索器
vectorstore = QdrantVectorStore(
client=qdrant_client,
embedding=VoyageAIEmbeddings(model="voyage-3")
)
retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 5})
# 构建链
prompt = ChatPromptTemplate.from_template(
"基于上下文回答:
{context}
问题:{question}"
)
chain = {"context": retriever, "question": lambda x: x} | prompt | ChatOpenAI() | StrOutputParser()
# 流式响应
for chunk in chain.stream("What is the capital of France?"):
print(chunk, end="", flush=True)
参见: references/langchain-patterns.md - 完整的LangChain 0.3+模式,包括流式和混合搜索
编排工具
现代AI管道需要超越cron作业的工作流编排。
主要推荐:Dagster(用于ML/AI管道) - 资产中心设计,最佳谱系跟踪,适合RAG
示例:嵌入管道
from dagster import asset
from langchain_voyageai import VoyageAIEmbeddings
@asset
def raw_documents():
"""从S3加载文档。"""
return documents
@asset
def chunked_documents(raw_documents):
"""拆分为512令牌块,重叠50令牌。"""
from langchain.text_splitter import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50)
return splitter.split_documents(raw_documents)
@asset
def embedded_documents(chunked_documents):
"""使用Voyage AI生成嵌入。"""
embeddings = VoyageAIEmbeddings(model="voyage-3")
return embeddings.embed_documents([doc.page_content for doc in chunked_documents])
参见: references/orchestration-tools.md 获取完整Dagster模式和替代方案(Prefect、Airflow 3.0、dbt)
与前端技能集成
ai-chat技能 → RAG后端
ai-chat技能使用RAG管道输出进行流式响应。
后端API(FastAPI):
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
@app.post("/api/rag/stream")
async def stream_rag(query: str):
async def generate():
chain = RetrievalQA.from_chain_type(llm=OpenAI(streaming=True), retriever=vectorstore.as_retriever())
async for chunk in chain.astream(query):
yield chunk
return StreamingResponse(generate(), media_type="text/plain")
参见: references/rag-architecture.md 获取完整前端集成模式
search-filter技能 → 语义搜索
search-filter技能使用语义搜索后端进行向量相似性。
后端(Qdrant + Voyage AI):
from qdrant_client import QdrantClient
from langchain_voyageai import VoyageAIEmbeddings
@app.post("/api/search/semantic")
async def semantic_search(query: str, filters: dict):
query_vector = VoyageAIEmbeddings(model="voyage-3").embed_query(query)
results = QdrantClient().search(
collection_name="documents",
query_vector=query_vector,
query_filter=filters,
limit=10
)
return {"results": results}
数据版本化
主要推荐:LakeFS(2025年11月收购DVC团队)
数据湖上的类Git操作:分支、提交、合并、时间旅行。适用于S3/Azure/GCS。
import lakefs
branch = lakefs.Branch("main").create("experiment-voyage-3")
branch.commit("Updated embeddings to voyage-3")
branch.merge_into("main")
参见: references/data-versioning.md 获取完整LakeFS设置
快速入门工作流
1. 设置向量数据库:
# 运行Qdrant设置脚本(TOKEN-FREE执行)
python scripts/setup_qdrant.py --collection docs --dimension 1024
2. 分块和嵌入文档:
# 分块文档(TOKEN-FREE执行)
python scripts/chunk_documents.py \
--input data/documents/ \
--chunk-size 512 \
--overlap 50 \
--output data/chunks/
3. 实现RAG管道:
参见 examples/langchain-rag/basic_rag.py 获取完整工作示例。
4. 使用RAGAS评估:
# 运行评估(TOKEN-FREE执行)
python scripts/evaluate_rag.py \
--dataset data/eval_qa.json \
--output results/ragas_metrics.json
5. 使用编排部署:
参见 examples/dagster-pipelines/embedding_pipeline.py 获取生产部署。
依赖项
所需Python包:
# 核心RAG
pip install langchain langchain-core langchain-openai langchain-voyageai langchain-qdrant
# 向量数据库
pip install qdrant-client
# 评估
pip install ragas datasets
# 特征存储
pip install feast
# 编排
pip install dagster dagster-webserver
# 数据版本化
pip install lakefs-client
替代方案可选:
# LlamaIndex(LangChain替代)
pip install llama-index
# dbt(SQL转换)
pip install dbt-core dbt-postgres
# Prefect(替代编排)
pip install prefect
故障排除
常见问题:
1. 检索质量差 - 检查块大小(尝试512令牌),增加重叠(50-100),尝试混合搜索,使用Cohere重新排序
2. 嵌入生成慢 - 批量处理文档(100-1000),使用异步API,用Redis缓存,开发时使用更小模型
3. LLM成本高 - 减少检索块(k=3),使用更便宜的重新排序模型,缓存频繁查询
参见: references/rag-architecture.md 获取完整故障排除指南
最佳实践
分块: 默认512令牌,重叠50令牌。复杂文档使用语义分块。源代码保留代码结构。
嵌入: 生产使用Voyage AI voyage-3,开发使用OpenAI text-embedding-3-small。切勿混合嵌入模型(如果更改则重新嵌入所有内容)。
评估: 每次管道更改运行RAGAS指标。维护50+问答对测试数据集。跟踪指标随时间变化。
编排: ML/AI管道使用Dagster,仅SQL转换使用dbt。版本控制所有管道代码。
前端集成: 始终流式响应LLM。实现重试逻辑。向用户显示引文/来源。优雅处理空结果。
附加资源
参考文档:
references/rag-architecture.md- 完整RAG管道指南references/chunking-strategies.md- 分块决策框架references/embedding-strategies.md- 嵌入模型比较references/langchain-patterns.md- LangChain 0.3+模式references/feature-stores.md- Feast设置和替代方案references/evaluation-metrics.md- RAGAS实现指南
工作示例:
examples/langchain-rag/basic_rag.py- 简单RAG链examples/langchain-rag/streaming_rag.py- 流式响应examples/langchain-rag/hybrid_search.py- 向量 + BM25examples/llamaindex-agents/query_engine.py- LlamaIndex替代examples/feast-features/- 完整特征存储设置examples/dagster-pipelines/embedding_pipeline.py- 生产管道
可执行脚本(TOKEN-FREE):
scripts/evaluate_rag.py- RAGAS评估运行器scripts/chunk_documents.py- 文档分块实用程序scripts/benchmark_retrieval.py- 检索质量基准测试scripts/setup_qdrant.py- Qdrant集合设置