Java 非阻塞 IO 和异步 IO

转载自 Java 非阻塞 IO 和异步 IO

上一篇文章介绍了 Java NIO 中 Buffer、Channel 和 Selector 的基本操作,主要是一些接口操作,比较简单。

本文将介绍非阻塞 IO 和异步 IO,也就是大家耳熟能详的 NIO 和 AIO。很多初学者可能分不清楚异步和非阻塞的区别,只是在各种场合能听到异步非阻塞这个词。

本文会先介绍并演示阻塞模式,然后引入非阻塞模式来对阻塞模式进行优化,最后再介绍 JDK7 引入的异步 IO,由于网上关于异步 IO 的介绍相对较少,所以这部分内容我会介绍得具体一些。

希望看完本文,读者可以对非阻塞 IO 和异步 IO 的迷雾看得更清晰些,或者为初学者解开一丝丝疑惑也是好的。

NIO,JDK1.4,New IO,Non-Blocking IO

NIO.2,JDK7,More New IO,Asynchronous IO,严格地说 NIO.2 不仅仅引入了 AIO

阻塞模式 IO

我们已经介绍过使用 Java NIO 包组成一个简单的客户端-服务端网络通讯所需要的 ServerSocketChannel、SocketChannel 和 Buffer,我们这里整合一下它们,给出一个完整的可运行的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 监听 8080 端口进来的 TCP 链接
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        while (true) {
            // 这里会阻塞,直到有一个请求的连接进来
            SocketChannel socketChannel = serverSocketChannel.accept();
            // 开启一个新的线程来处理这个请求,然后在 while 循环中继续监听 8080 端口
            SocketHandler handler = new SocketHandler(socketChannel);
            new Thread(handler).start();
        }
    }
}

这里看一下新的线程需要做什么,SocketHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class SocketHandler implements Runnable {
    private SocketChannel socketChannel;
    public SocketHandler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }
    @Override
    public void run() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            // 将请求数据读入 Buffer 中
            int num;
            while ((num = socketChannel.read(buffer)) > 0) {
                // 读取 Buffer 内容之前先 flip 一下
                buffer.flip();
                // 提取 Buffer 中的数据
                byte[] bytes = new byte[num];
                buffer.get(bytes);
                String re = new String(bytes, "UTF-8");
                System.out.println("收到请求:" + re);
                // 回应客户端
                ByteBuffer writeBuffer = ByteBuffer.wrap(("我已经收到你的请求,你的请求内容是:" + re).getBytes());
                socketChannel.write(writeBuffer);
                buffer.flip();
            }
        } catch (IOException e) {
            IOUtils.closeQuietly(socketChannel);
        }
    }
}

最后,贴一下客户端 SocketChannel 的使用,客户端比较简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SocketChannelTest {
    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost", 8080));
        // 发送请求
        ByteBuffer buffer = ByteBuffer.wrap("1234567890".getBytes());
        socketChannel.write(buffer);
        // 读取响应
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        int num;
        if ((num = socketChannel.read(readBuffer)) > 0) {
            readBuffer.flip();
            byte[] re = new byte[num];
            readBuffer.get(re);
            String result = new String(re, "UTF-8");
            System.out.println("返回值: " + result);
        }
    }
}

上面介绍的阻塞模式的代码应该很好理解:来一个新的连接,我们就新开一个线程来处理这个连接,之后的操作全部由那个线程来完成。

那么,这个模式下的性能瓶颈在哪里呢?

  1. 首先,每次来一个连接都开一个新的线程这肯定是不合适的。当活跃连接数在几十几百的时候当然是可以这样做的,但如果活跃连接数是几万几十万的时候,这么多线程明显就不行了。每个线程都需要一部分内存,内存会被迅速消耗,同时,线程切换的开销非常大。
  2. 其次,阻塞操作在这里也是一个问题。首先,accept() 是一个阻塞操作,当 accept() 返回的时候,代表有一个连接可以使用了,我们这里是马上就新建线程来处理这个 SocketChannel 了,但是,但是这里不代表对方就将数据传输过来了。所以,SocketChannel#read 方法将阻塞,等待数据,明显这个等待是不值得的。同理,write 方法也需要等待通道可写才能执行写入操作,这边的阻塞等待也是不值得的。

