Comparing the Underlying Principles of BIO and NIO

In Tomcat 7 the default connector is BIO; it can be switched to NIO with the following configuration:

<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol" connectionTimeout="20000" redirectPort="8443" />

BIO

The BIO model is fairly simple: an Acceptor thread inside the JIoEndpoint instance loops, blocking to accept socket connections. Each accepted socket is wrapped in a SocketProcessor and handed to the Executor thread pool. SocketProcessor is a Runnable; it performs blocking reads from the socket and blocking writes back to it.
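To make the shape of this model concrete, here is a minimal sketch of the BIO pattern, not Tomcat's actual code; the class name BioEchoServer and its handler are invented for illustration:

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BioEchoServer {
    public static void main(String[] args) throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(200); // analogous to maxThreads
        try (ServerSocket server = new ServerSocket(8080)) {
            while (true) {
                Socket socket = server.accept();    // blocking accept, like the Acceptor thread
                pool.execute(() -> handle(socket)); // one pooled task per connection, like SocketProcessor
            }
        }
    }

    static void handle(Socket socket) {
        try (Socket s = socket) {
            byte[] buf = new byte[1024];
            int n = s.getInputStream().read(buf);     // blocking read
            if (n > 0) {
                s.getOutputStream().write(buf, 0, n); // blocking write
            }
        } catch (IOException ignored) {
        }
    }
}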

The number of Acceptor threads defaults to 1 and can be configured via the acceptorThreadCount attribute. The Executor thread pool is also configurable, for example:

<Executor name="tomcatThreadPool" namePrefix="catalina-exec-" maxThreads="150" minSpareThreads="4"/>

<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol" connectionTimeout="20000" redirectPort="8443" executor="tomcatThreadPool"/>

Each Connector can be associated with one thread pool. By default, every Connector in Tomcat creates its own pool, with a default minimum of 10 threads and a maximum of 200. If two Connectors reference the same executor, they share a single thread pool.

When handling requests with BIO under a high request rate, you can raise the number of Acceptor threads to accept connections faster; when individual requests are slow, you can raise the Executor's maximum thread count.
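For example, a Connector tuned along those lines might look like the following; acceptorThreadCount and maxThreads are the standard Connector attributes, but the values here are purely illustrative, not recommendations:

<Connector port="8080" protocol="HTTP/1.1"
           acceptorThreadCount="2" maxThreads="400"
           connectionTimeout="20000" redirectPort="8443" />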

The point of adding threads is to improve Tomcat's performance, but more threads on a single machine is not always better; load testing is needed to settle on a thread count that fits the actual workload.

How a BIO socket request is processed by SocketProcessor

(Figure: the BIO request/response processing flow.)

NIO

The defining feature of NIO is non-blocking I/O: non-blocking accept of socket connections, non-blocking reads from the socket, and non-blocking writes to it. In Tomcat 7, however, only reading the request line and request headers is non-blocking; reading the request body is blocking, and writing the response is blocking as well. That is because Tomcat 7 implements Servlet 3.0, and the Servlet 3.0 specification does not account for NIO. For example, code that reads the request body inside a Servlet looks like this:

ServletInputStream inputStream = req.getInputStream();
byte[] bytes = new byte[1024];
int n;
while ((n = inputStream.read(bytes)) > 0) {
    System.out.println(new String(bytes, 0, n));
}

inputStream.read() means a blocking read: when reading the request body, if the operating system has no data ready yet, read() has to block.

NIO works differently: once the data is ready in the operating system, the Java program is notified that it can read. That notification is the key point, and it determines how the Java code has to be written. To read data with NIO inside a Servlet, the Servlet must listen for that notification. Servlet 3.1 therefore adds NIO-related APIs, including a listener for the data-available notification; this is what genuinely exploits NIO:

ServletInputStream inputStream = req.getInputStream();
inputStream.setReadListener(new ReadListener() {
    // fired when data is available to read
    @Override
    public void onDataAvailable() throws IOException {
    }

    // fired once all the data has been read
    @Override
    public void onAllDataRead() throws IOException {
    }

    // fired when an error occurs
    @Override
    public void onError(Throwable throwable) {
    }
});

How NIO processes a socket request

The Acceptor still blocks to obtain socket connections, which in NIO are SocketChannels. Once a socketChannel is accepted, it must be bound to a Selector with read interest registered. NIO then needs a thread to poll the Selector for ready events and, when there are any, fetch and process them. Tomcat supports several threads querying for ready events concurrently; each such thread is a Poller, and every Poller owns its own Selector, so each Poller thread polls its own Selector for ready events and then handles them.

public class NioEndpoint extends AbstractEndpoint<NioChannel> {
    protected class Acceptor extends AbstractEndpoint.Acceptor {
        @Override
        public void run() {
            int errorDelay = 0;
            while (running) {
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // ignore and re-check the pause flag
                    }
                }
                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;
                try {
                    countUpOrAwaitConnection();
                    SocketChannel socket = null;
                    try {
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                        countDownConnection();
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        throw ioe;
                    }
                    errorDelay = 0;
                    if (running && !paused) {
                        if (!setSocketOptions(socket)) {
                            countDownConnection();
                            closeSocket(socket);
                        }
                    } else {
                        countDownConnection();
                        closeSocket(socket);
                    }
                } catch (Throwable t) {
                    // abridged: log the error and keep accepting
                }
            }
            state = AcceptorState.ENDED;
        }
    }

    protected boolean setSocketOptions(SocketChannel socket) {
        try {
            // make reads from this channel non-blocking
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
            // wrap each accepted socket in a NioChannel; NioChannel objects are reusable
            NioChannel channel = nioChannels.poll(); // nioChannels is a cache queue; take a NioChannel from its head
            if (channel == null) {
                // SSL setup
                if (sslContext != null) {
                    SSLEngine engine = createSSLEngine();
                    int appbufsize = engine.getSession().getApplicationBufferSize();
                    NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize, socketProperties.getAppReadBufSize()), Math.max(appbufsize, socketProperties.getAppWriteBufSize()), socketProperties.getDirectBuffer());
                    channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
                } else {
                    // normal tcp setup
                    NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer());
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                if (channel instanceof SecureNioChannel) {
                    SSLEngine engine = createSSLEngine();
                    ((SecureNioChannel) channel).reset(engine);
                } else {
                    channel.reset();
                }
            }
            // every newly accepted socket connection gets registered with a Poller
            getPoller0().register(channel);
        } catch (Throwable t) {
            return false;
        }
        return true;
    }

    public void register(final NioChannel socket) {
        socket.setPoller(this);
        // obtain a KeyAttachment and populate it with this socket's state
        KeyAttachment key = keyCache.poll();
        final KeyAttachment ka = key != null ? key : new KeyAttachment(socket);
        ka.reset(this, socket, getSocketProperties().getSoTimeout());
        ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        ka.setSecure(isSSLEnabled());

        // obtain a PollerEvent; it represents a registration interested in reads
        // (nothing has actually been registered with the Selector yet)
        PollerEvent r = eventCache.poll();
        ka.interestOps(SelectionKey.OP_READ); // this is what OP_REGISTER turns into.
        if (r == null)
            r = new PollerEvent(socket, ka, OP_REGISTER);
        else
            r.reset(socket, ka, OP_REGISTER);
        // queue the PollerEvent onto the event list
        addEvent(r);
    }

    public void addEvent(Runnable event) {
        events.offer(event);
        if (wakeupCounter.incrementAndGet() == 0)
            selector.wakeup();
    }
}

After the Acceptor accepts a socketChannel, it registers it with one of the Pollers. The logic for picking a Poller is simple round-robin: with three Pollers numbered 1, 2, and 3, the first socketChannel Tomcat accepts goes to Poller 1, the second to Poller 2, the third to Poller 3, the fourth back to Poller 1, and so on in a cycle.

public Poller getPoller0() {
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}

The nioChannels, keyCache, and eventCache seen in the code, as well as processorCache further below, are all cache queues used to avoid repeatedly allocating large numbers of objects.
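The recycling idea itself is simple; here is a minimal, hypothetical sketch (RecyclingPool and its methods are invented names, not Tomcat's API):

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Reuse objects instead of allocating a new one per request.
public class RecyclingPool<T> {
    private final ConcurrentLinkedQueue<T> cache = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;

    public RecyclingPool(Supplier<T> factory) {
        this.factory = factory;
    }

    public T acquire() {
        T item = cache.poll();             // try a cached instance first
        return item != null ? item : factory.get();
    }

    public void release(T item) {
        cache.offer(item);                 // caller must reset the item's state before reuse
    }
}

With those caches in mind, the Poller's main loop looks like this: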

201
public class Poller implements Runnable {
    public void run() {
        while (true) {
            try {
                while (paused && (!close)) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        // ignore and re-check the pause flag
                    }
                }
                boolean hasEvents = false;
                if (close) {
                    events();
                    timeout(0, false);
                    try {
                        selector.close();
                    } catch (IOException ioe) {
                        // abridged: log the failure
                    }
                    break;
                } else {
                    // run the queued PollerEvents, which register read/write interest with the Selector
                    hasEvents = events(); // this is where registration actually happens
                }
                try {
                    if (!close) {
                        if (wakeupCounter.getAndSet(-1) > 0) {
                            // events() above registered interest; here we only query for ready events, without blocking
                            keyCount = selector.selectNow();
                        } else {
                            // blocking select; on timeout it simply falls through to the code below without an error
                            keyCount = selector.select(selectorTimeout);
                        }
                        wakeupCounter.set(0);
                    }
                    if (close) {
                        events();
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            // abridged: log the failure
                        }
                        break;
                    }
                } catch (Throwable x) {
                    // abridged: log and continue the loop
                    continue;
                }
                if (keyCount == 0) hasEvents = (hasEvents | events());
                // if there are ready events, iterate over and process them
                Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // loop over the currently ready events
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    KeyAttachment attachment = (KeyAttachment) sk.attachment();
                    if (attachment == null) {
                        iterator.remove();
                    } else {
                        attachment.access();
                        iterator.remove();
                        processKey(sk, attachment); // process the event
                    }
                } // while
                timeout(keyCount, hasEvents);
                if (oomParachute > 0 && oomParachuteData == null) checkParachute();
            } catch (OutOfMemoryError oom) {
                try {
                    oomParachuteData = null;
                    releaseCaches();
                    log.error("", oom);
                } catch (Throwable oomt) {
                    // abridged: nothing more we can do
                }
            }
        } // while
        stopLatch.countDown();
    }

    public boolean events() {
        boolean result = false;
        Runnable r = null;
        // poll removes each element from the queue
        for (int i = 0, size = events.size(); i < size && (r = events.poll()) != null; i++) {
            result = true;
            try {
                // for a PollerEvent, run() registers the read interest with this Poller's Selector
                r.run();
                if (r instanceof PollerEvent) {
                    ((PollerEvent) r).reset();
                    eventCache.offer((PollerEvent) r);
                }
            } catch (Throwable x) {
                // abridged: log and continue with the next event
            }
        }
        return result;
    }

    protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
        boolean result = true;
        try {
            if (close) {
                cancelledKey(sk, SocketStatus.STOP, attachment.comet);
            } else if (sk.isValid() && attachment != null) {
                attachment.access(); // make sure we don't time out valid sockets
                sk.attach(attachment); // cant remember why this is here
                // the channel behind this ready event
                NioChannel channel = attachment.getChannel();
                // ready to read or ready to write
                if (sk.isReadable() || sk.isWritable()) {
                    if (attachment.getSendfileData() != null) {
                        processSendfile(sk, attachment, false);
                    } else {
                        if (isWorkerAvailable()) {
                            unreg(sk, attachment, sk.readyOps());
                            boolean closeSocket = false;
                            // Read goes before write
                            if (sk.isReadable()) {
                                // read data from the channel
                                if (!processSocket(channel, SocketStatus.OPEN_READ, true)) {
                                    closeSocket = true;
                                }
                            }
                            // after reading, we may need to write
                            if (!closeSocket && sk.isWritable()) {
                                // write data to the channel
                                if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (closeSocket) {
                                cancelledKey(sk, SocketStatus.DISCONNECT, false);
                            }
                        } else {
                            result = false;
                        }
                    }
                }
            }
        } catch (CancelledKeyException ckx) {
            // abridged: cancel the key
        }
        return result;
    }
}

public class NioEndpoint extends AbstractEndpoint<NioChannel> {
    public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
        // reads from or writes to the socket; dispatch says whether to hand the task
        // to the thread pool, i.e. whether to run it asynchronously
        try {
            KeyAttachment attachment = (KeyAttachment) socket.getAttachment();
            if (attachment == null) {
                return false;
            }
            attachment.setCometNotify(false); // will get reset upon next reg
            // obtain a SocketProcessor object
            SocketProcessor sc = processorCache.poll();
            if (sc == null) sc = new SocketProcessor(socket, status);
            else sc.reset(socket, status);
            // dispatch to the thread pool
            if (dispatch && getExecutor() != null) getExecutor().execute(sc);
            else sc.run();
        } catch (RejectedExecutionException rx) {
            return false;
        } catch (Throwable t) {
            return false;
        }
        return true;
    }
}

// r.run() inside events() actually invokes PollerEvent.run(), which performs the real
// registration of the read event with this Poller's Selector
public static class PollerEvent implements Runnable {
    // a PollerEvent describes an interest that still needs to be registered
    protected NioChannel socket;
    protected int interestOps;
    protected KeyAttachment key;

    @Override
    public void run() {
        if (interestOps == OP_REGISTER) {
            // actually register the read interest with this Poller's Selector
            try {
                socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
            } catch (Exception x) {
                // abridged: log the failure
            }
        } else {
            final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
            try {
                // if the channel has no registered key any more, the socket connection has been closed
                if (key == null) {
                    socket.getPoller().getEndpoint().countDownConnection();
                } else {
                    final KeyAttachment att = (KeyAttachment) key.attachment();
                    if (att != null) {
                        if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK) {
                            att.setCometNotify(true);
                        } else {
                            att.setCometNotify(false);
                        }
                        interestOps = (interestOps & (~OP_CALLBACK)); // remove the callback flag
                        att.access(); // to prevent timeout
                        // merge the newly requested interest into the registered interest set
                        int ops = key.interestOps() | interestOps;
                        att.interestOps(ops);
                        key.interestOps(ops);
                    } else {
                        socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
                    }
                }
            } catch (CancelledKeyException ckx) {
                try {
                    socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT, true);
                } catch (Exception ignore) {
                    // abridged
                }
            }
        } // end if
    } // run
}

Besides a selector, each Poller also holds a ConcurrentLinkedQueue named events, the queue of pending work. For example, when Tomcat wants to register a socketChannel with the selector, it does not do so directly; instead it creates a PollerEvent, enqueues it onto events, and the queued events are actually executed during the Poller thread's loop.

The Poller thread loops, querying its selector for ready events, but before each real query Tomcat first checks whether the events queue holds pending work. If so, those events run first: they represent interest that still needs to be registered with the selector, such as read or write interest for a socketChannel, so executing the events queue is what actually registers interest with the selector. Only after the queued PollerEvents have run does the Poller thread have a chance to find ready events on its selector. Whenever a Poller thread finds ready events it processes them, and an event is ultimately either a read or a write (a minimal sketch of this queue-plus-wakeup pattern follows the list below):

  • First, get the socketChannel behind the ready event, since that is what will be read from or written to
  • Second, wrap the socketChannel together with the pending action (read or write) into a SocketProcessor object
  • Third, hand the SocketProcessor to the thread pool for processing
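Here is that queue-plus-wakeup pattern as a minimal, hypothetical sketch (MiniPoller is an invented name; Tomcat's Poller does considerably more):

import java.nio.channels.Selector;
import java.util.concurrent.ConcurrentLinkedQueue;

// Registration work is queued to the polling thread, because Selector
// registration and selection are easiest to reason about on one thread.
public class MiniPoller implements Runnable {
    private final Selector selector;
    private final ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<>();

    public MiniPoller(Selector selector) {
        this.selector = selector;
    }

    // called from the Acceptor thread
    public void addEvent(Runnable event) {
        events.offer(event);
        selector.wakeup();                    // break out of select() so the event runs promptly
    }

    @Override
    public void run() {
        while (true) {
            Runnable r;
            while ((r = events.poll()) != null) {
                r.run();                      // e.g. channel.register(selector, OP_READ)
            }
            try {
                selector.select(1000);        // wait for ready events (or a wakeup)
                selector.selectedKeys().forEach(key -> { /* dispatch to a worker */ });
                selector.selectedKeys().clear();
            } catch (java.io.IOException e) {
                break;
            }
        }
    }
}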

When a SocketProcessor runs, it reads from the socketChannel (assuming the event being handled is a read), and the read is non-blocking. The rough flow: some Poller's selector finds a ready read event and hands it to a SocketProcessor thread. After reading, if the SocketProcessor finds that the request line and request headers have been fully read and parsed, it carries on and passes the parsed request to the Servlet. The Servlet may read the request body and write the response, and both of those operations are blocking; the SocketProcessor thread finishes only once the Servlet's logic has completed. If instead the SocketProcessor finds that the request line or headers are not yet complete, handling of this read event is over, and reading and parsing can only continue once the Poller thread picks up the next ready read event.

protected class SocketProcessor implements Runnable {
    protected NioChannel socket = null;
    protected SocketStatus status = null;

    @Override
    public void run() {
        // look up the SelectionKey for this channel on its Poller's selector
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
        KeyAttachment ka = null;
        if (key != null) {
            ka = (KeyAttachment) key.attachment();
        }
        if (ka != null && ka.isUpgraded() && SocketStatus.OPEN_WRITE == status) {
            synchronized (ka.getWriteThreadLock()) {
                doRun(key, ka);
            }
        } else {
            // in NIO every ready I/O event is handled by its own thread, so synchronization is needed:
            // multiple threads may process different sockets concurrently, but never the same socket
            synchronized (socket) {
                doRun(key, ka); // the real event-handling logic
            }
        }
    }

    private void doRun(SelectionKey key, KeyAttachment ka) {
        try {
            int handshake = -1;
            try {
                if (key != null) {
                    if (socket.isHandshakeComplete() || status == SocketStatus.STOP) {
                        handshake = 0;
                    } else {
                        handshake = socket.handshake(key.isReadable(), key.isWritable());
                        status = SocketStatus.OPEN_READ;
                    }
                }
            } catch (IOException x) {
                handshake = -1;
            } catch (CancelledKeyException ckx) {
                handshake = -1;
            }
            if (handshake == 0) {
                SocketState state = SocketState.OPEN;
                if (status == null) {
                    state = handler.process(ka, SocketStatus.OPEN_READ);
                } else {
                    state = handler.process(ka, status);
                }
                if (state == SocketState.CLOSED) {
                    close(ka, socket, key, SocketStatus.ERROR);
                }
            } else if (handshake == -1) {
                close(ka, socket, key, SocketStatus.DISCONNECT);
            } else {
                ka.getPoller().add(socket, handshake);
            }
        } catch (CancelledKeyException cx) {
            socket.getPoller().cancelledKey(key, null, false);
        } catch (OutOfMemoryError oom) {
            try {
                oomParachuteData = null;
                if (socket != null) {
                    socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
                }
                releaseCaches();
            } catch (Throwable oomt) {
                // abridged: nothing more we can do
            }
        } catch (Throwable t) {
            if (socket != null) {
                socket.getPoller().cancelledKey(key, SocketStatus.ERROR, false);
            }
        } finally {
            socket = null;
            status = null;
            // return to cache
            if (running && !paused) {
                processorCache.offer(this);
            }
        }
    }
}

The handler.process called here is the same method as in BIO, and so is processor.process(wrapper); the difference lies in the subsequent byte-stream parsing logic. connections caches sockets whose requests are not yet fully processed, so the next event can continue reading data:

protected abstract static class AbstractConnectionHandler<S, P extends Processor<S>> implements AbstractEndpoint.Handler {
    public SocketState process(SocketWrapper<S> wrapper, SocketStatus status) {
        S socket = wrapper.getSocket();
        Processor<S> processor = connections.get(socket);
        // mark as non-async, i.e. synchronous
        wrapper.setAsync(false);
        try {
            if (processor == null) {
                // try to reuse a recycled processor
                processor = recycledProcessors.poll();
            }
            if (processor == null) {
                processor = createProcessor(); // e.g. Http11NioProcessor
            }
            initSsl(wrapper, processor);
            SocketState state = SocketState.CLOSED;
            do {
                if (status == SocketStatus.DISCONNECT && !processor.isComet()) {
                    // Do nothing here, just wait for it to get recycled
                    // Don't do this for Comet we need to generate an end
                    // event (see BZ 54022)
                } else if (processor.isAsync() || state == SocketState.ASYNC_END) {
                    // We get here when either:
                    // - the business thread called complete() before the Tomcat thread finished, and the while loop led here;
                    // - the Tomcat thread finished and the business thread called complete() within the timeout, so a new SocketProcessor was built and submitted to the pool;
                    // - the timeout elapsed and the AsyncTimeout thread built a SocketProcessor and submitted it to the pool.
                    // Either way, this branch runs exactly once per async servlet invocation, to flush the output buffer.
                    state = processor.asyncDispatch(status);
                    if (state == SocketState.OPEN) {
                        getProtocol().endpoint.removeWaitingRequest(wrapper);
                        state = processor.process(wrapper);
                    }
                } else if (processor.isComet()) {
                    state = processor.event(status);
                } else {
                    // the common case takes this branch
                    state = processor.process(wrapper);
                }
                if (state != SocketState.CLOSED && processor.isAsync()) {
                    // At this point, check whether complete() has already been called:
                    // if it has, AsyncState moves COMPLETE_PENDING --> (via doComplete) COMPLETING, and SocketState becomes ASYNC_END;
                    // if it has not, AsyncState moves STARTING --> STARTED, and SocketState becomes LONG.
                    //
                    // Three possible state transitions:
                    // 1. COMPLETE_PENDING ---> COMPLETING (COMPLETE_PENDING is set when complete() is called while still STARTING)
                    // 2. STARTING ---> STARTED (STARTED advances to COMPLETING only once complete() is called)
                    // 3. COMPLETING ---> DISPATCHED
                    state = processor.asyncPostProcess();
                }
                // for an async servlet, if complete() has been called by the time we get here, the state is SocketState.ASYNC_END
            } while (state == SocketState.ASYNC_END || state == SocketState.UPGRADING || state == SocketState.UPGRADING_TOMCAT);
            return state;
        } catch (Throwable e) {
            // abridged: log the error and release the processor
        }
        return SocketState.CLOSED;
    }
}

public abstract class AbstractHttp11Processor<S> extends AbstractProcessor<S> {
    public SocketState process(SocketWrapper<S> socketWrapper) throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); // mark the request as being parsed
        // Setting up the I/O
        setSocketWrapper(socketWrapper);
        getInputBuffer().init(socketWrapper, endpoint); // bind the socket's InputStream to the InternalInputBuffer
        getOutputBuffer().init(socketWrapper, endpoint); // bind the socket's OutputStream to the InternalOutputBuffer
        keepAlive = true;
        comet = false;
        openSocket = false;
        sendfileInProgress = false;
        readComplete = true;
        // NioEndpoint returns true, Bio returns false
        if (endpoint.getUsePolling()) {
            keptAlive = false;
        } else {
            keptAlive = socketWrapper.isKeptAlive();
        }
        // if active threads exceed 75% of the pool's maximum, disable keep-alive: long connections are no longer accepted
        if (disableKeepAlive()) {
            socketWrapper.setKeepAliveLeft(0);
        }
        // keepAlive defaults to true; its value is read from the request
        while (!getErrorState().isError() && keepAlive && !comet && !isAsync() && upgradeInbound == null && httpUpgradeHandler == null && !endpoint.isPaused()) {
            // while keepAlive is true, keep pulling HTTP requests off the socket
            try {
                // first read from the socket, setting the socket read timeout:
                // with BIO, an accepted connection is not necessarily processed right away (it waits for
                // thread-pool scheduling), so that wait counts toward the socket read time;
                // with NIO there is no such blocking
                setRequestLineReadTimeout();
                // parse the request line
                if (!getInputBuffer().parseRequestLine(keptAlive)) {
                    // the next call matters for NIO: if no data could be read from the OS while parsing
                    // the request line, the call above returns false and the call below returns true,
                    // exiting the while loop; handling of this read event is then finished, and the loop
                    // is re-entered when the next read event fires
                    if (handleIncompleteRequestLineRead()) {
                        break;
                    }
                }
                if (endpoint.isPaused()) {
                    // 503 - Service unavailable
                    // if the Endpoint has been paused, answer 503
                    response.setStatus(503);
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                } else {
                    keptAlive = true;
                    // Set this every time in case limit has been changed via JMX
                    // refresh the header and cookie limits for every request
                    request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
                    request.getCookies().setLimit(getMaxCookieCount());
                    // Currently only NIO will ever return false here
                    // parse the request headers
                    if (!getInputBuffer().parseHeaders()) {
                        // We've read part of the request, don't recycle it
                        // instead associate it with the socket
                        openSocket = true;
                        readComplete = false;
                        break;
                    }
                    if (!disableUploadTimeout) {
                        setSocketTimeout(connectionUploadTimeout);
                    }
                }
            } catch (IOException e) {
                // abridged: treat as a client disconnect and leave the keep-alive loop
                setErrorState(ErrorState.CLOSE_NOW, e);
                break;
            }
            if (!getErrorState().isError()) {
                // Setting up filters, and parse some request headers
                rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE); // mark the request as being prepared
                try {
                    prepareRequest(); // preparation: extract keepAlive from the request, validate, and pick the active InputFilter
                } catch (Throwable t) {
                    // abridged: 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                }
            }
            if (maxKeepAliveRequests == 1) {
                // if at most one keep-alive request is allowed, set keepAlive to false so no further HTTP requests are read from this socket
                keepAlive = false;
            } else if (maxKeepAliveRequests > 0 && socketWrapper.decrementKeepAlive() <= 0) {
                // likewise if the keep-alive limit has been reached
                keepAlive = false;
            }
            // Process the request in the adapter
            if (!getErrorState().isError()) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); // mark the request as in service
                    adapter.service(request, response); // hand the request to the container
                    if (keepAlive && !getErrorState().isError() && (response.getErrorException() != null || (!isAsync() && statusDropsConnection(response.getStatus())))) {
                        setErrorState(ErrorState.CLOSE_CLEAN, null);
                    }
                    setCometTimeouts(socketWrapper);
                } catch (Throwable t) {
                    // abridged: 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_NOW, t);
                }
            }
            // Finish the handling of the request
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT); // input for this request is finished
            if (!isAsync() && !comet) {
                // the current HTTP request is done; tidy up
                endRequest();
            }
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT); // output is finished
            if (getErrorState().isError()) {
                response.setStatus(500);
            }
            request.updateCounters();

            if (!isAsync() && !comet || getErrorState().isError()) {
                if (getErrorState().isIoAllowed()) {
                    // get ready for the next request
                    getInputBuffer().nextRequest();
                    getOutputBuffer().nextRequest();
                }
            }
            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
            // if, after finishing this HTTP request, the socket holds no further request, leave the loop;
            // with keep-alive the socket stays open, with close it is closed.
            // In the keep-alive case one thread handles one socket, so when the loop exits the current
            // thread ends, but the socket must still accept requests, so a new thread later picks it up again
            if (breakKeepAliveLoop(socketWrapper)) {
                break;
            }
        }
        // abridged: the real method goes on to determine and return the final SocketState
        return SocketState.CLOSED;
    }
}
}

When a Servlet reads the request body via inputStream.read(), what ultimately runs is InternalNioInputBuffer.SocketInputBuffer.doRead(), which calls fill(true, true). The first argument is timeout and the second is block; block == true means the read is blocking. fill reads data from the operating system into Tomcat's buf.

protected boolean fill(boolean timeout, boolean block) throws IOException, EOFException {
    // reading the request body requires a blocking read
    boolean read = false;
    if (parsingHeader) {
        if (lastValid > headerBufferSize) {
            throw new IllegalArgumentException(sm.getString("iib.requestheadertoolarge.error"));
        }
        // Do a simple read with a short timeout
        read = readSocket(timeout, block) > 0;
    } else {
        lastValid = pos = end;
        // Do a simple read with a short timeout
        read = readSocket(timeout, block) > 0;
    }
    return read;
}

private int readSocket(boolean timeout, boolean block) throws IOException {
    // reading the request body requires a blocking read
    int nRead = 0;
    socket.getBufHandler().getReadBuffer().clear();
    if (block) {
        Selector selector = null;
        try {
            selector = pool.get();
        } catch (IOException x) {
            // abridged: ignore and fall back to selector == null
        }
        try {
            NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment) socket.getAttachment();
            if (att == null) {
                throw new IOException("Key must be cancelled.");
            }
            // blocking read from the socket via the selector pool
            nRead = pool.read(socket.getBufHandler().getReadBuffer(), socket, selector, att.getTimeout());
        } catch (EOFException eof) {
            nRead = -1;
        } finally {
            if (selector != null) pool.put(selector);
        }
    } else {
        // non-blocking read: if nothing is available it returns 0 immediately; if this NIO event
        // yields no data, handling of the event is over and we wait for the next one
        nRead = socket.read(socket.getBufHandler().getReadBuffer());
    }
    if (nRead > 0) {
        socket.getBufHandler().getReadBuffer().flip();
        socket.getBufHandler().getReadBuffer().limit(nRead);
        expand(nRead + pos);
        // transfer the data from the readBuffer into buf
        socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);
        lastValid = pos + nRead;
        return nRead;
    } else if (nRead == -1) {
        // return false;
        throw new EOFException(sm.getString("iib.eof.error"));
    } else {
        return 0;
    }
}

The blocking-read flow that follows still relies on a Selector. Why use a Selector when the goal is to block? Because the socketChannel was made non-blocking at the start, and NIO imposes a restriction: once a channel has been set non-blocking and registered with a selector, trying to switch it back to blocking mode throws an exception. So Tomcat achieves the blocking effect a different way.
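The restriction is easy to demonstrate in isolation; a small standalone sketch (the address and port are arbitrary):

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class BlockingModeDemo {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);                    // non-blocking, as Tomcat configures it
        channel.connect(new InetSocketAddress("localhost", 8080));
        channel.register(selector, SelectionKey.OP_READ);    // register with a selector
        // Switching back to blocking mode is now illegal:
        channel.configureBlocking(true);                     // throws IllegalBlockingModeException
    }
}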

When the request body needs to be read, the main Selector cannot be used directly, since its job is registering newly accepted socketChannels. A helper Selector is needed instead: when reading the request body, a helper Selector is obtained to listen for read events on the current request. When data becomes ready, the helper Selector picks up the ready event; the main Selector never sees it, because it already cancelled its interest in this socketChannel beforehand.

That is the helper Selector's main job. On inputStream.read(), the read interest is registered with the helper Selector and a lock is taken (this is what produces the blocking). Paired with the helper Selector is a helper Poller, which polls the helper Selector for ready events; as soon as one shows up it releases the lock, the read unblocks, the data is read from the socketChannel and returned, and this read() call is complete.

public class NioSelectorPool {
    protected static final boolean SHARED = Boolean.parseBoolean(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true"));
    protected NioBlockingSelector blockingSelector;
    protected volatile Selector SHARED_SELECTOR;
    protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<Selector>();

    protected Selector getSharedSelector() throws IOException {
        if (SHARED && SHARED_SELECTOR == null) {
            synchronized (NioSelectorPool.class) {
                if (SHARED_SELECTOR == null) {
                    synchronized (Selector.class) {
                        SHARED_SELECTOR = Selector.open();
                    }
                }
            }
        }
        return SHARED_SELECTOR;
    }

    public Selector get() throws IOException {
        if (SHARED) {
            return getSharedSelector();
        }
        if ((!enabled) || active.incrementAndGet() >= maxSelectors) {
            if (enabled) active.decrementAndGet();
            return null;
        }
        Selector s = null;
        try {
            s = selectors.size() > 0 ? selectors.poll() : null;
            if (s == null) {
                synchronized (Selector.class) {
                    s = Selector.open();
                }
            } else spare.decrementAndGet();
        } catch (NoSuchElementException x) {
            try {
                synchronized (Selector.class) {
                    s = Selector.open();
                }
            } catch (IOException iox) {
                // abridged: fall through with s == null
            }
        } finally {
            if (s == null) active.decrementAndGet(); // we were unable to find a selector
        }
        return s;
    }

    public void open() throws IOException {
        enabled = true;
        getSharedSelector();
        if (SHARED) {
            blockingSelector = new NioBlockingSelector();
            blockingSelector.open(getSharedSelector());
        }
    }

    public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout) throws IOException {
        return write(buf, socket, selector, writeTimeout, true);
    }

    public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout, boolean block) throws IOException {
        if (SHARED && block) {
            return blockingSelector.write(buf, socket, writeTimeout);
        }
        SelectionKey key = null;
        int written = 0;
        boolean timedout = false;
        int keycount = 1; // assume we can write
        long time = System.currentTimeMillis(); // start the timeout timer
        try {
            // loop while not timed out and buf still holds data
            while ((!timedout) && buf.hasRemaining()) {
                int cnt = 0;
                if (keycount > 0) { // only write if we were registered for a write
                    // the return value is how many bytes were written; 0 means nothing was written
                    cnt = socket.write(buf); // write the data
                    if (cnt == -1) throw new EOFException();
                    written += cnt;
                    if (cnt > 0) {
                        // some data reached the socket, so loop again and check whether buf has more left
                        time = System.currentTimeMillis(); // reset our timeout timer
                        continue; // we successfully wrote, try again without a selector
                    }
                    // if nothing was written but the mode is blocking, we do not break
                    if (cnt == 0 && (!block)) break; // don't block
                }
                if (selector != null) {
                    // register OP_WRITE to the selector
                    if (key == null) key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
                    else key.interestOps(SelectionKey.OP_WRITE);
                    // check whether a write event has become ready
                    keycount = selector.select(writeTimeout);
                }
                // check for timeout
                if (writeTimeout > 0 && (selector == null || keycount == 0)) timedout = (System.currentTimeMillis() - time) >= writeTimeout;
            } // while
            if (timedout) throw new SocketTimeoutException();
        } finally {
            if (key != null) {
                key.cancel();
                if (selector != null) selector.selectNow(); // removes the key from this selector
            }
        }
        return written;
    }

    public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout) throws IOException {
        return read(buf, socket, selector, readTimeout, true);
    }

    public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout, boolean block) throws IOException {
        if (SHARED && block) {
            return blockingSelector.read(buf, socket, readTimeout);
        }
        SelectionKey key = null;
        int read = 0;
        boolean timedout = false;
        int keycount = 1; // assume we can read
        long time = System.currentTimeMillis(); // start the timeout timer
        try {
            while ((!timedout)) {
                int cnt = 0;
                if (keycount > 0) { // only read if we were registered for a read
                    cnt = socket.read(buf);
                    if (cnt == -1) throw new EOFException();
                    read += cnt;
                    if (cnt > 0) continue; // read some more
                    if (cnt == 0 && (read > 0 || (!block))) break; // we are done reading
                }
                if (selector != null) { // perform a blocking read
                    // register OP_READ with the selector
                    if (key == null) key = socket.getIOChannel().register(selector, SelectionKey.OP_READ);
                    else key.interestOps(SelectionKey.OP_READ);
                    keycount = selector.select(readTimeout);
                }
                if (readTimeout > 0 && (selector == null || keycount == 0)) timedout = (System.currentTimeMillis() - time) >= readTimeout;
            } // while
            if (timedout) throw new SocketTimeoutException();
        } finally {
            if (key != null) {
                key.cancel();
                if (selector != null) selector.selectNow(); // removes the key from this selector
            }
        }
        return read;
    }
}

By default, the helper role is played by a NioBlockingSelector object; every read goes through the same NioBlockingSelector instance, and inside it lives the helper Poller, a BlockPoller thread.

public class NioBlockingSelector {
    protected BlockPoller poller;
    protected Selector sharedSelector;

    public void open(Selector selector) {
        sharedSelector = selector;
        poller = new BlockPoller();
        poller.selector = sharedSelector;
        poller.setDaemon(true);
        poller.setName("NioBlockingSelector.BlockPoller-" + (++threadCounter));
        poller.start();
    }

    public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
        if (key == null) throw new IOException("Key no longer registered");
        KeyReference reference = keyReferenceQueue.poll();
        if (reference == null) {
            reference = new KeyReference();
        }
        KeyAttachment att = (KeyAttachment) key.attachment();
        int read = 0;
        boolean timedout = false;
        int keycount = 1; // assume we can read
        long time = System.currentTimeMillis(); // start the timeout timer
        try {
            while (!timedout) {
                if (keycount > 0) { // only read if we were registered for a read
                    // try the read first; it returns 0 if no data is available
                    read = socket.read(buf);
                    if (read == -1) throw new EOFException();
                    if (read > 0) break;
                }
                try {
                    // arm the latch
                    if (att.getReadLatch() == null || att.getReadLatch().getCount() == 0) att.startReadLatch(1);
                    // register the read interest via the event queue and the BlockPoller thread
                    poller.add(att, SelectionKey.OP_READ, reference);
                    if (readTimeout < 0) {
                        att.awaitReadLatch(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                    } else {
                        // after registering, block here until the BlockPoller sees a read event and releases the latch
                        att.awaitReadLatch(readTimeout, TimeUnit.MILLISECONDS);
                    }
                } catch (InterruptedException ignore) {
                    Thread.interrupted();
                }
                // we were unblocked by something other than the poller
                if (att.getReadLatch() != null && att.getReadLatch().getCount() > 0) {
                    // we got interrupted, but we haven't received notification from the poller.
                    keycount = 0;
                } else {
                    // unblocked by the poller: a read event is ready, so data can now be read
                    // latch countdown has happened
                    keycount = 1;
                    att.resetReadLatch();
                }
                if (readTimeout >= 0 && (keycount == 0))
                    timedout = (System.currentTimeMillis() - time) >= readTimeout;
            } // while
            if (timedout)
                throw new SocketTimeoutException();
        } finally {
            poller.remove(att, SelectionKey.OP_READ);
            if (timedout && reference.key != null) {
                poller.cancelKey(reference.key);
            }
            reference.key = null;
            keyReferenceQueue.add(reference);
        }
        return read;
    }

    protected static class BlockPoller extends Thread {
        protected volatile boolean run = true;
        protected Selector selector = null;
        protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();

        public void add(final KeyAttachment key, final int ops, final KeyReference ref) {
            Runnable r = new Runnable() {
                @Override
                public void run() {
                    if (key == null) return;
                    NioChannel nch = key.getChannel();
                    if (nch == null) return;
                    SocketChannel ch = nch.getIOChannel();
                    if (ch == null) return;
                    SelectionKey sk = ch.keyFor(selector);
                    try {
                        // register the interest with the BlockPoller's selector (the shared selector)
                        if (sk == null) {
                            sk = ch.register(selector, ops, key);
                            ref.key = sk;
                        } else if (!sk.isValid()) {
                            cancel(sk, key, ops);
                        } else {
                            sk.interestOps(sk.interestOps() | ops);
                        }
                    } catch (CancelledKeyException cx) {
                        cancel(sk, key, ops);
                    } catch (ClosedChannelException cx) {
                        cancel(sk, key, ops);
                    }
                }
            };
            // enqueue the event
            events.offer(r);
            // after enqueueing, wake up selector.select() immediately
            wakeup();
        }

        public boolean events() {
            Runnable r = null;
            int size = events.size();
            for (int i = 0; i < size && (r = events.poll()) != null; i++) {
                r.run();
            }
            return (size > 0);
        }

        @Override
        public void run() {
            while (run) {
                try {
                    events(); // run the queued Runnables (this poller's equivalent of PollerEvents)
                    int keyCount = 0;
                    try {
                        // wakeupCounter only ever holds one of three values: 0, -1, 1
                        int i = wakeupCounter.get();
                        // i == 1 means events were added and events() above already ran them,
                        // so selectNow should find the ready events directly
                        if (i > 0)
                            keyCount = selector.selectNow();
                        else {
                            // here i can only be 0; set it to -1 to mean "no events added", then block.
                            // while blocked, an event may well be added to the queue, in which case
                            // selector.wakeup() is called and the select returns early
                            wakeupCounter.set(-1);
                            keyCount = selector.select(1000);
                        }
                        // whether or not ready events were found, reset to 0
                        wakeupCounter.set(0);
                        if (!run) break;
                    } catch (NullPointerException x) {
                        continue;
                    } catch (CancelledKeyException x) {
                        continue;
                    } catch (Throwable x) {
                        ExceptionUtils.handleThrowable(x);
                        log.error("", x);
                        continue;
                    }

                    Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
                    while (run && iterator != null && iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        KeyAttachment attachment = (KeyAttachment) sk.attachment();
                        try {
                            attachment.access();
                            iterator.remove();
                            sk.interestOps(sk.interestOps() & (~sk.readyOps()));
                            if (sk.isReadable()) {
                                countDown(attachment.getReadLatch());
                            }
                            if (sk.isWritable()) {
                                countDown(attachment.getWriteLatch());
                            }
                        } catch (CancelledKeyException ckx) {
                            sk.cancel();
                            countDown(attachment.getReadLatch());
                            countDown(attachment.getWriteLatch());
                        }
                    } // while
                } catch (Throwable t) {
                    log.error("", t);
                }
            }
            events.clear();
            if (selector.isOpen()) {
                try {
                    selector.selectNow();
                } catch (Exception ignore) {
                    if (log.isDebugEnabled()) log.debug("", ignore);
                }
            }
        }
    }
}

Responses follow the same idea: register write interest first, block, and once a write-ready event arrives, unblock and write the data.
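As a standalone illustration of that write path, here is a simplified sketch, not Tomcat's code: try the non-blocking write first, and if the kernel's send buffer is full, register OP_WRITE and select until the channel becomes writable.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public final class BlockingWrite {
    // Write all of buf to a non-blocking channel, simulating a blocking write.
    static void writeFully(SocketChannel channel, ByteBuffer buf, Selector selector) throws IOException {
        SelectionKey key = null;
        try {
            while (buf.hasRemaining()) {
                if (channel.write(buf) > 0) continue;   // progress made, try again immediately
                // the kernel send buffer is full: wait until the channel is writable
                if (key == null) key = channel.register(selector, SelectionKey.OP_WRITE);
                selector.select();                      // blocks until write-ready (or a wakeup)
                selector.selectedKeys().clear();
            }
        } finally {
            if (key != null) {
                key.cancel();
                selector.selectNow();                   // flush the cancelled key from the selector
            }
        }
    }
}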