1 package org.windwant.nio; 2 3 import java.io.IOException; 4 import java.net.InetSocketAddress; 5 import java.net.ServerSocket; 6 import java.nio.ByteBuffer; 7 import java.nio.channels.SelectionKey; 8 import java.nio.channels.Selector; 9 import java.nio.channels.ServerSocketChannel; 10 import java.nio.channels.SocketChannel; 11 import java.util.Iterator; 12 import java.util.concurrent.ExecutorService; 13 import java.util.concurrent.Executors; 14 15 /** 16 * Created by windwant on 2016/10/27. 17 */ 18 public class SocketChannelOpt { 19 20 private static final String HOST = "localhost"; 21 private static final int PORT = 8888; 22 23 private static ExecutorService read = Executors.newFixedThreadPool(5); 24 private static ExecutorService write = Executors.newFixedThreadPool(5); 25 26 public static void main(String[] args){ 27 ServerSocketChannel serverSocketChannel = null; 28 ServerSocket serverSocket = null; 29 Selector selector = null; 30 try { 31 serverSocketChannel = ServerSocketChannel.open();//工廠方法創建ServerSocketChannel 32 serverSocket = serverSocketChannel.socket(); //獲取channel對應的ServerSocket 33 serverSocket.bind(new InetSocketAddress(HOST, PORT)); //綁定地址 34 serverSocketChannel.configureBlocking(false); //設置ServerSocketChannel非阻塞模式 35 selector = Selector.open();//工廠方法創建Selector 36 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//通道注冊選擇器,接受連接就緒狀態。 37 while (true){//循環檢查 38 if(selector.select() == 0){//阻塞檢查,當有就緒狀態發生,返回鍵集合 39 continue; 40 } 41 42 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //獲取就緒鍵遍歷對象。 43 while (it.hasNext()){ 44 SelectionKey selectionKey = it.next(); 45 //處理就緒狀態 46 if (selectionKey.isAcceptable()){ 47 ServerSocketChannel schannel = (ServerSocketChannel) selectionKey.channel();//只負責監聽,阻塞,管理,不發送、接收數據 48 SocketChannel socketChannel = schannel.accept();//就緒後的操作,剛到達的socket句柄 49 if(null == socketChannel){ 50 continue; 51 } 52 socketChannel.configureBlocking(false); 53 socketChannel.register(selector, SelectionKey.OP_READ); //告知選擇器關心的通道,准備好讀數據 54 }else if(selectionKey.isReadable()){ 55 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); 56 ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024); 57 58 StringBuilder result = new StringBuilder(); 59 while (socketChannel.read(byteBuffer) > 0){//確保讀完 60 byteBuffer.flip(); 61 result.append(new String(byteBuffer.array())); 62 byteBuffer.clear();//每次清空 對應上面flip() 63 } 64 65 System.out.println("server receive: " + result.toString()); 66 socketChannel.register(selector, SelectionKey.OP_WRITE); 67 68 }else if(selectionKey.isWritable()){ 69 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); 70 String sendStr = "server send data: " + Math.random(); 71 ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes()); 72 while (send.hasRemaining()){ 73 socketChannel.write(send); 74 } 75 socketChannel.register(selector, SelectionKey.OP_READ); 76 System.out.println(sendStr); 77 } 78 it.remove(); 79 } 80 } 81 82 } catch (IOException e) { 83 e.printStackTrace(); 84 } 85 } 86 }Selector多線程執行,同步需求。 一個線程監控通道的就緒狀態,一個線程池處理業務需求。 線程池也可以擴展為不同的業務處理線程池,如日志、業務、心跳。
1 package org.windwant.nio; 2 3 import java.io.IOException; 4 import java.net.InetSocketAddress; 5 import java.net.ServerSocket; 6 import java.nio.ByteBuffer; 7 import java.nio.channels.SelectionKey; 8 import java.nio.channels.Selector; 9 import java.nio.channels.ServerSocketChannel; 10 import java.nio.channels.SocketChannel; 11 import java.util.Iterator; 12 import java.util.concurrent.ExecutorService; 13 import java.util.concurrent.Executors; 14 15 /** 16 * 線程處理讀取,寫出 17 * Created by windwant on 2016/10/27. 18 */ 19 public class TSocketChannelOpt { 20 21 private static final String HOST = "localhost"; 22 private static final int PORT = 8888; 23 24 private static ExecutorService read = Executors.newFixedThreadPool(5); 25 private static ExecutorService write = Executors.newFixedThreadPool(5); 26 27 public static void main(String[] args){ 28 ServerSocketChannel serverSocketChannel = null; 29 ServerSocket serverSocket = null; 30 Selector selector = null; 31 try { 32 serverSocketChannel = ServerSocketChannel.open();//工廠方法創建ServerSocketChannel 33 serverSocket = serverSocketChannel.socket(); //獲取channel對應的ServerSocket 34 serverSocket.bind(new InetSocketAddress(HOST, PORT)); //綁定地址 35 serverSocketChannel.configureBlocking(false); //設置ServerSocketChannel非阻塞模式 36 selector = Selector.open();//工廠方法創建Selector 37 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//通道注冊選擇器,接受連接就緒狀態。 38 while (true){//循環檢查 39 if(selector.select() == 0){//阻塞檢查,當有就緒狀態發生,返回鍵集合 40 continue; 41 } 42 43 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //獲取就緒鍵遍歷對象。 44 while (it.hasNext()){ 45 SelectionKey selectionKey = it.next(); 46 it.remove(); 47 //處理就緒狀態 48 if (selectionKey.isAcceptable()){ 49 ServerSocketChannel schannel = (ServerSocketChannel) selectionKey.channel();//只負責監聽,阻塞,管理,不發送、接收數據 50 SocketChannel socketChannel = schannel.accept();//就緒後的操作,剛到達的socket句柄 51 if(null == socketChannel){ 52 continue; 53 } 54 socketChannel.configureBlocking(false); 55 socketChannel.register(selector, SelectionKey.OP_READ); //告知選擇器關心的通道,准備好讀數據 56 }else if(selectionKey.isReadable()){ 57 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); 58 read.execute(new MyReadRunnable(socketChannel)); 59 60 // SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); 61 // ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024); 62 // 63 // StringBuilder result = new StringBuilder(); 64 // while (socketChannel.read(byteBuffer) > 0){//確保讀完 65 // byteBuffer.flip(); 66 // result.append(new String(byteBuffer.array())); 67 // byteBuffer.clear();//每次清空 對應上面flip() 68 // } 69 // 70 // System.out.println("server receive: " + result.toString()); 71 socketChannel.register(selector, SelectionKey.OP_WRITE); 72 73 }else if(selectionKey.isWritable()){ 74 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); 75 write.execute(new MyWriteRunnable(socketChannel)); 76 // String sendStr = "server send data: " + Math.random(); 77 // ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes()); 78 // while (send.hasRemaining()){ 79 // socketChannel.write(send); 80 // } 81 // System.out.println(sendStr); 82 socketChannel.register(selector, SelectionKey.OP_READ); 83 } 84 } 85 } 86 87 } catch (IOException e) { 88 e.printStackTrace(); 89 } 90 } 91 92 static class MyReadRunnable implements Runnable { 93 94 private SocketChannel channel; 95 96 public MyReadRunnable(SocketChannel channel){ 97 this.channel = channel; 98 } 99 100 @Override 101 public synchronized void run() { 102 ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024); 103 104 StringBuilder result = new StringBuilder(); 105 try { 106 while (channel.read(byteBuffer) > 0){//確保讀完 107 byteBuffer.flip(); 108 result.append(new String(byteBuffer.array())); 109 byteBuffer.clear();//每次清空 對應上面flip() 110 } 111 System.out.println("server receive: " + result.toString()); 112 } catch (IOException e) { 113 e.printStackTrace(); 114 } 115 116 117 } 118 } 119 120 static class MyWriteRunnable implements Runnable { 121 122 private SocketChannel channel; 123 124 public MyWriteRunnable(SocketChannel channel){ 125 this.channel = channel; 126 } 127 128 @Override 129 public void run() { 130 String sendStr = "server send data: " + Math.random(); 131 ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes()); 132 try { 133 while (send.hasRemaining()) { 134 channel.write(send); 135 } 136 System.out.println(sendStr); 137 }catch (Exception e){ 138 e.printStackTrace(); 139 } 140 141 } 142 } 143 }