文章目錄
- 1. 概述
- 2. TCP 阻塞式IO 網絡編程實例
- 2.1 TCP網絡編程服務端
- 2.2 ByteBufferUtil
- 2.3 客戶端代碼
- 2.4 運行截圖
- 3. TCP 非阻塞式IO 網絡編程實例
- 3.1 服務端
- 3.2 客戶端
- 3.3 運行截圖
- 4. 多路復用
- 4.1 服務器端
- 4.2 客戶端
- 4.3 運行截圖
- 5. AIO
- 5.1 AIO 服務端
- 5.2 客戶端
- 5.3 運行截圖
- 6. Channel / Buffer
- 6.1 Channel
- 6.2 ByteBuffer
- 參考文獻
1. 概述
- 網絡編程, 就是編寫程序, 使兩臺聯網的電腦可以交換數據,
- 套接字是網絡數據傳輸用的軟件設備, 用來連接網絡的工具
- 在 linux中 socket被認為是文件中的一種, 在網絡數據傳輸過程中, 使用文件I/O的相關函數
- socket 幫助程序員封裝了網絡的底層細節,如:錯誤檢測、包大小、包分解、包重傳、網絡地址等,讓程序員將網絡連接看作可以讀/寫字節的流
- 套接字常用網絡協議: TCP、UDP
之前還有一篇文章: Linux C++ Socket 套接字、select、poll、epoll 實例
套接字進行網絡連接流程, 如下圖:
服務器端:
- 創建服務器套接字
socket()
- 綁定端口
bind()
- 監聽端口
listen()
- 接受客戶端請求
accept()
- 讀取客戶端請求的數據
read()
- 返回客戶端要響應的數據
write()
- …
- 關閉與客戶端的連接
close()
- 關閉服務器套接字
close()
客戶端:
- 創建客戶端套接字
socket()
- 連接服務端
connect()
- 請求服務端數據, 發送操作數和操作符到服務器
write()
- 從服務器讀取操作結果
read()
- …
- 關閉客戶端套接字
close()
流程圖如下, 具體代碼示例可以看下面的 2. TCP 阻塞式IO 網絡編程實例
2. TCP 阻塞式IO 網絡編程實例
accept 和 read 都是阻塞的, 當 accept 到新連接, 或者 read 到數據程序才往下走
為了提高服務端處理能力, 一個客戶端連接一個線程處理
不能一個線程處理多個客戶端, 某個客戶端會阻塞這個線程處理其他客戶端
2.1 TCP網絡編程服務端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;public class BlockServer {public static void main(String[] args) throws IOException {// 0. ByteBufferByteBuffer buffer = ByteBuffer.allocate(16);// 1. 創建了服務器ServerSocketChannel ssc = ServerSocketChannel.open();// 2. 綁定監聽端口ssc.bind(new InetSocketAddress(8080));// 3. 連接集合List<SocketChannel> channels = new ArrayList<>();while (true) {// 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信System.out.println("等待客戶端連接...");SocketChannel sc = ssc.accept(); // 阻塞方法,線程停止運行System.out.println("接收到客戶端連接: " + sc);channels.add(sc);for (SocketChannel channel : channels) {// 5. 接收客戶端發送的數據System.out.println("開始讀取客戶端中的數據:" + channel);channel.read(buffer); // 阻塞方法,線程停止運行buffer.flip();String request = ByteBufferUtil.read(buffer);System.out.println(request);buffer.clear();System.out.println("已經讀取完客戶端中的數據:" + channel);}}}
}
2.2 ByteBufferUtil
public class ByteBufferUtil {public static String read(ByteBuffer byteBuffer) throws CharacterCodingException {CharBuffer charBuffer = StandardCharsets.UTF_8.decode(byteBuffer);return charBuffer.toString();}public static ByteBuffer read(String string) throws CharacterCodingException {return StandardCharsets.UTF_8.encode(string);}public static void main(String[] args) throws CharacterCodingException {System.out.println(ByteBufferUtil.read(ByteBufferUtil.read("test")));}}
2.3 客戶端代碼
public class BlockClient {public static void main(String[] args) throws IOException {SocketChannel sc = SocketChannel.open();System.out.println("開始連接服務端...");sc.connect(new InetSocketAddress("localhost", 8080));String str = "test";System.out.println("連接服務端成功,寫入數據: " + str);sc.write(ByteBufferUtil.read(str));}
}
2.4 運行截圖
3. TCP 非阻塞式IO 網絡編程實例
不停的輪詢, 看看有沒有accept 到新連接, 沒有連接不阻塞等待, 繼續去看看已經建立的連接有沒有read到客戶端的新數據, read到新數據處理, read不到不處理
為了提高服務端處理能力, 可以一個客戶端連接一個線程處理, 線程不停的輪詢自己要處理的客戶端
也可以一個線程處理多個客戶端, 相較于上面的阻塞I/O模型, 非阻塞不至于某個客戶端阻塞這個線程處理其他客戶端
3.1 服務端
ssc.configureBlocking(false);
設置為非阻塞模式
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;public class NonBlockServer {public static void main(String[] args) throws IOException, InterruptedException {// 0. ByteBufferByteBuffer buffer = ByteBuffer.allocate(16);// 1. 創建了服務器ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false); // 非阻塞模式// 2. 綁定監聽端口ssc.bind(new InetSocketAddress(8080));// 3. 連接集合List<SocketChannel> channels = new ArrayList<>();while (true) {// 4. accept 建立與客戶端連接, SocketChannel 用來與客戶端之間通信SocketChannel sc = ssc.accept(); // 非阻塞,線程還會繼續運行,如果沒有連接建立,但sc是nullif (sc != null) {System.out.println("接收到客戶端連接: " + sc);sc.configureBlocking(false); // 非阻塞模式channels.add(sc);}for (SocketChannel channel : channels) {System.out.println("開始讀取客戶端中的數據:" + channel);// 5. 接收客戶端發送的數據int read = channel.read(buffer);// 非阻塞,線程仍然會繼續運行,如果沒有讀到數據,read 返回 0if (read > 0) {buffer.flip();System.out.println((ByteBufferUtil.read(buffer)));buffer.clear();System.out.println("已經讀取完客戶端中的數據:" + channel);} else {TimeUnit.MILLISECONDS.sleep(100);}}}}
}
3.2 客戶端
客戶端同上
3.3 運行截圖
4. 多路復用
可以調用 select/poll/epoll , 阻塞在select/poll/epoll, select/poll/epoll 監聽多個客戶端連接事件或寫入的數據, 然后這些事件可再有多個線程分一分處理掉
4.1 服務器端
打開選擇器并將其與通道注冊,監聽接受連接操作:
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
監聽選擇器上的事件,返回已就緒的通道數量:
int count = selector.select();
獲取所有事件(連接、讀取):
Set<SelectionKey> keys = selector.selectedKeys();
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;public class SelectorServer {public static void main(String[] args) {try (ServerSocketChannel channel = ServerSocketChannel.open()) {// 綁定端口并打印通道信息channel.bind(new InetSocketAddress(6666));System.out.println(channel);// 打開選擇器并將其與通道注冊,監聽接受連接操作Selector selector = Selector.open();channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_ACCEPT);// 無限循環,等待選擇器上的事件while (true) {// 監聽選擇器上的事件,返回已就緒的通道數量int count = selector.select();System.out.println("select count: " + count);// 如果沒有就緒的通道,則繼續循環等待if (count <= 0) {continue;}// 獲取并迭代處理所有就緒的事件// 獲取所有事件Set<SelectionKey> keys = selector.selectedKeys();// 遍歷所有事件,逐一處理Iterator<SelectionKey> iter = keys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();// 處理接受連接事件// 判斷事件類型if (key.isAcceptable()) {ServerSocketChannel c = (ServerSocketChannel) key.channel();// 必須處理SocketChannel sc = c.accept();sc.configureBlocking(false);sc.register(selector, SelectionKey.OP_READ);System.out.println("連接已建立:" + sc);}// 處理讀取數據事件else if (key.isReadable()) {SocketChannel sc = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(128);int read = sc.read(buffer);if (read == -1) {// 如果讀取返回-1,表示連接已關閉key.cancel();sc.close();} else {// 否則,將緩沖區反轉并打印讀取的數據buffer.flip();System.out.println(new String(buffer.array(), StandardCharsets.UTF_8));}}// 事件處理完畢后,從迭代器中移除,避免重復處理// 處理完畢,必須將事件移除iter.remove();}}} catch (IOException e) {// 打印IO異常堆棧跟蹤e.printStackTrace();}}
}
4.2 客戶端
import netty.ByteBufferUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;public class SelectorClient {public static void main(String[] args) throws IOException {// 創建Socket通道并連接到服務器SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost", 6666));// 初始化輸入和輸出ByteBufferByteBuffer inputBuffer = ByteBuffer.allocate(512);ByteBuffer serverOutput = ByteBuffer.allocate(512);// 循環接收用戶輸入并發送給服務器while (true) {// 使用Scanner獲取用戶輸入Scanner in = new Scanner(System.in);String input = in.nextLine();System.out.println("user input: " + input);// 清空輸入緩沖區,放入用戶輸入,然后反轉準備寫入inputBuffer.clear();inputBuffer.put(input.getBytes(StandardCharsets.UTF_8));inputBuffer.flip();// 將輸入數據寫入Socket通道sc.write(inputBuffer);System.out.println("send to server " + input);// 循環讀取服務器響應while (true) {// 清空服務器響應緩沖區,準備讀取數據serverOutput.clear();// 從Socket通道讀取數據sc.read(serverOutput);// 如果沒有讀取到數據,繼續嘗試讀取if (!serverOutput.hasRemaining()) {continue;}// 反轉緩沖區,讀取數據并打印serverOutput.flip();System.out.println("server response " + ByteBufferUtil.read(serverOutput));// 讀取完成后退出內層循環break;}}}
}
4.3 運行截圖
5. AIO
異步I/O模型
告訴內核啟動某個操作, 并且把數據copy到用戶緩沖區再通知我們
5.1 AIO 服務端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;/*** AIO服務器類,用于演示異步IO的服務器端實現。* 使用AsynchronousServerSocketChannel處理客戶端連接和數據傳輸。*/
public class AIOServer {/*** 程序入口,初始化并啟動AIO服務器。* 綁定服務器端口并等待客戶端連接。** @param args 命令行參數* @throws IOException 如果綁定端口失敗*/public static void main(String[] args) throws IOException {AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();ssc.bind(new InetSocketAddress(6666));ssc.accept(null, new AcceptHandler(ssc));while (true) ;}/*** 關閉客戶端通道的方法。* 用于處理讀取或寫入操作失敗時關閉通道。** @param sc 客戶端通道*/private static void closeChannel(AsynchronousSocketChannel sc) {try {System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());sc.close();} catch (IOException e) {e.printStackTrace();}}/*** 讀取數據的完成處理器,實現讀取客戶端數據并響應的邏輯。*/private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {private final AsynchronousSocketChannel sc;public ReadHandler(AsynchronousSocketChannel sc) {this.sc = sc;}/*** 當讀取操作完成時被調用。* 解析讀取的數據并寫回響應到客戶端。** @param result 讀取操作的結果* @param attachment 讀取操作的附加上下文*/@Overridepublic void completed(Integer result, ByteBuffer attachment) {try {if (result == -1) {return;}System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());attachment.flip();String request = Charset.defaultCharset().decode(attachment).toString();System.out.println(request.toString());attachment.clear();attachment.put(("你好:" + request).getBytes());attachment.flip();sc.write(attachment);attachment.clear();// 讀取下一個讀時間sc.read(attachment, attachment, new ReadHandler(sc));} catch (IOException e) {e.printStackTrace();}}/*** 當讀取操作失敗時被調用。* 關閉客戶端通道并打印異常堆棧跟蹤。** @param exc 引發的異常* @param attachment 讀取操作的附加上下文*/@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {closeChannel(sc);exc.printStackTrace();}}/*** 接受連接的完成處理器,用于處理客戶端的連接請求。*/private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {private final AsynchronousServerSocketChannel ssc;public AcceptHandler(AsynchronousServerSocketChannel ssc) {this.ssc = ssc;}/*** 當接受操作完成時被調用。* 設置讀取緩沖區并開始讀取客戶端發送的數據。** @param sc 接受到的客戶端通道* @param attachment 接受操作的附加上下文*/@Overridepublic void completed(AsynchronousSocketChannel sc, Object attachment) {try {System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());} catch (IOException e) {e.printStackTrace();}ByteBuffer buffer = ByteBuffer.allocate(1024);// 讀事件由 ReadHandler 處理System.out.println("開始讀");sc.read(buffer, buffer, new ReadHandler(sc));System.out.println("讀完成");// 處理完第一個 accept 時,需要再次調用 accept 方法來處理下一個 accept 事件ssc.accept(null, this);}/*** 當接受操作失敗時被調用。* 打印異常堆棧跟蹤。** @param exc 引發的異常* @param attachment 接受操作的附加上下文*/@Overridepublic void failed(Throwable exc, Object attachment) {exc.printStackTrace();}}
}
5.2 客戶端
同 4.2
5.3 運行截圖
6. Channel / Buffer
6.1 Channel
Channel: 傳輸數據的通道
其實和數據流挺像的,不過數據流是單向的而Channel 是雙向的,可以向channel中寫數據,也可以從channel中讀取數據
NIO 基礎組件之 Channel
6.2 ByteBuffer
ByteBuffer是Buffer子類,是字節緩沖區,特點如下所示。
大小不可變。一旦創建,無法改變其容量大小,無法擴容或者縮容;
讀寫靈活。內部通過指針移動來實現靈活讀寫;
支持堆上內存分配和直接內存分配
一文搞懂ByteBuffer使用與原理
參考文獻
- UNIX 網絡編程 卷1: 套接字聯網API
- TCP/IP網絡編程 尹圣雨 著 金國哲 譯
- Linux IO模式及 select、poll、epoll詳解
- 淺談select,poll和epoll的區別
- 黑馬 Netty 課程