netty-接受请求过程源码分析

前言

netty学习系列笔记总结,接受请求过程源码浅析,错误之处欢迎指正, 共同学习

背景

netty启动过程源码分析,我们得知,服务器最终注册了一个 Accept 事件等待客户端的连接。我们也知道,NioServerSocketChannel 将自己注册到了 boss 单例线程池(reactor 线程)上,也就是 EventLoop.

EventLoop所做的事情均分为以下三个步骤

1.轮询注册在selector上的IO事件

2.处理IO事件

3.执行异步task

新链接的建立

  • 检测到有新的连接
  • 将新的连接注册到worker线程组
  • 注册新连接的读事件
1.检测新连接
  • 进入到 NioEventLoop 源码中,找到 processSelectedKey()方法设置断点
  • debug 启动 EchoServer 的 main 方法
  • 建立一个新的连接:telnet 127.0.0.1 8888

上面代码表示boos reactor线程已经轮询到 SelectionKey.OP_ACCEPT 事件,说明有新的连接进入,此时将调用channel的 unsafe来进行实际的操作。接下来,进入到它的read方法,进入新连接处理的第二步

2.注册到reactor线程

1
2
3
4
1.检查该 eventloop 线程是否是当前线程。
2.执行 doReadMessages 方法,并传入一个 readBuf 变量,这个变量是一个 List,也就是容器。
3.循环容器,执行 pipeline.fireChannelRead(readBuf.get(i));
4.清理容器,触发 pipeline.fireChannelReadComplete()

进入 doReadMessages 方法

netty调用jdk底层 javaChannel().accept();由于netty中reactor线程第一步就扫描到有accept事件发生,因此,这里的accept方法是立即返回的,返回jdk底层nio创建的一条channel

netty将jdk的 SocketChannel 封装成自定义的 NioSocketChannel,添加到容器中,做后续处理

默认一次最多读取16条连接

创建NioSocketChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();

try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);

try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}

return 0;
}

@Override
protected ServerSocketChannel javaChannel() {
return (ServerSocketChannel) super.javaChannel();
}

该方法调用了 NioServerSocketChannel 中的 serverSocketChannel.accept() 方法。返回了一个 Nio 的通道,注意:这个通道,就是我们刚刚 Boss 线程监听到的 Accept 事件,相当于一个 Tcp 连接。

然后我们看 NioSocketChannel 的创建过程,其中参数 this 是 NioServerSocketChannel ,这个就是 SocketChannel 的 parent 属性,ch 是 SocketChannel 。构造方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}

// NioSocketChannel的父类为 AbstractNioByteChannel
// 注册了SelectionKey.OP_READ,表示对channel的读感兴趣
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}

//AbstractNioByteChannel的父类 AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
// readInterestOp 表示该channel关心的事件是 SelectionKey.OP_READ,后续会将该事件注册到selector,之后设置该通道为非阻塞模式
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}

throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}

// super(parent)
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();// NioSocketChannel$NioSocketChannelUnsafe
pipeline = newChannelPipeline();
}

Netty中的Channel的分类

1.Unsafe[实现Channel读写抽象],服务端NioMessageUnsafe读连接,客户端NioByteUnsafe读数据

2.AbstractChannel用于实现channel的大部分方法,其中我们最熟悉的就是其构造函数中,创建出一条channel的基本组件

3.AbstractNioChannel基于AbstractChannel做了nio相关的一些操作,保存jdk底层的 SelectableChannel,并且在构造函数中设置channel为非阻塞

4.最后,就是两大channel,NioServerSocketChannel,NioSocketChannel对应着服务端接受新连接过程和新连接读写过程,分别创建NioServerSocketChannelConfig,NioSocketChannelConfig

在创建出一条 NioSocketChannel之后,放置在List容器里面之后,就开始进行下一步操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public void read() {
// ......
try {
do {
int localRead = doReadMessages(readBuf);
// ......
} while (allocHandle.continueReading());

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
// ......
} finally {
// ......
}
}

循环执行 pipeline.fireChannelRead 方法

DefaultChannelPipeline.fireChannelRead(NioSocketChannel)

