AI数据工程Skill ai-data-engineering

AI数据工程技能专注于构建人工智能和机器学习系统的数据基础设施,特别用于检索增强生成(RAG)应用。它涵盖数据管道、特征存储、嵌入生成、工作流编排(如Dagster、Prefect)、数据版本化(LakeFS)和评估指标(如RAGAS)。适用于开发生产级AI应用,如语义搜索、实时特征服务和RAG系统,提供架构模式和实践指南。关键词:AI数据工程、RAG管道、特征存储、嵌入生成、Dagster、RAGAS、机器学习基础设施。

RAG应用 0 次安装 0 次浏览 更新于 3/23/2026

name: AI数据工程 description: 为AI/ML系统构建数据管道、特征存储和嵌入生成。用于构建RAG管道、ML特征服务或数据转换。覆盖特征存储(Feast、Tecton)、嵌入管道、分块策略、编排(Dagster、Prefect、Airflow)、dbt转换、数据版本化(LakeFS)和实验跟踪(MLflow、W&B)。

AI数据工程

目的

构建AI/ML系统的数据基础设施,包括RAG管道、特征存储和嵌入生成。提供生产AI应用的架构模式、编排工作流和评估指标。

使用时机

使用此技能时:

  • 构建RAG(检索增强生成)管道
  • 实现语义搜索或向量数据库
  • 设置ML特征存储以进行实时服务
  • 创建嵌入生成管道
  • 使用RAGAS指标评估RAG质量
  • 编排AI系统的数据工作流
  • 与前端技能集成(ai-chat、search-filter)

跳过此技能如果:

  • 构建传统CRUD应用程序(使用数据库-关系型)
  • 简单键值存储(使用数据库-非关系型)
  • 应用程序中没有AI/ML组件

RAG管道架构

RAG管道有5个独立阶段。理解此架构对生产实现至关重要。

┌─────────────────────────────────────────────────────────────┐
│                    RAG Pipeline (5 Stages)                   │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  1. 数据摄入 → 加载文档(PDF、DOCX、Markdown)              │
│  2. 索引 → 分块(512个令牌)+ 嵌入 + 存储                  │
│  3. 检索 → 查询嵌入 + 向量搜索 + 过滤器                    │
│  4. 生成 → 上下文注入 + LLM流式响应                         │
│  5. 评估 → RAGAS指标(忠实度、相关性)                      │
│                                                              │
└─────────────────────────────────────────────────────────────┘

完整RAG架构与实现模式,请参见:

  • references/rag-architecture.md - 详细的5阶段分解
  • examples/langchain-rag/basic_rag.py - 工作实现

分块策略

分块是RAG质量最关键的决策。分块不当会破坏检索。

默认推荐:

  • 大小: 512个令牌
  • 重叠: 50-100个令牌
  • 方法: 基于固定令牌

为何选择这些值:

  • 太小(<256个令牌):丢失上下文,需要多次检索
  • 太大(>1024个令牌):包含不相关内容,触及令牌限制
  • 重叠防止块边界的信息丢失

特殊情况的替代策略:

# 代码感知分块(保留函数/类)
from langchain.text_splitter import RecursiveCharacterTextSplitter

code_splitter = RecursiveCharacterTextSplitter.from_language(
    language="python",
    chunk_size=512,
    chunk_overlap=50
)

# 语义分块(基于意义拆分,而非令牌)
from langchain.text_splitter import SemanticChunker

semantic_splitter = SemanticChunker(
    embeddings=embeddings,
    breakpoint_threshold_type="percentile"  # 在语义边界拆分
)

参见: references/chunking-strategies.md 获取完整决策框架

嵌入生成

嵌入质量直接影响检索准确性。Voyage AI目前是业界最佳。

主要推荐:Voyage AI voyage-3

  • 维度:1024
  • MTEB分数:69.0(截至2025年12月最高)
  • 成本:$$$ 但比OpenAI好9.74%
  • 用途:需要最佳检索质量的生产系统

成本效益替代:OpenAI text-embedding-3-small

  • 维度:1536
  • MTEB分数:62.3
  • 成本:$(比voyage-3便宜5倍)
  • 用途:开发、原型设计、成本敏感应用

实现:

from langchain_voyageai import VoyageAIEmbeddings
from langchain_openai import OpenAIEmbeddings

# 生产(最佳质量)
embeddings = VoyageAIEmbeddings(
    model="voyage-3",
    voyage_api_key="your-api-key"
)

