文章

AIO

Java7给NIO引入了新的异步类,其实就是AIO(async io,异步IO)。因为依然放在NIO包下,也被称为NIO2。

  1. 同步异步,阻塞非阻塞
  2. AIO
    1. 异步读取
    2. 方法分析示例
    3. code
  3. Reactor vs. Proactor
  4. AIO server
    1. 处理方式一:返回Future
    2. 处理方式二:回调函数
      1. 保证先读后写,保证处理长连接
      2. 保证第一次读后再第二次读——ReadPendingException

同步异步,阻塞非阻塞

同步异步,阻塞非阻塞是两个完全不同的概念

  • 同步异步:指一件事要不要去做
    • 同步就是由当前线程去做;
    • 异步就是当前线程不去做,而是留下一个“纸条”(回调函数),告诉一会儿异步线程读完之后要做什么;
  • 阻塞非阻塞:指一件事去做了,能不能立刻返回
    • 阻塞就是不能立即返回,在此之前一直卡着等候;
    • 非阻塞就是无论有没有获取结果,先返回,不会卡住,可以继续做后面的事情。至于没取到结果怎么办,取决于程序的行为,可以立即再取获取一次,也可以过一段再去获取,甚至可以不要了。全看程序的逻辑要怎么去做。

所以NIO是同步非阻塞:

  1. 同步,因为读数据这件事情是自己去做的;
  2. 非阻塞,因为NIO的read/write方法无论能否读/写到,都直接返回。

所以NIO要配合Selector用呀!当Selector通知有数据到了之后,再去read。

AIO

异步读取

NIO本质上还是自己去read。AIO则是由异步线程去read,自己只需要留下一个“纸条”告诉异步线程读完之后做什么就行了。

所以,本线程不需要操心读取的事情了,满脑子应该想的是读完之后我应该做什么

Java在java.nio.channels下添加了几个异步Channel:

  • AsynchronousFileChannel;
  • AsynchronousServerSocket;
  • AsynchronousSocket;

