IO模式详解

37 min

我看了B站一堆教程,很少有能把IO多路复用讲明白的,所以我试一试看看能不能讲明白吧。

误!其实还是有的,我这里直接挂个链接!

BIO、NIO、IO多路复用

BIO

BIO的概念是同步阻塞IO,两个关键词,同步,阻塞。

同步在这里主要是看有消息返回之后,是否需要原线程继续处理。

阻塞是指一个线程在等待某个操作完成时暂停其执行的状态。

我们以Socket通信的方式来实现这些BIO,以一方请求另一方为例子。

我们先设计一个Socket的服务端,客户端每次发来消息就让线程休息5s,来模拟执行耗时,5s后给客户端发送一条消息。

当前我们设计一个客户端,一共进行四次输入,第一次客户端输入不阻塞等带服务器的返回。

服务端代码如下:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Date;

public class BIOServer {
    public static void main(String[] args) throws IOException {
        //定义服务器的接收端口
        ServerSocket serverSocket = new ServerSocket(8080);
        System.out.println("Server started on port "+8080);
        while(true){
            Socket socket = serverSocket.accept();//阻塞等待客户端连接
            new Thread(new BioHandler(socket)).start(); // 为每个客户端连接启动新线程
        }
    }

    static class BioHandler implements Runnable {
        private Socket socket;

        public BioHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                BufferedReader in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
                PrintWriter out = new PrintWriter(this.socket.getOutputStream(), true);

