Python数据管道开发Skill python-pipeline

该技能用于构建模块化的Python数据处理管道,支持工作流协调、内容类型调度、Google Sheets/Drive API集成和AI服务应用。适用于内容处理、批量处理系统和数据工程场景,提高数据处理效率和自动化水平。关键词:Python、数据管道、模块化架构、调度器、Google Sheets集成、AI集成、ETL、数据处理、批量处理、工作流自动化。

数据工程 0 次安装 0 次浏览 更新于 3/15/2026

name: python-pipeline description: Python数据处理管道,采用模块化架构。适用于构建内容处理工作流、实现调度器模式、集成Google Sheets/Drive APIs或创建批量处理系统。涵盖了rosen-scraper、image-analyzer和social-scraper项目中的模式。

Python数据管道开发

使用Python构建生产级数据处理管道的模式。

架构模式

模块化处理器架构

src/
├── workflow.py              # 主协调器
├── dispatcher.py            # 内容类型路由器
├── processors/
│   ├── __init__.py
│   ├── base.py             # 抽象基类
│   ├── article_processor.py
│   ├── video_processor.py
│   └── audio_processor.py
├── services/
│   ├── sheets_service.py   # Google Sheets集成
│   ├── drive_service.py    # Google Drive集成
│   └── ai_service.py       # Gemini API包装器
├── utils/
│   ├── logger.py
│   └── rate_limiter.py
└── config.py               # 环境配置

调度器模式

from typing import Protocol
from urllib.parse import urlparse

class Processor(Protocol):
    def can_process(self, url: str) -> bool: ...
    def process(self, url: str, metadata: dict) -> dict: ...

class Dispatcher:
    def __init__(self):
        self.processors: list[Processor] = [
            ArticleProcessor(),
            VideoProcessor(),
            AudioProcessor(),
            SocialProcessor(),
        ]

    def dispatch(self, url: str, metadata: dict) -> dict:
        for processor in self.processors:
            if processor.can_process(url):
                return processor.process(url, metadata)
        raise ValueError(f"没有找到处理URL的处理器: {url}")

# 基于模式的路由
class ArticleProcessor:
    DOMAINS = ['nytimes.com', 'washingtonpost.com', 'medium.com']

    def can_process(self, url: str) -> bool:
        domain = urlparse(url).netloc.replace('www.', '')
        return any(d in domain for d in self.DOMAINS)

基于CSV的管道工作流

import csv
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Iterator

@dataclass
class Record:
    id: str
    url: str
    title: str | None = None
    content: str | None = None
    status: str = 'pending'

