IT序号网

Netty基础-BIO/NIO/AIO

lxf 2021年06月07日 程序员 384 0

同步阻塞IO(BIO):

        我们熟知的Socket编程就是BIO,每个请求对应一个线程去处理。一个socket连接一个处理线程(这个线程负责这个Socket连接的一系列数据传输操作)。阻塞的原因在于:操作系统允许的线程数量是有限的,多个socket申请与服务端建立连接时,服务端不能提供相应数量的处理线程,没有分配到处理线程的连接就会阻塞等待或被拒绝。

  如下图就是BIO(1:1同步阻塞)通信模型,每当有一个请求过来,都会创建新的线程,当线程数达到一定数量,占满了整台机器的资源,那么机器就挂掉了。对于CPU来说也是一个不好的事情,因为会导致频繁的切换上下文。

  那么我们有如下的改进措施(M:N同步阻塞IO),但是还是有上面的一些问题,仅仅是解决了频繁的创建线程的问题,不过由于是同步,如果读写速度慢,那么每个线程进来是会导致阻塞的,性能的高低完全取决于阻塞的时间。这个对于用户的体验也是相当不好的。

 同步非阻塞IO(NIO):

         NIO 是一种同步非阻塞的 IO 模型。同步是指线程不断轮询 IO 事件是否就绪,非阻塞是指线程在等待 IO 的时候,可以同时做其他任务。同步的核心就是 Selector,Selector 代替了线程本身轮询 IO 事件,避免了阻塞同时减少了不必要的线程消耗;非阻塞的核心就是通道和缓冲区,当 IO 事件就绪时,可以通过写道缓冲区,保证 IO 的成功,而无需线程阻塞式地等待。

  非阻塞式IO模型(NIO)NIO+单线程Reactor模式:reactor设计模式是event-driven architecture的一种实现方式,处理多个客户端并发的向服务端请求服务的场景。每种服务在服务端可能由多个方法组成。reactor会解耦并发请求的服务并分发给对应的事件处理器来处理。

  reactor主要由以下几个角色构成:handle、Synchronous Event Demultiplexer、Initiation Dispatcher、Event Handler、Concrete Event Handler

  • Handle(handle在linux中一般称为文件描述符):而在window称为句柄,两者的含义一样。handle是事件的发源地。比如一个网络socket、磁盘文件等。而发生在handle上的事件可以有connection、ready for read、ready for write等。
  • Synchronous Event Demultiplexer(同步事件分离器):本质上是系统调用。比如linux中的select、poll、epoll等。比如,select方法会一直阻塞直到handle上有事件发生时才会返回。
  • Event Handler(事件处理器):其会定义一些回调方法或者称为钩子函数,当handle上有事件发生时,回调方法便会执行,一种事件处理机制。
  • Concrete Event Handler(具体的事件处理器):实现了Event Handler。在回调方法中会实现具体的业务逻辑。
  • Initiation Dispatcher(初始分发器):也是reactor角色,提供了注册、删除与转发event handler的方法。当Synchronous Event Demultiplexer检测到handle上有事件发生时,便会通知initiation dispatcher调用特定的event handler的回调方法。

处理流程:

  1. 当应用向Initiation Dispatcher注册Concrete Event Handler时,应用会标识出该事件处理器希望Initiation Dispatcher在某种类型的事件发生发生时向其通知,事件与handle关联
  2. Initiation Dispatcher要求注册在其上面的Concrete Event Handler传递内部关联的handle,该handle会向操作系统标识
  3. 当所有的Concrete Event Handler都注册到 Initiation Dispatcher上后,应用会调用handle_events方法来启动Initiation Dispatcher的事件循环,这时Initiation Dispatcher会将每个Concrete Event Handler关联的handle合并,并使用Synchronous Event Demultiplexer来等待这些handle上事件的发生
  4. 当与某个事件源对应的handle变为ready时,Synchronous Event Demultiplexer便会通知 Initiation Dispatcher。比如tcp的socket变为ready for reading
  5. Initiation Dispatcher会触发事件处理器的回调方法。当事件发生时, Initiation Dispatcher会将被一个“key”(表示一个激活的handle)定位和分发给特定的Event Handler的回调方法
  6. Initiation Dispatcher调用特定的Concrete Event Handler的回调方法来响应其关联的handle上发生的事件

  这种模型情况下,由于 acceptor 是单线程的,既要接受请求,还要去处理时间,如果某一些事件处理请求花费的时间比较长,那么这个请求将会进入等待,整个情况下会同步。基于这种问题下我们有什么改进措施呢?

 非阻塞式IO模型(NIO)NIO+多线程Reactor模式:可以使用多线程去处理,使用线程池,让acceptor仅仅去接受请求,把事件的处理交给线程池中的线程去处理:

  那么在这种情况下还存在哪些弊端呢?将处理器的执行放入线程池,多线程进行业务处理。但Reactor仍为单个线程。还是acceptor是单线程的,无法去并行的去响应多个客户端,那么要怎么处理呢?

  NIO+主从多线程Reactor模式:

 

  mainReactor负责监听连接,accept连接给subReactor处理,为什么要单独分一个Reactor来处理监听呢?因为像TCP这样需要经过3次握手才能建立连接,这个建立连接的过程也是要耗时间和资源的,单独分一个Reactor来处理,可以提高性能。

