文章

Java NIO

关于NIO,之前也不是没有总结过,但是老忘,每一次忘记之后再次拿起,可能会有些新的感悟,直到发现忘记的根本原因是没掌握准它和Blocking IO的真正区别。NIO能解决掉BIO存在的问题,才是NIO存在的真正意义。

  1. NIO的意义所在
    1. BIO的瓶颈
    2. NIO事件模型
  2. Java NIO组件
    1. SelectableChannel implements Channel
    2. Selector
    3. SelectionKey
  3. NIO流程
  4. Ref
  5. epoll
  6. NIO:同步非阻塞

NIO的意义所在

BIO的瓶颈

BIO在(一)How Tomcat Works - 原始Web服务器里写过,只不过是个单线程。服务器不可能只接受一个请求,所以一定是多线程的,在(三)How Tomcat Works - Tomcat连接器Connector里可以看到,使用单个主线程做Connector,接受请求,每收到一个请求,就交给线程池处理请求。

这种经典的BIO模型,有以下会出现阻塞的地方:

  • ServerSocket#accept:这个阻塞是必要的。没请求来就没活干,自然不需要使用到CPU,所以线程理应阻塞。此时不使用CPU不叫浪费,本来就不该使用;
  • Socket读写:其实就是Socket#getInputStream/Socket#getOutputStream,然后read或write。

读写socket都是分为两部分:

  1. 等待可读可写;
  2. 真正的读写;

真正的读写是一个用到CPU的操作,而且其实就是将数据在网卡和内存之间拷贝,速度很快,一般的小网络请求可以忽略读写字节的时间。所以一个connection本不应该霸占一个线程这么久的,大部分的时间线程都在阻塞等待可读写。比如:

  • 和客户端建立起connection之后,线程等待connection上的消息到达。到达之前只能阻塞;
  • 如果客户端是长连接,线程处理完一个请求写回connection之后,还会继续等待connection上的请求到达,但是客户端可能完事儿走人了,该线程要白白等到Socket#setSoTimeout到了,读取超时,才能发现结束了。

为什么读写要搞成阻塞操作?因为线程必须要阻塞。假设读写是非阻塞操作,线程读一下发现没数据,直接返回。但是线程又不能一走了之,不处理这些数据了。怎么办呢?只能过一会儿再过来读一下,看看数据到了没。所以读写即使搞成非阻塞操作,按照BIO这种架构,也得搞成while true死循环,不停轮询数据是否到达是否可读。这样的话,将读写搞成非阻塞操作并没有什么卵用,还不如直接搞成阻塞操作,写起来代码更简洁。

这种阻塞会让一个线程“长时间绑定”到一个connection上,直到connection关闭之前,这个线程都要和它耗着,脱不开身。所以,为了应对高并发请求,BIO服务器要搞一个线程池,创建许多线程:

  1. 在别的线程都和已有connection耗着阻塞的时候,新的connection只能使用新的线程去处理;
  2. 别的线程阻塞,也不会使用CPU。所以新开线程处理新请求,也是在充分利用CPU;

看起来创建多个线程是必须的、高效的。但是这其实是BIO不得已而为之的策略。别忘了,线程也是有开销的:

  1. 占内存,开的越多占得越多;
  2. 线程切换需要成本,线程太多的话光在这么多线程之间切换都已经是不小的开销了。如果CPU在线程切换上的时间花的比线程真正干活的时间还多,岂不是CPU狂转,活却没干多少?

所以线程太多时,对系统造成的负担就太大了。而如果请求量很大,BIO模型又不得不创建很多线程。所以BIO模型能应对的并发量有限。如果活动连接不到1000,还可以搞一搞,面对十万百万级的请求,BIO的吞吐就无能为力了。

BIO模型:

  • 优点:活动连接数不是特别高(< 1000)时用起来很不错,模型简单,写代码也简单;
  • 缺点:应对并发场景的能力有限。

NIO事件模型

倒推一下:BIO无法应对特别高的并发,是因为线程创建太多反而有害。之所以要一直创建线程,是因为connnection和线程绑定了太久,即使connection的读写状态暂时还没准备好,线程也得等着,导致线程无法脱身。

那有没有办法能让线程脱身呢?如果线程只在某connection数据真正到达之后,再来读取,其他时间处理别的connection的数据,那显然一个线程是可以不止应付一个connection的。

线程阻塞时间远大于线程读写connection时间,假设为99:1,如果只在读写的时候用到线程,那99%的时间线程可以不用跟connection在这儿耗了。一个线程之前只能处理一个connection,现在可以处理100个connection。使用同样多的线程,可同时应对的活动connection数却提升了100倍,能处理的请求量级直接从数万变成了数百万。