1
2
3
4
5
6
// 这里的Pipeline是NioServerSocketChannel的Pipeline, 此时Pipeline中有一个ServerBootstrapAcceptor, 所以会传递到ServerBootstrapAcceptor的channelRead方法
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}

ServerBootstrapAcceptor.channelRead(ctx, msg)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
void init(Channel channel) throws Exception {
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}

// 初始化ServerBootstrapAcceptor时, 该Acceptor有四个属性是由ServerBootstrap传过来的.
this.childGroup = currentChildGroup;
this.childHandler = currentChildHandler;
this.childOptions = currentChildOptions;
this.childAttrs = currentChildAttrs;

// 这里的childGroup就是subReactor, childHandler就是
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.childHandler(new ChannelInitializer<SocketChannel>() {...})

这个ChannelInitializer是用来初始化NioSocketChannel所对应的Pipeline的.

// 来看ServerBootstrapAcceptor的read方法.
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler);

// ......
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}


拿到该channel,也就是我们之前new出来的 NioSocketChannel对应的pipeline,将用户代码中的
childHandler,添加到pipeline,然后进入到 childGroup.register(child),这里的childGroup就是我们在启动代码中new出来的NioEventLoopGroup。之后, 触发channelRegister事件, 执行ChannelInitializer的initChannel方法, 进一步初始化NioSocketChannel所对应的pipeline.

至此, 接收客户端并注册到NioEventLoop的过程完毕。

接下来重点看下childGroup.register方法,为什么是subReactor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// next方法使用位运算获取数组中的EventLoop
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}

@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}

promise.channel() 方法返回的是 NioSocketChannel
promise.channel().unsafe() NioSocketChannel$NioSocketChannelUnsafe
所以最终调用的是 NioSocketChannel 的内部类的 register 方法。
参数是当前的 EventLoop 和 promise
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// ......

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// ......
}
}
}

// register0
private void register0(ChannelPromise promise) {
try {
// ......
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;

// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
// ......
}

// doRegister(),真正的注册过程
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
将该条channel绑定到一个selector上去,一个selector被一个reactor线程使用,
后续该channel的事件轮询,以及事件处理,异步task执行都是由此reactor线程来负责

绑定完reactor线程之后,调用 pipeline.invokeHandlerAddedIfNeeded()

pipeline.invokeHandlerAddedIfNeeded();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 该方法最终会调用到用户的initChannel方法
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}

// 也就是用户代码
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoServerHandler());
}
});
3.注册读事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void register0(ChannelPromise promise) {
try {
// ......
// 再调用一下业务pipeline中每个处理器的 ChannelHandlerAdded方法处理下回调
pipeline.fireChannelRegistered();
if (isActive()) { // 在连接已经建立的情况下返回true
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// ......
}
}

protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

readPending = true;
// 将 SelectionKey.OP_READ事件注册到selector中去,表示这条通道已经可以开始处理read事件了
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}

总结

1.boos reactor线程轮询到有新的连接进入
2.通过封装jdk底层的channel创建 NioSocketChannel以及一系列的netty核心组件
3.将该条连接通过chooser,选择一条worker reactor线程绑定上去
4.注册读事件,开始新连接的读写

问:Netty是在哪里检测有新连接接入的?
答:Boss线程通过服务端Channel绑定的Selector轮询OP_ACCEPT事件,通过JDK底层Channel的accept()方法获取JDK底层SocketChannel创建新连接

问:新连接是怎样注册到NioEventLoop线程的?
答:Worker线程调用Chooser的next()方法选择获取NioEventLoop绑定到客户端Channel,使用doRegister()方法将新连接注册到NioEventLoop的Selector上面

Netty新连接接入处理逻辑:
服务端Channel绑定的NioEventLoop即Boss线程轮询OP_ACCEPT事件,调用服务端Channel的accept()方法获取客户端Channel封装成NioSocketChannel,
封装创建组件Unsafe用来实现Channel读写和Pipeline负责数据处理业务逻辑链,服务端Channel通过连接接入器ServerBootstrapAcceptor给客户端Channel分配NioEventLoop,
将客户端Channel绑定到Selector上面,通过传播Channel Active方法将客户端Channel读事件注册到Selector

------本文结束感谢阅读------
显示评论
0%