异步阻塞IO(AIO):

          NIO是同步的IO,是因为程序需要IO操作时,必须获得了IO权限后亲自进行IO操作才能进行下一步操作。AIO是对NIO的改进(所以AIO又叫NIO.2),它是基于Proactor模型的。每个socket连接在事件分离器注册 IO完成事件 和 IO完成事件处理器。程序需要进行IO时,向分离器发出IO请求并把所用的Buffer区域告知分离器,分离器通知操作系统进行IO操作,操作系统自己不断尝试获取IO权限并进行IO操作(数据保存在Buffer区),操作完成后通知分离器;分离器检测到 IO完成事件,则激活 IO完成事件处理器,处理器会通知程序说“IO已完成”,程序知道后就直接从Buffer区进行数据的读写。

          也就是说:AIO是发出IO请求后,由操作系统自己去获取IO权限并进行IO操作;NIO则是发出IO请求后,由线程不断尝试获取IO权限,获取到后通知应用程序自己进行IO操作。

  同步/异步:数据如果尚未就绪,是否需要等待数据结果。

  阻塞/非阻塞:进程/线程需要操作的数据如果尚未就绪,是否妨碍了当前进程/线程的后续操作。应用程序的调用是否立即返回!

  NIO与BIO最大的区别是 BIO是面向流的,而NIO是面向Buffer的。

Java NIO 核心组件:

  在NIO 中有几个核心对象需要掌握:缓冲区(Buffer)、选择器(Selector)、通道(Channel)。

缓冲区Buffer:

1.Buffer 操作基本API

  是一块连续的内存块,是 NIO 数据读或写的中转地。 为什么说NIO是基于缓冲区的IO方式呢?因为,当一个链接建立完成后,IO的数据未必会马上到达,为了当数据到达时能够正确完成IO操作,在BIO(阻塞IO)中,等待IO的线程必须被阻塞,以全天候地执行IO操作。为了解决这种IO方式低效的问题,引入了缓冲区的概念,当数据到达时,可以预先被写入缓冲区,再由缓冲区交给线程,因此线程无需阻塞地等待IO。

  缓冲区实际上是一个容器对象,更直接的说,其实就是一个数组,在NIO 库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的; 在写入数据时,它也是写入到缓冲区中的;任何时候访问NIO 中的数据,都是将它放到缓冲区中。而在面向流I/O 系统中,所有数据都是直接写入或者直接将数据读取到Stream 对象中。在NIO 中,所有的缓冲区类型都继承于抽象类Buffer,最常用的就是ByteBuffer,对于Java 中的基本类型,基本都有一个具体Buffer 类型与之相对应,它们之间的继承关系如下图所示:

  java 层面中Buffer是一个顶层抽象类,我们需要先了解一下常用的实现进行数据的编/解码:

public class BufferDemo { 
 
    public static void decode(String str) throws UnsupportedEncodingException { 
    	// 开辟一个长度为128的字节空间 
        ByteBuffer byteBuffer = ByteBuffer.allocate(128); 
        //写入数据 
        byteBuffer.put(str.getBytes("UTF-8")); 
        //写完数据以后要进行读取,需要设置 limit 为 position 的值,然后 position 置为0。 
        byteBuffer.flip(); 
        /*获取utf8的编解码器*/ 
        Charset utf8 = Charset.forName("UTF-8"); 
        CharBuffer charBuffer = utf8.decode(byteBuffer);/*对bytebuffer中的内容解码*/ 
 
        /*array()返回的就是内部的数组引用,编码以后的有效长度是0~limit*/ 
        char[] charArr = Arrays.copyOf(charBuffer.array(), charBuffer.limit()); 
        System.out.println(charArr); 
    } 
 
    public static void encode(String str){ 
        CharBuffer charBuffer = CharBuffer.allocate(128); 
        charBuffer.append(str); 
        charBuffer.flip(); 
 
        /*对获取utf8的编解码器*/ 
        Charset utf8 = Charset.forName("UTF-8"); 
        ByteBuffer byteBuffer = utf8.encode(charBuffer); /*对charbuffer中的内容解码*/ 
 
        /*array()返回的就是内部的数组引用,编码以后的有效长度是0~limit*/ 
        byte[] bytes = Arrays.copyOf(byteBuffer.array(), byteBuffer.limit()); 
        System.out.println(Arrays.toString(bytes)); 
    } 
 
    public static void main(String[] args) throws UnsupportedEncodingException { 
 
        BufferDemo.decode("解码测试"); 
        BufferDemo.encode("编码测试"); 
    } 
} 

  在Java NIO中,如果两个通道中有一个是FileChannel,那你可以直接将数据从一个channel传输到另外一个channel。对应的api是 transferFrom() 跟transferTo()。再来看看 FileChannel 的简单应用:

