frp 是一款高性能的反向代理應用,專注于內網穿透,支持多種協議和 P2P 通信功能,目前在 GitHub 上已有 80k 的 star。本文將深入探討其源碼,揭示其背后的實現原理。
1. 前言
frp 是一款高性能的反向代理應用,專注于內網穿透。它支持多種協議,包括 TCP、UDP、HTTP、HTTPS 等,并且具備 P2P 通信功能。使用 frp,您可以安全、便捷地將內網服務暴露到公網,通過擁有公網 IP 的節點進行中轉,具體場景就是:將客戶端部署到你的內網中,然后該客戶端與你內網服務網絡可達,當客戶端與在公網的服務端連接后,我們就可以通過訪問服務端的指定端口,去訪問到內網服務。
目前 GitHub 已經有 80k 的 star,這么猛的項目,我決定閱讀一番源碼偷師一波。
2. pkg/auth
這個包負責客戶端和服務端認證的代碼,這里面一共用到了 2 種驗證機制,一種是基于?token,就是預共享密鑰,客戶端和服務端實現配置一樣的字符串密鑰,第二種是?OAuth 2.0,依賴第三方授權服務器頒發的訪問令牌,然后客戶端帶著令牌去訪問服務端。
這里面有很多技巧值得學習:
2.1. 工廠函數
通過不同的配置生成對應的認證方式。
type Setter interface {SetLogin(*msg.Login) errorSetPing(*msg.Ping) errorSetNewWorkConn(*msg.NewWorkConn) error
}// 根據客戶端配置創建認證提供者
func NewAuthSetter(cfg v1.AuthClientConfig) (authProvider Setter) {switch cfg.Method {// token 認證模式case v1.AuthMethodToken:authProvider = NewTokenAuth(cfg.AdditionalScopes, cfg.Token)// openid 認證模式case v1.AuthMethodOIDC:authProvider = NewOidcAuthSetter(cfg.AdditionalScopes, cfg.OIDC)default:panic(fmt.Sprintf(「wrong method: 『%s』」, cfg.Method))}return authProvider
}
2.2. 常量時間的字符串比較
正常情況來說,token 模式下,兩邊比較一下字符串是不是相等就完了,但其實這個是有安全隱患的,第一個就是攻擊者可以進行重放攻擊,一直進行密碼爆破,第二個就是攻擊者可以進行定時攻擊,比如普通比較(如?==
)在發現第一個不匹配字節時會立即返回,攻擊者可通過測量響應時間差異推斷出匹配的字節位置,ConstantTimeCompare
?始終遍歷全部字節(即使已發現不匹配),使攻擊者無法通過時間差獲取敏感信息。
// token 和客戶端上線的時間戳組成 key
func GetAuthKey(token string, timestamp int64) (key string) {md5Ctx := md5.New()md5Ctx.Write([]byte(token))md5Ctx.Write([]byte(strconv.FormatInt(timestamp, 10)))data := md5Ctx.Sum(nil)return hex.EncodeToString(data)
}// 全量匹配字節
func ConstantTimeCompare(x, y []byte) int {if len(x) != len(y) {return 0}var v bytefor i := 0; i < len(x); i++ {v |= x[i] ^ y[i]}return ConstantTimeByteEq(v, 0)
}// ConstantTimeByteEq returns 1 if x == y and 0 otherwise.
func ConstantTimeByteEq(x, y uint8) int {return int((uint32(x^y) - 1) >> 31)
}
3. pkg/config
config 文件夾是?frp
?配置管理的核心模塊,涵蓋了配置的加載、解析、驗證、轉換和命令行支持等功能。它確保了?frp
?的靈活性和兼容性,同時為用戶提供了多種配置方式。
3.1. 使用環境變量進行模板渲染
serverAddr = 「{{ .Envs.FRP_SERVER_ADDR }}」
serverPort = 7000[[proxies]]
name = 「ssh」
type = 「tcp」
localIP = 「127.0.0.1」
localPort = 22
remotePort = {{ .Envs.FRP_SSH_REMOTE_PORT }}export FRP_SERVER_ADDR=「x.x.x.x」
export FRP_SSH_REMOTE_PORT=「6000」
./frpc -C ./frpc.toml
這個實現是采用了 template 模板庫,其中 Envs 前綴是由字段名?Envs
?決定的:
type Values struct {Envs map[string]string // 「{{ .Envs.FRP_SERVER_ADDR }}」 Envs 的由來
}func RenderWithTemplate(in []byte, values *Values) ([]byte, error) {tmpl, err := template.New(「frp」).Funcs(template.FuncMap{「parseNumberRange」: parseNumberRange,「parseNumberRangePair」: parseNumberRangePair,}).Parse(string(in))if err != nil {return nil, err}buffer := bytes.NewBufferString(「」)if err := tmpl.Execute(buffer, values); err != nil {return nil, err}return buffer.Bytes(), nil
}// 將端口范圍解析為 端口列表
func parseNumberRange(firstRangeStr string) ([]int64, error) {... ...
}
這里面有一些自定義的解析函數,比如說:
ports = 「{{ parseNumberRange .Envs.PORT_RANGE }}」export PORT_RANGE = 「1000-1005」// 這樣 ports 就會被 template 的 parseNumberRange 函數解析并渲染為
// ports = 1000, 1001, 1002, 1003, 1004, 1005
3.2. 配置拆分
通過?includes
?參數可以在主配置中包含其他配置文件,從而實現將代理配置拆分到多個文件中管理
# frpc.toml
serverAddr = 「x.x.x.x」
serverPort = 7000
includes = [「./confd/*.toml」]
上述配置在 frpc.toml 中通過 includes 額外包含了?./confd
?目錄下所有的 toml 文件的代理配置內容,效果等價于將這兩個文件合并成一個文件。
這個實現是采用了,循環讀取文件內容 + 模板渲染 + 配置合并+ toml 反序列化 的方法:
// 主文件配置,就是 frpc.toml
var content []byte
content, err = GetRenderedConfFromFile(filePath)
if err != nil {return
}
configBuffer := bytes.NewBuffer(nil)
configBuffer.Write(content)... ... var buf []byte
// 循環讀取 include 的文件
// getIncludeContents
// ->ReadFile
// ->RenderContent
// ->template.New(「frp」).Parse(string(in))
buf, err = getIncludeContents(cfg.IncludeConfigFiles)
if err != nil {err = fmt.Errorf(「getIncludeContents error: %v」, err)return
}
configBuffer.WriteString(「
」)
configBuffer.Write(buf)// 將所有配置合并,然后將 toml 序列化為 type ClientCommonConf struct
代理 Cfgs, visitorCfgs, err = LoadAllProxyConfsFromIni(cfg.User, configBuffer.Bytes(), cfg.Start)
if err != nil {return
}
return
3.3. 配置熱加載
frpc reload -C ./frpc.toml?
等待一段時間后,客戶端將根據新的配置文件創建、更新或刪除代理。
這里面也比較簡單,主要邏輯在于配置校驗,舊配置中與新配置里同名的且代理內容不一樣的 proxy 停止,新增的配置的 proxy 再啟動,也就是說老配置和新配置完全一樣的是不動的
func (pm *Manager) UpdateAll(proxyCfgs []v1.ProxyConfigurer) {xl := xlog.FromContextSafe(pm.ctx)proxyCfgsMap := lo.KeyBy(proxyCfgs, func(C v1.ProxyConfigurer) string {return C.GetBaseConfig().Name})pm.mu.Lock()defer pm.mu.Unlock()delPxyNames := make([]string, 0)for name, pxy := range pm.proxies {del := falsecfg, ok := proxyCfgsMap[name]if !ok || !reflect.DeepEqual(pxy.Cfg, cfg) {del = true}if del {delPxyNames = append(delPxyNames, name)delete(pm.proxies, name)pxy.Stop()}}if len(delPxyNames) > 0 {xl.Infof(「proxy removed: %s」, delPxyNames)}addPxyNames := make([]string, 0)for _, cfg := range proxyCfgs {name := cfg.GetBaseConfig().Nameif _, ok := pm.proxies[name]; !ok {pxy := NewWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.msgTransporter, pm.vnetController)if pm.inWorkConnCallback != nil {pxy.SetInWorkConnCallback(pm.inWorkConnCallback)}pm.proxies[name] = pxyaddPxyNames = append(addPxyNames, name)pxy.Start()}}if len(addPxyNames) > 0 {xl.Infof(「proxy added: %s」, addPxyNames)}
4. 監控
frps 服務端支持兩種監控系統:指標存在內存中,和指標輸出到?Prometheus。主要監控以下指標:
type serverMetrics struct {// 記錄當前連接到服務端的客戶端數量。clientCount Prometheus.Gauge// 記錄當前代理的數量,按代理類型(如 TCP、HTTP)分類。proxyCount *Prometheus.GaugeVec// 記錄當前連接的數量,按代理類型(如 TCP、HTTP)分類。connectionCount *Prometheus.GaugeVec// 記錄流入的總流量,按代理類型(如 TCP、HTTP)分類。trafficIn *Prometheus.CounterVec// 記錄流出的總流量,按代理類型(如 TCP、HTTP)分類。trafficOut *Prometheus.CounterVec
}
內存監控沒啥,但統計的增刪改,這里用到了原子操作的技巧:
func (C *StandardCounter) Count() int32 {return atomic.LoadInt32(&C.count)
}func (C *StandardCounter) Inc(count int32) {atomic.AddInt32(&C.count, count)
}func (C *StandardCounter) Dec(count int32) {atomic.AddInt32(&C.count, -count)
}
對于不同類型的 proxy 的統計,frp 沒有使用 syn map,而是用一把讀寫鎖保平安:
m.mu.Lock()defer m.mu.Unlock()counter, ok := m.info.ProxyTypeCounts[proxyType]if !ok {counter = metric.NewCounter()}
counter.Inc(1)
對于如何進行 Prometheus 監控,frp 的使用流程可以借鑒,整體來說分為以下幾個步驟:
- 編碼前,先定義指標,類似于:
Namespace: 「frp」,
Subsystem: 「server」,
Name: 「traffic_out」,
Help: 「The total out traffic」,
- frp 注冊 Prometheus 指標
trafficOut: Prometheus.NewCounterVec(Prometheus.CounterOpts{Namespace: namespace,Subsystem: serverSubsystem,Name: 「traffic_out」,Help: 「The total out traffic」,
}, []string{「name」, 「type」}),
}Prometheus.MustRegister(m.clientCount)
Prometheus.MustRegister(m.proxyCount)
Prometheus.MustRegister(m.connectionCount)
Prometheus.MustRegister(m.trafficIn)
Prometheus.MustRegister(m.trafficOut)
- frp 暴露 HTTP 服務,一般是/metric,promhttp 提供一個 HTTP 處理器,用于暴露所有注冊的 Prometheus 指標。
if svr.cfg.EnablePrometheus {subRouter.Handle(「/metrics」, promhttp.Handler())
}
- 配置 Prometheus 定時抓取這個 HTTP 路徑,舒服了
全球:scrape_interval: 15s # 每 15 秒抓取一次數據scrape_configs:- job_name: 「frp_server」static_configs:- targets: [「localhost:8080」] # 替換為 frp 服務端暴露的 /metrics 端點
5. 通信安全
當 frpc 和 frps 之間啟用了 TLS 之后,流量會被全局加密,不再需要配置單個代理上的加密,新版本中已經默認啟用。每一個代理都可以選擇是否啟用加密和壓縮的功能。
在每一個代理的配置中使用如下參數指定:
[[proxies]]
name = 「ssh」
type = 「tcp」
localPort = 22
remotePort = 6000
transport.useEncryption = true
transport.useCompression = true
5.1. 加密
通過設置?transport.useEncryption = true
,將 frpc 與 frps 之間的通信內容加密傳輸,將會有效防止傳輸內容被截取。
這個加密它使用了裝飾器模式,傳入普通的 IO,WithEncryption 后就會得到一個可以加密的 IO
remote, err = libio.WithEncryption(remote, encKey)
if err != nil {workConn.xl.Errorf(「create encryption stream error: %v」, err)return
}
我們接下來看如何加密的:
總體加密算法采用 aes-128-cfb,aes 是一個對稱加密,主要靠 key 和 iv 兩個值
// pbkdf2 會生成一個用于 aes 加密的 key
// 入參 key 為:配置的 token
// DefaultSalt 為字符串默認值
key = pbkdf2.Key(key, []byte(DefaultSalt), 64, aes.BlockSize, sha1.New)// iv 是用 rand 函數生成的安全加密的隨機數
if _, err := io.ReadFull(rand.Reader, iv); err != nil {return nil, err
}// Reader is a global, shared instance of a cryptographically
// secure random number generator. It is safe for concurrent use.
//
// - On Linux, FreeBSD, Dragonfly, and Solaris, Reader uses getrandom(2).
// - On legacy Linux (< 3.17), Reader opens /dev/urandom on first use.
// - On macOS, iOS, and OpenBSD Reader, uses arc4random_buf(3).
// - On NetBSD, Reader uses the kern.arandom sysctl.
// - On Windows, Reader uses the ProcessPrng API.
// - On js/wasm, Reader uses the Web Crypto API.
// - On wasi/wasm, Reader uses random_get.
//
// In FIPS 140-3 mode, the output passes through an SP 800-90A Rev. 1
// Deterministic Random Bit Generator (DRBG).
var Reader io.Reader
這樣后續的 IO 操作都會自帶加密了。
5.2. 壓縮
壓縮也是同理,搞一個壓縮的 IO 裝飾器就好了。
如果傳輸的報文長度較長,通過設置?transport.useCompression = true
?對傳輸內容進行壓縮,可以有效減小 frpc 與 frps 之間的網絡流量,加快流量轉發速度,但是會額外消耗一些 CPU 資源。
壓縮算法采用?snappy 庫。
sr := snappy.NewReader(rwc)
sw := snappy.NewWriter(rwc)
return WrapReadWriteCloser(sr, sw, func() error {_ = sw.Close()return rwc.Close()})
}
5.3. 自定義 TLS
這個其實就是使用自簽發的 CA,去生成密鑰和證書,然后客戶端和服務端加載起來后,可以進行雙向或者單向驗證,進行 HTTPS 握手,后續流量也是 HTTPS 加密的。
客戶端單向校驗服務端:
# frpc.toml
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」# frps.toml
transport.tls.certFile = 「/to/cert/path/server.crt」
transport.tls.keyFile = 「/to/key/path/server.key」
服務端單向校驗客戶端:
# frpc.toml
transport.tls.certFile = 「/to/cert/path/client.crt」
transport.tls.keyFile = 「/to/key/path/client.key」# frps.toml
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」
雙向驗證
# frpc.toml
transport.tls.certFile = 「/to/cert/path/client.crt」
transport.tls.keyFile = 「/to/key/path/client.key」
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」# frps.toml
transport.tls.certFile = 「/to/cert/path/server.crt」
transport.tls.keyFile = 「/to/key/path/server.key」
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」
介紹這個之前,我們先回顧以下 TLS 握手的過程,hhh:
okk,那我們看 frp 是如何實現 tls 的:
// 獲取 TLS 配置,作為 dial 選項
// tlsConfig, err = transport.NewClientTLSConfig
// tlsConfig, err = transport.NewServerTLSConfig
dialOptions = append(dialOptions, libnet.WithTLSConfig(tlsConfig))...// dail tcp 本身就是 tls 的了
conn, err := libnet.DialContext(C.ctx,net.JoinHostPort(C.cfg.ServerAddr, strconv.Itoa(C.cfg.ServerPort)),dialOptions...,
)// 加載服務端的 ca,證書+key
// 核心是 tls 庫 tls.LoadX509KeyPair(certfile, keyfile),去管理證書和 key
func NewServerTLSConfig(certPath, keyPath, caPath string) (*tls.Config, error) {base := &tls.Config{}if certPath == ?? || keyPath == ?? {// server will generate tls conf by itselfcert := newRandomTLSKeyPair()base.Certificates = []tls.Certificate{*cert}} else {// 調的是這個 tlsCert, err := tls.LoadX509KeyPair(certfile, keyfile)cert, err := newCustomTLSKeyPair(certPath, keyPath)if err != nil {return nil, err}base.Certificates = []tls.Certificate{*cert}}if caPath != '' {// ca 證書pool, err := newCertPool(caPath)if err != nil {return nil, err}// 校驗客戶端base.ClientAuth = tls.RequireAndVerifyClientCertbase.ClientCAs = pool}return base, nil
}// 加載客戶端的 ca,證書+key
func NewClientTLSConfig(certPath, keyPath, caPath, serverName string) (*tls.Config, error) {base := &tls.Config{}if certPath != '' && keyPath != '' {cert, err := newCustomTLSKeyPair(certPath, keyPath)if err != nil {return nil, err}base.Certificates = []tls.Certificate{*cert}}base.ServerName = serverNameif caPath != '' {pool, err := newCertPool(caPath)if err != nil {return nil, err}base.RootCAs = pool// 校驗服務端base.InsecureSkipVerify = false} else {base.InsecureSkipVerify = true}return base, nil
}// Only support one ca file to add
func newCertPool(caPath string) (*x509.CertPool, error) {pool := x509.NewCertPool()cacrt, err := os.ReadFile(caPath)if err != nil {return nil, err}pool.AppendCertsFromPEM(caCrt)return pool, nil
}
6. 代理配置
6.1. proxy
代理是 frp 的核心,這里詳細介紹一下它的流程。
frpc 和 frps 的整體流程,里面可以抽象為 3 種連接,整體我畫了一張圖:
- 用戶連接 (User Connection):
- 這是外部用戶連接到 FRP 服務端(frps)特定端口的連接,也就是說想要訪問內網服務的,例如,當運維訪問?
frps.example.com:8080
?時建立的連接就是用戶連接,它實際訪問的是客戶側某個管理平臺 - 在 frps 端,這個連接由?
handleUserTCPConnection?
函數處理。
- 工作連接 (Work Connection):
- 這是 frps 和 frpc 之間預先建立的連接,用于傳輸用戶連接的數據。
- frps 在需要處理用戶連接時會從連接池中獲取一個可用的工作連接。
- 如果池中沒有可用的工作連接,frps 會通知 frpc 創建新的工作連接。
- 工作連接是 frps 和 frpc 之間的隧道,用戶數據通過這個隧道在外部用戶和內部服務之間傳輸。
- 本地連接 (Local Connection):
- 在 frp 的上下文中,遠程連接通常指的是 frpc 連接到內部服務的連接。
- 例如,當 frpc 收到從工作連接傳來的數據時,它會創建一個連接到配置中指定的本地服務(如 localhost:80),這個連接就是遠程連接。
下面是 FRP 數據流的完整過程:
- 外部用戶(用戶連接) ->?frps?監聽端口
- frps 從工作連接池中獲取一個?工作連接(frps <-> frpc)
- frps 將用戶連接與工作連接綁定(通過雙向數據轉發)
- frpc 接收到來自工作連接的數據,然后建立一個?遠程連接(frpc -> 內部服務)
- frpc 將工作連接與遠程連接綁定(通過雙向數據轉發)
下面來看看關鍵代碼實現:
// 用戶連接 (User Connection):
// frps 側
// tcp 代理啟動
func (pxy *TCPProxy) Run() (string, error) {if pxy.cfg.LoadBalancer.Group != ?? {// 獲取組監聽器(實際共享端口)l, realBindPort, err := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.LoadBalancer.Group, ...)pxy.listeners = append(pxy.listeners, l)// 啟動連接處理器(最終調用 BaseProxy.startCommonTCPListenersHandler)pxy.startCommonTCPListenersHandler() }// ...
}
// 用戶鏈接處理
func (pxy *BaseProxy) startCommonTCPListenersHandler() {for _, listener := range pxy.listeners {Go func(l net.Listener) {for {conn, err := l.Accept() // 此處調用 TCPGroupListener.Accept()Go pxy.handleUserTCPConnection(conn) // 處理連接}}(listener)}
}// 工作連接 (Work Connection):
// frps 側
// 從連接池中獲取一個已建立的到 FRP 客戶端的連接
// 內部實現路徑:pxy.GetWorkConn() → pxy.workConnManager.Get()
// 底層通過 FRP 協議發送 NewWorkConn 消息到客戶端建立隧道,這部分就是內部服務不一樣的地方
// -> GetWorkConn
workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
if err != nil {return
}
defer workConn.Close()var local io.ReadWriteCloser = workConn
// 啟動雙向數據轉發
inCount, outCount, _ := libio.Join(local, userConn)// 在取出工作連接后,frps 會立即向 frpc 發送 msg.ReqWorkConn 消息,請求新的工作連接。
_ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
// 如果連接池為空,frps 會等待 frpc 創建新的工作連接并發送過來。
select {
case workConn, ok = <-ctl.workConnCh:if !ok {err = pkgerr.ErrCtlClosedxl.Warnf(「no work connections available, %v」, err)return}
case <-time.After(time.Duration(ctl.serverCfg.UserConnTimeout) * time.Second):err = fmt.Errorf(「timeout trying to get work connection」)xl.Warnf(「%v」, err)return
}// 本地連接 (Local Connection):
// frpc 側
// handleReqWorkConn
// HandleWorkConn
// HandleTCPWorkConnection
unc (ctl *Control) handleReqWorkConn(_ msg.Message) {xl := ctl.xlworkConn, err := ctl.connectServer()if err != nil {xl.Warnf(「start new connection to server error: %v」, err)return}m := &msg.NewWorkConn{RunID: ctl.sessionCtx.RunID,}if err = ctl.sessionCtx.AuthSetter.SetNewWorkConn(m); err != nil {xl.Warnf(「error during NewWorkConn authentication: %v」, err)workConn.Close()return}if err = msg.WriteMsg(workConn, m); err != nil {xl.Warnf(「work connection write to server error: %v」, err)workConn.Close()return}var startMsg msg.StartWorkConnif err = msg.ReadMsgInto(workConn, &startMsg); err != nil {xl.Tracef(「work connection closed before response StartWorkConn message: %v」, err)workConn.Close()return}if startMsg.Error != 「」 {xl.Errorf(「StartWorkConn contains error: %s」, startMsg.Error)workConn.Close()return}// dispatch this work connection to related proxyctl.pm.HandleWorkConn(startMsg.ProxyName, workConn, &startMsg)
}remote = workConn
... ...
localConn, err := libnet.Dial(net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort)),libnet.WithTimeout(10*time.Second),
)
... ...
_, _, errs := libio.Join(localConn, remote)
雙向轉發的實現灰常簡潔,值得學習:
func Join(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) (inCount int64, outCount int64, errors []error) {var wait sync.WaitGrouprecordErrs := make([]error, 2)pipe := func(number int, to io.ReadWriteCloser, from io.ReadWriteCloser, count *int64) {defer wait.Done()defer CosClose()defer from.Close()buf := pool.GetBuf(16 * 1024)defer pool.PutBuf(buf)*count, recordErrs[number] = io.CopyBuffer(to, from, buf)}wait.Add(2)Go pipe(0, c1, c2, &inCount)Go pipe(1, c2, c1, &outCount)wait.Wait()for _, e := range recordErrs {if e != nil {errors = append(errors, e)}}return
}
6.2. 負載均衡
你可以將多個相同類型的代理加入到同一個?group
?中,以實現負載均衡的能力,當用戶連接 frps 服務器的 80 端口時,frps 會將接收到的用戶連接隨機分發給其中一個存活的代理。這可以確保即使一臺 frpc 機器掛掉,仍然有其他節點能夠提供服務。
# frpc.toml
[[proxies]]
name = 「test1」
type = 「tcp」
localPort = 8080
remotePort = 80
loadBalancer.group = 「web」
loadBalancer.groupKey = 「123」[[proxies]]
name = 「test2」
type = 「tcp」
localPort = 8081
remotePort = 80
loadBalancer.group = 「web」
loadBalancer.groupKey = 「123」
這個負載均衡的實現的關鍵結構體是 TCPGroupCtl *group.TCPGroupCtl:
// 管理 TCP 代理的分組邏輯,包括分組的創建、監聽、連接分發等功能。
TCPGroupCtl *group.TCPGroupCtl// 主要有三大功能// 1. 分組管理:
// 將多個 TCP 代理分組到一起,形成一個邏輯組。
// 每個組可以共享一個端口,分發連接到組內的代理。// 2. 負載均衡:
// 根據一定的規則隨機分發,將鏈接分發到組內的代理。// 3. 資源管理:
// 負責監聽和關閉組內的連接。
// 管理組的生命周期。
// tcp 代理分組
// 分組內統一監聽,共享一個 remote port 的 coon,這個我們叫 remote conn,就是用戶 connection
func (tgc *TCPGroupCtl) Listen(proxyName string, group string, groupKey string, addr string, port int) (l net.Listener, realPort int, err error) {tgc.mu.Lock()tcpGroup, ok := tgc.groups[group]if !ok {tcpGroup = NewTCPGroup(tgc)tgc.groups[group] = tcpGroup}tgc.mu.Unlock()return tcpGroup.Listen(proxyName, group, groupKey, addr, port)
}// 代理加入組
func (tg *TCPGroup) Listen(proxyName, group, groupKey, addr string, port int) (*TCPGroupListener, int, error) {tg.mu.Lock()defer tg.mu.Unlock()// 首次加入組:創建真實監聽if len(tg.lns) == 0 {realPort, err := tg.ctl.portManager.Acquire(proxyName, port) // 申請端口tcpLn, err := net.Listen(「tcp」, net.JoinHostPort(addr, strconv.Itoa(port)))tg.realPort = realPorttg.tcpLn = tcpLnGo tg.worker() // 啟動連接分發協程...}
}// 當新連接到達共享端口時,會被放入全局通道(acceptCh),
// 組內所有代理通過競爭機制獲取鏈接,實現負載均衡
func (tg *TCPGroup) worker() {for {conn, err := tg.tcpLn.Accept() // 接收新連接tg.acceptCh <- conn // 放入全局通道}
}
func (ln *TCPGroupListener) Accept() (net.Conn, error) {select {case <-ln.closeCh:return nil, ErrListenerClosedcase conn := <-ln.group.acceptCh: // 從全局通道競爭獲取連接return conn, nil}
}// tcp 代理啟動
func (pxy *TCPProxy) Run() (string, error) {if pxy.cfg.LoadBalancer.Group != 「」 {// 獲取組監聽器(實際共享端口)l, realBindPort, err := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.LoadBalancer.Group, ...)pxy.listeners = append(pxy.listeners, l)// 啟動連接處理器(最終調用 BaseProxy.startCommonTCPListenersHandler)pxy.startCommonTCPListenersHandler() }// ...
}
6.3. 健康檢查
通過給代理配置健康檢查參數,可以在要反向代理的服務出現故障時,將該服務從 frps 中摘除。結合負載均衡的功能,這可用于實現高可用架構,避免服務單點故障。
[[proxies]]
name = 「test1」
type = 「tcp」
localPort = 22
remotePort = 6000
# 啟用健康檢查,類型為 tcp
healthCheck.type = 「tcp」
# 建立連接超時時間為 3 秒
healthCheck.timeoutSeconds = 3
# 連續 3 次檢查失敗,此 proxy 會被摘除
healthCheck.maxFailed = 3
# 每隔 10 秒進行一次健康檢查
healthCheck.intervalSeconds = 10
這個配置被加載到 TCPProxyConfig-》ProxyBaseConfig-》HealthCheckConfig
type HealthCheckConfig struct {// Type specifies what protocol to use for health checking.// Valid values include 「tcp」, 「HTTP」, and 「」. If this value is 「」, health// checking will not be performed.//// If the type is 「tcp」, a connection will be attempted to the target// server. If a connection cannot be established, the health check fails.//// If the type is 「HTTP」, a GET request will be made to the endpoint// specified by HealthCheckURL. If the response is not a 200, the health// check fails.Type string `json:「type」` // tcp | HTTP// TimeoutSeconds specifies the number of seconds to wait for a health// check attempt to connect. If the timeout is reached, this counts as a// health check failure. By default, this value is 3.TimeoutSeconds int `json:「timeoutSeconds,omitempty」`// MaxFailed specifies the number of allowed failures before the// is stopped. By default, this value is 1.MaxFailed int `json:「maxFailed,omitempty」`// IntervalSeconds specifies the time in seconds between health// checks. By default, this value is 10.IntervalSeconds int `json:「intervalSeconds」`// Path specifies the path to send health checks to if the// health check type is 「HTTP」.Path string `json:「path,omitempty」`// HTTPHeaders specifies the headers to send with the health request, if// the health check type is 「HTTP」.HTTPHeaders []HTTPHeader `json:「httpHeaders,omitempty」`
}
這部分代碼非常獨立,相當于起了一個定時的 monitor,去監控代理的 proxy 是否有效,連續檢查失敗,此 proxy 會被摘除
func (monitor *Monitor) checkWorker() {forerr := monitor.doCheck(doCtx)... ... time.Sleep(monitor.interval)}
}func (monitor *Monitor) doCheck(ctx context.Context) error {switch monitor.checkType {case 「tcp」:return monitor.doTCPCheck(ctx)case 「HTTP」:return monitor.doHTTPCheck(ctx)default:return ErrHealthCheckType}
}func (monitor *Monitor) doTCPCheck(ctx context.Context) error {// if tcp address is not specified, always return nilif monitor.addr == 「」 {return nil}var d net.Dialerconn, err := d.DialContext(ctx, 「tcp」, monitor.addr)if err != nil {return err}conn.Close()return nil
}
6.4. 代理限速
# frpc.toml
[[proxies]]
name = 「ssh」
type = 「tcp」
localPort = 22
remotePort = 6000
transport.bandwidthLimit = 「1MB」
核心代碼,依然是獲取 tcp 連接時,加一個限速的裝飾器:
var limiter *rate.Limiter
limitBytes := pxyConf.GetBaseConfig().Transport.BandwidthLimit.Bytes()
if limitBytes > 0 && pxyConf.GetBaseConfig().Transport.BandwidthLimitMode == types.BandwidthLimitModeClient {limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
}if pxy.GetLimiter() != nil {local = libio.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {return local.Close()})
}
limit 使用的是原生的 rate 包:
func (r *Reader) Read(p []byte) (n int, err error) {// 1. 獲取令牌桶的突發容量b := r.limiter.Burst()// 2. 如果請求的讀取量超過突發容量,調整讀取大小if b < len(p) {p = p[:b]}// 3. 執行實際讀取操作n, err = r.r.Read(p)if err != nil {// 4. 如果讀取過程中出錯,直接返回return}// 5. 根據實際讀取的字節數消耗令牌err = r.limiter.WaitN(context.Background(), n)if err != nil {return}return
}
7. 參考文獻
HTTPS://gofrp.org/zh-cn/docs/
HTTPS://blog.csdn.net/u012175637/article/details/84138925
HTTPS://cloud.tencent.com/developer/article/2093328