def read_input(path: Path) -> Iterator[Record]:
    with open(path, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield Record(**{k: v for k, v in row.items() if k in Record.__annotations__})

def write_output(records: list[Record], path: Path):
    with open(path, 'w', encoding='utf-8', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=list(Record.__annotations__.keys()))
        writer.writeheader()
        writer.writerows(asdict(r) for r in records)

def process_batch(input_path: Path, output_path: Path):
    dispatcher = Dispatcher()
    results = []

    for record in read_input(input_path):
        try:
            processed = dispatcher.dispatch(record.url, asdict(record))
            record.status = 'completed'
            record.title = processed.get('title')
            record.content = processed.get('content')
        except Exception as e:
            record.status = f'failed: {e}'
        results.append(record)

    write_output(results, output_path)

Google Sheets集成

import gspread
from google.oauth2.service_account import Credentials

SCOPES = [
    'https://www.googleapis.com/auth/spreadsheets',
    'https://www.googleapis.com/auth/drive'
]

class SheetsService:
    def __init__(self, credentials_path: str):
        creds = Credentials.from_service_account_file(credentials_path, scopes=SCOPES)
        self.client = gspread.authorize(creds)

    def get_worksheet(self, spreadsheet_id: str, sheet_name: str):
        spreadsheet = self.client.open_by_key(spreadsheet_id)
        return spreadsheet.worksheet(sheet_name)

    def read_all(self, worksheet) -> list[dict]:
        return worksheet.get_all_records()

    def append_row(self, worksheet, row: list):
        worksheet.append_row(row, value_input_option='USER_ENTERED')

    def batch_update(self, worksheet, updates: list[dict]):
        """高效更新多个单元格。"""
        # 格式: [{'range': 'A1', 'values': [[value]]}]
        worksheet.batch_update(updates, value_input_option='USER_ENTERED')

    def find_row_by_id(self, worksheet, id_value: str, id_column: int = 1) -> int | None:
        """通过ID值查找行号。"""
        try:
            cell = worksheet.find(id_value, in_column=id_column)
            return cell.row
        except gspread.CellNotFound:
            return None

速率限制

import time
from functools import wraps
from ratelimit import limits, sleep_and_retry

# 简单速率限制器
@sleep_and_retry
@limits(calls=10, period=60)  # 每分钟10次调用
def rate_limited_api_call(url: str):
    return requests.get(url)

# 自定义速率限制器,带退避
class RateLimiter:
    def __init__(self, calls_per_minute: int = 10):
        self.delay = 60 / calls_per_minute
        self.last_call = 0

    def wait(self):
        elapsed = time.time() - self.last_call
        if elapsed < self.delay:
            time.sleep(self.delay - elapsed)
        self.last_call = time.time()

# 使用
limiter = RateLimiter(calls_per_minute=10)

def fetch_with_rate_limit(url: str):
    limiter.wait()
    return requests.get(url)

进度跟踪与恢复能力

import json
from pathlib import Path

class ProgressTracker:
    def __init__(self, progress_file: Path):
        self.progress_file = progress_file
        self.state = self._load()

    def _load(self) -> dict:
        if self.progress_file.exists():
            return json.loads(self.progress_file.read_text())
        return {'processed_ids': [], 'last_row': 0, 'errors': []}

    def save(self):
        self.progress_file.write_text(json.dumps(self.state, indent=2))

    def mark_processed(self, record_id: str):
        self.state['processed_ids'].append(record_id)
        self.save()

    def is_processed(self, record_id: str) -> bool:
        return record_id in self.state['processed_ids']

    def log_error(self, record_id: str, error: str):
        self.state['errors'].append({'id': record_id, 'error': error})
        self.save()

# 在工作流中使用
tracker = ProgressTracker(Path('progress.json'))

for record in records:
    if tracker.is_processed(record.id):
        continue  # 跳过已处理的

    try:
        process(record)
        tracker.mark_processed(record.id)
    except Exception as e:
        tracker.log_error(record.id, str(e))

Gemini AI集成

import google.generativeai as genai
from pathlib import Path

genai.configure(api_key=os.environ['GEMINI_API_KEY'])

class AIService:
    def __init__(self, model: str = 'gemini-2.0-flash'):
        self.model = genai.GenerativeModel(model)

    def categorize(self, text: str, taxonomy: dict) -> dict:
        prompt = f"""分析此内容并进行分类。

内容:
{text[:10000]}  # 截断以避免令牌限制

分类法:
{json.dumps(taxonomy, indent=2)}

使用JSON响应,包含:
- category: 分类法中的一个类别
- tags: 相关标签列表
- summary: 2-3句摘要
"""
        response = self.model.generate_content(prompt)
        return json.loads(response.text)

    def extract_entities(self, text: str) -> list[dict]:
        prompt = f"""从此文本中提取命名实体。

文本:
{text[:10000]}

对于每个实体,提供:
- name: 实体名称
- type: 人、组织、地点、事件、作品或概念
- prominence: 基于文本中重要性的1-10分数

使用JSON数组响应实体。"""
        response = self.model.generate_content(prompt)
        return json.loads(response.text)

# 带成本跟踪的批量处理
class BatchAIProcessor:
    def __init__(self, ai_service: AIService):
        self.ai = ai_service
        self.total_tokens = 0
        self.cost_per_1k_tokens = 0.00025  # 根据您的模型调整

    def process_batch(self, items: list[str]) -> list[dict]:
        results = []
        for item in items:
            result = self.ai.categorize(item, TAXONOMY)
            self.total_tokens += len(item) // 4  # 粗略估计
            results.append(result)
        return results

    @property
    def estimated_cost(self) -> float:
        return (self.total_tokens / 1000) * self.cost_per_1k_tokens

使用Gemini Vision进行图像分类

import google.generativeai as genai
from PIL import Image
from pathlib import Path

def classify_image(image_path: Path, categories: list[str]) -> dict:
    model = genai.GenerativeModel('gemini-2.0-flash')
    image = Image.open(image_path)

    prompt = f"""分析此图像并进行分类。

可用类别: {', '.join(categories)}

使用JSON响应:
{{
  "category": "类别名称",
  "description": "简短描述",
  "suggested_filename": "描述性文件名(用连字符分隔)",
  "tags": ["标签1", "标签2", "标签3"]
}}
"""
    response = model.generate_content([prompt, image])
    return json.loads(response.text)

def organize_images(source_dir: Path, output_dir: Path):
    categories = ['自然', '人物', '建筑', '艺术', '技术', '其他']

    for image_path in source_dir.glob('*.{jpg,png,webp}'):
        try:
            result = classify_image(image_path, categories)
            category_dir = output_dir / result['category']
            category_dir.mkdir(exist_ok=True)

            new_name = f"{result['suggested_filename']}{image_path.suffix}"
            image_path.rename(category_dir / new_name)
        except Exception as e:
            (output_dir / 'failures').mkdir(exist_ok=True)
            image_path.rename(output_dir / 'failures' / image_path.name)

环境配置

from pathlib import Path
from dotenv import load_dotenv
import os

load_dotenv()

class Config:
    # API密钥
    GEMINI_API_KEY = os.environ['GEMINI_API_KEY']
    GOOGLE_SHEET_ID = os.environ['GOOGLE_SHEET_ID']

    # 路径
    PROJECT_ROOT = Path(__file__).parent.parent
    DATA_DIR = PROJECT_ROOT / 'data'
    OUTPUT_DIR = PROJECT_ROOT / 'output'
    CREDENTIALS_PATH = PROJECT_ROOT / 'google_credentials.json'

    # 速率限制
    API_CALLS_PER_MINUTE = 10
    BATCH_SIZE = 50

    @classmethod
    def ensure_dirs(cls):
        cls.DATA_DIR.mkdir(exist_ok=True)
        cls.OUTPUT_DIR.mkdir(exist_ok=True)

日志设置

import logging
from pathlib import Path
from datetime import datetime

def setup_logging(log_dir: Path, name: str = 'pipeline') -> logging.Logger:
    log_dir.mkdir(exist_ok=True)

    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)

    # 控制台处理器(INFO及以上)
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    console.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))

    # 文件处理器(DEBUG及以上)
    log_file = log_dir / f"{name}_{datetime.now():%Y%m%d_%H%M%S}.log"
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    ))

    logger.addHandler(console)
    logger.addHandler(file_handler)

    return logger

常见陷阱

Google Sheets单元格限制:

MAX_CELL_LENGTH = 50000

def truncate_for_sheets(text: str) -> str:
    if len(text) > MAX_CELL_LENGTH:
        return text[:MAX_CELL_LENGTH - 20] + '... [已截断]'
    return text

CSV编码问题:

# 始终指定编码
with open(path, 'r', encoding='utf-8-sig') as f:  # BOM处理
    reader = csv.reader(f)

API配额管理:

# 缓存API响应
from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_api_call(url: str) -> dict:
    return api_client.fetch(url)