Netty笔记—-概念
本文是读书笔记,读的《Netty实战》
一、Netty 的核心组件
- Channel;
- 回调;
- Future;
- 事件和 ChannelHandler。
1.Channel
Channel
是
Java NIO
的一个基本构造。
是
Java NIO
的一个基本构造。
它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执
行,一个或者多个不同的
I/O
操作的程序组件)的开放连接,如读操作和写操作 。
I/O
操作的程序组件)的开放连接,如读操作和写操作 。
目前,可以把
Channel
看作是传入(入站)或者传出(出站)数据的载体。因此,它可以
Channel
看作是传入(入站)或者传出(出站)数据的载体。因此,它可以
被打开或者被关闭,连接或者断开连接。
基本的
I/O
操作(
bind()
、
connect()
、
read()
和
write()
)依赖于底层网络传输所提
I/O
操作(
bind()
、
connect()
、
read()
和
write()
)依赖于底层网络传输所提
供的原语。在基于
Java
的网络编程中,其基本的构造是
class Socket
。
Netty
的
Channel
接
Java
的网络编程中,其基本的构造是
class Socket
。
Netty
的
Channel
接
口所提供的
API
,大大地降低了直接使用
Socket
类的复杂性。此外,
Channel
也是拥有许多
API
,大大地降低了直接使用
Socket
类的复杂性。此外,
Channel
也是拥有许多
预定义的、专门化实现的广泛类层次结构的根,下面是一个简短的部分清单:
- EmbeddedChannel;
- LocalServerChannel;
- NioDatagramChannel;
- NioSctpChannel;
- NioSocketChannel。
2.回调
一个
回调
其实就是一个方法,一个指向已经被提供给另外一个方法的方法的引用。这使得后
回调
其实就是一个方法,一个指向已经被提供给另外一个方法的方法的引用。这使得后
者 可以在适当的时候调用前者。
回调在广泛的编程场景中都有应用,而且也是在操作完成后通 知相关方最常见的方式之一。
Netty
在内部使用了回调来处理事件;当一个回调被触发时,相关的事件可以被一个
interface
在内部使用了回调来处理事件;当一个回调被触发时,相关的事件可以被一个
interface
ChannelHandler
的实现处理。代码清单
1-2
展示了一个例子:当一个新的连接已经被建立时,
的实现处理。代码清单
1-2
展示了一个例子:当一个新的连接已经被建立时,
ChannelHandler
的
channelActive()
回调方法将会被调用,并将打印出一条信息。
的
channelActive()
回调方法将会被调用,并将打印出一条信息。
![](https://img-blog.csdnimg.cn/6e368264a9c4449fab726511333eb616.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAankwMjI2ODg3OQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
3.Future和ChannelFuture
Future
提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操
提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操
作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。
Netty 提供了它自己的实现——ChannelFuture
,用于在执行异步操作的时候使用。
,用于在执行异步操作的时候使用。
ChannelFuture
提供了几种额外的方法,这些方法使得我们能够注册一个或者多个ChannelFutureListener实例。
提供了几种额外的方法,这些方法使得我们能够注册一个或者多个ChannelFutureListener实例。
监听器的回调方法
operationComplete()
,将会在对应的操作完成时被调用
。
operationComplete()
,将会在对应的操作完成时被调用
。
简而 言之 ,由
ChannelFutureListener
提供的通知机制消除了手动检查对应的操作是否完成的必要。
ChannelFutureListener
提供的通知机制消除了手动检查对应的操作是否完成的必要。
每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuture
;也就是说,它们都不会阻塞。
;也就是说,它们都不会阻塞。
Netty
完全是异步和事件驱动的。
完全是异步和事件驱动的。
代码清单
1-4
显示了如何利用
ChannelFutureListener
。首先,要连接到远程节点
1-4
显示了如何利用
ChannelFutureListener
。首先,要连接到远程节点
上。然后,要注册一个新的
ChannelFutureListener
到对
connect()
方法的调用所返
ChannelFutureListener
到对
connect()
方法的调用所返
回的
ChannelFuture
上。当该监听器被通知连接已经建立的时候,要检查对应的状态 。
ChannelFuture
上。当该监听器被通知连接已经建立的时候,要检查对应的状态 。
如果该操作是成功的,那么将数据写到该
Channel
。否则,要从
ChannelFuture
中检索
Channel
。否则,要从
ChannelFuture
中检索
对应的
Throwable
。
Throwable
。
![](https://img-blog.csdnimg.cn/d01e3e2bd02b419a8dd3296a745b6b01.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAankwMjI2ODg3OQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
4.事件和 ChannelHandler
Netty 是一个网络编程框架,所以事件是按照它们与入站或出站数据流的相关性进行分类的。
4.1 可能由入站数据或者相关的状态更改而触发的事件包括:
- 连接已被激活或者连接失活;
- 数据读取
- 用户事件
- 错误事件
4.2 出站事件是未来将会触发的某个动作的操作结果,这些动作包括:
- 打开或者关闭到远程节点的连接;
- 将数据写到或者冲刷到套接字。
4.3 ChannelHandler
每个事件都可以被分发给
ChannelHandler 类中的某个用户实现的方法。
ChannelHandler 类中的某个用户实现的方法。
图
1-3
展示了一个事件是如何被一个这样的ChannelHandler 链处理的。
1-3
展示了一个事件是如何被一个这样的ChannelHandler 链处理的。
![](https://img-blog.csdnimg.cn/ebeb07c2ceba4836b50fecf732dc8138.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAankwMjI2ODg3OQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
Netty
提供了大量预定义的可以开箱即用的
ChannelHandler
实现,包括用于各种协议
提供了大量预定义的可以开箱即用的
ChannelHandler
实现,包括用于各种协议
(如
HTTP
和
SSL/TLS
)的
ChannelHandler
。在内部,
ChannelHandler
自己也使用了事件
HTTP
和
SSL/TLS
)的
ChannelHandler
。在内部,
ChannelHandler
自己也使用了事件
和
Future
,使得它们也成为了你的应用程序将使用的相同抽象的消费者。
Future
,使得它们也成为了你的应用程序将使用的相同抽象的消费者。
5.EventLoop
Netty
的异步编程模型是建立在
Future
和回调的概念之上的,而将事件派发到ChannelHandler
的异步编程模型是建立在
Future
和回调的概念之上的,而将事件派发到ChannelHandler
的方法则发生在更深的层次上。
拦截操作以及高速地转换入站数据和出站数据,都
只需要你提供回调或者利用操作所返回的
只需要你提供回调或者利用操作所返回的
Future。
Netty
通过触发事件将
Selector
从应用程序中抽象出来,消除了所有本来将需要手动编写
通过触发事件将
Selector
从应用程序中抽象出来,消除了所有本来将需要手动编写
的派发代码。
在内部,将会为每个
Channel
分配
一个
EventLoop
,用以处理所有事件,包括:
Channel
分配
一个
EventLoop
,用以处理所有事件,包括:
- 注册感兴趣的事件;
- 将事件派发给 ChannelHandler;
- 安排进一步的动作。
EventLoop 本身只由一个线程驱动,其处理了一个 Channel 的所有 I/O 事件
,并且在该
,并且在该
EventLoop
的整个生命周期内都不会改变。
的整个生命周期内都不会改变。
这个设计,支持你在ChannelHandler 实现中进行同步逻辑。
EventLoop
定义了
Netty
的核心抽象,用于处理连接的生命周期中所发生的事件.
定义了
Netty
的核心抽象,用于处理连接的生命周期中所发生的事件.
Channel、EventLoop、Thread 以及 EventLoopGroup 之间的关系
![](https://img-blog.csdnimg.cn/4e8bf7b5c6ab48f7827c7d0bf1191c05.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAankwMjI2ODg3OQ==,size_16,color_FFFFFF,t_70,g_se,x_16)
这些关系是:
- 一个 EventLoopGroup 包含一个或者多个 EventLoop;
- 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
- 所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
- 一个 Channel 在它的生命周期内只注册于一个 EventLoop;
- 一个 EventLoop 可能会被分配给一个或多个 Channel。
注意,在这种设计中,一个给定 Channel 的 I/O 操作都是由相同的 Thread 执行的,实际
上消除了对于同步的需要。
6.ChannelPipeline 接口
ChannelPipeline
提供了
ChannelHandler
链的容器,并定义了用于在该链上传播入站
提供了
ChannelHandler
链的容器,并定义了用于在该链上传播入站
和出站事件流的
API
。
API
。
当 Channel
被创建时,它会被自动地分配到它专属的
ChannelPipeline
。
被创建时,它会被自动地分配到它专属的
ChannelPipeline
。
ChannelHandler 安装到
ChannelPipeline
中的过程如下所示:
ChannelPipeline
中的过程如下所示:
- 一个ChannelInitializer的实现被注册到了ServerBootstrap中 ①;
- 当 ChannelInitializer.initChannel()方法被调用时,ChannelInitializer将在 ChannelPipeline 中安装一组自定义的 ChannelHandler;
- ChannelInitializer 将它自己从 ChannelPipeline 中移除。
![](https://img-blog.csdnimg.cn/57b0deb9b84748e7a9c45956c90d5626.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAankwMjI2ODg3OQ==,size_12,color_FFFFFF,t_70,g_se,x_16)
在 Netty 中,有两种发送消息的方式。
1.直接写到
Channel
中
Channel
中
这种方式将会导致消息从
ChannelPipeline 的尾端开始流动
ChannelPipeline 的尾端开始流动
2.写到和 ChannelHandler相关联的ChannelHandlerContext对象中。
这种方式将导致消息从 ChannelPipeline 中的下一个 ChannelHandler 开始流动。
7、Bootstrap和EventLoopGroup
有两种类型的Bootstrap:
- 1.用于客户端(简单地称为 Bootstrap)
- 2.(ServerBootstrap)用于服务器。
无论你的应用程序使用哪种协议或者处理哪种类型的数据, 唯一决定它使用哪种引导类的是它是作为一个客户端还是作为一个服务器。
![](https://img-blog.csdnimg.cn/3c9e4594751e443b8045daaf6e0cc8a4.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAankwMjI2ODg3OQ==,size_20,color_FFFFFF,t_70,g_se,x_16)
为什么引导一个客户端的bootstrap只需要一个 EventLoopGroup,但是一个ServerBootstrap 则需要两个(也可以是同一个实例)?
因为服务器需要两组不同的
Channel
。
Channel
。
第一组将只包含一个 ServerChannel
,代表服务器自身的已绑定到某个本地端口的正在监听的套接字。
,代表服务器自身的已绑定到某个本地端口的正在监听的套接字。
而第二组将包含所有已创建的用来处理传入客户端连接(对于每个服务器已经接受的连接都有一个)的 Channel
。
。
下图说明了这个模型,并且展示了为何需要两个不同的 EventLoopGroup
。
。
二、用NETTY实现ECHO的代码
1.服务端
package com.sid.echo.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
/**
* 绑定到服务器将在其上监听并接受传入连接请求的端口;
* 配置 Channel,以将有关的入站消息通知给 EchoServerHandler 实例。
* */
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println(
"Usage: " + EchoServer.class.getSimpleName() + " <port>");
}
int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}
public void start() throws Exception {
final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(serverHandler);
}
});
//异步地绑定服务器;调用 sync()方法阻塞;等待直到绑定完成
ChannelFuture f = b.bind().sync();
//获取 Channel 的CloseFuture,并且阻塞当前线
f.channel().closeFuture().sync();
} finally {
//关闭 EventLoopGroup,释放所有的资源,包括所有被创建的线程
group.shutdownGracefully().sync();
}
}
}
package com.sid.echo.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
/**
* 对于每个传入的消息都要调用;
* */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
ctx.write(in);
}
/**
* —通知ChannelInboundHandler最后一次对channelRead()的调用是当前批量读取中的最后一条消息;
* */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
/**
* —在读取操作期间,有异常抛出时会调用
*
*如果不捕获异常,会发生什么呢?
* 每个 Channel 都拥有一个与之相关联的 ChannelPipeline,其持有一个 ChannelHandler 的
* 实例链。在默认的情况下,ChannelHandler 会把对它的方法的调用转发给链中的下一个 ChannelHandler。因此,如果 exceptionCaught()方法没有被该链中的某处实现,那么所接收的异常将会被
* 传递到 ChannelPipeline 的尾端并被记录。为此,你的应用程序应该提供至少有一个实现了
* exceptionCaught()方法的 ChannelHandler。
* */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
2.客户端
package com.sid.echo.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new EchoClientHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println(
"Usage: " + EchoClient.class.getSimpleName() + " <host> <port>");
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
new EchoClient(host, port).start();
}
}
package com.sid.echo.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* SimpleChannelInboundHandler 与 ChannelInboundHandler
* 你可能会想:为什么我们在客户端使用的是 SimpleChannelInboundHandler,而不是在 EchoServerHandler 中所使用的 ChannelInboundHandlerAdapter 呢?这和两个因素的相互作用有
* 关:业务逻辑如何处理消息以及 Netty 如何管理资源。
* 在客户端,当 channelRead0()方法完成时,你已经有了传入消息,并且已经处理完它了。当该方
* 法返回时,SimpleChannelInboundHandler 负责释放指向保存该消息的 ByteBuf 的内存引用。
* 在 EchoServerHandler 中,你仍然需要将传入消息回送给发送者,而 write()操作是异步的,直
* 到 channelRead()方法返回后可能仍然没有完成(如代码清单 2-1 所示)。为此,EchoServerHandler
* 扩展了 ChannelInboundHandlerAdapter,其在这个时间点上不会释放消息。
* 消息在 EchoServerHandler 的 channelReadComplete()方法中,当 writeAndFlush()方
* 法被调用时被释放
* */
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/**
* 每当接收数据时,都会调用这个方法
*
* 由服务器发送的消息可能会被分块接收。也就是说,如果服务器发送了 5 字节,那么不
* 能保证这 5 字节会被一次性接收。即使是对于这么少量的数据,channelRead0()方法也可能
* 会被调用两次,第一次使用一个持有 3 字节的 ByteBuf(Netty 的字节容器),第二次使用一个
* 持有 2 字节的 ByteBuf。作为一个面向流的协议,TCP 保证了字节数组将会按照服务器发送它
* 们的顺序被接收。
* */
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
System.out.println(
"Client received: " + in.toString(CharsetUtil.UTF_8));
}
/**
* 其将在一个连接建立时被调用
* */
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!",
CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
3.SimpleChannelInboundHandler 与 ChannelInboundHandler
为什么我们在客户端使用的是 SimpleChannelInboundHandler,而不是在 EchoServerHandler 中所使用的 ChannelInboundHandlerAdapter 呢?
这和两个因素的相互作用有关:业务逻辑如何处理消息以及 Netty 如何管理资源。
在客户端,当 channelRead0()方法完成时,你已经有了传入消息,并且已经处理完它了。当该方法返回时,SimpleChannelInboundHandler 负责释放指向保存该消息的 ByteBuf 的内存引用。
在 EchoServerHandler 中,你仍然需要将传入消息回送给发送者,而 write()操作是异步的,直到 channelRead()方法返回后可能仍然没有完成。为此,EchoServerHandler
扩展了 ChannelInboundHandlerAdapter,其在这个时间点上不会释放消息。
消息在 EchoServerHandler 的 channelReadComplete()方法中,当 writeAndFlush()方法被调用时被释放
版权声明:本文为jy02268879原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。