public class FileChannelDemo {	 
	public static void main(String[] args) throws Exception { 
		 
		/*-------从buffer往fileChannel中写入数据-------------------------*/ 
		File file =new File("D:/nio.data"); 
		if(!file.exists()) {//判断文件是否存在,不存在则创建 
			file.createNewFile(); 
		} 
		//获取输出流 
		FileOutputStream outputStream = new FileOutputStream(file); 
		//从输出流中获取channel 
		FileChannel writeFileChannel = outputStream.getChannel(); 
		//开辟新的字节空间 
		ByteBuffer byteBuffer = ByteBuffer.allocate(128); 
		//写入数据 
		byteBuffer.put("fileChannel hello".getBytes("UTF-8")); 
		//刷新指针 
		byteBuffer.flip(); 
		//进行写操作 
		writeFileChannel.write(byteBuffer); 
		byteBuffer.clear(); 
		outputStream.close(); 
		writeFileChannel.close(); 
		 
		/*-------从fileChannel往buffer中写入数据-------------------------*/ 
		Path path = Paths.get("D:/nio.data"); 
		FileChannel readFileChannel = FileChannel.open(path); 
		ByteBuffer byteBuffer2 = ByteBuffer.allocate((int)readFileChannel.size()+1); 
		Charset charset = Charset.forName("UTF-8"); 
		readFileChannel.read(byteBuffer2); 
		byteBuffer2.flip(); 
		CharBuffer charBuffer = charset.decode(byteBuffer2); 
		System.out.println(charBuffer.toString()); 
		byteBuffer2.clear(); 
		readFileChannel.close();		 
	} 
}

 2.Buffer 的基本的原理

  在谈到缓冲区时,我们说缓冲区对象本质上是一个数组,但它其实是一个特殊的数组,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况,如果我们使用get()方法从缓冲区获取数据或者使用put()方法把数据写入缓冲区,都会引起缓冲区状态的变化。在缓冲区中,最重要的属性有下面三个,它们一起合作完成对缓冲区内部状态的变化跟踪:

position:指定下一个将要被写入或者读取的元素索引,它的值由get()/put()方法自动更新,在新创建一个Buffer 对象时,position 被初始化为0。

limit:指定还有多少数据需要取出(在从缓冲区写入通道时),或者还有多少空间可以放入数据(在从通道读入缓冲区时)。

capacity:指定了可以存储在缓冲区中的最大数据容量,实际上,它指定了底层数组的大小,或者至少是指定了准许我们使用的底层数组的容量。

Buffer初始时3个变量的情况如下图:

  以上三个属性值之间有一些相对大小的关系:0 <= position <= limit <= capacity。如果我们创建一个新的容量大小为10 的ByteBuffer 对象,在初始化的时候,position 设置为0,limit 和capacity 被设置为10,在以后使用ByteBuffer对象过程中,capacity 的值不会再发生变化,而其它两个个将会随着使用而变化。下面我们用代码来演示一遍,准备一个txt 文档,存放的E 盘,输入以下内容:

  在对Buffer进行读/写的过程中,position会往后移动,而 limit 就是 position 移动的边界。由此不难想象,在对Buffer进行写入操作时,limit应当设置为capacity的大小,而对Buffer进行读取操作时,limit应当设置为数据的实际结束位置。(注意:将Buffer数据 写入 通道是Buffer 读取 操作,从通道 读取 数据到Buffer是Buffer 写入 操作)

  在对Buffer进行读/写操作前,我们可以调用Buffer类提供的一些辅助方法来正确设置 position 和 limit 的值,主要有如下几个:

  • flip(): 设置 limit 为 position 的值,然后 position 置为0。对Buffer进行读取操作前调用。
  • rewind(): 仅仅将 position 置0。一般是在重新读取Buffer数据前调用,比如要读取同一个Buffer的数据写入多个通道时会用到。
  • clear(): 回到初始状态,即 limit 等于 capacity,position 置0。重新对Buffer进行写入操作前调用。
  • compact(): 将未读取完的数据(position 与 limit 之间的数据)移动到缓冲区开头,并将 position 设置为这段数据末尾的下一个位置。其实就等价于重新向缓冲区中写入了这么一段数据。

下面我们用一段代码来验证position、limit 和capacity 这几个值的变化过程,代码如下:

public class BufferAPIDemo { 
    public static void main(String args[]) throws Exception { 
        //这用用的是文件IO 处理 
        FileInputStream fin = new FileInputStream("E://test.txt"); 
        //创建文件的操作管道 
        FileChannel fc = fin.getChannel(); 
        //分配一个10 个大小缓冲区,说白了就是分配一个10 个大小的byte 数组 
        ByteBuffer buffer = ByteBuffer.allocate(10); 
        output("初始化", buffer); 
        //先读一下 
        fc.read(buffer); 
        output("调用read()", buffer); 
        //准备操作之前,先锁定操作范围 
        buffer.flip(); 
        output("调用flip()", buffer); 
        //判断有没有可读数据 
        while (buffer.remaining() > 0) { 
            byte b = buffer.get(); 
            // System.out.print(((char)b)); 
        } 
        output("调用get()", buffer); 
        //可以理解为解锁 
        buffer.clear(); 
        output("调用clear()", buffer); 
        //最后把管道关闭 
        fin.close(); 
    } 
    //把这个缓冲里面实时状态给打印出来 
    public static void output(String step, Buffer buffer) { 
        System.out.println(step + " : "); 
        //容量,数组大小 
        System.out.print("capacity: " + buffer.capacity() + ", "); 
        //当前操作数据所在的位置,也可以叫做游标 
        System.out.print("position: " + buffer.position() + ", "); 
        //锁定值,flip,数据操作范围索引只能在position - limit 之间 
        System.out.println("limit: " + buffer.limit()); 
        System.out.println(); 
    } 
}

  完成的输出结果为:

  运行结果我们已经看到,下面呢对以上结果进行图解,三个属性值分别如图所示:

 

  我们可以从通道中读取一些数据到缓冲区中,注意从通道读取数据,相当于往缓冲区中写入数据。如果读取4 个自己的数据,则此时position 的值为4,即下一个将要被写入的字节索引为4,而limit 仍然是10,如下图所示:

  下一步把读取的数据写入到输出通道中,相当于从缓冲区中读取数据,在此之前,必须调用flip()方法,该方法将会完成两件事情:

  1. 把limit 设置为当前的position 值
  2. 把position 设置为0

  由于position 被设置为0,所以可以保证在下一步输出时读取到的是缓冲区中的第一个字节,而limit 被设置为当前的position,可以保证读取的数据正好是之前写入到缓冲区中的数据,如下图所示:

  现在调用get()方法从缓冲区中读取数据写入到输出通道,这会导致position 的增加而limit 保持不变,但position 不会超过limit 的值,所以在读取我们之前写入到缓冲区中的4 个字节之后,position 和limit 的值都为4,如下图所示:

 

  在从缓冲区中读取数据完毕后,limit 的值仍然保持在我们调用flip()方法时的值,调用clear()方法能够把所有的状态变化设置为初始化时的值,如下图所示:

