第一章Netty,Worker代码优化后分析

先看优化后代码:

packagecom.example.demo;importlombok.extern.slf4j.Slf4j;importjava.io.IOException;importjava.net.InetSocketAddress;importjava.nio.ByteBuffer;importjava.nio.channels.*;importjava.util.Iterator;importjava.util.Set;importjava.util.concurrent.ConcurrentLinkedDeque;importjava.util.concurrent.ConcurrentLinkedQueue;@Slf4jpublicclassMultiThreadServerTest{publicstaticvoidmain(String[]args)throwsIOException{Thread.currentThread().setName("boss");ServerSocketChannelssc=ServerSocketChannel.open();ssc.configureBlocking(false);Selectorboss=Selector.open();SelectionKeybosskey=ssc.register(boss,0,null);bosskey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(newInetSocketAddress(8081));// 1,创建固定数据的worker,并初始化Workerworker=newWorker("worker-0");// worker.register();while(true){boss.select();Iterator<SelectionKey>iterator=boss.selectedKeys().iterator();while(iterator.hasNext()){SelectionKeykey=iterator.next();iterator.remove();if(key.isAcceptable()){SocketChannelsc=ssc.accept();sc.configureBlocking(false);log.info("connected-------{}",sc.getRemoteAddress());// 2,关联selectorlog.info("before register-------{}",sc.getRemoteAddress());worker.register(sc);log.info("after register-------{}",sc.getRemoteAddress());// 【核心修复】唤醒 Worker 线程,使其从 select() 中返回// worker.workerSelector.wakeup();}}}}staticclassWorkerimplementsRunnable{privateThreadthread;publicSelectorworkerSelector;privateStringname;privatevolatilebooleanstart=false;privateConcurrentLinkedQueue<Runnable>queue=newConcurrentLinkedQueue<>();publicWorker(Stringname){this.name=name;}// 初始化线程和selectorpublicvoidregister(SocketChannelsc)throwsIOException{if(!start){workerSelector=Selector.open();thread=newThread(this,name);thread.start();start=true;}// 给队列加了任务,但是任务并没有立刻执行queue.add(()->{try{sc.register(workerSelector,SelectionKey.OP_READ,null);}catch(ClosedChannelExceptione){thrownewRuntimeException(e);}});workerSelector.wakeup();//唤醒 workerSelector.select();}@Overridepublicvoidrun(){while(true){try{workerSelector.select();Runnablepoll=queue.poll();if(poll!=null){poll.run();// 执行 sc.register(workerSelector,SelectionKey.OP_READ,null);}Set<SelectionKey>selectionKeys=workerSelector.selectedKeys();Iterator<SelectionKey>iterator=selectionKeys.iterator();while(iterator.hasNext()){SelectionKeykey=iterator.next();iterator.remove();if(key.isReadable()){ByteBufferbuffer=ByteBuffer.allocate(16);SocketChannelchannel=(SocketChannel)key.channel();log.info("read-------{}",channel.getRemoteAddress());channel.read(buffer);buffer.flip();while(buffer.hasRemaining()){System.out.print((char)buffer.get());}System.out.println();}}}catch(IOExceptione){thrownewRuntimeException(e);}}}}}

优化后的代码,运行似乎看起来没有问题;正确的输出了发送的消息

sc.register(workerSelector,SelectionKey.OP_READ,null);
workerSelector.wakeup();
workerSelector.select();
这3行代码的执行顺序问题

这段代码存在一个‌严重的逻辑缺陷‌,会导致新连接的注册事件(OP_READ)无法被及时处理,甚至完全丢失。

核心问题:select() 与 queue.poll() 的执行顺序错误
在 Worker 线程的 run() 方法中,目前的执行顺序是:

workerSelector.select(); ‌阻塞等待‌ I/O 事件。
queue.poll(); 处理注册任务。
处理 I/O 事件。
‌问题分析:‌
当 Boss 线程调用 worker.register(sc) 时:

它将注册任务放入 queue。
它调用 workerSelector.wakeup()。
此时,如果 Worker 线程正阻塞在 select() 上:

wakeup() 会使 select() 立即返回。
‌但是‌,select() 返回后,代码紧接着执行 queue.poll()。
如果 queue 中有任务,任务被执行(即 sc.register(…))。
‌关键点‌:这次注册操作发生在当前这一轮 select() 循环的‌后半段‌。这意味着,刚刚注册的 Channel 的 OP_READ 事件,‌不会‌在当前这次 select() 返回的 selectedKeys 中被检测到(因为注册发生在 select 之后)。
程序进入下一轮循环,再次调用 select()。如果此时没有新的 I/O 事件,也没有新的 wakeup,select() 将再次阻塞。
‌更严重的问题‌:如果 wakeup() 调用时,Worker 线程‌不在‌ select() 阻塞状态(例如正在处理上一个请求或正在执行 queue.poll()),wakeup() 会设置一个“唤醒标志”。当下一次调用 select() 时,它会立即返回。但如果此时 queue 为空(任务已被前一次 poll 取走并执行),或者任务执行失败,逻辑依然混乱。
‌最大的隐患在于:register 操作本身不是线程安全的,且不应该在 select() 返回后的业务逻辑中随意穿插,除非你非常清楚 SelectionKey 的状态。‌

虽然代码中使用了 wakeup(),但由于 register 是通过队列异步执行的,而 select() 是在队列处理‌之前‌调用的,这导致了一个时间窗口问题:

Boss 线程: queue.add(task), wakeup().
Worker 线程: select() 被唤醒返回。
Worker 线程: poll() 取出任务并执行 register。
Worker 线程: 遍历 selectedKeys。‌注意:此时刚注册的 key 不在 selectedKeys 中,因为 register 发生在 select 之后。‌
Worker 线程: 下一轮 select()。如果客户端此时发送数据,select() 能检测到吗?能。但如果客户端在 register 完成前就发送了数据,或者由于时序问题,可能导致事件延迟处理。
‌然而,上述分析还不是最致命的。最致命的错误是:‌

在 Worker.run() 中,workerSelector.select() 是阻塞的。
当 boss 线程调用 workerSelector.wakeup() 时,select() 返回。
然后 queue.poll() 执行注册。
‌但是,如果 wakeup() 调用时,Worker 线程并没有在 select() 中阻塞呢?‌
比如 Worker 线程正在处理上一个 read 事件。
此时 wakeup() 设置标志位。
Worker 处理完 read,进入下一轮循环,调用 select()。
select() 发现标志位,立即返回(不阻塞)。
然后 queue.poll() 执行注册。
这看起来没问题。

‌真正的 Bug 在于:register 操作必须在 select() 调用之前完成,或者在 select() 返回后立即处理,但必须确保下一次 select() 能感知到新的 Channel。‌

当前代码的逻辑是:

select() (可能阻塞,可能被 wakeup 唤醒)
poll() 并执行 register
处理 selectedKeys
这种模式下,‌新注册的 Channel 的事件永远会在下一轮 select() 中才被检测到‌。这在功能上是可行的,但效率低且容易出错。

‌更严重的潜在 Bug:ClosedChannelException 和并发修改‌

‌queue 的使用方式不当‌:
ConcurrentLinkedQueue 是线程安全的,但 register 操作本身涉及到 Selector 的内部数据结构。虽然 NIO 的 register 是线程安全的(它会同步),但在 select() 循环中动态注册需要格外小心。

‌wakeup() 的幂等性与丢失‌:
如果 Boss 线程连续两次调用 register 和 wakeup,而 Worker 线程还在处理第一个任务,第二个 wakeup 可能不会生效(如果第一次 wakeup 的标志位还没被 select() 消费)。不过 wakeup 是幂等的,通常只会保证至少一次唤醒。

‌最核心的逻辑错误:select() 返回后,selectedKeys 中不包含本次循环中 newly registered 的 channel。‌
这是 NIO 的标准行为。你的代码依赖下一轮 select() 来发现新连接的数据。这在低负载下没问题,但在高负载或特定时序下,可能导致响应延迟。

修正方案
标准的 Reactor 模式中,Worker 线程处理注册任务的正确姿势通常是:

将注册任务放入队列。
唤醒 Selector。
在 select() ‌之前‌ 或 ‌之后立即‌ 处理队列中的注册任务,但要确保这些新注册的 Channel 不会被当前轮的 selectedKeys 遗漏(实际上它们本来就不会在当前轮出现,所以重点是确保它们被正确注册到 Selector 中,以便下一轮监听)。
‌你的代码主要问题在于:wakeup() 调用位置和执行顺序的配合不够严谨,且没有处理 register 可能抛出的异常对后续逻辑的影响。‌

此外,还有一个常见的 ‌NIO 空轮询 Bug‌(JDK Bug),虽然你的代码没有体现处理逻辑,但在生产环境中需要考虑。

代码优化建议

以下是修复后的 Worker 类,主要改进点:

‌确保注册任务在 select() 之前被处理‌,或者接受“下一轮生效”的事实并确保逻辑正确。
‌增加异常处理‌,防止单个 Channel 注册失败影响整个 Worker 线程。
‌优化日志和流程‌。

staticclassWorkerimplementsRunnable{privateThreadthread;publicSelectorworkerSelector;privateStringname;privatevolatilebooleanstart=false;// 使用队列来串行化注册操作,避免多线程直接操作 Selector 导致的复杂同步问题privateConcurrentLinkedQueue<Runnable>queue=newConcurrentLinkedQueue<>();publicWorker(Stringname){this.name=name;}publicvoidregister(SocketChannelsc)throwsIOException{if(!start){synchronized(this){if(!start){workerSelector=Selector.open();thread=newThread(this,name);thread.start();start=true;}}}// 将注册操作封装为任务放入队列queue.add(()->{try{// 注册为读就绪,并附加一个处理器对象(可选)sc.register(workerSelector,SelectionKey.OP_READ,null);log.info("Channel registered to worker: {}",sc.getRemoteAddress());}catch(ClosedChannelExceptione){log.error("Channel closed before registration",e);}catch(Exceptione){log.error("Registration failed",e);}});// 唤醒 selector,使其从 select() 阻塞中返回,从而尽快处理队列中的注册任务workerSelector.wakeup();}@Overridepublicvoidrun(){while(true){try{// 1. 先处理队列中的注册任务(非阻塞)// 注意:也可以在 select() 之后处理,但放在这里可以确保新注册的 channel// 不会干扰当前 selectedKeys 的迭代,且逻辑更清晰Runnabletask;while((task=queue.poll())!=null){task.run();}// 2. 阻塞等待事件// 如果队列为空且无事件,select() 会阻塞// 如果有 wakeup() 调用,select() 会立即返回intselectedCount=workerSelector.select();// 如果 select() 返回 0,可能是被 wakeup() 唤醒但无实际 I/O 事件// 此时继续循环,再次检查队列(防止 wakeup 后又有新任务加入)if(selectedCount==0){continue;}// 3. 处理 I/O 事件Set<SelectionKey>selectionKeys=workerSelector.selectedKeys();Iterator<SelectionKey>iterator=selectionKeys.iterator();while(iterator.hasNext()){SelectionKeykey=iterator.next();// 必须移除,防止重复处理iterator.remove();if(!key.isValid()){continue;}if(key.isReadable()){SocketChannelchannel=(SocketChannel)key.channel();ByteBufferbuffer=ByteBuffer.allocate(1024);// 适当增大缓冲区try{intbytesRead=channel.read(buffer);if(bytesRead>0){buffer.flip();log.info("Read from {}: {}",channel.getRemoteAddress(),newString(buffer.array(),0,buffer.limit()));// 这里可以添加业务逻辑处理}elseif(bytesRead==-1){// 对端关闭连接log.info("Client disconnected: {}",channel.getRemoteAddress());key.cancel();channel.close();}}catch(IOExceptione){log.error("IO error on channel: {}",channel.getRemoteAddress(),e);key.cancel();try{channel.close();}catch(IOExceptionex){// ignore}}}}}catch(IOExceptione){log.error("Selector error in worker: {}",name,e);// 在实际生产中,可能需要重建 Selector 或退出线程break;}}}}

主要修改说明:

调整 select() 和 queue.poll() 的顺序‌:
原代码:select() -> poll() -> 处理事件。
新代码:poll() -> select() -> 处理事件。
理由‌:先处理注册任务,再调用 select()。这样,如果 wakeup() 被调用,select() 返回后,新注册的 Channel 已经存在于 Selector 中,虽然它们的事件不会在当前轮次被触发(因为注册发生在 select 之前,但事件可能还没到达),但逻辑上更顺畅。更重要的是,如果在 select() 阻塞期间有 wakeup(),select() 返回后,我们先处理可能新增的注册任务,然后再去检查是否有 I/O 事件。这避免了原代码中“先 select 后注册”导致的时序混淆。
注:其实两种顺序在 NIO 中都是常见的,关键是‌注册操作必须原子性地完成‌,且‌不能在当前轮的 selectedKeys 中期望看到刚注册的 Channel 的事件‌。新代码将 poll 放在 select 前,可以确保每次 select 开始前,Selector 的注册表是最新的。

增加 selectedCount == 0 的判断‌:
wakeup() 会导致 select() 返回 0。此时应该继续循环,而不是直接去遍历空的 selectedKeys。原代码虽然没有显式判断,但 iterator 为空时也不会出错。显式判断可以提高效率,避免不必要的迭代器创建。

增强异常处理和资源清理‌:
在读取数据时,处理 bytesRead == -1(客户端关闭)的情况,并正确取消 Key 和关闭 Channel。
捕获 IOException,防止单个连接异常导致 Worker 线程崩溃。

start 标志的双重检查锁定‌:
在 register 方法中,初始化 Worker 线程的部分增加了 synchronized 块,防止多线程同时启动多个 Worker 线程。

总结

你原来的代码‌能跑通‌,但在高并发或特定时序下可能存在‌事件处理延迟‌或‌逻辑不清晰‌的问题。主要风险在于 select() 和 register() 的时序配合。按照上述优化后的代码,逻辑更加健壮,符合标准的 NIO Reactor 模式实践。