非阻塞 IO

说完了阻塞模式的使用及其缺点以后,我们这里就可以介绍非阻塞 IO 了。

非阻塞 IO 的核心在于使用一个 Selector 来管理多个通道,可以是 SocketChannel,也可以是 ServerSocketChannel,将各个通道注册到 Selector 上,指定监听的事件。

之后可以只用一个线程来轮询这个 Selector,看看上面是否有通道是准备好的,当通道准备好可读或可写,然后才去开始真正的读写,这样速度就很快了。我们就完全没有必要给每个通道都起一个线程。

NIO 中 Selector 是对底层操作系统实现的一个抽象,管理通道状态其实都是底层系统实现的,这里简单介绍下在不同系统下的实现。

select:上世纪 80 年代就实现了,它支持注册 FD_SETSIZE(1024) 个 socket,在那个年代肯定是够用的,不过现在嘛,肯定是不行了。

poll:1997 年,出现了 poll 作为 select 的替代者,最大的区别就是,poll 不再限制 socket 数量。

select 和 poll 都有一个共同的问题,那就是它们都只会告诉你有几个通道准备好了,但是不会告诉你具体是哪几个通道。所以,一旦知道有通道准备好以后,自己还是需要进行一次扫描,显然这个不太好,通道少的时候还行,一旦通道的数量是几十万个以上的时候,扫描一次的时间都很可观了,时间复杂度 O(n)。所以,后来才催生了以下实现。

epoll:2002 年随 Linux 内核 2.5.44 发布,epoll 能直接返回具体的准备好的通道,时间复杂度 O(1)。

除了 Linux 中的 epoll,2000 年 FreeBSD 出现了 Kqueue,还有就是,Solaris 中有 /dev/poll。

前面说了那么多实现,但是没有出现 Windows,Windows 平台的非阻塞 IO 使用 select,我们也不必觉得 Windows 很落后,在 Windows 中 IOCP 提供的异步 IO 是比较强大的。

我们回到 Selector,毕竟 JVM 就是这么一个屏蔽底层实现的平台,我们面向 Selector 编程就可以了。

之前在介绍 Selector 的时候已经了解过了它的基本用法,这边来一个可运行的实例代码,大家不妨看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class SelectorServer {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.socket().bind(new InetSocketAddress(8080));
        // 将其注册到 Selector 中,监听 OP_ACCEPT 事件
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            // 需要不断地去调用 select() 方法获取最新的准备好的通道
            int readyChannels = selector.select();
            if (readyChannels == 0) {
                continue;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            // 遍历
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()) {
                    // 有已经接受的新的到服务端的连接
                    SocketChannel socketChannel = server.accept();
                    // 有新的连接并不代表这个通道就有数据,
                    // 这里将这个新的 SocketChannel 注册到 Selector,监听 OP_READ 事件,等待数据
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    // 有数据可读
                    // 上面一个 if 分支中注册了监听 OP_READ 事件的 SocketChannel
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int num = socketChannel.read(readBuffer);
                    if (num > 0) {
                        // 处理进来的数据...
                        System.out.println("收到数据:" + new String(readBuffer.array()).trim());
                        socketChannel.register(selector, SelectionKey.OP_WRITE);
                    } else if (num == -1) {
                        // -1 代表连接已经关闭
                        socketChannel.close();
                    }
                }
                else if (key.isWritable()) {
                    // 通道可写
                    // 给用户返回数据的通道可以进行写操作了
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.wrap("返回给客户端的数据...".getBytes());
                    socketChannel.write(buffer);
                    // 重新注册这个通道,监听 OP_READ 事件,客户端还可以继续发送内容过来
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
            }
        }
    }
}

