name: kafkaload description: 从/向 Kafka 主题加载或卸载数据
Kafka 加载技能
从文件加载数据到 Kafka 主题或从 Kafka 主题卸载数据到文件。支持批处理和流式模式,并可进行可选转换。
使用方式
starlake kafkaload [options]
选项
读取选项
--config <value>: Kafka 主题名称(来自连接配置)--connectionRef <value>: Kafka 集群的连接引用--format <value>: 读取格式:parquet、json、csv等(默认:parquet)--path <value>: 源文件路径(用于加载到 Kafka)或目标路径(用于卸载)--options k1=v1,k2=v2: Spark 读取器选项
写入选项
--write-config <value>: 写入主题名称--write-path <value>: 卸载的目标文件路径--write-mode <value>: 文件输出的写入模式--write-options k1=v1,k2=v2: Spark 写入器选项--write-format <value>: 流式输出格式:kafka、console等--write-coalesce <value>: 输出分区数
处理选项
--transform <value>: 在加载/卸载前应用到消息的 SQL 转换--stream: 启用流式模式(连续处理)--streaming-trigger <value>: 触发类型:Once、Continuous、ProcessingTime--streaming-trigger-option <value>: 触发间隔(例如10 seconds)--streaming-to-table <value>: 输出到表而不是文件--streaming-partition-by <value>: 按这些列分区输出--reportFormat <value>: 报告输出格式:console、json或html
示例
卸载 Kafka 主题到 Parquet
starlake kafkaload --config orders_topic --path /data/output/orders --format parquet
加载文件到 Kafka 主题
starlake kafkaload --config orders_topic --path /data/input/orders.json --format json --write-config orders_output
从 Kafka 流式传输到控制台
starlake kafkaload --config orders_topic --stream --write-format console
带处理时间触发的流式传输
starlake kafkaload --config orders_topic --stream --streaming-trigger ProcessingTime --streaming-trigger-option "10 seconds"
卸载并应用转换
在保存前应用 SQL 转换:
starlake kafkaload --config orders_topic --path /data/output --transform "SELECT order_id, total FROM SL_THIS WHERE total > 100"