3.缓冲区的分配

  在前面的几个例子中,我们已经看过了,在创建一个缓冲区对象时,会调用静态方法allocate()来指定缓冲区的容量,其实调用allocate()相当于创建了一个指定大小的数组,并把它包装为缓冲区对象。或者我们也可以直接将一个现有的数组,包装为缓冲区对象,如下示例代码所示:

/** 手动分配缓冲区*/ 
public class BufferWrap { 
  public void myMethod() { 
    // 分配指定大小的缓冲区 
    ByteBuffer buffer1 = ByteBuffer.allocate(10); 
    // 包装一个现有的数组 
    byte array[] = new byte[10]; 
    ByteBuffer buffer2 = ByteBuffer.wrap( array ); 
  } 
}

4.缓冲区分片

  在NIO 中,除了可以分配或者包装一个缓冲区对象外,还可以根据现有的缓冲区对象来创建一个子缓冲区,即在现有缓冲区上切出一片来作为一个新的缓冲区,但现有的缓冲区与创建的子缓冲区在底层数组层面上是数据共享的,也就是说,子缓冲区相当于是现有缓冲区的一个视图窗口。调用slice()方法可以创建一个子缓冲区,让我们通过例子来看一下:

//缓冲区分片 
public class BufferSlice { 
    static public void main(String args[]) throws Exception { 
        ByteBuffer buffer = ByteBuffer.allocate(10); 
        // 缓冲区中的数据0-9 
        for (int i = 0; i < buffer.capacity(); ++i) { 
            buffer.put((byte) i); 
        } 
        // 创建子缓冲区 
        buffer.position(3); 
        buffer.limit(7); 
        ByteBuffer slice = buffer.slice(); 
        // 改变子缓冲区的内容 
        for (int i = 0; i < slice.capacity(); ++i) { 
            byte b = slice.get(i); 
            b *= 10; 
            slice.put(i, b); 
        } 
        buffer.position(0); 
        buffer.limit(buffer.capacity()); 
        while (buffer.remaining() > 0) { 
            System.out.println(buffer.get()); 
        } 
    } 
}

  在该示例中,分配了一个容量大小为10 的缓冲区,并在其中放入了数据0-9,而在该缓冲区基础之上又创建了一个子缓冲区,并改变子缓冲区中的内容,从最后输出的结果来看,只有子缓冲区“可见的”那部分数据发生了变化,并且说明子缓冲区与原缓冲区是数据共享的,输出结果如下所示:

5.只读缓冲区

  只读缓冲区非常简单,可以读取它们,但是不能向它们写入数据。可以通过调用缓冲区的asReadOnlyBuffer()方法,将任何常规缓冲区转换为只读缓冲区,这个方法返回一个与原缓冲区完全相同的缓冲区,并与原缓冲区共享数据,只不过它是只读的。如果原缓冲区的内容发生了变化,只读缓冲区的内容也随之发生变化:

public class ReadOnlyBuffer { 
    static public void main(String args[]) throws Exception { 
        ByteBuffer buffer = ByteBuffer.allocate(10); 
        // 缓冲区中的数据0-9 
        for (int i = 0; i < buffer.capacity(); ++i) { 
            buffer.put((byte) i); 
        } 
        // 创建只读缓冲区 
        ByteBuffer readonly = buffer.asReadOnlyBuffer(); 
        // 改变原缓冲区的内容 
        for (int i = 0; i < buffer.capacity(); ++i) { 
            byte b = buffer.get(i); 
            b *= 10; 
            buffer.put(i, b); 
        } 
        readonly.position(0); 
        readonly.limit(buffer.capacity()); 
        // 只读缓冲区的内容也随之改变 
        while (readonly.remaining() > 0) { 
            System.out.println(readonly.get()); 
        } 
    } 
}

  如果尝试修改只读缓冲区的内容,则会报ReadOnlyBufferException 异常。只读缓冲区对于保护数据很有用。在将缓冲区传递给某个对象的方法时,无法知道这个方法是否会修改缓冲区中的数据。创建一个只读的缓冲区可以保证该缓冲区不会被修改。只可以把常规缓冲区转换为只读缓冲区,而不能将只读的缓冲区转换为可写的缓冲区。