                String inputLine;
                while ((inputLine = in.readLine()) != null) { // 阻塞等待输入
                    System.out.println("Received: " + inputLine);
                    System.out.println(new Date());
                    //sleep5s,模拟处理时间
                    Thread.sleep(5000);
                    out.println("Echo: " + inputLine); // 回显消息
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            } finally {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

客户端不阻塞等待返回如下:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Date;

public class BIOClient {
    public static void main(String[] args) throws IOException {
        //假设我们固定服务器的端口号为8080,之后我们会访问这个端口号,然后我们会模拟服务器需要很久才会返回
        Socket socket = new Socket("localhost",8080);
        //定义发送流
        PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
        //定义接受流
        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        System.out.println("Connected to server. Type messages and press enter.");
        String userInput;
        //我一共进行四次输入,按理来说我这四次输入间隔时间应该是很短的
        for(int i=0;i<=4;i++){
            userInput = "mess"+i;
            out.println(userInput); // 发送到服务器
            System.out.println("消息:\""+userInput+"\"已经成功发送,当前时间是"+new Date());//描述成功发送到服务器
        }

        out.close();
        in.close();
        socket.close();

    }
}

然后我们看客户端和服务器端分别的时间

客户端

BIO非阻塞客户端
BIO非阻塞客户端

服务端

BIO非阻塞服务端
BIO非阻塞服务端

可以看出,非阻塞的客户端是能直接把全部的消息发给服务器端的,只不过服务器端是一个一个处理的

那如果我们的客户端阻塞等待服务器端的返回呢?

只需要在客户端里加上一行代码就可以了。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Date;

public class BIOClient {
    public static void main(String[] args) throws IOException {
        //假设我们固定服务器的端口号为8080,之后我们会访问这个端口号,然后我们会模拟服务器需要很久才会返回
        Socket socket = new Socket("localhost",8080);
        //定义发送流
        PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
        //定义接受流
        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        System.out.println("Connected to server. Type messages and press enter.");
        String userInput;
        //我一共进行四次输入,按理来说我这四次输入间隔时间应该是很短的
        for(int i=0;i<=4;i++){
            userInput = "mess"+i;
            out.println(userInput); // 发送到服务器
            System.out.println("消息:\""+userInput+"\"已经成功发送,当前时间是"+new Date());//描述成功发送到服务器
            System.out.println("echo: " + in.readLine()); // 阻塞等待服务器返回消息
        }

        out.close();
        in.close();
        socket.close();

    }
}

那么这时候客户端的发送的方式是什么呢?

BIO阻塞客户端
BIO阻塞客户端

从结果我们发现,客户端阻塞在这里,等待服务器消费完一条消息才能执行自己接下来要做的事情,如果这时候客户端不是要给服务器发送消息,而是做其他的事情,是不是这些等待时间就是不必要的,完全可以去做其他的事情。

那么这时候,我们是不是能有这样一种方法,我们不在这里等着服务器处理完,我可以先去做其他的事情,然后时不时看服务器有没有处理完,处理完了我再获取这个数据呢。这时候NIO就应运而生了。

NIO

NIO的概念是同步非阻塞IO,也就是说我不用阻塞在这里等待对方执行完毕。而是我传递完消息之后就去做我自己的事情,时不时的通过Selector来看一看它是不是有返回值,也就是ReadAble状态。

所以接下来我们就通过Java来简单实现一下客户端和服务端。

首先介绍一下Selector,它提供了一种机制来管理多个通道(Channel)的I/O操作。通过使用selector,可以利用一个线程来监视多个通道上的事件(如连接请求、数据到达等),从而实现高效的服务端应用设计,尤其是在处理大量并发连接时。

我们先来改造客户端吧,改造比较容易,首先是消息就不通过PrintWriter发送了,因为他们属于不同的I/O模型,我们这次就得使用ByteBuffer来存我们的输入数据,然后让Channel去write。

同时呢,我们需要把Channel注册到Selector上去,让Selector来为我们轮询Channel的状态,当然也未必是轮询,后面我们讲到I/O多路复用的时候会说到。虽然这部分的代码内容稍多一些,但是我的注释写的还是比较翔实的。

比较有趣的是,这个客户端的实现没有考虑到粘包问题,刚好让我碰到了我以前比较好奇为什么会出现的粘包问题,这部分我们会逐渐优化给大家看。

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

public class NIOClient {
    private static ConcurrentLinkedQueue<String> messageQueue = new ConcurrentLinkedQueue<>();

    private static int count=5;

    public static void main(String[] args) throws Exception {
        SocketChannel clientChannel = SocketChannel.open();
        clientChannel.configureBlocking(false);
        clientChannel.connect(new InetSocketAddress("localhost", 8080));

        Selector selector = Selector.open();
        //将Channel注册到selector中去,并且设置该通道关注可连接事件
        clientChannel.register(selector, SelectionKey.OP_CONNECT);
        //当channel没被关的时候一直循环
        while (clientChannel.isOpen()) {
            //这段代码还是会阻塞当前线程,直到有注册到selector的通道准备好了进行某项操作
            selector.select();
            //一旦 select() 方法返回,通过调用 selectedKeys() 方法来获取一个包含所有已准备好进行操作的通道的 SelectionKey 集合。
            // 每个 SelectionKey 都代表了一个与特定通道和操作相关的键。
            // 这些键包含了关于哪些通道准备好进行哪种类型的操作的信息(例如可读、可写等)。
            // 其实这里包含了多路复用的实现,因为获得了准备好的所有channel的信息,而不是一个channel准备好了就切换回用户态
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iter = keys.iterator();
            //然后对所有准备好的通道依次处理
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                //在第一次channel可用时候,其实只会触发这个isConnectable(),因为我们只注册该通道为可连接的
                //这时候就会调用我们定义好的连接handler
                if (key.isConnectable()) {
                    handleConnect(key);
                }
                //在之后我们关注了可写事件,如果这个通道可写了,selector返回的key里就会设置为可写的
                //这时候就会调用我们定义好的writeHandler
                //可写其实是频繁触发的
                if (key.isWritable()) {
                    handleWrite(key);
                }
                //这时候如果服务器返回消息了,就会触发可读事件,selector返回的key里就会设置为可读的
                //这时候就调用我们定义好的readHandler
                if (key.isReadable()) {
                    handleRead(key);
                }
            }
        }
        clientChannel.close();
    }

    /**
     * 连接handler
     * @param key selector的包含channel信息以及其对应类型的key
     * @throws Exception
     */
    private static void handleConnect(SelectionKey key) throws Exception {
        //从key里获取channel
        SocketChannel channel = (SocketChannel) key.channel();
        //如果连接完毕,就要把channel的关注切换为关注可写事件
        if (channel.finishConnect()) {
            System.out.println("连接建立成功,开始发送数据...");
            key.interestOps(SelectionKey.OP_WRITE);
            //当然其实也可以这么切换,就是既关注可写事件,又关注可读事件,毕竟谁规定服务器不可以在连接上之后给客户端发消息的
            //key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            //连接成功之后我们往消息队列里加点数据
            for (int i = 0; i < 5; i++) {
                messageQueue.add("mess" + i);
            }
        }
    }

    private static void handleWrite(SelectionKey key) throws Exception {
        SocketChannel channel = (SocketChannel) key.channel();
        //触发可写事件的时候,我们从消息队列里挑选一条消息发过去
        //所以我们可能会触发粘包事件
        if (!messageQueue.isEmpty()) {
            System.out.println("触发可写事件且有消息要发!");
            //分配写入字节流
            ByteBuffer buffer = ByteBuffer.allocate(50);
            //从消息队列获取消息
            String msg = messageQueue.peek();
            //重置缓冲区,将限制设置为50,并将位置重置为0
            buffer.clear();
            buffer.put(msg.getBytes());
            //将缓冲区从写模式切换到读模式。
            //它通过设置限制(limit)为当前位置,并将位置(position)重置为0。
            //这意味着接下来可以从位置0开始读取数据,直到达到之前的位置(现在变成了限制)。
            buffer.flip();
            //向channel写入数据
            int bytesWritten = channel.write(buffer);
            //如果写入数据长度为0,就不进行接下来的操作了,也就是从消息队列里删除,那时候就会重新发送消息
            if (bytesWritten == 0) return;
            //如果全部写入成功,就会把消息删除,并且打印已发送
            if (!buffer.hasRemaining()) {
                String sentMsg = messageQueue.poll();
                System.out.println("消息:\"" + sentMsg + "\"已发送,时间:" + new Date());
            }
        }

        // 切换关注读事件,但保持写事件关注
        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }

    private static void handleRead(SelectionKey key) throws Exception {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(50);
        int bytesRead;

        while ((bytesRead = channel.read(buffer)) > 0) {
            //同样是把buffer设置为读状态
            buffer.flip();
            //读取响应消息
            System.out.println("收到响应:" + new String(buffer.array(), 0, bytesRead)+" ,当前时间是:"+new Date());
            buffer.clear();
        }
        //为了验证读消息之后还能触发写事件,我们每次读完往messageQueue里加2条消息
        for (int i = count; i < count+2; i++) {
            messageQueue.add("mess" + i);
        }
        count+=2;
    }
}

好,设计完客户端之后,就是设计服务端了,与之前BIO处理多个连接的方式不同,我们服务端这次不使用多线程来管理多个连接,而是直接使用Selector来管理多个Channel,ServerSocketChannel和SocketChannel一视同仁,都交给一个Selector管理。

让我们端上来吧!

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOServer {
    public static void main(String[] args) throws Exception {
        // NIO基于Channel控制,所以有Selector管理所有的Channel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        // 设置监听端口
        serverSocketChannel.bind(new InetSocketAddress(8080));
        // 设置Selector管理所有Channel
        Selector selector = Selector.open();
        // 注册并设置连接时处理
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服务启动成功,监听端口为:" + 8080);
        // NIO使用轮询,当有请求连接时,则启动一个线程
        int keySelect = 0;
        while (serverSocketChannel.isOpen()) {
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey next = iterator.next();
                if (next.isAcceptable()) {    //  如果是连接的
                    SocketChannel accept = serverSocketChannel.accept();
                    if (accept != null) {
                        //把新的会话测channel注册到selector里去,让Selector来管理它
                        accept.configureBlocking(false);
                        //并且把它的感兴趣状态变为可读状态
                        accept.register(selector,SelectionKey.OP_READ);
                    }
                    iterator.remove();
                }
                //一旦可读了,就代表客户端发来了消息,那我们就去处理这个消息
                if(next.isReadable()){
                    handleRead(next);
                }
            }
        }
        serverSocketChannel.close();
    }


