name: load description: 从待处理区域加载数据到数据仓库
加载技能
从待处理区域加载数据文件到数据仓库。文件必须匹配表配置文件(table.sl.yml)中定义的模式。加载过程根据模式验证数据,应用写策略,强制执行数据质量期望,并应用隐私转换。
用法
starlake load [options]
选项
--domains <value>: 逗号分隔的域列表以加载(默认:所有域)--tables <value>: 逗号分隔的表列表以加载(默认:所有表)--accessToken <value>: 用于身份验证的访问令牌(例如,GCP)--options k1=v1,k2=v2: 传递给加载作业的替换参数--test: 在测试模式下运行,通过处理存储在元数据/测试目录中的文件,而不提交更改到数据仓库--files <value>: 仅加载此特定文件(完全限定路径)--primaryKeys <value>: 在表模式上设置的主键(当推断模式时)--scheduledDate <value>: 作业的计划日期,格式:yyyy-MM-dd'T'HH:mm:ss.SSSZ--reportFormat <value>: 报告输出格式:console、json或html
配置上下文
load 命令依赖于 metadata/load/ 目录中的 YAML 配置文件。
域配置(metadata/load/{domain}/_config.sl.yml)
定义域名和域中所有表的共享元数据:
# metadata/load/starbake/_config.sl.yml
version: 1
load:
name: "starbake"
metadata:
directory: "{{incoming_path}}/starbake"
表配置(metadata/load/{domain}/{table}.sl.yml)
定义表模式、文件模式、格式、写策略和属性:
# metadata/load/starbake/orders.sl.yml
version: 1
table:
name: "orders"
pattern: "orders_.*.json"
attributes:
- name: "customer_id"
type: "long"
sample: "9"
- name: "order_id"
type: "long"
sample: "99"
- name: "status"
type: "string"
sample: "Pending"
- name: "timestamp"
type: "iso_date_time"
sample: "2024-03-01T09:01:12.529Z"
metadata:
format: "JSON"
encoding: "UTF-8"
array: true
writeStrategy:
type: "APPEND"
CSV 表示例
# metadata/load/starbake/order_lines.sl.yml
version: 1
table:
name: "order_lines"
pattern: "order-lines_.*.csv"
attributes:
- name: "order_id"
type: "int"
sample: "99"
- name: "product_id"
type: "int"
sample: "9"
- name: "quantity"
type: "int"
sample: "5"
- name: "sale_price"
type: "double"
sample: "8.68"
metadata:
format: "DSV"
encoding: "UTF-8"
withHeader: true
separator: ";"
writeStrategy:
type: "APPEND"
XML 表示例
# metadata/load/starbake/products.sl.yml
version: 1
table:
name: "products"
pattern: "products.*.xml"
attributes:
- name: "category"
type: "string"
- name: "cost"
type: "double"
- name: "name"
type: "string"
- name: "price"
type: "double"
- name: "product_id"
type: "long"
metadata:
format: "XML"
encoding: "UTF-8"
options:
rowTag: "record"
writeStrategy:
type: "OVERWRITE"
写策略
writeStrategy.type 字段控制数据写入方式:
| 策略 | 描述 |
|---|---|
APPEND |
插入所有行 |
OVERWRITE |
替换整个表 |
UPSERT_BY_KEY |
按键更新现有行,插入新行 |
UPSERT_BY_KEY_AND_TIMESTAMP |
带时间戳比较的更新插入 |
OVERWRITE_BY_PARTITION |
仅替换特定分区 |
DELETE_THEN_INSERT |
删除匹配键然后插入 |
SCD2 |
缓慢变化维度类型2 |
数据质量期望
table:
expectations:
- expect: "is_col_value_not_unique('order_id') => result(0) == 1"
failOnError: true
- expect: "is_row_count_to_be_between(1, 1000000) => result(0) == 1"
failOnError: false
示例
加载所有域
starlake load
加载特定域
starlake load --domains starbake
加载特定表
starlake load --domains starbake --tables orders
加载多个表
starlake load --domains starbake --tables orders,order_lines,products
测试加载(干运行)
在测试模式下运行加载,不写入仓库:
starlake load --test
加载特定文件
starlake load --files /path/to/incoming/starbake/orders_20240301.json
使用自定义选项加载
starlake load --domains starbake --options date=2024-03-01,env=prod
使用 JSON 报告加载
starlake load --domains starbake --reportFormat json