小文件系統的請求異步化高并發性能優化
222_分布式圖片存儲系統中的高性能指的到底是什么?
重構系統架構,來實現一個高性能。然后就要做非常完善的一個測試,最后對這個系統做一個總結,說說后續我們還要做一些什么東西。另外,我還要給大家留一些作業,相當于是讓大家課后自己去做的,就不是完全拷貝我的代碼
高并發
前面已經通過Reactor模式實現了
高性能主要是兩塊
第一塊:客戶端現在是短連接,每次發送請求,都需要建立連接,然后斷開連接。站在客戶端的角度而言,發現每執行一次文件上傳和下載的操作,速度都很慢
第二塊:文件上傳,需要多副本上傳。一般來說,針對kafka,多副本的時候默認情況下只要寫成功一個副本,就返回了。另外其他的副本的寫都是異步慢慢來執行的,kafka采取的是副本pull數據的機制,只要在一個數據節點上寫成功數據,別的數據節點會主從從這個寫成功的數據節點上pull數據
Kafka,強調高性能,生產消息的行為都是盡快的可以完成
HDFS,不強調高性能,它主要針對的是幾個GB的大文件上傳到服務器上去,只要慢慢上傳就可以了,速度慢點無所謂,只要能上傳成功。所以,HDFS采用的是多個副本一定要依次上傳成功,才可以說是本次文件上傳成功了。所以,HDFS的上傳速度肯定是很慢的,因為它們根本不強調文件上傳過程的高性能。所以Kafka和HDFS的應用場景本身就不相同
高性能架構的重構
- 短連接 -> 長連接;
- 同步上傳多副本 -> 寫一個副本,其他副本在后臺慢慢的異步復制和拉取
這樣,文件上傳和文件下載,性能至少會提升好幾倍
223_回頭審視一下客戶端的短連接模式有哪些問題?
除了客戶端有NioClient以外,數據節點也有NioClient,因為他在進行數據節點擴縮容時,需要從其他的數據節點拷貝副本過來寫入本地,這個過程使用短連接也無所謂,因為這個過程都是后臺慢慢執行的,但是當然最好也是重構成長連接模式
224_初步實現用于進行網絡管理的NetworkManager組件
225_在NetworkManager中實現核心線程無限循環進行poll操作
NetworkManager
/*** 網絡連接管理器*/
public class NetworkManager {// 正在連接中public static final Integer CONNECTING = 1;// 已經建立連接public static final Integer CONNECTED = 2;// 多路復用Selectorprivate Selector selector;// 所有的連接private Map<String, SocketChannel> connections;// 每個數據節點的連接狀態private Map<String, Integer> connectState;// 等待建立連接的機器private ConcurrentLinkedQueue<Host> waitingConnectHosts;public NetworkManager() {try {this.selector = Selector.open();} catch (IOException e) {e.printStackTrace();}this.connections = new ConcurrentHashMap<String, SocketChannel>();this.connectState = new ConcurrentHashMap<String, Integer>();this.waitingConnectHosts = new ConcurrentLinkedQueue<Host>();new NetworkPollThread().start();}/*** 嘗試連接到數據節點的端口上去*/public void maybeConnect(String hostname, Integer nioPort) throws Exception {synchronized(this) {if(!connectState.containsKey(hostname)) {connectState.put(hostname, CONNECTING);waitingConnectHosts.offer(new Host(hostname, nioPort)); }while(connectState.get(hostname).equals(CONNECTING)) {wait(100);}}}/*** 嘗試把排隊中的機器發起連接的請求*/private void tryConnect() {try {Host host = null;SocketChannel channel = null;while((host = waitingConnectHosts.poll()) != null) {channel = SocketChannel.open(); channel.configureBlocking(false); channel.connect(new InetSocketAddress(host.hostname, host.nioPort)); channel.register(selector, SelectionKey.OP_CONNECT); }} catch (Exception e) {e.printStackTrace();}}// 網絡連接的核心線程class NetworkPollThread extends Thread {@Overridepublic void run() {while(true) {tryConnect();}}}// 代表了一臺機器class Host {String hostname;Integer nioPort;public Host(String hostname, Integer nioPort) {this.hostname = hostname;this.nioPort = nioPort;}}}
226_在無限循環的poll方法中完成網絡連接的建立
public class NetworkManager {// 正在連接中public static final Integer CONNECTING = 1;// 已經建立連接public static final Integer CONNECTED = 2;// 網絡poll操作的超時時間public static final Long POLL_TIMEOUT = 500L; // 多路復用Selectorprivate Selector selector;// 所有的連接private Map<String, SocketChannel> connections;// 每個數據節點的連接狀態private Map<String, Integer> connectState;// 等待建立連接的機器private ConcurrentLinkedQueue<Host> waitingConnectHosts;public NetworkManager() {try {this.selector = Selector.open();} catch (IOException e) {e.printStackTrace();}this.connections = new ConcurrentHashMap<String, SocketChannel>();this.connectState = new ConcurrentHashMap<String, Integer>();this.waitingConnectHosts = new ConcurrentLinkedQueue<Host>();new NetworkPollThread().start();}/*** 嘗試連接到數據節點的端口上去*/public void maybeConnect(String hostname, Integer nioPort) throws Exception {synchronized(this) {if(!connectState.containsKey(hostname)) {connectState.put(hostname, CONNECTING);waitingConnectHosts.offer(new Host(hostname, nioPort)); }while(connectState.get(hostname).equals(CONNECTING)) {wait(100);}}}// 網絡連接的核心線程class NetworkPollThread extends Thread {@Overridepublic void run() {while(true) {tryConnect();poll();}}/*** 嘗試把排隊中的機器發起連接的請求*/private void tryConnect() {try {Host host = null;SocketChannel channel = null;while((host = waitingConnectHosts.poll()) != null) {channel = SocketChannel.open(); channel.configureBlocking(false); channel.connect(new InetSocketAddress(host.hostname, host.nioPort)); channel.register(selector, SelectionKey.OP_CONNECT); }} catch (Exception e) {e.printStackTrace();}}/*** 嘗試完成網絡連接、請求發送、響應讀取*/private void poll() {SocketChannel channel = null;try {int selectedKeys = selector.select(500); if(selectedKeys <= 0) {return;}Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator(); while(keysIterator.hasNext()){ SelectionKey key = (SelectionKey) keysIterator.next(); keysIterator.remove(); // 如果是網絡連接操作if(key.isConnectable()){ channel = (SocketChannel) key.channel();if(channel.isConnectionPending()){ while(!channel.finishConnect()) {Thread.sleep(100); }} System.out.println("完成與服務端的連接的建立......"); InetSocketAddress remoteAddress = (InetSocketAddress)channel.getRemoteAddress();connectState.put(remoteAddress.getHostName(), CONNECTED);connections.put(remoteAddress.getHostName(), channel);}}} catch (Exception e) {e.printStackTrace();if(channel != null) {try {channel.close();} catch (IOException e1) {e1.printStackTrace();}}}}}// 代表了一臺機器class Host {String hostname;Integer nioPort;public Host(String hostname, Integer nioPort) {this.hostname = hostname;this.nioPort = nioPort;}}}
227_客戶端的核心業務方法對要發送的請求進行封裝
228_將封裝好的請求放入NetworkManager的請求隊列中
229_如何實現異步發送請求以及同步等待響應兩個接口
230_對每個數據節點獲取一個請求緩存起來等待發送
231_在核心的poll方法中將每個機器暫存等待的請求發送出去
232_在核心的poll方法中對機器返回的響應進行讀取
拿到響應
客戶端將請求發出以后,就每隔100ms輪詢一次,有沒有響應結果返回回來
233_客戶端建立連接的過程中異常了該如何返回響應?
234_客戶端發送請求過程中異常了該如何返回響應?
/*** 發送請求*/private void sendRequest(SelectionKey key, SocketChannel channel) {InetSocketAddress remoteAddress = null;try {remoteAddress = (InetSocketAddress) channel.getRemoteAddress();String hostname = remoteAddress.getHostName();// 獲取要發送到這臺機器的請求的數據NetworkRequest request = toSendRequests.get(hostname);ByteBuffer buffer = request.getBuffer();// 將請求發送到對方機器上去channel.write(buffer);while (buffer.hasRemaining()) {channel.write(buffer);}System.out.println("本次請求發送完畢......");key.interestOps(SelectionKey.OP_READ);} catch (Exception e) {e.printStackTrace();// 發送失敗,就取消關注OP_WRITE事件key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);if (remoteAddress != null) {String hostname = remoteAddress.getHostName();NetworkRequest request = toSendRequests.get(hostname);if (request.needResponse()) {NetworkResponse response = new NetworkResponse();response.setHostname(hostname);response.setRequestId(request.getId());// 請求發送失敗,則客戶端手動構造一個響應response.setError(true);finishedResponses.put(request.getId(), response);} else {toSendRequests.remove(hostname);}}}}
完整代碼
NioClient
*** 客戶端的一個NIOClient,負責跟數據節點進行網絡通信*/
public class NioClient {private NetworkManager networkManager;public NioClient() {this.networkManager = new NetworkManager();}/*** 發送一個文件過去*/public Boolean sendFile(String hostname, int nioPort, byte[] file, String filename, long fileLength) { // 先根據hostname來檢查一下,跟對方機器的連接是否建立好了// 沒有建立好,那么就直接在此建立連接; 建立好連接后,就把連接給緩存起來,以備下次使用try {// 如果此時還沒跟那個數據節點建立好連接if(!networkManager.maybeConnect(hostname, nioPort)) {return false;}NetworkRequest request = createSendFileRequest(hostname, nioPort, file, filename, fileLength);networkManager.sendRequest(request); NetworkResponse response = networkManager.waitResponse(request.getId());if(response.error()) {// 請求發送失敗,客戶端自己構造的response,并將response.error 設置為truereturn false;}ByteBuffer buffer = response.getBuffer();String responseStatus = new String(buffer.array(), 0, buffer.remaining());System.out.println("收到" + hostname + "的響應:" + responseStatus);return responseStatus.equals(NetworkResponse.RESPONSE_SUCCESS);} catch (Exception e) {e.printStackTrace(); }return false;}/*** 構建一個發送文件的網絡請求*/private NetworkRequest createSendFileRequest(String hostname, Integer nioPort, byte[] file, String filename, long fileLength) {NetworkRequest request = new NetworkRequest();ByteBuffer buffer = ByteBuffer.allocate(NetworkRequest.REQUEST_TYPE + NetworkRequest.FILENAME_LENGTH + filename.getBytes().length + NetworkRequest.FILE_LENGTH + (int)fileLength); buffer.putInt(NetworkRequest.REQUEST_SEND_FILE); buffer.putInt(filename.getBytes().length); buffer.put(filename.getBytes()); buffer.putLong(fileLength); buffer.put(file);buffer.rewind(); request.setId(UUID.randomUUID().toString()); request.setHostname(hostname); request.setNioPort(nioPort); request.setBuffer(buffer); request.setNeedResponse(true); return request;}}
NetworkManager
/*** 網絡連接管理器*/
public class NetworkManager {// 正在連接中public static final Integer CONNECTING = 1;// 已經建立連接public static final Integer CONNECTED = 2;// 斷開連接public static final Integer DISCONNECTED = 3;// 響應狀態:成功public static final Integer RESPONSE_SUCCESS = 1;// 響應狀態:失敗public static final Integer RESPONSE_FAILURE = 2;// 網絡poll操作的超時時間public static final Long POLL_TIMEOUT = 500L;// 多路復用Selectorprivate Selector selector;// 所有的連接private Map<String, SelectionKey> connections;// 每個數據節點的連接狀態private Map<String, Integer> connectState;// 等待建立連接的機器private final ConcurrentLinkedQueue<Host> waitingConnectHosts;// 排隊等待發送的網絡請求private Map<String, ConcurrentLinkedQueue<NetworkRequest>> waitingRequests;// 馬上準備要發送的網絡請求private Map<String, NetworkRequest> toSendRequests;// 已經完成請求的響應private Map<String, NetworkResponse> finishedResponses;public NetworkManager() {try {this.selector = Selector.open();} catch (IOException e) {e.printStackTrace();}this.connections = new ConcurrentHashMap<String, SelectionKey>();this.connectState = new ConcurrentHashMap<String, Integer>();this.waitingConnectHosts = new ConcurrentLinkedQueue<Host>();this.waitingRequests = new ConcurrentHashMap<String, ConcurrentLinkedQueue<NetworkRequest>>();this.toSendRequests = new ConcurrentHashMap<String, NetworkRequest>();this.finishedResponses = new ConcurrentHashMap<String, NetworkResponse>();new NetworkPollThread().start();}/*** 嘗試連接到數據節點的端口上去*/public Boolean maybeConnect(String hostname, Integer nioPort) {synchronized (this) {if (!connectState.containsKey(hostname) ||connectState.get(hostname).equals(DISCONNECTED)) {connectState.put(hostname, CONNECTING);waitingConnectHosts.offer(new Host(hostname, nioPort));}while (connectState.get(hostname).equals(CONNECTING)) {try {wait(100);} catch (InterruptedException e) {e.printStackTrace();}}if (connectState.get(hostname).equals(DISCONNECTED)) {return false;}return true;}}/*** 發送網絡請求** @param request*/public void sendRequest(NetworkRequest request) {ConcurrentLinkedQueue<NetworkRequest> requestQueue =waitingRequests.get(request.getHostname());requestQueue.offer(request);}/*** 等待指定請求的響應*/public NetworkResponse waitResponse(String requestId) throws Exception {NetworkResponse response = null;while ((response = finishedResponses.get(requestId)) == null) {Thread.sleep(100);}toSendRequests.remove(response.getHostname());finishedResponses.remove(requestId);return response;}// 網絡連接的核心線程class NetworkPollThread extends Thread {@Overridepublic void run() {while (true) {tryConnect();prepareRequests();poll();}}/*** 嘗試把排隊中的機器發起連接的請求*/private void tryConnect() {Host host = null;SocketChannel channel = null;while ((host = waitingConnectHosts.poll()) != null) {try {channel = SocketChannel.open();channel.configureBlocking(false);channel.connect(new InetSocketAddress(host.hostname, host.nioPort));channel.register(selector, SelectionKey.OP_CONNECT);} catch (Exception e) {e.printStackTrace();connectState.put(host.hostname, DISCONNECTED);}}}/*** 準備好要發送的請求*/private void prepareRequests() {for (String hostname : waitingRequests.keySet()) {// 看一下這臺機器當前是否還沒有請求馬上就要發送出去了ConcurrentLinkedQueue<NetworkRequest> requestQueue =waitingRequests.get(hostname);if (!requestQueue.isEmpty() && !toSendRequests.containsKey(hostname)) {// 對這臺機器獲取一個派對的請求出來NetworkRequest request = requestQueue.poll();// 將這個請求暫存起來,接下來 就可以等待發送出去toSendRequests.put(hostname, request);// 讓這臺機器對應的連接關注的事件為OP_WRITESelectionKey key = connections.get(hostname);key.interestOps(SelectionKey.OP_WRITE);}}}/*** 嘗試完成網絡連接、請求發送、響應讀取*/private void poll() {SocketChannel channel = null;try {int selectedKeys = selector.select(500);if (selectedKeys <= 0) {return;}Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();while (keysIterator.hasNext()) {SelectionKey key = (SelectionKey) keysIterator.next();keysIterator.remove();channel = (SocketChannel) key.channel();// 如果是網絡連接操作if (key.isConnectable()) {// 建立連接finishConnect(key, channel);} else if (key.isWritable()) {// 發送請求sendRequest(key, channel);} else if (key.isReadable()) {// 接收響應readResponse(key, channel);}}} catch (Exception e) {e.printStackTrace();if (channel != null) {try {channel.close();} catch (IOException e1) {e1.printStackTrace();}}}}/*** 完成跟機器的連接*/private void finishConnect(SelectionKey key, SocketChannel channel) {InetSocketAddress remoteAddress = null;try {remoteAddress = (InetSocketAddress) channel.getRemoteAddress();if (channel.isConnectionPending()) {while (!channel.finishConnect()) {Thread.sleep(100);}}System.out.println("完成與服務端的連接的建立......");waitingRequests.put(remoteAddress.getHostName(),new ConcurrentLinkedQueue<NetworkRequest>());connections.put(remoteAddress.getHostName(), key);// 將連接狀態置為:已連接connectState.put(remoteAddress.getHostName(), CONNECTED);} catch (Exception e) {e.printStackTrace();if (remoteAddress != null) {connectState.put(remoteAddress.getHostName(), DISCONNECTED);}}}/*** 發送請求*/private void sendRequest(SelectionKey key, SocketChannel channel) {InetSocketAddress remoteAddress = null;try {remoteAddress = (InetSocketAddress) channel.getRemoteAddress();String hostname = remoteAddress.getHostName();// 獲取要發送到這臺機器的請求的數據NetworkRequest request = toSendRequests.get(hostname);ByteBuffer buffer = request.getBuffer();// 將請求發送到對方機器上去channel.write(buffer);while (buffer.hasRemaining()) {channel.write(buffer);}System.out.println("本次請求發送完畢......");key.interestOps(SelectionKey.OP_READ);} catch (Exception e) {e.printStackTrace();// 發送失敗,就取消關注OP_WRITE事件key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);if (remoteAddress != null) {String hostname = remoteAddress.getHostName();NetworkRequest request = toSendRequests.get(hostname);if (request.needResponse()) {NetworkResponse response = new NetworkResponse();response.setHostname(hostname);response.setRequestId(request.getId());// 請求發送失敗,則客戶端手動構造一個響應response.setError(true);finishedResponses.put(request.getId(), response);} else {toSendRequests.remove(hostname);}}}}/*** 讀取響應信息*/private void readResponse(SelectionKey key, SocketChannel channel) throws Exception {InetSocketAddress remoteAddress = (InetSocketAddress) channel.getRemoteAddress();String hostname = remoteAddress.getHostName();NetworkRequest request = toSendRequests.get(hostname);NetworkResponse response = null;if (request.getRequestType().equals(NetworkRequest.REQUEST_SEND_FILE)) {response = readSendFileResponse(request.getId(), hostname, channel);}key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);// 如果發送請求時,明確表示需要返回值if (request.needResponse()) {finishedResponses.put(request.getId(), response);} else {toSendRequests.remove(hostname);}}/*** 讀取上傳文件的響應*/private NetworkResponse readSendFileResponse(String requestId,String hostname, SocketChannel channel) throws Exception {ByteBuffer buffer = ByteBuffer.allocate(1024);channel.read(buffer);buffer.flip();NetworkResponse response = new NetworkResponse();response.setRequestId(requestId);response.setHostname(hostname);response.setBuffer(buffer);response.setError(false);return response;}}// 代表了一臺機器class Host {String hostname;Integer nioPort;public Host(String hostname, Integer nioPort) {this.hostname = hostname;this.nioPort = nioPort;}}}