# 开发(成本效益)
embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small",
    openai_api_key="your-api-key"
)

参见: references/embedding-strategies.md 获取完整提供商比较

RAGAS评估指标

传统指标(BLEU、ROUGE)不衡量RAG质量。RAGAS提供LLM作为评判的评估。

4个核心指标:

指标 衡量内容 良好分数
忠实度 与检索上下文的事实一致性 > 0.8
答案相关性 答案是否解决用户问题? > 0.7
上下文精度 检索块是否实际相关? > 0.6
上下文召回率 是否检索到所有必要块? > 0.7

快速评估脚本:

# 运行RAGAS评估(TOKEN-FREE脚本执行)
python scripts/evaluate_rag.py --dataset eval_data.json --output results.json

手动实现:

from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy

dataset = {
    "question": ["What is the capital of France?"],
    "answer": ["Paris is the capital of France."],
    "contexts": [["France's capital is Paris."]],
    "ground_truth": ["Paris"]
}

result = evaluate(dataset, metrics=[faithfulness, answer_relevancy])
print(f"Faithfulness: {result['faithfulness']}")
print(f"Answer Relevancy: {result['answer_relevancy']}")

参见: references/evaluation-metrics.md 获取完整RAGAS实现指南

特征存储

特征存储通过提供一致的特征计算解决“训练服务偏差”问题。

主要推荐:Feast - 开源,适用于任何后端(PostgreSQL、Redis、DynamoDB、S3、BigQuery、Snowflake)

基本用法:

from feast import FeatureStore
store = FeatureStore(repo_path="feature_repo/")

# 在线服务(低延迟)
features = store.get_online_features(
    features=["user_features:total_orders"],
    entity_rows=[{"user_id": 1001}]
).to_dict()

参见: references/feature-stores.md 获取完整Feast设置和替代方案(Tecton、Hopsworks)

LangChain编排

LangChain是LLM编排的主要框架,拥有最大生态系统(24,215+ API参考片段)。

Context7库ID: /websites/langchain_oss_python_langchain(可信度:高,片段:435)

基本RAG链:

from langchain_core.prompts import ChatPromptTemplate
from langchain_qdrant import QdrantVectorStore
from langchain_voyageai import VoyageAIEmbeddings

# 设置检索器
vectorstore = QdrantVectorStore(
    client=qdrant_client,
    embedding=VoyageAIEmbeddings(model="voyage-3")
)
retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 5})

# 构建链
prompt = ChatPromptTemplate.from_template(
    "基于上下文回答:
{context}

问题:{question}"
)
chain = {"context": retriever, "question": lambda x: x} | prompt | ChatOpenAI() | StrOutputParser()

# 流式响应
for chunk in chain.stream("What is the capital of France?"):
    print(chunk, end="", flush=True)

参见: references/langchain-patterns.md - 完整的LangChain 0.3+模式,包括流式和混合搜索

编排工具

现代AI管道需要超越cron作业的工作流编排。

主要推荐:Dagster(用于ML/AI管道) - 资产中心设计,最佳谱系跟踪,适合RAG

示例:嵌入管道

from dagster import asset
from langchain_voyageai import VoyageAIEmbeddings

@asset
def raw_documents():
    """从S3加载文档。"""
    return documents

@asset
def chunked_documents(raw_documents):
    """拆分为512令牌块,重叠50令牌。"""
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50)
    return splitter.split_documents(raw_documents)

@asset
def embedded_documents(chunked_documents):
    """使用Voyage AI生成嵌入。"""
    embeddings = VoyageAIEmbeddings(model="voyage-3")
    return embeddings.embed_documents([doc.page_content for doc in chunked_documents])

参见: references/orchestration-tools.md 获取完整Dagster模式和替代方案(Prefect、Airflow 3.0、dbt)

与前端技能集成

ai-chat技能 → RAG后端

ai-chat技能使用RAG管道输出进行流式响应。

后端API(FastAPI):

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

@app.post("/api/rag/stream")
async def stream_rag(query: str):
    async def generate():
        chain = RetrievalQA.from_chain_type(llm=OpenAI(streaming=True), retriever=vectorstore.as_retriever())
        async for chunk in chain.astream(query):
            yield chunk
    return StreamingResponse(generate(), media_type="text/plain")

参见: references/rag-architecture.md 获取完整前端集成模式

search-filter技能 → 语义搜索

search-filter技能使用语义搜索后端进行向量相似性。

后端(Qdrant + Voyage AI):

from qdrant_client import QdrantClient
from langchain_voyageai import VoyageAIEmbeddings

@app.post("/api/search/semantic")
async def semantic_search(query: str, filters: dict):
    query_vector = VoyageAIEmbeddings(model="voyage-3").embed_query(query)
    results = QdrantClient().search(
        collection_name="documents",
        query_vector=query_vector,
        query_filter=filters,
        limit=10
    )
    return {"results": results}

数据版本化

主要推荐:LakeFS(2025年11月收购DVC团队)

数据湖上的类Git操作:分支、提交、合并、时间旅行。适用于S3/Azure/GCS。

import lakefs

branch = lakefs.Branch("main").create("experiment-voyage-3")
branch.commit("Updated embeddings to voyage-3")
branch.merge_into("main")

参见: references/data-versioning.md 获取完整LakeFS设置

快速入门工作流

1. 设置向量数据库:

# 运行Qdrant设置脚本(TOKEN-FREE执行)
python scripts/setup_qdrant.py --collection docs --dimension 1024

2. 分块和嵌入文档:

# 分块文档(TOKEN-FREE执行)
python scripts/chunk_documents.py \
  --input data/documents/ \
  --chunk-size 512 \
  --overlap 50 \
  --output data/chunks/

3. 实现RAG管道:

参见 examples/langchain-rag/basic_rag.py 获取完整工作示例。

4. 使用RAGAS评估:

# 运行评估(TOKEN-FREE执行)
python scripts/evaluate_rag.py \
  --dataset data/eval_qa.json \
  --output results/ragas_metrics.json

5. 使用编排部署:

参见 examples/dagster-pipelines/embedding_pipeline.py 获取生产部署。

依赖项

所需Python包:

# 核心RAG
pip install langchain langchain-core langchain-openai langchain-voyageai langchain-qdrant

# 向量数据库
pip install qdrant-client

# 评估
pip install ragas datasets

# 特征存储
pip install feast

# 编排
pip install dagster dagster-webserver

# 数据版本化
pip install lakefs-client

替代方案可选:

# LlamaIndex(LangChain替代)
pip install llama-index

# dbt(SQL转换)
pip install dbt-core dbt-postgres

# Prefect(替代编排)
pip install prefect

故障排除

常见问题:

1. 检索质量差 - 检查块大小(尝试512令牌),增加重叠(50-100),尝试混合搜索,使用Cohere重新排序

2. 嵌入生成慢 - 批量处理文档(100-1000),使用异步API,用Redis缓存,开发时使用更小模型

3. LLM成本高 - 减少检索块(k=3),使用更便宜的重新排序模型,缓存频繁查询

参见: references/rag-architecture.md 获取完整故障排除指南

最佳实践

分块: 默认512令牌,重叠50令牌。复杂文档使用语义分块。源代码保留代码结构。

嵌入: 生产使用Voyage AI voyage-3,开发使用OpenAI text-embedding-3-small。切勿混合嵌入模型(如果更改则重新嵌入所有内容)。

评估: 每次管道更改运行RAGAS指标。维护50+问答对测试数据集。跟踪指标随时间变化。

编排: ML/AI管道使用Dagster,仅SQL转换使用dbt。版本控制所有管道代码。

前端集成: 始终流式响应LLM。实现重试逻辑。向用户显示引文/来源。优雅处理空结果。

附加资源

参考文档:

  • references/rag-architecture.md - 完整RAG管道指南
  • references/chunking-strategies.md - 分块决策框架
  • references/embedding-strategies.md - 嵌入模型比较
  • references/langchain-patterns.md - LangChain 0.3+模式
  • references/feature-stores.md - Feast设置和替代方案
  • references/evaluation-metrics.md - RAGAS实现指南

工作示例:

  • examples/langchain-rag/basic_rag.py - 简单RAG链
  • examples/langchain-rag/streaming_rag.py - 流式响应
  • examples/langchain-rag/hybrid_search.py - 向量 + BM25
  • examples/llamaindex-agents/query_engine.py - LlamaIndex替代
  • examples/feast-features/ - 完整特征存储设置
  • examples/dagster-pipelines/embedding_pipeline.py - 生产管道

可执行脚本(TOKEN-FREE):

  • scripts/evaluate_rag.py - RAGAS评估运行器
  • scripts/chunk_documents.py - 文档分块实用程序
  • scripts/benchmark_retrieval.py - 检索质量基准测试
  • scripts/setup_qdrant.py - Qdrant集合设置