6.直接缓冲区

  直接缓冲区是为加快I/O 速度,使用一种特殊方式为其分配内存的缓冲区,JDK 文档中的描述为:给定一个直接字节缓冲区,Java虚拟机将尽最大努力直接对它执行本机I/O 操作。也就是说,它会在每一次调用底层操作系统的本机I/O 操作之前(或之后),尝试避免将缓冲区的内容拷贝到一个中间缓冲区中或者从一个中间缓冲区中拷贝数据。要分配直接缓冲区,需要调用allocateDirect()方法,而不是allocate()方法,使用方式与普通缓冲区并无区别,如下面的拷贝文件示例:

public class DirectBuffer { 
    static public void main(String args[]) throws Exception { 
        //首先我们从磁盘上读取刚才我们写出的文件内容 
        String infile = "E://test.txt"; 
        FileInputStream fin = new FileInputStream(infile); 
        FileChannel fcin = fin.getChannel(); 
        //把刚刚读取的内容写入到一个新的文件中 
        String outfile = String.format("E://testcopy.txt"); 
        FileOutputStream fout = new FileOutputStream(outfile); 
        FileChannel fcout = fout.getChannel(); 
        // 使用allocateDirect,而不是allocate 
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024); 
        while (true) { 
            buffer.clear(); 
            int r = fcin.read(buffer); 
            if (r == -1) { 
                break; 
            } 
            buffer.flip(); 
            fcout.write(buffer); 
        } 
    } 
}

7.内存映射

  内存映射是一种读和写文件数据的方法,它可以比常规的基于流或者基于通道的I/O 快的多。内存映射文件I/O 是通过使文件中的数据出现为内存数组的内容来完成的,这其初听起来似乎不过就是将整个文件读到内存中,但是事实上并不是这样。一般来说,只有文件中实际读取或者写入的部分才会映射到内存中。如下面的示例代码:

public class MappedBuffer { 
 
    static private final int start = 0; 
    static private final int size = 1024; 
 
    static public void main(String args[]) throws Exception { 
        RandomAccessFile raf = new RandomAccessFile("E://test.txt", "rw"); 
        FileChannel fc = raf.getChannel(); 
        //把缓冲区跟文件系统进行一个映射关联 
        //只要操作缓冲区里面的内容,文件内容也会跟着改变 
        MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, start, size); 
        mbb.put(0, (byte) 97); 
        mbb.put(1023, (byte) 122); 
        raf.close(); 
    } 
}

 选择器Selector

 selector:

  Selector(选择器)是一个特殊的组件,用于采集各个通道的状态(或者说事件)。我们先将通道注册到选择器,并设置好关心的事件,然后就可以通过调用select()方法,静静地等待事件发生。通道有如下4个事件可供我们监听:

  • Accept:有可以接受的连接
  • Connect:连接成功
  • Read:有数据可读
  • Write:可以写入数据了

  由于如果用阻塞I/O,需要多线程(浪费内存),如果用非阻塞I/O,需要不断重试(耗费CPU)。Selector的出现解决了这尴尬的问题,非阻塞模式下,通过Selector,我们的线程只为已就绪的通道工作,不用盲目的重试了。比如,当所有通道都没有数据到达时,也就没有Read事件发生,我们的线程会在select()方法处被挂起,从而让出了CPU资源。

  传统的Server/Client 模式会基于TPR(Thread per Request),服务器会为每个客户端请求建立一个线程,由该线程单独负责处理一个客户请求。这种模式带来的一个问题就是线程数量的剧增,大量的线程会增大服务器的开销。大多数的实现为了避免这个问题,都采用了线程池模型,并设置线程池线程的最大数量,这又带来了新的问题,如果线程池中有200 个线程,而有200 个用户都在进行大文件下载,会导致第201个用户的请求无法及时处理,即便第201 个用户只想请求一个几KB 大小的页面。

  NIO 中非阻塞I/O 采用了基于Reactor 模式的工作方式,I/O 调用不会被阻塞,相反是注册感兴趣的特定I/O 事件,如可读数据到达,新的套接字连接等等,在发生特定事件时,系统再通知我们。NIO 中实现非阻塞I/O 的核心对象就是Selector,Selector 就是注册各种I/O 事件地方,而且当那些事件发生时,就是这个对象告诉我们所发生的事件,如下图所示:

 

  从图中可以看出,当有读或写等任何注册的事件发生时,可以从Selector 中获得相应的SelectionKey,同时从SelectionKey 中可以找到发生的事件和该事件所发生的具体的SelectableChannel,以获得客户端发送过来的数据。使用NIO 中非阻塞I/O 编写服务器处理程序,大体上可以分为下面三个步骤:

  1. 向Selector 对象注册感兴趣的事件。
  2. 从Selector 中获取感兴趣的事件。
  3. 根据不同的事件进行相应的处理。

 通道Channel

  Channel: 数据的源头或者数据的目的地 ,用于向 buffer 提供数据或者读取 buffer 数据 ,buffer 对象的唯一接口,异步 I/O 支持。

  Buffer作为IO流中数据的缓冲区,而Channel则作为socket的IO流与Buffer的传输通道。客户端socket与服务端socket之间的IO传输不直接把数据交给CPU使用,而是先经过Channel通道把数据保存到Buffer,然后CPU直接从Buffer区读写数据,一次可以读写更多的内容。使用Buffer提高IO效率的原因(这里与IO流里面的BufferedXXStream、BufferedReader、BufferedWriter提高性能的原理一样):IO的耗时主要花在数据传输的路上,普通的IO是一个字节一个字节地传输,而采用了Buffer的话,通过Buffer封装的方法(比如一次读一行,则以行为单位传输而不是一个字节一次进行传输)就可以实现“一大块字节”的传输。比如:IO就是送快递,普通IO是一个快递跑一趟,采用了Buffer的IO就是一车跑一趟。很明显,buffer效率更高,花在传输路上的时间大大缩短。

  面向buffer的通道,一个Channel(通道)代表和某一实体的连接,这个实体可以是文件、网络套接字等。也就是说,通道是Java NIO提供的一座桥梁,用于我们的程序和操作系统底层I/O服务进行交互。通道是一种很基本很抽象的描述,和不同的I/O服务交互,执行不同的I/O操作,实现不一样,因此具体的有FileChannel、SocketChannel,ServerSocketChannel,DatagramChannel等。通道使用起来跟Stream比较像,可以读取数据到Buffer中,也可以把Buffer中的数据写入通道。但是channel是双向的,而stream是单向的。

  通道是一个对象,通过它可以读取和写入数据,当然了所有数据都通过Buffer 对象来处理。我们永远不会将字节直接写入通道中,相反是将数据写入包含一个或者多个字节的缓冲区。同样不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。在NIO 中,提供了多种通道对象,而所有的通道对象都实现了Channel 接口。它们之间的继承关系如下图所示:

   结合上面的三大组件,来实现一下基本的NIO流程,服务端:

