背景
近日要開發一個tcp客戶端程序去對接上游廠商的數據源,決定使用netty去處理,由于很久沒有開發過netty了,順便學習記錄下
netty搭建
考慮到我們需要多個client去對接server服務,所以我們定義一個公共的AbstractNettyClient父類,定義一些公共的方法,比如,連接,重試等。以達到代碼復用
我這里采用的是三層結構的設計,因為對接的上游數據廠商的不止一家,每家廠商會存在一定的定制化邏輯,所以在此進行封裝
- 公共的nettyClient父類,所有netty 子類繼承
- 具體上游廠商的父類,實現廠商對接的一些公共處理
- 真正實現的子類,有多少個需要對接的,就實現多少
需要哪些公共方法
對于最上層的netty,我們應該定義有哪些全局的公共方法,這里給出幾個通用示例
public void start(){// 啟動操作
}public void stop(){// 停止操作
}public void restart(){// 重啟操作
}
需要哪些Handler?
了解過netty基礎的都知道,netty中有inbound和outbound兩個出入站的鏈路處理器供我們處理接受與發送的消息,那么作為全局公共的父類,自然要提供給子類可擴展的handler自選,同時也要維護全局公共的handler,那么這里定義一個公共模板的handler獲取方法
public ChannelInitializer getChannelHandler(AbstractNettyConnector connector) {return new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new ReadTimeoutHandler(15, TimeUnit.SECONDS));pipeline.addLast(new LoggingHandler());addPipeline(pipeline);}};}public void addPipeline(){// 子類覆蓋時間該方法
}
通過這樣的方式我們在定義公共的handler同時也能支持子類實現自定義的handler處理特定的事件。
下邊給出一些常見的通用handler
- ReconnectHandler:重連處理,我們在連接服務端,可能由于網絡或者其他問題,會導致連接斷開,這個時候我們就需要一個handler去處理重連的情況
- ReadTimeoutHandler:因為客戶端只做讀取,所以這邊還需要一個超時讀取的handler,用于檢測服務讀取通道的狀態,如果超時未讀取數據,那么我們可以做一些操作
- DecodeHandler:解碼器,在處理真正的消息之前我們需要先解碼數據
- MessageHandler:消息處理器,解析完解碼的數據后,我們真正對消息進行處理的地方
我們整個通用鏈路的handler,大概就如下
解碼器的選擇
由于TCP連接中存在粘包黏包的現象,發過來的消息不一定是個完整的包,所以我們在我們自己的解碼器之前還需要定一個解碼器處理粘包黏包的問題,對于我自己這邊使用的是,定長的解碼器
new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN, 1500, 0, 2, -2, 0, true)
對于該解碼器構造參數的解析
- 大端和小端模式的選擇,這個需要具體詢問上游的消息格式
- 定長長度
- lengthFieldOffset:表示這個包長度的字段,是從第幾個字節開始讀
- lengthFieldLength:長度字段所占用的字節數
- lengthAdjustment:用來修正長度字段,比如說你的長度讀出來的字節數,并不包含自己,那么在該值里 你就得填這個長度字段的長度
舉例:假設現在有個2個字節的長度字段,讀出來的值是152,這個值是數據包的長度不包含長度字段,那么lengthAdjustment就應該填-2,減去長度字段 得到真正的數據長度
連接監聽器
在啟動netty客戶端時,可能會產生連接失敗等情況,這個時候我們可以在啟動處增加一個連接監聽器,用來監控啟動情況,這個不同于ReconnectHandler,那個是用于處理連接建立和斷開時的重連器
ChannelFuture connect = bootstrap.connect(serverAddress, serverPort);
connect.addListener(new ConnectListener(this));public class ConnectListener implements ChannelFutureListener {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {Throwable cause = future.cause();if (cause instanceof ConnectTimeoutException) {// do something} else {log.error("連接異常", cause);}future.channel().close();}}