    //其实处理这个消息我们依然有一处是阻塞的,就是我们返回给客户端的时候,要求客户端是写可用的。
    //但是写可用的触发是很多次的,一般都是写可用的,所以我们就没有做过多的干预
    private static void handleRead(SelectionKey key) throws Exception {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(50);
        buffer.clear();
        int read = channel.read(buffer);
        String msg = new String(buffer.array(), 0, read).trim();
        System.out.println("服务端收到消息:"+msg);
        String outMsg = "【Echo】" + msg; // 生成回应信息
        //模拟消息处理时长
        Thread.sleep(5000);
        buffer.clear();
        buffer.put(outMsg.getBytes());  //回传信息放入缓冲区
        buffer.flip();
        channel.write(buffer);// 回传信息
    }
}

服务器的实现我们写的时候还是阻塞的,但是其实写可用触发频率是很高的,所以我们就不把这个阻塞考虑进去了。

这时候我们起一个服务端,一个客户端看看情况如何!

NIO单客户端
NIO单客户端

是不是出现了粘包问题!在服务器处理mess0(也就是sleep 5s的时候),后面的mess1、mess2、mess3、mess4都到了,但是我们没有使用头部或者区分符号来进行区分,导致出现了粘包问题。

但是恰恰是这个粘包,也进一步佐证了我们实现了NIO,因为粘包是NIO的常见问题之一。

那是否我们的服务器在不使用多线程的情况下,不阻塞的同时响应两个客户端呢,我们来试一下就行哩!

NIO多客户端
NIO多客户端

由此可见,是可以的!只不过单线程响应就要做好逐个处理的缓慢准备。

如何解决粘包问题

其实解决粘包问题很简单,就是设计一个自己通用的协议,比如说设定一个自己通用的终止符号或者设计一个定长的头,用这个头来规定长度。

我们就选择后者实现。

下面是服务端的代码,这部分注释就少很多了,主要还是看handleRead部分

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NIOServer {
    public static void main(String[] args) throws Exception {
        // NIO基于Channel控制,所以有Selector管理所有的Channel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        // 设置监听端口
        serverSocketChannel.bind(new InetSocketAddress(8080));
        // 设置Selector管理所有Channel
        Selector selector = Selector.open();
        // 注册并设置连接时处理
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服务启动成功,监听端口为:" + 8080);
        // NIO使用轮询,当有请求连接时,则启动一个线程
        while (serverSocketChannel.isOpen()) {
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey next = iterator.next();
                if (next.isAcceptable()) {    //  如果是连接的
                    SocketChannel accept = serverSocketChannel.accept();
                    if (accept != null) {
                        //把新的会话测channel注册到selector里去,让Selector来管理它
                        accept.configureBlocking(false);
                        //并且把它的感兴趣状态变为可读状态
                        accept.register(selector,SelectionKey.OP_READ);
                    }
                    iterator.remove();
                }
                //一旦可读了,就代表客户端发来了消息,那我们就去处理这个消息
                if(next.isReadable()){
                    handleRead(next);
                }
            }
        }
        serverSocketChannel.close();
    }