他们共同的接口AsynchronousChannel倒是没有提供什么read/write接口方法(应该是子类的方法参数比较难统一,所以虽然行为类似,但没法定义一个统一的接口方法,比如AsynchronousFileChannel和AsynchronousServerSocket同样都是read,参数明显不一样),不过javadoc上写明了这一类异步channel一般怎么用。以read为例:

  1. 使用返回Future的方法去读,其实就是异步线程读完之后把结果放到一个地方。具体到AsynchronousFileChannel,就是Future<Integer> read(ByteBuffer dst, long position)。返回Future之后,就可以根据需要,在想要的时候从Future里取数据了 (看接口,Future里放的并不是读取后的内容,而是Integer代表读取字节数,真正的内容读到了作为参数的ByteBuffer里,和FileChannel#read(ByteBuffer)一样,和InputStream#read(byte[])也一样,java的read从来都是只返回读取size,内容放到了入参里)。关于Future,参阅Executor - Thread Pool
  2. 使用支持回调函数的方法去读,其实就是异步线程读完之后执行一下回调函数。回调函数,你懂的,java不支持把函数作为参数,但是可以使用匿名类实现。所以java定义了CompletionHandler接口充当回调函数。具体到AsynchronousFileChannel就是read(ByteBuffer dst, long position, A attachment, CompletionHandler<Integer,? super A> handler)

方法分析示例

假设从file里异步读取数据到一个ByteBuffer。

输出buffer里数据的方法:

1
2
3
4
5
6
    private static void output(ByteBuffer byteBuffer) {
        byteBuffer.flip();
        byte[] data = new byte[byteBuffer.limit()];
        byteBuffer.get(data);
        System.out.println(String.format("[%s] %s", Thread.currentThread().getName(), new String(data, StandardCharsets.UTF_8)));
    }

首先打开一个file,获取异步channel:

1
2
3
4
        // 使用相对路径读。Note:换目录就崩
        Path gitIgnore = Paths.get("forwrite.txt");
        OpenOption[] options = {StandardOpenOption.READ};
        AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(gitIgnore, options);

第一种方式,异步读数据到Future:

1
2
3
4
5
        ByteBuffer byteBuffer = ByteBuffer.allocate(10);

        // 方法一:使用Future去读
        // read once
        Future<Integer> future = fileChannel.read(byteBuffer, 0);

可以使用Future#get()死等,也可以使用Future#isDone()判断Future的异步执行状态(状态自然是异步线程读完之后set为true的):

1
2
3
4
5
6
        // reture read size
//        System.out.println(future.get());

        while (!future.isDone()) {
            // wait
        }

确认完成之后,就可以输出读到ByteBuffer里的内容了:output(byteBuffer)

第二种方式,使用回调函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
        // 方法二:使用CompletionHandle去读
        fileChannel.read(byteBuffer, 0, "A Read Attachment", new CompletionHandler<Integer, String>() {
            @Override
            public void completed(Integer result, String attachment) {
                System.out.println(String.format("[%s] Read result code: %s. This is the attachment object I put when reading: %s.", Thread.currentThread().getName(), result, attachment));
                output(byteBuffer);
            }

            @Override
            public void failed(Throwable exc, String attachment) {
                System.out.println(String.format("[%s] Read failed.", Thread.currentThread().getName()));
            }
        });

数据从file的起点(position=0)开始读,读入ByteBuffer,可以设置一个attachment Object,最重要的是回调函数。

写回调,最重要的是分工明确,规定好本线程要做什么,异步线程做什么:

  • 异步线程:读数据。同时看CompletionHandler的接口参数,异步线程还要把读取结果(读取字节数)以result的形式提供,同时会把我们一开始设置的attachment Object也提供回来;
  • 本线程:异步线程干的事儿不需要我们操心了,只需要操心异步线程传过来的result和attachment Object我们有没有用;

所以这里本线程设置好回调的行为,当异步线程执行到completed方法的时候,已经提供好了参数,也把数据正常放入了ByteBuffer。这里本线程只是简单输出一下这些参数和ByteBuffer里的内容。一个示例输出:

1
2
[Thread-6] Read result code: 9. This is the attachment object I put when reading: A Read Attachment.
[Thread-6] test data

读了9个字节,设置的attachment Object原封不动输出一遍,之后输出一下读到的数据:test data。

attachment Object的作用就是提供个上下文,比如一个map或者一个自定义的context,可以存储一些前后的信息。其实就是相当于一下子传了一堆参数。这一点儿Python经常干……函数参数经常是一个map,根本不知道里面的东西是啥……

写同理。

code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package example.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;

/**
 * FileChannel只能是阻塞的。所以Java7引入了{@link java.nio.channels.AsynchronousFileChannel}。
 *
 * @author liuhaibo on 2019/09/26
 */
public class AsyncFileChannelDemo {

    public static void main(String... args) throws IOException, InterruptedException {
        asyncWrite();
        asyncRead();

        System.out.println(String.format("[%s] Sleep to wait async io...", Thread.currentThread().getName()));
        Thread.sleep(1000);
        System.out.println(String.format("[%s] Main thread exit...", Thread.currentThread().getName()));
    }

    private static void asyncRead() throws IOException {
        // 使用相对路径读。Note:换目录就崩
        Path gitIgnore = Paths.get("forwrite.txt");
        OpenOption[] options = {StandardOpenOption.READ};
        AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(gitIgnore, options);

        ByteBuffer byteBuffer = ByteBuffer.allocate(10);

        // 方法一:使用Future去读
        // read once
        Future<Integer> future = fileChannel.read(byteBuffer, 0);

        // reture read size
//        System.out.println(future.get());

        while (!future.isDone()) {
            // wait
        }

        System.out.println(String.format("[%s] Read done by Future.", Thread.currentThread().getName()));
        output(byteBuffer);
        byteBuffer.clear();

        // 方法二:使用CompletionHandle去读
        fileChannel.read(byteBuffer, 0, "A Read Attachment", new CompletionHandler<Integer, String>() {
            @Override
            public void completed(Integer result, String attachment) {
                System.out.println(String.format("[%s] Read result code: %s. This is the attachment object I put when reading: %s.", Thread.currentThread().getName(), result, attachment));
                output(byteBuffer);
            }

            @Override
            public void failed(Throwable exc, String attachment) {
                System.out.println(String.format("[%s] Read failed.", Thread.currentThread().getName()));
            }
        });
    }

    private static void asyncWrite() throws IOException {
        // ${working directory}/forwrite.txt
        Path forWrite = Paths.get("forwrite.txt");
        OpenOption[] options = {StandardOpenOption.WRITE, StandardOpenOption.CREATE};
        AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(forWrite, options);

        ByteBuffer buffer = ByteBuffer.allocate(1024);

        buffer.put("test data".getBytes());
        buffer.flip();

        // 方法一:使用Future去写
        Future<Integer> operation = fileChannel.write(buffer, 0);

        while(!operation.isDone()) {
            // wait
        }

        System.out.println(String.format("[%s] Write done by Future.", Thread.currentThread().getName()));

        fileChannel.write(buffer, 0, "A Write Attachment", new CompletionHandler<Integer, String>() {
            @Override
            public void completed(Integer result, String attachment) {
                System.out.println(String.format("[%s] Write result code: %s. This is the attachment object I put when writing: %s.", Thread.currentThread().getName(), result, attachment));
                output(buffer);
            }

            @Override
            public void failed(Throwable exc, String attachment) {
                System.out.println(String.format("[%s] Write failed.", Thread.currentThread().getName()));
            }
        });
    }

    private static void output(ByteBuffer byteBuffer) {
        byteBuffer.flip();
        byte[] data = new byte[byteBuffer.limit()];
        byteBuffer.get(data);
        System.out.println(String.format("[%s] %s", Thread.currentThread().getName(), new String(data, StandardCharsets.UTF_8)));
    }
}

程序主要有三个线程:

  1. 主线程,负责发起写文件任务,再发起读文件任务。最后等1s后退出;
  2. 写线程,异步写数据,执行写回调函数;
  3. 读线程,异步读数据,执行读回调函数;

一个示例输出:

1
2
3
4
5
6
7
8
9
[main] Write done by Future.
[Thread-7] Write result code: 0. This is the attachment object I put when writing: A Write Attachment.
[main] Read done by Future.
[main] test data
[main] Sleep to wait async io...
[Thread-6] Read result code: 9. This is the attachment object I put when reading: A Read Attachment.
[Thread-6] test data
[Thread-7] test data
[main] Main thread exit...

可以看到Thread-7是写线程,Thread-6是读线程。主线程都执行到最后了,输出“[main] Sleep to wait async io…”,之后休眠,两个异步线程才陆续输出“test data”。

Servlet - NIO & Async相比,二者其实都是一种异步的思想,只不过AIO将异步用于IO,Async Servlet是将异步用于Servlet的任务处理阶段。

Reactor vs. Proactor

Reactor就是NIO,而Proactor就是AIO。二者的区别在于NIO关注的是可读可写事件,然后自己去读写;AIO关注的是读写完成,实际的读写由OS去完成(OS支持异步),并将结果放入用户传参的缓冲区,调用回调

  • Ref: https://tech.meituan.com/2016/11/04/nio.html

AIO server

AsynchronousServerSocket和AsynchronousSocket的行为和AsynchronousFileChannel一样,只不过参数略有不同。他们的行为同样可以类比ServerSocketChannel和ServerSocketChannel。再往远古回忆,可以类比ServerSocket和Socket。

回忆ServerSocket和Socket的用法(一)How Tomcat Works - 原始Web服务器,大致步骤:

  1. 创建ServerSocket,绑定监听端口;
  2. ServerSocket accept,阻塞等待连接;
  3. accept返回Socket;
  4. 读写socket;

处理方式一:返回Future

AsynchronousServerSocketChannel:

  • open():按照provider提供的方法创建一个ServerSocketChannel;
  • accept():返回Future<AsynchronousSocketChannel>,异步操作;

AsynchronousSocketChannel:

  • Future<Integer> read(ByteBuffer dst):异步read到ByteBuffer里;
  • Future<Integer> write(ByteBuffer src):同read;

和AsynchronousFileChannel一样,可以选择处理Future,不过这样就生生把一个异步操作卡成了同步的

1
2
3
4
5
6
7
8
9
10
11
12
13
// 创建一个AsynchronousServerSocketChannel
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
server.bind(new InetSocketAddress("127.0.0.1", 4555));
// 异步accept
Future<AsynchronousSocketChannel> acceptFuture = server.accept();
// 阻塞,直到connection成功创建,获取到AsynchronousSocketChannel
AsynchronousSocketChannel worker = future.get();
// 异步读取数据
ByteBuffer byteBuffer = xxx;
Future<Integer> size =  worker.read(byteBuffer);
// 阻塞,直到读到数据
byteBuffer.get();
...

总之用Future去处理,意义就不大了。

处理方式二:回调函数

回调才是真正的异步处理流程。

AsynchronousServerSocketChannel:

  • accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler):异步监听,等待connection创建,返回AsynchronousSocketChannel;