/*服务器端,:接收客户端发送过来的数据并显示, 
 *服务器把上接收到的数据加上"echo from service:"再发送回去*/ 
public class ServiceSocketChannelDemo { 
    public static class TCPEchoServer implements Runnable{ 
        /*服务器地址*/ 
        private InetSocketAddress localAddress; 
        public TCPEchoServer(int port) throws IOException { 
            this.localAddress = new InetSocketAddress(port); 
        } 
        @Override 
        public void run(){ 
            Charset utf8 = Charset.forName("UTF-8"); 
            ServerSocketChannel ssc = null; 
            Selector selector = null; 
            Random rnd = new Random(); 
            try { 
                /*创建选择器*/ 
                selector = Selector.open(); 
                /*创建服务器通道*/ 
                ssc = ServerSocketChannel.open(); 
                ssc.configureBlocking(false); 
                /*设置监听服务器的端口,设置最大连接缓冲数为100*/ 
                ssc.bind(localAddress, 100); 
                /*服务器通道只能对tcp链接事件感兴趣*/ 
                ssc.register(selector, SelectionKey.OP_ACCEPT); 
            } catch (IOException e1) { 
                System.out.println("server start failed"); 
                return; 
            } 
            System.out.println("server start with address : " + localAddress); 
            /*服务器线程被中断后会退出*/ 
            try{ 
                while(!Thread.currentThread().isInterrupted()){ 
                    int n = selector.select(); 
                    if(n == 0){ 
                        continue; 
                    } 
                    Set<SelectionKey> keySet = selector.selectedKeys(); 
                    Iterator<SelectionKey> it = keySet.iterator(); 
                    SelectionKey key = null; 
                    while(it.hasNext()){ 
                        key = it.next(); 
                        /*防止下次select方法返回已处理过的通道*/ 
                        it.remove(); 
                        /*若发现异常,说明客户端连接出现问题,但服务器要保持正常*/ 
                        try{ 
                            /*ssc通道只能对链接事件感兴趣*/ 
                            if(key.isAcceptable()){ 
                                /*accept方法会返回一个普通通道, 
                                     每个通道在内核中都对应一个socket缓冲区*/ 
                                SocketChannel sc = ssc.accept(); 
                                sc.configureBlocking(false); 
                                /*向选择器注册这个通道和普通通道感兴趣的事件,同时提供这个新通道相关的缓冲区*/ 
                                int interestSet = SelectionKey.OP_READ; 
                                sc.register(selector, interestSet, new Buffers(256, 256)); 
                                System.out.println("accept from " + sc.getRemoteAddress()); 
                            } 
                            /*(普通)通道感兴趣读事件且有数据可读*/ 
                            if(key.isReadable()){ 
                                /*通过SelectionKey获取通道对应的缓冲区*/ 
                                Buffers  buffers = (Buffers)key.attachment(); 
                                ByteBuffer readBuffer = buffers.getReadBuffer(); 
                                ByteBuffer writeBuffer = buffers.gerWriteBuffer(); 
                                /*通过SelectionKey获取对应的通道*/ 
                                SocketChannel sc = (SocketChannel) key.channel(); 
                                /*从底层socket读缓冲区中读入数据*/ 
                                sc.read(readBuffer); 
                                readBuffer.flip(); 
                                /*解码显示,客户端发送来的信息*/ 
                                CharBuffer cb = utf8.decode(readBuffer); 
                                System.out.println(cb.array()); 
                                readBuffer.rewind(); 
                                /*准备好向客户端发送的信息*/ 
                                /*先写入"echo:",再写入收到的信息*/ 
                                writeBuffer.put("echo from service:".getBytes("UTF-8")); 
                                writeBuffer.put(readBuffer); 
                                readBuffer.clear(); 
                                /*设置通道写事件*/ 
                                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); 
                            } 
                            /*通道感兴趣写事件且底层缓冲区有空闲*/ 
                            if(key.isWritable()){ 
                                Buffers  buffers = (Buffers)key.attachment(); 
                                ByteBuffer writeBuffer = buffers.gerWriteBuffer(); 
                                writeBuffer.flip(); 
                                SocketChannel sc = (SocketChannel) key.channel(); 
                                int len = 0; 
                                while(writeBuffer.hasRemaining()){ 
                                    len = sc.write(writeBuffer); 
                                    /*说明底层的socket写缓冲已满*/ 
                                    if(len == 0){ 
                                        break; 
                                    } 
                                } 
                                writeBuffer.compact(); 
 
                                /*说明数据全部写入到底层的socket写缓冲区*/ 
                                if(len != 0){ 
                                    /*取消通道的写事件*/ 
                                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); 
                                } 
                            } 
                        }catch(IOException e){ 
                            System.out.println("service encounter client error"); 
                            /*若客户端连接出现异常,从Seletcor中移除这个key*/ 
                            key.cancel(); 
                            key.channel().close(); 
                        } 
                    } 
                    Thread.sleep(rnd.nextInt(500)); 
                } 
            }catch(InterruptedException e){ 
                System.out.println("serverThread is interrupted"); 
            } catch (IOException e1) { 
                System.out.println("serverThread selecotr error"); 
            }finally{ 
                try{ 
                    selector.close(); 
                }catch(IOException e){ 
                    System.out.println("selector close failed"); 
                }finally{ 
                    System.out.println("server close"); 
                } 
            } 
        } 
    } 
    public static void main(String[] args) throws InterruptedException, IOException{ 
        Thread thread = new Thread(new TCPEchoServer(8080)); 
        thread.start(); 
        Thread.sleep(100000); 
        /*结束服务器线程*/ 
        thread.interrupt(); 
    } 
} 

  Buffers:

/*自定义Buffer类中包含读缓冲区和写缓冲区,用于注册通道时的附加对象*/ 
public class Buffers { 
 
    ByteBuffer readBuffer; 
    ByteBuffer writeBuffer; 
 
    public Buffers(int readCapacity, int writeCapacity){ 
        readBuffer = ByteBuffer.allocate(readCapacity); 
        writeBuffer = ByteBuffer.allocate(writeCapacity); 
    } 
    public ByteBuffer getReadBuffer(){ 
        return readBuffer; 
    } 
    public ByteBuffer gerWriteBuffer(){ 
        return writeBuffer; 
    } 
} 

  客户端:

/*客户端:客户端每隔1~2秒自动向服务器发送数据,接收服务器接收到数据并显示*/ 
public class ClientSocketChannelDemo { 
 
    public static class TCPEchoClient implements Runnable{ 
        /*客户端线程名*/ 
        private String name; 
        private Random rnd = new Random(); 
        /*服务器的ip地址+端口port*/ 
        private InetSocketAddress remoteAddress; 
        public TCPEchoClient(String name, InetSocketAddress remoteAddress){ 
            this.name = name; 
            this.remoteAddress = remoteAddress; 
        } 
        @Override 
        public void run(){ 
            /*创建解码器*/ 
            Charset utf8 = Charset.forName("UTF-8"); 
            Selector selector; 
            try { 
                /*创建TCP通道*/ 
                SocketChannel sc = SocketChannel.open(); 
                /*设置通道为非阻塞*/ 
                sc.configureBlocking(false); 
                /*创建选择器*/ 
                selector = Selector.open(); 
                /*注册感兴趣事件*/ 
                int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; 
                /*向选择器注册通道*/ 
                sc.register(selector, interestSet, new Buffers(256, 256)); 
                /*向服务器发起连接,一个通道代表一条tcp链接*/ 
                sc.connect(remoteAddress); 
                /*等待三次握手完成*/ 
                while(!sc.finishConnect()){ 
                    ; 
                } 
                System.out.println(name + " " + "finished connection"); 
            } catch (IOException e) { 
                System.out.println("client connect failed"); 
                return; 
            } 
            /*与服务器断开或线程被中断则结束线程*/ 
            try{ 
                int i = 1; 
                while(!Thread.currentThread().isInterrupted()){ 
                    /*阻塞等待*/ 
                    selector.select(); 
                    /*Set中的每个key代表一个通道*/ 
                    Set<SelectionKey> keySet = selector.selectedKeys(); 
                    Iterator<SelectionKey> it = keySet.iterator(); 
                    /*遍历每个已就绪的通道,处理这个通道已就绪的事件*/ 
                    while(it.hasNext()){ 
                        SelectionKey key = it.next(); 
                        /*防止下次select方法返回已处理过的通道*/ 
                        it.remove(); 
                        /*通过SelectionKey获取对应的通道*/ 
                        Buffers  buffers = (Buffers)key.attachment(); 
                        ByteBuffer readBuffer = buffers.getReadBuffer(); 
                        ByteBuffer writeBuffer = buffers.gerWriteBuffer(); 
                        /*通过SelectionKey获取通道对应的缓冲区*/ 
                        SocketChannel sc = (SocketChannel) key.channel(); 
                        /*表示底层socket的读缓冲区有数据可读*/ 
                        if(key.isReadable()){ 
                            /*从socket的读缓冲区读取到程序定义的缓冲区中*/ 
                            sc.read(readBuffer); 
                            readBuffer.flip(); 
                            /*字节到utf8解码*/ 
                            CharBuffer cb = utf8.decode(readBuffer); 
                            /*显示接收到由服务器发送的信息*/ 
                            System.out.println(cb.array()); 
                            readBuffer.clear(); 
                        } 
                        /*socket的写缓冲区可写*/ 
                        if(key.isWritable()){ 
                            writeBuffer.put((name + "  " + i).getBytes("UTF-8")); 
                            writeBuffer.flip(); 
                            /*将程序定义的缓冲区中的内容写入到socket的写缓冲区中*/ 
                            sc.write(writeBuffer); 
                            writeBuffer.clear(); 
                            i++; 
                        } 
                    } 
                    Thread.sleep(1000 + rnd.nextInt(1000)); 
                } 
            }catch(InterruptedException e){ 
                System.out.println(name + " is interrupted"); 
            }catch(IOException e){ 
                System.out.println(name + " encounter a connect error"); 
            }finally{ 
                try { 
                    selector.close(); 
                } catch (IOException e1) { 
                    System.out.println(name + " close selector failed"); 
                }finally{ 
                    System.out.println(name + "  closed"); 
                } 
            } 
        } 
    } 
    public static void main(String[] args) throws InterruptedException{ 
        InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 8080); 
        Thread ta = new Thread(new TCPEchoClient("thread a", remoteAddress)); 
        ta.start(); 
        Thread.sleep(5000); 
        /*结束客户端a*/ 
        ta.interrupt(); 
    } 
} 