    public static void handleRead(SelectionKey key) throws Exception{
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int expectedLength = -1;
        buffer.clear();
        int read = channel.read(buffer);
        if(read ==-1 ) return;
        buffer.flip();
        while (buffer.remaining() > 0) {
            if (expectedLength == -1) { // 等待读取长度头
                if (buffer.remaining() >= 4) {
                    expectedLength = buffer.getInt(); // 读取4字节长度头
                } else {
                    break; // 长度头未接收完整
                }
            }
            if (buffer.remaining() >= expectedLength) {
                byte[] bodyBytes = new byte[expectedLength];
                buffer.get(bodyBytes);
                String message = new String(bodyBytes);
                System.out.println("服务端收到消息:"+message);
                String response = "【Echo】" + message;
                // 模拟处理延迟
                Thread.sleep(5000);
                byte[] responseBytes = response.getBytes();
                // 构造响应:4字节长度头 + 消息体
                ByteBuffer responseBuffer = ByteBuffer.allocate(4 + responseBytes.length);
                responseBuffer.putInt(responseBytes.length);
                responseBuffer.put(responseBytes);
                responseBuffer.flip();

                // 发送响应
                while (responseBuffer.hasRemaining()) {
                    channel.write(responseBuffer);
                }
                expectedLength = -1; // 重置等待下一个消息
            }else {
                break; //消息体未接收完整
            }
        }
        buffer.compact(); // 压缩缓冲区,保留未处理数据
    }
}

客户端部分的代码修改为下:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

public class NIOClient {
    private static ConcurrentLinkedQueue<String> messageQueue = new ConcurrentLinkedQueue<>();

    private static int count=5;

