Docker中启动kafka集群(Kraft)compose方式 并通过本地访问

mkdir docker-kafka-cluster
vim docker-compose.yml

文件配置

services:
  kafka1:
    image: apache/kafka:3.8.0
    container_name: kafka1
    hostname: kafka1
    ports:
      - 9092:9092 # PLAINTEXT容器间通信
      - 9095:9095 # EXTERNAL宿主机访问
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      # 三个不同的端口
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9095
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,EXTERNAL://localhost:9095
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      KAFKA_CLUSTER_ID: kraft-cluster-1
  kafka2:
    image: apache/kafka:3.8.0
    container_name: kafka2
    hostname: kafka2
    ports:
      - 9093:9092 # PLAINTEXT容器间通信
      - 9096:9095 # EXTERNAL宿主机访问
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9095
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092,EXTERNAL://localhost:9096
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      KAFKA_CLUSTER_ID: kraft-cluster-1
  kafka3:
    image: apache/kafka:3.8.0
    container_name: kafka3
    hostname: kafka3
    ports:
      - 9094:9092 # PLAINTEXT容器间通信
      - 9097:9095 # EXTERNAL宿主机访问
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9095
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092,EXTERNAL://localhost:9097
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      KAFKA_CLUSTER_ID: kraft-cluster-1

通过py脚本验证

# 
# test_kafka_connection.py
"""Smoke-test a Kafka cluster from the Docker host.

Running the script first probes the TCP ports named in
``KAFKA_BOOTSTRAP_SERVERS`` and then creates a producer, fetches the
topic's partition metadata and sends one test message.
"""
import json
import socket
import traceback

from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError, NoBrokersAvailable

# Configuration
# Inside the containers every broker listens on 9092 (PLAINTEXT), but a
# client on the host has to use the EXTERNAL listeners advertised as
# localhost:9095-9097.
KAFKA_BOOTSTRAP_SERVERS = "localhost:9095,localhost:9096,localhost:9097"
KAFKA_TOPIC = "collect_reading_data_topic"

# Port assumed for a bootstrap entry that does not name one.
_DEFAULT_KAFKA_PORT = 9092


def _bootstrap_endpoints(servers):
    """Yield ``(host, port)`` pairs parsed from a bootstrap-servers value.

    Args:
        servers: Comma-separated ``host[:port]`` string, or an iterable of
            ``host[:port]`` strings.

    Raises:
        ValueError: If a port is not an integer.
    """
    entries = servers.split(",") if isinstance(servers, str) else servers
    for entry in entries:
        entry = entry.strip()
        if not entry:
            continue
        host, sep, port = entry.rpartition(":")
        if not sep:
            yield entry, _DEFAULT_KAFKA_PORT
        else:
            yield host, int(port)


def _print_advertised_listeners_hint():
    """Print the hint shown when a timeout suggests a listener problem."""
    print("\n 可能是 advertised.listeners 配置问题")
    print(" 检查: docker exec -it kafka1 env | grep ADVERTISED_LISTENERS")


# Connection test
def test_connection():
    """Connect to Kafka, read topic metadata and send one test message.

    Progress and troubleshooting hints are printed to stdout.

    Returns:
        bool: True if the test message was acknowledged, False if any
        step failed.
    """
    print(f" 正在连接 Kafka: {KAFKA_BOOTSTRAP_SERVERS}")
    print(f" 目标主题: {KAFKA_TOPIC}")
    print("-" * 50)

    producer = None
    try:
        producer = KafkaProducer(
            bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),
            request_timeout_ms=5000,
            max_block_ms=5000,
        )
        print("✅ Producer 创建成功")

        print("⏳ 正在获取集群元数据...")
        partitions = producer.partitions_for(KAFKA_TOPIC)
        print(f"✅ 连接成功主题 [{KAFKA_TOPIC}] 有 {len(partitions)} 个分区")
        print(f" 分区列表: {partitions}")

        test_data = {"test": "connection", "timestamp": 1234567890}
        future = producer.send(KAFKA_TOPIC, key=b"test-key", value=test_data)
        result = future.get(timeout=5)
        print("✅ 测试消息发送成功")
        print(f" Topic: {result.topic}")
        print(f" Partition: {result.partition}")
        print(f" Offset: {result.offset}")
        return True
    except KafkaError as e:
        print(f"❌ Kafka 错误: {e}")
        print(f" 错误类型: {type(e).__name__}")
        if isinstance(e, NoBrokersAvailable):
            print("\n 可能原因:")
            print(" 1. Kafka 服务未启动")
            print(" 2. advertised.listeners 配置不正确")
            print(" 3. 端口号配置错误容器内部都使用 9092")
        elif isinstance(e, KafkaTimeoutError):
            # Kafka timeouts are KafkaError subclasses, so they land in this
            # branch and never reach the generic handler below.
            _print_advertised_listeners_hint()
        return False
    except Exception as e:
        # Top-level boundary of a diagnostic tool: report anything else
        # (e.g. socket-level failures) instead of crashing.
        print(f"❌ 连接失败: {e}")
        print(f" 错误类型: {type(e).__name__}")
        if "timeout" in str(e).lower():
            _print_advertised_listeners_hint()
        traceback.print_exc()
        return False
    finally:
        if producer is not None:
            producer.close()
            print(" Producer 已关闭")


# Quick diagnosis
def quick_diagnose(bootstrap_servers=None):
    """Report whether each bootstrap endpoint accepts a TCP connection.

    Args:
        bootstrap_servers: ``host[:port]`` entries to probe, as a
            comma-separated string or an iterable of strings. Defaults to
            ``KAFKA_BOOTSTRAP_SERVERS`` so the ports probed are the ones
            the producer connects to.
    """
    if bootstrap_servers is None:
        bootstrap_servers = KAFKA_BOOTSTRAP_SERVERS

    print("\n 快速诊断:")
    print("-" * 50)

    # Check the host-side port mappings.
    for host, port in _bootstrap_endpoints(bootstrap_servers):
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(2)
                # connect_ex returns an errno instead of raising on refusal.
                result = sock.connect_ex((host, port))
        except (OSError, OverflowError) as e:
            # Name resolution failure or a port outside 0-65535.
            print(f" ❌ {host}:{port} 连接异常: {e}")
            continue

        if result == 0:
            print(f" ✅ {host}:{port} 端口可达")
        else:
            print(f" ❌ {host}:{port} 端口不可达 (错误码: {result})")


def main():
    """Run the port diagnosis, then the Kafka connection test."""
    print("=" * 50)
    print(" Kafka 连接测试工具")
    print("=" * 50)

    quick_diagnose()

    print("\n" + "=" * 50)
    print(" Kafka 连接测试")
    print("=" * 50)

    success = test_connection()
    if success:
        print("\n 测试通过Kafka 连接正常")
        print(" 可以运行你的生产者程序了")
    else:
        print("\n 测试失败请按提示检查问题")


# Main program
if __name__ == "__main__":
    main()