在3月20日的博客里,我通过代码来理解了一下NIO和BIO的区别,可以看出一部分代码是比较麻烦和复杂的,在项目开发中一般遇到这种普适性的问题,都会有前人造出轮子来让我们使用,而对于NIO的网络编程,最佳的轮子就是Netty。
Netty的语言实现是Java,所以去看源码什么的也会比较容易一点。
Netty的核心在于异步、事件驱动。
在了解Netty之前必须先了解Reactor设计模式。
Reactor的翻译是反应堆,其实本质上就是事件驱动模式。
在Reactor模式中,有一些比较重要的概念:
Netty实现了Reactor的三种模型,分别为:
我们在上一篇博客中实现的NIO模式,其实就是单Reactor单线程模式。
只启用一个Selector,这个Selector又要监听Accept事件,获得Channel之后将对应的Channel也注册到同一个Selector上去,这个Selector需要负责监听这个Channel的读写事件以及之前的ServerChannel的注册事件。
这里我用了掘金的图,我觉得画的比较符合我的认知。
那么在我们上篇博客里,服务器的实现代码如下:
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NIOServer {
public static void main(String[] args) throws Exception {
// NIO基于Channel控制,所以有Selector管理所有的Channel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 设置监听端口
serverSocketChannel.bind(new InetSocketAddress(8080));
// 设置Selector管理所有Channel
Selector selector = Selector.open();
// 注册并设置连接时处理
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务启动成功,监听端口为:" + 8080);
// NIO使用轮询,当有请求连接时,则启动一个线程
int keySelect = 0;
while (serverSocketChannel.isOpen()) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
if (next.isAcceptable()) { // 如果是连接的
SocketChannel accept = serverSocketChannel.accept();
if (accept != null) {
//把新的会话测channel注册到selector里去,让Selector来管理它
accept.configureBlocking(false);
//并且把它的感兴趣状态变为可读状态
accept.register(selector,SelectionKey.OP_READ);
}
iterator.remove();
}
//一旦可读了,就代表客户端发来了消息,那我们就去处理这个消息
if(next.isReadable()){
handleRead(next);
}
}
}
serverSocketChannel.close();
}
//其实处理这个消息我们依然有一处是阻塞的,就是我们返回给客户端的时候,要求客户端是写可用的。
//但是写可用的触发是很多次的,一般都是写可用的,所以我们就没有做过多的干预
private static void handleRead(SelectionKey key) throws Exception {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(50);
buffer.clear();
int read = channel.read(buffer);
String msg = new String(buffer.array(), 0, read).trim();
System.out.println("服务端收到消息:"+msg);
String outMsg = "【Echo】" + msg; // 生成回应信息
//模拟消息处理时长
Thread.sleep(5000);
buffer.clear();
buffer.put(outMsg.getBytes()); //回传信息放入缓冲区
buffer.flip();
channel.write(buffer);// 回传信息
}
}
那么如果这段代码用Netty实现该如何实现呢?
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup singleReactor = new NioEventLoopGroup(1);//限制为单线程
bootstrap.group(singleReactor)
.channel(NioServerSocketChannel.class)
.handler(new BossLogHandler())
.childHandler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline channelPipeline = channel.pipeline();
// 添加 HTTP 编解码器
channelPipeline.addLast(new HttpServerCodec());
// 聚合 HTTP 消息
channelPipeline.addLast(new HttpObjectAggregator(65536));
// 处理 WebSocket 升级请求
channelPipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
//超时处理器
channelPipeline.addLast(new IdleStateHandler(10,10,1000, TimeUnit.SECONDS));
// 自定义处理器
channelPipeline.addLast(new MyChannelHandler());
}
});
ChannelFuture future = bootstrap.bind("127.0.0.1", 8080).sync();
System.out.println("服务器启动");
// 等待连接关闭
future.channel().closeFuture().sync();
}
}
这个服务端实现起来是比较简单的,因为只有一个EventLoop,但其实多线程模式和主从模式其实也只需要稍作修改即可。
单线程模式的优势为:
存在的问题:
阻塞模式、CPU利用率低、不适合高并发场景
和单线程模型不同的点在于多了一个handler线程池。
Reactor设计模式的单Reactor多线程模式和Netty的多线程模型其实并不完全对应。
Reactor的单Reactor多线程模式只有一个线程的问题,但是仍然只有一个Reactor在同时监听ACCEPT事件和READ事件。
但是Netty的多线程模式则并不是,所有线程都负责处理连接和数据。
EventLoopGroup singleReactor = new NioEventLoopGroup();
就把这个1去掉就行了。
多线程模型的优点包括:
存在的问题:
为了解决这种问题Netty设计出了主从模型。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup(1);//限制为单线程
EventLoopGroup workers = new NioEventLoopGroup(1);//限制为单线程
bootstrap.group(boss,workers)
.channel(NioServerSocketChannel.class)
.handler(new BossLogHandler())
.childHandler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline channelPipeline = channel.pipeline();
// 添加 HTTP 编解码器
channelPipeline.addLast(new HttpServerCodec());
// 聚合 HTTP 消息
channelPipeline.addLast(new HttpObjectAggregator(65536));
// 处理 WebSocket 升级请求
channelPipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
//超时处理器
channelPipeline.addLast(new IdleStateHandler(10,10,1000, TimeUnit.SECONDS));
// 自定义处理器
channelPipeline.addLast(new MyChannelHandler());
}
});
ChannelFuture future = bootstrap.bind("127.0.0.1", 8080).sync();
System.out.println("服务器启动");
// 等待连接关闭
future.channel().closeFuture().sync();
}
}
通过将线程之间的任务分开,分为负责监听并建立新连接的boss线程池和负责处理已经连接的连接的事件的workers线程池,能够有效实现不同分工的负载均衡。
EvenLoop其实是Netty的核心所在,我们常常定义的EventLoopGroup是用来管理EventLoop实例线程池的。常见的实现包括NioEventLoopGroup和EpollEventLoopGroup。
就像我想在mac上运行就会报错,只在Linux上支持
Exception in thread "main" java.lang.UnsatisfiedLinkError: failed to load the required native library
at io.netty.channel.epoll.Epoll.ensureAvailability(Epoll.java:81)
at io.netty.channel.epoll.EpollEventLoopGroup.<clinit>(EpollEventLoopGroup.java:41)
at NettyServer.main(NettyServer.java:16)
Caused by: java.lang.ExceptionInInitializerError
at io.netty.channel.epoll.Epoll.<clinit>(Epoll.java:40)
... 2 more
Caused by: java.lang.IllegalStateException: Only supported on Linux
at io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:317)
at io.netty.channel.epoll.Native.<clinit>(Native.java:85)
... 3 more
Channel其实就是类似于Socket的网络连接,可以是客户端连接或服务器监听端口。Channel负责读写网络数据,并注册到EventLoop中等待事件处理。
而Channel的I/O事件则会交给它的ChannelPipeline来处理,用户可以在ChannelPipeline添加一系列ChannelHandler,包括解码器、聚合器、WebSocketServerProtocolHandler这种http升级ws的Handler以及用户自定义的ChannelHandler(通过继承SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter或者实现ChannelInboundHandler)来定义
Netty的事件驱动模型通过EventLoopGroup、EventLoop、Channel、ChannelPipeline和ChannelHandler之间的协同工作来实现。其工作流程如下: