Starlake Configuration Skill (Complete Reference)

This skill provides a complete reference guide for Starlake data pipeline configuration, covering environment variable configuration, JSON schema validation, and production best practices. It is intended for data engineers, ETL developers, and data architects building efficient data processing pipelines. Keywords: Starlake, data engineering, ETL, configuration, YAML, JSON schema, data pipelines, best practices.


skill_name: config
version: 2.0.0
description: Comprehensive Starlake data pipeline configuration patterns, a complete environment variable catalog, JSON schema references, and production-ready best practices
tags: [starlake, data-engineering, etl, yaml, configuration, schema, spark, duckdb, bigquery, snowflake, airflow, dagster]
author: Starlake Team
created: 2026-02-06
updated: 2026-02-06

Starlake Configuration Skill (Complete Reference)

Expert knowledge extracted from the official documentation, combining the official JSON schemas with production-tested patterns for creating and validating Starlake data pipeline configurations.

Overview

Starlake uses YAML configuration files validated against JSON schemas, available at https://json.schemastore.org/starlake.json.

Core Configuration Files

File Structure

metadata/
├── application.sl.yml           # Global application configuration, connections
├── env.sl.yml                   # Global environment variables
├── env.{ENV}.sl.yml            # Environment-specific overrides (PROD, DEV, ...)
├── types/
│   ├── default.sl.yml          # Built-in type definitions
│   └── custom.sl.yml           # Custom type definitions
├── load/
│   └── {domain}/
│       ├── _config.sl.yml      # Domain-level configuration
│       └── {table}.sl.yml      # Table schema
├── transform/
│   └── {domain}/
│       ├── {task}.sl.yml       # Task configuration
│       ├── {task}.sql          # SQL transform
│       └── {task}.py           # Python transform (optional)
├── extract/
│   └── {config}.sl.yml         # JDBC/API extraction configuration
├── dags/
│   ├── {dag}.sl.yml            # DAG configuration
│   └── template/
│       └── {template}.py.j2    # Custom DAG templates
└── expectations/
    └── {name}.j2               # Jinja data-quality macros

Environment Variable Reference

Core Variables

| Variable | Purpose | Default | Example |
|---|---|---|---|
| SL_ROOT | Project root directory | - | /projects/100/101 |
| SL_ENV | Environment selector for env files | - | DEV, PROD, DUCKDB, BQ |
| SL_DATASETS | Datasets directory location | {{root}}/datasets | /projects/100/101/datasets |
| SL_METADATA | Metadata directory location | {{root}}/metadata | /projects/100/101/metadata |
| SL_INCOMING | Incoming files directory | {{root}}/incoming | /projects/100/101/incoming |
| SL_ARCHIVE | Archive processed files | true | true / false |
| SL_FS | Filesystem type | - | file://, s3a://, hdfs:// |
| SL_TIMEZONE | Timezone for date operations | UTC | Europe/Paris, America/New_York |

Area-Specific Variables

| Variable | Purpose | Default |
|---|---|---|
| SL_AREA_PENDING | Files awaiting processing | pending |
| SL_AREA_UNRESOLVED | Files matching no schema | unresolved |
| SL_AREA_ARCHIVE | Archive of processed files | archive |
| SL_AREA_INGESTING | Files currently being processed | ingesting |
| SL_AREA_ACCEPTED | Valid records location | accepted |
| SL_AREA_REJECTED | Invalid records location | rejected |
| SL_AREA_BUSINESS | Transform results location | business |
| SL_AREA_REPLAY | Replay of rejected records | replay |
| SL_AREA_HIVE_DATABASE | Hive database name pattern | ${domain}_${area} |

Component-Specific Variables

| Variable | Purpose | Component | Default |
|---|---|---|---|
| SL_METRICS_ACTIVE | Enable metrics computation | Load/Transform | true |
| SL_HIVE | Store as Hive/Databricks tables | Spark | false |
| SL_AUDIT_SINK_TYPE | Audit log destination | Audit | BigQuerySink, FsSink |
| SL_API_HTTP_PORT | API server port | API | 11000 |
| SL_API_DOMAIN | API server domain/IP | API | localhost |
| SL_UI_PORT | UI server port | UI | 8080 |
| SL_STARLAKE_PATH | Path to the starlake executable | Orchestrators | /usr/local/bin/starlake |

Predefined Template Variables (auto-generated)

| Variable | Format | Example | Use |
|---|---|---|---|
| sl_date | yyyyMMdd | 20260206 | Filename patterns |
| sl_datetime | yyyyMMddHHmmss | 20260206143000 | Timestamp patterns |
| sl_year | yyyy | 2026 | Partitioning |
| sl_month | MM | 02 | Partitioning |
| sl_day | dd | 06 | Partitioning |
| sl_hour | HH | 14 | Time partitioning |
| sl_minute | mm | 30 | Time partitioning |
| sl_second | ss | 00 | Time partitioning |
| sl_milli | SSS | 123 | Precision |
| sl_epoch_second | Seconds since 1970 | 1738850400 | Timestamps |
| sl_epoch_milli | Milliseconds since 1970 | 1738850400000 | Timestamps |
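
These variables are resolved through the same {{...}} substitution as user-defined variables. A minimal sketch, assuming a feed whose filenames embed the current date (the table and file naming are hypothetical):

table:
  pattern: "export_{{sl_date}}\\.csv" # matches e.g. export_20260206.csv after substitution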

Application Configuration

Complete Application Structure

# metadata/application.sl.yml
version: 1
application:
  name: "my-data-platform"
  connectionRef: "{{activeConnection}}"

  # Default write format
  defaultWriteFormat: parquet # parquet, delta, iceberg, json, csv

  # Load strategy class (file processing order)
  loadStrategyClass: "ai.starlake.job.load.IngestionTimeStrategy"

  # Default SCD2 column names
  scd2StartTimestamp: "sl_start_ts"
  scd2EndTimestamp: "sl_end_ts"

  # Timezone for date operations
  timezone: "UTC"

  # Storage paths
  datasets: "{{root}}/datasets"
  incoming: "{{root}}/incoming"
  metadata: "{{root}}/metadata"

  # Processing
  loader: native # or spark
  grouped: true
  parallelism: 4

  # Area configuration
  area:
    pending: "pending"
    unresolved: "unresolved"
    archive: "archive"
    ingesting: "ingesting"
    accepted: "accepted"
    rejected: "rejected"
    business: "business"
    replay: "replay"
    hiveDatabase: "${domain}_${area}"

  # Audit configuration
  audit:
    sink:
      connectionRef: "{{activeConnection}}"

  # Access policies (BigQuery column-level security)
  accessPolicies:
    apply: true
    location: EU
    taxonomy: RGPD

  # Default DAG references for orchestration
  dagRef:
    load: "default_load_dag"
    transform: "default_transform_dag"

  # Connections (see Connection Types Reference)
  connections:
    duckdb-local:
      type: jdbc
      options:
        url: "jdbc:duckdb:{{sl_root_local}}/datasets/duckdb.db"
        driver: "org.duckdb.DuckDBDriver"

    duckdb-s3:
      type: jdbc
      options:
        url: "jdbc:duckdb:{{sl_root_local}}/datasets/duckdb.db"
        driver: "org.duckdb.DuckDBDriver"
        # DuckDB S3 extension options
        s3_endpoint: "{{S3_ENDPOINT}}"
        s3_access_key_id: "{{S3_ACCESS_KEY}}"
        s3_secret_access_key: "{{S3_SECRET_KEY}}"
        s3_use_ssl: "false"
        s3_url_style: "path"
        s3_region: "us-east-1"

  # Data quality
  expectations:
    active: true

  # Metrics
  metrics:
    active: true
    discreteMaxCardinality: 10
    path: "{{SL_ROOT}}/metrics"

  # Spark configuration (when using the Spark loader)
  spark:
    # Delta Lake
    sql:
      extensions: "io.delta.sql.DeltaSparkSessionExtension"
      catalog:
        spark_catalog: "org.apache.spark.sql.delta.catalog.DeltaCatalog"

    # Iceberg (additional configuration)
    # sql.extensions: "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    # sql.catalog.spark_catalog: org.apache.iceberg.spark.SparkSessionCatalog
    # sql.catalog.spark_catalog.type: hadoop
    # sql.catalog.spark_catalog.warehouse: "{{SL_ROOT}}/warehouse"

    # Hadoop S3A configuration (for Spark S3 access)
    hadoop.fs.s3a.endpoint: "http://localhost:8333"
    hadoop.fs.s3a.access.key: "{{S3_ACCESS_KEY}}"
    hadoop.fs.s3a.secret.key: "{{S3_SECRET_KEY}}"
    hadoop.fs.s3a.path.style.access: "true"
    hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"

Connection Types Reference

BigQuery

connections:
  bigquery:
    type: "bigquery"
    sparkFormat: "bigquery" # optional: use the Spark connector
    options:
      location: "europe-west1" # or "us-central1"
      authType: "APPLICATION_DEFAULT" # most common
      # authType: "SERVICE_ACCOUNT_JSON_KEYFILE"  # for service accounts
      # jsonKeyfile: "/path/to/key.json"
      # authType: "ACCESS_TOKEN"  # for short-lived tokens
      # gcpAccessToken: "TOKEN"
      authScopes: "https://www.googleapis.com/auth/cloud-platform"
      writeMethod: "direct" # or "indirect" (requires sparkFormat)
      temporaryGcsBucket: "bucket_name" # no gs:// prefix

Snowflake

connections:
  snowflake:
    type: jdbc
    sparkFormat: snowflake # optional: for Spark operations
    options:
      url: "jdbc:snowflake://{{SNOWFLAKE_ACCOUNT}}.snowflakecomputing.com"
      driver: "net.snowflake.client.jdbc.SnowflakeDriver"
      user: "{{SNOWFLAKE_USER}}"
      password: "{{SNOWFLAKE_PASSWORD}}"
      warehouse: "{{SNOWFLAKE_WAREHOUSE}}"
      db: "{{SNOWFLAKE_DB}}"
      keep_column_case: "off"
      preActions: "ALTER SESSION SET TIMESTAMP_TYPE_MAPPING = 'TIMESTAMP_LTZ'; ALTER SESSION SET QUOTED_IDENTIFIERS_IGNORE_CASE = true"

      # When using sparkFormat, use the sf-prefixed keys:
      # sfUrl: "{{SNOWFLAKE_ACCOUNT}}.snowflakecomputing.com"
      # sfUser: "{{SNOWFLAKE_USER}}"
      # sfPassword: "{{SNOWFLAKE_PASSWORD}}"
      # sfWarehouse: "{{SNOWFLAKE_WAREHOUSE}}"
      # sfDatabase: "{{SNOWFLAKE_DB}}"

Amazon Redshift

connections:
  redshift:
    type: jdbc
    sparkFormat: "io.github.spark_redshift_community.spark.redshift"
    # On Databricks: sparkFormat: "redshift"
    options:
      url: "jdbc:redshift://account.region.redshift.amazonaws.com:5439/database"
      driver: com.amazon.redshift.Driver
      user: "{{REDSHIFT_USER}}"
      password: "{{REDSHIFT_PASSWORD}}"
      tempdir: "s3a://bucketName/data"
      tempdir_region: "eu-central-1" # required outside AWS
      aws_iam_role: "arn:aws:iam::aws_account_id:role/role_name"

PostgreSQL

connections:
  postgresql:
    type: jdbc
    sparkFormat: jdbc # optional: for Spark operations
    options:
      url: "jdbc:postgresql://{{POSTGRES_HOST}}:{{POSTGRES_PORT}}/{{POSTGRES_DATABASE}}"
      driver: "org.postgresql.Driver"
      user: "{{DATABASE_USER}}"
      password: "{{DATABASE_PASSWORD}}"
      quoteIdentifiers: false

DuckDB

connections:
  duckdb:
    type: jdbc
    options:
      url: "jdbc:duckdb:{{DUCKDB_PATH}}"
      driver: "org.duckdb.DuckDBDriver"
      user: "{{DATABASE_USER}}"
      password: "{{DATABASE_PASSWORD}}"
      # DuckDB S3 extension
      s3_endpoint: "{{S3_ENDPOINT}}"
      s3_access_key_id: "{{S3_ACCESS_KEY}}"
      s3_secret_access_key: "{{S3_SECRET_KEY}}"
      s3_use_ssl: "false"
      s3_url_style: "path"
      s3_region: "us-east-1"
      # Custom DuckDB home directory
      # SL_DUCKDB_HOME: "{{SL_ROOT}}/.duckdb"

      # Custom DuckDB SECRET home directory
      # SL_DUCKDB_SECRET_HOME: "{{SL_ROOT}}/.duckdb/stored_secrets"


DuckLake

connections:
  duckdb:
    type: jdbc
    options:
      url: "jdbc:duckdb:{{DUCKDB_PATH}}"
      driver: "org.duckdb.DuckDBDriver"
      user: "{{DATABASE_USER}}"
      password: "{{DATABASE_PASSWORD}}"

      # DuckLake (DuckDB metadata stored in PostgreSQL)
      # A PostgreSQL database must exist beforehand.
      # The secrets must also be created in DuckDB with the following SQL commands:
      # CREATE OR REPLACE PERSISTENT SECRET pg_{{SL_DB_ID}}
      #     (TYPE postgres, HOST '{{PG_HOST}}',PORT {{PG_PORT}}, DATABASE {{SL_DB_ID}}, USER '{{PG_USERNAME}}',PASSWORD '{{PG_PASSWORD}}')
      # CREATE OR REPLACE PERSISTENT SECRET {{SL_DB_ID}}
      #     (TYPE ducklake, METADATA_PATH '',DATA_PATH '{{SL_DATA_PATH}}', METADATA_PARAMETERS MAP {'TYPE': 'postgres', 'SECRET': 'pg_{{SL_DB_ID}}'});

      preActions: "ATTACH IF NOT EXISTS 'ducklake:{{SL_DB_ID}}' AS {{SL_DB_ID}}; USE {{SL_DB_ID}};"

      # If SL_DATA_PATH is on S3, use the DuckDB S3 extension
      s3_endpoint: "{{S3_ENDPOINT}}"
      s3_access_key_id: "{{S3_ACCESS_KEY}}"
      s3_secret_access_key: "{{S3_SECRET_KEY}}"
      s3_use_ssl: "false"
      s3_url_style: "path"
      s3_region: "us-east-1"

      SL_DATA_PATH: "{{SL_ROOT}}/ducklake_data/{{SL_DB_ID}}" # DuckLake data path (S3 or local)

      # Custom DuckDB home directory
      # SL_DUCKDB_HOME: "{{SL_ROOT}}/.duckdb"

      # Custom DuckDB SECRET home directory
      # SL_DUCKDB_SECRET_HOME: "{{SL_ROOT}}/.duckdb/stored_secrets"


Apache Spark (Local/Databricks)

connections:
  spark:
    type: "spark"
    options: {} # any spark.* setting can go under the application.spark section

Local Filesystem

connections:
  local:
    type: local

Load Configuration

Domain Configuration (_config.sl.yml)

# metadata/load/sales/_config.sl.yml
load:
  name: "sales"
  directory: "{{root}}/incoming/sales" # optional: overrides the incoming location
  ack: "ack" # optional: require an .ack file before processing

  # Domain-level metadata (inherited by all tables)
  metadata:
    format: DSV
    separator: ","
    withHeader: true
    encoding: "UTF-8"

    writeStrategy:
      type: APPEND

    dagRef: "sales_load_dag" # optional: custom orchestration DAG

Table Configuration (table.sl.yml)

# metadata/load/sales/orders.sl.yml
table:
  pattern: "orders_(?<mode>FULL|INCR)_.*\\.csv" # named groups for adaptive strategies
  comment: "Sales orders table"
  primaryKey: ["order_id"]

  metadata:
    format: "DSV"
    encoding: "UTF-8"
    withHeader: true
    separator: ";"
    quote: '"'
    escape: "\\"
    filter: "^[^#].*" # optional: filter rows (skip comment lines)
    ack: "ack" # optional: require an acknowledgment file
    emptyIsNull: false
    fillWithDefaultValue: false
    loader: "native" # optional: use the native database loader

    # Write strategy (see Write Strategies)
    writeStrategy:
      type: "UPSERT_BY_KEY"
      key: ["order_id"]
      timestamp: "updated_at"
      on: TARGET

    # Sink configuration
    sink:
      connectionRef: "bigquery" # optional: override the connection

      # BigQuery: single field; Spark: list of fields
      partition:
        field: "order_date" # BigQuery
        # - "order_date"     # Spark

      clustering:
        - "customer_id"
        - "status"

      requirePartitionFilter: false # BigQuery only
      days: 90 # BigQuery: partition expiration in days

      materializedView: false # BigQuery
      enableRefresh: false # BigQuery
      refreshIntervalMs: 3600000 # BigQuery: 1 hour

      format: "parquet" # Spark: parquet, delta, iceberg
      coalesce: true # Spark: reduce partitions before writing

      options:
        compression: "snappy" # Spark option

    # DAG reference (overrides the domain level)
    dagRef: "orders_load_dag"

    # Spark CSV options (see the Spark documentation)
    options:
      dateFormat: "yyyy-MM-dd"
      timestampFormat: "yyyy-MM-dd HH:mm:ss"

  # Attributes (columns) - see Attribute Configuration
  attributes:
    - name: "order_id"
      type: "long"
      required: true
      comment: "Unique order identifier"

    - name: "customer_id"
      type: "long"
      required: true
      foreignKey: "customers.customer_id"

    - name: "order_date"
      type: "date"
      required: true

    - name: "total_amount"
      type: "decimal"
      metric: "continuous" # or "discrete"

    - name: "email"
      type: "string"
      privacy: "SHA256" # privacy hashing

    - name: "credit_card"
      type: "string"
      privacy: "HIDE" # hide entirely

    # Nested JSON/XML structures
    - name: "address"
      type: "struct"
      array: false
      attributes:
        - name: "street"
          type: "string"
        - name: "city"
          type: "string"
        - name: "zipcode"
          type: "string"

  # Expectations (data quality checks)
  expectations:
    - expect: "is_col_value_not_unique('order_id') => result(0) == 1"
      failOnError: true
    - expect: "is_row_count_to_be_between(1, 1000000) => result(0) == 1"
      failOnError: false

  # Access control (table level)
  acl:
    - role: "roles/bigquery.dataViewer" # BigQuery
      grants:
        - "user:user@domain.com"
        - "group:group@domain.com"
        - "serviceAccount:sa@project.iam.gserviceaccount.com"
    # Spark example:
    # - role: SELECT
    #   grants:
    #     - "user@domain.com"

  # Row-level security
  rls:
    - name: "US only"
      predicate: "country = 'USA'"
      grants:
        - "group:usa_team"

    - name: "Recent data"
      predicate: "order_date > CURRENT_DATE - INTERVAL 90 DAY"
      grants:
        - "user:analyst@domain.com"

File Format Options

| Format | Description | Configuration Key |
|---|---|---|
| DSV | Delimiter-separated values (CSV, TSV, ...) | format: DSV |
| JSON | One JSON object per line | format: JSON |
| JSON_FLAT | Flat JSON (no nesting/repetition) | format: JSON_FLAT |
| JSON_ARRAY | A single array of objects | format: JSON_ARRAY |
| XML | XML files | format: XML |
| POSITION | Fixed-width positional | format: POSITION |
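
The structured formats need only the format key; POSITION additionally requires per-attribute character offsets. A minimal sketch of a fixed-width definition, with hypothetical column names and offsets (see the position block under Attribute Configuration):

table:
  pattern: "accounts_.*\\.dat"
  metadata:
    format: "POSITION"
  attributes:
    - name: "account_id"
      type: "string"
      position:
        first: 0
        last: 9
    - name: "balance"
      type: "decimal"
      position:
        first: 10
        last: 21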

Attribute Configuration

attributes:
  - name: "product_id"
    type: "integer"
    required: true
    comment: "Product primary key"

    # Optional transformations
    rename: "id" # new column name in the database (source column is renamed)
    default: "0" # default value when empty
    trim: "Both" # None, Left, Right, Both
    ignore: false # skip loading this column
    script: "UPPER(product_id)" # transformation expression

    # Relationships
    foreignKey: "products.product_id"

    # Metric (for automatic metrics computation)
    metric: "continuous" # or "discrete"

    # Privacy transformation
    privacy: "SHA256" # HIDE, MD5, SHA1, SHA256, SHA512, AES

    # BigQuery column-level security
    accessPolicy: "PII" # references a BigQuery policy tag

    # Fixed-width format (POSITION)
    position:
      first: 0
      last: 10

Attribute Type Catalog

Built-in Primitive Types

| Type | Primitive Type | Pattern | Example | Use Case |
|---|---|---|---|---|
| string | string | .+ | "Hello World" | Text fields |
| int, integer | long | [-\|\\+\|0-9][0-9]* | 1234 | IDs, counts |
| long | long | [-\|\\+\|0-9][0-9]* | -64564 | Large numbers |
| short | short | -?\\d+ | 564 | Small integers |
| byte | byte | . | x | Single bytes |
| double | double | [-+]?\\d*\\.?\\d+[Ee]?[-+]?\\d* | -45.78 | Floating point |
| decimal | decimal | -?\\d*\\.{0,1}\\d+ | -45.787686786876 | Exact decimals |
| boolean | boolean | (?i)true\|yes\|[y1]<-TF->(?i)false\|no\|[n0] | TruE | Boolean flags |

Date/Time Types

| Type | Pattern/Format | Example | Use Case |
|---|---|---|---|
| date | yyyy-MM-dd | 2018-07-21 | Standard date |
| timestamp | ISO_DATE_TIME | 2019-12-31 23:59:02 | Timestamps |
| basic_iso_date | yyyyMMdd | 20180721 | Compact date |
| iso_local_date | yyyy-MM-dd | 2018-07-21 | Local date |
| iso_offset_date | yyyy-MM-ddXXX | 2018-07-21+02:00 | Date with offset |
| iso_date | yyyy-MM-ddXXX | 2018-07-21+02:00 | ISO date |
| iso_local_date_time | yyyy-MM-ddTHH:mm:ss | 2018-07-21T14:30:00 | Local datetime |
| iso_offset_date_time | yyyy-MM-ddTHH:mm:ssXXX | 2018-07-21T14:30:00+02:00 | Datetime with offset |
| iso_zoned_date_time | yyyy-MM-ddTHH:mm:ss[VV] | 2018-07-21T14:30:00[Europe/Paris] | Datetime with zone |
| iso_date_time | ISO_DATE_TIME | 2018-07-21T14:30:00+02:00 | Full ISO datetime |
| iso_ordinal_date | yyyy-DDD | 2018-202 | Ordinal date |
| iso_week_date | YYYY-Www-D | 2018-W29-6 | Week date |
| iso_instant | yyyy-MM-ddTHH:mm:ss.SSSZ | 2018-07-21T14:30:00.000Z | UTC instant |
| rfc_1123_date_time | RFC 1123 | Sat, 21 Jul 2018 14:30:00 GMT | HTTP dates |
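
Non-ISO source layouts can be handled by declaring a custom type whose pattern carries the date format. A minimal sketch, assuming the pattern of a date primitive is interpreted as its format string, as the Pattern/Format column above suggests (date_fr is a hypothetical type name):

types:
  - name: "date_fr"
    pattern: "dd/MM/yyyy"
    primitiveType: "date"
    sample: "21/07/2018"
    comment: "French day-first date format"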

Custom Types

# metadata/types/custom.sl.yml
types:
  - name: "email"
    pattern: "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
    primitiveType: "string"
    sample: "user@example.com"
    comment: "Email address format"

  - name: "phone_fr"
    pattern: "^(\\+33|0)[1-9](\\d{2}){4}$"
    primitiveType: "string"
    sample: "+33612345678"
    comment: "French phone number"

  - name: "iban"
    pattern: "^[A-Z]{2}\\d{2}[A-Z0-9]{1,30}$"
    primitiveType: "string"
    sample: "FR7630006000011234567890189"
    comment: "IBAN format"
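
Once declared, a custom type is referenced by name from table attributes exactly like a built-in type. A minimal usage sketch (the column names are hypothetical):

attributes:
  - name: "contact_email"
    type: "email" # custom type declared above
  - name: "account_iban"
    type: "iban"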

Write Strategies

Strategy Comparison

| Strategy | Description | Required Options | Use Case |
|---|---|---|---|
| APPEND | Insert all rows | - | Event logs, fact tables |
| OVERWRITE | Replace the entire table | - | Staging, full refresh |
| UPSERT_BY_KEY | Update existing rows, insert new ones | key, on: TARGET | Dimension tables |
| UPSERT_BY_KEY_AND_TIMESTAMP | Upsert with a timestamp check | key, timestamp, on: TARGET | CDC, incremental |
| OVERWRITE_BY_PARTITION | Replace specific partitions | requires sink.partition | Partitioned fact tables |
| DELETE_THEN_INSERT | Delete matching keys, then insert | key | Transactional updates |
| SCD2 | Slowly changing dimension type 2 | key, timestamp, startTs, endTs, on: BOTH | History tracking |
| ADAPTATIVE | Strategy selected at runtime | types: { strategy: 'condition' } | Dynamic routing |

APPEND Strategy

writeStrategy:
  type: "APPEND"

Use for: event logs, transaction logs, append-only fact tables.

OVERWRITE Strategy

writeStrategy:
  type: "OVERWRITE"

Use for: staging tables, full-refresh scenarios, temporary tables.

UPSERT_BY_KEY Strategy

writeStrategy:
  type: "UPSERT_BY_KEY"
  key: ["customer_id"]
  on: TARGET # check keys against the target table

Use for: dimension tables, master data, configuration tables.

UPSERT_BY_KEY_AND_TIMESTAMP Strategy

writeStrategy:
  type: "UPSERT_BY_KEY_AND_TIMESTAMP"
  key: ["order_id"]
  timestamp: "updated_at"
  on: TARGET

Use for: change data capture (CDC), timestamped incremental updates.

OVERWRITE_BY_PARTITION Strategy

writeStrategy:
  type: "OVERWRITE_BY_PARTITION"
  on: TARGET

# Requires a sink.partition configuration
sink:
  partition:
    - "year"
    - "month"
    - "day"

Use for: daily/monthly partitioned fact tables.

DELETE_THEN_INSERT Strategy

writeStrategy:
  type: "DELETE_THEN_INSERT"
  key: ["product_id", "store_id"]

Use for: transactional updates, clearing specific records before insert.

SCD2 Strategy (Slowly Changing Dimension Type 2)

writeStrategy:
  type: "SCD2"
  key: ["customer_id"]
  timestamp: "effective_date"
  startTs: "valid_from" # optional, defaults to application.scd2StartTimestamp
  endTs: "valid_to" # optional, defaults to application.scd2EndTimestamp
  on: BOTH

Use for: history tracking of dimension changes, audit trails.

SCD2 behavior (illustrated below):

  • Inserts the new version with valid_from = timestamp and valid_to = NULL
  • Closes the previous version by setting valid_to = timestamp
  • Maintains the full history of changes
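
For example, with hypothetical data, if customer 42 moves from tier BRONZE to GOLD on 2026-02-06, the dimension ends up holding:

| customer_id | tier | valid_from | valid_to |
|---|---|---|---|
| 42 | BRONZE | 2025-01-01 | 2026-02-06 |
| 42 | GOLD | 2026-02-06 | NULL |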

ADAPTATIVE Strategy (Dynamic Selection)

# By day of week (full refresh on Sundays)
writeStrategy:
  types:
    APPEND: 'dayOfWeek != 7'
    OVERWRITE: 'dayOfWeek == 7'

# By filename pattern (named groups)
table:
  pattern: "orders_(?<mode>FULL|INCR)_.*\\.csv"
  writeStrategy:
    types:
      OVERWRITE: 'group("mode") == "FULL"'
      APPEND: 'group("mode") == "INCR"'

# By file size (full load when large)
writeStrategy:
  types:
    OVERWRITE: 'fileSizeMo > 100'
    APPEND: 'fileSizeMo <= 100'

# By date (first day of the month)
writeStrategy:
  types:
    OVERWRITE: 'isFirstDayOfMonth'
    APPEND: '!isFirstDayOfMonth'

Adaptive criteria:

| Criterion | Description | Example |
|---|---|---|
| group(index/name) | Capture group from the pattern | group(1) == "F", group("mode") == "FULL" |
| fileSize, fileSizeB | File size (bytes) | fileSize > 1000 |
| fileSizeKo/Mo/Go/To | File size (unit) | fileSizeMo > 100 |
| isFirstDayOfMonth | Current-date check | isFirstDayOfMonth |
| isLastDayOfMonth | Current-date check | isLastDayOfMonth |
| dayOfWeek | 1-7 (Monday to Sunday) | dayOfWeek == 7 |
| isFileFirstDayOfMonth | File modification date check | isFileFirstDayOfMonth |
| isFileLastDayOfMonth | File modification date check | isFileLastDayOfMonth |
| fileDayOfWeek | File modification day, 1-7 | fileDayOfWeek == 1 |

Transform Configuration

Task Configuration (task.sl.yml)

# metadata/transform/analytics/daily_sales.sl.yml
task:
  name: "daily_sales" # optional: overrides the file name
  domain: "analytics" # optional: overrides the folder name
  table: "daily_sales" # optional: overrides the file name

  # Write strategy (same options as load)
  writeStrategy:
    type: "OVERWRITE_BY_PARTITION"

  # Sink configuration
  sink:
    connectionRef: "bigquery" # optional: write to a different database
    partition:
      - "report_date"
    clustering:
      - "region"
    format: "delta"
    options:
      compression: "snappy"

  # Connection used to read source data (separate from the sink)
  connectionRef: "source_connection"

  # SQL parsing
  parseSQL: true # set to false to use custom INSERT/UPDATE/MERGE statements

  # Expectations (data quality)
  expectations:
    - expect: "is_row_count_to_be_between(1, 1000000) => result(0) == 1"
      failOnError: true

  # Access control (same as load)
  acl:
    - role: SELECT
      grants:
        - "group:analytics_team"

  rls:
    - name: "Recent data"
      predicate: "report_date > CURRENT_DATE - INTERVAL 90 DAY"
      grants:
        - "user:analyst@domain.com"

  # Column descriptions (for computed columns)
  attributesDesc:
    - name: "total_revenue"
      comment: "Sum of all sales amounts"
      accessPolicy: "SENSITIVE" # BigQuery CLS

  # DAG reference
  dagRef: "daily_analytics_dag"

SQL Transform (task.sql)

-- metadata/transform/analytics/daily_sales.sql
SELECT
  DATE(order_date) as report_date,
  region,
  COUNT(*) as order_count,
  SUM(total_amount) as total_revenue,
  AVG(total_amount) as avg_order_value
FROM {{sales}}.orders
WHERE order_date BETWEEN '{{sl_start_date}}' AND '{{sl_end_date}}'
GROUP BY DATE(order_date), region
ORDER BY report_date DESC, region

Template variables:

  • {{domain}} - reference other domains
  • {{sl_start_date}}, {{sl_end_date}} - incremental processing window
  • {{table}} - current table name (in expectations)

Python Transform (task.py)

# metadata/transform/analytics/complex_transform.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count

if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()

    # Read the source data
    df = spark.sql("SELECT * FROM sales.orders WHERE order_date >= CURRENT_DATE - INTERVAL 30 DAYS")

    # Transformation logic
    result_df = df.groupBy("customer_id").agg(
        count("*").alias("order_count"),
        sum("total_amount").alias("total_spent")
    ).filter(col("order_count") > 5)

    # Required: create a temporary view named SL_THIS
    result_df.createOrReplaceTempView("SL_THIS")

Important: a Python transform must create a temporary view named SL_THIS.

Command-line options: pass them as --options key1=value1,key2=value2; the script receives them as --key1 value1 --key2 value2.


Expectations (Data Quality)

Expectation Syntax

expectations:
  - expect: "<query_name>(<params>) => <condition>"
    failOnError: true # or false to continue with a warning

Built-in Expectation Macros

# metadata/expectations/default.j2 (Jinja macros)

# Uniqueness check
{% macro is_col_value_not_unique(col, table='SL_THIS') %}
    SELECT max(cnt)
    FROM (SELECT {{ col }}, count(*) as cnt FROM {{ table }}
    GROUP BY {{ col }}
    HAVING cnt > 1)
{% endmacro %}

# Row count range
{% macro is_row_count_to_be_between(min_value, max_value, table_name = 'SL_THIS') -%}
    SELECT
        CASE
            WHEN count(*) BETWEEN {{min_value}} AND {{max_value}} THEN 1
        ELSE
            0
        END
    FROM {{table_name}}
{%- endmacro %}

# Value count
{% macro count_by_value(col, value, table='SL_THIS') %}
    SELECT count(*)
    FROM {{ table }}
    WHERE {{ col }} LIKE '{{ value }}'
{% endmacro %}

Expectation Examples

expectations:
  # Uniqueness: order_id must be unique
  - expect: "is_col_value_not_unique('order_id') => result(0) == 1"
    failOnError: true

  # Row count: between 100 and 1 million rows
  - expect: "is_row_count_to_be_between(100, 1000000) => result(0) == 1"
    failOnError: false

  # Value count: at least 10 USA records
  - expect: "count_by_value('country', 'USA') => result(0) >= 10"
    failOnError: false

  # Custom SQL: no negative amounts
  - expect: "SELECT COUNT(*) FROM SL_THIS WHERE amount < 0 => count == 0"
    failOnError: true

  # Null check: email must never be null
  - expect: "SELECT COUNT(*) FROM SL_THIS WHERE email IS NULL => count == 0"
    failOnError: true

Variables Available in Conditions

| Variable | Type | Description |
|---|---|---|
| count | Long | Number of rows in the query result |
| result | Seq[Any] | Values of the first row (0-indexed) |
| results | Seq[Seq[Any]] | All rows (for multi-row results) |

Extract Configuration

JDBC Extraction

# metadata/extract/source_db.sl.yml
version: 1
extract:
  connectionRef: "source_postgres"

  jdbcSchemas:
    - schema: "sales"

      # Custom remarks queries (for databases such as DB2)
      columnRemarks: "SELECT COLUMN_NAME, COLUMN_TEXT FROM SYSCAT.COLUMNS WHERE ..."
      tableRemarks: "SELECT TABLE_TEXT FROM SYSCAT.TABLES WHERE ..."

      # Table types to extract
      tableTypes:
        - "TABLE"
        - "VIEW"
        # - "SYSTEM TABLE"
        # - "GLOBAL TEMPORARY"

      # Tables to extract
      tables:
        - name: "*" # or a specific table name or pattern

          fullExport: true # false for incremental

          # Incremental configuration
          partitionColumn: "id" # for parallel extraction
          numPartitions: 4 # degree of parallelism
          timestamp: "updated_at" # for incremental tracking

          # JDBC tuning
          fetchSize: 1000 # JDBC fetch size

          # Custom query (overrides the table name)
          sql: "SELECT * FROM orders WHERE region = 'EMEA'"

          # Column selection (optional)
          columns:
            - "order_id"
            - "customer_id"
            - "order_date"
            - "total_amount"

Extraction commands:

# Full extraction
starlake extract --config metadata/extract/source_db.sl.yml

# Incremental (tracked via the timestamp column)
starlake extract --config metadata/extract/source_db.sl.yml --incremental

OpenAPI Extraction

# metadata/extract/api.sl.yml
version: 1
extract:
  openAPI:
    basePath: /api/v2

    domains:
      - name: customers_api

        # Schema filtering (regex)
        schemas:
          exclude:
            - "Model\\.Common\\.Id"
            - "Internal\\..*"

        # Route selection
        routes:
          - paths:
              include:
                - "/users"
                - "/orders"
                - "/products"

Freshness Monitoring

Track data freshness using timestamp columns:

# Check the freshness of specific tables
starlake freshness --tables dataset1.table1,dataset2.table2 --persist true

Monitoring table: SL_LAST_EXPORT in the audit schema.


DAG Configuration (Orchestration)

DAG Configuration File

# metadata/dags/sales_load_dag.sl.yml
dag:
  name: "sales_load_dag"
  schedule: "0 2 * * *" # cron expression (daily at 2 AM)
  catchup: true # process historical runs
  default_pool: "default_pool"
  description: "Daily sales data load from SFTP"

  # Airflow-specific
  tags:
    - "production"
    - "sales"
    - "daily"

  # Environment variables for tasks
  options:
    sl_env_var: '{"SL_ROOT": "${root_path}", "SL_DATASETS": "${root_path}/datasets", "SL_TIMEZONE": "Europe/Paris"}'

  # Load strategy (how loads are triggered)
  load:
    strategy: "FILE_SENSOR" # FILE_SENSOR, FILE_SENSOR_DOMAIN, ACK_FILE_SENSOR, NONE
    options:
      incoming_path: "{{SL_ROOT}}/incoming/{{domain}}"
      pending_path: "{{SL_ROOT}}/datasets/pending/{{domain}}"
      global_ack_file_path: "{{SL_ROOT}}/datasets/pending/{{domain}}/{{{{ds}}}}.ack"

  # Custom template (optional)
  template:
    file: "custom_template.py.j2" # relative to metadata/dags/template, or an absolute path

DAG Assignment Hierarchy

Priority (lowest to highest):

  1. Project level: application.dagRef.load / application.dagRef.transform
  2. Domain level: load.metadata.dagRef in _config.sl.yml
  3. Table level: table.metadata.dagRef in table.sl.yml
  4. Transform level: task.dagRef in task.sl.yml

More specific levels override more general ones, as sketched below.
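
For example, combining the dagRef settings shown elsewhere in this document, the table-level reference wins for orders while other tables in the domain fall back to the domain default:

# metadata/application.sl.yml - project-wide default
application:
  dagRef:
    load: "default_load_dag"

# metadata/load/sales/_config.sl.yml - domain override
load:
  metadata:
    dagRef: "sales_load_dag"

# metadata/load/sales/orders.sl.yml - table override (highest priority here)
table:
  metadata:
    dagRef: "orders_load_dag"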


Load Strategies

Standard Strategies

| Strategy Class | Description | Ordering |
|---|---|---|
| ai.starlake.job.load.IngestionTimeStrategy | Load files by modification time | Oldest first |
| ai.starlake.job.load.IngestionNameStrategy | Load files in lexicographic name order | Alphabetical |

Configuration:

application:
  loadStrategyClass: "ai.starlake.job.load.IngestionNameStrategy"

Custom Load Strategies

Implement the ai.starlake.job.load.LoadStrategy interface:

package com.mycompany.starlake

import ai.starlake.job.load.LoadStrategy
import ai.starlake.storage.StorageHandler
import com.typesafe.scalalogging.StrictLogging
import org.apache.hadoop.fs.Path
import java.time.LocalDateTime

object CustomLoadStrategy extends LoadStrategy with StrictLogging {
  def list(
    storageHandler: StorageHandler,
    path: Path,
    extension: String = "",
    since: LocalDateTime = LocalDateTime.MIN,
    recursive: Boolean
  ): List[FileInfo] = {
    // Custom file ordering logic goes here
    ???
  }
}

application:
  loadStrategyClass: "com.mycompany.starlake.CustomLoadStrategy"

Metrics Configuration

Metric Types

| Metric Type | Computed Properties |
|---|---|
| continuous | min, max, sum, mean, median, variance, standard deviation, skewness, kurtosis, 25th percentile, 75th percentile, missing value count, row count |
| discrete | distinct count, category frequencies, category counts (top categories), row count |

Configuration

# Application level
application:
  metrics:
    active: true
    discreteMaxCardinality: 10 # max distinct values for discrete metrics
    path: "{{SL_ROOT}}/metrics"

# Attribute level
attributes:
  - name: "revenue"
    type: "decimal"
    metric: "continuous"

  - name: "product_category"
    type: "string"
    metric: "discrete"

Metrics storage: metrics are persisted in the audit tables (SL_METRICS) for historical tracking and analysis.


Storage Configuration Patterns

S3/MinIO/SeaweedFS with Spark

# Hadoop S3A configuration
spark:
  hadoop.fs.s3a.endpoint: "http://localhost:8333"
  hadoop.fs.s3a.access.key: "{{S3_ACCESS_KEY}}"
  hadoop.fs.s3a.secret.key: "{{S3_SECRET_KEY}}"
  hadoop.fs.s3a.path.style.access: "true"
  hadoop.fs.s3a.connection.ssl.enabled: "false"
  hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"

DuckDB S3 Extension

connections:
  duckdb:
    type: jdbc
    options:
      url: "jdbc:duckdb:{{DUCKDB_PATH}}"
      driver: "org.duckdb.DuckDBDriver"

      # DuckDB S3 extension
      preActions: |
        INSTALL httpfs;
        LOAD httpfs;
        SET s3_region='us-east-1';
        SET s3_endpoint='localhost:8333';
        SET s3_access_key_id='{{S3_ACCESS_KEY}}';
        SET s3_secret_access_key='{{S3_SECRET_KEY}}';
        SET s3_use_ssl=false;
        SET s3_url_style='path';

Complete Examples

Example 1: E-commerce Order Processing

# metadata/application.sl.yml
version: 1
application:
  name: "ecommerce-platform"
  connectionRef: "{{activeConnection}}"
  defaultWriteFormat: delta
  timezone: "UTC"

  connections:
    spark:
      type: spark

  spark:
    sql:
      extensions: "io.delta.sql.DeltaSparkSessionExtension"
      catalog:
        spark_catalog: "org.apache.spark.sql.delta.catalog.DeltaCatalog"

# metadata/load/orders/_config.sl.yml
load:
  name: "orders"
  metadata:
    format: DSV
    separator: ","
    withHeader: true

# metadata/load/orders/transactions.sl.yml
table:
  pattern: "orders_(?<type>FULL|DELTA)_.*\\.csv"
  primaryKey: ["order_id"]

  metadata:
    writeStrategy:
      types:
        OVERWRITE: 'group("type") == "FULL"'
        UPSERT_BY_KEY_AND_TIMESTAMP: 'group("type") == "DELTA"'
      key: ["order_id"]
      timestamp: "updated_at"
      on: TARGET

    sink:
      partition:
        - "order_date"
      clustering:
        - "customer_id"
        - "status"

  attributes:
    - name: "order_id"
      type: "long"
      required: true

    - name: "customer_id"
      type: "long"
      required: true
      foreignKey: "customers.customer_id"

    - name: "order_date"
      type: "date"
      required: true

    - name: "status"
      type: "string"
      required: true
      metric: "discrete"

    - name: "total_amount"
      type: "decimal"
      required: true
      metric: "continuous"

    - name: "customer_email"
      type: "string"
      privacy: "SHA256"

    - name: "updated_at"
      type: "timestamp"
      required: true

  expectations:
    - expect: "is_col_value_not_unique('order_id') => result(0) == 1"
      failOnError: true
    - expect: "SELECT COUNT(*) FROM SL_THIS WHERE total_amount <= 0 => count == 0"
      failOnError: true

# metadata/transform/analytics/daily_revenue.sl.yml
task:
  domain: "analytics"
  table: "daily_revenue"

  writeStrategy:
    type: "OVERWRITE_BY_PARTITION"

  sink:
    partition:
      - "report_date"

  attributesDesc:
    - name: "total_revenue"
      comment: "Sum of all order amounts"

# metadata/transform/analytics/daily_revenue.sql
SELECT
  DATE(order_date) as report_date,
  COUNT(*) as order_count,
  SUM(total_amount) as total_revenue,
  AVG(total_amount) as avg_order_value,
  COUNT(DISTINCT customer_id) as unique_customers
FROM {{orders}}.transactions
WHERE order_date = '{{sl_date}}'
  AND status IN ('COMPLETED', 'SHIPPED')
GROUP BY DATE(order_date)

Example 2: SCD2 Customer Dimension

# metadata/load/customers/customer_master.sl.yml
table:
  pattern: "customers_.*\\.csv"
  primaryKey: ["customer_id"]

  metadata:
    format: DSV
    separator: ","
    withHeader: true

    writeStrategy:
      type: "SCD2"
      key: ["customer_id"]
      timestamp: "effective_date"
      startTs: "valid_from"
      endTs: "valid_to"
      on: BOTH

  attributes:
    - name: "customer_id"
      type: "long"
      required: true

    - name: "customer_name"
      type: "string"
      required: true

    - name: "email"
      type: "string"
      privacy: "SHA256"

    - name: "address"
      type: "string"

    - name: "city"
      type: "string"

    - name: "country"
      type: "string"
      metric: "discrete"

    - name: "tier"
      type: "string"
      metric: "discrete"

    - name: "effective_date"
      type: "date"
      required: true

    - name: "valid_from"
      type: "timestamp"
      comment: "SCD2 start timestamp (auto-populated)"

    - name: "valid_to"
      type: "timestamp"
      comment: "SCD2 end timestamp (auto-populated)"

Result: the full history of customer changes is maintained:

  • Current row: valid_to = NULL
  • Historical rows: valid_to set to the change timestamp

Best Practices

1. Use Variable Substitution Everywhere

Good:

url: "jdbc:postgresql://{{PG_HOST}}:{{PG_PORT}}/{{PG_DB}}"
incoming: "{{SL_ROOT}}/incoming/{{domain}}"

Bad:

url: "jdbc:postgresql://localhost:5432/mydb"
incoming: "/projects/100/101/incoming/sales"

2. Separate Environment Configurations

# env.sl.yml (global)
env:
  root: "/opt/starlake"
  activeConnection: "duckdb"
  PG_HOST: "localhost"

# env.PROD.sl.yml (production overrides)
env:
  root: "/data/production/starlake"
  activeConnection: "bigquery"
  PG_HOST: "${POSTGRES_HOST}"  # injected from an OS environment variable

3. Define Reusable Custom Types

# types/custom.sl.yml
types:
  - name: "product_sku"
    pattern: "^[A-Z]{3}-\\d{6}$"
    primitiveType: string
    sample: "PRD-123456"

  - name: "iso_country_code"
    pattern: "^[A-Z]{2}$"
    primitiveType: string
    sample: "US"

4. Layer Expectations for Data Quality

expectations:
  # Critical: fail on error
  - expect: "is_col_value_not_unique('id') => result(0) == 1"
    failOnError: true

  # Warning: log and continue
  - expect: "is_row_count_to_be_between(100, 1000000) => result(0) == 1"
    failOnError: false

5. Choose the Write Strategy by Use Case

| Table Type | Recommended Strategy | Why |
|---|---|---|
| Dimension (master data) | UPSERT_BY_KEY | Keeps the latest version |
| Dimension (with history) | SCD2 | Tracks changes over time |
| Fact (partitioned) | OVERWRITE_BY_PARTITION | Replaces daily/monthly partitions |
| Fact (append-only) | APPEND | Event logs, transactions |
| Staging | OVERWRITE | Temporary, full refresh |

6. Apply Privacy Transformations Early

attributes:
  - name: "ssn"
    type: "string"
    privacy: "HIDE" # never stored

  - name: "email"
    type: "string"
    privacy: "SHA256" # one-way hash

  - name: "ip_address"
    type: "string"
    privacy: "MD5" # anonymization

7. Partition Large Tables

# BigQuery
sink:
  partition:
    field: "event_date"
  clustering:
    - "user_id"
    - "event_type"
  requirePartitionFilter: true  # enforce partition pruning

# Spark
sink:
  partition:
    - "year"
    - "month"
    - "day"

8. Document with Comments and Tags

table:
  name: "orders"
  comment: "E-commerce order transactions from the Shopify API"
  tags: ["revenue", "critical", "daily", "pii"]

  attributes:
    - name: "order_id"
      comment: "Unique order identifier from Shopify"

Validation

CLI Validation

# Validate a single file
starlake validate --config metadata/application.sl.yml

# Validate all metadata
starlake validate --all

# Validate a specific domain
starlake validate --domain sales

IDE Integration (VS Code)

Add to .vscode/settings.json:

{
  "yaml.schemas": {
    "https://json.schemastore.org/starlake.json": [
      "metadata/**/*.sl.yml",
      "**/metadata/**/*.sl.yml"
    ]
  }
}

Benefits:

  • Real-time validation as you type
  • Autocompletion of configuration keys
  • Inline documentation tooltips

Troubleshooting

Common Issues

1. Schema validation errors

Error: Missing required property: version

Fix: always include version: 1 at the top of the configuration file:

version: 1
application: ...

2. Connection failures

Error: Connection refused to postgres:5432

Check:

  • Network connectivity: telnet postgres 5432
  • Credentials in env.sl.yml
  • JDBC driver availability on the classpath

3. Write strategy conflicts

Error: Key column 'order_id' not found

Fix: make sure the key columns exist among the attributes:

writeStrategy:
  type: UPSERT_BY_KEY
  key: ["order_id"] # must appear in the attributes list

attributes:
  - name: "order_id" # ✓ present
    type: "long"
    required: true

4. S3 access issues

Error: Status Code: 403; Error Code: AccessDenied

Check:

  • S3 credentials are correct
  • Endpoint URL format: http://host:port (no trailing slash)
  • Bucket permissions allow read/write
  • S3 URL style: path vs. virtual-hosted

5. Partition column mismatches

Error: Partition column 'date' not found in schema

Fix: partition columns must exist among the attributes:

sink:
  partition:
    - "order_date"

attributes:
  - name: "order_date" # ✓ must exist
    type: "date"
    required: true


Version History

  • 1.0.0 (2026-02-06): initial release with core patterns and JSON schema references
  • 2.0.0 (2026-02-06): full update with the complete environment variable catalog, all connection types, adaptive write strategies, the complete attribute type catalog, the expectations framework, metrics, extract configuration, DAG patterns, and production-ready examples from the official documentation