    public static void main(String[] args) throws Exception {
        SocketChannel clientChannel = SocketChannel.open();
        clientChannel.configureBlocking(false);
        clientChannel.connect(new InetSocketAddress("localhost", 8080));

        Selector selector = Selector.open();
        clientChannel.register(selector, SelectionKey.OP_CONNECT);


        while (!messageQueue.isEmpty() || clientChannel.isOpen()) {
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iter = keys.iterator();

            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();

                if (key.isConnectable()) {
                    handleConnect(key);
                }
                if (key.isWritable()) {
                    handleWrite(key);
                }
                if (key.isReadable()) {
                    handleRead(key);
                }
            }
        }
        clientChannel.close();
    }

    /**
     * 连接handler
     * @param key selector的包含channel信息以及其对应类型的key
     * @throws Exception
     */
    private static void handleConnect(SelectionKey key) throws Exception {
        //从key里获取channel
        SocketChannel channel = (SocketChannel) key.channel();
        //如果连接完毕,就要把channel的关注切换为关注可写事件
        if (channel.finishConnect()) {
            System.out.println("连接建立成功,开始发送数据...");
            key.interestOps(SelectionKey.OP_WRITE);
            //当然其实也可以这么切换,就是既关注可写事件,又关注可读事件,毕竟谁规定服务器不可以在连接上之后给客户端发消息的
            //key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            //连接成功之后我们往消息队列里加点数据
            for (int i = 0; i < 5; i++) {
                messageQueue.add("mess" + i);
            }
        }
    }

    private static void handleWrite(SelectionKey key) throws Exception {
        SocketChannel channel = (SocketChannel) key.channel();
        //触发可写事件的时候,我们从消息队列里挑选一条消息发过去
        //所以我们可能会触发粘包事件
        if (!messageQueue.isEmpty()) {
            System.out.println("触发可写事件且有消息要发!");
            //分配写入字节流
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            //从消息队列获取消息
            String msg = messageQueue.peek();
            byte[] msgBytes = msg.getBytes();
            //数据总长为4+真实数据,4是一个int,用来存储数据长度
            int totalLength = 4 + msgBytes.length; // 头部4字节 + 消息体
            //重置缓冲区,将限制设置为500,并将位置重置为0
            buffer.clear();
            buffer.putInt(msgBytes.length); // 写入4字节长度头(大端序)
            buffer.put(msgBytes);           // 写入消息体
            //将缓冲区从写模式切换到读模式。
            //它通过设置限制(limit)为当前位置,并将位置(position)重置为0。
            //这意味着接下来可以从位置0开始读取数据,直到达到之前的位置(现在变成了限制)。
            buffer.flip();
            //向channel写入数据
            int bytesWritten = channel.write(buffer);
            //如果写入数据长度为0,就不进行接下来的操作了,也就是从消息队列里删除,那时候就会重新发送消息
            if (bytesWritten == 0) return;
            //如果全部写入成功,就会把消息删除,并且打印已发送
            if (!buffer.hasRemaining()) {
                String sentMsg = messageQueue.poll();
                System.out.println("消息:\"" + sentMsg + "\"已发送,时间:" + new Date());
            }
        }
        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }

    private static void handleRead(SelectionKey key) throws Exception {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        readBuffer.clear();
        int bytesRead = channel.read(readBuffer);
        if (bytesRead == -1) {
            channel.close();
            return;
        }
        readBuffer.flip();
        int expectedLength =-1;
        while (readBuffer.remaining() > 0) {
            if (expectedLength == -1) { // 等待读取长度头
                if (readBuffer.remaining() >= 4) {
                    expectedLength = readBuffer.getInt(); // 读取4字节长度头
                } else {
                    break; // 长度头未接收完整
                }
            }

            if (readBuffer.remaining() >= expectedLength) {
                byte[] bodyBytes = new byte[expectedLength];
                readBuffer.get(bodyBytes);
                String response = new String(bodyBytes);
                System.out.println("收到响应:" + response + ",时间:" + new Date());
                expectedLength = -1; // 重置等待下一个消息
            } else {
                break; // 消息体未接收完整,继续回去候着
            }
        }
        readBuffer.compact(); // 压缩缓冲区,保留未处理数据
    }
}

最终的结果如下:

解决NIO粘包问题
解决NIO粘包问题

可以发现,成功解决哩!

那么接下来,我们就可以转战I/O多路复用了。

I/O多路复用

其实,在实现之前的NIO的时候,我们已经有用到I/O多路复用了,还记得我们使用了Selector吗,我们把所有的SocketChannel注册到一个Selector上,Selector帮我们判断他们是否能够满足我们关注的事件,如果能满足我们关注的事件,就触发对应的事件关注模式。