AsynchronousSocketChannel:

  • void read(ByteBuffer dst, A attachment, CompletionHandler<Integer,? super A> handler):异步读socket;
  • void write(ByteBuffer src, A attachment, CompletionHandler<Integer,? super A> handler):异步写socket;

和AsynchronousFileChannel的处理方式类似,但是又有些许不同之处:

  • AsynchronousFileChannel只关注读或写,所以read或write方法的回调函数只需要关心一件事就行了;
  • AsynchronousServerSocketChannel通过accept获取到AsynchronousSocketChannel之后,一般要先读该channel,再写该channel,所以既要有读的操作,又要有写的操作。

创建AsynchronousServerSocketChannel和之前一样:

1
2
3
        AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
        InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999);
        serverChannel.bind(hostAddress);

接下来要异步处理连接,回调我们对AsynchronousSocketChannel的操作。

  1. 首先,应该继续监听该AsynchronousServerSocketChannel,等待后续连接;
  2. 应该处理AsynchronousSocketChannel;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    
             serverChannel.accept(
                     null,
                     new CompletionHandler<AsynchronousSocketChannel, Object>() {
    
                         @Override
                         public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
                             if (serverChannel.isOpen()){
                                 serverChannel.accept(null, this);
                             }
    
                             if ((clientChannel != null) && (clientChannel.isOpen())) {
                                 ByteBuffer buffer = ByteBuffer.allocate(32);
    
                                 clientChannel.read(
                                         buffer,
                                         null,
                                         new CompletionHandler<Integer, Object>() {
    
                                             @Override
                                             public void completed(Integer result, Object attachment) {
    
                                             }
    
                                             @Override
                                             public void failed(Throwable exc, Object attachment) {
    
                                             }
                                         }
                                 );
    
                                 // 读后写?
    
                                 clientChannel.write(
                                         (ByteBuffer) buffer.flip(),
                                         null,
                                         new CompletionHandler<Integer, Object>() {
                                             @Override
                                             public void completed(Integer result, Object attachment) {
    
                                             }
    
                                             @Override
                                             public void failed(Throwable exc, Object attachment) {
    
                                             }
                                         }
                                 );
                             }
                         }
                         @Override
                         public void failed(Throwable exc, Object attachment) {
    
                         }
                     });
             System.in.read();
    

    乍看有点儿套娃,但实际上就三个回调: - 第一个是accept在connection建立后要调用的回调。在这个回调里,先读socket,后写socket:

    • 第二个回调是读socket之后被调用;
    • 第三个回调是写socket之后被调用;