至于客户端,大家可以继续使用上一节介绍阻塞模式时的客户端进行测试。

NIO.2 异步 IO

More New IO,或称 NIO.2,随 JDK 1.7 发布,包括了引入异步 IO 接口和 Paths 等文件访问接口。

异步这个词,我想对于绝大多数开发者来说都很熟悉,很多场景下我们都会使用异步。

通常,我们会有一个线程池用于执行异步任务,提交任务的线程将任务提交到线程池就可以立马返回,不必等到任务真正完成。如果想要知道任务的执行结果,通常是通过传递一个回调函数的方式,任务结束后去调用这个函数。

同样的原理,Java 中的异步 IO 也是一样的,都是由一个线程池来负责执行任务,然后使用回调或自己去查询结果。

大部分开发者都知道为什么要这么设计了,这里再啰嗦一下。异步 IO 主要是为了控制线程数量,减少过多的线程带来的内存消耗和 CPU 在线程调度上的开销。

在 Unix/Linux 等系统中,JDK 使用了并发包中的线程池来管理任务,具体可以查看 AsynchronousChannelGroup 的源码。

在 Windows 操作系统中,提供了一个叫做 I/O Completion Ports 的方案,通常简称为 IOCP,操作系统负责管理线程池,其性能非常优异,所以在 Windows 中 JDK 直接采用了 IOCP 的支持,使用系统支持,把更多的操作信息暴露给操作系统,也使得操作系统能够对我们的 IO 进行一定程度的优化。

在 Linux 中其实也是有异步 IO 系统实现的,但是限制比较多,性能也一般,所以 JDK 采用了自建线程池的方式。

本文还是以实用为主,想要了解更多信息请自行查找其他资料,下面对 Java 异步 IO 进行实践性的介绍。

总共有三个类需要我们关注,分别是 AsynchronousSocketChannel,AsynchronousServerSocketChannel 和 AsynchronousFileChannel,只不过是在之前介绍的 FileChannel、SocketChannel 和 ServerSocketChannel 的类名上加了个前缀 Asynchronous。

Java 异步 IO 提供了两种使用方式,分别是返回 Future 实例和使用回调函数。

1、返回 Future 实例

返回 java.util.concurrent.Future 实例的方式我们应该很熟悉,JDK 线程池就是这么使用的。Future 接口的几个方法语义在这里也是通用的,这里先做简单介绍。

  • future.isDone();

    判断操作是否已经完成,包括了正常完成、异常抛出、取消

  • future.cancel(true);

    取消操作,方式是中断。参数 true 说的是,即使这个任务正在执行,也会进行中断。

  • future.isCancelled();

    是否被取消,只有在任务正常结束之前被取消,这个方法才会返回 true

  • future.get();

    这是我们的老朋友,获取执行结果,阻塞。

  • future.get(10, TimeUnit.SECONDS);

    如果上面的 get() 方法的阻塞你不满意,那就设置个超时时间。

2、提供 CompletionHandler 回调函数

java.nio.channels.CompletionHandler 接口定义:

1
2
3
4
5
6
public interface CompletionHandler<V,A> {
    void completed(V result, A attachment);
    void failed(Throwable exc, A attachment);
}

注意,参数上有个 attachment,虽然不常用,我们可以在各个支持的方法中传递这个参数值

1
2
3
4
5
6
7
8
9
10
11
12
AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open().bind(null);
// accept 方法的第一个参数可以传递 attachment
listener.accept(attachment, new CompletionHandler<AsynchronousSocketChannel, Object>() {
    public void completed(
      AsynchronousSocketChannel client, Object attachment) {
          //
      }
    public void failed(Throwable exc, Object attachment) {
          //
      }
});

AsynchronousFileChannel

网上关于 Non-Blocking IO 的介绍文章很多,但是 Asynchronous IO 的文章相对就少得多了,所以我这边会多介绍一些相关内容。