那这时候大家有没有一个疑惑呢?为什么调用Set<SelectionKey> keys = selector.selectedKeys();这段代码,返回的结果是一个集合呢?它监视了select管理的全部channel的状态,这就是IO多路复用的体现!由此,我们在实现NIO的服务器的时候并没有在程序里显示创建额外的子线程来响应,而是创建了一个文件描述符channel,并把它交给selector管理!客户端其实是不需要IO多路复用的,因为客户端只设置了一个Channel。

想象一下,如果没有IO多路复用,这段代码的逻辑是什么呢?

我们还是更加具体的讲一下Selector的工作流程吧

  1. 注册通道与兴趣操作:首先,通过调用SelectableChannel.register(Selector sel, int ops)方法将一个或多个通道注册到选择器上,并指定对该通道感兴趣的I/O操作类型(如OP_READ, OP_WRITE, OP_CONNECT, 或 OP_ACCEPT)。这一步骤主要是在用户态完成的。
  2. 轮询请求:当调用selector.select()或者其变种时,Java程序会进入等待状态,也就是触发一个打断,由用户态切换到内核态,直到至少有一个已注册的通道准备好执行至少一个你感兴趣的操作。在这个过程中,Java运行时环境会发起系统调用(例如,在Unix/Linux系统上的epoll_wait或在Windows上的WSAWaitForMultipleEvents),这些系统调用实际上会使控制权转移到操作系统内核。
  3. 内核态处理:一旦控制权转移到内核,操作系统会监视所有被注册的通道,检查它们是否满足任何已注册的兴趣条件。这一过程高效地利用了操作系统提供的事件通知机制,比如Linux上的epoll、BSD系统上的kqueue或Windows上的I/O完成端口(IOCP)等。如果某个或某些通道的状态发生了变化(例如,新的数据到达使得读操作变为可能),内核就会识别出这些事件。
  4. 返回用户态并更新就绪状态:当有通道变得“就绪”时,操作系统会返回相应的信息给Java的NIO层,然后从select()方法返回,允许Java程序继续执行。此时,你可以通过selectedKeys()方法获取所有已经准备好的通道对应的SelectionKey对象集合,并对这些通道进行相应的I/O操作。

如果没有IO多路复用呢?是不是处理就不一样了呢?需要手动的遍历每个文件描述符,也就是我们要手动遍历每一个Channel,然后去内核态判断他们是不是可接受、可连接、可读、可写的,然后再回到用户态来进行对应的操作。似乎这样是可行的?

但是需要注意一点,这样又一个很明显的问题,就是频繁的进行了用户态和内核态的切换,这种切换是需要额外耗时的。

那么除此之外,其实还有一些其他的问题:

  1. 每个连接都需要一个独立的线程或进程:在这种情况下,为了处理多个并发连接,常见的做法是为每个连接分配一个独立的线程或进程。这个线程或进程将负责执行读写操作,直到该连接关闭。这导致了所谓的“一连接一线程”模型。
  2. 缺乏高效的事件通知机制:在没有IO多路复用(如select、poll、epoll等)的情况下,程序员需要手动检查每个文件描述符的状态,或者依赖于每个线程/进程阻塞在一个特定的文件描述符上等待数据的到来。这种方式无法高效地管理和监控大量文件描述符。
  3. 资源消耗大且扩展性差:由于每个连接都需要一个独立的线程或进程,随着连接数的增加,系统资源(如内存和CPU时间)会被迅速耗尽。此外,大量的上下文切换也会降低系统的整体性能,使得这种方法难以扩展到支持数千甚至数万个并发连接。

所以这时候我们就可以理解什么是IO多路复用了,简单来说就是:允许单个线程同时监视多个文件描述符。那么具体是怎么实现的,就是下面要讨论的内容了。分为:select、poll、epoll三种。其实到后面已经有点脱离Java的范畴了,所以在这里提前说一下。Java的Selector的select方法是灵活的,主要还是根据系统不同来实现的,如果在Linux内核上,会默认使用epoll来实现,如果epoll有些特性无法满足则会使用poll。在Windows内核上就需要使用select来实现。

Select

Select模型的实现是基于轮询的,也就是每一次都把文件描述符的bitmap从用户态拷贝到内核态。这个bitmap最大大小为1024,所以select模型的一个selector最多只能监管1024个channel

