Netty基础

NIO类库和API繁杂,使用麻烦需要熟练掌握SelectorServerSocketChannelSocketChannelByteBuffer等,且客户端断线重连网络闪断心跳处理半包读写网络拥塞异常流的处理等问题开发工作量和难度都非常大。

Netty对JDK自带的NIO的API进行了良好的封装,解决了上述问题。且Netty拥有高性能吞吐量更高延迟更低减少资源消耗最小化不必要的内存复制等优点。Netty现在都在用的是4.x,5.x版本已废弃,Netty4.x需要JDK6以上版本支持。

在分布式系统中,各个节点之间需要远程服务调用,高性能的RPC框架必不可少,Netty作为异步高性能通信框架,往往作为基础通信组件被这些RPC框架使用,RocketMQDubboZookeeper等底层就是用Netty作为基础通信组件Hadoop高性能通信和序列化组件Avro的RPC框架,默认采用Netty进行跨界点通信,其Netty Service基于Netty框架二次封装实现。Netty作为高性能基础通信组件本身提供了TCP/UDPHTTP协议栈

Netty基本概念

一个Netty应用通常由一个Bootstrap开始,主要作用是配置整个Netty程序串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类ServerBootstrap是服务端启动引导类。Netty中所有IO操作都是异步的不能立刻得知消息是否被正确处理,可过一会等它执行完成或直接注册一个监听,具体的实现就是通过FutureChannelFutures,可注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件

Channel是Netty网络通信组件,可用于执行网络I/O操作,Channel为用户提供当前网络连接通道状态,如是否打开、是否已连接,网络连接配置参数接收缓冲区大小,提供异步的网络I/O操作建立连接读写绑定端口等,异步调用意味着任何I/O调用都将立即返回,且不保证在调用结束时所请求的I/O操作已完成,调用立即返回一个ChannelFuture实例,通过注册监听器到ChannelFuture上可I/O操作成功、失败或取消时回调通知调用方支持关联I/O操作与对应的处理程序。且不同协议不同的阻塞类型的连接都有不同的Channel类型与之对应,涵盖了UDP和TCP网络IO以及文件IO:

  • NioSocketChannel,异步的客户端TCP Socket连接
  • NioServerSocketChannel,异步的服务器端TCP Socket连接
  • NioDatagramChannel,异步的UDP连接
  • NioSctpChannel,异步的客户端Sctp连接
  • NioSctpServerChannel,异步的Sctp服务器端连接

Netty基于Selector对象实现I/O多路复用,通过Selector一个线程可以监听多个连接的Channel事件,当向一个Selector中注册Channel后,Selector内部机制就可自动不断地查询这些注册的Channel是否有已就绪的I/O事件,如可读可写网络连接完成等,这样程序就可以很简单地使用一个线程高效地管理多个Channel。

NioEventLoop中维护了一个线程任务队列支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务,I/O任务即SelectionKeyready的事件如acceptconnectreadwrite等,由processSelectedKeys方法触发。非IO任务添加到taskQueue中的任务,如register0bind0 等任务,由runAllTasks方法触发。

NioEventLoopGroup主要管理NioEventLoop的生命周期,可理解为一个线程池,内部维护了一组线程,每个NioEventLoop线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程

ChannelHandler是一个接口,处理I/O事件拦截I/O操作并将其转发到其ChannelPipeline业务处理链中的下一个处理程序,ChannelHandler本身并没有提供很多方法,因为该接口有许多的方法需要实现,方便使用期间可继承用于处理入站I/O事件的子类ChannelInboundHandler和用于处理出站I/O事件的子类ChannelOutboundHandler。或使用ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter适配器。

ChannelHandlerContext保存Channel相关的所有上下文信息,同时关联一个ChannelHandler对象。