想达到这种效果,就要改模型。BIO线程之所以只能阻塞死等(或者就算读写是非阻塞的,也要无限轮询),就是因为它也不知道数据啥时候会到。如果有一个东西能在数据到达之后通知线程去处理数据,线程就不用死等了。这也就是NIO所谓的事件通知模型。

NIO定义了一些事件:

  • 新连接到来;
  • 读数据到达,可读;
  • 写状态就绪,可写;
  • connect,一般是connect失败需要重连或者直接异步调用connect的时候;

每个channel把自己关注的事件告诉selector,selector一个人去等待所有事件,任一事件发生时,selector告诉对这个事件感兴趣的channel,channel使用线程去处理数据。

Http Server线程模型:NIO vs. BIO里也形象地举例对比过BIO和NIO。

Java NIO组件

SelectableChannel implements Channel

ServerSocketChannel和SocketChannel的父类。

  • configureBlocking(false):channel默认是阻塞的,所以获取channel后,首先要使用该方法将其设为非阻塞,这样所有的读写即使没有数据也都会立即返回,不会阻塞;
  • register(Selector, int):将channel注册到selector上,需要告知selector它对什么事件感兴趣;

ServerSocketChannel类似于BIO的ServerSocket,负责接收请求:

  • bind(SocketAddress):绑定端口;
  • validOps():注册到selector的时候用,其实就是accept事件SelectionKey.OP_ACCEPT,ServerSocketChannel只对请求到来的事件感兴趣;
  • SocketChannel accept():如果channel配置成非阻塞的了,即使没有请求到来,也立即返回null,不会阻塞。否则和ServerSocket的accept一样,阻塞等待请求到来;

SocketChannel和Socket一样,负责读写:

  • int read(ByteBuffer):非阻塞模式的channel不能保证读取的数据数目,因为它不是阻塞的,不会等到读完才返回。阻塞模式时,就像BIO读InputStream一样,读完才返回,如果数据没完全到达,就一直阻塞;
  • int write(ByteBuffer):同上;

accept、read、write这些方法在channel是非阻塞时,都是立即返回的,所以需要selector通知他们事件到了,他们才能真正接收、可读、可写到东西。

Selector

  • select():阻塞方法,事件发生,能选出channel时才会返回。就像BIO的accept阻塞一样,这个方法阻塞是合理的。没事情做的时候自然要阻塞
  • Set<SelectionKey> selectedKeys():被选出的key set,具体见SelectionKey;

SelectionKey

可认为是selector和channel的绑定:

  • Selector selector():返回绑定的selector;
  • SelectableChannel channel():返回绑定的channel;
  • int interestOps():绑定到这个key上的事件。其实就是channel对什么事件感兴趣;
  • int readyOps():就绪的事件;
  • isAcceptable/isConnectable/isReadable/isWritable:用于判断具体是哪一种事件;

NIO流程

有了以上组件,NIO可以按照以下流程使用事件通知:

  • 注册channel和事件到selector;
  • 轮询selector事件;
  • (selector返回就绪事件和绑定该事件的channel,这个是Java调用系统调用完成的);
  • 获取事件类型、相应的channel,使用线程处理事件;

这时候再看代码,虽然跟BIO比是麻烦了不少,但也好理解了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package example.nio.asyncio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * ServerSocketChannel只能在Selector上注册OP_ACCEPT事件;
 * SocketChannel则可以注册OP_READ和OP_WRITE等
 * <p>
 * 这个程序可以仅使用一个线程,因为它只是一个演示,但是在现实场景中,创建一个线程池来负责 I/O 事件处理中的耗时部分会更有意义。
 */
public class NioMultiPortEchoServer {
    private int ports[];
    private ByteBuffer echoBuffer = ByteBuffer.allocate(1024);

    public NioMultiPortEchoServer(int ports[]) throws IOException {
        this.ports = ports;

        go();
    }

    private void go() throws IOException {
        // Create a new selector
        Selector selector = Selector.open();

        // Open a listener on each port, and register each one
        // with the selector
        for (int port : ports) {
            // 获得channel,设为非阻塞
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            // channel绑定到相应的端口
            serverSocketChannel.bind(new InetSocketAddress(port));

            // 告诉selector,我们对OP_ACCEPT事件感兴趣(这是适用于ServerSocketChannel的唯一事件类型)
            // SelectionKey的作用就是,当事件发生时,selector提供对应于那个事件的SelectionKey
            // 这里,ServerSocketChannel所支持的操作只有SelectionKey.OP_ACCEPT
            SelectionKey key = serverSocketChannel.register(selector, serverSocketChannel.validOps());

            System.out.println("Going to listen on " + port);
        }

        while (true) {
            // 这个方法会阻塞(如果没事儿干,你就歇着吧),直到至少有一个已注册的事件发生。
            // 当一个或者更多的事件发生时,select()方法将返回所发生的事件的数量
            int num = selector.select();

            // 发生了事件的 SelectionKey 对象的一个 集合
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectedKeys.iterator();

            // 依次判断每个事件发生的到底是啥事儿
            while (it.hasNext()) {
                SelectionKey key = it.next();

                // 处理过了,就删了,防止一会儿重复处理。(可认为Selector只往set里加,但是不删!!!)
                it.remove();

                // SelectionKey.channel()方法返回的通道需要转型成你要处理的类型,如ServerSocketChannel或SocketChannel等。
                // 是有新连接了
                if (key.isAcceptable()) {
                    acceptSocketAndRegisterIt(key, selector);

                    // 是socket上有可读的数据来了
                } else if (key.isReadable()) {
                    readSocket(key);
                }
            }
            // 如果上面没有一个一个删掉,这里直接清空也行
//            selectedKeys.clear();
        }
    }

