Kafka Streams
概览
全面指南,介绍Apache Kafka模式和Kafka Streams,用于实时数据处理。
目录
Kafka概念
核心概念
# Kafka核心概念
"""
- 代理:存储和提供数据的Kafka服务器
- 主题:发布记录的类别/订阅源名称
- 分区:主题被分割成分区以实现并行处理
- 生产者:发布记录到主题的应用程序
- 消费者:订阅主题并处理记录的应用程序
- 消费者组:协同工作以消费主题的消费者组
- 偏移量:分区内每条记录的唯一标识符
- 提交:存储消费者的当前偏移量
"""
基本设置(Python)
# kafka_config.py
from dataclasses import dataclass
from typing import Optional
@dataclass
class KafkaConfig:
    """Shared client configuration for Kafka producers and consumers."""
    bootstrap_servers: str = 'localhost:9092'
    group_id: str = 'default-group'
    auto_offset_reset: str = 'earliest'
    enable_auto_commit: bool = True
    auto_commit_interval_ms: int = 5000
    session_timeout_ms: int = 30000
    heartbeat_interval_ms: int = 3000
    max_poll_records: int = 500
    max_poll_interval_ms: int = 300000


# Default (local development) configuration.
DEFAULT_CONFIG = KafkaConfig()

# Production configuration: multiple brokers, smaller polls, manual commits.
PRODUCTION_CONFIG = KafkaConfig(
    bootstrap_servers='kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092',
    group_id='production-group',
    enable_auto_commit=False,  # manual commits for better delivery guarantees
    max_poll_records=100,
    max_poll_interval_ms=600000,
)
生产者
基本生产者
# producer.py
from kafka import KafkaProducer
import json
import logging
class KafkaMessageProducer:
    """JSON-serializing wrapper around kafka-python's KafkaProducer.

    Values are JSON-encoded and keys stringified before sending; the
    producer waits for acknowledgement from all in-sync replicas.
    """

    def __init__(self, config: KafkaConfig):
        self.producer = KafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: str(k).encode('utf-8') if k else None,
            acks='all',  # wait for all in-sync replicas
            retries=3,
            compression_type='gzip',
            batch_size=16384,
            linger_ms=10
        )

    def send(
        self,
        topic: str,
        value: dict,
        key: str = None,
        headers: dict = None
    ) -> None:
        """Send one message and block until delivery is confirmed.

        Args:
            topic: destination topic name.
            value: JSON-serializable payload.
            key: optional partitioning key.
            headers: optional str->str headers (values UTF-8 encoded).

        Raises:
            Exception: re-raised if the send fails or times out (10 s).
        """
        try:
            future = self.producer.send(
                topic,
                value=value,
                key=key,
                headers=[(k, v.encode()) for k, v in (headers or {}).items()]
            )
            # Block on the delivery report.
            record_metadata = future.get(timeout=10)
            logging.info(
                f"消息发送到 {record_metadata.topic} "
                f"[分区: {record_metadata.partition}, 偏移量: {record_metadata.offset}]"
            )
        except Exception as e:
            logging.error(f"发送消息失败:{e}")
            raise

    def send_async(
        self,
        topic: str,
        value: dict,
        key: str = None,
        callback: callable = None
    ) -> None:
        """Send a message without blocking on the delivery report.

        BUG FIX: kafka-python reports success via Future.add_callback(metadata)
        and failure via Future.add_errback(exception); the original registered
        a single two-argument callback (confluent-kafka's API shape), which
        would have been invoked with the wrong arguments.
        """
        def on_success(record_metadata):
            logging.info(f"消息送达:{record_metadata}")
            if callback:
                callback(record_metadata)

        def on_error(error):
            logging.error(f"消息送达失败:{error}")

        future = self.producer.send(topic, value=value, key=key)
        future.add_callback(on_success)
        future.add_errback(on_error)

    def flush(self) -> None:
        """Flush all pending messages."""
        self.producer.flush()

    def close(self) -> None:
        """Close the producer."""
        self.producer.close()
