Buffer是一個對象,包含一些要寫入和讀出的數據。
在NIO中,所有的數據都是用緩沖區處理的,讀取數據時,它是從通道(Channel)直接讀到緩沖區中,在寫入數據時,也是從緩沖區寫入到通道。
緩沖區實質上是一個數組,通常是一個字節數組(ByteBuffer),也可以是其它類型的數組,此外緩沖區還提供了對數據的結構化訪問以及維護讀寫位置等信息。
Buffer類的繼承關系如下圖所示:
Channel是一個通道,網絡數據通過Channel讀取和寫入。通道和流的不同之處在於通道是雙向的(通道可以用於讀、寫後者二者同時進行),流只是在一個方向上移動。
Channel大體上可以分為兩類:用於網絡讀寫的SelectableChannel(ServerSocketChannel和SocketChannel就是其子類)、用於文件操作的FileChannel。
下面的例子給出通過FileChannel來向文件中寫入數據、從文件中讀取數據,將文件數據拷貝到另一個文件中:
public class NioTest { public static void main(String[] args) throws IOException { copyFile(); } //拷貝文件 private static void copyFile() { FileInputStream in=null; FileOutputStream out=null; try { in=new FileInputStream("src/main/java/data/in-data.txt"); out=new FileOutputStream("src/main/java/data/out-data.txt"); FileChannel inChannel=in.getChannel(); FileChannel outChannel=out.getChannel(); ByteBuffer buffer=ByteBuffer.allocate(1024); int bytesRead = inChannel.read(buffer); while (bytesRead!=-1) { buffer.flip(); outChannel.write(buffer); buffer.clear(); bytesRead = inChannel.read(buffer); } } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //寫文件 private static void writeFileNio() { try { RandomAccessFile fout = new RandomAccessFile("src/main/java/data/nio-data.txt", "rw"); FileChannel fc=fout.getChannel(); ByteBuffer buffer=ByteBuffer.allocate(1024); buffer.put("hi123".getBytes()); buffer.flip(); try { fc.write(buffer); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //讀文件 private static void readFileNio() { FileInputStream fileInputStream; try { fileInputStream = new FileInputStream("src/main/java/data/nio-data.txt"); FileChannel fileChannel=fileInputStream.getChannel();//從 FileInputStream 獲取通道 ByteBuffer byteBuffer=ByteBuffer.allocate(1024);//創建緩沖區 int bytesRead=fileChannel.read(byteBuffer);//將數據讀到緩沖區 while(bytesRead!=-1) { /*limit=position * position=0; */ byteBuffer.flip(); //hasRemaining():告知在當前位置和限制之間是否有元素 while (byteBuffer.hasRemaining()) { System.out.print((char) byteBuffer.get()); } /* * 清空緩沖區 * position=0; * limit=capacity; */ byteBuffer.clear(); bytesRead = fileChannel.read(byteBuffer); } } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
多路復用器提供選擇已經就緒的任務的能力。Selector會不斷的輪詢注冊在其上的Channel,如果某個Channel上面發送讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以獲取就緒Channel的集合,進行後續的I/O操作。
一個多路復用器Selector可以同時輪詢多個Channel,由於JDK使用了epoll代替了傳統的select實現,所以它沒有最大連接句柄1024/2048的限制,意味著只需要一個線程負責Selector的輪詢,就可以接入成千上萬的客戶端。其模型如下圖所示:
用單線程處理一個Selector。要使用Selector,得向Selector注冊Channel,然後調用它的select()方法。這個方法會一直阻塞到某個注冊的通道有事件就緒。一旦這個方法返回,線程就可以處理這些事件,事件的例子有如新連接進來,數據接收等。
注:
1、什麼select模型?
select是事件觸發機制,當等待的事件發生就觸發進行處理,多用於Linux實現的服務器對客戶端的處理。
可以阻塞地同時探測一組支持非阻塞的IO設備,是否有事件發生(如可讀、可寫,有高優先級錯誤輸出等),直至某一個設備觸發了事件或者超過了指定的等待時間。也就是它們的職責不是做IO,而是幫助調用者尋找當前就緒的設備。
2、什麼是epoll模型?
epoll的設計思路,是把select/poll單個的操作拆分為1個epoll_create+多個epoll_ctrl+一個wait。此外,內核針對epoll操作添加了一個文件系統”eventpollfs”,每一個或者多個要監視的文件描述符都有一個對應的eventpollfs文件系統的inode節點,主要信息保存在eventpoll結構體中。而被監視的文件的重要信息則保存在epitem結構體中。所以他們是一對多的關系。
功能說明:開啟服務器端,對每一個接入的客戶端都向其發送hello字符串。
使用NIO進行服務器端開發主要有以下幾個步驟:
1、創建ServerSocketChannel,配置它為非阻塞模式
serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false);
2、綁定監聽,配置TCP參數,如backlog大小
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
3、創建一個獨立的I/O線程,用於輪詢多路復用器Selector
4、創建Selector,將之前創建的ServerSocketChannel注冊到Selector上,監聽SelectionKey.ACCEPT
selector=Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
5、啟動I/O線程,在循環體內執行Selector.select()方法,輪詢就緒的Channel
while(true) { try { //select()阻塞到至少有一個通道在你注冊的事件上就緒了 //如果沒有准備好的channel,就在這一直阻塞 //select(long timeout)和select()一樣,除了最長會阻塞timeout毫秒(參數)。 selector.select(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); break; }
}
6、當輪詢到了處於就緒狀態的Channel時,需對其進行判斷,如果是OP_ACCEPT狀態,說明是新的客戶端接入,則調用ServerSocketChannel.accept()方法接受新的客戶端
//返回已經就緒的SelectionKey,然後迭代執行 Set<SelectionKey> readKeys=selector.selectedKeys(); for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();) { SelectionKey key=it.next(); it.remove(); try { if(key.isAcceptable()) { ServerSocketChannel server=(ServerSocketChannel) key.channel(); SocketChannel client=server.accept(); client.configureBlocking(false); client.register(selector,SelectionKey.OP_WRITE); } else if(key.isWritable()) { SocketChannel client=(SocketChannel) key.channel(); ByteBuffer buffer=ByteBuffer.allocate(20); String str="hello"; buffer=ByteBuffer.wrap(str.getBytes()); client.write(buffer); key.cancel(); } }catch(IOException e) { e.printStackTrace(); key.cancel(); try { key.channel().close(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } }
7、設置新接入的客戶端鏈路SocketChannel為非阻塞模式,配置其他的一些TCP參數
if(key.isAcceptable()) { ServerSocketChannel server=(ServerSocketChannel) key.channel(); SocketChannel client=server.accept(); client.configureBlocking(false); ... }
8、將SocketChannel注冊到Selector,監聽OP_WRITE
client.register(selector,SelectionKey.OP_WRITE);
9、如果輪詢的Channel為OP_WRITE,則說明要向SockChannel中寫入數據,則構造ByteBuffer對象,寫入數據包
else if(key.isWritable()) { SocketChannel client=(SocketChannel) key.channel(); ByteBuffer buffer=ByteBuffer.allocate(20); String str="hello"; buffer=ByteBuffer.wrap(str.getBytes()); client.write(buffer); key.cancel(); }
完整代碼如下:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class ServerSocketChannelDemo { public static void main(String[] args) { ServerSocketChannel serverSocketChannel; Selector selector=null; try { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(8080)); selector=Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } while(true) { try { //select()阻塞到至少有一個通道在你注冊的事件上就緒了 //如果沒有准備好的channel,就在這一直阻塞 //select(long timeout)和select()一樣,除了最長會阻塞timeout毫秒(參數)。 selector.select(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); break; } //返回已經就緒的SelectionKey,然後迭代執行 Set<SelectionKey> readKeys=selector.selectedKeys(); for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();) { SelectionKey key=it.next(); it.remove(); try { if(key.isAcceptable()) { ServerSocketChannel server=(ServerSocketChannel) key.channel(); SocketChannel client=server.accept(); client.configureBlocking(false); client.register(selector,SelectionKey.OP_WRITE); } else if(key.isWritable()) { SocketChannel client=(SocketChannel) key.channel(); ByteBuffer buffer=ByteBuffer.allocate(20); String str="hello"; buffer=ByteBuffer.wrap(str.getBytes()); client.write(buffer); key.cancel(); } }catch(IOException e) { e.printStackTrace(); key.cancel(); try { key.channel().close(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } } } } } View Code我們用telnet localhost 8080模擬出多個客戶端:
程序運行結果如下:
1、netty權威指南(李林峰)