    private void acceptSocketAndRegisterIt(SelectionKey key, Selector selector) throws IOException {
        // Accept the new connection
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        // channel设置成非阻塞后,该方法就是非阻塞的了
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);

        // 只是建立了连接,但数据未必立即可读,而NIO的精髓就在于我(线程)不会一直等你(连接)可读,
        // 而是selector告诉我你可读之后,我再来直接读
        // 我们期望从这个socket上读取数据,所以也注册到selector,等通知再来读。这次注册的是OP_READ:“可读”就通知我
        // Add the new connection to the selector
        SelectionKey newKey = socketChannel.register(selector, SelectionKey.OP_READ);

        System.out.println("+++ New connection: " + socketChannel);
    }

    private void readSocket(SelectionKey key) throws IOException {
        // Read the data
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Echo data
        int bytesEchoed = 0, r = 0;
        while ((r = socketChannel.read(echoBuffer)) > 0) {
            // flip. ready to write: limit = position, position = 0
            echoBuffer.flip();
            socketChannel.write(echoBuffer);
            bytesEchoed += r;
            // clear. ready to read: position = 0, limit = capacity
            echoBuffer.clear();
        }
        System.out.println("Echoed " + bytesEchoed + " from " + socketChannel);
        // CLOSE SOCKET AFTER HANDLED
        socketChannel.close();
        System.out.println("--- Close connection: " + socketChannel);
    }

    static public void main(String args[]) throws Exception {
        if (args.length <= 0) {
            System.err.println("Usage: java NioMultiPortEchoServer port [port port ...]");
            System.exit(1);
        }

        int ports[] = new int[args.length];

        for (int i = 0; i < args.length; ++i) {
            ports[i] = Integer.parseInt(args[i]);
        }

        new NioMultiPortEchoServer(ports);
    }
}

所以NIO在非常高并发的场景下,能胜任BIO不能胜任的工作。但机制复杂了,实现起来也会更复杂。

Ref

  • https://www.baeldung.com/java-nio-selector
  • 一篇比较高屋建瓴的文章:https://tech.meituan.com/2016/11/04/nio.html

epoll

NIO需要OS层面的支持。对于Linux kernel 2.6,就是epoll(Event poll)- I/O event notification facility:

The epoll API performs a similar task to poll(2): monitoring multiple file descriptors to see if I/O is possible on any of them. The epoll API can be used either as an edge-triggered or a level-triggered interface and scales well to large numbers of watched file descriptors.

没有该支持时,如果一个进程要接收某socket流量,Linux会调用recv函数,把进程扔到socket等待队列,移出CPU工作队列,阻塞。有数据了再让唤醒进程,读数据。

这就是BIO,线程和connection绑定。

Linux 2.6之前用select进行OS层面的支持:将进程加入所有感兴趣的socket等待队列里。移出CPU工作队列,阻塞。有数据唤醒进程,读数据。但是进程并不知道是哪个队列有数据,所以要轮询。

2.6之后的epoll,直接把有数据的那个socket拎出来,通知进程,不用再轮询了。

Java的Selector就是利用epoll这一点,获取所有的就绪事件。然后Java开发者就可以获取事件和channel,处理事件了。

  • https://www.jianshu.com/p/37a2bb9d1cae

NIO:同步非阻塞

NIO是基于事件的同步非阻塞IO,如果TCP RecvBuffer有数据,就把数据从网卡读到内存,并且返回给用户;反之则直接返回0,永远不会阻塞。但IO读写的时候依然是同步的,线程需要等待IO读写完毕。异步IO(Async I/O,AIO)更进一步:不但等待就绪是非阻塞的,就连数据从网卡到内存的过程也是异步的。

  • 同步:用户线程发起IO请求后需要等待或者轮询内核IO操作完成后才能继续执行;
  • 异步:用户线程发起IO请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数。

因此 阻塞I/O,非阻塞I/O,I/O多路复用,都属于同步调用。只有实现了特殊API的AIO才是异步调用。

这个和Java的回调函数流程是一样的,这也是异步的本质。

本文由作者按照 CC BY 4.0 进行授权