首先,我们就来关注异步的文件 IO,前面我们说了,文件 IO 在所有的操作系统中都不支持非阻塞模式,但是我们可以对文件 IO 采用异步的方式来提高性能。

下面,我会介绍 AsynchronousFileChannel 里面的一些重要的接口,都很简单,读者要是觉得无趣,直接滑到下一个标题就可以了。

实例化:

1
AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("/Users/hongjie/test.txt"));

一旦实例化完成,我们就可以着手准备将数据读入到 Buffer 中:

1
2
ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> result = channel.read(buffer, 0);

异步文件通道的读操作和写操作都需要提供一个文件的开始位置,文件开始位置为 0

除了使用返回 Future 实例的方式,也可以采用回调函数进行操作,接口如下:

1
2
3
4
public abstract <A> void read(ByteBuffer dst,
                              long position,
                              A attachment,
                              CompletionHandler<Integer,? super A> handler);

顺便也贴一下写操作的两个版本的接口:

1
2
3
4
5
6
public abstract Future<Integer> write(ByteBuffer src, long position);
public abstract <A> void write(ByteBuffer src,
                               long position,
                               A attachment,
                               CompletionHandler<Integer,? super A> handler);

我们可以看到,AIO 的读写主要也还是与 Buffer 打交道,这个与 NIO 是一脉相承的。

另外,还提供了用于将内存中的数据刷入到磁盘的方法:

1
public abstract void force(boolean metaData) throws IOException;

因为我们对文件的写操作,操作系统并不会直接针对文件操作,系统会缓存,然后周期性地刷入到磁盘。如果希望将数据及时写入到磁盘中,以免断电引发部分数据丢失,可以调用此方法。参数如果设置为 true,意味着同时也将文件属性信息更新到磁盘。

还有,还提供了对文件的锁定功能,我们可以锁定文件的部分数据,这样可以进行排他性的操作。

1
public abstract Future<FileLock> lock(long position, long size, boolean shared);

position 是要锁定内容的开始位置,size 指示了要锁定的区域大小,shared 指示需要的是共享锁还是排他锁

当然,也可以使用回调函数的版本:

1
2
3
4
5
public abstract <A> void lock(long position,
                              long size,
                              boolean shared,
                              A attachment,
                              CompletionHandler<FileLock,? super A> handler);

文件锁定功能上还提供了 tryLock 方法,此方法会快速返回结果:

1
2
public abstract FileLock tryLock(long position, long size, boolean shared)
    throws IOException;

这个方法很简单,就是尝试去获取锁,如果该区域已被其他线程或其他应用锁住,那么立刻返回 null,否则返回 FileLock 对象。

AsynchronousFileChannel 操作大体上也就以上介绍的这些接口,还是比较简单的,这里就少一些废话早点结束好了。

AsynchronousServerSocketChannel

这个类对应的是非阻塞 IO 的 ServerSocketChannel,大家可以类比下使用方式。

我们就废话少说,用代码说事吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.javadoop.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class Server {
    public static void main(String[] args) throws IOException {
          // 实例化,并监听端口
        AsynchronousServerSocketChannel server =
                AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080));
        // 自己定义一个 Attachment 类,用于传递一些信息
        Attachment att = new Attachment();
        att.setServer(server);
        server.accept(att, new CompletionHandler<AsynchronousSocketChannel, Attachment>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Attachment att) {
                try {
                    SocketAddress clientAddr = client.getRemoteAddress();
                    System.out.println("收到新的连接:" + clientAddr);
                    // 收到新的连接后,server 应该重新调用 accept 方法等待新的连接进来
                    att.getServer().accept(att, this);
                    Attachment newAtt = new Attachment();
                    newAtt.setServer(server);
                    newAtt.setClient(client);
                    newAtt.setReadMode(true);
                    newAtt.setBuffer(ByteBuffer.allocate(2048));
                    // 这里也可以继续使用匿名实现类,不过代码不好看,所以这里专门定义一个类
                    client.read(newAtt.getBuffer(), newAtt, new ChannelHandler());
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
            @Override
            public void failed(Throwable t, Attachment att) {
                System.out.println("accept failed");
            }
        });
        // 为了防止 main 线程退出
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
        }
    }
}

