J2SE 1.4版引入了非阻隔套接字(Nonblocking sockets),它允許在網絡通信應用程序和沒有阻隔的進程中使用套接字。本文將詳細介紹什麼是非阻隔套接字(Nonblocking sockets)及其工作原理和用途。
從Java 1.4起,程序員便能用一組新的API來進行I/O操作。這是JSR 51項目的結果,自2000年1月的Java 1.4 beta版,程序員便可以使用JSR 51了。在Java 1.4中增加了一些非常重要的新技術來處理諸如在文件和套接字上進行高性能的讀/寫操作,正規表達式,譯碼/編碼字符集,內存映射和文件鎖定。在這篇文章中,我們將討論一個特殊的新API――New I/O API: Nonblocking sockets。
非阻隔套接字允許在通道上做輸入/輸出操作而不用阻塞該通道的進程。本文中我將討論異步高性能讀/寫操作和翻轉上下設計和開發基於接口的應用程序的技巧。
Java開發者也許會問,為什麼介紹一種新的技術來處理套接字?Java 1.3.x的套接字又有哪些問題?假設實現服務器端接受不同的客戶端的連接。同樣,假設客戶端能支持處理同步的多請求。使用Java 1.3.x,開發這樣的服務器端有兩種不同的選擇:
●實現多線程服務為每個連接用戶處理線程。
●使用外部第三方模塊。
這兩種方法都可以實現,但是如果適用第一種方法――整個線程管理方案,包括相關並發性和沖突問題――都需要靠程序員來處理。第二個方案也許花費更大,且使應用程序依靠“non-JDK”的外部模塊。依靠非阻隔套接字,你能實現非阻隔的服務無需直接管理線程或者采用外部模塊。
Buffer
在我們考慮非阻隔套接字以前,不得不花費一些時間在一個新的Java 1.4的類:java.nio.Buffer上。一個Buffer實例只是原始數據的一個有限的容器。稱其有限是因為它只能包含有限數量的字節;換句語說,它不是一個像Vector或是ArrayList一樣的容器,後兩者從理論上說是沒有限度的。另外,一個Buffer實例僅能包含屬於Java的基本數據類型。例如:int,char,double,Boolean,等等。
Buffer類是一個抽象類,它有7個子類分別對應於七種基本的數據類型:
●ByteBuffer
●CharBuffer
●DoubleBuffer
●FloatBuffer
●IntBuffer
●LongBuffer
●ShortBuffer
在非阻隔套接字編程中,通常所有新 I/O系統能工作的環境中,極其重要的是解決Buffer對象如何工作。這是因為新套接字通道使用Buffer對象通過網絡來傳送數據。
你可以使用以下靜態方法(即類方法)來創建一個新的Buffer實例:allocate,allocateDirect,wrap。在下面的例子中,三個Buffer對象將用三種不同的方法來實例化。
ByteBuffer buffer1 = ByteBuffer.allocate(1024);
ByteBuffer buffer2 = ByteBuffer.allocateDirect(1024);
ByteBuffer buffer3 = ByteBuffer.wrap(new String("hello").getBytes());
這段代碼的前兩行創建了兩個ByteBuffer對象,它們都包含1024個字節。allocate和allocateDirect方法都做了相同的工作,不同的是第二個方法直接使用操作系統來分配Buffer。從而它將提供更快的訪問速度。不幸的是,並非所有的虛擬機都支持這種直接分配的方法。第三行使用wrap方法。它創建了一個ByteBuffer對象,包含的字節由字符串“hello”組成。
Buffer對象的作用或多或少的與流的作用相似。“當前位置(current position)”是一個極其重要的概念,它計算出你將要處理的Buffer對象的適當的位置。在任何時候,一個Buffer對象都有一個當前位置指向某一項。之後,每一次讀或寫操作都會自動的將當前位置指向Buffer中的下一項。
你可以用put方法寫入一些數據到Buffer中:
// Writing on a buffer
IntBuffer buffer = IntBuffer.allocate(10);
for (int i=0; i < buffer.capacity(); i++) {
buffer.put(i);
}
這段代碼創建了一個包含10個整型值的Buffer,然後將數字0到9放入到Buffer中。同時你可以看到,我使用了capacity方法來獲得Buffer的容量。
要想讀取Buffer的內容,你可以用如下方法來處理:
// Reading from a buffer
buffer.position(0);
while (buffer.hasRemaining()) {
int i = buffer.get();
System.out.println("i="+i);
}
調用position方法,你能設置當前位置為0;即Bufferr的起始位置。當在當前位置和limit值之間有元素時,hasRemaining方法返回true;直到超出這個范圍時,這個方法將返回flase。while循環中的代碼調用get方法讀取各項,並同時顯示在控制台上。
理解Buffer的limit和capacity這兩個值之間的區別是十分重要的。Capacity是某個Buffer對象所能包含的項數的最大值。Limit是在0到capacity之間的一個值,它表示一個限度,可以使用limit或者flip方法來設置它。我們來看下面的例子:
// Sample of using flip
buffer.position(5);
buffer.flip();
while (buffer.hasRemaining()) {
int i = buffer.get();
System.out.println("i="+i);
}
當前位置被position方法設置成5。Flip方法進行如下操作:先將設置limit為當前位置的值,即5;然後再設置當前位置的值為0。因此,此後的while循環就只能掃描到前5個元素了,因為flip方法設置了新的limit值,即為5。從而,數字0,1,2,3,4將被顯示出來。
另一個重要的Buffer類的方法是clear,它將設置position為0並設置limit為Buffer的容量值。基本上,clear方法消除這之前flip(或limit)方法產生的影響。考慮下例:
// Sample of using clear
buffer.clear();
while (buffer.hasRemaining()) {
int i = buffer.get();
System.out.println("i="+i);
}
這段代碼將顯示數字0到9,而與Buffer的當前位置和limit值無關。
非阻隔(Nonblocking)體系結構
在這一部分,我將從理論的角度上來解釋非阻隔體系的結構及其工作原理。這部“喜劇(當然,如果你喜歡的話也可以稱做戲劇)”的“人物”如下:
●服務器:接收請求的應用程序。
●客戶:一組向服務器端發出請求的應用程序。
●套接字通道:客戶端與服務器端之間的通信通道。它能識別服務器端的IP地址和端口號。數據以Buffer中元素的形式通過套接字通道傳送。
●選擇器:所有非阻隔技術的主要對象。它監視著已注冊的套接字通道,並序列化服務器需要應答的請求。
●關鍵字:選擇器用來對對象的請求進行排序。每個關鍵字代表一個單獨的客戶端子請求並包含識別客戶端和請求類型的信息。
圖一:使用非阻隔端口體系的結構圖。
圖1:非阻隔端口結構
你可能注意到,客戶端應用程序同時執行對服務器端的請求,選擇器將其集中起來,創建關鍵字,然後將其發送至服務器端。這看起來像是阻隔(Blocking)體系,因為在一定時間內只處理一個請求,但事實並非如此。實際上,每個關鍵字不代表從客戶端發至服務器端的整個信息流,僅僅只是一部分。我們不要忘了選擇器能分割那些被關鍵字標識的子請求裡的數據。因此,如果有更多連續地數據發送至服務器端,那麼選擇器就會創建更多的根據時間共享策略(Time-sharing policy)來進行處理的關鍵字。強調一下,在圖一中關鍵字的顏色與客戶端的顏色相對應。
服務器端非阻隔(Server Nonblocking)
我以前的部分介紹過的實體都有與其相當的Java實體。客戶端和服務器端是兩個Java應用程序。套接字通道是SocketChannel類的實例,這個類允許通過網絡傳送數據。它們能被Java程序員看作是一個新的套接字。SocketChannel類被定義在java.nio.channel包中。
選擇器是一個Selector類的對象。該類的每個實例均能監視更多的套接字通道,進而建立更多的連接。當一些有意義的事發生在通道上(如客戶端試圖連接服務器端或進行讀/寫操作),選擇器便會通知應用程序處理請求。選擇器會創建一個關鍵字,這個關鍵字是SelectionKey類的一個實例。每個關鍵字都保存著應用程序的標識及請求的類型。其中,請求的類型可以是如下之一:
●嘗試連接(客戶端)
●嘗試連接(服務器端)
●讀取操作
●寫入操作
一個通用的實現非阻隔服務器的算法如下:
create SocketChannel;
create Selector
associate the SocketChannel to the Selector
for(;;) {
waiting events from the Selector;
event arrived; create keys;
for each key created by Selector {
check the type of request;
isAcceptable:
get the client SocketChannel;
associate that SocketChannel to the Selector;
record it for read/write operations
continue;
isReadable:
get the client SocketChannel;
read from the socket;
continue;
isWriteable:
get the client SocketChannel;
write on the socket;
continue;
}
}
基本上,服務器端的實現是由選擇器等待事件和創建關鍵字的無限循環組成的。根據關鍵字的類型,及時的執行操作。關鍵字存在以下4種可能的類型。
Acceptable: 相應的客戶端要求連接。
Connectable:服務器端接受連接。
Readable:服務器端可讀。
Writeable:服務器端可寫。
通常一個表示接受的關鍵字創建在服務器端。事實上,這種關鍵字僅僅通知一下服務器端客戶端請求連接。在這種環境下,正如你通過算法得到的結論一樣,服務器端個性化套接字通道和連接這個通道到選擇器以便進行讀/寫操作。從這一刻起,當接受客戶端讀或寫操作時,選擇器將為客戶端創建Readable或Writeable關鍵字。從而,服務器端將截取這些關鍵字並執行正確的動作。
現在,你可以用下面這個推薦算法和Java語言寫服務器端了。用這種方法能成功的創建套接字通道,選擇器,和套接字-選擇器注冊(socket-selector registration)。
// Create the server socket channel
ServerSocketChannel server = ServerSocketChannel.open();
// nonblocking I/O
server.configureBlocking(false);
// host-port 8000
server.socket().bind(new java.net.InetSocketAddress(host,8000));
System.out.println("Server attivo porta 8000");
// Create the selector
Selector selector = Selector.open();
// Recording server to selector (type OP_ACCEPT)
server.register(selector,SelectionKey.OP_ACCEPT);
這個靜態(static)open類方法創建了一個SocketChannel類的實例。configureBlocking(false)調用設置通道為非阻隔。通過bind方法建立到服務器端的連接。字符串“host”代表服務器的IP地址,8000是通信套接字。你可以調用你可以調用Selector類的靜態(static)open方法創建選擇器。最後,register方法用來連接選擇器和套接字通道。
第二個參數代表注冊的類型。在這個例子中,我們使用OP_ACCEPT,意思是選擇器僅能報告客戶端試圖嘗試連接服務器端。其他可能的選項是:OP_CONNECT,在客戶端下使用;OP_READ; 和OP_WRITE
用Java語言描述的無限for循環的代碼如下:
// Infinite server loop
for(;;) {
// Waiting for events
selector.select();
// Get keys
Set keys = selector.selectedKeys();
Iterator i = keys.iterator();
// For each keys...
while(i.hasNext()) {
SelectionKey key = (SelectionKey) i.next();
// Remove the current key
i.remove();
// if isAccetable = true
// then a client required a connection
if (key.isAcceptable()) {
// get client socket channel
SocketChannel client = server.accept();
// Non Blocking I/O
client.configureBlocking(false);
// recording to the selector (reading)
client.register(selector, SelectionKey.OP_READ);
continue;
}
// if isReadable = true
// then the server is ready to read
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
// Read byte coming from the client
int BUFFER_SIZE = 32;
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
try {
client.read(buffer);
}
catch (Exception e) {
// client is no longer active
e.printStackTrace();
continue;
}
// Show bytes on the console
buffer.flip();
Charset charset=Charset.forName("ISO-8859-1");
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode(buffer);
System.out.print(charBuffer.toString());
continue;
}
}
}
循環的第一行是調用select方法,它會阻塞進程執行並等待選擇器上記錄的事件。在這段代碼中,套接字通道由服務器變量指代。實際上,服務器端不是一個SocketChannel對象,但是一個ServerSocketChannel對象。它象SocketChannel一樣是SelectableChannel類的一般化,通常用於服務器端的應用程序。
選擇器等待的事件是客戶端嘗試連接。當這樣的操作出現時,服務器端的應用程序便獲得一個由選擇器創建的關鍵字和檢查每個關鍵字的類型。你也許注意到,當一個關鍵字被處理時,它不得不調用remove方法從這組關鍵字中被移出。如果這個關鍵字的類型是可接受的(isAcceptable()=true),那麼服務器端便通過調用accept方法來查找客戶端套接字通道,設置它為非阻隔,並將OP_READ選項把它登記進選擇器中。我們也可以使用OP_WRITE 或者是OP_READ|OP_WRITE選項,但為了簡單,我實現的服務器端僅僅能從通道中讀取,不能進入寫入操作。
客戶端套接字通道現在已經登記入選擇器並可進行讀取操作。從而,當客戶端在套接字通道上寫數據時,選擇器將通知服務器端應用程序這裡有一些數據讀。隨著可讀關鍵字的創建,從而isReadable()=true。在這個例子中,應用程序從套接字通道上讀取數據使用的是32個字節的ByteBuffer,字節譯碼使用的是ISO-8859-1編碼規則,同時讀取的數據也會顯示在服務器端的控制台上。
客戶端非阻隔(Client Nonblocking)
為了檢驗編制的服務器端能否以非阻隔的方法工作正常,我將實現一個客戶端以在套接字通道上連續的寫字符串“Client XXX”,這裡的“XXX”是命令行所傳遞的參數。例如,當客戶端運行的命令行的參數是89時,服務器端的控制台上就會顯示“Client 89 Client 89 Client 89 Client 89 ...”。如果其它的客戶端開始的參數是92時會發生些什麼呢?如果服務器端已阻隔,任何事情都不會發生;服務器端還是顯示連續的字符串“Client 89”。自從我們的服務器使用了非阻隔套接字,那麼控制台就會顯示下面這樣的字符串:"Client 89 Client 89 Client 92 Client 89 Client 92 Client 92 Client 89 Client 89 ...",這意味著在套接字通道上的讀/寫操作並不阻塞服務器應用程序的執行。
這裡有一段客戶端應用程序的代碼:
// Create client SocketChannel
SocketChannel client = SocketChannel.open();
// nonblocking I/O
client.configureBlocking(false);
// Connection to host port 8000
client.connect(new java.net.InetSocketAddress(host,8000));
// Create selector
Selector selector = Selector.open();
// Record to selector (OP_CONNECT type)
SelectionKey clientKey = client.register(selector, SelectionKey.OP_CONNECT);
// Waiting for the connection
while (selector.select(500)> 0) {
// Get keys
Set keys = selector.selectedKeys();
Iterator i = keys.iterator();
// For each key...
while (i.hasNext()) {
SelectionKey key = (SelectionKey)i.next();
// Remove the current key
i.remove();
// Get the socket channel held by the key
SocketChannel channel = (SocketChannel)key.channel();
// Attempt a connection
if (key.isConnectable()) {
// Connection OK
System.out.println("Server Found");
// Close pendent connections
if (channel.isConnectionPending())
channel.finishConnect();
// Write continuously on the buffer
ByteBuffer buffer = null;
for (;;) {
buffer =
ByteBuffer.wrap(
new String(" Client " + id + " ").getBytes());
channel.write(buffer);
buffer.clear();
}
}
}
}
也許,客戶端應用程序的結構讓你回憶起服務器端應用程序的結構。然而,這裡也有許多不同的地方。套接字通道使用OP_CONNECT選項連接到選擇器上,意思是當服務器接受連接時選擇器將不得不通知客戶端,這個循環不是無窮的。While循環的條件是:
while (selector.select(500)> 0)
意思是客戶端嘗試連接,最大時長是500毫秒;如果服務器端沒有應答,selete方法將返回0,因為在通道上的服務器沒有激活。在循環裡,服務器端檢測關鍵字是否可連接。在這個例子中,如果有一些不確定的連接,客戶端就關閉那些不確定的連接,然後寫入字符串“Client”後面接著從命令行參數中帶來的變量ID。
結論
新的Java1.4 I/O體系是實現快速,靈活和可升級的Java應用程序的重要的一大步。看完這篇文章,依靠非阻隔套接字技術你可以寫一個基於非阻隔套接字的應用程序而不用手工來處理多線程。