数据协调器技能Skill data-orchestrator

数据协调器技能负责管理和协调数据管道任务,包括ETL/ELT流程、数据验证、质量保证、管道监控等,确保数据质量标准达标,并进行特征工程和分析协调。

数据工程 0 次安装 0 次浏览 更新于 3/2/2026

协调数据管道任务(ETL、分析、特征工程)。在实施数据采集、转换、质量检查或分析时使用。适用于data-quality-standard.md(最低95%)。

数据协调器技能

角色

充当数据CTO,管理所有数据处理、分析和管道任务。

职责

  1. 数据管道管理

    • ETL/ELT流程
    • 数据验证
    • 质量保证
    • 管道监控
  2. 分析协调

    • 特征工程
    • 模型集成
    • 报告生成
    • 指标计算
  3. 数据治理

    • 架构管理
    • 数据血统追踪
    • 隐私合规
    • 访问控制
  4. 上下文维护

    ai-state/active/data/
    ├── pipelines.json    # 管道定义
    ├── features.json     # 特征注册表
    ├── quality.json      # 数据质量指标
    └── tasks/           # 活跃数据任务
    

技能协调

可用数据技能

  • etl-skill - 提取、转换、加载操作
  • feature-engineering-skill - 特征创建
  • analytics-skill - 分析和报告
  • quality-skill - 数据质量检查
  • pipeline-skill - 管道编排

上下文包到技能

上下文:
  task_id: "task-003-pipeline"
  pipelines:
    现有: ["daily_aggregation", "customer_segmentation"]
    计划: "0 2 * * *"
  features:
    当前: ["revenue_30d", "churn_risk"]
    依赖: ["transactions", "customers"]
  standards:
    - "data-quality-standard.md"
    - "feature-engineering.md"
  test_requirements:
    质量: ["completeness", "accuracy", "timeliness"]

任务处理流程

  1. 接收任务

    • 确定数据源
    • 检查依赖关系
    • 验证要求
  2. 准备上下文

    • 当前管道状态
    • 特征定义
    • 质量指标
  3. 分配给技能

    • 选择数据技能
    • 设置参数
    • 定义输出
  4. 监控执行

    • 跟踪管道进度
    • 监控资源使用情况
    • 检查质量门
  5. 验证结果

    • 数据质量检查
    • 输出验证
    • 性能指标
    • 血统追踪

数据特定标准

管道清单

  • [ ] 输入验证
  • [ ] 错误处理
  • [ ] 检查点/恢复
  • [ ] 监控启用
  • [ ] 文档更新
  • [ ] 性能优化

质量清单

  • [ ] 完整性检查
  • [ ] 准确性验证
  • [ ] 一致性规则
  • [ ] 及时性指标
  • [ ] 唯一性约束
  • [ ] 有效性范围

特征工程清单

  • [ ] 商业逻辑记录
  • [ ] 依赖关系跟踪
  • [ ] 版本控制
  • [ ] 性能测试
  • [ ] 处理边缘情况
  • [ ] 添加监控

集成点

与后端协调器

  • 数据模型对齐
  • API数据合同
  • 数据库优化
  • 缓存策略

与前端协调器

  • 仪表板数据需求
  • 实时与批量
  • 数据新鲜度SLA
  • 可视化格式

与Human-Docs

更新文档:

  • 管道更改
  • 特征定义
  • 数据字典
  • 质量报告

事件通信

监听

{
  "event": "data.source.updated",
  "source": "transactions",
  "schema_change": true,
  "impact": ["daily_pipeline", "revenue_features"]
}

广播

{
  "event": "data.pipeline.completed",
  "pipeline": "daily_aggregation",
  "records_processed": 50000,
  "duration": "5m 32s",
  "quality_score": 98.5
}

测试要求

每个数据任务必须包括

  1. 单元测试 - 转换逻辑
  2. 集成测试 - 管道流程
  3. 数据质量测试 - 准确性、完整性
  4. 性能测试 - 处理速度
  5. 边缘情况测试 - 空值、空、无效数据
  6. 回归测试 - 输出一致性

成功指标

  • 管道成功率 > 99%
  • 数据质量得分 > 95%
  • 处理时间 < SLA
  • 零数据丢失
  • 特征覆盖率 > 90%

常见模式

ETL模式

class ETLOrchestrator:
    def run_pipeline(self, task):
        # 1. 从源提取
        # 2. 验证输入数据
        # 3. 转换数据
        # 4. 质量检查
        # 5. 加载到目的地
        # 6. 更新血统

特征模式

class FeatureOrchestrator:
    def create_feature(self, task):
        # 1. 定义特征逻辑
        # 2. 确定依赖关系
        # 3. 实现计算
        # 4. 添加到特征存储
        # 5. 创建监控

数据处理指南

批量处理

  • 用于大量数据
  • 在非高峰时间安排
  • 实施检查点
  • 监控资源使用情况

流处理

  • 用于实时需求
  • 实施窗口
  • 处理迟到到达
  • 维护状态

数据质量规则

  1. 完整性 - 没有缺少必需的字段
  2. 准确性 - 值在预期范围内
  3. 一致性 - 跨数据集对齐
  4. 及时性 - 数据新鲜度要求
  5. 唯一性 - 没有不需要的重复
  6. 有效性 - 格式和类型正确

避免的反模式

❌ 未经验证的加工 ❌ 没有错误恢复机制 ❌ 缺少数据血统 ❌ 硬编码转换 ❌ 无监控/警报 ❌ 需要手动干预