但问题是,这样真的是“先读后写”吗?读写现在已经都是异步的操作了,有可能读还没完成,就执行到后面的写了。这样逻辑就乱了。

保证先读后写,保证处理长连接

所以是不是要在异步读和异步写之间加个阻塞操作,比如sleep两秒,以确保write时已经完成异步read了?首先,sleep两秒并不能确保一定异步read完了。其次,就算加个检查ByteBuffer的操作,确保ByteBuffer已经被异步读入数据后,再去执行异步write,仍然有一个问题——该connection只进行了一次read和write,如果后面客户端在此连接上又发送了一个请求,server并没有接收,所以又出问题了。

想想Tomcat怎么处理长连接的。把if ((clientChannel != null) && (clientChannel.isOpen()))改成while就行了!

但是,在异步读写之间加上阻塞操作,以让读优先于写执行,终究是很蠢的操作。既然写一定在读后去做,那把异步写操作放到异步读的回调里不就行了!所以把写放入读回调的completed(),这样只有读成功之后,才会去写。这才是正常操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
                        while (clientChannel.isOpen()) {
                            ByteBuffer buffer = ByteBuffer.allocate(32);

                            clientChannel.read(
                                    buffer,
                                    null,
                                    new CompletionHandler<Integer, Object>() {

                                        @Override
                                        public void completed(Integer result, Object attachment) {
                                            buffer.flip();

                                            // 写一定在读后执行
                                            clientChannel.write(
                                                    buffer,
                                                    null,
                                                    new CompletionHandler<Integer, Object>() {
                                                        @Override
                                                        public void completed(Integer result, Object attachment) {

                                                        }

                                                        @Override
                                                        public void failed(Throwable exc, Object attachment) {

                                                        }
                                                    }
                                            );
                                        }

                                        @Override
                                        public void failed(Throwable exc, Object attachment) {

                                        }
                                    }
                            );
                        }