看一下 ChannelHandler 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.javadoop.aio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
public class ChannelHandler implements CompletionHandler<Integer, Attachment> {
    @Override
    public void completed(Integer result, Attachment att) {
        if (att.isReadMode()) {
            // 读取来自客户端的数据
            ByteBuffer buffer = att.getBuffer();
            buffer.flip();
            byte bytes[] = new byte[buffer.limit()];
            buffer.get(bytes);
            String msg = new String(buffer.array()).toString().trim();
            System.out.println("收到来自客户端的数据: " + msg);
            // 响应客户端请求,返回数据
            buffer.clear();
            buffer.put("Response from server!".getBytes(Charset.forName("UTF-8")));
            att.setReadMode(false);
            buffer.flip();
            // 写数据到客户端也是异步
            att.getClient().write(buffer, att, this);
        } else {
            // 到这里,说明往客户端写数据也结束了,有以下两种选择:
            // 1. 继续等待客户端发送新的数据过来
//            att.setReadMode(true);
//            att.getBuffer().clear();
//            att.getClient().read(att.getBuffer(), att, this);
            // 2. 既然服务端已经返回数据给客户端,断开这次的连接
            try {
                att.getClient().close();
            } catch (IOException e) {
            }
        }
    }
    @Override
    public void failed(Throwable t, Attachment att) {
        System.out.println("连接断开");
    }
}

顺便再贴一下自定义的 Attachment 类:

1
2
3
4
5
6
7
public class Attachment {
    private AsynchronousServerSocketChannel server;
    private AsynchronousSocketChannel client;
    private boolean isReadMode;
    private ByteBuffer buffer;
    // getter & setter
}

这样,一个简单的服务端就写好了,接下来可以接收客户端请求了。上面我们用的都是回调函数的方式,读者要是感兴趣,可以试试写个使用 Future 的。

AsynchronousSocketChannel

其实,说完上面的 AsynchronousServerSocketChannel,基本上读者也就知道怎么使用 AsynchronousSocketChannel 了,和非阻塞 IO 基本类似。

这边做个简单演示,这样读者就可以配合之前介绍的 Server 进行测试使用了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.javadoop.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class Client {
    public static void main(String[] args) throws Exception {
        AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
          // 来个 Future 形式的
        Future<?> future = client.connect(new InetSocketAddress(8080));
        // 阻塞一下,等待连接成功
        future.get();
        Attachment att = new Attachment();
        att.setClient(client);
        att.setReadMode(false);
        att.setBuffer(ByteBuffer.allocate(2048));
        byte[] data = "I am obot!".getBytes();
        att.getBuffer().put(data);
        att.getBuffer().flip();
        // 异步发送数据到服务端
        client.write(att.getBuffer(), att, new ClientChannelHandler());
        // 这里休息一下再退出,给出足够的时间处理数据
        Thread.sleep(2000);
    }
}

往里面看下 ClientChannelHandler 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.javadoop.aio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
public class ClientChannelHandler implements CompletionHandler<Integer, Attachment> {
    @Override
    public void completed(Integer result, Attachment att) {
        ByteBuffer buffer = att.getBuffer();
        if (att.isReadMode()) {
            // 读取来自服务端的数据
            buffer.flip();
            byte[] bytes = new byte[buffer.limit()];
            buffer.get(bytes);
            String msg = new String(bytes, Charset.forName("UTF-8"));
            System.out.println("收到来自服务端的响应数据: " + msg);
            // 接下来,有以下两种选择:
            // 1. 向服务端发送新的数据
//            att.setReadMode(false);
//            buffer.clear();
//            String newMsg = "new message from client";
//            byte[] data = newMsg.getBytes(Charset.forName("UTF-8"));
//            buffer.put(data);
//            buffer.flip();
//            att.getClient().write(buffer, att, this);
            // 2. 关闭连接
            try {
                att.getClient().close();
            } catch (IOException e) {
            }
        } else {
            // 写操作完成后,会进到这里
            att.setReadMode(true);
            buffer.clear();
            att.getClient().read(buffer, att, this);
        }
    }
    @Override
    public void failed(Throwable t, Attachment att) {
        System.out.println("服务器无响应");
    }
}

