程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> 運用異步輸入輸出流編寫Socket進程通信

運用異步輸入輸出流編寫Socket進程通信

編輯:關於JAVA

同步?異步輸入輸出機制的引入

在Merlin之前,編寫Socket程序是比 較繁瑣的工作.因為輸入輸出都必須同步.這樣,對於多客戶端客戶/服務器模式, 不得不使用多線程.即為每個連接的客戶都分配一個線程來處理輸入輸出.由此而 帶來的問題是可想而知的.程序員不得不為了避免死鎖,線程安全等問題,進行大 量的編碼和測試.很多人都在抱怨為什麼不在Java中引入異步輸入輸出機制.比較 官方的解釋是,任何一種應用程序接口的引入,都必須兼容任何操作平台.因為 Java是跨平台的.而當時支持異步輸入輸出機制的操作平台顯然不可能是全部.自 Java 2 Platform以後,分離出J2SE,J2ME,J2EE三種不同類型的應用程序接口,以 適應不同的應用開發.Java標准的制訂者們意識到了這個問題,並且支持異步輸入 輸出機制的操作平台在當今操作平台中處於主流地位.於是,Jdk(J2SE) 的第五次 發布中引入了異步輸入輸出機制.

以前的Socket進程通信程序設計中,一 般客戶端和服務器端程序設計如下:

服務器端:

  //服務器 端監聽線程
  while (true) {
         .............
        Socket clientSocket;
         clientSocket = socket.accept(); //取得客戶請求Socket,如果沒 有//客戶請求連接,線程在此處阻塞
        //用取得的Socket 構造輸入輸出流
        PrintStream os = new PrintStream (new
        BufferedOutputStream (clientSocket.getOutputStream(),
        1024), false);
        BufferedReader is = new BufferedReader (new
        InputStreamReader (clientSocket.getInputStream()));
        //創建客戶會話 線程,進行輸入輸出控制,為同步機制
        new ClientSession();
        .......
         }

客戶端:

 ............
  clientSocket = new Socket(HOSTNAME, LISTENPORT);//連接服務器套接字
  //用取得的 Socket構造輸入輸出流
  PrintStream os = new PrintStream(new
        BufferedOutputStream(clientSocket.getOutputStream (),
        1024), false);
         BufferedReader is = new BufferedReader(new
         InputStreamReader(clientSocket.getInputStream()));
  //進行輸入 輸出控制
  .......

以上代碼段只是用同步機制編寫 Socket進程通信的一個框架,實際上要考慮的問題要復雜的多(有興趣的讀者可 以參考我的一篇文章《Internet 實時通信系統設計與實現》)。將這樣一個框 架列出來,只是為了與用異步機制實現的Socket進程通信進行比較。下面將介紹 使用異步機制的程序設計。

用異步輸入輸出流編寫Socket進程通信程序

在Merlin中加入了用於實現異步輸入輸出機制的應用程序接口包: java.nio(新的輸入輸出包,定義了很多基本類型緩沖(Buffer)), java.nio.channels(通道及選擇器等,用於異步輸入輸出),java.nio.charset (字符的編碼解碼)。通道(Channel)首先在選擇器(Selector)中注冊自己感興 趣的事件,當相應的事件發生時,選擇器便通過選擇鍵(SelectionKey)通知已注 冊的通道。然後通道將需要處理的信息,通過緩沖(Buffer)打包,編碼/解碼, 完成輸入輸出控制。

通道介紹:

這裡主要介紹 ServerSocketChannel和 SocketChannel.它們都是可選擇的(selectable)通道, 分別可以工作在同步和異步兩種方式下(注意,這裡的可選擇不是指可以選擇兩 種工作方式,而是指可以有選擇的注冊自己感興趣的事件)。可以用 channel.configureBlocking(Boolean )來設置其工作方式。與以前版本的API相 比較,ServerSocketChannel就相當於ServerSocket(ServerSocketChannel封裝 了ServerSocket),而SocketChannel就相當於Socket(SocketChannel封裝了 Socket)。當通道工作在同步方式時,編程方法與以前的基本相似,這裡主要介 紹異步工作方式。

