协调数据管道任务(ETL、分析、特征工程)。在实施数据采集、转换、质量检查或分析时使用。适用于data-quality-standard.md(最低95%)。
数据协调器技能
角色
充当数据CTO,管理所有数据处理、分析和管道任务。
职责
-
数据管道管理
- ETL/ELT流程
- 数据验证
- 质量保证
- 管道监控
-
分析协调
- 特征工程
- 模型集成
- 报告生成
- 指标计算
-
数据治理
- 架构管理
- 数据血统追踪
- 隐私合规
- 访问控制
-
上下文维护
ai-state/active/data/ ├── pipelines.json # 管道定义 ├── features.json # 特征注册表 ├── quality.json # 数据质量指标 └── tasks/ # 活跃数据任务
技能协调
可用数据技能
etl-skill- 提取、转换、加载操作feature-engineering-skill- 特征创建analytics-skill- 分析和报告quality-skill- 数据质量检查pipeline-skill- 管道编排
上下文包到技能
上下文:
task_id: "task-003-pipeline"
pipelines:
现有: ["daily_aggregation", "customer_segmentation"]
计划: "0 2 * * *"
features:
当前: ["revenue_30d", "churn_risk"]
依赖: ["transactions", "customers"]
standards:
- "data-quality-standard.md"
- "feature-engineering.md"
test_requirements:
质量: ["completeness", "accuracy", "timeliness"]
任务处理流程
-
接收任务
- 确定数据源
- 检查依赖关系
- 验证要求
-
准备上下文
- 当前管道状态
- 特征定义
- 质量指标
-
分配给技能
- 选择数据技能
- 设置参数
- 定义输出
-
监控执行
- 跟踪管道进度
- 监控资源使用情况
- 检查质量门
-
验证结果
- 数据质量检查
- 输出验证
- 性能指标
- 血统追踪
数据特定标准
管道清单
- [ ] 输入验证
- [ ] 错误处理
- [ ] 检查点/恢复
- [ ] 监控启用
- [ ] 文档更新
- [ ] 性能优化
质量清单
- [ ] 完整性检查
- [ ] 准确性验证
- [ ] 一致性规则
- [ ] 及时性指标
- [ ] 唯一性约束
- [ ] 有效性范围
特征工程清单
- [ ] 商业逻辑记录
- [ ] 依赖关系跟踪
- [ ] 版本控制
- [ ] 性能测试
- [ ] 处理边缘情况
- [ ] 添加监控
集成点
与后端协调器
- 数据模型对齐
- API数据合同
- 数据库优化
- 缓存策略
与前端协调器
- 仪表板数据需求
- 实时与批量
- 数据新鲜度SLA
- 可视化格式
与Human-Docs
更新文档:
- 管道更改
- 特征定义
- 数据字典
- 质量报告
事件通信
监听
{
"event": "data.source.updated",
"source": "transactions",
"schema_change": true,
"impact": ["daily_pipeline", "revenue_features"]
}
广播
{
"event": "data.pipeline.completed",
"pipeline": "daily_aggregation",
"records_processed": 50000,
"duration": "5m 32s",
"quality_score": 98.5
}
测试要求
每个数据任务必须包括
- 单元测试 - 转换逻辑
- 集成测试 - 管道流程
- 数据质量测试 - 准确性、完整性
- 性能测试 - 处理速度
- 边缘情况测试 - 空值、空、无效数据
- 回归测试 - 输出一致性
成功指标
- 管道成功率 > 99%
- 数据质量得分 > 95%
- 处理时间 < SLA
- 零数据丢失
- 特征覆盖率 > 90%
常见模式
ETL模式
class ETLOrchestrator:
def run_pipeline(self, task):
# 1. 从源提取
# 2. 验证输入数据
# 3. 转换数据
# 4. 质量检查
# 5. 加载到目的地
# 6. 更新血统
特征模式
class FeatureOrchestrator:
def create_feature(self, task):
# 1. 定义特征逻辑
# 2. 确定依赖关系
# 3. 实现计算
# 4. 添加到特征存储
# 5. 创建监控
数据处理指南
批量处理
- 用于大量数据
- 在非高峰时间安排
- 实施检查点
- 监控资源使用情况
流处理
- 用于实时需求
- 实施窗口
- 处理迟到到达
- 维护状态
数据质量规则
- 完整性 - 没有缺少必需的字段
- 准确性 - 值在预期范围内
- 一致性 - 跨数据集对齐
- 及时性 - 数据新鲜度要求
- 唯一性 - 没有不需要的重复
- 有效性 - 格式和类型正确
避免的反模式
❌ 未经验证的加工 ❌ 没有错误恢复机制 ❌ 缺少数据血统 ❌ 硬编码转换 ❌ 无监控/警报 ❌ 需要手动干预