基于zbus的MySQL透明代理(100行)

項目地址 https://git.oschina.net/rushmore/zbus

我們上次講到zbus網絡通訊的核心API:

Dispatcher -- 負責-NIO網絡事件Selector引擎的管理,對Selector引擎負載均衡

IoAdaptor -- 網絡事件的處理,服務器與客戶端共用,負責讀寫,消息分包組包等

Session -- 代表網絡鏈接,可以讀寫消息

實際的應用,我們幾乎只需要做IoAdaptor的個性化實現就能完成高效的網絡通訊服務,今天我們將舉例說明如何個性化這個IoAdaptor。

我們今天要完成的目標是:實現MySQL服務器的透明代理。效果是,你訪問代理服務器跟訪問目標MySQL無差異。

我們在測試環境10.17.2.30:3306 這臺機器上提供了MySql,在我們本地機器上跑起來我們今天基于zbus.NET實現的一個代理程序,就能達到下面的效果。

image
image

完成大概不到100 行的代碼, Cool?Let’s roll!

首先,我們思考透明TCP代理到底在干啥,透明的TCP代理的業務邏輯其實非常簡單,可以描述為,將來自代理上游(發起請求到代理)的數據轉發到目標TCP服務器,把目標服務器回來的數據原路返回代理上游客戶端。 注意這個原路,如何做到原路返回成為關鍵點。這個示例其實跟MySQL沒有任何關系,原則上任何TCP層面的服務都應該適配。

基于zbus.NET怎么來將上面的邏輯在體現出來,也就是如何個性化IoAdaptor?直觀的講,我們要處理的幾個事件應該包括:1)從上游客戶端發起的鏈接請求--代理服務器的Accept事件,2)代理服務器連接目標服務器的Connect事件,3)上下游的數據事件onMessage。

zbus.NET的IoAdaptor提供的個性化事件如下

image

基本包括一個鏈接(客戶端或者服務端)的生命周期,與消息的編解碼。

我們的代理IoAdaptor就是逐一個性化處理。

第一步,編解碼: 透明代理對消息內容不做理解,所以不需要編解碼。

// 透傳不需要編解碼,簡單返回ByteBuffer數據public IoBuffer encode(Object msg) {if (msg instanceof IoBuffer) {IoBuffer buff = (IoBuffer) msg;return buff;} else {throw new RuntimeException("Message Not Support");}}// 透傳不需要編解碼,簡單返回ByteBuffer數據public Object decode(IoBuffer buff) {if (buff.remaining() > 0) {byte[] data = new byte[buff.remaining()];buff.readBytes(data);return IoBuffer.wrap(data);} else {return null;}}

第二步,代理服務接入:

@Overrideprotected void onSessionAccepted(Session sess) throws IOException {Session target = null;Dispatcher dispatcher = sess.getDispatcher();try {target = dispatcher.createClientSession(targetAddress, this);} catch (Exception e) {sess.asyncClose();return;}sess.chain = target;target.chain = sess;dispatcher.registerSession(SelectionKey.OP_CONNECT, target);}

這里的邏輯思路是,代理服務器每接受到一個請求--通過onSessionAccepted表達,我們將同時創建一個到目標服務器的鏈接,今天的例子是目標MySQL服務器,注意上面的處理中把創建目標服務器Session過程與真正鏈接到目標服務分開(Dispatcher也提供合并二者的工具方法),是為了能在沒有發生鏈接之前綁定上好上下游關系,通過Session的chain變量來表達,也就是當前Session的關聯Session,關聯好之后啟動感興趣Connect事件,邏輯處理完畢。

第三步,鏈接成功事件(第二步中需要鏈接到目標服務器)

@Overridepublic void onSessionConnected(Session sess) throws IOException {  Session chain = sess.chain;if(chain == null){ sess.asyncClose();return; }   if(sess.isActive() && chain.isActive()){ sess.register(SelectionKey.OP_READ);chain.register(SelectionKey.OP_READ);}}