ChannelPipline是保存ChannelHandler的List,用于处理或拦截Channel入站事件出站操作ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可完全控制事件处理方式,以及Channel中各个ChannelHandler如何相互交互,在Netty中每个Channel都有且仅有一个ChannelPipeline与之对应,而ChannelPipeline中又维护了一个ChannelHandlerContext组成的双向链表,且每个ChannelHandlerContext中又关联着一个ChannelHandler

read事件入站事件write事件出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的handler,出站事件会从链表tail往前传递到最前一个出站的handler,两种类型的handler互不干扰

示例

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.35.Final</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(3);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
ServerBootstrap bootstrap = new ServerBootstrap(); // 创建服务器端的启动对象
// 使用链式编程来配置参数
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) // 使用NioServerSocketChannel作为服务器的通道实现
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在SocketChannel建立起来之前执行
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler()); // 对workerGroup的SocketChannel设置处理器
}
});
System.out.println("netty server start。。");
// 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
// 启动服务器并绑定端口,bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture cf = bootstrap.bind(9000).sync();
// 给cf注册监听器,监听关心的事件
/*cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口9000成功");
} else {
System.out.println("监听端口9000失败");
}
}
});*/
// 等待服务端监听端口关闭,closeFuture是异步操作
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
/**
* 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范)
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端连接服务器完成就会触发该方法
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("客户端连接通道建立完成");
}
/**
* 读取客户端发送的数据
* @param ctx 上下文对象, 含有通道channel,管道pipeline
* @param msg 就是客户端发送的数据
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//Channel channel = ctx.channel();
//ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
// 将msg转成一个ByteBuf,类似NIO的ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到客户端的消息:" + buf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) { // 数据读取完毕处理方法
ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close(); // 处理异常, 一般是需要关闭通道
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
EventLoopGroup group = new NioEventLoopGroup(); //客户端需要一个事件循环组
try {
//创建客户端启动对象,注意客户端使用的不是ServerBootstrap而是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler()); //加入处理器
}
});
System.out.println("netty client start。。");
//启动客户端去连接服务器端
ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
cf.channel().closeFuture().sync(); //对通道关闭进行监听
} finally {
group.shutdownGracefully();
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
// 当客户端连接服务器完成就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
// 当通道有读取事件时会触发,即服务端发送数据给客户端
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

Handler生命周期回调接口调用顺序依次为:

  • handlerAdded:新建立的连接会按照初始化策略,把handler添加到该channel的pipeline里面,即channel.pipeline.addLast(new LifeCycleInBoundHandler)执行完成后的回调
  • channelRegistered:当该连接分配到具体的Worker线程后,执行该回调
  • channelActive:channel准备工作完成,所有pipeline添加完成,并分配到具体的线上,说明该channel准备就绪可以使用了
  • channelRead:客户端向服务端发来数据,每次都会回调此方法,表示有数据可读
  • channelReadComplete:服务端每次读完一次完整的数据后,回调该方法表示数据读取完毕
  • channelInactive:当连接断开时该回调会被调用,底层TCP连接已被断开
  • channelUnRegistered:对应channelRegistered,当连接关闭后,释放绑定的Workder线程
  • channelUnRegistered:对应handlerAdded,将handler从该channel的pipeline移除后的回调方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class LifeCycleInBoundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered: channel注册到NioEventLoop");
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered: channel取消和NioEventLoop的绑定");
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive: channel准备就绪");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive: channel被关闭");
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channelRead: channel中有可读的数据" );
super.channelRead(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete: channel读数据完成");
super.channelReadComplete(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded: handler被添加到channel的pipeline");
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved: handler从channel的pipeline中移除");
super.handlerRemoved(ctx);
}
}

Netty线程模型

Netty抽象出BossGroupWorkerGroup两组线程池,BossGroup专门负责接收客户端的连接WorkerGroup专门负责网络的读写BossGroupWorkerGroup类型都是NioEventLoopGroupNioEventLoopGroup相当于一个事件循环线程组,该组中含有多个事件循环线程,每个事件循环线程是NioEventLoop

每个NioEventLoop都有一个Selector , 用于监听注册在其上的SocketChannel的网络通讯,每个Boss NioEventLoop线程内部循环执行如下步骤

  • 处理Accept事件与Client建立连接,生成NioSocketChannel
  • NioSocketChannel注册到某个Worker NIOEventLoop上的Selector
  • 处理任务队列的任务, 即runAllTasks

每个Worker NIOEventLoop处理NioSocketChannel业务时会使用pipeline管道,管道中维护了很多Handler处理器用来处理Channel中的数据,线程循环执行如下步骤

  • 轮询注册到当前Selector上的所有NioSocketChannelreadwrite事件
  • 处理I/O事件,即read , write事件,在对应NioSocketChannel处理业务
  • runAllTasks处理任务队列TaskQueue的任务,一些耗时业务处理一般可放入TaskQueue中慢慢处理,这样不影响数据在pipeline中的流动处理

ByteBuf

ByteBuf由一串字节数组构成,ByteBuf提供了两个索引,分别用于读取数据写入数据。这两个索引通过在字节数组中移动来定位需要读或者写信息的位置。当ByteBufreaderIndex读索引将会根据读取的字节数递增。当ByteBufwriterIndex会根据写入字节数进行递增

通过readerindex和writerIndex和capacity,将buffer分成三个区域,已经读取区域[0, readerindex)可读取区域[readerindex, writerIndex)可写区域[writerIndex, capacity)

若极限情况下readerIndex刚好读到writerIndex写入的地方,若readerIndex超过writerIndexNetty会抛出 IndexOutOfBoundsException异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
ByteBuf byteBuf = Unpooled.buffer(1);
System.out.println("byteBuf=" + byteBuf);
for (int i = 0; i < 8; i++) {
byteBuf.writeByte(i);
}
System.out.println("byteBuf=" + byteBuf);
for (int i = 0; i < 5; i++) {
System.out.println(byteBuf.getByte(i)); // readerIndex保持不变
}
System.out.println("byteBuf=" + byteBuf);
for (int i = 0; i < 5; i++) {
System.out.println(byteBuf.readByte()); // 移动readerIndex
}
System.out.println("byteBuf=" + byteBuf);

//用Unpooled工具类创建ByteBuf
ByteBuf byteBuf2 = Unpooled.copiedBuffer("hello,eleven!", CharsetUtil.UTF_8);
if (byteBuf2.hasArray()) { //使用相关的方法
byte[] content = byteBuf2.array();
System.out.println(new String(content, CharsetUtil.UTF_8)); //将content转成字符串
System.out.println("byteBuf2=" + byteBuf2);
System.out.println(byteBuf2.getByte(0)); // 获取数组0这个位置的字符h的ascii码,h=104
int len = byteBuf2.readableBytes(); // 可读的字节数12
System.out.println("len=" + len);
for (int i = 0; i < len; i++) { //使用for取出各个字节
System.out.println((char) byteBuf2.getByte(i));
}
//范围读取
System.out.println(byteBuf2.getCharSequence(0, 6, CharsetUtil.UTF_8));
System.out.println(byteBuf2.getCharSequence(6, 6, CharsetUtil.UTF_8));
}

编码解码器

通过Netty发送接收消息时,会发生一次数据转换,入站消息被解码即从字节转换为另一种格式如Java对象,出站消息会被编码成字节

Netty提供了一系列实用的编码解码器,都实现了ChannelInboundHadnlerChannelOutboundHandler接口。编码解码器中channelRead方法已被重写,当调用解码器时将调用解码器所提供的decode()方法进行解码,并将已解码的字节转发给ChannelPipeline的下一个ChannelInboundHandler

Netty提供了很多编解码器,如字符串编解码StringEncoderStringDecoder对象编解码ObjectEncoderObjectDecoder等。若要实现高效的编解码可用protobuf,但protobuf需要维护大量的proto文件比较麻烦。

protostuff是一个基于protobuf实现的序列化方法,它较于protobuf最明显的好处是,在几乎不损耗性能的情况下做到了不用写.proto文件来实现序列化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-api</artifactId>
<version>1.0.10</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.0.10</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.0.10</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class ProtostuffUtil {
private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();
private static <T> Schema<T> getSchema(Class<T> clazz) {
@SuppressWarnings("unchecked")
Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
if (schema == null) {
schema = RuntimeSchema.getSchema(clazz);
if (schema != null) {
cachedSchema.put(clazz, schema);
}
}
return schema;
}
/**
* 序列化
*/
public static <T> byte[] serializer(T obj) {
@SuppressWarnings("unchecked")
Class<T> clazz = (Class<T>) obj.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema<T> schema = getSchema(clazz);
return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
/**
* 反序列化
*/
public static <T> T deserializer(byte[] data, Class<T> clazz) {
try {
T obj = clazz.newInstance();
Schema<T> schema = getSchema(clazz);
ProtostuffIOUtil.mergeFrom(data, obj, schema);
return obj;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
public static void main(String[] args) {
byte[] userBytes = ProtostuffUtil.serializer(new User(1, "eleven"));
User user = ProtostuffUtil.deserializer(userBytes, User.class);
System.out.println(user);
}
}

粘包拆包

TCP是一个流协议即没有界限的一长串二进制数据,面向流的通信是无消息保护边界的。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,在业务上认为是一个完整的包,可能会TCP拆分成多个包进行发送,也可能把多个小包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题

解决粘包拆包比较常用的有消息定长度在数据包尾部添加特殊分隔符发送数据长度三种方式:

  • 消息定长度:传输的数据大小固定长度,如每段长度固定为100字节,若不够空位补空格
  • 在数据包尾部添加特殊分隔符:如下划线,中划线等,该方法简单易行,但选择分隔符时一定每条数据内部一定不能出现分隔符
  • 发送长度:发送每条数据时,将数据的长度一并发送,如可选择每条数据的前4位是数据的长度,应用层处理时可根据长度来判断每条数据的开始和结束

Netty提供了多个解码器可进行分包的操作,也可通过继承ByteToMessageDecoderMessageToByteEncoder自定义解码器:

  • LineBasedFrameDecoder:回车换行分包
  • DelimiterBasedFrameDecoder:特殊分隔符分包
  • FixedLengthFrameDecoder:固定长度报文来分包

自动重连

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class NettyClient {
private String host;
private int port;
private Bootstrap bootstrap;
private EventLoopGroup group;
public static void main(String[] args) throws Exception {
NettyClient nettyClient = new NettyClient("localhost", 9000);
nettyClient.connect();
}
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
init();
}
private void init() {
group = new NioEventLoopGroup(); //客户端需要一个事件循环组
bootstrap = new Bootstrap(); // bootstrap 可重用, 只需在NettyClient实例化的时候初始化即可.
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler(NettyClient.this)); //加入处理器
}
});
}
public void connect() throws Exception {
System.out.println("netty client start。。");
ChannelFuture cf = bootstrap.connect(host, port); //启动客户端去连接服务器端
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) { //重连交给后端线程执行
future.channel().eventLoop().schedule(() -> {
System.err.println("重连服务端...");
try {
connect();
} catch (Exception e) {
e.printStackTrace();
}
}, 3000, TimeUnit.MILLISECONDS);
} else {
System.out.println("服务端连接成功...");
}
}
});
cf.channel().closeFuture().sync(); //对通道关闭进行监听
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private NettyClient nettyClient;
public NettyClientHandler(NettyClient nettyClient) {
this.nettyClient = nettyClient;
}
// channel 处于不活动状态时调用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
nettyClient.connect(); // channel处于不活动状态时调用重连
}
}