同步?異步輸入輸出機制的引入
在Merlin之前,編寫Socket程序是比 較繁瑣的工作.因為輸入輸出都必須同步.這樣,對於多客戶端客戶/服務器模式, 不得不使用多線程.即為每個連接的客戶都分配一個線程來處理輸入輸出.由此而 帶來的問題是可想而知的.程序員不得不為了避免死鎖,線程安全等問題,進行大 量的編碼和測試.很多人都在抱怨為什麼不在Java中引入異步輸入輸出機制.比較 官方的解釋是,任何一種應用程序接口的引入,都必須兼容任何操作平台.因為 Java是跨平台的.而當時支持異步輸入輸出機制的操作平台顯然不可能是全部.自 Java 2 Platform以後,分離出J2SE,J2ME,J2EE三種不同類型的應用程序接口,以 適應不同的應用開發.Java標准的制訂者們意識到了這個問題,並且支持異步輸入 輸出機制的操作平台在當今操作平台中處於主流地位.於是,Jdk(J2SE) 的第五次 發布中引入了異步輸入輸出機制.
以前的Socket進程通信程序設計中,一 般客戶端和服務器端程序設計如下:
服務器端:
//服務器 端監聽線程
while (true) {
.............
Socket clientSocket;
clientSocket = socket.accept(); //取得客戶請求Socket,如果沒 有//客戶請求連接,線程在此處阻塞
//用取得的Socket 構造輸入輸出流
PrintStream os = new PrintStream (new
BufferedOutputStream (clientSocket.getOutputStream(),
1024), false);
BufferedReader is = new BufferedReader (new
InputStreamReader (clientSocket.getInputStream()));
//創建客戶會話 線程,進行輸入輸出控制,為同步機制
new ClientSession();
.......
}
客戶端:
............
clientSocket = new Socket(HOSTNAME, LISTENPORT);//連接服務器套接字
//用取得的 Socket構造輸入輸出流
PrintStream os = new PrintStream(new
BufferedOutputStream(clientSocket.getOutputStream (),
1024), false);
BufferedReader is = new BufferedReader(new
InputStreamReader(clientSocket.getInputStream()));
//進行輸入 輸出控制
.......
以上代碼段只是用同步機制編寫 Socket進程通信的一個框架,實際上要考慮的問題要復雜的多(有興趣的讀者可 以參考我的一篇文章《Internet 實時通信系統設計與實現》)。將這樣一個框 架列出來,只是為了與用異步機制實現的Socket進程通信進行比較。下面將介紹 使用異步機制的程序設計。
用異步輸入輸出流編寫Socket進程通信程序
在Merlin中加入了用於實現異步輸入輸出機制的應用程序接口包: java.nio(新的輸入輸出包,定義了很多基本類型緩沖(Buffer)), java.nio.channels(通道及選擇器等,用於異步輸入輸出),java.nio.charset (字符的編碼解碼)。通道(Channel)首先在選擇器(Selector)中注冊自己感興 趣的事件,當相應的事件發生時,選擇器便通過選擇鍵(SelectionKey)通知已注 冊的通道。然後通道將需要處理的信息,通過緩沖(Buffer)打包,編碼/解碼, 完成輸入輸出控制。
通道介紹:
這裡主要介紹 ServerSocketChannel和 SocketChannel.它們都是可選擇的(selectable)通道, 分別可以工作在同步和異步兩種方式下(注意,這裡的可選擇不是指可以選擇兩 種工作方式,而是指可以有選擇的注冊自己感興趣的事件)。可以用 channel.configureBlocking(Boolean )來設置其工作方式。與以前版本的API相 比較,ServerSocketChannel就相當於ServerSocket(ServerSocketChannel封裝 了ServerSocket),而SocketChannel就相當於Socket(SocketChannel封裝了 Socket)。當通道工作在同步方式時,編程方法與以前的基本相似,這裡主要介 紹異步工作方式。
所謂異步輸入輸出機制,是指在進行輸入輸出處理時 ,不必等到輸入輸出處理完畢才返回。所以異步的同義語是非阻塞(None Blocking)。在服務器端,ServerSocketChannel通過靜態函數open()返回一個 實例serverChl。然後該通道調用serverChl.socket().bind()綁定到服務器某端 口,並調用register(Selector sel, SelectionKey.OP_ACCEPT)注冊 OP_ACCEPT事件到一個選擇器中(ServerSocketChannel只可以注冊OP_ACCEPT事 件)。當有客戶請求連接時,選擇器就會通知該通道有客戶連接請求,就可以進 行相應的輸入輸出控制了;在客戶端,clientChl實例注冊自己感興趣的事件後 (可以是OP_CONNECT,OP_READ,OP_WRITE的組合),調用clientChl.connect (InetSocketAddress )連接服務器然後進行相應處理。注意,這裡的連接是異步 的,即會立即返回而繼續執行後面的代碼。
選擇器和選擇鍵介紹:
選擇器(Selector)的作用是:將通道感興趣的事件放入隊列中,而不 是馬上提交給應用程序,等已注冊的通道自己來請求處理這些事件。換句話說, 就是選擇器將會隨時報告已經准備好了的通道,而且是按照先進先出的順序。那 麼,選擇器是通過什麼來報告的呢?選擇鍵(SelectionKey)。選擇鍵的作用就是 表明哪個通道已經做好了准備,准備干什麼。你也許馬上會想到,那一定是已注 冊的通道感興趣的事件。不錯,例如對於服務器端serverChl來說,可以調用 key.isAcceptable()來通知serverChl有客戶端連接請求。相應的函數還有: SelectionKey.isReadable(),SelectionKey.isWritable()。一般的,在一個循 環中輪詢感興趣的事件(具體可參照下面的代碼)。如果選擇器中尚無通道已注 冊事件發生,調用Selector.select()將阻塞,直到有事件發生為止。另外,可 以調用selectNow()或者select(long timeout)。前者立即返回,沒有事件時返 回0值;後者等待timeout時間後返回。一個選擇器最多可以同時被63個通道一起 注冊使用。
應用實例:
下面是用異步輸入輸出機制實現的客戶/ 服務器實例程序清單(限於篇幅,只給出了服務器端 實現,讀者可以參照著實現客戶端代碼):
程序類圖
程序清單1
public class NBlockingServer {
int port = 8000;
int BUFFERSIZE = 1024;
Selector selector = null;
ServerSocketChannel serverChannel = null;
HashMap clientChannelMap = null;//用來存放每一個客戶連接對應的套接字和 通道
public NBlockingServer( int port ) {
this.clientChannelMap = new HashMap();
this.port = port;
}
public void initialize() throws IOException {
//初始化,分別實例化一個選擇器,一個服務器端 可選擇通道
this.selector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
this.serverChannel.configureBlocking(false);
InetAddress localhost = InetAddress.getLocalHost();
InetSocketAddress isa = new InetSocketAddress(localhost, this.port );
this.serverChannel.socket().bind(isa);//將該套接字綁定到服務器某一可用 端口
}
//結束時釋放資源
public void finalize() throws IOException {
this.serverChannel.close ();
this.selector.close();
}
//將讀入字 節緩沖的信息解碼
public String decode( ByteBuffer byteBuffer ) throws
CharacterCodingException {
Charset charset = Charset.forName( "ISO-8859-1" );
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode( byteBuffer );
String result = charBuffer.toString();
return result;
}
//監聽端口,當通道准備好時進行相應操作
public void portListening() throws IOException, InterruptedException {
//服務器端通道注冊OP_ACCEPT事件
SelectionKey acceptKey =this.serverChannel.register( this.selector,
SelectionKey.OP_ACCEPT );
//當有已 注冊的事件發生時,select()返回值將大於0
while (acceptKey.selector().select() > 0 ) {
System.out.println("event happened");
//取 得所有已經准備好的所有選擇鍵
Set readyKeys = this.selector.selectedKeys();
//使用迭代器對選擇鍵進 行輪詢
Iterator i = readyKeys.iterator();
while (i.hasNext()) {
SelectionKey key = (SelectionKey)i.next();
i.remove();//刪除當前將要 處理的選擇鍵
if ( key.isAcceptable() ) {//如果是 有客戶端連接請求
System.out.println ("more client connect in!");
ServerSocketChannel nextReady =
(ServerSocketChannel)key.channel();
//獲取客 戶端套接字
Socket s = nextReady.accept();
//設置對應的通道為異步方式並注冊感興趣事件
s.getChannel().configureBlocking( false );
SelectionKey readWriteKey =
s.getChannel().register( this.selector,
SelectionKey.OP_READ|SelectionKey.OP_WRITE );
//將注冊的事件與該套接字聯系起來
readWriteKey.attach( s );
//將當前建立連接的客戶端套接字及對應的通道存放在哈希 表//clientChannelMap中
this.clientChannelMap.put( s, new
ClientChInstance( s.getChannel () ) );
}
else if ( key.isReadable() ) {//如果是通道讀准備好事件
System.out.println("Readable");
// 取得選擇鍵對應的通道和套接字
SelectableChannel nextReady =
(SelectableChannel) key.channel();
Socket socket = (Socket) key.attachment();
//處理該 事件,處理方法已封裝在類ClientChInstance中
this.readFromChannel( socket.getChannel(),
(ClientChInstance)
this.clientChannelMap.get( socket ) );
}
else if ( key.isWritable() ) {// 如果是通道寫准備好事件
System.out.println ("writeable");
//取得套接字後處理, 方法同上
Socket socket = (Socket) key.attachment();
SocketChannel channel = (SocketChannel)
socket.getChannel();
this.writeToChannel( channel,"This is from server!");
}
}
}
}
//對通道的寫操作
public void writeToChannel( SocketChannel channel, String message )
throws IOException {
ByteBuffer buf = ByteBuffer.wrap( message.getBytes() );
int nbytes = channel.write( buf );
}
//對通道的讀操 作
public void readFromChannel( SocketChannel channel, ClientChInstance clientInstance )
throws IOException, InterruptedException {
ByteBuffer byteBuffer = ByteBuffer.allocate( BUFFERSIZE );
int nbytes = channel.read( byteBuffer );
byteBuffer.flip();
String result = this.decode( byteBuffer );
//當客戶端 發出”@exit”退出命令時,關閉其通道
if ( result.indexOf( "@exit" ) >= 0 ) {
channel.close();
}
else {
clientInstance.append( result.toString() );
//讀入一行完畢,執行相應操作
if ( result.indexOf( "n" ) >= 0 ){
System.out.println("client input"+result);
clientInstance.execute();
}
}
}
//該類封裝了怎樣對客戶端的通道進行操作,具體實現可以 通過重載execute()方法
public class ClientChInstance {
SocketChannel channel;
StringBuffer buffer=new StringBuffer();
public ClientChInstance( SocketChannel channel ) {
this.channel = channel;
}
public void execute() throws IOException {
String message = "This is response after reading from channel! ";
writeToChannel( this.channel, message );
buffer = new StringBuffer();
}
//當一行沒有結束時,將當前字竄置於緩沖尾
public void append( String values ) {
buffer.append( values );
}
}
//主程序
public static void main( String[] args ) {
NBlockingServer nbServer = new NBlockingServer(8000);
try {
nbServer.initialize();
} catch ( Exception e ) {
e.printStackTrace();
System.exit( -1 );
}
try {
nbServer.portListening();
}
catch ( Exception e ) {
e.printStackTrace();
}
}
}
小結:
從以上程序段可以看出,服 務器端沒有引入多余線程就完成了多客戶的客戶/服務器模式。該程序中使用了 回調模式(CALLBACK),細心的讀者應該早就看出來了。需要注意的是,請不要將 原來的輸入輸出包與新加入的輸入輸出包混用,因為出於一些原因的考慮,這兩 個包並不兼容。即使用通道時請使用緩沖完成輸入輸出控制。該程序在 Windows2000,J2SE1.4下,用telnet測試成功。