這里的一個核心是當上下游都處于鏈接正常態,上下游Session都啟動感興趣消息讀事件(寫事件是在讀取處理中自動觸發),為什么在這里做的原因是一定要等上下游都正常態后才啟動雙方消息處理,不然會出現字節丟失。

第四步,處理上下游數據事件

@Overrideprotected void onMessage(Object msg, Session sess) throws IOException {  Session chain = sess.chain;if(chain == null){sess.asyncClose(); return;} chain.write(msg); }

是不是非常簡單,類似pipeline,從一端的數據寫到另外一端。

原則上面4步結束,整個透明代理就完成了,但是為了處理鏈接異常清理,我們增加了Session清理處理,如下

@Overridepublic void onSessionToDestroy(Session sess) throws IOException {   try {sess.close();} catch (IOException e) { //ignore} if (sess.chain == null) return; try {    sess.chain.close();    sess.chain.chain = null;sess.chain = null;} catch (IOException e) { }}

工作就是解決上下游鏈接清理鏈接。

至此為止我們的IoAdaptor個性化就完成了,是不是非常簡單,現在我們要跑起來測試了,下面的代碼就是上一次講到重復的設置,沒有新意。

public static void main(String[] args) throws Exception {   Dispatcher dispatcher = new Dispatcher(); IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); final Server server = new Server(dispatcher, ioAdaptor, 3306); server.start();}

騷年,包括渣渣import和少許注釋加起來折騰了不到100行,該跑一跑了,還是那句話,不是HelloWorld,你可以規模壓力測。看看你是否在本地代理出來了你的目標服務MySQL,gl,hf, gogogo.

完整代碼可運行代碼如下,也可直接到zbus示例代碼庫中找到

https://git.oschina.net/rushmore/zbus/blob/master/src/test/java/org/zbus/net/TcpProxyAdaptor.java?dir=0&filepath=src%2Ftest%2Fjava%2Forg%2Fzbus%2Fnet%2FTcpProxyAdaptor.java&oid=08abff381d93519485e1c0ee2c35f1d4f8d1814c&sha=a29272ed99a8f21ec19a14b403ebee53a385e9a4

package org.zbus.net;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import org.zbus.net.core.Dispatcher;
import org.zbus.net.core.IoAdaptor;
import org.zbus.net.core.IoBuffer;
import org.zbus.net.core.Session;  
public class TcpProxyAdaptor extends IoAdaptor {private String targetAddress;public TcpProxyAdaptor(String targetAddress) {this.targetAddress = targetAddress;}// 透傳不需要編解碼,簡單返回ByteBuffer數據public IoBuffer encode(Object msg) {if (msg instanceof IoBuffer) {IoBuffer buff = (IoBuffer) msg;return buff;} else {throw new RuntimeException("Message Not Support");}}// 透傳不需要編解碼,簡單返回ByteBuffer數據public Object decode(IoBuffer buff) {if (buff.remaining() > 0) {byte[] data = new byte[buff.remaining()];buff.readBytes(data);return IoBuffer.wrap(data);} else {return null;}}@Overrideprotected void onSessionAccepted(Session sess) throws IOException {Session target = null;Dispatcher dispatcher = sess.getDispatcher();try {target = dispatcher.createClientSession(targetAddress, this);} catch (Exception e) {sess.asyncClose();return;}sess.chain = target;target.chain = sess;dispatcher.registerSession(SelectionKey.OP_CONNECT, target);}@Overridepublic void onSessionConnected(Session sess) throws IOException {  Session chain = sess.chain;if(chain == null){ sess.asyncClose();return; }   if(sess.isActive() && chain.isActive()){ sess.register(SelectionKey.OP_READ);chain.register(SelectionKey.OP_READ);}}@Overrideprotected void onMessage(Object msg, Session sess) throws IOException {  Session chain = sess.chain;if(chain == null){sess.asyncClose(); return;} chain.write(msg); }@Overridepublic void onSessionToDestroy(Session sess) throws IOException {   try {sess.close();} catch (IOException e) { //ignore} if (sess.chain == null) return; try {    sess.chain.close();    sess.chain.chain = null;sess.chain = null;} catch (IOException e) { }}@SuppressWarnings("resource")public static void main(String[] args) throws Exception {   Dispatcher dispatcher = new Dispatcher(); IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); final Server server = new Server(dispatcher, ioAdaptor, 3306);server.setServerName("TcpProxyServer");server.start();}
}

文章轉載自 開源中國社區[https://www.oschina.net]

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/395514.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/395514.shtml
英文地址,請注明出處:http://en.pswp.cn/news/395514.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

linux添加jetdirect協議,Padavan 路由器固件 不能驅動 hp1005、hp1020之類打印機 foo2zjs ZjStream協議的linux打印機驅動程序...

單擊鏈接,或剪切并粘貼下面的整個命令行以下載驅動程序。現在解壓縮它:Unpack:$ tar zxf foo2zjs.tar.gz$ cd foo2zjs現在編譯并安裝它。 INSTALL文件包含更詳細的說明; 請現在閱讀。Compile:$ makeGet extra files from the web, such as .ICM profiles…

返回指定月份的周列表 包含 周序號、開始日期、結束日期(不包含周末)

/*** 返回當前年月的周列表 包含 周序號、開始日期、結束日期(不包含周末)* param year 年* param month 月* returns {Array} */function getYearMonthWeekList(year,month) {var weekList[];var time year "/" month "/01";//取當前月的第…

tez-site.xml_數字支付系統的未來-Google Tez和音頻快速響應

tez-site.xmlby Vaidic Joshi通過Vaidic Joshi 數字支付系統的未來-Google Tez和音頻快速響應 (The future of digital payment systems — Google Tez and Audio Quick Response) Google recently marked its entry into the Indian digital payments market by introducing …

Window上安裝kafka

kafka在windows上的安裝、運行 - 進階者ryan-su - CSDN博客https://blog.csdn.net/u010283894/article/details/77106159 在Windows環境中安裝并使用kafka - 心靈空谷幽蘭 - 博客園https://www.cnblogs.com/xinlingyoulan/p/6054361.html?utm_sourceitdadao&utm_mediumref…

數集合有多少個TOJ(2469)

題目鏈接:http://acm.tju.edu.cn/toj/showp2469.html 感覺這個題目有點問題,算了不管他了,反正A了。 這里要注意的是求這個集合有多少種,那么就是要剔除重復數后,再數一下有多少個。 難一點的算法我也不會,…

linux path環境變量起什么作用,shell基礎(5)PATH環境變量的作用和使用方法

釋放雙眼,帶上耳機,聽聽看~!關于PATH的作用PATH說簡單點就是一個字符串變量,當輸入命令的時候LINUX會去查找PATH里面記錄的路徑。比如在根目錄/下可以輸入命令ls,在/usr目錄下也可以輸入ls,但其實ls這個命令根本不在這個兩個目錄下…

天氣城市編碼對應地區編碼_如何在您的城市中建立強大的編碼社區-我是如何做到的...

天氣城市編碼對應地區編碼by Billy Le比利勒(Billy Le) 如何在您的城市中建立強大的編碼社區-我是如何做到的 (How you can build a strong coding community in your city — and how I did it) Communities are important. They are the bedrock that glues together shared…

python3 自動打包部署war包

2019獨角獸企業重金招聘Python工程師標準>>> 1 調用maven 命令打包 mvn -B -f D:/workspace/ksdcourse clean package 2 調用tomcat 部署war包 ; 需要添加 CATALINA_HOME的環境變量 代碼如下: #!/usr/bin/python3# -*- coding: utf-8 -*-impo…

python 虛擬環境創建

創建虛擬環境:  sudo apt-get install virtualenv 新建虛擬環境文件夾 venv virtualenv venv 進入虛擬環境 source venv/bin/activate 安裝套件列表模塊: 用來記錄項目中所使用到的各種模塊,便于項目部署時統一安裝所需模塊 pip freeze > requir…

powershell開源新聞及簡介

作者:PowerShll傳教士 問:微軟的PowerShell腳本語言已經開源了 ? 答:絕對真的!已經! 問:源碼在哪? 答:微軟.net源碼網站。 http://referencesource.microsoft.com/ 問&…

linux nginx重新編譯安裝,Linux系統Nginx編譯安裝教程

1、下載nginx1.2.4#注:下載地址:http://nginx.org/download/nginx-1.2.4.tar.gzwget -c http://nginx.org/download/nginx-1.2.4.tar.gz2、安裝#注:默認安裝到/usr/local/nginxtar -zxvf nginx-1.2.4.tar.gzcd nginx-1.2.4./configure如果出現…

htt://3g.hn_根據我對“詢問HN:誰在招聘?”的分析,開發人員技能發展趨勢

htt://3g.hnby Ryan Williams瑞安威廉姆斯(Ryan Williams) 根據我對“詢問HN:誰在招聘?”的分析,開發人員技能發展趨勢 (Trending Developer Skills, Based on my Analysis of “Ask HN: Who’s Hiring?”) For people learning to code an…

day1作業二:多級菜單操作

作業二:多級菜單 (1)三級菜單 (2)可以次選擇進入各子菜單 (3)所需新知識點:列表、字典 要求:輸入back返回上一層,輸入quit退出整個程序 思路: &am…

JDK源碼分析(5)之 HashMap 相關

HashMap作為我們最常用的數據類型,當然有必要了解一下他內部是實現細節。相比于 JDK7 在JDK8 中引入了紅黑樹以及hash計算等方面的優化,使得 JDK8 中的HashMap效率要高于以往的所有版本,本文會詳細介紹相關的優化,但是主要還是寫 …

linux usb init,復制Linux liveUSB導致init.d腳本出錯 - 不可能..?

請發表您的想法或想出的任何想法。我很想知道別人在想什么。整體問題當我安裝一個簡單的Java應用程序(我寫的)通過/etc/init.d/在啟動(在后臺)運行時,它適用于我明確安裝它的liveUSB。當我制作該棒的副本時,它永遠不會成功啟動。在引導liveUSB副本時&…

最小費用最大流模版

#include <iostream> #include <cstring> #include <cstdio> #include <queue> #include <algorithm>using namespace std;const int MAXN10100; const int MAXM40010; const int INF0x3f3f3f3f;struct Edge      //cost代表單位流量流過該…

fpga中的slack_是否想減少部署過程的恐怖程度? 在Slack中構建ChatOps。

fpga中的slackby Rick Mak麥瑞克(Rick Mak) 是否想減少部署過程的恐怖程度&#xff1f; 在Slack中構建ChatOps。 (Want to make the deployment process less scary? Build ChatOps in Slack.) In a company that makes mobile and web products, developers shouldn’t be t…

位運算-查找數組中唯一成對的數

基礎實例一&#xff1a;使用位運算判斷數的奇偶性 實例代碼&#xff1a; public class Test {public static void main(String[] args) {System.out.println(isOdd(49));System.out.println(isOdd(50));}// 與運算public static boolean isOdd(int i){return (i & 1) ! 0;…

Docker實踐:Cannot connect to the Docker daemon.

Docker實踐&#xff1a;Cannot connect to the Docker daemon.查看docker daemon是否在運行 [rootlocalhost openec]# ps aux | grep dockerroot 3030 0.0 0.0 112656 984 pts/0 S 16:20 0:00 grep --colorauto docker啟動docker[rootlocalhost openec]# ser…

linux虛擬終端時間短,使用Screen創建虛擬終端避免Linux遠程斷線

維護Linux的ssh工具在使用中&#xff0c;一旦遇到網絡中斷&#xff0c;則當前的shell就會自動關閉當前的工作進度就會丟失&#xff0c;這對于遠程升級等比較耗費時間的工作是非常不利的對于遠程調適代碼也是很不可靠不安全的為此&#xff0c;可以使用screen這個工具來解決這個問…