以上代码都是可以运行调试的,如果读者碰到问题,请在评论区留言。

Asynchronous Channel Groups

为了知识的完整性,有必要对 group 进行介绍,其实也就是介绍 AsynchronousChannelGroup 这个类。之前我们说过,异步 IO 一定存在一个线程池,这个线程池负责接收任务、处理 IO 事件、回调等。这个线程池就在 group 内部,group 一旦关闭,那么相应的线程池就会关闭。

AsynchronousServerSocketChannels 和 AsynchronousSocketChannels 是属于 group 的,当我们调用 AsynchronousServerSocketChannel 或 AsynchronousSocketChannel 的 open() 方法的时候,相应的 channel 就属于默认的 group,这个 group 由 JVM 自动构造并管理。

如果我们想要配置这个默认的 group,可以在 JVM 启动参数中指定以下系统变量:

  • java.nio.channels.DefaultThreadPool.threadFactory

    此系统变量用于设置 ThreadFactory,它应该是 java.util.concurrent.ThreadFactory 实现类的全限定类名。一旦我们指定了这个 ThreadFactory 以后,group 中的线程就会使用该类产生。

  • java.nio.channels.DefaultThreadPool.initialSize

    此系统变量也很好理解,用于设置线程池的初始大小。

可能你会想要使用自己定义的 group,这样可以对其中的线程进行更多的控制,使用以下几个方法即可:

  • AsynchronousChannelGroup.withCachedThreadPool(ExecutorService executor, int initialSize)
  • AsynchronousChannelGroup.withFixedThreadPool(int nThreads, ThreadFactory threadFactory)
  • AsynchronousChannelGroup.withThreadPool(ExecutorService executor)

熟悉线程池的读者对这些方法应该很好理解,它们都是 AsynchronousChannelGroup 中的静态方法。

至于 group 的使用就很简单了,代码一看就懂:

1
2
3
4
AsynchronousChannelGroup group = AsynchronousChannelGroup
        .withFixedThreadPool(10, Executors.defaultThreadFactory());
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group);

AsynchronousFileChannels 不属于 group。但是它们也是关联到一个线程池的,如果不指定,会使用系统默认的线程池,如果想要使用指定的线程池,可以在实例化的时候使用以下方法:

1
2
3
4
5
6
public static AsynchronousFileChannel open(Path file,
                                           Set<? extends OpenOption> options,
                                           ExecutorService executor,
                                           FileAttribute<?>... attrs) {
    ...
}

到这里,异步 IO 就算介绍完成了。

小结

我想,本文应该是说清楚了非阻塞 IO 和异步 IO 了,对于异步 IO,由于网上的资料比较少,所以不免篇幅多了些。

我们也要知道,看懂了这些,确实可以学到一些东西,多了解一些知识,但是我们还是很少在工作中将这些知识变成工程代码。一般而言,我们需要在网络应用中使用 NIO 或 AIO 来提升性能,但是,在工程上,绝不是了解了一些概念,知道了一些接口就可以的,需要处理的细节还非常多。

这也是为什么 Netty/Mina 如此盛行的原因,因为它们帮助封装好了很多细节,提供给我们用户友好的接口,后面有时间我也会对 Netty 进行介绍。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/329927.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

element js 包含字符_selenium3.x(10)js弹框处理

web应用中&#xff0c;经常会遇到弹框。不处理弹框&#xff0c;页面其他元素都是不能操作的。js弹框有3种&#xff1a;alert警告框、confirm确认窗口、prompt信息输入窗口。webdriver提供了处理这3种弹框的方法。首先通过switch_to定位到弹框&#xff0c;然后针对弹框的不同&am…