所謂異步輸入輸出機制,是指在進行輸入輸出處理時 ,不必等到輸入輸出處理完畢才返回。所以異步的同義語是非阻塞(None Blocking)。在服務器端,ServerSocketChannel通過靜態函數open()返回一個 實例serverChl。然後該通道調用serverChl.socket().bind()綁定到服務器某端 口,並調用register(Selector sel, SelectionKey.OP_ACCEPT)注冊 OP_ACCEPT事件到一個選擇器中(ServerSocketChannel只可以注冊OP_ACCEPT事 件)。當有客戶請求連接時,選擇器就會通知該通道有客戶連接請求,就可以進 行相應的輸入輸出控制了;在客戶端,clientChl實例注冊自己感興趣的事件後 (可以是OP_CONNECT,OP_READ,OP_WRITE的組合),調用clientChl.connect (InetSocketAddress )連接服務器然後進行相應處理。注意,這裡的連接是異步 的,即會立即返回而繼續執行後面的代碼。

選擇器和選擇鍵介紹:

選擇器(Selector)的作用是:將通道感興趣的事件放入隊列中,而不 是馬上提交給應用程序,等已注冊的通道自己來請求處理這些事件。換句話說, 就是選擇器將會隨時報告已經准備好了的通道,而且是按照先進先出的順序。那 麼,選擇器是通過什麼來報告的呢?選擇鍵(SelectionKey)。選擇鍵的作用就是 表明哪個通道已經做好了准備,准備干什麼。你也許馬上會想到,那一定是已注 冊的通道感興趣的事件。不錯,例如對於服務器端serverChl來說,可以調用 key.isAcceptable()來通知serverChl有客戶端連接請求。相應的函數還有: SelectionKey.isReadable(),SelectionKey.isWritable()。一般的,在一個循 環中輪詢感興趣的事件(具體可參照下面的代碼)。如果選擇器中尚無通道已注 冊事件發生,調用Selector.select()將阻塞,直到有事件發生為止。另外,可 以調用selectNow()或者select(long timeout)。前者立即返回,沒有事件時返 回0值;後者等待timeout時間後返回。一個選擇器最多可以同時被63個通道一起 注冊使用。

應用實例:

下面是用異步輸入輸出機制實現的客戶/ 服務器實例程序清單(限於篇幅,只給出了服務器端 實現,讀者可以參照著實現客戶端代碼):

程序類圖

程序清單1

public class NBlockingServer {
  int port = 8000;
  int BUFFERSIZE = 1024;
  Selector selector = null;
  ServerSocketChannel serverChannel = null;
   HashMap clientChannelMap = null;//用來存放每一個客戶連接對應的套接字和 通道
  
  public NBlockingServer( int port ) {
     this.clientChannelMap = new HashMap();
    this.port = port;
  }
  
