我想用這個話題小結下最近這一階段的各種測試和開發。其實文章的內容主要還是想總結一下
NIO Socket
,以及兩種不同操作系統實現
NIO
的方式,
selector
和
epoll
。
問題應該從服務器端開始說起。我們都寫過net包下的socket,用socket的accept方法來等待客戶端的請求,請求來了則處理,沒有則一直等待,然后反復循環。這樣的方式,類似于重用進程,要說線程也可以,始終就在這一條路上堵著。這樣沒有并發可言,我們想到了可以用多線程,用線程池的方式來解決這個問題。這樣一般的小問題能解決了,100個初始化的線程,解決幾千個連接應該沒什么問題。可是,如果我做的是傳輸的項目呢,這100個線程還是阻塞的,第101個線程連接的時候,如果前面的100個都還在傳輸,那這第101個人還是在空等,而且他連個回音都不能收到。而且這樣的方式實現起來并不怎么容易,雖然線程池有Exectors這樣的類幫你生成,可是遇到共享變量和協同等問題還是很頭疼。一個更好的做法是,模仿FTP一樣,將指令與傳輸分開進行,一個端口負責簡短的指令,盡可能的端連接,而其他端口處理業務。這樣至少服務器能返回消息了。
第三種解決方案,多數情況下,也是最快的一種被提出來了,選擇器selector。用一個線程來查詢一組socket,找出已經準備好讀寫的,然后按順序處理socket,當然這種實現方式的前提是IO必須使用通道和緩沖區而不是流。
用java來開發NIO socket的程序,最先要理解的還是各種概念。
通道channel,在NIO socket中使用到的通道有三個,SocketChannel、ServerSocketChannel、DatagramChannel。前兩種基于TCP,最后一種一種用來實現UDP的通信。ServerSocketChannel不做任何數據上的處理,只是提供通道,負責連接。SocketChannel職責和net包下的socket類似,只不過這里是以通道的形勢來對接。
緩沖區Buffer,這里最常用的還是ByteBuffer,在mina中使用的IoBuffer也是基于ByteBuffer實現的。它的好處是可以自己拼裝去想到的數據。當然利用通道后數據切換的速度也會更快了。
要實現非阻塞IO最重要的還是選擇器:
Selector
The Selector class manages information about a set of registered channels and their readiness states. Channels are registered with selectors, and a selector can be asked to update the readiness states of the channels currently registered with it. When doing so, the invoking thread can optionally indicate that it would prefer to be suspended until one of the registered channels is ready.
SelectableChannel
This abstract class provides the common methods needed to implement channel selectability. It's the superclass of all channel classes that support readiness selection. FileChannel objects are not selectable because they don't extend from SelectableChannel. All the socket channel classes are selectable, as well as the channels obtained from a Pipe object. SelectableChannel objects can be registered with Selector objects, along with an indication of which operations on that channel are of interest for that selector. A channel can be registered with multiple selectors, but only once per selector.
SelectionKey
A SelectionKey encapsulates the registration relationship between a specific channel and a specific selector. A SelectionKey object is returned from SelectableChannel.register( ) and serves as a token representing the registration. SelectionKey objects contain two bit sets (encoded as integers) indicating which channel operations the registrant has an interest in and which operations the channel is ready to perform.
Selector
管理被注冊的通道的集合的信息和其就緒狀態,同時也更新通道的就緒狀態。并且一個通道可以被注冊到多個選擇器上,而對于同一個選擇器則只能被注冊一次。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class SelectSockets {
private static final int PORT = 8082;
private ByteBuffer buffer = ByteBuffer.allocate(1024);
public static void main(String[] args) throws IOException {
SelectSockets ss = new SelectSockets();
ss.go();
}
public void go() throws IOException {
System.out.println("listening on port:" + PORT);
ServerSocketChannel ssc = ServerSocketChannel.open();
ServerSocket ss = ssc.socket();
Selector selector = Selector.open();
ss.bind(new InetSocketAddress(PORT));
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int n = selector.select();
if (n == 0) {
continue;
}
Iterator iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key
.channel();
SocketChannel client = server.accept();
register(selector, client, SelectionKey.OP_READ);
System.out.println("Accept client:" + client);
acceptClient(client);
}
if (key.isReadable()) {
readData(key);
}
iter.remove();
}
}
}
protected void register(Selector selector, SelectableChannel channel,
int ops) throws IOException {
if (channel == null) {
return;
}
channel.configureBlocking(false);
channel.register(selector, ops);
}
protected void readData(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
int count;
while ((count = socketChannel.read(buffer)) > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
buffer.clear();
if (count < 0) {
socketChannel.close();
}
}
}
private void acceptClient(SocketChannel channel) throws IOException {
buffer.clear();
buffer.put("you have already connected server!".getBytes());
buffer.flip();
channel.write(buffer);
}
}
上面的代碼就是一般的
NIO
服務器端實現的過程。
當然對于一個企業級的應用這樣的代碼肯定是太單薄了,僅僅就
selector
而言,
我們是不是能共通過用多線程的方式來增強他的處理能力?是不是只有一個線程在跑
selector
,讓這一個線程處理那么多的連接有點兒過意不去。答案并不是和你想的一樣
:
For the first scenario, in which you want to bring more threads into play to service channels,
resist the urge to use multiple selectors
. Performing readiness selection on large numbers of channels is not expensive;
most of the work is done by the underlying operating system
. Maintaining multiple selectors and randomly assigning channels to one of them is not a satisfactory solution to this problem. It simply makes smaller versions of the same scenario.
如果您想要將更多的線程來為通道提供服務,請抵抗住使用多個選擇器的欲望。在大量通道上執行就緒選擇并不會有很大的開銷,大多數工作是由底層操作系統完成的。管理多個選擇器并隨機地將通道分派給它們當中的一個并不是這個問題的合理的解決方案。這只會形成這個場景的一個更小的版本。
一個更好的方案就是,讓一個線程處理selector,讓其他線程去處理就緒通道的業務。
模式就和上圖上描述的一樣,但是我的疑問來了,這樣和線程池下的服務端連接好像看起來并沒有多少大的優勢。同樣還是要啟那么多的線程去處理這些業務。這也是我最近一直想從
mina
源碼中找到的答案,可是還是沒有發現我想要的,它也是通過
IoSession
用原型模型的方式來實現并發的。不過我估計
Mina
應該是有異步的實現,這樣也會對性能上有影響。具體還有待研究。
最后要說的就是客戶端了,最近其實也的客戶端比較多,版本一個接一個,寫了不下七八個,各種方式,各種測試,其實有幾點心得可以分享:
l? 如果是做大文件的傳輸,切分的性價比其實比連續傳的性價比高不了多少,雖然像迅雷這樣可以分好多塊傳輸,但那畢竟是
P2P
的結構,文件本來就松散的。考慮到切分再校驗再重組,這樣還不如切大塊,然后順序傳。
l? 盡量將指令和傳輸分開,指令可以加密,然后更具協議,返回端口和地址讓服務器端做到分布式的處理。
l? 還有就是客戶端是否要用非阻塞模式,客戶端如果不是做出
P2P
模式的,而且能用多線程解決問題的,就沒必要用非阻塞的模式,因為非阻塞模式的發送和接收的時機很難控制,特別是用原生的
NIO
寫的
socket
。
l? 在做兩端通信的時候,特別是不同語言寫的程序和不同操作系統下,要注意字節序(高有效和低有效)和進制的問題。
l
Mina
這樣的框架很好,如果再配上
protobuf
這樣的多平臺序列化工具,可以很好的實現自定義協議的通信。自己訂協議的好處就是安全,而且能做應答機制。
Epoll是Linux下多路復用IO接口select/poll的增強版本,它能顯著提高程序在大量并發連接中只有少量活躍的情況下的系統CPU利用率,因為它會復用文件描述符集合來傳遞結果而不用迫使開發者每次等待事件之前都必須重新準備要被偵聽的文件描述符集合,另一點原因就是獲取事件的時候,它無須遍歷整個被偵聽的描述符集,只要遍歷那些被內核IO事件異步喚醒而加入Ready隊列的描述符集合就行了。epoll除了提供select/poll那種IO事件的電平觸發(Level Triggered)外,還提供了邊沿觸發(Edge Triggered),這就使得用戶空間程序有可能緩存IO狀態,減少epoll_wait/epoll_pwait的調用,提高應用程序效率。
在linux下,NIO可以采用epoll來實現非阻塞,注意,是在linux下:
-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider
--------------------------------------------------------------------
下周就要開始弄kafka了,其實網絡方面的測試還沒有找到最好的解決方案,雖然測了很多東西,但是真的是沒有時間去做更多更復雜的東西。接下來趕緊把mina的源碼看完,再看看netty就要把重心放在kafka上了。其實網絡編程還是很有必要的,好多分布式的通信方案都可以建立在一套比較完整的消息機制上。