Kafka数据加载技能Skill kafkaload

Kafka 数据加载技能是一个用于在 Apache Kafka 主题和文件系统之间高效传输数据的工具。它支持批处理和流式处理模式,提供数据转换功能,适用于数据集成、ETL 流程和大数据处理场景。关键词:Kafka、数据加载、数据卸载、批处理、流式处理、数据转换、ETL。

ETL开发 0 次安装 0 次浏览 更新于 3/15/2026

name: kafkaload description: 从/向 Kafka 主题加载或卸载数据

Kafka 加载技能

从文件加载数据到 Kafka 主题或从 Kafka 主题卸载数据到文件。支持批处理和流式模式,并可进行可选转换。

使用方式

starlake kafkaload [options]

选项

读取选项

  • --config <value>: Kafka 主题名称(来自连接配置)
  • --connectionRef <value>: Kafka 集群的连接引用
  • --format <value>: 读取格式:parquetjsoncsv 等(默认:parquet
  • --path <value>: 源文件路径(用于加载到 Kafka)或目标路径(用于卸载)
  • --options k1=v1,k2=v2: Spark 读取器选项

写入选项

  • --write-config <value>: 写入主题名称
  • --write-path <value>: 卸载的目标文件路径
  • --write-mode <value>: 文件输出的写入模式
  • --write-options k1=v1,k2=v2: Spark 写入器选项
  • --write-format <value>: 流式输出格式:kafkaconsole
  • --write-coalesce <value>: 输出分区数

处理选项

  • --transform <value>: 在加载/卸载前应用到消息的 SQL 转换
  • --stream: 启用流式模式(连续处理)
  • --streaming-trigger <value>: 触发类型:OnceContinuousProcessingTime
  • --streaming-trigger-option <value>: 触发间隔(例如 10 seconds
  • --streaming-to-table <value>: 输出到表而不是文件
  • --streaming-partition-by <value>: 按这些列分区输出
  • --reportFormat <value>: 报告输出格式:consolejsonhtml

示例

卸载 Kafka 主题到 Parquet

starlake kafkaload --config orders_topic --path /data/output/orders --format parquet

加载文件到 Kafka 主题

starlake kafkaload --config orders_topic --path /data/input/orders.json --format json --write-config orders_output

从 Kafka 流式传输到控制台

starlake kafkaload --config orders_topic --stream --write-format console

带处理时间触发的流式传输

starlake kafkaload --config orders_topic --stream --streaming-trigger ProcessingTime --streaming-trigger-option "10 seconds"

卸载并应用转换

在保存前应用 SQL 转换:

starlake kafkaload --config orders_topic --path /data/output --transform "SELECT order_id, total FROM SL_THIS WHERE total > 100"

相关技能

  • load - 从文件加载数据
  • esload - 加载数据到 Elasticsearch
  • cnxload - 加载数据到 JDBC 表