数据摄入技能Skill ingesting-data

数据摄入技能专注于从多种源高效加载数据到数据库和系统的模式与实践。它涵盖ETL开发、云存储集成、API消费、流式数据处理等,支持批量处理和实时摄入,适用于数据工程、数据分析、ETL管道构建等场景。关键词:数据摄入、ETL开发、数据工程、云存储、API集成、实时数据处理、数据库迁移、更改数据捕获、ETL框架。

ETL开发 0 次安装 0 次浏览 更新于 3/23/2026

name: ingesting-data description: 从云存储、API、文件和流式源加载数据到数据库的数据摄入模式。用于导入CSV/JSON/Parquet文件,从S3/GCS桶拉取数据,消费API馈送,或构建ETL管道。

数据摄入模式

这个技能提供了从外部源获取数据到系统的模式。

何时使用这个技能

  • 导入CSV、JSON、Parquet或Excel文件
  • 从S3、GCS或Azure Blob存储加载数据
  • 消费REST/GraphQL API馈送
  • 构建ETL/ELT管道
  • 数据库迁移和CDC(更改数据捕获)
  • 从Kafka/Kinesis流式数据摄入

摄入模式决策树

您的数据源是什么?
├── 云存储(S3、GCS、Azure)→ 参见cloud-storage.md
├── 文件(CSV、JSON、Parquet)→ 参见file-formats.md
├── REST/GraphQL APIs → 参见api-feeds.md
├── 流式(Kafka、Kinesis)→ 参见streaming-sources.md
├── 传统数据库 → 参见database-migration.md
└── 需要完整ETL框架 → 参见etl-tools.md

按语言快速入门

Python(推荐用于ETL)

dlt(数据加载工具)- 现代Python ETL:

import dlt

# 定义源
@dlt.source
def github_source(repo: str):
    @dlt.resource(write_disposition="merge", primary_key="id")
    def issues():
        response = requests.get(f"https://api.github.com/repos/{repo}/issues")
        yield response.json()
    return issues

# 加载到目的地
pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="postgres",  # 或 duckdb、bigquery、snowflake
    dataset_name="github_data"
)

load_info = pipeline.run(github_source("owner/repo"))
print(load_info)

Polars用于文件处理(比pandas更快):

import polars as pl

# 读取CSV并推断模式
df = pl.read_csv("data.csv")

# 读取Parquet(列式,高效)
df = pl.read_parquet("s3://bucket/data.parquet")

# 读取JSON行
df = pl.read_ndjson("events.jsonl")

# 写入数据库
df.write_database(
    table_name="events",
    connection="postgresql://user:pass@localhost/db",
    if_table_exists="append"
)

TypeScript/Node.js

S3摄入:

import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";
import { parse } from "csv-parse/sync";

const s3 = new S3Client({ region: "us-east-1" });

async function ingestFromS3(bucket: string, key: string) {
  const response = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
  const body = await response.Body?.transformToString();

  // 解析CSV
  const records = parse(body, { columns: true, skip_empty_lines: true });

  // 插入到数据库
  await db.insert(eventsTable).values(records);
}

API馈送轮询:

import { Hono } from "hono";

// 用于实时摄入的Webhook接收器
const app = new Hono();

app.post("/webhooks/stripe", async (c) => {
  const event = await c.req.json();

  // 验证webhook签名
  const signature = c.req.header("stripe-signature");
  // ... 验证逻辑

  // 摄入事件
  await db.insert(stripeEventsTable).values({
    eventId: event.id,
    type: event.type,
    data: event.data,
    receivedAt: new Date()
  });

  return c.json({ received: true });
});

Rust

高性能文件摄入:

use polars::prelude::*;
use aws_sdk_s3::Client;

async fn ingest_parquet(client: &Client, bucket: &str, key: &str) -> Result<DataFrame> {
    // 从S3下载
    let resp = client.get_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await?;

    let bytes = resp.body.collect().await?.into_bytes();

    // 用Polars解析
    let df = ParquetReader::new(Cursor::new(bytes))
        .finish()?;

    Ok(df)
}

Go

并发文件处理:

package main

import (
    "context"
    "encoding/csv"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func ingestCSV(ctx context.Context, client *s3.Client, bucket, key string) error {
    resp, err := client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: &bucket,
        Key:    &key,
    })
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    reader := csv.NewReader(resp.Body)
    records, err := reader.ReadAll()
    if err != nil {
        return err
    }

    // 批量插入到数据库
    return batchInsert(ctx, records)
}

摄入模式

1. 批处理摄入(文件/存储)

用于定期批量加载:

源 → 提取 → 转换 → 加载 → 验证
  ↓         ↓          ↓         ↓        ↓
 S3      下载    清洁/映射    插入     计数检查

关键考虑:

  • 对于大文件(>100MB)使用分块读取
  • 通过校验和实现幂等性
  • 跟踪文件处理状态
  • 处理部分失败

2. 流式摄入(实时)

用于连续数据流:

源 → 缓冲 → 处理 → 加载 → 确认
  ↓        ↓         ↓        ↓      ↓
Kafka   内存中    转换     DB   提交偏移量

关键考虑:

  • 至少一次 vs 恰好一次语义
  • 背压处理
  • 死信队列用于失败
  • 检查点管理

3. API轮询(馈送)

用于外部API数据:

调度 → 获取 → 去重 → 加载 → 更新游标
   ↓         ↓        ↓       ↓         ↓
 Cron     API调用    按ID    插入     最后时间戳

关键考虑:

  • 速率限制和退避
  • 增量加载(游标、时间戳)
  • API分页处理
  • 指数退避重试

4. 更改数据捕获(CDC)

用于数据库复制:

源数据库 → 捕获更改 → 转换 → 目标数据库
    ↓             ↓               ↓            ↓
 Postgres    Debezium/WAL     映射模式   插入/更新

关键考虑:

  • 初始快照 + 流式更改
  • 模式演化处理
  • 顺序保证
  • 冲突解决

库推荐

用例 Python TypeScript Rust Go
ETL框架 dlt, Meltano, Dagster - - -
云存储 boto3, gcsfs, adlfs @aws-sdk/, @google-cloud/ aws-sdk-s3, object_store aws-sdk-go-v2
文件处理 polars, pandas, pyarrow papaparse, xlsx, parquetjs polars-rs, arrow-rs encoding/csv, parquet-go
流式 confluent-kafka, aiokafka kafkajs rdkafka-rs franz-go, sarama
CDC Debezium, pg_logical - - -

参考文档

  • references/cloud-storage.md - S3、GCS、Azure Blob模式
  • references/file-formats.md - CSV、JSON、Parquet、Excel处理
  • references/api-feeds.md - REST轮询、webhooks、GraphQL订阅
  • references/streaming-sources.md - Kafka、Kinesis、Pub/Sub
  • references/database-migration.md - 模式迁移、CDC模式
  • references/etl-tools.md - dlt、Meltano、Airbyte、Fivetran

脚本

  • scripts/validate_csv_schema.py - 根据预期模式验证CSV
  • scripts/test_s3_connection.py - 测试S3桶连接性
  • scripts/generate_dlt_pipeline.py - 生成dlt管道骨架

与数据库技能链接

摄入后,链接到适当的数据库技能:

目的地 链接到技能
PostgreSQL, MySQL databases-relational
MongoDB, DynamoDB databases-document
Qdrant, Pinecone databases-vector(嵌入后)
ClickHouse, TimescaleDB databases-timeseries
Neo4j databases-graph

对于向量数据库,通过 ai-data-engineering 链接用于嵌入:

ingesting-data → ai-data-engineering → databases-vector