# Usage example: send one JSON event keyed by user, then shut the producer down.
producer = KafkaMessageProducer(DEFAULT_CONFIG)
producer.send('events', {'event': 'user_login', 'user_id': 123}, key='user_123')
producer.flush()
producer.close()
事务性生产者
# transactional_producer.py
from kafka import KafkaProducer
import json
class TransactionalProducer:
    """Producer that publishes a batch of messages atomically in one transaction.

    NOTE(review): init_transactions()/begin_transaction()/commit_transaction()
    mirror the Java producer / confluent-kafka API; classic kafka-python's
    KafkaProducer does not expose transactional methods — confirm the
    installed client library actually supports them.
    """
    def __init__(self, config: KafkaConfig, transactional_id: str):
        self.producer = KafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            transactional_id=transactional_id,
            enable_idempotence=True
        )
        # Register the transactional.id with the broker's transaction coordinator.
        self.producer.init_transactions()

    def send_in_transaction(
        self,
        messages: list,
        input_topic: str,
        output_topic: str
    ) -> None:
        """Send several messages within a single transaction.

        NOTE(review): input_topic is currently unused — the consume/process
        steps below are placeholders; confirm the intended read-process-write
        flow before relying on this.
        """
        try:
            # Open the transaction.
            self.producer.begin_transaction()
            # Consume from the input topic (for processing)
            # Process the messages
            # Send to the output topic
            for message in messages:
                self.producer.send(output_topic, value=message)
            # Commit: all staged records become visible at once.
            self.producer.commit_transaction()
        except Exception as e:
            # Abort on any error so no partial output is committed.
            self.producer.abort_transaction()
            raise

    def close(self) -> None:
        self.producer.close()
消费者
基本消费者
# consumer.py
from kafka import KafkaConsumer
import json
import logging
class KafkaMessageConsumer:
    """JSON-deserializing wrapper around kafka-python's KafkaConsumer."""

    def __init__(self, config: KafkaConfig, topics: list):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=config.bootstrap_servers,
            group_id=config.group_id,
            auto_offset_reset=config.auto_offset_reset,
            enable_auto_commit=config.enable_auto_commit,
            auto_commit_interval_ms=config.auto_commit_interval_ms,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None,
            session_timeout_ms=config.session_timeout_ms,
            heartbeat_interval_ms=config.heartbeat_interval_ms,
            max_poll_records=config.max_poll_records,
            max_poll_interval_ms=config.max_poll_interval_ms
        )

    def consume(self, handler: callable) -> None:
        """Consume messages forever, passing each to *handler*.

        When auto-commit is disabled, offsets are committed manually both on
        success and after a handler failure (the poison message is skipped).
        """
        try:
            for message in self.consumer:
                try:
                    result = handler(message)
                    # Manual commit when auto-commit is disabled.
                    if not self.consumer.config['enable_auto_commit']:
                        self.consumer.commit()
                    logging.info(f"处理消息:{message.offset}")
                except Exception as e:
                    logging.error(f"处理消息出错:{e}")
                    # Commit anyway so the failing message is skipped.
                    if not self.consumer.config['enable_auto_commit']:
                        self.consumer.commit()
        except KeyboardInterrupt:
            logging.info("停止消费者...")
        finally:
            self.close()

    def consume_batch(self, handler: callable, batch_size: int = 10) -> None:
        """Consume messages in batches of *batch_size*, committing per batch."""
        batch = []
        try:
            for message in self.consumer:
                batch.append(message)
                if len(batch) >= batch_size:
                    # Hand the full batch to the handler, then commit it.
                    handler(batch)
                    self.consumer.commit()
                    batch = []
        finally:
            # Flush any partial batch before closing.
            if batch:
                handler(batch)
                self.consumer.commit()
            self.close()

    def seek_to_beginning(self, topic: str, partition: int) -> None:
        """Seek to the beginning of one partition.

        BUG FIX: kafka-python's seek_to_beginning() takes TopicPartition
        positional arguments; the original passed a {partition: topic} dict,
        which raises.
        """
        from kafka import TopicPartition
        self.consumer.seek_to_beginning(TopicPartition(topic, partition))

    def seek_to_end(self, topic: str, partition: int) -> None:
        """Seek to the end of one partition (same fix as seek_to_beginning)."""
        from kafka import TopicPartition
        self.consumer.seek_to_end(TopicPartition(topic, partition))

    def close(self) -> None:
        """Close the consumer."""
        self.consumer.close()
# Usage example: a handler that prints each record's payload.
def message_handler(message):
    """Print the received payload and signal successful handling."""
    payload = message.value
    print(f"收到:{payload}")
    return True
# Usage example: consume the 'events' topic with the handler above (blocks until interrupted).
consumer = KafkaMessageConsumer(DEFAULT_CONFIG, ['events'])
consumer.consume(message_handler)
重新平衡监听器
# rebalancing_consumer.py
from kafka import KafkaConsumer
from kafka.coordinator.assignor import RoundRobinPartitionAssignor
import logging
class RebalancingConsumer(KafkaMessageConsumer):
    """Consumer that attaches a rebalance listener to commit offsets safely.

    NOTE(review): the base class already subscribed via the KafkaConsumer
    constructor; calling subscribe() again replaces that subscription while
    adding the listener — confirm this double-subscription is intended.
    """
    def __init__(self, config: KafkaConfig, topics: list):
        super().__init__(config, topics)
        self.consumer.subscribe(
            topics,
            listener=self.RebalanceListener(self)
        )

    class RebalanceListener:
        # Callbacks invoked by the client around a consumer-group rebalance.
        def __init__(self, outer):
            self.outer = outer

        def on_partitions_revoked(self, revoked):
            """Called before partitions are taken away in a rebalance."""
            logging.info(f"分区被撤销:{revoked}")
            # Commit offsets before losing ownership of the partitions.
            self.outer.consumer.commit()

        def on_partitions_assigned(self, assigned):
            """Called after partitions have been (re)assigned."""
            logging.info(f"分区被分配:{assigned}")
主题和分区
主题管理
# topic_management.py
from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType, NewTopic
import logging
class TopicManager:
    """Administrative helper for creating, deleting and inspecting topics.

    NOTE(review): create_topic/delete_topic/alter_topic_config iterate
    `.items()` on the call result as if it were a dict of futures — that is
    confluent-kafka's AdminClient API.  kafka-python's KafkaAdminClient
    returns response objects instead; confirm which client this targets.
    """
    def __init__(self, config: KafkaConfig):
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=config.bootstrap_servers
        )

    def create_topic(
        self,
        topic_name: str,
        num_partitions: int = 3,
        replication_factor: int = 1,
        config: dict = None
    ) -> None:
        """Create a new topic, optionally with per-topic config overrides.

        Errors are logged rather than raised.
        """
        topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor
        )
        if config:
            topic.topic_configs = config
        try:
            future = self.admin_client.create_topics([topic])
            for topic_name, future_result in future.items():
                try:
                    future_result.result()
                    logging.info(f"主题 {topic_name} 创建成功")
                except Exception as e:
                    logging.error(f"创建主题 {topic_name} 失败:{e}")
        except Exception as e:
            logging.error(f"创建主题出错:{e}")

    def delete_topic(self, topic_name: str) -> None:
        """Delete a topic; errors are logged rather than raised."""
        try:
            future = self.admin_client.delete_topics([topic_name])
            for topic, future_result in future.items():
                future_result.result()
                logging.info(f"主题 {topic} 删除成功")
        except Exception as e:
            logging.error(f"删除主题出错:{e}")

    def list_topics(self) -> list:
        """Return the names of all topics in the cluster."""
        return list(self.admin_client.list_topics().keys())

    def describe_topic(self, topic_name: str) -> dict:
        """Return metadata for one topic."""
        metadata = self.admin_client.describe_topics([topic_name])
        return metadata[topic_name]

    def alter_topic_config(self, topic_name: str, config: dict) -> None:
        """Alter per-topic configuration entries."""
        resource = ConfigResource(ConfigResourceType.TOPIC, topic_name)
        future = self.admin_client.alter_configs({
            resource: config
        })
        for resource, future_result in future.items():
            future_result.result()

    def close(self) -> None:
        self.admin_client.close()
# 使用方法
topic_manager = TopicManager(DEFAULT_CONFIG)
# 创建带有保留策略的主题
topic_manager.create_topic(
'user-events',
num_partitions=6,
replication_factor=3,
config={
'retention.ms': '604800000', # 7天
'segment.ms': '86400000', # 1天
'cleanup.policy': 'delete'
}
)
分区管理
# partition_management.py
from kafka import KafkaConsumer
import logging
class PartitionManager:
    """Read-only helpers for inspecting a topic's partitions and offsets."""

    def __init__(self, config: KafkaConfig):
        self.config = config

    def get_partition_count(self, topic: str) -> int:
        """Return the number of partitions for *topic* (0 if unknown)."""
        consumer = KafkaConsumer(
            bootstrap_servers=self.config.bootstrap_servers
        )
        try:
            partitions = consumer.partitions_for_topic(topic)
            return len(partitions) if partitions else 0
        finally:
            # FIX: close even when partitions_for_topic raises (was leaked).
            consumer.close()

    def get_partition_info(self, topic: str) -> dict:
        """Return begin/end offsets and message count per partition.

        BUG FIX: kafka-python's beginning_offsets()/end_offsets() need
        TopicPartition instances; the original passed plain
        (topic, partition) tuples, which lack the .topic/.partition
        attributes those APIs read.
        """
        from kafka import TopicPartition
        consumer = KafkaConsumer(
            bootstrap_servers=self.config.bootstrap_servers
        )
        try:
            partitions = consumer.partitions_for_topic(topic)
            info = {}
            for partition in partitions:
                tp = TopicPartition(topic, partition)
                beginning_offset = consumer.beginning_offsets([tp])[tp]
                end_offset = consumer.end_offsets([tp])[tp]
                info[partition] = {
                    'beginning_offset': beginning_offset,
                    'end_offset': end_offset,
                    'message_count': end_offset - beginning_offset
                }
            return info
        finally:
            consumer.close()

    def get_consumer_offsets(self, topic: str, group_id: str) -> dict:
        """Return the committed offsets of *group_id* for *topic*."""
        from kafka import KafkaAdminClient
        admin_client = KafkaAdminClient(
            bootstrap_servers=self.config.bootstrap_servers
        )
        try:
            offsets = admin_client.list_consumer_group_offsets(group_id)
            consumer_offsets = {}
            # Keys are TopicPartition instances; keep only this topic's.
            for tp, offset_info in offsets.items():
                if tp.topic == topic:
                    consumer_offsets[tp.partition] = {
                        'offset': offset_info.offset,
                        'metadata': offset_info.metadata
                    }
            return consumer_offsets
        finally:
            admin_client.close()
# Usage example: print offset ranges for every partition of 'user-events'.
partition_manager = PartitionManager(DEFAULT_CONFIG)
print(partition_manager.get_partition_info('user-events'))
消费者组
消费者组管理
# consumer_group.py
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
import logging
class ConsumerGroupManager:
    """Administrative helper for consumer groups: list, describe, delete, reset."""

    def __init__(self, config: KafkaConfig):
        # BUG FIX: the reset_* methods read self.config, which the original
        # __init__ never stored — every reset raised AttributeError.
        self.config = config
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=config.bootstrap_servers
        )

    def list_consumer_groups(self) -> list:
        """Return all consumer group ids.

        BUG FIX: kafka-python's list_consumer_groups() returns a list of
        (group_id, protocol_type) tuples, not a dict — the original
        .keys() call raised.
        """
        return [group_id for group_id, _ in self.admin_client.list_consumer_groups()]

    def describe_consumer_group(self, group_id: str) -> dict:
        """Describe one consumer group.

        BUG FIX: describe_consumer_groups() returns a list of descriptions;
        indexing it with the group-id string raised TypeError.
        """
        return self.admin_client.describe_consumer_groups([group_id])[0]

    def delete_consumer_group(self, group_id: str) -> None:
        """Delete a consumer group; errors are logged rather than raised."""
        try:
            self.admin_client.delete_consumer_groups([group_id])
            logging.info(f"消费者组 {group_id} 已删除")
        except Exception as e:
            logging.error(f"删除消费者组出错:{e}")

    def reset_consumer_group_offset(
        self,
        group_id: str,
        topic: str,
        partition: int,
        new_offset: int
    ) -> None:
        """Reset one partition of a consumer group to a specific offset."""
        from kafka import KafkaConsumer, TopicPartition
        consumer = KafkaConsumer(
            bootstrap_servers=self.config.bootstrap_servers,
            group_id=group_id,
            enable_auto_commit=False
        )
        try:
            # BUG FIX: seek() needs an assigned TopicPartition; the original
            # passed a plain tuple and never assigned the partition.
            tp = TopicPartition(topic, partition)
            consumer.assign([tp])
            consumer.seek(tp, new_offset)
            consumer.commit()
        finally:
            consumer.close()

    def reset_to_earliest(self, group_id: str, topic: str) -> None:
        """Reset every partition of a consumer group to its earliest offset."""
        from kafka import KafkaConsumer, TopicPartition
        consumer = KafkaConsumer(
            bootstrap_servers=self.config.bootstrap_servers,
            group_id=group_id,
            enable_auto_commit=False
        )
        try:
            partitions = consumer.partitions_for_topic(topic)
            tps = [TopicPartition(topic, partition) for partition in partitions]
            consumer.assign(tps)
            for tp in tps:
                beginning_offset = consumer.beginning_offsets([tp])[tp]
                consumer.seek(tp, beginning_offset)
            consumer.commit()
        finally:
            consumer.close()

    def close(self) -> None:
        self.admin_client.close()
Kafka Streams
使用Python进行流处理
# kafka_streams.py
from kafka import KafkaConsumer, KafkaProducer
import json
import threading
import logging
class KafkaStreamProcessor:
    """Simple consume-transform-produce pipeline between two topics."""

    def __init__(
        self,
        input_topic: str,
        output_topic: str,
        config: KafkaConfig
    ):
        self.input_topic = input_topic
        self.output_topic = output_topic
        self.config = config
        # Dedicated consumer group so the processor tracks its own offsets.
        self.consumer = KafkaConsumer(
            input_topic,
            bootstrap_servers=config.bootstrap_servers,
            group_id=f"{config.group_id}-processor",
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.producer = KafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def process_stream(self, processor: callable) -> None:
        """Apply *processor* to each input record and publish the result.

        The offset is committed only after the result is handed to the
        producer; a failing record is logged and NOT committed.
        """
        try:
            for message in self.consumer:
                try:
                    # Transform the record.
                    result = processor(message.value)
                    # Forward to the output topic.
                    self.producer.send(self.output_topic, value=result)
                    # Commit the offset.
                    self.consumer.commit()
                    logging.info(f"处理消息:{message.offset}")
                except Exception as e:
                    logging.error(f"处理消息出错:{e}")
        except KeyboardInterrupt:
            logging.info("停止流处理器...")
        finally:
            self.close()

    def aggregate_stream(
        self,
        window_size_ms: int,
        processor: callable
    ) -> None:
        """Aggregate records into tumbling windows of *window_size_ms*.

        Windowing is driven by message.timestamp.
        NOTE(review): presumably epoch milliseconds — confirm against the
        broker's message.timestamp.type setting.
        """
        window = []
        last_flush_time = None
        try:
            for message in self.consumer:
                try:
                    window.append(message.value)
                    current_time = message.timestamp
                    if last_flush_time is None:
                        last_flush_time = current_time
                    # Flush the window once its time span is exceeded.
                    if current_time - last_flush_time >= window_size_ms:
                        # Aggregate the window contents.
                        result = processor(window)
                        self.producer.send(self.output_topic, value=result)
                        # Start a fresh window.
                        window = []
                        last_flush_time = current_time
                        # Commit the offsets consumed so far.
                        self.consumer.commit()
                except Exception as e:
                    logging.error(f"聚合错误:{e}")
        finally:
            # Flush whatever remains in the final partial window.
            if window:
                result = processor(window)
                self.producer.send(self.output_topic, value=result)
            self.close()

    def close(self) -> None:
        """Close both the consumer and the producer."""
        self.consumer.close()
        self.producer.close()
# Example processors
def uppercase_processor(message):
    """Upper-case the 'text' field of a message dict in place and return it."""
    text = message.get('text', '')
    message['text'] = text.upper()
    return message
def count_aggregator(messages):
    """Summarize a window: message count plus first/last record timestamps."""
    first, last = messages[0], messages[-1]
    return {
        'count': len(messages),
        'window_start': first.get('timestamp'),
        'window_end': last.get('timestamp'),
    }
# Usage example: run the uppercase transform between two topics (blocks until interrupted).
processor = KafkaStreamProcessor('input-topic', 'output-topic', DEFAULT_CONFIG)
processor.process_stream(uppercase_processor)
错误处理
错误处理策略
# error_handling.py
from kafka import KafkaConsumer, KafkaProducer
import json
import logging
import time
class ResilientConsumer:
    """Consumer wrapper that retries connection failures with linear backoff."""

    def __init__(self, config: 'KafkaConfig', topics: list):
        self.config = config
        self.topics = topics
        self.max_retries = 3
        self.retry_delay = 1000  # ms

    def consume_with_retry(self, handler: callable) -> None:
        """Consume messages, recreating the consumer after connection errors.

        Handler failures break out of the poll loop WITHOUT committing, so
        the failed message is redelivered on the next attempt.

        Raises:
            Exception: the last consumer error once max_retries is exceeded.
        """
        retry_count = 0
        while retry_count < self.max_retries:
            consumer = None
            try:
                consumer = KafkaConsumer(
                    *self.topics,
                    bootstrap_servers=self.config.bootstrap_servers,
                    group_id=self.config.group_id,
                    auto_offset_reset=self.config.auto_offset_reset,
                    enable_auto_commit=False,
                    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
                )
                for message in consumer:
                    try:
                        handler(message)
                        consumer.commit()
                        retry_count = 0  # reset after a success
                    except Exception as e:
                        logging.error(f"处理错误:{e}")
                        # Do not commit: the message will be redelivered.
                        break
            except Exception as e:
                retry_count += 1
                logging.error(f"消费者错误(尝试 {retry_count}):{e}")
                if retry_count < self.max_retries:
                    # BUG FIX: retry_delay is in milliseconds, but the
                    # original slept retry_delay * retry_count *seconds*
                    # (over 16 minutes on the first retry).
                    time.sleep(self.retry_delay * retry_count / 1000)
                else:
                    logging.error("超过最大重试次数,放弃")
                    raise
            finally:
                # BUG FIX: close the consumer so repeated attempts don't
                # leak sockets/group memberships.
                if consumer is not None:
                    consumer.close()
class DeadLetterProducer:
    """Publishes messages that failed processing to a dead-letter queue topic."""

    def __init__(self, config: KafkaConfig, dlq_topic: str):
        self.dlq_topic = dlq_topic
        self.producer = KafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def send_to_dlq(self, original_message: dict, error: Exception) -> None:
        """Wrap a failed message with its error and timestamp, then publish it."""
        envelope = {
            'original_message': original_message,
            'error': str(error),
            'timestamp': time.time() * 1000
        }
        self.producer.send(self.dlq_topic, value=envelope)
        self.producer.flush()
精确一次语义
精确一次处理
# exactly_once.py
from kafka import KafkaProducer
import json
class ExactlyOnceProcessor:
    """Transactional produce path intended for exactly-once output semantics.

    NOTE(review): transactional_id / enable_idempotence / init_transactions()
    follow the Java producer API; classic kafka-python does not support
    producer transactions — confirm the client library in use.
    """
    def __init__(
        self,
        config: KafkaConfig,
        input_topic: str,
        output_topic: str,
        transactional_id: str
    ):
        self.input_topic = input_topic
        self.output_topic = output_topic
        # Configure the producer for exactly-once delivery.
        self.producer = KafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            transactional_id=transactional_id,
            enable_idempotence=True,
            acks='all',
            max_in_flight_requests_per_connection=1,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        # Register with the broker's transaction coordinator.
        self.producer.init_transactions()

    def process_exactly_once(self, messages: list, processor: callable) -> None:
        """Process *messages* and publish all results atomically, or none."""
        try:
            # Open the transaction.
            self.producer.begin_transaction()
            # Transform and stage every record inside the transaction.
            for message in messages:
                result = processor(message)
                self.producer.send(self.output_topic, value=result)
            # Commit: all staged records become visible at once.
            self.producer.commit_transaction()
        except Exception as e:
            # Roll back on any failure so no partial output is committed.
            self.producer.abort_transaction()
            raise

    def close(self) -> None:
        self.producer.close()
性能调优
生产者调优
# producer_tuning.py
from kafka import KafkaProducer
import json
class TunedProducer:
    """Producer tuned for high throughput with durable delivery settings.

    NOTE(review): `metadata_fetch_timeout_ms` and `enable_idempotence` are not
    recognized by every kafka-python release (the metadata knob there is
    `metadata_max_age_ms`, and unknown configs raise) — confirm against the
    installed client version.
    """
    def __init__(self, config: KafkaConfig):
        self.producer = KafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            # Batching
            batch_size=32768,  # 32 KB
            linger_ms=10,  # wait up to 10 ms to fill a batch
            # Compression
            compression_type='lz4',  # LZ4 balances speed and ratio
            # Reliability
            acks='all',  # wait for all in-sync replicas
            retries=3,
            max_in_flight_requests_per_connection=5,
            # Buffering
            buffer_memory=67108864,  # 64 MB buffer
            # Idempotence
            enable_idempotence=True,
            # Timeouts
            request_timeout_ms=30000,
            metadata_fetch_timeout_ms=5000
        )
