name: ingesting-data description: 从云存储、API、文件和流式源加载数据到数据库的数据摄入模式。用于导入CSV/JSON/Parquet文件,从S3/GCS桶拉取数据,消费API馈送,或构建ETL管道。
数据摄入模式
这个技能提供了从外部源获取数据到系统的模式。
何时使用这个技能
- 导入CSV、JSON、Parquet或Excel文件
- 从S3、GCS或Azure Blob存储加载数据
- 消费REST/GraphQL API馈送
- 构建ETL/ELT管道
- 数据库迁移和CDC(更改数据捕获)
- 从Kafka/Kinesis流式数据摄入
摄入模式决策树
您的数据源是什么?
├── 云存储(S3、GCS、Azure)→ 参见cloud-storage.md
├── 文件(CSV、JSON、Parquet)→ 参见file-formats.md
├── REST/GraphQL APIs → 参见api-feeds.md
├── 流式(Kafka、Kinesis)→ 参见streaming-sources.md
├── 传统数据库 → 参见database-migration.md
└── 需要完整ETL框架 → 参见etl-tools.md
按语言快速入门
Python(推荐用于ETL)
dlt(数据加载工具)- 现代Python ETL:
import dlt
# 定义源
@dlt.source
def github_source(repo: str):
@dlt.resource(write_disposition="merge", primary_key="id")
def issues():
response = requests.get(f"https://api.github.com/repos/{repo}/issues")
yield response.json()
return issues
# 加载到目的地
pipeline = dlt.pipeline(
pipeline_name="github_issues",
destination="postgres", # 或 duckdb、bigquery、snowflake
dataset_name="github_data"
)
load_info = pipeline.run(github_source("owner/repo"))
print(load_info)
Polars用于文件处理(比pandas更快):
import polars as pl
# 读取CSV并推断模式
df = pl.read_csv("data.csv")
# 读取Parquet(列式,高效)
df = pl.read_parquet("s3://bucket/data.parquet")
# 读取JSON行
df = pl.read_ndjson("events.jsonl")
# 写入数据库
df.write_database(
table_name="events",
connection="postgresql://user:pass@localhost/db",
if_table_exists="append"
)
TypeScript/Node.js
S3摄入:
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";
import { parse } from "csv-parse/sync";
const s3 = new S3Client({ region: "us-east-1" });
async function ingestFromS3(bucket: string, key: string) {
const response = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
const body = await response.Body?.transformToString();
// 解析CSV
const records = parse(body, { columns: true, skip_empty_lines: true });
// 插入到数据库
await db.insert(eventsTable).values(records);
}
API馈送轮询:
import { Hono } from "hono";
// 用于实时摄入的Webhook接收器
const app = new Hono();
app.post("/webhooks/stripe", async (c) => {
const event = await c.req.json();
// 验证webhook签名
const signature = c.req.header("stripe-signature");
// ... 验证逻辑
// 摄入事件
await db.insert(stripeEventsTable).values({
eventId: event.id,
type: event.type,
data: event.data,
receivedAt: new Date()
});
return c.json({ received: true });
});
Rust
高性能文件摄入:
use polars::prelude::*;
use aws_sdk_s3::Client;
async fn ingest_parquet(client: &Client, bucket: &str, key: &str) -> Result<DataFrame> {
// 从S3下载
let resp = client.get_object()
.bucket(bucket)
.key(key)
.send()
.await?;
let bytes = resp.body.collect().await?.into_bytes();
// 用Polars解析
let df = ParquetReader::new(Cursor::new(bytes))
.finish()?;
Ok(df)
}
Go
并发文件处理:
package main
import (
"context"
"encoding/csv"
"github.com/aws/aws-sdk-go-v2/service/s3"
)
func ingestCSV(ctx context.Context, client *s3.Client, bucket, key string) error {
resp, err := client.GetObject(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: &key,
})
if err != nil {
return err
}
defer resp.Body.Close()
reader := csv.NewReader(resp.Body)
records, err := reader.ReadAll()
if err != nil {
return err
}
// 批量插入到数据库
return batchInsert(ctx, records)
}
摄入模式
1. 批处理摄入(文件/存储)
用于定期批量加载:
源 → 提取 → 转换 → 加载 → 验证
↓ ↓ ↓ ↓ ↓
S3 下载 清洁/映射 插入 计数检查
关键考虑:
- 对于大文件(>100MB)使用分块读取
- 通过校验和实现幂等性
- 跟踪文件处理状态
- 处理部分失败
2. 流式摄入(实时)
用于连续数据流:
源 → 缓冲 → 处理 → 加载 → 确认
↓ ↓ ↓ ↓ ↓
Kafka 内存中 转换 DB 提交偏移量
关键考虑:
- 至少一次 vs 恰好一次语义
- 背压处理
- 死信队列用于失败
- 检查点管理
3. API轮询(馈送)
用于外部API数据:
调度 → 获取 → 去重 → 加载 → 更新游标
↓ ↓ ↓ ↓ ↓
Cron API调用 按ID 插入 最后时间戳
关键考虑:
- 速率限制和退避
- 增量加载(游标、时间戳)
- API分页处理
- 指数退避重试
4. 更改数据捕获(CDC)
用于数据库复制:
源数据库 → 捕获更改 → 转换 → 目标数据库
↓ ↓ ↓ ↓
Postgres Debezium/WAL 映射模式 插入/更新
关键考虑:
- 初始快照 + 流式更改
- 模式演化处理
- 顺序保证
- 冲突解决
库推荐
| 用例 | Python | TypeScript | Rust | Go |
|---|---|---|---|---|
| ETL框架 | dlt, Meltano, Dagster | - | - | - |
| 云存储 | boto3, gcsfs, adlfs | @aws-sdk/, @google-cloud/ | aws-sdk-s3, object_store | aws-sdk-go-v2 |
| 文件处理 | polars, pandas, pyarrow | papaparse, xlsx, parquetjs | polars-rs, arrow-rs | encoding/csv, parquet-go |
| 流式 | confluent-kafka, aiokafka | kafkajs | rdkafka-rs | franz-go, sarama |
| CDC | Debezium, pg_logical | - | - | - |
参考文档
references/cloud-storage.md- S3、GCS、Azure Blob模式references/file-formats.md- CSV、JSON、Parquet、Excel处理references/api-feeds.md- REST轮询、webhooks、GraphQL订阅references/streaming-sources.md- Kafka、Kinesis、Pub/Subreferences/database-migration.md- 模式迁移、CDC模式references/etl-tools.md- dlt、Meltano、Airbyte、Fivetran
脚本
scripts/validate_csv_schema.py- 根据预期模式验证CSVscripts/test_s3_connection.py- 测试S3桶连接性scripts/generate_dlt_pipeline.py- 生成dlt管道骨架
与数据库技能链接
摄入后,链接到适当的数据库技能:
| 目的地 | 链接到技能 |
|---|---|
| PostgreSQL, MySQL | databases-relational |
| MongoDB, DynamoDB | databases-document |
| Qdrant, Pinecone | databases-vector(嵌入后) |
| ClickHouse, TimescaleDB | databases-timeseries |
| Neo4j | databases-graph |
对于向量数据库,通过 ai-data-engineering 链接用于嵌入:
ingesting-data → ai-data-engineering → databases-vector