但是又但是了……现在一定是先第一次读,再第一次写。但是第二次读什么时候开始呢?会不会第一次异步读还没读完呢,第二次异步读又开始了?显然,这是存在的。因为开始第一次异步读之后,while循环又开始了第二次异步读,如果这时候第一次异步读还没有完成,就会报错:ReadPendingException,Unchecked exception thrown when an attempt is made to read from an asynchronous socket channel and a previous read has not completed。

ReadPendingException的描述就知道,只有异步读的时候会存在这种情况,毕竟对于同步读来讲,第一次不读完第二次是不可能开始的。

那现在怎么解决这个问题?

保证第一次读后再第二次读——ReadPendingException

在while后加个sleep两秒?这样基本能保证第一次读完后第二次读才开始。可以演示一下,这样可能是可以的。但和之前在异步read和异步write之间加sleep是一样的,未必行,而且丑陋。

和上面的思路一致,既然第二次读要在第一次读后开始,那把第二次读放到第一次写后面不就行了。所以在异步write操作的completed函数里再加个异步read就行了。

但是等等,是不是套娃了?第一次read的回调里执行第一次write,write的回调里再执行read,read的回调里再执行write……没完没了了……

下面的写法提供了一个很好的处理方式。异步read和异步write用同一个回调处理,该回调在read后,进行write;同样在write后,进行read。通过attachment记录的信息表明当前是异步read完成后的回调还是异步write完成后的回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
    public static class ReadWriteHandler implements CompletionHandler<Integer, Map<String, Object>> {

        @Override
        public void completed(Integer client, Map<String, Object> attachment) {
            String action = (String) attachment.get("action");

            if ("read".equals(action)) {
                ByteBuffer buffer = (ByteBuffer) attachment.get("buffer");
                AsynchronousSocketChannel clientChannel = (AsynchronousSocketChannel) attachment.get("channel");
                buffer.flip();
                attachment.put("action", "write");

                // duplicate buffer
                String bufferContent = StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
                System.out.println(String.format("[%s] data read: %s", Thread.currentThread().getName(), bufferContent));


                attachment.put("buffer", buffer.duplicate());
                // write the buffer content back
                clientChannel.write(buffer, attachment, this);
                buffer.clear();

            } else if ("write".equals(action)) {
                AsynchronousSocketChannel clientChannel = (AsynchronousSocketChannel) attachment.get("channel");

                ByteBuffer written = (ByteBuffer) attachment.get("buffer");
                String bufferContent = StandardCharsets.UTF_8.decode(written.duplicate()).toString();
                System.out.println(String.format("[%s] data written: %s", Thread.currentThread().getName(), bufferContent));

                ByteBuffer buffer = ByteBuffer.allocate(32);
                attachment.put("action", "read");
                attachment.put("buffer", buffer);

                clientChannel.read(buffer, attachment, this);
            }
        }

        @Override
        public void failed(Throwable exc, Map<String, Object> attachment) {
            //
        }
    }