IO 多路复用

  我们试想一下这样的现实场景:

  一个餐厅同时有100 位客人到店,当然到店后第一件要做的事情就是点菜。但是问题来了,餐厅老板为了节约人力成本目前只有一位大堂服务员拿着唯一的一本菜单等待客人进行服务。那么最笨(但是最简单)的方法是(方法A),无论有多少客人等待点餐,服务员都把仅有的一份菜单递给其中一位客人,然后站在客人身旁等待这个客人完成点菜过程。在记录客人点菜内容后,把点菜记录交给后堂厨师。然后是第二位客人。。。。然后是第三位客人。很明显,只有脑袋被门夹过的老板,才会这样设置服务流程。因为随后的80 位客人,再等待超时后就会离店(还会给差评)。

  于是还有一种办法(方法B),老板马上新雇佣99 名服务员,同时印制99 本新的菜单。每一名服务员手持一本菜单负责一位客人(关键不只在于服务员,还在于菜单。因为没有菜单客人也无法点菜)。在客人点完菜后,记录点菜内容交给后堂厨师(当然为了更高效,后堂厨师最好也有100 名)。这样每一位客人享受的就是VIP 服务咯,当然客人不会走,但是人力成本可是一个大头哦(亏死你)。

  另外一种办法(方法C:多路复用IO),就是改进点菜的方式,当客人到店后,自己申请一本菜单。想好自己要点的才后,就呼叫服务员。服务员站在自己身边后记录客人的菜单内容。将菜单递给厨师的过程也要进行改进,并不是每一份菜单记录好以后,都要交给后堂厨师。服务员可以记录号多份菜单后,同时交给厨师就行了。那么这种方式,对于老板来说人力成本是最低的;对于客人来说,虽然不再享受VIP 服务并且要进行一定的等待,但是这些都是可接受的;对于服务员来说,基本上她的时间都没有浪费,基本上被老板压杆了最后一滴油水。

  目前流行的多路复用IO 实现主要包括四种:select、poll、epoll、kqueue。下表是他们的一些重要特性的比较:

select: 性能较高,关键思路是Reactor ,操作系统支持windows/Linux,JAVA支持,Reactor 模式(反应器设计模式)。Linux 操作系统的kernels 2.4 内核版本之前, 默认使用select;而目前windows 下对同步IO 的支持,都是select 模型。

poll :性能较高,关键思路是Reactor ,操作系统支持Linux。Linux 下的JAVA NIO 框架,Linux kernels 2.6 内核版本之前使用poll 进行支持。也是使用的Reactor 模式。

epoll :性能高,关键思路是Reactor/Proactor ,操作系统支持Linux 。Linux kernels 2.6 内核版本及以后使用epoll 进行支持;Linux kernels 2.6 内核版本之前使用poll进行支持;另外一定注意,由于Linux 下没有Windows 下的IOCP 技术提供真正的异步IO 支持,所以Linux 下使用epoll 模拟异步IO。

kqueue :性能高,关键思路是Proactor ,操作系统支持Linux 。目前JAVA 的版本不支持。


评论关闭
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!