转:centos8开启防火墙端口

转自&#xff1a; Centos8开放防火墙端口_Programmer-Awei的博客-CSDN博客查看防火墙某个端口是否开放firewall-cmd --query-port3306/tcp开放防火墙端口3306firewall-cmd --zonepublic --add-port3306/tcp --permanent查看防火墙状态systemctl status firewalld关闭防火墙sys…

AQS的细节--自用,非正常教程

AQS的概念 AQS叫抽象队列同步器&#xff0c;是一个框架&#xff0c;我们可以在JUC很多包看见AQS的具体实现&#xff0c;比如锁和读写锁&#xff0c;condition等&#xff0c;具有可扩展性&#xff0c;可以根据此自定义同步工具类&#xff0c;优点是系统开销低&#xff0c;实现锁…

mininet编程实现交换机规则的插入、删除与修改。_可编程网卡芯片在滴滴云网络的应用实践...

桔妹导读&#xff1a;随着云规模不断扩大以及业务层面对延迟、带宽的要求越来越高&#xff0c;采用DPDK 加速网络报文处理的方式在横向纵向扩展都出现了局限性。可编程芯片成为业界热点。本文主要讲述了可编程网卡芯片在滴滴云网络中的应用实践&#xff0c;遇到的问题、带来的收…

centos8上docker tomcat容器访问报404解决方法

目录 【README】 【1】docker安装tomcat 【2】启动多个tomcat容器 【README】 1.本文记录了 访问docker tomcat容器报404的解决方法&#xff1b; 2.附带安装tomcat步骤&#xff1b; 3.centos8 安装docker&#xff0c;refers2 centos8安装docker_PacosonSWJTU的博客-CSDN博…

ConcurrentHashMap--自用,非教学

结论先行&#xff0c;细节在下面 jdk1.7是如何解决并发问题的以及完整流程 一.首先new一个concurrentHashMap 调用默认构造方法 二.初始化 初始化initialCapacity&#xff08;默认是16&#xff0c;指一个segment内Entry的数量&#xff09;&#xff0c;loadFactor&#xff…

Java开发必须掌握的线上问题排查命令

转载自 Java开发必须掌握的线上问题排查命令作为一个合格的开发人员&#xff0c;不仅要能写得一手还代码&#xff0c;还有一项很重要的技能就是排查问题。这里提到的排查问题不仅仅是在coding的过程中debug等&#xff0c;还包括的就是线上问题的排查。由于在生产环境中&#x…

centos8启动docker-mysql8容器

【README】 本文记录了 centos8 安装&#xff0c;启动mysql8的docker容器的步骤&#xff1b; 【1】安装mysql8 docker容器 步骤1&#xff0c; 查看mysql8 docker镜像版本 &#xff1b; 最简单的方式是上 Docker Hubhttps://hub.docker.com/直接搜索mysql&#xff0c;查看其 …

Java命令学习系列(一)——Jps

转载自 Java命令学习系列&#xff08;一&#xff09;——Jpsjps位于jdk的bin目录下&#xff0c;其作用是显示当前系统的java进程情况&#xff0c;及其id号。 jps相当于Solaris进程工具ps。不象"pgrep java"或"ps -ef grep java"&#xff0c;jps并不使用应用…

springboot2.5.5配置druid数据源1.2.8与jdbc

【README】 本文记录了 springboot配置 druid数据源的步骤&#xff1b; 【1】新建springboot项目并配置druid 步骤1&#xff0c;新建springbt项目 步骤2&#xff0c;选择spring web&#xff0c;jdbc&#xff0c;mysql驱动依赖&#xff1b; 步骤3&#xff0c;添加 druid数据源…

tsc244标签编辑软件_能打小票的标签机,M110智能标签打印机来了!