然后在获取connection后正常read就行了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
    AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
    InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999);
    serverChannel.bind(hostAddress);

    serverChannel.accept(
            null,
            new CompletionHandler<AsynchronousSocketChannel, Object>() {

                @Override
                public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
                    if (serverChannel.isOpen()) {
                        serverChannel.accept(null, this);
                    }

                    try {
                        System.out.println(String.format("[%s] client connected: %s", Thread.currentThread().getName(), clientChannel.getRemoteAddress()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }

                    if (clientChannel.isOpen()) {
                        ReadWriteHandler handler = new ReadWriteHandler();
                        ByteBuffer buffer = ByteBuffer.allocate(32);

                        Map<String, Object> readInfo = new HashMap<>();
                        readInfo.put("action", "read");
                        readInfo.put("buffer", buffer);
                        readInfo.put("channel", clientChannel);

                        clientChannel.read(buffer, readInfo, handler);
                    }
                }

                @Override
                public void failed(Throwable exc, Object attachment) {
                    // process error
                }
            });
    System.in.read();

这样既可以accept多个connection,又可以处理一个长连接上的多个请求。

client demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class AsyncClient {

    public static void main(String... args) throws IOException, ExecutionException, InterruptedException {

        Client client = new Client();

        System.out.println("1st send");
        String resp1 = client.sendMessage("hello");
        System.out.println("2st send");
        String resp2 = client.sendMessage("world");
        System.out.println("The end");
        System.out.println(resp1);
        System.out.println(resp2);
    }

    public static class Client {

        AsynchronousSocketChannel client;

        Client() throws IOException, ExecutionException, InterruptedException {
            client = AsynchronousSocketChannel.open();
            InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999);
            Future<Void> future = client.connect(hostAddress);

            future.get();
        }

        String sendMessage(String message) throws ExecutionException, InterruptedException {
            byte[] byteMsg = new String(message).getBytes(StandardCharsets.UTF_8);
            ByteBuffer buffer = ByteBuffer.wrap(byteMsg);
            Future<Integer> writeResult = client.write(buffer);

            // do some computation

            writeResult.get();
            buffer.flip();
            Future<Integer> readResult = client.read(buffer);

            // do some computation

            readResult.get();
            String echo = new String(buffer.array()).trim();
            buffer.clear();
            return echo;
        }
    }
}

这个client就是懒省事儿,虽然用了AsynchronousSocketChannel,但是是用Future的方式处理的,而不是异步回调的方式。所以实际上还是通过Future的get方法把整个操作变成了阻塞式的

多次run client,server输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[Thread-7] client connected: /127.0.0.1:55650
[Thread-6] data read: hello
[Thread-5] data written: hello
[Thread-6] data read: world
[Thread-5] data written: world
[Thread-7] client connected: /127.0.0.1:55657
[Thread-5] data read: hello
[Thread-6] data written: hello
[Thread-6] data read: world
[Thread-5] data written: world
[Thread-7] client connected: /127.0.0.1:55665
[Thread-5] data read: hello
[Thread-6] data written: hello
[Thread-6] data read: world
[Thread-5] data written: world

看起来异步操作也是有个线程池去搞的。从AsynchronousServerSocketChannel的一个带线程池参数的open方法似乎也可以看出来这一点:

  • AsynchronousServerSocketChannel open(AsynchronousChannelGroup group)

还有AsynchronousFileChannel的一个带线程池的open方法:

  • AsynchronousFileChannel open(Path file, Set<? extends OpenOption> options, ExecutorService executor, FileAttribute<?>... attrs)

Ref:

  • https://www.baeldung.com/java-nio2-async-socket-channel
本文由作者按照 CC BY 4.0 进行授权