名称: 转换 描述: 运行SQL或Python转换任务
转换技能
运行定义的转换任务。任务是SQL或Python脚本,从源表读取数据并将结果写入目标表。任务可以具有依赖关系,支持多种写入策略,并可以递归执行其所有上游依赖关系。
用法
starlake transform [选项]
选项
--name <值>: 任务名称,格式为域名.任务(除非使用--tags,否则必需)--compile: 仅返回最终编译的SQL查询而不执行--sync-apply: 更新YAML属性以匹配SQL查询列--sync-preview: 预览将匹配SQL查询的YAML属性更改--query <值>: 运行此SQL查询而非任务文件中定义的查询--dry-run: 仅干运行——编译和验证而不执行(支持BigQuery)--tags <值>: 运行所有匹配这些标签的任务--format: 美化打印最终SQL查询并更新.sql文件--interactive <值>: 运行查询并显示结果而不下沉。格式:csv、json、table、json-array--reload: 执行前从磁盘重新加载YAML文件(用于服务器模式)--truncate: 在插入前强制截断目标表--pageSize <值>: 每页记录数(用于交互模式)--pageNumber <值>: 要显示的页码(用于交互模式)--recursive: 递归执行此任务的所有上游依赖关系--test: 在测试模式下运行而不提交更改--accessToken <值>: 访问令牌用于认证(例如GCP)--options k1=v1,k2=v2: SQL模板的替换参数--scheduledDate <值>: 作业的调度日期,格式:yyyy-MM-dd'T'HH:mm:ss.SSSZ--reportFormat <值>: 报告输出格式:console、json或html
配置上下文
转换任务定义在metadata/transform/目录中。
转换域配置(metadata/transform/{域名}/_config.sl.yml)
设置域中所有任务的默认属性:
# metadata/transform/kpi/_config.sl.yml
版本: 1
转换:
默认:
写入策略:
类型: 覆盖
SQL转换文件(metadata/transform/{域名}/{任务}.sql)
包含SQL查询。使用{{域名}}.表语法引用源表:
-- metadata/transform/kpi/收入汇总.sql
SELECT
o.订单ID,
o.时间戳 AS 订单日期,
SUM(ol.数量 * ol.销售价格) AS 总收入
FROM
starbake.订单 o
JOIN starbake.订单行 ol ON o.订单ID = ol.订单ID
GROUP BY
o.订单ID, o.时间戳
带有依赖关系的任务
任务可以引用其他任务的输出来形成有向无环图(DAG):
-- metadata/transform/kpi/订单汇总.sql
SELECT
ps.订单ID,
ps.订单日期,
rs.总收入,
ps.利润,
ps.总销售单位
FROM
kpi.产品汇总 ps
JOIN kpi.收入汇总 rs ON ps.订单ID = rs.订单ID
任务YAML配置(metadata/transform/{域名}/{任务}.sl.yml)
可选的YAML文件,用于配置写入策略、汇、期望等:
# metadata/transform/analytics/每日销售.sl.yml
版本: 1
任务:
域: "analytics"
表: "daily_sales"
写入策略:
类型: "按分区覆盖"
汇:
分区:
- "报告日期"
聚类:
- "区域"
connectionRef: "bigquery"
期望:
- 期望: "is_row_count_to_be_between(1, 1000000) => result(0) == 1"
failOnError: true
dagRef: "daily_analytics_dag"
Python转换(metadata/transform/{域名}/{任务}.py)
Python转换必须创建一个名为SL_THIS的临时视图:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count
spark = SparkSession.builder.getOrCreate()
df = spark.sql("SELECT * FROM sales.orders")
result = df.groupBy("customer_id").agg(
count("*").alias("order_count"),
sum("total_amount").alias("total_spent")
)
result.createOrReplaceTempView("SL_THIS")
示例
运行单个任务
starlake transform --name kpi.收入汇总
仅编译SQL(调试)
查看最终编译的SQL而不执行:
starlake transform --name kpi.订单汇总 --compile
运行带有递归依赖关系
执行任务及其所有上游依赖关系:
starlake transform --name kpi.订单汇总 --recursive
交互式查询(预览结果)
运行并将结果显示为表格而不写入目标:
starlake transform --name kpi.收入汇总 --interactive table
带分页的交互式查询
starlake transform --name kpi.收入汇总 --interactive json --pageSize 50 --pageNumber 1
干运行(BigQuery)
starlake transform --name kpi.订单汇总 --dry-run
运行所有带标签的任务
starlake transform --tags 每日
使用自定义选项运行
向SQL模板传递替换变量:
starlake transform --name kpi.收入汇总 --options 开始日期=2024-01-01,结束日期=2024-03-31
从SQL同步YAML属性
自动更新任务YAML属性以匹配SQL查询输出列:
starlake transform --name kpi.收入汇总 --sync-apply
美化打印和格式化SQL
starlake transform --name kpi.收入汇总 --format
测试转换
starlake transform --name kpi.收入汇总 --test
相关技能
- load - 在转换前加载原始数据
- lineage - 可视化任务依赖图
- col-lineage - 任务的列级血缘
- dag-generate - 生成编排DAG
- test - 运行转换的集成测试
- config - 配置参考(写入策略、连接)