每张被贴上的标签背后&#xff0c;都是对待梦想的认真、对待生活的用心&#xff0c;M110智能标签打印机为你标记美好&#xff0c;实现品质与效率兼得的追求。01、 产品简介M110智能标签打印机采取热敏无墨打印技术&#xff0c;无需碳带&#xff0c;便捷经济&#xff0c;配套“标…

面试官:简述实现一个线程池的设计思路

前言 二面碰到这个问题人都麻了&#xff0c;我扯了好多没用的&#xff0c;面后赶紧来补一下&#xff0c;但是找到的基本都是一堆纯代码&#xff0c;不是讲思路的。下面的思路是我参考美团技术团队文章后总结的。 具体思路 一、总体设计 线程池在内部实际上构建了一个生产者…

Java命令学习系列(四)——jstat

转载自 Java命令学习系列&#xff08;四&#xff09;——jstatjstat(JVM Statistics Monitoring Tool)是用于监控虚拟机各种运行状态信息的命令行工具。他可以显示本地或远程虚拟机进程中的类装载、内存、垃圾收集、JIT编译等运行数据&#xff0c;在没有GUI图形的服务器上&…

修改打印机ip工具_使用富士施乐一体机因动态IP导致不能打印与扫描的解决方法...

背景在使用富士施乐的一体机中(其他厂商的一体机 也类似)&#xff0c;很多人的网络环境是动态IP的&#xff0c;即打印的IP地址是不固定的&#xff0c;随着每次开机或关机会变化&#xff0c;从而经常有人反应打印机不能打印或者扫描了。。总体思路1. 检查当前的IP设置2. 把相应的…

springboot2.5.5配置mybatis

【README】 1.本文记录了 springboot2.5.5 配置 mybatis的步骤&#xff1b; 2.配置mybatis 分为注解和配置两种方式&#xff1b; 3.引入mybatis&#xff0c;包括了 创建springbt项目&#xff1b;druid数据源配置&#xff1b;数据库表与javabean&#xff1b;mybatis配置与sq…

Java命令学习系列(三)——Jmap

转载自 Java命令学习系列&#xff08;三&#xff09;——Jmapjmap是JDK自带的工具软件&#xff0c;主要用于打印指定Java进程(或核心文件、远程调试服务器)的共享对象内存映射或堆内存细节。可以使用jmap生成Heap Dump。在Java命令学习系列&#xff08;零&#xff09;——常见命…

skimage直方图如何保存_LightGBM的参数详解以及如何调优

lightGBM可以用来解决大多数表格数据问题的算法。有很多很棒的功能&#xff0c;并且在kaggle这种该数据比赛中会经常使用。但我一直对了解哪些参数对性能的影响最大以及我应该如何调优lightGBM参数以最大限度地利用它很感兴趣。我想我应该做一些研究&#xff0c;了解更多关于li…

基于springboot2.5.5自建启动器starter制品库

【README】 本文po出了自建springboot 启动器步骤&#xff1b; 【1】新建2个starter相关组件 根据 mybatis-spring-boot-starter&#xff0c;我们看到 自建starter需要两个组件&#xff0c;分别是 xxx-spring-boot-starter&#xff0c; xxx-spring-boot-starter-autoconfigu…

Java命令学习系列(二)——Jstack

转载自 Java命令学习系列&#xff08;二&#xff09;——Jstackjstack是java虚拟机自带的一种堆栈跟踪工具。功能 jstack用于生成java虚拟机当前时刻的线程快照。线程快照是当前java虚拟机内每一条线程正在执行的方法堆栈的集合&#xff0c;生成线程快照的主要目的是定位线程出…

python中ls是什么_使用Python代码实现Linux中的ls遍历目录命令的实例代码

一、写在前面 前几天在微信上看到这样一篇文章&#xff0c;链接为&#xff1a;https://www.jb51.net/it/692145.html&#xff0c;在这篇文章中&#xff0c;有这样一段话&#xff0c;吸引了我的注意&#xff1a;在 Linux 中 ls 是一个使用频率非常高的命令了&#xff0c;可选的参…