
本文基于一个实际案例展开:在 Windows 上搭建 Kafka 本地环境,用 kafka-python 写生产者和消费者,并处理连接时遇到的网络线程异常。以下示例从基础用法到多线程消费、配置管理、重试监控,逐步递进。
往期阅读>>>
Python 20 个文本分析的库:效率提升 10 倍的秘密武器
Python 自动化管理Jenkins的15个实用脚本,提升效率
App2Docker:如何无需编写Dockerfile也可以创建容器镜像
Python 自动化识别Nginx配置并导出为excel文件,提升Nginx管理效率
安装依赖:`pip install kafka-python`

| 概念 | 说明 |
|---|---|
| Topic | 消息的分类,生产者向 Topic 写消息,消费者从 Topic 订阅 |
| Partition | Topic 的分区,支持并行处理和水平扩展 |
| Producer | 消息生产者,负责创建和发送消息 |
| Consumer | 消息消费者,负责接收和处理消息 |
| Broker | Kafka 集群中的单个服务器节点 |
| Zookeeper | Kafka 用于集群管理和协调的分布式服务 |
用户行为追踪、日志收集等实时数据流处理
高并发消息队列,替代传统 MQ
事件溯源:记录系统状态变化的历史序列
实时 ETL:数据提取、转换、加载
微服务之间的异步通信
from kafka import KafkaProducer
import json
import time


class BasicProducer:
    """Basic Kafka producer supporting single, partition-targeted and batched sends."""

    def __init__(self, bootstrap_servers='localhost:9092'):
        """
        Initialize the Kafka producer.

        Args:
            bootstrap_servers: Kafka server address in 'host:port' form.
        """
        # Every value is JSON-serialized to UTF-8 bytes before being sent.
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def send_message(self, topic, message, partition=None):
        """
        Send a single message to the given topic.

        Args:
            topic: target topic name.
            message: payload; must be JSON-serializable.
            partition: optional explicit partition number.

        Returns:
            True if the broker acknowledged the message, False otherwise.
        """
        try:
            if partition is not None:
                future = self.producer.send(topic, value=message, partition=partition)
            else:
                future = self.producer.send(topic, value=message)
            # Block until the broker acknowledges delivery (10 s timeout).
            record_metadata = future.get(timeout=10)
            print(f"消息发送成功 - Topic: {record_metadata.topic}, "
                  f"Partition: {record_metadata.partition}, "
                  f"Offset: {record_metadata.offset}")
            return True
        except Exception as e:
            print(f"消息发送失败: {str(e)}")
            return False

    def send_batch_messages(self, topic, messages, batch_size=100):
        """
        Send messages in fixed-size batches, flushing after each batch.

        Args:
            topic: target topic name.
            messages: list of JSON-serializable payloads.
            batch_size: number of messages per batch/flush.
        """
        for i in range(0, len(messages), batch_size):
            batch = messages[i:i + batch_size]
            for message in batch:
                self.producer.send(topic, value=message)
            # Force delivery of the whole batch before reporting progress.
            self.producer.flush()
            print(f"已发送批次 {i//batch_size + 1}: {len(batch)} 条消息")
            time.sleep(0.1)  # brief pause between batches to smooth the load

    def close(self):
        """Close the producer connection."""
        self.producer.close()


# 使用示例
if __name__ == "__main__":
    producer = BasicProducer()

    # Send a single message.
    test_message = {
        "event_type": "user_login",
        "user_id": "12345",
        "timestamp": "2024-01-01T10:30:00Z",
        "ip_address": "192.168.1.100"
    }
    producer.send_message("user_events", test_message)

    # Send a batch of messages.
    batch_messages = [
        {"event_type": "page_view", "page": "/home", "timestamp": "2024-01-01T10:31:00Z"},
        {"event_type": "product_click", "product_id": "P001", "timestamp": "2024-01-01T10:32:00Z"},
        {"event_type": "add_to_cart", "product_id": "P001", "quantity": 2, "timestamp": "2024-01-01T10:33:00Z"}
    ]
    producer.send_batch_messages("user_events", batch_messages)

    producer.close()
from kafka import KafkaConsumer
import json
import threading
from datetime import datetime


class AdvancedConsumer:
    """Single-threaded Kafka consumer with a poll loop and overridable message handler."""

    def __init__(self, bootstrap_servers='localhost:9092', group_id='default_group'):
        """
        Initialize the Kafka consumer.

        Args:
            bootstrap_servers: Kafka server address.
            group_id: consumer group id, used for load balancing.
        """
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',    # start from the earliest message
            enable_auto_commit=True,         # auto-commit offsets
            auto_commit_interval_ms=1000,    # commit interval
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )
        self.running = False

    def subscribe_topics(self, topics):
        """
        Subscribe to one or more topics.

        Args:
            topics: list of topic names.
        """
        self.consumer.subscribe(topics)
        print(f"已订阅Topic: {topics}")

    def process_message(self, message):
        """
        Handle a received message (override for custom business logic).

        Args:
            message: the received ConsumerRecord.
        """
        # Default handling: print message metadata and payload.
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
              f"Topic: {message.topic}, "
              f"Partition: {message.partition}, "
              f"Offset: {message.offset}")
        print(f"消息内容: {message.value}")
        print("-" * 50)

    def start_consuming(self, timeout_ms=1000):
        """
        Start the consume loop until stop() or KeyboardInterrupt.

        Args:
            timeout_ms: poll timeout per iteration, in milliseconds.
        """
        self.running = True
        print("开始消费消息...")
        try:
            while self.running:
                # Pull the next batch of messages.
                message_batch = self.consumer.poll(timeout_ms=timeout_ms)
                for topic_partition, messages in message_batch.items():
                    for message in messages:
                        self.process_message(message)
                # Manual commit after each batch. NOTE(review): redundant
                # while enable_auto_commit=True, but harmless.
                self.consumer.commit()
        except KeyboardInterrupt:
            print("接收到中断信号,停止消费...")
        except Exception as e:
            print(f"消费过程中发生错误: {str(e)}")
        finally:
            self.stop()

    def stop(self):
        """Stop consuming and close the underlying consumer."""
        self.running = False
        self.consumer.close()
        print("消费者已关闭")


class MultiThreadConsumer:
    """多线程消费者,用于处理高并发场景"""

    def __init__(self, num_threads=3):
        self.num_threads = num_threads
        self.consumers = []
        self.threads = []
        # Shared flag that tells all consumer threads to exit their loops.
        self._stop_event = threading.Event()

    def create_consumer_thread(self, thread_id, topic, bootstrap_servers='localhost:9092'):
        """Create (but do not start) a consumer thread for the given topic."""
        def consumer_task():
            consumer = KafkaConsumer(
                topic,
                bootstrap_servers=bootstrap_servers,
                # NOTE(review): each thread uses its own group_id, so every
                # thread receives ALL messages (fan-out). For load balancing
                # across threads they should share one group_id — confirm intent.
                group_id=f'consumer_group_{thread_id}',
                auto_offset_reset='latest',
                value_deserializer=lambda x: json.loads(x.decode('utf-8'))
            )
            print(f"线程 {thread_id} 开始消费Topic: {topic}")
            try:
                # Bug fix: use poll() with a timeout instead of the blocking
                # `for message in consumer` iterator, so the thread can notice
                # the stop event and terminate — otherwise stop_all() hangs.
                while not self._stop_event.is_set():
                    batch = consumer.poll(timeout_ms=500)
                    for _tp, messages in batch.items():
                        for message in messages:
                            print(f"[线程{thread_id}] 收到消息: {message.value}")
                            # 实际应用中这里可以添加业务处理逻辑
            finally:
                consumer.close()

        return threading.Thread(target=consumer_task)

    def start_all(self, topic):
        """Start num_threads consumer threads on the given topic."""
        for i in range(self.num_threads):
            thread = self.create_consumer_thread(i, topic)
            self.threads.append(thread)
            thread.start()

    def stop_all(self):
        """Signal all consumer threads to stop, then wait for them to finish."""
        self._stop_event.set()
        for thread in self.threads:
            thread.join()


# 使用示例
if __name__ == "__main__":
    # Single-consumer example.
    consumer = AdvancedConsumer(group_id='user_events_group')
    consumer.subscribe_topics(['user_events'])
    # Start consuming (in real applications this may run in its own thread).
    # consumer.start_consuming()

    # Multi-threaded consumer example.
    mt_consumer = MultiThreadConsumer(num_threads=3)
    mt_consumer.start_all('user_events')

    # Let the main thread wait for a while, then shut down.
    import time
    time.sleep(10)
    mt_consumer.stop_all()
import yaml
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class KafkaConfig:
    """Typed container for the Kafka connection/consumer settings."""
    bootstrap_servers: List[str]
    topic: str
    group_id: Optional[str] = None
    auto_offset_reset: str = 'earliest'
    enable_auto_commit: bool = True
    auto_commit_interval_ms: int = 1000
    max_poll_records: int = 500
    session_timeout_ms: int = 10000
    request_timeout_ms: int = 30000


class ConfigManager:
    """Loads and creates the YAML configuration file for Kafka clients."""

    @staticmethod
    def load_config(config_file='kafka_config.yaml'):
        """Load configuration from a YAML file.

        Args:
            config_file: path to the YAML file; must contain a top-level
                'kafka' mapping whose keys match KafkaConfig fields.

        Returns:
            A populated KafkaConfig instance.

        Raises:
            FileNotFoundError: if the file does not exist.
            KeyError: if the 'kafka' section is missing.
        """
        with open(config_file, 'r', encoding='utf-8') as f:
            # safe_load refuses arbitrary object construction — keep it.
            config_data = yaml.safe_load(f)
        return KafkaConfig(**config_data['kafka'])

    @staticmethod
    def create_default_config():
        """Write a default kafka_config.yaml to the working directory."""
        default_config = {
            'kafka': {
                'bootstrap_servers': ['localhost:9092'],
                'topic': 'default_topic',
                'group_id': 'default_group',
                'auto_offset_reset': 'earliest',
                'enable_auto_commit': True,
                'auto_commit_interval_ms': 1000,
                'max_poll_records': 500,
                'session_timeout_ms': 10000,
                'request_timeout_ms': 30000
            }
        }
        with open('kafka_config.yaml', 'w', encoding='utf-8') as f:
            yaml.dump(default_config, f, default_flow_style=False)
        print("默认配置文件已创建: kafka_config.yaml")


