Netty Advanced Topics

Lock-Free Serialization

In most scenarios, parallel multi-threaded processing improves a system's concurrent performance, but if concurrent access to shared resources is handled poorly, severe lock contention results and performance ultimately degrades. To avoid the cost of lock contention as far as possible, a serialized design can be used: each message is processed within a single thread from start to finish wherever possible, with no thread switching in between, so multi-thread contention and synchronization locks are avoided entirely. NIO multiplexing is one embodiment of this lock-free serialization idea.

To squeeze out as much performance as possible, Netty adopts a serialized, lock-free design: operations run serially inside each IO thread, avoiding the performance loss caused by multi-thread contention. On the surface this looks like poor CPU utilization and insufficient concurrency, but by tuning the NIO thread pool parameters and running multiple serialized threads in parallel, this locally lock-free serial-thread design actually outperforms the one-queue-many-worker-threads model.

After Netty's NioEventLoop reads a message, it directly calls ChannelPipeline's fireChannelRead(Object msg). As long as the user does not deliberately switch threads, the NioEventLoop keeps driving the call chain all the way into the user's Handler with no thread switch in between. This serialized processing avoids the lock contention caused by multi-threaded access and is optimal from a performance standpoint.
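
Keeping work on the EventLoop is the default; switching threads is an explicit, per-handler decision. A minimal sketch of the difference (the handler bodies are placeholders and the executor pool size of 16 is arbitrary): a handler added without an executor runs on the channel's NioEventLoop, while binding a handler to a separate EventExecutorGroup is the deliberate way to take blocking work off the IO thread.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

public class SerialPipelineInitializer extends ChannelInitializer<SocketChannel> {
    // A separate executor group for handlers that may block; pool size is arbitrary.
    private static final EventExecutorGroup BUSINESS_GROUP = new DefaultEventExecutorGroup(16);

    @Override
    protected void initChannel(SocketChannel ch) {
        // No executor given: this handler runs on the channel's NioEventLoop,
        // so the whole read path stays on one thread with no locking.
        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                ctx.fireChannelRead(msg); // still on the IO thread
            }
        });
        // Executor given: Netty invokes this handler on BUSINESS_GROUP instead,
        // keeping potentially blocking business logic off the IO thread.
        ch.pipeline().addLast(BUSINESS_GROUP, new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                // potentially blocking work would go here
                ctx.fireChannelRead(msg);
            }
        });
    }
}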

Direct Memory

Direct memory (off-heap memory) is not part of the JVM's runtime data areas, nor is it a memory region defined by the Java Virtual Machine Specification. Yet in some situations this memory is used heavily, and it can also trigger OutOfMemoryError. In Java, a DirectByteBuffer can be used to allocate a block of direct memory. The memory backing the metaspace is also referred to as direct memory; both map onto the machine's physical memory.

Running the program below makes it obvious that direct memory is slower to allocate than heap memory but faster to access. In typical JVM implementations, native IO operates on direct memory directly (direct memory -> system call -> disk/NIC), whereas heap memory requires an extra copy (heap memory -> direct memory -> system call -> disk/NIC).

import java.nio.ByteBuffer;

public class DirectMemoryTest {
    public static void heapAccess() {
        long startTime = System.currentTimeMillis();
        ByteBuffer buffer = ByteBuffer.allocate(1000); // allocate heap memory
        for (int i = 0; i < 100000; i++) {
            for (int j = 0; j < 200; j++) {
                buffer.putInt(j);
            }
            buffer.flip();

            for (int j = 0; j < 200; j++) {
                buffer.getInt();
            }
            buffer.clear();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("heap memory access: " + (endTime - startTime) + "ms");
    }

    public static void directAccess() {
        long startTime = System.currentTimeMillis();
        ByteBuffer buffer = ByteBuffer.allocateDirect(1000); // allocate direct memory
        for (int i = 0; i < 100000; i++) {
            for (int j = 0; j < 200; j++) {
                buffer.putInt(j);
            }
            buffer.flip();
            for (int j = 0; j < 200; j++) {
                buffer.getInt();
            }
            buffer.clear();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("direct memory access: " + (endTime - startTime) + "ms");
    }

    public static void heapAllocate() {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            ByteBuffer.allocate(100);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("heap memory allocation: " + (endTime - startTime) + "ms");
    }

    public static void directAllocate() {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            ByteBuffer.allocateDirect(100);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("direct memory allocation: " + (endTime - startTime) + "ms");
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            heapAccess();
            directAccess();
        }
        System.out.println();
        for (int i = 0; i < 10; i++) {
            heapAllocate();
            directAllocate();
        }
    }
}

Bits.reserveMemory checks whether there is enough direct memory left to satisfy an allocation. The maximum allocatable direct memory can be set with the -XX:MaxDirectMemorySize=&lt;size&gt; flag; if it is not specified, it defaults to the maximum heap size. When an allocation finds there is not enough room, System.gc() is called explicitly to trigger a full GC, which reclaims unused DirectByteBuffer reference objects and, with them, releases their direct memory. If there is still not enough room after that, java.lang.OutOfMemoryError is thrown.
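
To make that flow concrete, here is a hypothetical, much-simplified model of the accounting (the real java.nio.Bits additionally drains pending Cleaner references and retries with backoff before giving up; the class and field names below are invented for illustration):

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of java.nio.Bits.reserveMemory: bookkeeping
// against a fixed limit, one explicit GC attempt, then OutOfMemoryError.
class DirectMemoryAccounting {
    private static final long MAX = 64L * 1024 * 1024; // stands in for -XX:MaxDirectMemorySize
    private static final AtomicLong RESERVED = new AtomicLong();

    static void reserveMemory(long size) {
        if (tryReserve(size)) return;
        System.gc(); // let Cleaners of unreachable DirectByteBuffers release native memory
        try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        if (!tryReserve(size)) {
            throw new OutOfMemoryError("Direct buffer memory");
        }
    }

    private static boolean tryReserve(long size) {
        while (true) { // CAS loop: reserve only if the limit is not exceeded
            long used = RESERVED.get();
            if (used + size > MAX) return false;
            if (RESERVED.compareAndSet(used, used + size)) return true;
        }
    }

    static void unreserveMemory(long size) {
        RESERVED.addAndGet(-size);
    }
}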

class DirectByteBuffer extends MappedByteBuffer implements DirectBuffer {
    DirectByteBuffer(int cap) { // package-private
        super(-1, 0, cap, cap);
        boolean pa = VM.isDirectMemoryPageAligned();
        int ps = Bits.pageSize();
        long size = Math.max(1L, (long)cap + (pa ? ps : 0));
        Bits.reserveMemory(size, cap);

        long base = 0;
        try { // call the Unsafe native method to allocate direct memory
            base = unsafe.allocateMemory(size);
        } catch (OutOfMemoryError x) {
            Bits.unreserveMemory(size, cap);
            throw x;
        }
        unsafe.setMemory(base, size, (byte) 0);
        if (pa && (base % ps != 0)) { // Round up to page boundary
            address = base + ps - (base & (ps - 1));
        } else {
            address = base;
        }
        // Register a release callback via the Cleaner mechanism: when the direct memory's
        // reference object is collected by GC, the run method of the Deallocator registered
        // here is invoked first to free the direct memory.
        cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
        att = null;
    }
}

// Allocates a block of native memory. The memory is uninitialized, so its contents are
// unpredictable; free it with freeMemory, resize it with reallocateMemory.
public native long allocateMemory(long bytes);

// openjdk8/hotspot/src/share/vm/prims/unsafe.cpp
UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size))
  UnsafeWrapper("Unsafe_AllocateMemory");
  size_t sz = (size_t)size;
  if (sz != (julong)size || size < 0) {
    THROW_0(vmSymbols::java_lang_IllegalArgumentException());
  }
  if (sz == 0) {
    return 0;
  }
  sz = round_to(sz, HeapWordSize);
  // calls os::malloc, which internally allocates via the C standard library's malloc
  void* x = os::malloc(sz, mtInternal);
  if (x == NULL) {
    THROW_0(vmSymbols::java_lang_OutOfMemoryError());
  }
  //Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize);
  return addr_to_java(x);
UNSAFE_END

Using direct memory does not consume heap space, which reduces the likelihood of GC; in typical JVM implementations, native IO operates on direct memory directly, while heap memory requires an extra copy. On the other hand, direct memory is slower to initialize, and because the JVM does not manage it directly, it is easier to run into memory exhaustion. To prevent the situation where no full GC ever runs and direct memory eventually exhausts physical memory, set a ceiling with -XX:MaxDirectMemorySize; when the threshold is reached, System.gc() is invoked to trigger a full GC, which indirectly reclaims direct memory that is no longer in use.

Netty Zero-Copy

For receiving and sending, Netty's ByteBuf uses PooledUnsafeDirectByteBuf, reading from and writing to the Socket with off-heap direct memory, so no second copy of the byte buffer is needed. If JVM heap buffers (PooledUnsafeHeapByteBuf, UnpooledUnsafeHeapByteBuf) were used for Socket reads and writes, the JVM would first have to copy the heap buffer into direct memory before it could be written to the Socket, because data in the JVM heap cannot be written to a Socket directly. Compared with off-heap direct memory, this adds one extra buffer copy to every message sent.

As JVMs and JIT compilation have matured, object allocation and collection have become very lightweight operations. Buffers, however, and off-heap direct memory in particular, remain expensive to allocate and reclaim. To reuse buffers as much as possible, Netty provides a ByteBuf-based buffer pool: when a buffer is needed, a ByteBuf is simply taken from the pool, and once it is no longer needed it is returned to the pool.
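
A minimal usage sketch (PooledByteBufAllocator.DEFAULT is Netty's shared pooled allocator; the 1024-byte capacity is arbitrary):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.CharsetUtil;

public class PooledBufferDemo {
    public static void main(String[] args) {
        // Take a direct (off-heap) buffer from Netty's arena-based pool.
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(1024);
        System.out.println("isDirect=" + buf.isDirect() + ", refCnt=" + buf.refCnt());
        buf.writeBytes("hello".getBytes(CharsetUtil.UTF_8));
        // release() decrements the reference count; at zero the memory goes
        // back to the pool rather than being freed to the operating system.
        buf.release();
    }
}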

When the client sends data, the resulting read event eventually invokes the read method of AbstractNioByteChannel.NioByteUnsafe. The buffer is allocated through MaxMessageHandle, which calls AbstractByteBufAllocator's directBuffer method; the allocation is ultimately carried out in PooledByteBufAllocator's newDirectBuffer by DirectArena, a subclass of PoolArena, which in the end executes PooledUnsafeDirectByteBuf's newInstance method and recycles ByteBuf objects through RECYCLER's get method. In the non-pooled implementations, a brand-new ByteBuf object is simply created instead.

protected class NioByteUnsafe extends AbstractNioUnsafe {
    public final void read() {
        ChannelConfig config = AbstractNioByteChannel.this.config();
        if (AbstractNioByteChannel.this.shouldBreakReadReady(config)) {
            AbstractNioByteChannel.this.clearReadPending();
        } else {
            ChannelPipeline pipeline = AbstractNioByteChannel.this.pipeline();
            ByteBufAllocator allocator = config.getAllocator();
            Handle allocHandle = this.recvBufAllocHandle();
            allocHandle.reset(config);
            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator); // allocate the buffer
                    allocHandle.lastBytesRead(AbstractNioByteChannel.this.doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            AbstractNioByteChannel.this.readPending = false;
                        }
                        break;
                    }
                    allocHandle.incMessagesRead(1);
                    AbstractNioByteChannel.this.readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while(allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
                if (close) {
                    this.closeOnRead(pipeline);
                }
            } catch (Throwable var11) {
                this.handleReadException(pipeline, byteBuf, var11, close, allocHandle);
            } finally {
                if (!AbstractNioByteChannel.this.readPending && !config.isAutoRead()) {
                    this.removeReadOp();
                }
            }
        }
    }
}

public abstract class MaxMessageHandle implements ExtendedHandle {
    public ByteBuf allocate(ByteBufAllocator alloc) {
        return alloc.ioBuffer(this.guess());
    }
}

public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
    public ByteBuf ioBuffer(int initialCapacity) {
        return PlatformDependent.hasUnsafe() ? this.directBuffer(initialCapacity) : this.heapBuffer(initialCapacity);
    }

    public ByteBuf directBuffer(int initialCapacity) {
        return this.directBuffer(initialCapacity, 2147483647);
    }

    public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
        if (initialCapacity == 0 && maxCapacity == 0) {
            return this.emptyBuf;
        } else {
            validate(initialCapacity, maxCapacity);
            return this.newDirectBuffer(initialCapacity, maxCapacity);
        }
    }
}

public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = (PoolThreadCache)this.threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;
        Object buf;
        if (directArena != null) {
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
        return toLeakAwareBuffer((ByteBuf)buf);
    }
}

abstract class PoolArena<T> implements PoolArenaMetric {
    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        PooledByteBuf<T> buf = this.newByteBuf(maxCapacity);
        this.allocate(cache, buf, reqCapacity);
        return buf;
    }

    static final class DirectArena extends PoolArena<ByteBuffer> {
        protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
            return (PooledByteBuf)(HAS_UNSAFE ? PooledUnsafeDirectByteBuf.newInstance(maxCapacity) : PooledDirectByteBuf.newInstance(maxCapacity));
        }
    }
}

final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
    private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
        protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
            return new PooledDirectByteBuf(handle, 0);
        }
    };

    static PooledDirectByteBuf newInstance(int maxCapacity) { // reuse ByteBuf objects via RECYCLER's get method
        PooledDirectByteBuf buf = (PooledDirectByteBuf)RECYCLER.get();
        buf.reuse(maxCapacity);
        return buf;
    }

    private PooledDirectByteBuf(Handle<PooledDirectByteBuf> recyclerHandle, int maxCapacity) {
        super(recyclerHandle, maxCapacity);
    }
}

Flexible TCP Parameter Configuration

Setting TCP parameters sensibly can yield a significant performance boost in certain scenarios. For example, badly sized receive (SO_RCVBUF) and send (SO_SNDBUF) buffers hurt performance considerably; 128KB or 256KB is a commonly recommended value. Netty's bootstrap classes let these TCP parameters be configured flexibly through ChannelOption to match different usage scenarios.
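
A configuration sketch under assumed values (the sizes below are illustrative, not universal recommendations): option(...) applies to the server's listening socket, while childOption(...) applies to each accepted connection.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)            // accept queue length of the listening socket
        .childOption(ChannelOption.SO_RCVBUF, 128 * 1024)  // receive buffer per accepted connection
        .childOption(ChannelOption.SO_SNDBUF, 128 * 1024)  // send buffer per accepted connection
        .childOption(ChannelOption.TCP_NODELAY, true)      // disable Nagle's algorithm for lower latency
        .childOption(ChannelOption.SO_KEEPALIVE, true);    // enable TCP-level keepalive probes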

ByteBuf Expansion Mechanism

  • minNewCapacity: the capacity the user's write requires
  • threshold: the capacity threshold set internally by ByteBuf (4MB by default)
  • maxCapacity: the largest capacity Netty will accept, normally Integer.MAX_VALUE

Netty's ByteBuf grows dynamically on demand; the default threshold is 4MB. If the required capacity equals the threshold, the threshold itself becomes the new buffer capacity. If it exceeds the threshold, the buffer grows in 4MB steps: the new size is (required capacity / 4MB) * 4MB plus one more 4MB step, and the result is then compared with maxCapacity; if it exceeds maxCapacity, maxCapacity is used, otherwise the stepped value is used. If the required capacity is below the threshold, the buffer grows by doubling from a base of 64 bytes (64 -> 128 -> 256 ...) until the doubled value is greater than or equal to the required capacity.
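
This rule corresponds to calculateNewCapacity in Netty 4.x's AbstractByteBufAllocator; the sketch below is a self-contained paraphrase of that logic, so treat the details as approximate rather than verbatim source:

// A self-contained sketch of the expansion rule, modeled on Netty 4.x's
// AbstractByteBufAllocator.calculateNewCapacity (paraphrased; details approximate).
public final class CapacityCalculator {
    static final int THRESHOLD = 4 * 1024 * 1024; // 4MB doubling threshold

    static int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        if (minNewCapacity == THRESHOLD) {
            return THRESHOLD;
        }
        if (minNewCapacity > THRESHOLD) {
            // Over the threshold: grow in 4MB steps instead of doubling.
            int newCapacity = minNewCapacity / THRESHOLD * THRESHOLD;
            if (newCapacity > maxCapacity - THRESHOLD) {
                newCapacity = maxCapacity; // clamp to the largest acceptable capacity
            } else {
                newCapacity += THRESHOLD;  // one more step so the required write fits
            }
            return newCapacity;
        }
        // Under the threshold: double from a 64-byte base (64 -> 128 -> 256 ...).
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }
        return Math.min(newCapacity, maxCapacity);
    }

    public static void main(String[] args) {
        System.out.println(calculateNewCapacity(100, Integer.MAX_VALUE));             // 128
        System.out.println(calculateNewCapacity(5 * 1024 * 1024, Integer.MAX_VALUE)); // 8MB (4MB step)
    }
}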

Heartbeat Detection Mechanism

In a long-lived TCP connection, the client and server periodically exchange a special packet to tell the other side they are still online and to keep the TCP connection verifiably healthy. The key to implementing the heartbeat mechanism in Netty is IdleStateHandler:

  • readerIdleTimeSeconds: read timeout. If no data has been read from the Channel within the given interval, a READER_IDLE IdleStateEvent is fired.
  • writerIdleTimeSeconds: write timeout. If no data has been written to the Channel within the given interval, a WRITER_IDLE IdleStateEvent is fired.
  • allIdleTimeSeconds: read/write timeout. If neither a read nor a write has occurred within the given interval, an ALL_IDLE IdleStateEvent is fired.

public class IdleStateHandler extends ChannelDuplexHandler {
    public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
        this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS);
    }

    public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        this.observeOutput = observeOutput;
        if (readerIdleTime <= 0) {
            readerIdleTimeNanos = 0;
        } else {
            readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (writerIdleTime <= 0) {
            writerIdleTimeNanos = 0;
        } else {
            writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (allIdleTime <= 0) {
            allIdleTimeNanos = 0;
        } else {
            allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
        }
    }
}

To implement server-side heartbeat detection in Netty, add an IdleStateHandler to the ChannelPipeline in the server's ChannelInitializer:

// Server
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(boss, worker)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // String codecs: without these, SimpleChannelInboundHandler<String> below
                    // would never receive messages and String writes would not be encoded.
                    pipeline.addLast(new StringDecoder());
                    pipeline.addLast(new StringEncoder());
                    pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
                    pipeline.addLast(new HeartBeatServerHandler());
                }
            });
    ChannelFuture future = bootstrap.bind(9000).sync();
    future.channel().closeFuture().sync();
} catch (Exception e) {
    e.printStackTrace();
} finally {
    worker.shutdownGracefully();
    boss.shutdownGracefully();
}

public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {
    int readIdleTimes = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println(" ====== > [server] message received : " + s);
        if ("Heartbeat Packet".equals(s)) {
            ctx.channel().writeAndFlush("ok");
        } else {
            System.out.println(" handle other messages ... ");
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent) evt;
        String eventType = null;
        switch (event.state()) {
            case READER_IDLE:
                eventType = "read idle";
                readIdleTimes++; // increment the read-idle counter
                break;
            case WRITER_IDLE: // not handled
                eventType = "write idle";
                break;
            case ALL_IDLE: // not handled
                eventType = "read/write idle";
                break;
        }
        System.out.println(ctx.channel().remoteAddress() + " timeout event: " + eventType);
        if (readIdleTimes > 3) {
            System.out.println(" [server] read idle more than 3 times; closing the connection to free resources");
            ctx.channel().writeAndFlush("idle close");
            ctx.channel().close();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
    }
}

// Client
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new StringDecoder()); // decode inbound ByteBuf to String
                    pipeline.addLast(new StringEncoder()); // encode outbound String to ByteBuf
                    pipeline.addLast(new HeartBeatClientHandler());
                }
            });
    Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel();
    Random random = new Random();
    while (channel.isActive()) {
        int num = random.nextInt(8);
        Thread.sleep(num * 1000);
        channel.writeAndFlush("Heartbeat Packet");
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    eventLoopGroup.shutdownGracefully();
}

static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(" client received :" + msg);
        if ("idle close".equals(msg)) {
            System.out.println(" server closed the connection; closing the client too");
            ctx.channel().close(); // was closeFuture(), which only returns a future and closes nothing
        }
    }
}

Once the channel is ready, IdleStateHandler's channelActive method is invoked, which calls initialize to submit the corresponding read-timeout, write-timeout, or read/write-timeout monitoring tasks to the scheduled executor. While data is being read, channelRead sets reading to true; once the read completes, channelReadComplete sets reading back to false. In ReaderIdleTimeoutTask it is easy to see that if a read is still in progress, the task is simply rescheduled into the delayed executor; otherwise it computes whether the timeout has elapsed and, if so, fires the next handler's userEventTriggered method and puts the task back into the delayed executor.

public class IdleStateHandler extends ChannelDuplexHandler {
    private byte state; // 0 - none, 1 - initialized, 2 - destroyed

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        initialize(ctx);
        super.channelActive(ctx);
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { // a read or read/write timeout is configured
            reading = true;
            firstReaderIdleEvent = firstAllIdleEvent = true;
        }
        ctx.fireChannelRead(msg); // pass through; no business logic here
    }

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
            lastReadTime = ticksInNanos(); // record the current system time
            reading = false;
        }
        ctx.fireChannelReadComplete();
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        destroy();
        super.channelInactive(ctx);
    }

    private void initialize(ChannelHandlerContext ctx) {
        switch (state) {
            case 1:
            case 2:
                return;
        }
        state = 1;
        initOutputChanged(ctx);
        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) { // a read timeout is configured
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) { // a write timeout is configured
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) { // a read/write timeout is configured
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }

    private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
        ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {
            long nextDelay = readerIdleTimeNanos;
            if (!reading) { // current time minus the time of the last channelRead call
                nextDelay -= ticksInNanos() - lastReadTime;
            }
            if (nextDelay <= 0) { // the read has timed out
                readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
                boolean first = firstReaderIdleEvent;
                firstReaderIdleEvent = false;
                try {
                    IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                    channelIdle(ctx, event); // fire the next handler's userEventTriggered method
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else { // Read occurred before the timeout - set a new timeout with shorter delay.
                readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        ctx.fireUserEventTriggered(evt); // fire the next handler's userEventTriggered method
    }

    protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
        switch (state) {
            case ALL_IDLE:
                return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
            case READER_IDLE:
                return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
            case WRITER_IDLE:
                return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
            default:
                throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
        }
    }

    private void destroy() {
        state = 2;
        if (readerIdleTimeout != null) {
            readerIdleTimeout.cancel(false);
            readerIdleTimeout = null;
        }
        if (writerIdleTimeout != null) {
            writerIdleTimeout.cancel(false);
            writerIdleTimeout = null;
        }
        if (allIdleTimeout != null) {
            allIdleTimeout.cancel(false);
            allIdleTimeout = null;
        }
    }
}