---
skill_name: config
version: 2.0.0
description: Comprehensive Starlake data pipeline configuration patterns, a complete environment variable catalog, JSON Schema references, and production-ready best practices
tags: [starlake, data-engineering, etl, yaml, configuration, schema, spark, duckdb, bigquery, snowflake, airflow, dagster]
author: Starlake Team
created: 2026-02-06
updated: 2026-02-06
---
# Starlake Configuration Skill (Complete Reference)

Expert knowledge extracted from the official documentation, backed by the official JSON Schema and production-tested patterns, for creating and validating Starlake data pipeline configurations.

## Overview

Starlake uses YAML configuration files validated against a JSON Schema, available at:

- Schema URL: https://www.schemastore.org/starlake.json
- Schema ID: https://json.schemastore.org/starlake.json
- Draft version: JSON Schema draft-07

## Core Configuration Files

### File Structure
```
metadata/
├── application.sl.yml         # Global application configuration, connections
├── env.sl.yml                 # Global environment variables
├── env.{ENV}.sl.yml           # Environment-specific overrides (PROD, DEV, ...)
├── types/
│   ├── default.sl.yml         # Built-in type definitions
│   └── custom.sl.yml          # Custom type definitions
├── load/
│   └── {domain}/
│       ├── _config.sl.yml     # Domain-level configuration
│       └── {table}.sl.yml     # Table schema
├── transform/
│   └── {domain}/
│       ├── {task}.sl.yml      # Task configuration
│       ├── {task}.sql         # SQL transform
│       └── {task}.py          # Python transform (optional)
├── extract/
│   └── {config}.sl.yml        # JDBC/API extraction configuration
├── dags/
│   ├── {dag}.sl.yml           # DAG configuration
│   └── template/
│       └── {template}.py.j2   # Custom DAG templates
└── expectations/
    └── {name}.j2              # Jinja data-quality macros
```
## Environment Variable Reference

### Core Variables

| Variable | Purpose | Default | Example |
|---|---|---|---|
| `SL_ROOT` | Project root directory | - | `/projects/100/101` |
| `SL_ENV` | Environment selector for env files | - | `DEV`, `PROD`, `DUCKDB`, `BQ` |
| `SL_DATASETS` | Datasets directory location | `{{root}}/datasets` | `/projects/100/101/datasets` |
| `SL_METADATA` | Metadata directory location | `{{root}}/metadata` | `/projects/100/101/metadata` |
| `SL_INCOMING` | Incoming files directory | `{{root}}/incoming` | `/projects/100/101/incoming` |
| `SL_ARCHIVE` | Archive processed files | `true` | `true` / `false` |
| `SL_FS` | File system type | - | `file://`, `s3a://`, `hdfs://` |
| `SL_TIMEZONE` | Timezone for date operations | `UTC` | `Europe/Paris`, `America/New_York` |
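Several of the defaults above are templates resolved against other variables, e.g. `SL_DATASETS` defaults to `{{root}}/datasets`. A minimal sketch of how such `{{name}}` placeholders expand (the `resolve` helper is illustrative only, not part of the Starlake API):

```python
# Illustrative only: how Starlake-style defaults such as
# SL_DATASETS = "{{root}}/datasets" resolve against SL_ROOT.
def resolve(template: str, variables: dict) -> str:
    """Substitute {{name}} placeholders with values from `variables`."""
    out = template
    for name, value in variables.items():
        out = out.replace("{{" + name + "}}", value)
    return out

defaults = {
    "SL_DATASETS": "{{root}}/datasets",
    "SL_METADATA": "{{root}}/metadata",
    "SL_INCOMING": "{{root}}/incoming",
}

env = {"root": "/projects/100/101"}
resolved = {k: resolve(v, env) for k, v in defaults.items()}
print(resolved["SL_DATASETS"])  # /projects/100/101/datasets
```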
### Area-Specific Variables

| Variable | Purpose | Default |
|---|---|---|
| `SL_AREA_PENDING` | Files awaiting processing | `pending` |
| `SL_AREA_UNRESOLVED` | Files matching no schema | `unresolved` |
| `SL_AREA_ARCHIVE` | Archive of processed files | `archive` |
| `SL_AREA_INGESTING` | Files currently being processed | `ingesting` |
| `SL_AREA_ACCEPTED` | Location of valid records | `accepted` |
| `SL_AREA_REJECTED` | Location of invalid records | `rejected` |
| `SL_AREA_BUSINESS` | Location of transform results | `business` |
| `SL_AREA_REPLAY` | Replay of rejected records | `replay` |
| `SL_AREA_HIVE_DATABASE` | Hive database name pattern | `${domain}_${area}` |
### Component-Specific Variables

| Variable | Purpose | Component | Default |
|---|---|---|---|
| `SL_METRICS_ACTIVE` | Enable metrics computation | Load/Transform | `true` |
| `SL_HIVE` | Store as Hive/Databricks tables | Spark | `false` |
| `SL_AUDIT_SINK_TYPE` | Audit log destination | Audit | `BigQuerySink`, `FsSink` |
| `SL_API_HTTP_PORT` | API server port | API | `11000` |
| `SL_API_DOMAIN` | API server domain/IP | API | `localhost` |
| `SL_UI_PORT` | UI server port | UI | `8080` |
| `SL_STARLAKE_PATH` | Path to the Starlake executable | Orchestrator | `/usr/local/bin/starlake` |
### Predefined Template Variables (auto-generated)

| Variable | Format | Example | Use |
|---|---|---|---|
| `sl_date` | yyyyMMdd | `20260206` | Filename patterns |
| `sl_datetime` | yyyyMMddHHmmss | `20260206143000` | Timestamp patterns |
| `sl_year` | yyyy | `2026` | Partitioning |
| `sl_month` | MM | `02` | Partitioning |
| `sl_day` | dd | `06` | Partitioning |
| `sl_hour` | HH | `14` | Time partitioning |
| `sl_minute` | mm | `30` | Time partitioning |
| `sl_second` | ss | `00` | Time partitioning |
| `sl_milli` | SSS | `123` | Precision |
| `sl_epoch_second` | Seconds since 1970 | `1738850400` | Timestamps |
| `sl_epoch_milli` | Milliseconds since 1970 | `1738850400000` | Timestamps |
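The formats above are Java `DateTimeFormatter` patterns. As an illustration (this is not Starlake code), the same values can be produced with the equivalent Python `strftime` formats:

```python
from datetime import datetime, timezone

# Illustration: the predefined template variables use Java DateTimeFormatter
# patterns; the equivalent strftime formats are shown in the comments.
now = datetime(2026, 2, 6, 14, 30, 0, tzinfo=timezone.utc)

sl_vars = {
    "sl_date": now.strftime("%Y%m%d"),            # yyyyMMdd
    "sl_datetime": now.strftime("%Y%m%d%H%M%S"),  # yyyyMMddHHmmss
    "sl_year": now.strftime("%Y"),                # yyyy
    "sl_month": now.strftime("%m"),               # MM
    "sl_day": now.strftime("%d"),                 # dd
    "sl_hour": now.strftime("%H"),                # HH
    "sl_epoch_second": str(int(now.timestamp())), # seconds since 1970
}
print(sl_vars["sl_date"])      # 20260206
print(sl_vars["sl_datetime"])  # 20260206143000
```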
## Application Configuration

### Complete Application Structure

```yaml
# metadata/application.sl.yml
version: 1
application:
  name: "my-data-platform"
  connectionRef: "{{activeConnection}}"

  # Default write format
  defaultWriteFormat: parquet # parquet, delta, iceberg, json, csv

  # Load strategy class (file processing order)
  loadStrategyClass: "ai.starlake.job.load.IngestionTimeStrategy"

  # SCD2 default column names
  scd2StartTimestamp: "sl_start_ts"
  scd2EndTimestamp: "sl_end_ts"

  # Timezone for date operations
  timezone: "UTC"

  # Storage paths
  datasets: "{{root}}/datasets"
  incoming: "{{root}}/incoming"
  metadata: "{{root}}/metadata"

  # Processing
  loader: native # or spark
  grouped: true
  parallelism: 4

  # Area configuration
  area:
    pending: "pending"
    unresolved: "unresolved"
    archive: "archive"
    ingesting: "ingesting"
    accepted: "accepted"
    rejected: "rejected"
    business: "business"
    replay: "replay"
    hiveDatabase: "${domain}_${area}"

  # Audit configuration
  audit:
    sink:
      connectionRef: "{{activeConnection}}"

  # Access policies (BigQuery column-level security)
  accessPolicies:
    apply: true
    location: EU
    taxonomy: RGPD

  # Default DAG references for orchestration
  dagRef:
    load: "default_load_dag"
    transform: "default_transform_dag"

  # Connections (see the Connection Types Reference section)
  connections:
    duckdb-local:
      type: jdbc
      options:
        url: "jdbc:duckdb:{{sl_root_local}}/datasets/duckdb.db"
        driver: "org.duckdb.DuckDBDriver"
    duckdb-s3:
      type: jdbc
      options:
        url: "jdbc:duckdb:{{sl_root_local}}/datasets/duckdb.db"
        driver: "org.duckdb.DuckDBDriver"
        # DuckDB S3 extension options
        s3_endpoint: "{{S3_ENDPOINT}}"
        s3_access_key_id: "{{S3_ACCESS_KEY}}"
        s3_secret_access_key: "{{S3_SECRET_KEY}}"
        s3_use_ssl: "false"
        s3_url_style: "path"
        s3_region: "us-east-1"

  # Data quality
  expectations:
    active: true

  # Metrics
  metrics:
    active: true
    discreteMaxCardinality: 10
    path: "{{SL_ROOT}}/metrics"

  # Spark configuration (when using the Spark loader)
  spark:
    # Delta Lake
    sql:
      extensions: "io.delta.sql.DeltaSparkSessionExtension"
      catalog:
        spark_catalog: "org.apache.spark.sql.delta.catalog.DeltaCatalog"
    # Iceberg (alternative configuration)
    # sql.extensions: "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    # sql.catalog.spark_catalog: org.apache.iceberg.spark.SparkSessionCatalog
    # sql.catalog.spark_catalog.type: hadoop
    # sql.catalog.spark_catalog.warehouse: "{{SL_ROOT}}/warehouse"
    # Hadoop S3A configuration (for Spark S3 access)
    hadoop.fs.s3a.endpoint: "http://localhost:8333"
    hadoop.fs.s3a.access.key: "{{S3_ACCESS_KEY}}"
    hadoop.fs.s3a.secret.key: "{{S3_SECRET_KEY}}"
    hadoop.fs.s3a.path.style.access: "true"
    hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
```
## Connection Types Reference

### BigQuery

```yaml
connections:
  bigquery:
    type: "bigquery"
    sparkFormat: "bigquery" # Optional: use the Spark connector
    options:
      location: "europe-west1" # or "us-central1"
      authType: "APPLICATION_DEFAULT" # Most common
      # authType: "SERVICE_ACCOUNT_JSON_KEYFILE" # For service accounts
      # jsonKeyfile: "/path/to/key.json"
      # authType: "ACCESS_TOKEN" # For short-lived tokens
      # gcpAccessToken: "TOKEN"
      authScopes: "https://www.googleapis.com/auth/cloud-platform"
      writeMethod: "direct" # or "indirect" (requires sparkFormat)
      temporaryGcsBucket: "bucket_name" # Without the gs:// prefix
```

### Snowflake

```yaml
connections:
  snowflake:
    type: jdbc
    sparkFormat: snowflake # Optional: for Spark operations
    options:
      url: "jdbc:snowflake://{{SNOWFLAKE_ACCOUNT}}.snowflakecomputing.com"
      driver: "net.snowflake.client.jdbc.SnowflakeDriver"
      user: "{{SNOWFLAKE_USER}}"
      password: "{{SNOWFLAKE_PASSWORD}}"
      warehouse: "{{SNOWFLAKE_WAREHOUSE}}"
      db: "{{SNOWFLAKE_DB}}"
      keep_column_case: "off"
      preActions: "ALTER SESSION SET TIMESTAMP_TYPE_MAPPING = 'TIMESTAMP_LTZ'; ALTER SESSION SET QUOTED_IDENTIFIERS_IGNORE_CASE = true"
      # When using sparkFormat, use sf-prefixed keys instead:
      # sfUrl: "{{SNOWFLAKE_ACCOUNT}}.snowflakecomputing.com"
      # sfUser: "{{SNOWFLAKE_USER}}"
      # sfPassword: "{{SNOWFLAKE_PASSWORD}}"
      # sfWarehouse: "{{SNOWFLAKE_WAREHOUSE}}"
      # sfDatabase: "{{SNOWFLAKE_DB}}"
```

### Amazon Redshift

```yaml
connections:
  redshift:
    type: jdbc
    sparkFormat: "io.github.spark_redshift_community.spark.redshift"
    # On Databricks: sparkFormat: "redshift"
    options:
      url: "jdbc:redshift://account.region.redshift.amazonaws.com:5439/database"
      driver: com.amazon.redshift.Driver
      user: "{{REDSHIFT_USER}}"
      password: "{{REDSHIFT_PASSWORD}}"
      tempdir: "s3a://bucketName/data"
      tempdir_region: "eu-central-1" # Required when running outside AWS
      aws_iam_role: "arn:aws:iam::aws_count_id:role/role_name"
```

### PostgreSQL

```yaml
connections:
  postgresql:
    type: jdbc
    sparkFormat: jdbc # Optional: for Spark operations
    options:
      url: "jdbc:postgresql://{{POSTGRES_HOST}}:{{POSTGRES_PORT}}/{{POSTGRES_DATABASE}}"
      driver: "org.postgresql.Driver"
      user: "{{DATABASE_USER}}"
      password: "{{DATABASE_PASSWORD}}"
      quoteIdentifiers: false
```

### DuckDB

```yaml
connections:
  duckdb:
    type: jdbc
    options:
      url: "jdbc:duckdb:{{DUCKDB_PATH}}"
      driver: "org.duckdb.DuckDBDriver"
      user: "{{DATABASE_USER}}"
      password: "{{DATABASE_PASSWORD}}"
      # DuckDB S3 extension
      s3_endpoint: "{{S3_ENDPOINT}}"
      s3_access_key_id: "{{S3_ACCESS_KEY}}"
      s3_secret_access_key: "{{S3_SECRET_KEY}}"
      s3_use_ssl: "false"
      s3_url_style: "path"
      s3_region: "us-east-1"
      # Custom DuckDB home directory
      # SL_DUCKDB_HOME: "{{SL_ROOT}}/.duckdb"
      # Custom DuckDB SECRET home directory
      # SL_DUCKDB_SECRET_HOME: "{{SL_ROOT}}/.duckdb/stored_secrets"
```

### DuckLake

```yaml
connections:
  duckdb:
    type: jdbc
    options:
      url: "jdbc:duckdb:{{DUCKDB_PATH}}"
      driver: "org.duckdb.DuckDBDriver"
      user: "{{DATABASE_USER}}"
      password: "{{DATABASE_PASSWORD}}"
      # DuckLake (DuckDB with its metadata stored in PostgreSQL)
      # A PostgreSQL database must exist beforehand.
      # The user must also create secrets in DuckDB with SQL such as:
      # CREATE OR REPLACE PERSISTENT SECRET pg_{{SL_DB_ID}}
      #   (TYPE postgres, HOST '{{PG_HOST}}', PORT {{PG_PORT}}, DATABASE {{SL_DB_ID}}, USER '{{PG_USERNAME}}', PASSWORD '{{PG_PASSWORD}}')
      # CREATE OR REPLACE PERSISTENT SECRET {{SL_DB_ID}}
      #   (TYPE ducklake, METADATA_PATH '', DATA_PATH '{{SL_DATA_PATH}}', METADATA_PARAMETERS MAP {'TYPE': 'postgres', 'SECRET': 'pg_{{SL_DB_ID}}'});
      preActions: "ATTACH IF NOT EXISTS 'ducklake:{{SL_DB_ID}}' AS {{SL_DB_ID}}; USE {{SL_DB_ID}};"
      # If SL_DATA_PATH is on S3, use the DuckDB S3 extension
      s3_endpoint: "{{S3_ENDPOINT}}"
      s3_access_key_id: "{{S3_ACCESS_KEY}}"
      s3_secret_access_key: "{{S3_SECRET_KEY}}"
      s3_use_ssl: "false"
      s3_url_style: "path"
      s3_region: "us-east-1"
      SL_DATA_PATH: "{{SL_ROOT}}/ducklake_data/{{SL_DB_ID}}" # DuckLake data path (S3 or local)
      # Custom DuckDB home directory
      # SL_DUCKDB_HOME: "{{SL_ROOT}}/.duckdb"
      # Custom DuckDB SECRET home directory
      # SL_DUCKDB_SECRET_HOME: "{{SL_ROOT}}/.duckdb/stored_secrets"
```

### Apache Spark (Local/Databricks)

```yaml
connections:
  spark:
    type: "spark"
    options: {} # Any spark.* settings can go in the application.spark section
```

### Local File System

```yaml
connections:
  local:
    type: local
```
## Load Configuration

### Domain Configuration (_config.sl.yml)

```yaml
# metadata/load/sales/_config.sl.yml
load:
  name: "sales"
  directory: "{{root}}/incoming/sales" # Optional: override the incoming location
  ack: "ack" # Optional: require an .ack file before processing
  # Domain-level metadata (inherited by all tables)
  metadata:
    format: DSV
    separator: ","
    withHeader: true
    encoding: "UTF-8"
    writeStrategy:
      type: APPEND
    dagRef: "sales_load_dag" # Optional: custom orchestration DAG
```
### Table Configuration (table.sl.yml)

```yaml
# metadata/load/sales/orders.sl.yml
table:
  pattern: "orders_(?<mode>FULL|INCR)_.*\\.csv" # Named groups for adaptive strategies
  comment: "Sales orders table"
  primaryKey: ["order_id"]
  metadata:
    format: "DSV"
    encoding: "UTF-8"
    withHeader: true
    separator: ";"
    quote: '"'
    escape: "\\"
    filter: "^[^#].*" # Optional: filter lines (skip comments)
    ack: "ack" # Optional: require an acknowledgment file
    emptyIsNull: false
    fillWithDefaultValue: false
    loader: "native" # Optional: use the native database loader

    # Write strategy (see the Write Strategies section)
    writeStrategy:
      type: "UPSERT_BY_KEY"
      key: ["order_id"]
      timestamp: "updated_at"
      on: TARGET

    # Sink configuration
    sink:
      connectionRef: "bigquery" # Optional: override the connection
      # BigQuery: single field; Spark: list of fields
      partition:
        field: "order_date" # BigQuery
        # - "order_date" # Spark
      clustering:
        - "customer_id"
        - "status"
      requirePartitionFilter: false # BigQuery only
      days: 90 # BigQuery: partition expiration in days
      materializedView: false # BigQuery
      enableRefresh: false # BigQuery
      refreshIntervalMs: 3600000 # BigQuery: 1 hour
      format: "parquet" # Spark: parquet, delta, iceberg
      coalesce: true # Spark: reduce partitions before writing
      options:
        compression: "snappy" # Spark option

    # DAG reference (overrides the domain level)
    dagRef: "orders_load_dag"

    # Spark CSV options (see the Spark documentation)
    options:
      dateFormat: "yyyy-MM-dd"
      timestampFormat: "yyyy-MM-dd HH:mm:ss"

  # Attributes (columns) - see the Attribute Configuration section
  attributes:
    - name: "order_id"
      type: "long"
      required: true
      comment: "Unique order identifier"
    - name: "customer_id"
      type: "long"
      required: true
      foreignKey: "customers.customer_id"
    - name: "order_date"
      type: "date"
      required: true
    - name: "total_amount"
      type: "decimal"
      metric: "continuous" # or "discrete"
    - name: "email"
      type: "string"
      privacy: "SHA256" # Privacy hashing
    - name: "credit_card"
      type: "string"
      privacy: "HIDE" # Hide entirely
    # Nested JSON/XML structures
    - name: "address"
      type: "struct"
      array: false
      attributes:
        - name: "street"
          type: "string"
        - name: "city"
          type: "string"
        - name: "zipcode"
          type: "string"

  # Expectations (data quality checks)
  expectations:
    - expect: "is_col_value_not_unique('order_id') => result(0) == 1"
      failOnError: true
    - expect: "is_row_count_to_be_between(1, 1000000) => result(0) == 1"
      failOnError: false

  # Access control (table level)
  acl:
    - role: "roles/bigquery.dataViewer" # BigQuery
      grants:
        - "user:user@domain.com"
        - "group:group@domain.com"
        - "serviceAccount:sa@project.iam.gserviceaccount.com"
    # Spark example:
    # - role: SELECT
    #   grants:
    #     - "user@domain.com"

  # Row-level security
  rls:
    - name: "US only"
      predicate: "country = 'USA'"
      grants:
        - "group:usa_team"
    - name: "Recent data"
      predicate: "order_date > CURRENT_DATE - INTERVAL 90 DAY"
      grants:
        - "user:analyst@domain.com"
```
### File Format Options

| Format | Description | Configuration key |
|---|---|---|
| DSV | Delimiter-separated values (CSV, TSV, ...) | `format: DSV` |
| JSON | One JSON object per line | `format: JSON` |
| JSON_FLAT | Flat JSON (no nesting/repetition) | `format: JSON_FLAT` |
| JSON_ARRAY | A single array of objects | `format: JSON_ARRAY` |
| XML | XML files | `format: XML` |
| POSITION | Fixed-width positional | `format: POSITION` |
## Attribute Configuration

```yaml
attributes:
  - name: "product_id"
    type: "integer"
    required: true
    comment: "Product primary key"
    # Optional transformations
    rename: "id" # Name the column is renamed to in the database
    default: "0" # Default value when empty
    trim: "Both" # None, Left, Right, Both
    ignore: false # Skip loading this column
    script: "UPPER(product_id)" # Transformation expression
    # Relationships
    foreignKey: "products.product_id"
    # Metric (for automatic metrics computation)
    metric: "continuous" # or "discrete"
    # Privacy transformation
    privacy: "SHA256" # HIDE, MD5, SHA1, SHA256, SHA512, AES
    # BigQuery column-level security
    accessPolicy: "PII" # References a BigQuery policy tag
    # Fixed-width format (POSITION)
    position:
      first: 0
      last: 10
```
## Attribute Type Catalog

### Built-in Primitive Types

| Type | Primitive type | Pattern | Example | Use case |
|---|---|---|---|---|
| `string` | string | `.+` | `Hello World` | Text fields |
| `int`, `integer` | long | `[-\|\+\|0-9][0-9]*` | `1234` | IDs, counts |
| `long` | long | `[-\|\+\|0-9][0-9]*` | `-64564` | Large numbers |
| `short` | short | `-?\d+` | `564` | Small integers |
| `byte` | byte | `.` | `x` | Single byte |
| `double` | double | `[-+]?\d*\.?\d+[Ee]?[-+]?\d*` | `-45.78` | Floating point |
| `decimal` | decimal | `-?\d*\.{0,1}\d+` | `-45.787686786876` | Exact decimals |
| `boolean` | boolean | `(?i)true\|yes\|[y1]<-TF->(?i)false\|no\|[n0]` | `TruE` | Boolean flags |
### Date/Time Types

| Type | Pattern/Format | Example | Use case |
|---|---|---|---|
| `date` | yyyy-MM-dd | `2018-07-21` | Standard date |
| `timestamp` | ISO_DATE_TIME | `2019-12-31 23:59:02` | Timestamps |
| `basic_iso_date` | yyyyMMdd | `20180721` | Compact date |
| `iso_local_date` | yyyy-MM-dd | `2018-07-21` | Local date |
| `iso_offset_date` | yyyy-MM-ddXXX | `2018-07-21+02:00` | Date with offset |
| `iso_date` | yyyy-MM-ddXXX | `2018-07-21+02:00` | ISO date |
| `iso_local_date_time` | yyyy-MM-ddTHH:mm:ss | `2018-07-21T14:30:00` | Local date-time |
| `iso_offset_date_time` | yyyy-MM-ddTHH:mm:ssXXX | `2018-07-21T14:30:00+02:00` | Date-time with offset |
| `iso_zoned_date_time` | yyyy-MM-ddTHH:mm:ss[VV] | `2018-07-21T14:30:00[Europe/Paris]` | Date-time with zone |
| `iso_date_time` | ISO_DATE_TIME | `2018-07-21T14:30:00+02:00` | Full ISO date-time |
| `iso_ordinal_date` | yyyy-DDD | `2018-202` | Ordinal date |
| `iso_week_date` | YYYY-Www-D | `2018-W29-6` | Week date |
| `iso_instant` | yyyy-MM-ddTHH:mm:ss.SSSZ | `2018-07-21T14:30:00.000Z` | UTC instant |
| `rfc_1123_date_time` | RFC 1123 | `Sat, 21 Jul 2018 14:30:00 GMT` | HTTP dates |
### Custom Types

```yaml
# metadata/types/custom.sl.yml
types:
  - name: "email"
    pattern: "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
    primitiveType: "string"
    sample: "user@example.com"
    comment: "Email address format"
  - name: "phone_fr"
    pattern: "^(\\+33|0)[1-9](\\d{2}){4}$"
    primitiveType: "string"
    sample: "+33612345678"
    comment: "French phone number"
  - name: "iban"
    pattern: "^[A-Z]{2}\\d{2}[A-Z0-9]{1,30}$"
    primitiveType: "string"
    sample: "FR7630006000011234567890189"
    comment: "IBAN format"
```
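A quick way to sanity-check a custom type before committing it is to match its `sample` against its `pattern`. This standalone sketch replays the three definitions above with Python's `re` module:

```python
import re

# Sanity check: each custom type's sample value should match its own pattern.
# Patterns and samples are copied from the custom.sl.yml example above.
types = {
    "email": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
    "phone_fr": r"^(\+33|0)[1-9](\d{2}){4}$",
    "iban": r"^[A-Z]{2}\d{2}[A-Z0-9]{1,30}$",
}
samples = {
    "email": "user@example.com",
    "phone_fr": "+33612345678",
    "iban": "FR7630006000011234567890189",
}

for name, pattern in types.items():
    ok = re.fullmatch(pattern, samples[name]) is not None
    print(f"{name}: {'ok' if ok else 'MISMATCH'}")
```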
## Write Strategies

### Strategy Comparison

| Strategy | Description | Required options | Use case |
|---|---|---|---|
| APPEND | Insert all rows | None | Event logs, fact tables |
| OVERWRITE | Replace the whole table | None | Staging, full refresh |
| UPSERT_BY_KEY | Update existing rows, insert new ones | `key`, `on: TARGET` | Dimension tables |
| UPSERT_BY_KEY_AND_TIMESTAMP | Upsert with a timestamp check | `key`, `timestamp`, `on: TARGET` | CDC, incremental |
| OVERWRITE_BY_PARTITION | Replace specific partitions | Requires `sink.partition` | Partitioned fact tables |
| DELETE_THEN_INSERT | Delete matching keys, then insert | `key` | Transactional updates |
| SCD2 | Slowly changing dimension type 2 | `key`, `timestamp`, `startTs`, `endTs`, `on: BOTH` | History tracking |
| ADAPTATIVE | Runtime strategy selection | `types: { strategy: 'condition' }` | Dynamic routing |
### APPEND Strategy

```yaml
writeStrategy:
  type: "APPEND"
```

Use for: event logs, transaction logs, append-only fact tables.

### OVERWRITE Strategy

```yaml
writeStrategy:
  type: "OVERWRITE"
```

Use for: staging tables, full-refresh scenarios, temporary tables.

### UPSERT_BY_KEY Strategy

```yaml
writeStrategy:
  type: "UPSERT_BY_KEY"
  key: ["customer_id"]
  on: TARGET # Check keys against the target table
```

Use for: dimension tables, master data, configuration tables.

### UPSERT_BY_KEY_AND_TIMESTAMP Strategy

```yaml
writeStrategy:
  type: "UPSERT_BY_KEY_AND_TIMESTAMP"
  key: ["order_id"]
  timestamp: "updated_at"
  on: TARGET
```

Use for: change data capture (CDC), incremental updates with timestamps.

### OVERWRITE_BY_PARTITION Strategy

```yaml
writeStrategy:
  type: "OVERWRITE_BY_PARTITION"
  on: TARGET
# Requires a sink.partition configuration
sink:
  partition:
    - "year"
    - "month"
    - "day"
```

Use for: daily/monthly partitioned fact tables.

### DELETE_THEN_INSERT Strategy

```yaml
writeStrategy:
  type: "DELETE_THEN_INSERT"
  key: ["product_id", "store_id"]
```

Use for: transactional updates, clearing specific records before inserting.
### SCD2 Strategy (Slowly Changing Dimension Type 2)

```yaml
writeStrategy:
  type: "SCD2"
  key: ["customer_id"]
  timestamp: "effective_date"
  startTs: "valid_from" # Optional, defaults to application.scd2StartTimestamp
  endTs: "valid_to" # Optional, defaults to application.scd2EndTimestamp
  on: BOTH
```

Use for: history tracking of dimension changes, audit trails.

SCD2 behavior:

- Inserts the new version: `valid_from = timestamp`, `valid_to = NULL`
- Updates the previous version: `valid_to = timestamp`
- Maintains the full history of changes
### ADAPTATIVE Strategy (Dynamic Selection)

```yaml
# By day of week (full refresh on Sunday)
writeStrategy:
  types:
    APPEND: 'dayOfWeek != 7'
    OVERWRITE: 'dayOfWeek == 7'
```

```yaml
# By filename pattern (named groups)
table:
  pattern: "orders_(?<mode>FULL|INCR)_.*\\.csv"
  writeStrategy:
    types:
      OVERWRITE: 'group("mode") == "FULL"'
      APPEND: 'group("mode") == "INCR"'
```

```yaml
# By file size (full load when large)
writeStrategy:
  types:
    OVERWRITE: 'fileSizeMo > 100'
    APPEND: 'fileSizeMo <= 100'
```

```yaml
# By date (first day of the month)
writeStrategy:
  types:
    OVERWRITE: 'isFirstDayOfMonth'
    APPEND: '!isFirstDayOfMonth'
```

Adaptive criteria:

| Criterion | Description | Example |
|---|---|---|
| `group(index/name)` | Capture group from the filename pattern | `group(1) == "F"`, `group("mode") == "FULL"` |
| `fileSize`, `fileSizeB` | File size (bytes) | `fileSize > 1000` |
| `fileSizeKo/Mo/Go/To` | File size (unit) | `fileSizeMo > 100` |
| `isFirstDayOfMonth` | Current-date check | `isFirstDayOfMonth` |
| `isLastDayOfMonth` | Current-date check | `isLastDayOfMonth` |
| `dayOfWeek` | 1-7 (Monday through Sunday) | `dayOfWeek == 7` |
| `isFileFirstDayOfMonth` | File modification-date check | `isFileFirstDayOfMonth` |
| `isFileLastDayOfMonth` | File modification-date check | `isFileLastDayOfMonth` |
| `fileDayOfWeek` | File modification day, 1-7 | `fileDayOfWeek == 1` |
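To make the `group("mode")` mechanics concrete, here is a simplified stand-in for the runtime selection, not Starlake's actual implementation: a filename is matched against the table pattern and each condition is tried in order. Note Python spells named groups `(?P<mode>...)` where the YAML pattern uses `(?<mode>...)`.

```python
import re

# Illustration only: pick a write strategy from filename capture groups,
# mirroring the `group("mode") == "FULL"` criteria above.
def choose_strategy(filename: str, pattern: str, rules: dict) -> str:
    match = re.match(pattern, filename)
    groups = match.groupdict() if match else {}
    for strategy, condition in rules.items():
        if condition(groups):
            return strategy
    raise ValueError("no strategy matched")

rules = {
    "OVERWRITE": lambda g: g.get("mode") == "FULL",
    "APPEND": lambda g: g.get("mode") == "INCR",
}
pattern = r"orders_(?P<mode>FULL|INCR)_.*\.csv"  # Python named-group syntax

print(choose_strategy("orders_FULL_20260206.csv", pattern, rules))  # OVERWRITE
print(choose_strategy("orders_INCR_20260206.csv", pattern, rules))  # APPEND
```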
## Transform Configuration

### Task Configuration (task.sl.yml)

```yaml
# metadata/transform/analytics/daily_sales.sl.yml
task:
  name: "daily_sales" # Optional: overrides the file name
  domain: "analytics" # Optional: overrides the folder name
  table: "daily_sales" # Optional: overrides the file name

  # Write strategy (same options as for loads)
  writeStrategy:
    type: "OVERWRITE_BY_PARTITION"

  # Sink configuration
  sink:
    connectionRef: "bigquery" # Optional: write to a different database
    partition:
      - "report_date"
    clustering:
      - "region"
    format: "delta"
    options:
      compression: "snappy"

  # Connection used to read source data (separate from the sink)
  connectionRef: "source_connection"

  # SQL parsing
  parseSQL: true # Set to false to use custom INSERT/UPDATE/MERGE statements

  # Expectations (data quality)
  expectations:
    - expect: "is_row_count_to_be_between(1, 1000000) => result(0) == 1"
      failOnError: true

  # Access control (same as for loads)
  acl:
    - role: SELECT
      grants:
        - "group:analytics_team"
  rls:
    - name: "Recent data"
      predicate: "report_date > CURRENT_DATE - INTERVAL 90 DAY"
      grants:
        - "user:analyst@domain.com"

  # Column descriptions (for computed columns)
  attributesDesc:
    - name: "total_revenue"
      comment: "Sum of all sale amounts"
      accessPolicy: "SENSITIVE" # BigQuery CLS

  # DAG reference
  dagRef: "daily_analytics_dag"
```
### SQL Transform (task.sql)

```sql
-- metadata/transform/analytics/daily_sales.sql
SELECT
  DATE(order_date) as report_date,
  region,
  COUNT(*) as order_count,
  SUM(total_amount) as total_revenue,
  AVG(total_amount) as avg_order_value
FROM {{sales}}.orders
WHERE order_date BETWEEN '{{sl_start_date}}' AND '{{sl_end_date}}'
GROUP BY DATE(order_date), region
ORDER BY report_date DESC, region
```

Template variables:

- `{{domain}}` - reference other domains
- `{{sl_start_date}}`, `{{sl_end_date}}` - incremental processing window
- `{{table}}` - current table name (in expectations)
### Python Transform (task.py)

```python
# metadata/transform/analytics/complex_transform.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum

if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()

    # Read source data
    df = spark.sql(
        "SELECT * FROM sales.orders WHERE order_date >= CURRENT_DATE - INTERVAL 30 DAYS"
    )

    # Transformation logic
    result_df = (
        df.groupBy("customer_id")
        .agg(count("*").alias("order_count"), sum("total_amount").alias("total_spent"))
        .filter(col("order_count") > 5)
    )

    # Required: create a temporary view named SL_THIS
    result_df.createOrReplaceTempView("SL_THIS")
```

Important: a Python transform must create a temporary view named `SL_THIS`.

Command-line options: passed as `--options key1=value1,key2=value2`, which becomes `--key1 value1 --key2 value2`.
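The option rewriting described above can be sketched in a few lines (the `expand_options` helper is illustrative, not part of Starlake):

```python
# Sketch of the documented rewriting: "--options key1=value1,key2=value2"
# becomes "--key1 value1 --key2 value2".
def expand_options(options: str) -> list:
    args = []
    for pair in options.split(","):
        key, _, value = pair.partition("=")
        args.extend([f"--{key}", value])
    return args

print(expand_options("key1=value1,key2=value2"))
# ['--key1', 'value1', '--key2', 'value2']
```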
## Expectations (Data Quality)

### Expectation Syntax

```yaml
expectations:
  - expect: "<query_name>(<params>) => <condition>"
    failOnError: true # or false to continue with a warning
```

### Built-in Expectation Macros

```jinja
# metadata/expectations/default.j2 (Jinja macros)

# Uniqueness check
{% macro is_col_value_not_unique(col, table='SL_THIS') %}
SELECT max(cnt)
FROM (SELECT {{ col }}, count(*) as cnt FROM {{ table }}
      GROUP BY {{ col }}
      HAVING cnt > 1)
{% endmacro %}

# Row count range
{% macro is_row_count_to_be_between(min_value, max_value, table_name = 'SL_THIS') -%}
SELECT
  CASE
    WHEN count(*) BETWEEN {{min_value}} AND {{max_value}} THEN 1
    ELSE 0
  END
FROM {{table_name}}
{%- endmacro %}

# Value count
{% macro count_by_value(col, value, table='SL_THIS') %}
SELECT count(*)
FROM {{ table }}
WHERE {{ col }} LIKE '{{ value }}'
{% endmacro %}
```
### Expectation Examples

```yaml
expectations:
  # Uniqueness: order_id must be unique
  - expect: "is_col_value_not_unique('order_id') => result(0) == 1"
    failOnError: true
  # Row count: between 100 and 1 million rows
  - expect: "is_row_count_to_be_between(100, 1000000) => result(0) == 1"
    failOnError: false
  # Value count: at least 10 USA records
  - expect: "count_by_value('country', 'USA') => result(0) >= 10"
    failOnError: false
  # Custom SQL: no negative amounts
  - expect: "SELECT COUNT(*) FROM SL_THIS WHERE amount < 0 => count == 0"
    failOnError: true
  # Null check: email is never null
  - expect: "SELECT COUNT(*) FROM SL_THIS WHERE email IS NULL => count == 0"
    failOnError: true
```

### Variables Available in Conditions

| Variable | Type | Description |
|---|---|---|
| `count` | Long | Number of rows in the query result |
| `result` | Seq[Any] | Values of the first row (0-indexed) |
| `results` | Seq[Seq[Any]] | All rows (for multi-row results) |
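The contract is simple: everything before `=>` names the query, everything after is a condition over `count`, `result`, and `results`. This mini-evaluator is a sketch of that contract, not Starlake's implementation (which evaluates the condition on the JVM):

```python
# Sketch: split an expectation on "=>" and evaluate the condition against
# the query's result rows. `result(0)` mimics Scala's Seq apply syntax.
def evaluate(expectation: str, rows: list) -> bool:
    _, _, condition = expectation.partition("=>")
    first = rows[0] if rows else []
    context = {
        "count": len(rows),            # number of rows in the result
        "result": lambda i: first[i],  # values of the first row
        "results": rows,               # all rows
    }
    return bool(eval(condition.strip(), {}, context))  # trusted input only

# The uniqueness query returned a single row [1] (max duplicate count == 1)
print(evaluate("is_col_value_not_unique('order_id') => result(0) == 1", [[1]]))  # True
```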
## Extraction Configuration

### JDBC Extraction

```yaml
# metadata/extract/source_db.sl.yml
version: 1
extract:
  connectionRef: "source_postgres"
  jdbcSchemas:
    - schema: "sales"
      # Custom remarks queries (for databases such as DB2)
      columnRemarks: "SELECT COLUMN_NAME, COLUMN_TEXT FROM SYSCAT.COLUMNS WHERE ..."
      tableRemarks: "SELECT TABLE_TEXT FROM SYSCAT.TABLES WHERE ..."
      # Table types to extract
      tableTypes:
        - "TABLE"
        - "VIEW"
        # - "SYSTEM TABLE"
        # - "GLOBAL TEMPORARY"
      # Tables to extract
      tables:
        - name: "*" # or a specific table name or pattern
          fullExport: true # false for incremental
          # Incremental configuration
          partitionColumn: "id" # For parallel extraction
          numPartitions: 4 # Parallelism level
          timestamp: "updated_at" # For incremental tracking
          # JDBC tuning
          fetchSize: 1000 # JDBC fetch size
          # Custom query (overrides the table name)
          sql: "SELECT * FROM orders WHERE region = 'EMEA'"
          # Column selection (optional)
          columns:
            - "order_id"
            - "customer_id"
            - "order_date"
            - "total_amount"
```

Extraction commands:

```bash
# Full extraction
starlake extract --config metadata/extract/source_db.sl.yml

# Incremental (uses timestamp tracking)
starlake extract --config metadata/extract/source_db.sl.yml --incremental
```

### OpenAPI Extraction

```yaml
# metadata/extract/api.sl.yml
version: 1
extract:
  openAPI:
    basePath: /api/v2
    domains:
      - name: customers_api
        # Schema filtering (regex)
        schemas:
          exclude:
            - "Model\\.Common\\.Id"
            - "Internal\\..*"
        # Route selection
        routes:
          - paths:
              include:
                - "/users"
                - "/orders"
                - "/products"
```

### Freshness Monitoring

Track data freshness using timestamp columns:

```bash
# Check freshness for specific tables
starlake freshness --tables dataset1.table1,dataset2.table2 --persist true
```

Monitoring table: `SL_LAST_EXPORT` in the audit schema.
## DAG Configuration (Orchestration)

### DAG Configuration File

```yaml
# metadata/dags/sales_load_dag.sl.yml
dag:
  name: "sales_load_dag"
  schedule: "0 2 * * *" # Cron expression (daily at 2 AM)
  catchup: true # Process historical runs
  default_pool: "default_pool"
  description: "Daily sales data load from SFTP"

  # Airflow-specific
  tags:
    - "production"
    - "sales"
    - "daily"

  # Environment variables for tasks
  options:
    sl_env_var: '{"SL_ROOT": "${root_path}", "SL_DATASETS": "${root_path}/datasets", "SL_TIMEZONE": "Europe/Paris"}'

  # Load strategy (how loads are triggered)
  load:
    strategy: "FILE_SENSOR" # FILE_SENSOR, FILE_SENSOR_DOMAIN, ACK_FILE_SENSOR, NONE
    options:
      incoming_path: "{{SL_ROOT}}/incoming/{{domain}}"
      pending_path: "{{SL_ROOT}}/datasets/pending/{{domain}}"
      global_ack_file_path: "{{SL_ROOT}}/datasets/pending/{{domain}}/{{{{ds}}}}.ack"

  # Custom template (optional)
  template:
    file: "custom_template.py.j2" # Relative to metadata/dags/template, or an absolute path
```

### DAG Assignment Hierarchy

Precedence (from most general to most specific):

1. Project level: `application.dagRef.load` / `application.dagRef.transform`
2. Domain level: `load.metadata.dagRef` in `_config.sl.yml`
3. Table level: `table.metadata.dagRef` in `table.sl.yml`
4. Transform level: `task.dagRef` in `task.sl.yml`

More specific levels override more general ones.
## Load Strategies

### Standard Strategies

| Strategy class | Description | Ordering |
|---|---|---|
| `ai.starlake.job.load.IngestionTimeStrategy` | Load by file modification time | Oldest first |
| `ai.starlake.job.load.IngestionNameStrategy` | Load in lexicographic filename order | Alphabetical |

Configuration:

```yaml
application:
  loadStrategyClass: "ai.starlake.job.load.IngestionNameStrategy"
```

### Custom Load Strategies

Implement the `ai.starlake.job.load.LoadStrategy` interface:

```scala
package com.mycompany.starlake

import ai.starlake.job.load.LoadStrategy
import ai.starlake.storage.StorageHandler
import org.apache.hadoop.fs.Path

import java.time.LocalDateTime

object CustomLoadStrategy extends LoadStrategy with StrictLogging {

  def list(
    storageHandler: StorageHandler,
    path: Path,
    extension: String = "",
    since: LocalDateTime = LocalDateTime.MIN,
    recursive: Boolean
  ): List[FileInfo] = {
    // Custom file-ordering logic
    ???
  }
}
```

```yaml
application:
  loadStrategyClass: "com.mycompany.starlake.CustomLoadStrategy"
```
## Metrics Configuration

### Metric Types

| Metric type | Computed attributes |
|---|---|
| continuous | min, max, sum, mean, median, variance, standard deviation, skewness, kurtosis, 25th percentile, 75th percentile, missing values, row count |
| discrete | distinct count, category frequencies, category counts (top categories), row count |

### Configuration

```yaml
# Application level
application:
  metrics:
    active: true
    discreteMaxCardinality: 10 # Max distinct values for discrete metrics
    path: "{{SL_ROOT}}/metrics"
```

```yaml
# Attribute level
attributes:
  - name: "revenue"
    type: "decimal"
    metric: "continuous"
  - name: "product_category"
    type: "string"
    metric: "discrete"
```

Metrics storage: stored in the audit tables (`SL_METRICS`) for historical tracking and analysis.
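To make the two metric families tangible, here is a standalone sketch computing a few of the listed statistics for sample column values. This is illustration only, not Starlake's metrics engine, and the metric names are assumed labels:

```python
import statistics
from collections import Counter

# Sample column values for a continuous and a discrete attribute
revenue = [10.0, 20.0, 30.0, 40.0]
categories = ["A", "B", "A", "A"]

# A few of the continuous metrics listed above
continuous = {
    "min": min(revenue),
    "max": max(revenue),
    "sum": sum(revenue),
    "mean": statistics.mean(revenue),
    "median": statistics.median(revenue),
    "stddev": statistics.stdev(revenue),
    "countRows": len(revenue),
}

# A few of the discrete metrics listed above
freq = Counter(categories)
discrete = {
    "countDistinct": len(freq),
    "catCountFreq": dict(freq),  # category -> occurrence count
    "countRows": len(categories),
}

print(continuous["mean"], discrete["countDistinct"])  # 25.0 2
```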
## Storage Configuration Patterns

### S3/MinIO/SeaweedFS with Spark

```yaml
# Hadoop S3A configuration
spark:
  hadoop.fs.s3a.endpoint: "http://localhost:8333"
  hadoop.fs.s3a.access.key: "{{S3_ACCESS_KEY}}"
  hadoop.fs.s3a.secret.key: "{{S3_SECRET_KEY}}"
  hadoop.fs.s3a.path.style.access: "true"
  hadoop.fs.s3a.connection.ssl.enabled: "false"
  hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
```

### DuckDB S3 Extension

```yaml
connections:
  duckdb:
    type: jdbc
    options:
      url: "jdbc:duckdb:{{DUCKDB_PATH}}"
      driver: "org.duckdb.DuckDBDriver"
      # DuckDB S3 extension
      preActions: |
        INSTALL httpfs;
        LOAD httpfs;
        SET s3_region='us-east-1';
        SET s3_endpoint='localhost:8333';
        SET s3_access_key_id='{{S3_ACCESS_KEY}}';
        SET s3_secret_access_key='{{S3_SECRET_KEY}}';
        SET s3_use_ssl=false;
        SET s3_url_style='path';
```
## Complete Examples

### Example 1: E-commerce Order Processing

```yaml
# metadata/application.sl.yml
version: 1
application:
  name: "ecommerce-platform"
  connectionRef: "{{activeConnection}}"
  defaultWriteFormat: delta
  timezone: "UTC"
  connections:
    spark:
      type: spark
  spark:
    sql:
      extensions: "io.delta.sql.DeltaSparkSessionExtension"
      catalog:
        spark_catalog: "org.apache.spark.sql.delta.catalog.DeltaCatalog"
```

```yaml
# metadata/load/orders/_config.sl.yml
load:
  name: "orders"
  metadata:
    format: DSV
    separator: ","
    withHeader: true
```

```yaml
# metadata/load/orders/transactions.sl.yml
table:
  pattern: "orders_(?<type>FULL|DELTA)_.*\\.csv"
  primaryKey: ["order_id"]
  metadata:
    writeStrategy:
      types:
        OVERWRITE: 'group("type") == "FULL"'
        UPSERT_BY_KEY_AND_TIMESTAMP: 'group("type") == "DELTA"'
      key: ["order_id"]
      timestamp: "updated_at"
      on: TARGET
    sink:
      partition:
        - "order_date"
      clustering:
        - "customer_id"
        - "status"
  attributes:
    - name: "order_id"
      type: "long"
      required: true
    - name: "customer_id"
      type: "long"
      required: true
      foreignKey: "customers.customer_id"
    - name: "order_date"
      type: "date"
      required: true
    - name: "status"
      type: "string"
      required: true
      metric: "discrete"
    - name: "total_amount"
      type: "decimal"
      required: true
      metric: "continuous"
    - name: "customer_email"
      type: "string"
      privacy: "SHA256"
    - name: "updated_at"
      type: "timestamp"
      required: true
  expectations:
    - expect: "is_col_value_not_unique('order_id') => result(0) == 1"
      failOnError: true
    - expect: "SELECT COUNT(*) FROM SL_THIS WHERE total_amount <= 0 => count == 0"
      failOnError: true
```

```yaml
# metadata/transform/analytics/daily_revenue.sl.yml
task:
  domain: "analytics"
  table: "daily_revenue"
  writeStrategy:
    type: "OVERWRITE_BY_PARTITION"
  sink:
    partition:
      - "report_date"
  attributesDesc:
    - name: "total_revenue"
      comment: "Sum of all order amounts"
```

```sql
-- metadata/transform/analytics/daily_revenue.sql
SELECT
  DATE(order_date) as report_date,
  COUNT(*) as order_count,
  SUM(total_amount) as total_revenue,
  AVG(total_amount) as avg_order_value,
  COUNT(DISTINCT customer_id) as unique_customers
FROM {{orders}}.transactions
WHERE order_date = '{{sl_date}}'
  AND status IN ('COMPLETED', 'SHIPPED')
GROUP BY DATE(order_date)
```
### Example 2: SCD2 Customer Dimension

```yaml
# metadata/load/customers/customer_master.sl.yml
table:
  pattern: "customers_.*\\.csv"
  primaryKey: ["customer_id"]
  metadata:
    format: DSV
    separator: ","
    withHeader: true
    writeStrategy:
      type: "SCD2"
      key: ["customer_id"]
      timestamp: "effective_date"
      startTs: "valid_from"
      endTs: "valid_to"
      on: BOTH
  attributes:
    - name: "customer_id"
      type: "long"
      required: true
    - name: "customer_name"
      type: "string"
      required: true
    - name: "email"
      type: "string"
      privacy: "SHA256"
    - name: "address"
      type: "string"
    - name: "city"
      type: "string"
    - name: "country"
      type: "string"
      metric: "discrete"
    - name: "tier"
      type: "string"
      metric: "discrete"
    - name: "effective_date"
      type: "date"
      required: true
    - name: "valid_from"
      type: "timestamp"
      comment: "SCD2 start timestamp (auto-populated)"
    - name: "valid_to"
      type: "timestamp"
      comment: "SCD2 end timestamp (auto-populated)"
```

Result: the full history of customer changes is maintained:

- Current row: `valid_to = NULL`
- Historical rows: `valid_to` set to the change timestamp
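The close-and-insert behavior can be sketched with a simplified in-memory merge. This illustrates the SCD2 semantics described above, not Starlake's actual merge implementation; `scd2_apply` is a hypothetical helper:

```python
from datetime import date

# Simplified SCD2 sketch: close the current version (set valid_to), then
# insert the incoming row as the new current version (valid_to = None).
def scd2_apply(dimension: list, incoming: dict, key: str, ts: str) -> None:
    for row in dimension:
        if row[key] == incoming[key] and row["valid_to"] is None:
            row["valid_to"] = incoming[ts]  # close the previous version
    dimension.append({**incoming, "valid_from": incoming[ts], "valid_to": None})

dim: list = []
scd2_apply(dim, {"customer_id": 1, "tier": "SILVER", "effective_date": date(2026, 1, 1)},
           "customer_id", "effective_date")
scd2_apply(dim, {"customer_id": 1, "tier": "GOLD", "effective_date": date(2026, 2, 6)},
           "customer_id", "effective_date")

current = [r for r in dim if r["valid_to"] is None]
print(len(dim), current[0]["tier"])  # 2 GOLD
```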
## Best Practices

### 1. Use Variable Substitution Everywhere

Good:

```yaml
url: "jdbc:postgresql://{{PG_HOST}}:{{PG_PORT}}/{{PG_DB}}"
incoming: "{{SL_ROOT}}/incoming/{{domain}}"
```

Bad:

```yaml
url: "jdbc:postgresql://localhost:5432/mydb"
incoming: "/projects/100/101/incoming/sales"
```
### 2. Separate Environment Configurations

```yaml
# env.sl.yml (global)
env:
  root: "/opt/starlake"
  activeConnection: "duckdb"
  PG_HOST: "localhost"
```

```yaml
# env.PROD.sl.yml (production overrides)
env:
  root: "/data/production/starlake"
  activeConnection: "bigquery"
  PG_HOST: "${POSTGRES_HOST}" # Injected from an environment variable
```
### 3. Define Reusable Custom Types

```yaml
# types/custom.sl.yml
types:
  - name: "product_sku"
    pattern: "^[A-Z]{3}-\\d{6}$"
    primitiveType: string
    sample: "PRD-123456"
  - name: "iso_country_code"
    pattern: "^[A-Z]{2}$"
    primitiveType: string
    sample: "US"
```
### 4. Layer Expectations for Data Quality

```yaml
expectations:
  # Critical: fail on error
  - expect: "is_col_value_not_unique('id') => result(0) == 1"
    failOnError: true
  # Warning: log and continue
  - expect: "is_row_count_to_be_between(100, 1000000) => result(0) == 1"
    failOnError: false
```
### 5. Choose the Write Strategy by Use Case

| Table type | Recommended strategy | Why |
|---|---|---|
| Dimension (master data) | `UPSERT_BY_KEY` | Keeps the latest version |
| Dimension (with history) | `SCD2` | Tracks changes over time |
| Fact (partitioned) | `OVERWRITE_BY_PARTITION` | Replaces daily/monthly partitions |
| Fact (append-only) | `APPEND` | Event logs, transactions |
| Staging | `OVERWRITE` | Temporary, full refresh |
### 6. Apply Privacy Transformations Early

```yaml
attributes:
  - name: "ssn"
    type: "string"
    privacy: "HIDE" # Never stored
  - name: "email"
    type: "string"
    privacy: "SHA256" # One-way hash
  - name: "ip_address"
    type: "string"
    privacy: "MD5" # Anonymization
```
### 7. Partition Large Tables

```yaml
# BigQuery
sink:
  partition:
    field: "event_date"
  clustering:
    - "user_id"
    - "event_type"
  requirePartitionFilter: true # Enforce partition pruning
```

```yaml
# Spark
sink:
  partition:
    - "year"
    - "month"
    - "day"
```
### 8. Document with Comments and Tags

```yaml
table:
  name: "orders"
  comment: "E-commerce order transactions from the Shopify API"
  tags: ["revenue", "critical", "daily", "pii"]
  attributes:
    - name: "order_id"
      comment: "Unique order identifier from Shopify"
```
## Validation

### CLI Validation

```bash
# Validate a single file
starlake validate --config metadata/application.sl.yml

# Validate all metadata
starlake validate --all

# Validate a specific domain
starlake validate --domain sales
```

### IDE Integration (VS Code)

Add to `.vscode/settings.json`:

```json
{
  "yaml.schemas": {
    "https://json.schemastore.org/starlake.json": [
      "metadata/**/*.sl.yml",
      "**/metadata/**/*.sl.yml"
    ]
  }
}
```

Benefits:

- Real-time validation as you type
- Autocompletion of configuration keys
- Inline documentation tooltips
## Troubleshooting

### Common Issues

**1. Schema validation errors**

Error: `Missing required property: version`

Fix: always include `version: 1` at the top of configuration files:

```yaml
version: 1
application: ...
```

**2. Connection failures**

Error: `Connection refused to postgres:5432`

Check:

- Network connectivity: `telnet postgres 5432`
- Credentials in `env.sl.yml`
- Availability of the JDBC driver on the classpath

**3. Write strategy conflicts**

Error: `Key column 'order_id' not found`

Fix: make sure the key columns exist in the attributes:

```yaml
writeStrategy:
  type: UPSERT_BY_KEY
  key: ["order_id"] # Must be in the attribute list
attributes:
  - name: "order_id" # ✓ Present
    type: "long"
    required: true
```

**4. S3 access issues**

Error: `Status Code: 403; Error Code: AccessDenied`

Check:

- S3 credentials are correct
- Endpoint URL format: `http://host:port` (no trailing slash)
- Bucket permissions allow read/write
- S3 path style: `path` vs `virtual-hosted`

**5. Partition column mismatches**

Error: `Partition column 'date' not found in schema`

Fix: the partition column must exist in the attributes:

```yaml
sink:
  partition:
    - "order_date"
attributes:
  - name: "order_date" # ✓ Must exist
    type: "date"
    required: true
```
## Resources

- JSON Schema: https://www.schemastore.org/starlake.json
- Full reference: json-schema-guide.md
- Starlake documentation: https://docs.starlake.ai
- GitHub: https://github.com/starlake-ai/starlake
- Examples: https://github.com/starlake-ai/starlake-examples
- Starlake Airflow: https://github.com/starlake-ai/starlake-airflow
- Starlake Dagster: https://github.com/starlake-ai/starlake-dagster

## Version History

- 1.0.0 (2026-02-06): Initial version with core patterns and the JSON Schema reference
- 2.0.0 (2026-02-06): Full update including the complete environment variable catalog, all connection types, adaptive write strategies, the full attribute type catalog, the expectations framework, metrics, extraction configuration, DAG patterns, and production-ready examples from the official documentation