  public void initialize() throws IOException {
   //初始化,分別實例化一個選擇器,一個服務器端 可選擇通道
   this.selector = Selector.open();
    this.serverChannel = ServerSocketChannel.open();
    this.serverChannel.configureBlocking(false);
   InetAddress localhost = InetAddress.getLocalHost();
   InetSocketAddress isa = new InetSocketAddress(localhost, this.port );
    this.serverChannel.socket().bind(isa);//將該套接字綁定到服務器某一可用 端口
  }
  //結束時釋放資源
  public void finalize() throws IOException {
    this.serverChannel.close ();
    this.selector.close();
  }
  //將讀入字 節緩沖的信息解碼
  public String decode( ByteBuffer byteBuffer ) throws
CharacterCodingException {
    Charset charset = Charset.forName( "ISO-8859-1" );
     CharsetDecoder decoder = charset.newDecoder();
    CharBuffer charBuffer = decoder.decode( byteBuffer );
    String result = charBuffer.toString();
    return result;
  }
  //監聽端口,當通道准備好時進行相應操作
  public void portListening() throws IOException, InterruptedException {
    //服務器端通道注冊OP_ACCEPT事件
   SelectionKey acceptKey =this.serverChannel.register( this.selector,
                       SelectionKey.OP_ACCEPT );
    //當有已 注冊的事件發生時,select()返回值將大於0
    while (acceptKey.selector().select() > 0 ) {
       System.out.println("event happened");
      //取 得所有已經准備好的所有選擇鍵
      Set readyKeys = this.selector.selectedKeys();
      //使用迭代器對選擇鍵進 行輪詢
      Iterator i = readyKeys.iterator();
       while (i.hasNext()) {
        SelectionKey key = (SelectionKey)i.next();
        i.remove();//刪除當前將要 處理的選擇鍵
        if ( key.isAcceptable() ) {//如果是 有客戶端連接請求
          System.out.println ("more client connect in!");
           ServerSocketChannel nextReady =
             (ServerSocketChannel)key.channel();
          //獲取客 戶端套接字
          Socket s = nextReady.accept();
          //設置對應的通道為異步方式並注冊感興趣事件
          s.getChannel().configureBlocking( false );
           SelectionKey readWriteKey =
             s.getChannel().register( this.selector,
               SelectionKey.OP_READ|SelectionKey.OP_WRITE );
           //將注冊的事件與該套接字聯系起來
readWriteKey.attach( s );
//將當前建立連接的客戶端套接字及對應的通道存放在哈希 表//clientChannelMap中
           this.clientChannelMap.put( s, new
ClientChInstance( s.getChannel () ) );
          }
        else if ( key.isReadable() ) {//如果是通道讀准備好事件
           System.out.println("Readable");
          // 取得選擇鍵對應的通道和套接字
           SelectableChannel nextReady =
             (SelectableChannel) key.channel();
          Socket socket = (Socket) key.attachment();
          //處理該 事件,處理方法已封裝在類ClientChInstance中
           this.readFromChannel( socket.getChannel(),
           (ClientChInstance)
this.clientChannelMap.get( socket ) );
         }
        else if ( key.isWritable() ) {// 如果是通道寫准備好事件
          System.out.println ("writeable");
          //取得套接字後處理, 方法同上
          Socket socket = (Socket) key.attachment();
          SocketChannel channel = (SocketChannel)
socket.getChannel();
           this.writeToChannel( channel,"This is from server!");
        }
      }
    }
  }
   //對通道的寫操作
  public void writeToChannel( SocketChannel channel, String message )
throws IOException {
     ByteBuffer buf = ByteBuffer.wrap( message.getBytes() );
     int nbytes = channel.write( buf );
  }
   //對通道的讀操 作
  public void readFromChannel( SocketChannel channel, ClientChInstance clientInstance )
  throws IOException, InterruptedException {
    ByteBuffer byteBuffer = ByteBuffer.allocate( BUFFERSIZE );
    int nbytes = channel.read( byteBuffer );
    byteBuffer.flip();
     String result = this.decode( byteBuffer );
    //當客戶端 發出”@exit”退出命令時,關閉其通道
    if ( result.indexOf( "@exit" ) >= 0 ) {
       channel.close();
    }
    else {
         clientInstance.append( result.toString() );
         //讀入一行完畢,執行相應操作
        if ( result.indexOf( "n" ) >= 0 ){
         System.out.println("client input"+result);
         clientInstance.execute();
        }
    }
  }
  //該類封裝了怎樣對客戶端的通道進行操作,具體實現可以 通過重載execute()方法
  public class ClientChInstance {
     SocketChannel channel;
    StringBuffer buffer=new StringBuffer();
    public ClientChInstance( SocketChannel channel ) {
      this.channel = channel;
    }
    public void execute() throws IOException {
       String message = "This is response after reading from channel! ";
      writeToChannel( this.channel, message );
      buffer = new StringBuffer();
    }
     //當一行沒有結束時,將當前字竄置於緩沖尾
    public void append( String values ) {
      buffer.append( values );
    }
  }
  //主程序
  public static void main( String[] args ) {
    NBlockingServer nbServer = new NBlockingServer(8000);
    try {
       nbServer.initialize();
    } catch ( Exception e ) {
       e.printStackTrace();
      System.exit( -1 );
    }
    try {
       nbServer.portListening();
    }
    catch ( Exception e ) {
      e.printStackTrace();
    }
  }
}

小結:

從以上程序段可以看出,服 務器端沒有引入多余線程就完成了多客戶的客戶/服務器模式。該程序中使用了 回調模式(CALLBACK),細心的讀者應該早就看出來了。需要注意的是,請不要將 原來的輸入輸出包與新加入的輸入輸出包混用,因為出於一些原因的考慮,這兩 個包並不兼容。即使用通道時請使用緩沖完成輸入輸出控制。該程序在 Windows2000,J2SE1.4下,用telnet測試成功。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved