Netty为什么选择NIO

2020-03-10 👁 1 ← 返回列表

前言:总所周知,Netty是基于Nio的,本篇将会从代码的角度来说明Netty为什么选择Nio。

一、前言

下文将会以一个实际例子——回文服务器(实时返回客户端发送的消息),来详细比较Bio、Nio、Aio之间的性能关系,这里对于它们之间的关系以及具体原理就不再多做过多的介绍了,读者可自行查阅相关资料。

先上客户端代码,如下

// 客户端是通用的
public class Client {

    private void initClient(int port) throws IOException, InterruptedException {
        Socket socket = new Socket(InetAddress.getLocalHost(), port);
        PrintWriter pw = new PrintWriter(socket.getOutputStream());
        BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));

        String req = "hello";
        pw.print(req);
        pw.flush();

        String temp;
        while ((temp = br.readLine()) != null) {
            System.out.println("收到服务端回声消息:" + temp);
        }
        pw.close();
    }

    public void beginTest(int port) throws InterruptedException {
        int number = 3000;
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(number, number + 10, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(number), new ThreadFactory() {
            @Override
            public Thread newThread( Runnable r) {
                return new Thread(r, "线程" + (number - atomicInteger.incrementAndGet()));
            }
        });
        int preparedThread = threadPoolExecutor.prestartAllCoreThreads();
        System.out.println("线程成功预热数:" + preparedThread);

        long beginTime = System.currentTimeMillis();
        CyclicBarrier barrier = new CyclicBarrier(number);
        CountDownLatch countDownLatch = new CountDownLatch(number);

        for (int i = 0; i < number; i++) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        barrier.await();
                        initClient(port);
                    } catch (IOException | InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        countDownLatch.await();
        System.out.println(number + "线程连接服务器,总耗时:" + (System.currentTimeMillis() - beginTime) + "ms");
        System.exit(0);
    }
}

二、BIO

BIO(Block input output),同步阻塞IO模式,我们所熟悉的java.io包下面都是指的是BIO。IO线程阻塞于外部输入,这个过程会一直占着CPU资源不放,这种用法目前只限于应用内部的单线程逻辑,面对互联网上的高并发请求明显是不可取的。

回文服务器示例代码如下

public class BIOServer {

    public void initServer () throws IOException {
        ServerSocket serverSocket = new ServerSocket(2222);
        System.out.println("服务器主线程等待连接....");
        AtomicInteger i = new AtomicInteger(0);
        Socket client;
        while ((client = serverSocket.accept()) != null) {
            i.incrementAndGet();
            Socket finalClient = client;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        byte[] bytes = new byte["hello".getBytes().length];
                        while (finalClient.getInputStream().read(bytes) != -1) {
                            finalClient.getOutputStream().write(bytes);
                            finalClient.close();
                            return;
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }, "服务器线程" + i.get()).start();
        }
    }
}

public class BIO {

    public static void main(String[] args) throws IOException, InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    new BIOServer().initServer();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        Thread.sleep(1000);
        new Client().beginTest(2222);
    }
}

三、NIO

1、概述

NIO(Non-block input output),非阻塞模式IO。NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)。

传统IO基于流进行操作的,而NIO基于Channel和Buffer进行操作,是面向缓存的,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达),当监听事件发生时,才去唤醒线程处理请求,而不必阻塞于外部的输入。

下面是BIO的处理流程,可以看到每个线程都阻塞在socket的读写,而不能去处理其他事情,白白浪费了CPU资源。

BIO访问流程

下面是NIO的处理流程,NIO区别于BIO的关键步骤在于有一个专门的Selector线程轮询socket发来的有效数据,然后将其交给相应的线程进行处理,这里少了一个等待客户端发生数据的时间。

NIO访问流程

不做深究,点到为止,这里给一篇链接,算是讲得挺好的。

2、示例代码

public class NIOServer {

    private Selector selector;

    public void initServer(int port) throws IOException{
        // 打开ServerSocket通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(port));

        // 获取一个选择器
        this.selector = Selector.open();
        // 将通道管理器与该通道进行绑定,并为该通道注册SelectionKey.OP_ACCEPT事件
        // 注册事件后,当该事件触发时会使selector.select()返回,
        // 否则selector.select()一直阻塞
        serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        listen();
    }

    public void listen() throws IOException{
        System.out.println("启动服务器!");
        while (true) {
            // select()方法一直阻塞直到有注册的通道准备好了才会返回
            selector.select();
            Iterator<?> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = (SelectionKey) iterator.next();
                // 删除已选的key,防止重复处理
                iterator.remove();
                handler(key);
            }
        }
    }

    public void handler(SelectionKey key)throws IOException{
        if (key.isAcceptable()) {
            handlerAccept(key);
        }else if (key.isReadable()){
            handlerRead(key);
        }else if (key.isWritable()){
            System.out.println("can write!");
        }else if (key.isConnectable()){
            System.out.println("is connectable");
        }
    }


    public void handlerAccept(SelectionKey key) throws IOException{
        // 从SelectionKey中获取ServerSocketChannel
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        // 获取SocketChannel
        SocketChannel socketChannel = server.accept();
        // 设置成非阻塞
        socketChannel.configureBlocking(false);
        // 为socketChannel通道建立 OP_READ 读操作,使客户端发送的内容可以被读到
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    public void handlerRead(SelectionKey key)throws IOException{
        SocketChannel socketChannel = (SocketChannel) key.channel();
        // 创建读取缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
        // 从通道读取可读取的字节数
        try {
            int readCount = socketChannel.read(byteBuffer);
            if (readCount > 0) {
                byte[] data = byteBuffer.array();
                ByteBuffer outBuffer = ByteBuffer.wrap(data);
                socketChannel.write(outBuffer);
                socketChannel.close();
            } else {
                System.out.println("客户端异常退出");
            }
        } catch (IOException e) {
            key.cancel();
        }
    }
}

public class NIO {

    public static void main(String[] args) throws IOException, InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    new NIOServer().initServer(4444);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        Thread.sleep(1000);
        new Client().beginTest(4444);
    }
}

四、AIO

AIO(Asynchronous Input Output),异步非阻塞IO模式。AIO的核心机制类似Future(CompletableFuture)机制,不同的是方法回调是由操作系统把控的,当操作系统完成读/写IO操作时,会回调Java线程。

示例代码如下

public class AIOServer {

    private static final ExecutorService executorService = Executors.newFixedThreadPool(200);

    public void init() throws Exception {
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService);
        AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
        server.bind(new InetSocketAddress(3333));
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Object attachment) {
                server.accept(null, this);
//                try {
//                    System.out.println(Thread.currentThread().getName() + ":服务器与客户端" + client.getRemoteAddress() + "建立连接");
//                } catch (IOException e) {
//                    e.printStackTrace();
//                }
                ByteBuffer buffer = ByteBuffer.allocate("hello".getBytes().length);
                client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer index, ByteBuffer buffer) {
                        try {
                            buffer.flip();
                            client.write(buffer).get();//这个是异步的,一定要用get 确保执行结束 才能clear
                            client.close();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        System.out.println(exc.getMessage());
                    }
                });
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                throw new RuntimeException(exc.getMessage());
            }
        });
    }
}

public class AIO {

    public static void main(String[] args) throws Exception {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    new AIOServer().init();
                } catch (Exception e) {
                    e.printStackTrace();
                    return;
                }
            }
        }, "服务端线程").start();
        Thread.sleep(1000);
        if (args.length != 0) {
            new Client(Integer.parseInt(args[0])).beginTest(3333);
        } else {
            new Client().beginTest(3333);
        }
    }
}

五、性能比较

1、bash脚本测试

10线程50线程100线程500线程1000线程
BIO10 23、19、26、25、14、20、16、20、23、27 21.3ms
NIO10 16、20、25、21、25、21、19、22、21、15 20.5ms
AIO10
BIO100
NIO100
AIO100
BIO500
NIO500
AIO500
BIO1000
NIO1000
AIO1000
BIO2000
NIO2000
AIO2000
BIO3000
NIO3000
AIO3000
BIO5000
NIO5000
AIO5000
收藏本文