Java中的IO、NIO、AIO:
BIO:在Java1.4之前,我們建立網絡連接均使用BIO,屬于同步阻塞IO。默認情況下,當有一條請求接入就有一條線程專門接待。所以,在客戶端向服務端請求時,會詢問是否有空閑線程進行接待,如若沒有則一直等待或拒接。當并發量小時還可以接受,當請求量一多起來則會有許多線程生成,在Java中,多線程的上下文切換會消耗計算機有限的資源和性能,造成資源浪費。
NIO:NIO的出現是為了解決再BIO下的大并發量問題。其特點是能用一條線程管理所有連接。如下圖所示:
NIO是面向緩沖流的,即數據寫入寫出都是通過 Channel —— Buffer 這一途徑。(雙向流通)
AIO:與之前兩個IO模型不同的是,AIO屬于異步非阻塞模型。當進行讀寫操作時只須調用api的read方法和write方法,這兩種方法均是異步。對于讀方法來說,當有流可讀取時,操作系統會將可讀的流傳入read方法的緩沖區,并通知應用程序;對于寫操作而言,當操作系統將write方法傳遞的流寫入完畢時,操作系統主動通知應用程序。換言之就是當調用完api后,操作系統完成后會調用回調函數。
總結:一般IO分為同步阻塞模型(BIO),同步非阻塞模型(NIO),異步阻塞模型,異步非阻塞模型(AIO)
同步阻塞模型指的是當調用io操作時必須等到其io操作結束
同步非阻塞模型指當調用io操作時不必等待可以繼續干其他事,但必須不斷詢問io操作是否完成。
異步阻塞模型指應用調用io操作后,由操作系統完成io操作,但應用必須等待或去詢問操作系統是否完成。
異步非阻塞指應用調用io操作后,由操作系統完成io操作并調用回調函數,應用完成放手不管。
NIO的小Demo之服務端
首先,先看下服務端的大體代碼
public class ServerHandle implements Runnable{//帶參數構造函數public ServerHandle(int port){}//停止方法public void shop(){}//寫方法private void write(SocketChannel socketChannel, String response)throws IOException{}//當有連接進來時的處理方法private void handleInput(SelectionKey key) throws IOException{} //服務端運行主體方法@Overridepublic void run() {}
}
復制代碼
首先我們先看看該服務端的構造函數的實現:
public ServerHandle(int port){try {//創建選擇器selector = Selector.open();//打開監聽通道serverSocketChannel = ServerSocketChannel.open();//設置為非阻塞模式serverSocketChannel.configureBlocking(false);//傳入端口,并設定連接隊列最大為1024serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);//監聽客戶端請求serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//標記啟動標志started = true;System.out.println("服務器已啟動,端口號為:" + port);} catch (IOException e){e.printStackTrace();System.exit(1);}}
復制代碼
在這里創建了選擇器和監聽通道,并將該監聽通道注冊到選擇器上并選擇其感興趣的事件(accept)。后續其他接入的連接都將通過該 監聽通道 傳入。
然后就是寫方法的實現:
private void doWrite(SocketChannel channel, String response) throws IOException {byte[] bytes = response.getBytes();ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);wirteBuffer.put(bytes);//將寫模式改為讀模式wirteBuffer.flip();//寫入管道channel.write(wirteBuffer);}
復制代碼
其次是當由事件傳入時,即對連接進來的鏈接的處理方法
private void handleInput(SelectionKey key) throws IOException{//當該鍵可用時if (key.isValid()){if (key.isAcceptable()){//返回該密鑰創建的通道。ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();通過該通道獲取鏈接進來的通道SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);}if (key.isReadable()){//返回該密鑰創建的通道。SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int readBytes = socketChannel.read(byteBuffer);if (readBytes > 0){byteBuffer.flip();byte[] bytes = new byte[byteBuffer.remaining()];byteBuffer.get(bytes);String expression = new String(bytes, "UTF-8");System.out.println("服務器收到的信息:" + expression);//此處是為了區別打印在工作臺上的數據是由客戶端產生還是服務端產生doWrite(socketChannel, "+++++" + expression + "+++++");} else if(readBytes == 0){//無數據,忽略}else if (readBytes < 0){//資源關閉key.cancel();socketChannel.close();}}}}
復制代碼
這里要說明的是,只要ServerSocketChannel及SocketChannel向Selector注冊了特定的事件,Selector就會監控這些事件是否發生。 如在構造方法中有一通道serverSocketChannel注冊了accept事件。當其就緒時就可以通過調用selector的selectorKeys()方法,訪問”已選擇鍵集“中的就緒通道。
壓軸方法:
@Overridepublic void run() {//循環遍歷while (started) {try {//當沒有就緒事件時阻塞selector.select();//返回就緒通道的鍵Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> iterator = keys.iterator();SelectionKey key;while (iterator.hasNext()){key = iterator.next();//獲取后必須移除,否則會陷入死循環iterator.remove();try {//對就緒通道的處理方法,上述有描述handleInput(key);} catch (Exception e){if (key != null){key.cancel();if (key.channel() != null) {key.channel().close();}}}}}catch (Throwable throwable){throwable.printStackTrace();}}}
復制代碼
此方法為服務端的主體方法。大致流程如下:
- 打開ServerSocketChannel,監聽客戶端連接
- 綁定監聽端口,設置連接為非阻塞模式(阻塞模式下不能注冊到選擇器)
- 創建Reactor線程,創建選擇器并啟動線程
- 將ServerSocketChannel注冊到Reactor線程中的Selector上,監聽ACCEPT事件
- Selector輪詢準備就緒的key
- Selector監聽到新的客戶端接入,處理新的接入請求,完成TCP三次握手,簡歷物理鏈路
- 設置客戶端鏈路為非阻塞模式
- 將新接入的客戶端連接注冊到Reactor線程的Selector上,監聽讀操作,讀取客戶端發送的網絡消息 異步讀取客戶端消息到緩沖區
- 調用write將消息異步發送給客戶端
NIO的小Demo之客戶端
public class ClientHandle implements Runnable{//構造函數,構造時順便綁定public ClientHandle(String ip, int port){}//處理就緒通道private void handleInput(SelectionKey key) throws IOException{}//寫方法(與服務端的寫方法一致)private void doWrite(SocketChannel channel,String request) throws IOException{}//連接到服務端private void doConnect() throws IOException{}//發送信息public void sendMsg(String msg) throws Exception{}
}
復制代碼
首先先看構造函數的實現:
public ClientHandle(String ip,int port) {this.host = ip;this.port = port;try{//創建選擇器selector = Selector.open();//打開監聽通道socketChannel = SocketChannel.open();//如果為 true,則此通道將被置于阻塞模式;如果為 false,則此通道將被置于非阻塞模式socketChannel.configureBlocking(false);started = true;}catch(IOException e){e.printStackTrace();System.exit(1);}}
復制代碼
接下來看對就緒通道的處理辦法:
private void handleInput(SelectionKey key) throws IOException{if(key.isValid()){SocketChannel sc = (SocketChannel) key.channel();if(key.isConnectable()){//這里的作用將在后面的代碼(doConnect方法)說明if(sc.finishConnect()){System.out.println("已連接事件");}else{System.exit(1);}}//讀消息if(key.isReadable()){//創建ByteBuffer,并開辟一個1k的緩沖區ByteBuffer buffer = ByteBuffer.allocate(1024);//讀取請求碼流,返回讀取到的字節數int readBytes = sc.read(buffer);//讀取到字節,對字節進行編解碼if(readBytes>0){buffer.flip();//根據緩沖區可讀字節數創建字節數組byte[] bytes = new byte[buffer.remaining()];//將緩沖區可讀字節數組復制到新建的數組中buffer.get(bytes);String result = new String(bytes,"UTF-8");System.out.println("客戶端收到消息:" + result);}lse if(readBytes==0){//忽略}else if(readBytes<0){//鏈路已經關閉,釋放資源key.cancel();sc.close();}}}}
復制代碼
在run方法之前需先看下此方法的實現:
private void doConnect() throws IOException{if(socketChannel.connect(new InetSocketAddress(host,port))){System.out.println("connect");}else {socketChannel.register(selector, SelectionKey.OP_CONNECT);System.out.println("register");}}
復制代碼
當SocketChannel工作于非阻塞模式下時,調用connect()時會立即返回: 如果連接建立成功則返回的是true(比如連接localhost時,能立即建立起連接),否則返回false。
在非阻塞模式下,返回false后,必須要在隨后的某個地方調用finishConnect()方法完成連接。 當SocketChannel處于阻塞模式下時,調用connect()時會進入阻塞,直至連接建立成功或者發生IO錯誤時,才從阻塞狀態中退出。
所以該代碼在connect服務端后返回false(但還是有作用的),并在else語句將該通道注冊在選擇器上并選擇connect事件。
客戶端的run方法:
@Overridepublic void run() {try{doConnect();}catch(IOException e){e.printStackTrace();System.exit(1);}//循環遍歷selectorwhile(started){try{selector.select();Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();SelectionKey key ;while(it.hasNext()){key = it.next();it.remove();try{handleInput(key);}catch(Exception e){if(key != null){key.cancel();if(key.channel() != null){key.channel().close();}}}}}catch(Exception e){e.printStackTrace();System.exit(1);}}//selector關閉后會自動釋放里面管理的資源if(selector != null){try{selector.close();}catch (Exception e) {e.printStackTrace();}}}
復制代碼
發送信息到服務端的方法:
public void sendMsg(String msg) throws Exception{//覆蓋其之前感興趣的事件(connect),將其更改為OP_READsocketChannel.register(selector, SelectionKey.OP_READ);doWrite(socketChannel, msg);}
復制代碼
完整代碼:
服務端:
/*** Created by innoyiya on 2018/8/20.*/
public class Service {private static int DEFAULT_POST = 12345;private static ServerHandle serverHandle;public static void start(){start(DEFAULT_POST);}public static synchronized void start(int post) {if (serverHandle != null){serverHandle.shop();}serverHandle = new ServerHandle(post);new Thread(serverHandle,"server").start();}
}
復制代碼
服務端主體:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;/*** Created by innoyiya on 2018/8/20.*/
public class ServerHandle implements Runnable{private Selector selector;private ServerSocketChannel serverSocketChannel;private volatile boolean started;public ServerHandle(int port){try {//創建選擇器selector = Selector.open();//打開監聽通道serverSocketChannel = ServerSocketChannel.open();//設置為非阻塞模式serverSocketChannel.configureBlocking(false);//判定端口,并設定連接隊列最大為1024serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);//監聽客戶端請求serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//標記啟動標志started = true;System.out.println("服務器已啟動,端口號為:" + port);} catch (IOException e){e.printStackTrace();System.exit(1);}}public void shop(){started = false;}private void doWrite(SocketChannel channel, String response) throws IOException {byte[] bytes = response.getBytes();ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);wirteBuffer.put(bytes);wirteBuffer.flip();channel.write(wirteBuffer);}private void handleInput(SelectionKey key) throws IOException{if (key.isValid()){if (key.isAcceptable()){ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);}if (key.isReadable()){SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int readBytes = socketChannel.read(byteBuffer);if (readBytes > 0){byteBuffer.flip();byte[] bytes = new byte[byteBuffer.remaining()];byteBuffer.get(bytes);String expression = new String(bytes, "UTF-8");System.out.println("服務器收到的信息:" + expression);doWrite(socketChannel, "+++++" + expression + "+++++");} else if (readBytes < 0){key.cancel();socketChannel.close();}}}}@Overridepublic void run() {//循環遍歷while (started) {try {selector.select();//System.out.println(selector.select());Set<SelectionKey> keys = selector.selectedKeys();//System.out.println(keys.size());Iterator<SelectionKey> iterator = keys.iterator();SelectionKey key;while (iterator.hasNext()){key = iterator.next();iterator.remove();try {handleInput(key);} catch (Exception e){if (key != null){key.cancel();if (key.channel() != null) {key.channel().close();}}}}}catch (Throwable throwable){throwable.printStackTrace();}}}
}
復制代碼
客戶端:
/*** Created by innoyiya on 2018/8/20.*/
public class Client {private static String DEFAULT_HOST = "localhost";private static int DEFAULT_PORT = 12345;private static ClientHandle clientHandle;private static final String EXIT = "exit";public static void start() {start(DEFAULT_HOST, DEFAULT_PORT);}public static synchronized void start(String ip, int port) {if (clientHandle != null){clientHandle.stop();}clientHandle = new ClientHandle(ip, port);new Thread(clientHandle, "Server").start();}//向服務器發送消息public static boolean sendMsg(String msg) throws Exception {if (msg.equals(EXIT)){return false;}clientHandle.sendMsg(msg);return true;}}
復制代碼
客戶端主體代碼:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;/*** Created by innoyiya on 2018/8/20.*/public class ClientHandle implements Runnable{private String host;private int port;private Selector selector;private SocketChannel socketChannel;private volatile boolean started;public ClientHandle(String ip,int port) {this.host = ip;this.port = port;try{//創建選擇器selector = Selector.open();//打開監聽通道socketChannel = SocketChannel.open();//如果為 true,則此通道將被置于阻塞模式;如果為 false,則此通道將被置于非阻塞模式socketChannel.configureBlocking(false);started = true;}catch(IOException e){e.printStackTrace();System.exit(1);}}public void stop(){started = false;}private void handleInput(SelectionKey key) throws IOException{if(key.isValid()){SocketChannel sc = (SocketChannel) key.channel();if(key.isConnectable()){if(sc.finishConnect()){System.out.println("已連接事件");}else{System.exit(1);}}//讀消息if(key.isReadable()){//創建ByteBuffer,并開辟一個1M的緩沖區ByteBuffer buffer = ByteBuffer.allocate(1024);//讀取請求碼流,返回讀取到的字節數int readBytes = sc.read(buffer);//讀取到字節,對字節進行編解碼if(readBytes>0){//將緩沖區當前的limit設置為position=0,用于后續對緩沖區的讀取操作buffer.flip();//根據緩沖區可讀字節數創建字節數組byte[] bytes = new byte[buffer.remaining()];//將緩沖區可讀字節數組復制到新建的數組中buffer.get(bytes);String result = new String(bytes,"UTF-8");System.out.println("客戶端收到消息:" + result);} else if(readBytes<0){key.cancel();sc.close();}}}}//異步發送消息private void doWrite(SocketChannel channel,String request) throws IOException{byte[] bytes = request.getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);//flip操作writeBuffer.flip();//發送緩沖區的字節數組channel.write(writeBuffer);}private void doConnect() throws IOException{if(socketChannel.connect(new InetSocketAddress(host,port))){System.out.println("connect");}else {socketChannel.register(selector, SelectionKey.OP_CONNECT);System.out.println("register");}}public void sendMsg(String msg) throws Exception{//覆蓋其之前感興趣的事件,將其更改為OP_READsocketChannel.register(selector, SelectionKey.OP_READ);doWrite(socketChannel, msg);}@Overridepublic void run() {try{doConnect();}catch(IOException e){e.printStackTrace();System.exit(1);}//循環遍歷selectorwhile(started){try{selector.select();Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();SelectionKey key ;while(it.hasNext()){key = it.next();it.remove();try{handleInput(key);}catch(Exception e){if(key != null){key.cancel();if(key.channel() != null){key.channel().close();}}}}}catch(Exception e){e.printStackTrace();System.exit(1);}}//selector關閉后會自動釋放里面管理的資源if(selector != null){try{selector.close();}catch (Exception e) {e.printStackTrace();}}}
}
復制代碼
測試類:
import java.util.Scanner;/*** Created by innoyiya on 2018/8/20.*/
public class Test {public static void main(String[] args) throws Exception {Service.start();Thread.sleep(1000);Client.start();while(Client.sendMsg(new Scanner(System.in).nextLine()));}
}
復制代碼
控制臺打印:
服務器已啟動,端口號為:12345
register
已連接事件
1234
服務器收到的信息:1234
客戶端收到消息:+++++1234+++++
5678
服務器收到的信息:5678
客戶端收到消息:+++++5678+++++
復制代碼
如有不妥之處,請告訴我。