消费者调优
# consumer_tuning.py
from kafka import KafkaConsumer
import json
class TunedConsumer:
    """Consumer tuned for high-throughput fetching with manual offset commits."""
    def __init__(self, config: KafkaConfig, topics: list):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=config.bootstrap_servers,
            group_id=config.group_id,
            # Fetch settings
            fetch_min_bytes=1,
            fetch_max_wait_ms=500,
            fetch_max_bytes=52428800,  # 50 MB
            # Session settings
            session_timeout_ms=30000,
            heartbeat_interval_ms=3000,
            max_poll_interval_ms=300000,
            # Poll settings
            max_poll_records=500,
            max_partition_fetch_bytes=1048576,  # 1 MB
            # Offset management
            enable_auto_commit=False,
            auto_offset_reset='earliest',
            # Networking
            connections_max_idle_ms=540000,
            # Deserialization
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
监控
指标收集
# monitoring.py
from kafka import KafkaConsumer
import time
class KafkaMonitor:
    """Lag and offset metrics for topics and consumer groups."""

    def __init__(self, config: KafkaConfig):
        self.config = config

    def get_consumer_lag(self, topic: str, group_id: str) -> dict:
        """Return committed offset, end offset and lag per partition.

        BUG FIX: committed() and end_offsets() require TopicPartition
        instances; the original passed plain (topic, partition) tuples,
        which those APIs cannot read attributes from.
        """
        from kafka import TopicPartition
        consumer = KafkaConsumer(
            bootstrap_servers=self.config.bootstrap_servers,
            group_id=group_id,
            enable_auto_commit=False
        )
        try:
            partitions = consumer.partitions_for_topic(topic)
            lag_info = {}
            for partition in partitions:
                tp = TopicPartition(topic, partition)
                # Last committed offset of the group for this partition.
                committed = consumer.committed(tp)
                # Latest offset in the partition.
                end_offset = consumer.end_offsets([tp])[tp]
                if committed is not None:
                    lag_info[partition] = {
                        'consumer_offset': committed,
                        'end_offset': end_offset,
                        'lag': end_offset - committed
                    }
            return lag_info
        finally:
            # FIX: close even on error (consumer was leaked on exceptions).
            consumer.close()

    def get_topic_metrics(self, topic: str) -> dict:
        """Return partition count plus per-partition offset ranges."""
        from kafka import TopicPartition
        consumer = KafkaConsumer(
            bootstrap_servers=self.config.bootstrap_servers
        )
        try:
            partitions = consumer.partitions_for_topic(topic)
            metrics = {
                'partition_count': len(partitions),
                'partitions': {}
            }
            for partition in partitions:
                tp = TopicPartition(topic, partition)
                beginning = consumer.beginning_offsets([tp])[tp]
                end = consumer.end_offsets([tp])[tp]
                metrics['partitions'][partition] = {
                    'beginning_offset': beginning,
                    'end_offset': end,
                    'message_count': end - beginning
                }
            return metrics
        finally:
            consumer.close()