用户态的bitmap存储了哪些文件描述符需要被监视,如果需要被监视,bitmap对应位置置为1。

内核态遍历为1的bitmap,如果文件描述符已经就绪,那就将对应位置置为1,否则为0,然后将bitmap返回给用户态

主要流程如下:

  • 将当前进程的所有文件描述符,一次性的从用户态拷贝到内核态;
  • 在内核中快速的无差别遍历每个fd,判断是否有数据到达;
  • 将所有fd状态,从内核态拷贝到用户态,并返回已就绪fd的个数;
  • 在用户态遍历判断具体哪个fd已就绪,然后进行相应的事件处理。

缺点是很明显的:

  1. 能管理的文件描述符有限,最多为1024
  2. 每次都需要将文件描述符的bitmap从用户态拷贝到内核态,再从内核态拷贝到用户态
  3. 内核态返回bitmap之后,用户态仍然需要遍历才能知道哪个文件描述符就绪了

一共有三个bitmap,分别对应了readfds、writefds、errorfds。主要检查三个对应的项目。

读缓冲区(readfds):检测里边有没有数据,如果有数据该缓冲区对应的文件描述符就绪 写缓冲区(writefds):检测写缓冲区是否可以写(有没有容量),如果有容量可以写,缓冲区对应的文件描述符就绪 读写异常(errorfds):检测读写缓冲区是否有异常,如果有该缓冲区对应的文件描述符就绪

Poll

Poll模型其实和Select模型是比较类似的,也是需要轮询的。

区别在于:

select使用位图来标记想关注的文件描述符,使用三个位图来标记想关注的读事件,写事件,错误事件。

poll使用一个结构体pollfd数组来标志想关注的文件描述符和在这个描述符上感兴趣的事件,poll的优点是数组的长度突破了1024的限制,其他的区别不大。

select可以跨平台,但是poll只支持Linux使用

两者同样都需要进行多次内核态和用户态的拷贝

EPoll

其实epoll才是最难理解的部分,但是也是对前两者设计了优化,解决了文件描述符限制和多次内核态和用户态拷贝导致的性能开销问题。

Epoll其实基于一个比较重要的模型,就是事件驱动模型,使用了回调机制。

Select模型和Poll模型每次都需要遍历全部的文件描述符来检查它是否就绪,而epoll不同,它允许用户在注册时指定感兴趣的事件,这样在事件发生时,内核只需要更新它的内部数据结构,而不是每次都遍历全部的文件描述符。

执行原理

int num_size = 5;//epoll监听的文件描述符的个数,其实在linux2.6.8的时候就取消这个限制了,这个数字的存在只是为了兼容
int epoll_fd = poll_create(num_size);//创建一个epoll模型,其实也是一个文件描述符
//然后会创建一个内部数据结构,重点是两个部分,一个是红黑树的根,另一个是rd_list,也就是已就绪的双端队列
//假设这个时候有5个客户端连接到了服务端,epoll就会调用五次event_ctl,会在红黑树上创建5个节点,使用的方法是ADD,如果已经建立完成客户端,就要使用MOD修改监听的事件为读事件
//event_ctl方法需要传入的包括红黑树的根也就是even模型,需要操作的文件描述符,对这个文件描述符的操作,需要监视文件描述符事件集合
//event_ctl方法向内存注册fd和事件的时候,注册了一个回调函数,当操作系统将数据拷贝到缓冲区后,就会执行回调函数
//创建完成之后,如果有三个客户端向服务端的网卡发送数据,网卡会用DMA-Copy技术将数据拷贝到内存缓冲区,执行回调函数,这时候会将缓冲区的内容和文件描述符加入到rd_list,其实没有做拷贝操作,只是做了指针的连接操作
//这时候如果服务端调用epoll_wait方法,就会判断哪些文件描述符上的哪些事件已经就绪了,也就是把rd_link的内容拷贝到返回数组中,然后返回内核态

为什么会快

只有被调用epoll_wait方法的时候,epoll才会进行一次内核态到用户态的拷贝,所以上下文切换较少。

epoll返回的事件只有就绪的事件,不需要遍历比较。

而且epoll是通过回调的方式来将就绪的事件加入到就绪队列中。

总结

至此,IO系列基本就分析完了,如果有问题欢迎各种平台交流。