
Kafka Streams is a library for building real-time stream processing applications. It provides a high-level abstraction that lets developers express processing logic easily while ensuring fault tolerance and scalability.


Kafka Streams

Overview

A comprehensive guide to Apache Kafka patterns and Kafka Streams for real-time data processing.

Table of Contents

  1. Kafka Concepts
  2. Producers
  3. Consumers
  4. Topics and Partitions
  5. Consumer Groups
  6. Kafka Streams
  7. Error Handling
  8. Exactly-Once Semantics
  9. Performance Tuning
  10. Monitoring

Kafka Concepts

Core Concepts

# Kafka core concepts
"""
- Broker: Kafka server that stores and serves data
- Topic: category/feed name to which records are published
- Partition: topics are split into partitions for parallelism
- Producer: application that publishes records to topics
- Consumer: application that subscribes to topics and processes records
- Consumer group: consumers that cooperate to consume a topic
- Offset: unique identifier of each record within a partition
- Commit: storing a consumer's current offset
"""

Basic Setup (Python)

# kafka_config.py
from dataclasses import dataclass
from typing import Optional

@dataclass
class KafkaConfig:
    bootstrap_servers: str = 'localhost:9092'
    group_id: str = 'default-group'
    auto_offset_reset: str = 'earliest'
    enable_auto_commit: bool = True
    auto_commit_interval_ms: int = 5000
    session_timeout_ms: int = 30000
    heartbeat_interval_ms: int = 3000
    max_poll_records: int = 500
    max_poll_interval_ms: int = 300000

# Default configuration
DEFAULT_CONFIG = KafkaConfig()

# Production configuration
PRODUCTION_CONFIG = KafkaConfig(
    bootstrap_servers='kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092',
    group_id='production-group',
    enable_auto_commit=False,  # manual commits for reliability
    max_poll_records=100,
    max_poll_interval_ms=600000
)
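The dataclass fields above deliberately mirror kafka-python argument names, so one config object can be unpacked straight into a client constructor. A minimal sketch of that pattern, using a trimmed copy of the dataclass (field subset assumed):

```python
# dataclasses.asdict turns the config into a kwargs dict whose keys line up
# with kafka-python constructor arguments, e.g. KafkaConsumer('events', **kwargs).
from dataclasses import dataclass, asdict

@dataclass
class KafkaConfig:
    bootstrap_servers: str = 'localhost:9092'
    group_id: str = 'default-group'
    auto_offset_reset: str = 'earliest'
    enable_auto_commit: bool = True

config = KafkaConfig(group_id='analytics')
kwargs = asdict(config)  # {'bootstrap_servers': 'localhost:9092', ...}
```

This keeps environment-specific settings in one typed place instead of scattering keyword arguments across every producer and consumer.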

Producers

Basic Producer

# producer.py
from kafka import KafkaProducer
import json
import logging

class KafkaMessageProducer:
    def __init__(self, config: KafkaConfig):
        self.producer = KafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: str(k).encode('utf-8') if k else None,
            acks='all',  # wait for all in-sync replicas
            retries=3,
            compression_type='gzip',
            batch_size=16384,
            linger_ms=10
        )
    
    def send(
        self,
        topic: str,
        value: dict,
        key: str = None,
        headers: dict = None
    ) -> None:
        """Send a message to a Kafka topic."""
        try:
            future = self.producer.send(
                topic,
                value=value,
                key=key,
                headers=[(k, v.encode()) for k, v in (headers or {}).items()]
            )
            
            # Block until the delivery report arrives
            record_metadata = future.get(timeout=10)
            logging.info(
                f"Message sent to {record_metadata.topic} "
                f"[partition: {record_metadata.partition}, offset: {record_metadata.offset}]"
            )
        except Exception as e:
            logging.error(f"Failed to send message: {e}")
            raise
    
    def send_async(
        self,
        topic: str,
        value: dict,
        key: str = None,
        callback: callable = None
    ) -> None:
        """Send a message asynchronously."""
        # kafka-python invokes success callbacks with the record metadata and
        # error callbacks with the exception, so register them separately.
        def on_success(record_metadata):
            logging.info(f"Message delivered: {record_metadata}")
            if callback:
                callback(record_metadata)
        
        def on_error(error):
            logging.error(f"Message delivery failed: {error}")
        
        future = self.producer.send(topic, value=value, key=key)
        future.add_callback(on_success)
        future.add_errback(on_error)
    
    def flush(self) -> None:
        """Flush all pending messages."""
        self.producer.flush()
    
    def close(self) -> None:
        """Close the producer."""
        self.producer.close()

# Usage
producer = KafkaMessageProducer(DEFAULT_CONFIG)
producer.send('events', {'event': 'user_login', 'user_id': 123}, key='user_123')
producer.flush()
producer.close()

Transactional Producer

# transactional_producer.py
# Note: the transactional APIs below (init_transactions and friends) require
# a kafka-python release with transaction support; on older releases, use
# confluent-kafka instead.
from kafka import KafkaProducer
import json

class TransactionalProducer:
    def __init__(self, config: KafkaConfig, transactional_id: str):
        self.producer = KafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            transactional_id=transactional_id,
            enable_idempotence=True
        )
        # Initialize transactions
        self.producer.init_transactions()
    
    def send_in_transaction(
        self,
        messages: list,
        input_topic: str,
        output_topic: str
    ) -> None:
        """Send multiple messages within one transaction."""
        try:
            # Begin the transaction
            self.producer.begin_transaction()
            
            # Consume from the input topic (for processing),
            # process the messages, then send to the output topic
            for message in messages:
                self.producer.send(output_topic, value=message)
            
            # Commit the transaction
            self.producer.commit_transaction()
        except Exception:
            # Abort the transaction on error
            self.producer.abort_transaction()
            raise
    
    def close(self) -> None:
        self.producer.close()

Consumers

Basic Consumer

# consumer.py
from kafka import KafkaConsumer, TopicPartition
import json
import logging

class KafkaMessageConsumer:
    def __init__(self, config: KafkaConfig, topics: list):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=config.bootstrap_servers,
            group_id=config.group_id,
            auto_offset_reset=config.auto_offset_reset,
            enable_auto_commit=config.enable_auto_commit,
            auto_commit_interval_ms=config.auto_commit_interval_ms,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None,
            session_timeout_ms=config.session_timeout_ms,
            heartbeat_interval_ms=config.heartbeat_interval_ms,
            max_poll_records=config.max_poll_records,
            max_poll_interval_ms=config.max_poll_interval_ms
        )
    
    def consume(self, handler: callable) -> None:
        """Consume messages and process them with a handler."""
        try:
            for message in self.consumer:
                try:
                    # Process the message
                    result = handler(message)
                    
                    # Commit manually when auto-commit is disabled
                    if not self.consumer.config['enable_auto_commit']:
                        self.consumer.commit()
                    
                    logging.info(f"Processed message: {message.offset}")
                except Exception as e:
                    logging.error(f"Error processing message: {e}")
                    # Skip the problematic message
                    if not self.consumer.config['enable_auto_commit']:
                        self.consumer.commit()
        except KeyboardInterrupt:
            logging.info("Stopping consumer...")
        finally:
            self.close()
    
    def consume_batch(self, handler: callable, batch_size: int = 10) -> None:
        """Consume messages in batches."""
        batch = []
        
        try:
            for message in self.consumer:
                batch.append(message)
                
                if len(batch) >= batch_size:
                    # Process the batch
                    handler(batch)
                    
                    # Commit the batch
                    self.consumer.commit()
                    batch = []
        finally:
            # Process any remaining messages
            if batch:
                handler(batch)
                self.consumer.commit()
            self.close()
    
    def seek_to_beginning(self, topic: str, partition: int) -> None:
        """Seek to the beginning of a partition."""
        self.consumer.seek_to_beginning(TopicPartition(topic, partition))
    
    def seek_to_end(self, topic: str, partition: int) -> None:
        """Seek to the end of a partition."""
        self.consumer.seek_to_end(TopicPartition(topic, partition))
    
    def close(self) -> None:
        """Close the consumer."""
        self.consumer.close()

# Usage
def message_handler(message):
    print(f"Received: {message.value}")
    return True

consumer = KafkaMessageConsumer(DEFAULT_CONFIG, ['events'])
consumer.consume(message_handler)

Rebalance Listener

# rebalancing_consumer.py
from kafka import ConsumerRebalanceListener
import logging

class RebalancingConsumer(KafkaMessageConsumer):
    def __init__(self, config: KafkaConfig, topics: list):
        super().__init__(config, topics)
        self.consumer.subscribe(
            topics,
            listener=self.RebalanceListener(self)
        )
    
    # kafka-python requires rebalance listeners to subclass
    # ConsumerRebalanceListener
    class RebalanceListener(ConsumerRebalanceListener):
        def __init__(self, outer):
            self.outer = outer
        
        def on_partitions_revoked(self, revoked):
            """Called before partitions are rebalanced away."""
            logging.info(f"Partitions revoked: {revoked}")
            # Commit offsets before losing the partitions
            self.outer.consumer.commit()
        
        def on_partitions_assigned(self, assigned):
            """Called after partitions are reassigned."""
            logging.info(f"Partitions assigned: {assigned}")

Topics and Partitions

Topic Management

# topic_management.py
from kafka import KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType, NewTopic
import logging

class TopicManager:
    def __init__(self, config: KafkaConfig):
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=config.bootstrap_servers
        )
    
    def create_topic(
        self,
        topic_name: str,
        num_partitions: int = 3,
        replication_factor: int = 1,
        config: dict = None
    ) -> None:
        """Create a new topic."""
        topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
            topic_configs=config or {}
        )
        
        try:
            # kafka-python raises on failure rather than returning futures
            self.admin_client.create_topics([topic])
            logging.info(f"Topic {topic_name} created successfully")
        except Exception as e:
            logging.error(f"Failed to create topic {topic_name}: {e}")
    
    def delete_topic(self, topic_name: str) -> None:
        """Delete a topic."""
        try:
            self.admin_client.delete_topics([topic_name])
            logging.info(f"Topic {topic_name} deleted successfully")
        except Exception as e:
            logging.error(f"Error deleting topic: {e}")
    
    def list_topics(self) -> list:
        """List all topics."""
        return list(self.admin_client.list_topics())
    
    def describe_topic(self, topic_name: str) -> dict:
        """Get topic metadata."""
        # describe_topics returns a list of metadata dicts, one per topic
        return self.admin_client.describe_topics([topic_name])[0]
    
    def alter_topic_config(self, topic_name: str, config: dict) -> None:
        """Alter a topic's configuration."""
        resource = ConfigResource(
            ConfigResourceType.TOPIC, topic_name, configs=config
        )
        self.admin_client.alter_configs([resource])
    
    def close(self) -> None:
        self.admin_client.close()

# Usage
topic_manager = TopicManager(DEFAULT_CONFIG)

# Create a topic with a retention policy
topic_manager.create_topic(
    'user-events',
    num_partitions=6,
    replication_factor=3,
    config={
        'retention.ms': '604800000',  # 7 days
        'segment.ms': '86400000',     # 1 day
        'cleanup.policy': 'delete'
    }
)
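Millisecond retention values like the ones above are easy to mistype. A small helper (hypothetical, not part of the class above) can derive them from human-readable durations:

```python
# Kafka expects durations as millisecond strings; computing them beats
# hard-coding magic numbers like 604800000.
def retention_config(days: int, segment_hours: int = 24) -> dict:
    ms_per_hour = 60 * 60 * 1000
    return {
        'retention.ms': str(days * 24 * ms_per_hour),
        'segment.ms': str(segment_hours * ms_per_hour),
        'cleanup.policy': 'delete',
    }

topic_config = retention_config(days=7)  # matches the 7-day policy above
```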

Partition Management

# partition_management.py
from kafka import KafkaConsumer, TopicPartition
import logging

class PartitionManager:
    def __init__(self, config: KafkaConfig):
        self.config = config
    
    def get_partition_count(self, topic: str) -> int:
        """Get the number of partitions for a topic."""
        consumer = KafkaConsumer(
            bootstrap_servers=self.config.bootstrap_servers
        )
        partitions = consumer.partitions_for_topic(topic)
        consumer.close()
        return len(partitions) if partitions else 0
    
    def get_partition_info(self, topic: str) -> dict:
        """Get per-partition offset information for a topic."""
        consumer = KafkaConsumer(
            bootstrap_servers=self.config.bootstrap_servers
        )
        partitions = consumer.partitions_for_topic(topic)
        
        info = {}
        for partition in partitions:
            tp = TopicPartition(topic, partition)
            beginning_offset = consumer.beginning_offsets([tp])[tp]
            end_offset = consumer.end_offsets([tp])[tp]
            
            info[partition] = {
                'beginning_offset': beginning_offset,
                'end_offset': end_offset,
                'message_count': end_offset - beginning_offset
            }
        
        consumer.close()
        return info
    
    def get_consumer_offsets(self, topic: str, group_id: str) -> dict:
        """Get the committed offsets of a consumer group."""
        from kafka import KafkaAdminClient
        
        admin_client = KafkaAdminClient(
            bootstrap_servers=self.config.bootstrap_servers
        )
        
        offsets = admin_client.list_consumer_group_offsets(group_id)
        consumer_offsets = {}
        
        for tp, offset_info in offsets.items():
            if tp.topic == topic:
                consumer_offsets[tp.partition] = {
                    'offset': offset_info.offset,
                    'metadata': offset_info.metadata
                }
        
        admin_client.close()
        return consumer_offsets

# Usage
partition_manager = PartitionManager(DEFAULT_CONFIG)
print(partition_manager.get_partition_info('user-events'))

Consumer Groups

Consumer Group Management

# consumer_group.py
from kafka import KafkaAdminClient, KafkaConsumer, TopicPartition
import logging

class ConsumerGroupManager:
    def __init__(self, config: KafkaConfig):
        self.config = config
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=config.bootstrap_servers
        )
    
    def list_consumer_groups(self) -> list:
        """List all consumer groups."""
        # list_consumer_groups returns (group_id, protocol_type) tuples
        return [group_id for group_id, _ in
                self.admin_client.list_consumer_groups()]
    
    def describe_consumer_group(self, group_id: str) -> dict:
        """Describe a consumer group."""
        return self.admin_client.describe_consumer_groups([group_id])[0]
    
    def delete_consumer_group(self, group_id: str) -> None:
        """Delete a consumer group."""
        try:
            self.admin_client.delete_consumer_groups([group_id])
            logging.info(f"Consumer group {group_id} deleted")
        except Exception as e:
            logging.error(f"Error deleting consumer group: {e}")
    
    def reset_consumer_group_offset(
        self,
        group_id: str,
        topic: str,
        partition: int,
        new_offset: int
    ) -> None:
        """Reset a consumer group's offset to a specific position."""
        consumer = KafkaConsumer(
            bootstrap_servers=self.config.bootstrap_servers,
            group_id=group_id,
            enable_auto_commit=False
        )
        
        tp = TopicPartition(topic, partition)
        consumer.assign([tp])  # seek requires an assigned partition
        consumer.seek(tp, new_offset)
        consumer.commit()
        consumer.close()
    
    def reset_to_earliest(self, group_id: str, topic: str) -> None:
        """Reset a consumer group to the earliest offsets."""
        consumer = KafkaConsumer(
            bootstrap_servers=self.config.bootstrap_servers,
            group_id=group_id,
            enable_auto_commit=False
        )
        
        partitions = [
            TopicPartition(topic, p)
            for p in consumer.partitions_for_topic(topic)
        ]
        consumer.assign(partitions)
        for tp in partitions:
            beginning_offset = consumer.beginning_offsets([tp])[tp]
            consumer.seek(tp, beginning_offset)
        
        consumer.commit()
        consumer.close()
    
    def close(self) -> None:
        self.admin_client.close()

Kafka Streams

Stream Processing with Python

# kafka_streams.py
from kafka import KafkaConsumer, KafkaProducer
import json
import logging

class KafkaStreamProcessor:
    def __init__(
        self,
        input_topic: str,
        output_topic: str,
        config: KafkaConfig
    ):
        self.input_topic = input_topic
        self.output_topic = output_topic
        self.config = config
        
        self.consumer = KafkaConsumer(
            input_topic,
            bootstrap_servers=config.bootstrap_servers,
            group_id=f"{config.group_id}-processor",
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        
        self.producer = KafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
    
    def process_stream(self, processor: callable) -> None:
        """Process a stream of messages."""
        try:
            for message in self.consumer:
                try:
                    # Process the message
                    result = processor(message.value)
                    
                    # Send to the output topic
                    self.producer.send(self.output_topic, value=result)
                    
                    # Commit the offset
                    self.consumer.commit()
                    
                    logging.info(f"Processed message: {message.offset}")
                except Exception as e:
                    logging.error(f"Error processing message: {e}")
        except KeyboardInterrupt:
            logging.info("Stopping stream processor...")
        finally:
            self.close()
    
    def aggregate_stream(
        self,
        window_size_ms: int,
        processor: callable
    ) -> None:
        """Aggregate messages within a time window."""
        window = []
        last_flush_time = None
        
        try:
            for message in self.consumer:
                try:
                    window.append(message.value)
                    
                    current_time = message.timestamp
                    if last_flush_time is None:
                        last_flush_time = current_time
                    
                    # Flush the window once its time span is exceeded
                    if current_time - last_flush_time >= window_size_ms:
                        # Process the window
                        result = processor(window)
                        self.producer.send(self.output_topic, value=result)
                        
                        # Clear the window
                        window = []
                        last_flush_time = current_time
                        
                        # Commit the offset
                        self.consumer.commit()
                except Exception as e:
                    logging.error(f"Aggregation error: {e}")
        finally:
            # Flush any remaining messages
            if window:
                result = processor(window)
                self.producer.send(self.output_topic, value=result)
            self.close()
    
    def close(self) -> None:
        """Close the consumer and producer."""
        self.consumer.close()
        self.producer.close()

# Example processors
def uppercase_processor(message):
    """Convert text to uppercase."""
    message['text'] = message.get('text', '').upper()
    return message

def count_aggregator(messages):
    """Count the messages within a window."""
    return {
        'count': len(messages),
        'window_start': messages[0].get('timestamp'),
        'window_end': messages[-1].get('timestamp')
    }

# Usage
processor = KafkaStreamProcessor('input-topic', 'output-topic', DEFAULT_CONFIG)
processor.process_stream(uppercase_processor)
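The windowing rule inside aggregate_stream can be exercised without a broker. A pure-Python sketch of the same logic, where messages carry millisecond timestamps and a window is flushed once the span from its first message reaches window_size_ms:

```python
# Tumbling-window grouping over (timestamp_ms, value) pairs, mirroring
# aggregate_stream: the message that crosses the window boundary closes
# the current window.
def window_messages(messages, window_size_ms):
    windows, current, window_start = [], [], None
    for ts, value in messages:
        current.append(value)
        if window_start is None:
            window_start = ts
        if ts - window_start >= window_size_ms:
            windows.append(current)
            current, window_start = [], ts
    if current:  # flush the trailing partial window
        windows.append(current)
    return windows

events = [(0, 'a'), (100, 'b'), (1200, 'c'), (1300, 'd'), (2500, 'e')]
print(window_messages(events, 1000))  # [['a', 'b', 'c'], ['d', 'e']]
```

Testing this logic in isolation is much cheaper than spinning up Kafka, and the same function can then be called from the stream loop.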

Error Handling

Error Handling Strategies

# error_handling.py
from kafka import KafkaConsumer, KafkaProducer
import json
import logging
import time

class ResilientConsumer:
    def __init__(self, config: KafkaConfig, topics: list):
        self.config = config
        self.topics = topics
        self.max_retries = 3
        self.retry_delay_ms = 1000
    
    def consume_with_retry(self, handler: callable) -> None:
        """Consume with retry logic."""
        retry_count = 0
        
        while retry_count < self.max_retries:
            try:
                consumer = KafkaConsumer(
                    *self.topics,
                    bootstrap_servers=self.config.bootstrap_servers,
                    group_id=self.config.group_id,
                    auto_offset_reset=self.config.auto_offset_reset,
                    enable_auto_commit=False,
                    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
                )
                
                for message in consumer:
                    try:
                        handler(message)
                        consumer.commit()
                        retry_count = 0  # reset after success
                    except Exception as e:
                        logging.error(f"Processing error: {e}")
                        # Do not commit; the message will be redelivered
                
                break
                
            except Exception as e:
                retry_count += 1
                logging.error(f"Consumer error (attempt {retry_count}): {e}")
                
                if retry_count < self.max_retries:
                    # Linear backoff; convert milliseconds to seconds
                    time.sleep(self.retry_delay_ms * retry_count / 1000)
                else:
                    logging.error("Max retries exceeded, giving up")
                    raise

class DeadLetterProducer:
    def __init__(self, config: KafkaConfig, dlq_topic: str):
        self.producer = KafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.dlq_topic = dlq_topic
    
    def send_to_dlq(self, original_message: dict, error: Exception) -> None:
        """Send a failed message to the dead-letter queue."""
        dlq_message = {
            'original_message': original_message,
            'error': str(error),
            'timestamp': time.time() * 1000
        }
        self.producer.send(self.dlq_topic, value=dlq_message)
        self.producer.flush()
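The DLQ envelope is easier to triage when it carries context beyond the error string. A pure sketch of a richer envelope builder (the extra fields — error type, source topic, attempt count — are an assumption, not part of the class above):

```python
import time

# Build a dead-letter record that preserves the original payload plus
# enough metadata to diagnose and replay the failure later.
def build_dlq_message(original: dict, error: Exception,
                      source_topic: str, attempts: int) -> dict:
    return {
        'original_message': original,
        'error': str(error),
        'error_type': type(error).__name__,   # e.g. 'ValueError'
        'source_topic': source_topic,
        'attempts': attempts,
        'timestamp': int(time.time() * 1000),  # epoch milliseconds
    }

msg = build_dlq_message({'id': 1}, ValueError('bad schema'), 'events', 3)
```

A replay tool can then filter the DLQ by error_type or source_topic instead of parsing free-form strings.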

Exactly-Once Semantics

Exactly-Once Processing

# exactly_once.py
from kafka import KafkaProducer
import json

class ExactlyOnceProcessor:
    def __init__(
        self,
        config: KafkaConfig,
        input_topic: str,
        output_topic: str,
        transactional_id: str
    ):
        self.input_topic = input_topic
        self.output_topic = output_topic
        
        # Configure the producer for exactly-once delivery
        self.producer = KafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            transactional_id=transactional_id,
            enable_idempotence=True,
            acks='all',
            max_in_flight_requests_per_connection=1,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        
        # Initialize transactions
        self.producer.init_transactions()
    
    def process_exactly_once(self, messages: list, processor: callable) -> None:
        """Process messages with exactly-once semantics."""
        try:
            # Begin the transaction
            self.producer.begin_transaction()
            
            # Process the messages
            for message in messages:
                result = processor(message)
                self.producer.send(self.output_topic, value=result)
            
            # Commit the transaction
            self.producer.commit_transaction()
        except Exception:
            # Abort the transaction on error
            self.producer.abort_transaction()
            raise
    
    def close(self) -> None:
        self.producer.close()

Performance Tuning

Producer Tuning

# producer_tuning.py
from kafka import KafkaProducer
import json

class TunedProducer:
    def __init__(self, config: KafkaConfig):
        self.producer = KafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            
            # Batching
            batch_size=32768,         # 32 KB
            linger_ms=10,             # wait up to 10 ms to fill a batch
            
            # Compression
            compression_type='lz4',   # LZ4 balances speed and ratio
            
            # Reliability
            acks='all',               # wait for all in-sync replicas
            retries=3,
            max_in_flight_requests_per_connection=5,
            
            # Buffering
            buffer_memory=67108864,   # 64 MB buffer
            
            # Idempotence
            enable_idempotence=True,
            
            # Timeouts
            request_timeout_ms=30000,
            max_block_ms=5000         # bound on blocking sends/metadata fetches
        )

Consumer Tuning

# consumer_tuning.py
from kafka import KafkaConsumer
import json

class TunedConsumer:
    def __init__(self, config: KafkaConfig, topics: list):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=config.bootstrap_servers,
            group_id=config.group_id,
            
            # Fetch settings
            fetch_min_bytes=1,
            fetch_max_wait_ms=500,
            fetch_max_bytes=52428800,   # 50 MB
            
            # Session settings
            session_timeout_ms=30000,
            heartbeat_interval_ms=3000,
            max_poll_interval_ms=300000,
            
            # Poll settings
            max_poll_records=500,
            max_partition_fetch_bytes=1048576,  # 1 MB
            
            # Offset management
            enable_auto_commit=False,
            auto_offset_reset='earliest',
            
            # Networking
            connections_max_idle_ms=540000,
            
            # Deserialization
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

Monitoring

Metrics Collection

# monitoring.py
from kafka import KafkaConsumer, TopicPartition

class KafkaMonitor:
    def __init__(self, config: KafkaConfig):
        self.config = config
    
    def get_consumer_lag(self, topic: str, group_id: str) -> dict:
        """Get the consumer lag per partition."""
        consumer = KafkaConsumer(
            bootstrap_servers=self.config.bootstrap_servers,
            group_id=group_id,
            enable_auto_commit=False
        )
        
        partitions = consumer.partitions_for_topic(topic)
        lag_info = {}
        
        for partition in partitions:
            tp = TopicPartition(topic, partition)
            
            # Committed consumer offset
            committed = consumer.committed(tp)
            
            # Latest offset
            end_offset = consumer.end_offsets([tp])[tp]
            
            if committed is not None:
                lag = end_offset - committed
                lag_info[partition] = {
                    'consumer_offset': committed,
                    'end_offset': end_offset,
                    'lag': lag
                }
        
        consumer.close()
        return lag_info
    
    def get_topic_metrics(self, topic: str) -> dict:
        """Get topic metrics."""
        consumer = KafkaConsumer(
            bootstrap_servers=self.config.bootstrap_servers
        )
        
        partitions = consumer.partitions_for_topic(topic)
        metrics = {
            'partition_count': len(partitions),
            'partitions': {}
        }
        
        for partition in partitions:
            tp = TopicPartition(topic, partition)
            beginning = consumer.beginning_offsets([tp])[tp]
            end = consumer.end_offsets([tp])[tp]
            
            metrics['partitions'][partition] = {
                'beginning_offset': beginning,
                'end_offset': end,
                'message_count': end - beginning
            }
        
        consumer.close()
        return metrics

# Usage
monitor = KafkaMonitor(DEFAULT_CONFIG)
print(monitor.get_consumer_lag('user-events', 'consumer-group-1'))
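The lag arithmetic itself is worth isolating so it can be sanity-checked without a broker: lag is the end offset minus the committed offset, and a partition with no committed offset yet is reported as fully lagging (an assumption; get_consumer_lag above simply skips such partitions).

```python
# Pure lag computation over {partition: offset} dicts.
def compute_lag(committed: dict, end_offsets: dict) -> dict:
    lag = {}
    for partition, end in end_offsets.items():
        offset = committed.get(partition)
        # No committed offset means the group has consumed nothing yet
        lag[partition] = end - offset if offset is not None else end
    return lag

print(compute_lag({0: 100, 1: 250}, {0: 120, 1: 250, 2: 40}))
# {0: 20, 1: 0, 2: 40}
```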

Additional Resources

Best Practices

Producer Configuration

  • Use appropriate acks settings: acks=all for critical data, acks=1 for high throughput
  • Configure batching: tune batch_size and linger_ms to improve throughput
  • Enable compression: use lz4 or gzip to reduce network bandwidth
  • Set reasonable timeouts: configure request_timeout_ms and delivery.timeout.ms
  • Use an idempotent producer: prevents duplicate messages on retries
  • Implement error handling: handle network errors and broker failures gracefully

Consumer Configuration

  • Use manual offset commits: disable enable_auto_commit for finer control
  • Set an appropriate session.timeout.ms: balance failure detection against processing time
  • Configure max.poll.records: cap batch size for predictable processing
  • Use consumer groups: enables parallel processing and load balancing
  • Implement rebalance listeners: handle partition reassignments gracefully
  • Monitor consumer lag: track offset lag to detect processing delays

Topic Design

  • Choose a suitable partition count: more partitions = more parallelism but more overhead
  • Set the replication factor: at least 3 in production
  • Configure retention policies: set retention.ms based on data-freshness requirements
  • Use compacted topics: for latest-value semantics (e.g. user profiles)
  • Plan for partition growth: Kafka does not support reducing partitions
  • Use descriptive topic names: follow a naming convention for clarity

Message Design

  • Keep messages small: prefer messages under 1 MB for best performance
  • Include message keys: for partitioning and ordering guarantees
  • Add message headers: include metadata for tracing and routing
  • Use consistent schemas: consider Avro/Protobuf for schema evolution
  • Include timestamps: use Kafka timestamps or custom ones
  • Design for idempotency: messages may be delivered more than once
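"Design for idempotency" in practice usually means tolerating duplicate deliveries. A hedged sketch of a deduplicating wrapper that remembers processed message IDs (in production the seen-set would live in a durable store such as Redis or a database, not in process memory):

```python
# Wrap any handler so that a redelivered message ID is processed only once.
class IdempotentHandler:
    def __init__(self, handler):
        self.handler = handler
        self.seen = set()  # replace with durable storage in production

    def __call__(self, message_id: str, payload: dict):
        if message_id in self.seen:
            return None              # duplicate delivery: skip
        self.seen.add(message_id)
        return self.handler(payload)

handler = IdempotentHandler(lambda p: p['amount'])
first = handler('evt-1', {'amount': 10})   # processed
dup = handler('evt-1', {'amount': 10})     # duplicate, skipped
```

This pairs with at-least-once delivery: the broker may redeliver after a rebalance or an uncommitted offset, and the handler absorbs the repeat.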

Exactly-Once Semantics

  • Use a transactional producer: for exactly-once write semantics
  • Configure read_committed isolation: consumers only see committed transactions
  • Use an idempotent producer: prevents duplicate message production
  • Monitor the transaction coordinator: track transaction state and timeouts
  • Handle transaction failures: implement proper rollback logic
  • Test failure scenarios: verify exactly-once behavior under faults
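The consumer side of the read_committed point above, shown as a plain dict of kafka-python constructor arguments (a sketch; defaults vary by client version):

```python
# Settings for a consumer that should only see committed transactional data.
read_committed_config = {
    'isolation_level': 'read_committed',  # hide aborted/in-flight transactions
    'enable_auto_commit': False,          # commit offsets alongside processing
    'auto_offset_reset': 'earliest',
}
# e.g. KafkaConsumer('output-topic', **read_committed_config, ...)
```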

Performance Tuning

  • Tune batch size: larger batches improve throughput but add latency
  • Tune linger time: balance batching against latency
  • Use appropriate compression: lz4 for speed, gzip for size
  • Configure fetch sizes: match fetch.max.bytes to your message sizes
  • Use multiple consumers: scale consumer groups for parallel processing
  • Monitor broker metrics: track throughput, latency, and error rates

Monitoring and Observability

  • Track consumer lag: monitor offset lag per partition
  • Monitor broker health: track CPU, memory, disk I/O, and network
  • Collect JMX metrics: use the Prometheus JMX exporter for metrics collection
  • Set up alerting: alert on high lag, broker failures, or partition imbalance
  • Log message processing: include correlation IDs for tracing
  • Use distributed tracing: integrate with OpenTelemetry or Jaeger

Security

  • Enable SASL authentication: use SCRAM-SHA-256/512 for strong authentication
  • Configure TLS encryption: encrypt data in transit between clients and brokers
  • Use ACLs: restrict topic access based on user roles
  • Enable audit logging: track who accessed which topics
  • Rotate credentials regularly: update passwords and certificates periodically
  • Use separate clusters: isolate development, staging, and production

Disaster Recovery

  • Configure replication: use a replication factor ≥ 3 for high availability
  • Enable leader election: allow automatic leader failover
  • Monitor under-replicated partitions: alert on replication lag
  • Test failover scenarios: verify that automatic recovery works
  • Back up critical topics: use MirrorMaker for cross-cluster replication
  • Document recovery procedures: maintain clear runbooks for common failures

Checklist

Setup and Configuration

  • [ ] Configure the Kafka broker cluster with appropriate settings
  • [ ] Set up ZooKeeper/KRaft for cluster coordination
  • [ ] Configure topic partitions and replication factors
  • [ ] Enable authentication (SASL) and encryption (TLS)
  • [ ] Set up JMX metrics and monitoring

Producer Setup

  • [ ] Configure producers with appropriate acks settings
  • [ ] Enable batching and compression
  • [ ] Set up idempotent producers
  • [ ] Configure retry and timeout settings
  • [ ] Implement error handling and logging

Consumer Setup

  • [ ] Configure consumer groups and offset management
  • [ ] Set up manual offset commits
  • [ ] Configure session and heartbeat timeouts
  • [ ] Implement rebalance listeners
  • [ ] Set up consumer lag monitoring

Topic Management

  • [ ] Create topics with appropriate partition counts
  • [ ] Configure retention policies
  • [ ] Set up compacted topics if needed
  • [ ] Configure topic ACLs
  • [ ] Document topic naming conventions

Message Design

  • [ ] Define message schemas (Avro/Protobuf)
  • [ ] Include message keys for partitioning
  • [ ] Add message headers for metadata
  • [ ] Design for idempotency
  • [ ] Set up a schema registry if using Avro

Exactly-Once Processing

  • [ ] Configure transactional producers
  • [ ] Set read_committed isolation for consumers
  • [ ] Implement transaction error handling
  • [ ] Monitor the transaction coordinator
  • [ ] Test exactly-once behavior

Performance Tuning

  • [ ] Tune batch sizes and linger times
  • [ ] Configure compression settings
  • [ ] Tune fetch sizes and poll intervals
  • [ ] Scale consumers for parallel processing
  • [ ] Monitor and optimize throughput/latency

Monitoring and Alerting

  • [ ] Set up JMX metrics collection
  • [ ] Configure Grafana dashboards
  • [ ] Set up consumer lag alerts
  • [ ] Monitor broker health metrics
  • [ ] Track message throughput and error rates

Security

  • [ ] Enable SASL authentication
  • [ ] Configure TLS encryption
  • [ ] Set up ACLs for topic access
  • [ ] Enable audit logging
  • [ ] Rotate credentials regularly

Disaster Recovery

  • [ ] Configure replication factor ≥ 3
  • [ ] Set up cross-cluster replication
  • [ ] Test broker failover
  • [ ] Document recovery procedures
  • [ ] Set up automated backups

Testing

  • [ ] Test producers/consumers under load
  • [ ] Verify exactly-once semantics
  • [ ] Test rebalancing scenarios
  • [ ] Verify error handling
  • [ ] Test failover and recovery

Documentation

  • [ ] Document the cluster architecture
  • [ ] Create topic design documents
  • [ ] Document monitoring and alerting setup
  • [ ] Create runbooks for common issues
  • [ ] Maintain API documentation