# Usage example: report per-partition lag for one consumer group.
monitor = KafkaMonitor(DEFAULT_CONFIG)
print(monitor.get_consumer_lag('user-events', 'consumer-group-1'))
额外资源
最佳实践
生产者配置
- 使用适当的 `acks` 设置:关键数据使用 `acks=all`,高吞吐量场景使用 `acks=1`
- 配置批处理:使用 `batch_size` 和 `linger_ms` 提高吞吐量
- 启用压缩:使用 `lz4` 或 `gzip` 减少网络带宽
- 设置合理的超时:配置 `request_timeout_ms` 和 `delivery.timeout.ms`
- 使用幂等生产者:防止重试时产生重复消息
- 实现错误处理:优雅地处理网络错误和代理故障
消费者配置
- 使用手动偏移量提交:禁用 `enable_auto_commit` 以获得更好的控制
- 设置适当的 `session.timeout.ms`:在故障检测和处理时间之间取得平衡
- 配置 `max.poll.records`:限制批次大小以实现可预测的处理
- 使用消费者组:启用并行处理和负载均衡
- 实现重新平衡监听器:优雅地处理分区重新分配
- 监控消费者滞后量:跟踪偏移量滞后量以检测处理延迟
主题设计
- 选择合适的分区数量:更多分区=更多并行性但更多开销
- 设置复制因子:生产环境至少为3
- 配置保留策略:根据数据新鲜度要求设置 `retention.ms`
- 使用压缩主题:适用于只保留最新值的语义(例如,用户配置文件)
- 规划分区增长:Kafka不支持减少分区
- 使用描述性主题名称:遵循命名约定以清晰
消息设计
- 使用小消息大小:首选<1MB消息以获得最佳性能
- 包含消息键:用于分区和排序保证
- 添加消息头:包括元数据以实现跟踪和路由
- 使用一致的模式:考虑使用Avro/Protobuf进行模式演进
- 包含时间戳:使用Kafka时间戳或自定义时间戳
- 设计为幂等性:消息可能会被多次传递
精确一次语义
- 使用事务性生产者:用于精确一次写入语义
- 配置read_committed隔离:消费者只看到已提交的事务
- 使用幂等生产者:防止重复消息生产
- 监控事务协调器:跟踪事务状态和超时
- 处理事务失败:实现适当的回滚逻辑
- 测试故障场景:验证故障下的精确一次行为
性能调优
- 调整批次大小:更大的批次提高吞吐量但增加延迟
- 调整停留时间:在批处理和延迟之间取得平衡
- 使用适当的压缩:`lz4` 侧重速度,`gzip` 侧重压缩率
- 配置获取大小:将 `fetch.max.bytes` 与消息大小匹配
- 使用多个消费者:扩展消费者组以实现并行处理
- 监控代理指标:跟踪吞吐量、延迟和错误率
监控和可观测性
- 跟踪消费者滞后量:每个分区监控偏移量滞后量
- 监控代理健康:跟踪CPU、内存、磁盘I/O和网络
- 收集JMX指标:使用Prometheus/JMX导出器进行指标收集
- 设置警报:对高滞后量、代理故障或分区不平衡进行警报
- 记录消息处理:包括相关ID以进行跟踪
- 使用分布式跟踪:与OpenTelemetry或Jaeger集成
安全性
- 启用SASL认证:使用SCRAM-SHA-256/512进行强认证
- 配置TLS加密:在客户端和代理之间传输数据时进行加密
- 使用ACLs:基于用户角色限制主题访问
- 启用审计日志:跟踪谁访问了哪些主题
- 定期旋转凭证:定期更新密码和证书
- 使用单独的集群:隔离开发、暂存和生产
灾难恢复
- 配置复制:使用复制因子≥3以实现高可用性
- 启用领导者选举:允许自动领导者故障转移
- 监控欠复制分区:对复制延迟进行警报
- 测试故障转移场景:验证自动恢复是否有效
- 备份关键主题:使用mirror-maker进行跨集群复制
- 记录恢复程序:为常见故障制定清晰的运行手册
清单
设置和配置
- [ ] 使用适当的设置配置Kafka代理集群
- [ ] 设置Zookeeper/KRaft进行集群协调
- [ ] 配置主题分区和复制因子
- [ ] 启用认证(SASL)和加密(TLS)
- [ ] 设置JMX指标和监控
生产者设置
- [ ] 使用适当的acks设置配置生产者
- [ ] 启用批处理和压缩
- [ ] 设置幂等生产者
- [ ] 配置重试和超时设置
- [ ] 实现错误处理和日志记录
消费者设置
- [ ] 配置消费者组和偏移量管理
- [ ] 设置手动偏移量提交
- [ ] 配置会话和心跳超时
- [ ] 实现重新平衡监听器
- [ ] 设置消费者滞后量监控
主题管理
- [ ] 创建具有适当分区的主题
- [ ] 配置保留策略
- [ ] 如有需要,设置压缩主题
- [ ] 配置主题ACLs
- [ ] 记录主题命名约定
消息设计
- [ ] 定义消息模式(Avro/Protobuf)
- [ ] 包含用于分区的消息键
- [ ] 添加用于元数据的消息头
- [ ] 设计为幂等性
- [ ] 如使用Avro,设置模式注册表
精确一次处理
- [ ] 配置事务性生产者
- [ ] 为消费者设置read_committed隔离
- [ ] 实现事务错误处理
- [ ] 监控事务协调器
- [ ] 测试精确一次行为
性能调优
- [ ] 调整批次大小和停留时间
- [ ] 配置压缩设置
- [ ] 调整获取大小和轮询间隔
- [ ] 扩展消费者以实现并行处理
- [ ] 监控和优化吞吐量/延迟
监控和警报
- [ ] 设置JMX指标收集
- [ ] 配置Grafana仪表板
- [ ] 设置消费者滞后量警报
- [ ] 监控代理健康指标
- [ ] 跟踪消息吞吐量和错误率
安全性
- [ ] 启用SASL认证
- [ ] 配置TLS加密
- [ ] 设置主题访问的ACLs
- [ ] 启用审计日志
- [ ] 定期旋转凭证
灾难恢复
- [ ] 配置复制因子≥3
- [ ] 设置跨集群复制
- [ ] 测试代理故障转移
- [ ] 记录恢复程序
- [ ] 设置自动备份
测试
- [ ] 在负载下测试生产者/消费者
- [ ] 验证精确一次语义
- [ ] 测试重新平衡场景
- [ ] 验证错误处理
- [ ] 测试故障转移和恢复
文档
- [ ] 记录集群架构
- [ ] 创建主题设计文档
- [ ] 记录监控和警报设置
- [ ] 创建常见问题的处理手册
- [ ] 维护API文档