name: 部署Kafka到K8s description: | 使用Strimzi操作符在Kubernetes上部署Apache Kafka,采用KRaft模式。 适用于为事件驱动微服务、消息队列或发布/订阅模式设置Kafka。 涵盖操作符安装、集群创建、主题管理以及生产者/消费者测试。 不适用于使用托管Kafka(Confluent Cloud, MSK)或没有K8s的本地开发场景。
在Kubernetes上部署Kafka
使用Strimzi操作符(v0.49.1+)和KRaft模式部署生产就绪的Apache Kafka集群。
快速开始
# 1. 创建命名空间
kubectl create namespace kafka
# 2. 安装Strimzi操作符
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
# 3. 等待操作符就绪
kubectl wait deployment/strimzi-cluster-operator --for=condition=Available -n kafka --timeout=300s
# 4. 部署Kafka集群
kubectl apply -f https://strimzi.io/examples/latest/kafka/kraft/kafka-single-node.yaml -n kafka
# 5. 等待集群就绪
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n kafka
Strimzi操作符安装
标准安装(集群范围)
kubectl create namespace kafka
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
kubectl get pods -n kafka -w
命名空间范围安装
# 下载并修改为单命名空间
curl -L https://strimzi.io/install/latest?namespace=kafka > strimzi-install.yaml
# 根据需要编辑RoleBindings和ClusterRoles
kubectl apply -f strimzi-install.yaml -n kafka
Kafka集群配置
单节点(开发环境)
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
namespace: kafka
spec:
kafka:
version: 3.9.0
replicas: 1
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
storage:
type: ephemeral
entityOperator:
topicOperator: {}
userOperator: {}
生产集群(3节点 + KRaft)
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: kafka-production
namespace: kafka
spec:
kafka:
version: 3.9.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
- name: external
port: 9094
type: nodeport
tls: false
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
inter.broker.protocol.version: "3.9"
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 100Gi
deleteClaim: false
resources:
requests:
memory: 2Gi
cpu: "500m"
limits:
memory: 4Gi
cpu: "2"
entityOperator:
topicOperator: {}
userOperator: {}
主题管理
通过CRD创建主题
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: task-events
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 3
replicas: 1
config:
retention.ms: 604800000 # 7天
segment.bytes: 1073741824 # 1GB
列出和描述主题
# 列出主题
kubectl -n kafka run kafka-topics -ti --rm --restart=Never \
--image=quay.io/strimzi/kafka:0.49.1-kafka-3.9.0 -- \
bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list
# 描述主题
kubectl -n kafka run kafka-topics -ti --rm --restart=Never \
--image=quay.io/strimzi/kafka:0.49.1-kafka-3.9.0 -- \
bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 \
--describe --topic task-events
生产者/消费者测试
控制台生产者
kubectl -n kafka run kafka-producer -ti --rm --restart=Never \
--image=quay.io/strimzi/kafka:0.49.1-kafka-3.9.0 -- \
bin/kafka-console-producer.sh \
--bootstrap-server my-cluster-kafka-bootstrap:9092 \
--topic my-topic
控制台消费者
kubectl -n kafka run kafka-consumer -ti --rm --restart=Never \
--image=quay.io/strimzi/kafka:0.49.1-kafka-3.9.0 -- \
bin/kafka-console-consumer.sh \
--bootstrap-server my-cluster-kafka-bootstrap:9092 \
--topic my-topic --from-beginning
服务发现
用于客户端连接的Kafka引导服务:
| 服务 | 端口 | 用途 |
|---|---|---|
my-cluster-kafka-bootstrap:9092 |
明文 | 内部集群应用 |
my-cluster-kafka-bootstrap:9093 |
TLS | 安全内部应用 |
my-cluster-kafka-0.my-cluster-kafka-brokers:9092 |
明文 | 直接代理访问 |
从其他命名空间连接
# 在您的应用部署中
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
监控
启用Prometheus指标
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
metricsConfig:
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
name: kafka-metrics
key: kafka-metrics-config.yml
检查集群状态
kubectl get kafka -n kafka
kubectl describe kafka my-cluster -n kafka
kubectl get pods -n kafka -l strimzi.io/cluster=my-cluster
故障排除
操作符未启动
kubectl logs deployment/strimzi-cluster-operator -n kafka
kubectl describe pod -l name=strimzi-cluster-operator -n kafka
Kafka Pod未就绪
kubectl describe pod my-cluster-kafka-0 -n kafka
kubectl logs my-cluster-kafka-0 -n kafka
kubectl get events -n kafka --sort-by='.lastTimestamp'
常见问题
| 错误 | 原因 | 修复方法 |
|---|---|---|
| PVC挂起 | 无存储类 | 添加storageClassName或使用临时存储 |
| Pods OOMKilled | 内存不足 | 增加资源限制 |
| 连接被拒绝 | 错误的引导URL | 使用cluster-kafka-bootstrap:9092 |
清理
# 删除集群
kubectl -n kafka delete kafka my-cluster
# 删除PVC(数据)
kubectl delete pvc -l strimzi.io/name=my-cluster-kafka -n kafka
# 移除操作符
kubectl -n kafka delete -f 'https://strimzi.io/install/latest?namespace=kafka'
# 删除命名空间
kubectl delete namespace kafka
与Dapr集成
关于Dapr发布/订阅集成,请参阅configuring-dapr-pubsub技能:
# 指向Strimzi Kafka的Dapr组件
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub
spec:
type: pubsub.kafka
metadata:
- name: brokers
value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
- name: authType
value: "none"
验证
运行:python scripts/verify.py
相关技能
operating-k8s-local- 本地Minikube集群设置configuring-dapr-pubsub- Dapr Kafka发布/订阅集成scaffolding-fastapi-dapr- 使用Kafka事件的FastAPI服务