# 使用示例
if __name__ == "__main__":
    # Create the default configuration file, then load it back.
    ConfigManager.create_default_config()
    config = ConfigManager.load_config()
    print(f"已加载配置: {config}")
import logging
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import time
from functools import wraps


class KafkaMonitor:
    """Counters plus file/console logging for Kafka send/receive/error events."""

    def __init__(self):
        self.logger = self.setup_logger()
        # In-memory counters; use get_metrics() for a snapshot copy.
        self.metrics = {
            'messages_sent': 0,
            'messages_received': 0,
            'errors': 0,
            'last_error': None
        }

    def setup_logger(self):
        """Configure a logger: INFO+ to kafka_monitor.log, WARNING+ to console.

        NOTE(review): creating multiple KafkaMonitor instances attaches
        duplicate handlers to the same named logger — confirm single use.
        """
        logger = logging.getLogger('kafka_monitor')
        logger.setLevel(logging.INFO)

        # File handler.
        file_handler = logging.FileHandler('kafka_monitor.log')
        file_handler.setLevel(logging.INFO)

        # Console handler.
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.WARNING)

        # Shared formatter.
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        return logger

    def log_message_sent(self, topic, partition, offset):
        """Count and log a successful send."""
        self.metrics['messages_sent'] += 1
        self.logger.info(f"消息发送成功 - Topic: {topic}, "
                         f"Partition: {partition}, Offset: {offset}")

    def log_message_received(self, topic, partition, offset):
        """Count and log a received message (DEBUG level)."""
        self.metrics['messages_received'] += 1
        self.logger.debug(f"消息接收成功 - Topic: {topic}, "
                          f"Partition: {partition}, Offset: {offset}")

    def log_error(self, error, context=None):
        """Count an error and remember the most recent one."""
        self.metrics['errors'] += 1
        self.metrics['last_error'] = {
            'error': str(error),
            'context': context,
            'timestamp': time.time()
        }
        self.logger.error(f"Kafka错误: {str(error)} - 上下文: {context}")

    def get_metrics(self):
        """Return a shallow copy of the current metrics."""
        return self.metrics.copy()


def retry_on_failure(max_retries=3, delay=1):
    """Decorator: retry the wrapped call on KafkaError with linear backoff.

    Non-Kafka exceptions propagate immediately; the final KafkaError is
    re-raised after max_retries attempts.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except KafkaError:
                    retries += 1
                    if retries == max_retries:
                        raise
                    time.sleep(delay * retries)  # linear backoff: delay, 2*delay, ...
        return wrapper
    return decorator


class ResilientProducer:
    """具有重试机制的Kafka生产者"""

    def __init__(self, bootstrap_servers='localhost:9092', monitor=None):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            acks='all',                               # wait for all replicas
            retries=5,                                # client-side retries
            max_in_flight_requests_per_connection=1   # preserve ordering under retry
        )
        self.monitor = monitor or KafkaMonitor()

    @retry_on_failure(max_retries=3, delay=2)
    def send_with_retry(self, topic, message, key=None):
        """Send a message with retry; logs success/failure to the monitor.

        Args:
            topic: target topic name.
            message: payload bytes (no value_serializer is configured).
            key: optional string key, encoded to bytes before sending.

        Returns:
            The RecordMetadata of the acknowledged message.

        Raises:
            KafkaError: after all retries are exhausted.
        """
        try:
            if key:
                future = self.producer.send(topic, key=key.encode(), value=message)
            else:
                future = self.producer.send(topic, value=message)
            record_metadata = future.get(timeout=10)
            self.monitor.log_message_sent(
                record_metadata.topic,
                record_metadata.partition,
                record_metadata.offset
            )
            return record_metadata
        except KafkaError as e:
            self.monitor.log_error(e, context={'topic': topic, 'message': message})
            raise

    def close(self):
        """Close the producer and print the collected metrics."""
        self.producer.close()
        print("生产者已关闭,监控指标:")
        print(self.monitor.get_metrics())


# 使用示例
if __name__ == "__main__":
    monitor = KafkaMonitor()
    producer = ResilientProducer(monitor=monitor)
    try:
        # Send a few test messages.
        for i in range(5):
            message = f"测试消息 {i}"
            producer.send_with_retry('test_topic', message.encode())
            time.sleep(0.5)
    finally:
        producer.close()
四个示例覆盖了日常开发中最常见的用法:单条/批量发送、多线程消费、YAML 配置管理、重试与日志监控。实际接入时,重点关注 group_id 和 auto_offset_reset 的设置,这两个参数直接影响消费行为,踩坑概率较高。
