【開源品鑒】FRP源碼閱讀

frp 是一款高性能的反向代理應用,專注于內網穿透,支持多種協議和 P2P 通信功能,目前在 GitHub 上已有 80k 的 star。本文將深入探討其源碼,揭示其背后的實現原理。

1. 前言

frp 是一款高性能的反向代理應用,專注于內網穿透。它支持多種協議,包括 TCP、UDP、HTTP、HTTPS 等,并且具備 P2P 通信功能。使用 frp,您可以安全、便捷地將內網服務暴露到公網,通過擁有公網 IP 的節點進行中轉,具體場景就是:將客戶端部署到你的內網中,然后該客戶端與你內網服務網絡可達,當客戶端與在公網的服務端連接后,我們就可以通過訪問服務端的指定端口,去訪問到內網服務。

目前 GitHub 已經有 80k 的 star,這么猛的項目,我決定閱讀一番源碼偷師一波。

2. pkg/auth

這個包負責客戶端和服務端認證的代碼,這里面一共用到了 2 種驗證機制,一種是基于?token,就是預共享密鑰,客戶端和服務端實現配置一樣的字符串密鑰,第二種是?OAuth 2.0,依賴第三方授權服務器頒發的訪問令牌,然后客戶端帶著令牌去訪問服務端。

這里面有很多技巧值得學習:

2.1. 工廠函數

通過不同的配置生成對應的認證方式。

type Setter interface {SetLogin(*msg.Login) errorSetPing(*msg.Ping) errorSetNewWorkConn(*msg.NewWorkConn) error
}// 根據客戶端配置創建認證提供者
func NewAuthSetter(cfg v1.AuthClientConfig) (authProvider Setter) {switch cfg.Method {// token 認證模式case v1.AuthMethodToken:authProvider = NewTokenAuth(cfg.AdditionalScopes, cfg.Token)// openid 認證模式case v1.AuthMethodOIDC:authProvider = NewOidcAuthSetter(cfg.AdditionalScopes, cfg.OIDC)default:panic(fmt.Sprintf(「wrong method: 『%s』」, cfg.Method))}return authProvider
}

2.2. 常量時間的字符串比較

正常情況來說,token 模式下,兩邊比較一下字符串是不是相等就完了,但其實這個是有安全隱患的,第一個就是攻擊者可以進行重放攻擊,一直進行密碼爆破,第二個就是攻擊者可以進行定時攻擊,比如普通比較(如?==)在發現第一個不匹配字節時會立即返回,攻擊者可通過測量響應時間差異推斷出匹配的字節位置,ConstantTimeCompare?始終遍歷全部字節(即使已發現不匹配),使攻擊者無法通過時間差獲取敏感信息。

// token 和客戶端上線的時間戳組成 key
func GetAuthKey(token string, timestamp int64) (key string) {md5Ctx := md5.New()md5Ctx.Write([]byte(token))md5Ctx.Write([]byte(strconv.FormatInt(timestamp, 10)))data := md5Ctx.Sum(nil)return hex.EncodeToString(data)
}// 全量匹配字節
func ConstantTimeCompare(x, y []byte) int {if len(x) != len(y) {return 0}var v bytefor i := 0; i < len(x); i++ {v |= x[i] ^ y[i]}return ConstantTimeByteEq(v, 0)
}// ConstantTimeByteEq returns 1 if x == y and 0 otherwise.
func ConstantTimeByteEq(x, y uint8) int {return int((uint32(x^y) - 1) >> 31)
}

3. pkg/config

config 文件夾是?frp?配置管理的核心模塊,涵蓋了配置的加載、解析、驗證、轉換和命令行支持等功能。它確保了?frp?的靈活性和兼容性,同時為用戶提供了多種配置方式。

3.1. 使用環境變量進行模板渲染

serverAddr = 「{{ .Envs.FRP_SERVER_ADDR }}」
serverPort = 7000[[proxies]]
name = 「ssh」
type = 「tcp」
localIP = 「127.0.0.1」
localPort = 22
remotePort = {{ .Envs.FRP_SSH_REMOTE_PORT }}export FRP_SERVER_ADDR=「x.x.x.x」
export FRP_SSH_REMOTE_PORT=「6000」
./frpc -C ./frpc.toml

這個實現是采用了 template 模板庫,其中 Envs 前綴是由字段名?Envs?決定的:

type Values struct {Envs map[string]string // 「{{ .Envs.FRP_SERVER_ADDR }}」 Envs 的由來
}func RenderWithTemplate(in []byte, values *Values) ([]byte, error) {tmpl, err := template.New(「frp」).Funcs(template.FuncMap{「parseNumberRange」:     parseNumberRange,「parseNumberRangePair」: parseNumberRangePair,}).Parse(string(in))if err != nil {return nil, err}buffer := bytes.NewBufferString(「」)if err := tmpl.Execute(buffer, values); err != nil {return nil, err}return buffer.Bytes(), nil
}// 將端口范圍解析為 端口列表
func parseNumberRange(firstRangeStr string) ([]int64, error) {... ... 
}

這里面有一些自定義的解析函數,比如說:

ports = 「{{ parseNumberRange .Envs.PORT_RANGE }}」export PORT_RANGE = 「1000-1005」// 這樣 ports 就會被 template 的 parseNumberRange 函數解析并渲染為
// ports = 1000, 1001, 1002, 1003, 1004, 1005

3.2. 配置拆分

通過?includes?參數可以在主配置中包含其他配置文件,從而實現將代理配置拆分到多個文件中管理

# frpc.toml
serverAddr = 「x.x.x.x」
serverPort = 7000
includes = [「./confd/*.toml」]

上述配置在 frpc.toml 中通過 includes 額外包含了?./confd?目錄下所有的 toml 文件的代理配置內容,效果等價于將這兩個文件合并成一個文件。

這個實現是采用了,循環讀取文件內容 + 模板渲染 + 配置合并+ toml 反序列化 的方法:

// 主文件配置,就是 frpc.toml
var content []byte
content, err = GetRenderedConfFromFile(filePath)
if err != nil {return
}
configBuffer := bytes.NewBuffer(nil)
configBuffer.Write(content)... ... var buf []byte
// 循環讀取 include 的文件
// getIncludeContents
// ->ReadFile
// ->RenderContent
//   ->template.New(「frp」).Parse(string(in))
buf, err = getIncludeContents(cfg.IncludeConfigFiles)
if err != nil {err = fmt.Errorf(「getIncludeContents error: %v」, err)return
}
configBuffer.WriteString(「
」)
configBuffer.Write(buf)// 將所有配置合并,然后將 toml 序列化為 type ClientCommonConf struct
代理 Cfgs, visitorCfgs, err = LoadAllProxyConfsFromIni(cfg.User, configBuffer.Bytes(), cfg.Start)
if err != nil {return
}
return

3.3. 配置熱加載

frpc reload -C ./frpc.toml?等待一段時間后,客戶端將根據新的配置文件創建、更新或刪除代理。

這里面也比較簡單,主要邏輯在于配置校驗,舊配置中與新配置里同名的且代理內容不一樣的 proxy 停止,新增的配置的 proxy 再啟動,也就是說老配置和新配置完全一樣的是不動的

func (pm *Manager) UpdateAll(proxyCfgs []v1.ProxyConfigurer) {xl := xlog.FromContextSafe(pm.ctx)proxyCfgsMap := lo.KeyBy(proxyCfgs, func(C v1.ProxyConfigurer) string {return C.GetBaseConfig().Name})pm.mu.Lock()defer pm.mu.Unlock()delPxyNames := make([]string, 0)for name, pxy := range pm.proxies {del := falsecfg, ok := proxyCfgsMap[name]if !ok || !reflect.DeepEqual(pxy.Cfg, cfg) {del = true}if del {delPxyNames = append(delPxyNames, name)delete(pm.proxies, name)pxy.Stop()}}if len(delPxyNames) > 0 {xl.Infof(「proxy removed: %s」, delPxyNames)}addPxyNames := make([]string, 0)for _, cfg := range proxyCfgs {name := cfg.GetBaseConfig().Nameif _, ok := pm.proxies[name]; !ok {pxy := NewWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.msgTransporter, pm.vnetController)if pm.inWorkConnCallback != nil {pxy.SetInWorkConnCallback(pm.inWorkConnCallback)}pm.proxies[name] = pxyaddPxyNames = append(addPxyNames, name)pxy.Start()}}if len(addPxyNames) > 0 {xl.Infof(「proxy added: %s」, addPxyNames)}

4. 監控

frps 服務端支持兩種監控系統:指標存在內存中,和指標輸出到?Prometheus。主要監控以下指標:

type serverMetrics struct {// 記錄當前連接到服務端的客戶端數量。clientCount     Prometheus.Gauge// 記錄當前代理的數量,按代理類型(如 TCP、HTTP)分類。proxyCount      *Prometheus.GaugeVec// 記錄當前連接的數量,按代理類型(如 TCP、HTTP)分類。connectionCount *Prometheus.GaugeVec// 記錄流入的總流量,按代理類型(如 TCP、HTTP)分類。trafficIn       *Prometheus.CounterVec// 記錄流出的總流量,按代理類型(如 TCP、HTTP)分類。trafficOut      *Prometheus.CounterVec
}

內存監控沒啥,但統計的增刪改,這里用到了原子操作的技巧:

func (C *StandardCounter) Count() int32 {return atomic.LoadInt32(&C.count)
}func (C *StandardCounter) Inc(count int32) {atomic.AddInt32(&C.count, count)
}func (C *StandardCounter) Dec(count int32) {atomic.AddInt32(&C.count, -count)
}

對于不同類型的 proxy 的統計,frp 沒有使用 syn map,而是用一把讀寫鎖保平安

m.mu.Lock()defer m.mu.Unlock()counter, ok := m.info.ProxyTypeCounts[proxyType]if !ok {counter = metric.NewCounter()}
counter.Inc(1)

對于如何進行 Prometheus 監控,frp 的使用流程可以借鑒,整體來說分為以下幾個步驟:

  1. 編碼前,先定義指標,類似于:
Namespace: 「frp」,
Subsystem: 「server」,
Name:      「traffic_out」,
Help:      「The total out traffic」,
  1. frp 注冊 Prometheus 指標
trafficOut: Prometheus.NewCounterVec(Prometheus.CounterOpts{Namespace: namespace,Subsystem: serverSubsystem,Name:      「traffic_out」,Help:      「The total out traffic」,
}, []string{「name」, 「type」}),
}Prometheus.MustRegister(m.clientCount)
Prometheus.MustRegister(m.proxyCount)
Prometheus.MustRegister(m.connectionCount)
Prometheus.MustRegister(m.trafficIn)
Prometheus.MustRegister(m.trafficOut)
  1. frp 暴露 HTTP 服務,一般是/metric,promhttp 提供一個 HTTP 處理器,用于暴露所有注冊的 Prometheus 指標。
if svr.cfg.EnablePrometheus {subRouter.Handle(「/metrics」, promhttp.Handler())
}
  1. 配置 Prometheus 定時抓取這個 HTTP 路徑,舒服了
全球:scrape_interval: 15s # 每 15 秒抓取一次數據scrape_configs:- job_name: 「frp_server」static_configs:- targets: [「localhost:8080」] # 替換為 frp 服務端暴露的 /metrics 端點

5. 通信安全

當 frpc 和 frps 之間啟用了 TLS 之后,流量會被全局加密,不再需要配置單個代理上的加密,新版本中已經默認啟用。每一個代理都可以選擇是否啟用加密和壓縮的功能。

在每一個代理的配置中使用如下參數指定:

[[proxies]]
name = 「ssh」
type = 「tcp」
localPort = 22
remotePort = 6000
transport.useEncryption = true
transport.useCompression = true

5.1. 加密

通過設置?transport.useEncryption = true,將 frpc 與 frps 之間的通信內容加密傳輸,將會有效防止傳輸內容被截取。

這個加密它使用了裝飾器模式,傳入普通的 IO,WithEncryption 后就會得到一個可以加密的 IO

remote, err = libio.WithEncryption(remote, encKey)
if err != nil {workConn.xl.Errorf(「create encryption stream error: %v」, err)return
}

我們接下來看如何加密的:

總體加密算法采用 aes-128-cfb,aes 是一個對稱加密,主要靠 key 和 iv 兩個值

// pbkdf2 會生成一個用于 aes 加密的 key
// 入參 key 為:配置的 token
// DefaultSalt 為字符串默認值
key = pbkdf2.Key(key, []byte(DefaultSalt), 64, aes.BlockSize, sha1.New)// iv 是用 rand 函數生成的安全加密的隨機數
if _, err := io.ReadFull(rand.Reader, iv); err != nil {return nil, err
}// Reader is a global, shared instance of a cryptographically
// secure random number generator. It is safe for concurrent use.
//
//   - On Linux, FreeBSD, Dragonfly, and Solaris, Reader uses getrandom(2).
//   - On legacy Linux (< 3.17), Reader opens /dev/urandom on first use.
//   - On macOS, iOS, and OpenBSD Reader, uses arc4random_buf(3).
//   - On NetBSD, Reader uses the kern.arandom sysctl.
//   - On Windows, Reader uses the ProcessPrng API.
//   - On js/wasm, Reader uses the Web Crypto API.
//   - On wasi/wasm, Reader uses random_get.
//
// In FIPS 140-3 mode, the output passes through an SP 800-90A Rev. 1
// Deterministic Random Bit Generator (DRBG).
var Reader io.Reader

這樣后續的 IO 操作都會自帶加密了。

5.2. 壓縮

壓縮也是同理,搞一個壓縮的 IO 裝飾器就好了。

如果傳輸的報文長度較長,通過設置?transport.useCompression = true?對傳輸內容進行壓縮,可以有效減小 frpc 與 frps 之間的網絡流量,加快流量轉發速度,但是會額外消耗一些 CPU 資源。

壓縮算法采用?snappy 庫

sr := snappy.NewReader(rwc)
sw := snappy.NewWriter(rwc)
return WrapReadWriteCloser(sr, sw, func() error {_ = sw.Close()return rwc.Close()})
}

5.3. 自定義 TLS

這個其實就是使用自簽發的 CA,去生成密鑰和證書,然后客戶端和服務端加載起來后,可以進行雙向或者單向驗證,進行 HTTPS 握手,后續流量也是 HTTPS 加密的。

客戶端單向校驗服務端:

# frpc.toml
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」# frps.toml
transport.tls.certFile = 「/to/cert/path/server.crt」
transport.tls.keyFile = 「/to/key/path/server.key」

服務端單向校驗客戶端:

# frpc.toml
transport.tls.certFile = 「/to/cert/path/client.crt」
transport.tls.keyFile = 「/to/key/path/client.key」# frps.toml
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」

雙向驗證

# frpc.toml
transport.tls.certFile = 「/to/cert/path/client.crt」
transport.tls.keyFile = 「/to/key/path/client.key」
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」# frps.toml
transport.tls.certFile = 「/to/cert/path/server.crt」
transport.tls.keyFile = 「/to/key/path/server.key」
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」

介紹這個之前,我們先回顧以下 TLS 握手的過程,hhh:

okk,那我們看 frp 是如何實現 tls 的:

// 獲取 TLS 配置,作為 dial 選項
// tlsConfig, err = transport.NewClientTLSConfig
// tlsConfig, err = transport.NewServerTLSConfig
dialOptions = append(dialOptions, libnet.WithTLSConfig(tlsConfig))...// dail tcp 本身就是 tls 的了
conn, err := libnet.DialContext(C.ctx,net.JoinHostPort(C.cfg.ServerAddr, strconv.Itoa(C.cfg.ServerPort)),dialOptions...,
)// 加載服務端的 ca,證書+key
// 核心是 tls 庫 tls.LoadX509KeyPair(certfile, keyfile),去管理證書和 key
func NewServerTLSConfig(certPath, keyPath, caPath string) (*tls.Config, error) {base := &tls.Config{}if certPath == ?? || keyPath == ?? {// server will generate tls conf by itselfcert := newRandomTLSKeyPair()base.Certificates = []tls.Certificate{*cert}} else {// 調的是這個 tlsCert, err := tls.LoadX509KeyPair(certfile, keyfile)cert, err := newCustomTLSKeyPair(certPath, keyPath)if err != nil {return nil, err}base.Certificates = []tls.Certificate{*cert}}if caPath != '' {// ca 證書pool, err := newCertPool(caPath)if err != nil {return nil, err}// 校驗客戶端base.ClientAuth = tls.RequireAndVerifyClientCertbase.ClientCAs = pool}return base, nil
}// 加載客戶端的 ca,證書+key
func NewClientTLSConfig(certPath, keyPath, caPath, serverName string) (*tls.Config, error) {base := &tls.Config{}if certPath != '' && keyPath != '' {cert, err := newCustomTLSKeyPair(certPath, keyPath)if err != nil {return nil, err}base.Certificates = []tls.Certificate{*cert}}base.ServerName = serverNameif caPath != '' {pool, err := newCertPool(caPath)if err != nil {return nil, err}base.RootCAs = pool// 校驗服務端base.InsecureSkipVerify = false} else {base.InsecureSkipVerify = true}return base, nil
}// Only support one ca file to add
func newCertPool(caPath string) (*x509.CertPool, error) {pool := x509.NewCertPool()cacrt, err := os.ReadFile(caPath)if err != nil {return nil, err}pool.AppendCertsFromPEM(caCrt)return pool, nil
}

6. 代理配置

6.1. proxy

代理是 frp 的核心,這里詳細介紹一下它的流程。

frpc 和 frps 的整體流程,里面可以抽象為 3 種連接,整體我畫了一張圖:

  1. 用戶連接 (User Connection):
  • 這是外部用戶連接到 FRP 服務端(frps)特定端口的連接,也就是說想要訪問內網服務的,例如,當運維訪問?frps.example.com:8080?時建立的連接就是用戶連接,它實際訪問的是客戶側某個管理平臺
  • 在 frps 端,這個連接由?handleUserTCPConnection?函數處理。
  • 工作連接 (Work Connection):
  • 這是 frps 和 frpc 之間預先建立的連接,用于傳輸用戶連接的數據。
  • frps 在需要處理用戶連接時會從連接池中獲取一個可用的工作連接。
  • 如果池中沒有可用的工作連接,frps 會通知 frpc 創建新的工作連接。
  • 工作連接是 frps 和 frpc 之間的隧道,用戶數據通過這個隧道在外部用戶和內部服務之間傳輸。
  • 本地連接 (Local Connection):
  • 在 frp 的上下文中,遠程連接通常指的是 frpc 連接到內部服務的連接
  • 例如,當 frpc 收到從工作連接傳來的數據時,它會創建一個連接到配置中指定的本地服務(如 localhost:80),這個連接就是遠程連接。

下面是 FRP 數據流的完整過程:

  1. 外部用戶(用戶連接) ->?frps?監聽端口
  2. frps 從工作連接池中獲取一個?工作連接(frps <-> frpc)
  3. frps 將用戶連接與工作連接綁定(通過雙向數據轉發)
  4. frpc 接收到來自工作連接的數據,然后建立一個?遠程連接(frpc -> 內部服務)
  5. frpc 將工作連接與遠程連接綁定(通過雙向數據轉發)

下面來看看關鍵代碼實現:

// 用戶連接 (User Connection):
// frps 側
// tcp 代理啟動
func (pxy *TCPProxy) Run() (string, error) {if pxy.cfg.LoadBalancer.Group != ?? {// 獲取組監聽器(實際共享端口)l, realBindPort, err := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.LoadBalancer.Group, ...)pxy.listeners = append(pxy.listeners, l)// 啟動連接處理器(最終調用 BaseProxy.startCommonTCPListenersHandler)pxy.startCommonTCPListenersHandler() }// ...
}
// 用戶鏈接處理
func (pxy *BaseProxy) startCommonTCPListenersHandler() {for _, listener := range pxy.listeners {Go func(l net.Listener) {for {conn, err := l.Accept() // 此處調用 TCPGroupListener.Accept()Go pxy.handleUserTCPConnection(conn) // 處理連接}}(listener)}
}// 工作連接 (Work Connection):
// frps 側
// 從連接池中獲取一個已建立的到 FRP 客戶端的連接
// 內部實現路徑:pxy.GetWorkConn() → pxy.workConnManager.Get()
// 底層通過 FRP 協議發送 NewWorkConn 消息到客戶端建立隧道,這部分就是內部服務不一樣的地方
// -> GetWorkConn
workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
if err != nil {return
}
defer workConn.Close()var local io.ReadWriteCloser = workConn
// 啟動雙向數據轉發 
inCount, outCount, _ := libio.Join(local, userConn)// 在取出工作連接后,frps 會立即向 frpc 發送 msg.ReqWorkConn 消息,請求新的工作連接。
_ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
// 如果連接池為空,frps 會等待 frpc 創建新的工作連接并發送過來。
select {
case workConn, ok = <-ctl.workConnCh:if !ok {err = pkgerr.ErrCtlClosedxl.Warnf(「no work connections available, %v」, err)return}
case <-time.After(time.Duration(ctl.serverCfg.UserConnTimeout) * time.Second):err = fmt.Errorf(「timeout trying to get work connection」)xl.Warnf(「%v」, err)return
}// 本地連接 (Local Connection):
// frpc 側
// handleReqWorkConn
// HandleWorkConn
// HandleTCPWorkConnection
unc (ctl *Control) handleReqWorkConn(_ msg.Message) {xl := ctl.xlworkConn, err := ctl.connectServer()if err != nil {xl.Warnf(「start new connection to server error: %v」, err)return}m := &msg.NewWorkConn{RunID: ctl.sessionCtx.RunID,}if err = ctl.sessionCtx.AuthSetter.SetNewWorkConn(m); err != nil {xl.Warnf(「error during NewWorkConn authentication: %v」, err)workConn.Close()return}if err = msg.WriteMsg(workConn, m); err != nil {xl.Warnf(「work connection write to server error: %v」, err)workConn.Close()return}var startMsg msg.StartWorkConnif err = msg.ReadMsgInto(workConn, &startMsg); err != nil {xl.Tracef(「work connection closed before response StartWorkConn message: %v」, err)workConn.Close()return}if startMsg.Error != 「」 {xl.Errorf(「StartWorkConn contains error: %s」, startMsg.Error)workConn.Close()return}// dispatch this work connection to related proxyctl.pm.HandleWorkConn(startMsg.ProxyName, workConn, &startMsg)
}remote = workConn
... ... 
localConn, err := libnet.Dial(net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort)),libnet.WithTimeout(10*time.Second),
)
... ... 
_, _, errs := libio.Join(localConn, remote)

雙向轉發的實現灰常簡潔,值得學習:

func Join(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) (inCount int64, outCount int64, errors []error) {var wait sync.WaitGrouprecordErrs := make([]error, 2)pipe := func(number int, to io.ReadWriteCloser, from io.ReadWriteCloser, count *int64) {defer wait.Done()defer CosClose()defer from.Close()buf := pool.GetBuf(16 * 1024)defer pool.PutBuf(buf)*count, recordErrs[number] = io.CopyBuffer(to, from, buf)}wait.Add(2)Go pipe(0, c1, c2, &inCount)Go pipe(1, c2, c1, &outCount)wait.Wait()for _, e := range recordErrs {if e != nil {errors = append(errors, e)}}return
}

6.2. 負載均衡

你可以將多個相同類型的代理加入到同一個?group?中,以實現負載均衡的能力,當用戶連接 frps 服務器的 80 端口時,frps 會將接收到的用戶連接隨機分發給其中一個存活的代理。這可以確保即使一臺 frpc 機器掛掉,仍然有其他節點能夠提供服務。

# frpc.toml
[[proxies]]
name = 「test1」
type = 「tcp」
localPort = 8080
remotePort = 80
loadBalancer.group = 「web」
loadBalancer.groupKey = 「123」[[proxies]]
name = 「test2」
type = 「tcp」
localPort = 8081
remotePort = 80
loadBalancer.group = 「web」
loadBalancer.groupKey = 「123」

這個負載均衡的實現的關鍵結構體是 TCPGroupCtl *group.TCPGroupCtl:

// 管理 TCP 代理的分組邏輯,包括分組的創建、監聽、連接分發等功能。
TCPGroupCtl *group.TCPGroupCtl// 主要有三大功能// 1. 分組管理:
// 將多個 TCP 代理分組到一起,形成一個邏輯組。
// 每個組可以共享一個端口,分發連接到組內的代理。// 2. 負載均衡:
// 根據一定的規則隨機分發,將鏈接分發到組內的代理。// 3. 資源管理:
// 負責監聽和關閉組內的連接。
// 管理組的生命周期。
// tcp 代理分組
// 分組內統一監聽,共享一個 remote port 的 coon,這個我們叫 remote conn,就是用戶 connection
func (tgc *TCPGroupCtl) Listen(proxyName string, group string, groupKey string, addr string, port int) (l net.Listener, realPort int, err error) {tgc.mu.Lock()tcpGroup, ok := tgc.groups[group]if !ok {tcpGroup = NewTCPGroup(tgc)tgc.groups[group] = tcpGroup}tgc.mu.Unlock()return tcpGroup.Listen(proxyName, group, groupKey, addr, port)
}// 代理加入組
func (tg *TCPGroup) Listen(proxyName, group, groupKey, addr string, port int) (*TCPGroupListener, int, error) {tg.mu.Lock()defer tg.mu.Unlock()// 首次加入組:創建真實監聽if len(tg.lns) == 0 {realPort, err := tg.ctl.portManager.Acquire(proxyName, port) // 申請端口tcpLn, err := net.Listen(「tcp」, net.JoinHostPort(addr, strconv.Itoa(port)))tg.realPort = realPorttg.tcpLn = tcpLnGo tg.worker() // 啟動連接分發協程...}
}// 當新連接到達共享端口時,會被放入全局通道(acceptCh),
// 組內所有代理通過競爭機制獲取鏈接,實現負載均衡
func (tg *TCPGroup) worker() {for {conn, err := tg.tcpLn.Accept() // 接收新連接tg.acceptCh <- conn            // 放入全局通道}
}
func (ln *TCPGroupListener) Accept() (net.Conn, error) {select {case <-ln.closeCh:return nil, ErrListenerClosedcase conn := <-ln.group.acceptCh: // 從全局通道競爭獲取連接return conn, nil}
}// tcp 代理啟動
func (pxy *TCPProxy) Run() (string, error) {if pxy.cfg.LoadBalancer.Group != 「」 {// 獲取組監聽器(實際共享端口)l, realBindPort, err := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.LoadBalancer.Group, ...)pxy.listeners = append(pxy.listeners, l)// 啟動連接處理器(最終調用 BaseProxy.startCommonTCPListenersHandler)pxy.startCommonTCPListenersHandler() }// ...
}

6.3. 健康檢查

通過給代理配置健康檢查參數,可以在要反向代理的服務出現故障時,將該服務從 frps 中摘除。結合負載均衡的功能,這可用于實現高可用架構,避免服務單點故障。

[[proxies]]
name = 「test1」
type = 「tcp」
localPort = 22
remotePort = 6000
# 啟用健康檢查,類型為 tcp
healthCheck.type = 「tcp」
# 建立連接超時時間為 3 秒
healthCheck.timeoutSeconds = 3
# 連續 3 次檢查失敗,此 proxy 會被摘除
healthCheck.maxFailed = 3
# 每隔 10 秒進行一次健康檢查
healthCheck.intervalSeconds = 10

這個配置被加載到 TCPProxyConfig-》ProxyBaseConfig-》HealthCheckConfig

type HealthCheckConfig struct {// Type specifies what protocol to use for health checking.// Valid values include 「tcp」, 「HTTP」, and 「」. If this value is 「」, health// checking will not be performed.//// If the type is 「tcp」, a connection will be attempted to the target// server. If a connection cannot be established, the health check fails.//// If the type is 「HTTP」, a GET request will be made to the endpoint// specified by HealthCheckURL. If the response is not a 200, the health// check fails.Type string `json:「type」` // tcp | HTTP// TimeoutSeconds specifies the number of seconds to wait for a health// check attempt to connect. If the timeout is reached, this counts as a// health check failure. By default, this value is 3.TimeoutSeconds int `json:「timeoutSeconds,omitempty」`// MaxFailed specifies the number of allowed failures before the// is stopped. By default, this value is 1.MaxFailed int `json:「maxFailed,omitempty」`// IntervalSeconds specifies the time in seconds between health// checks. By default, this value is 10.IntervalSeconds int `json:「intervalSeconds」`// Path specifies the path to send health checks to if the// health check type is 「HTTP」.Path string `json:「path,omitempty」`// HTTPHeaders specifies the headers to send with the health request, if// the health check type is 「HTTP」.HTTPHeaders []HTTPHeader `json:「httpHeaders,omitempty」`
}

這部分代碼非常獨立,相當于起了一個定時的 monitor,去監控代理的 proxy 是否有效,連續檢查失敗,此 proxy 會被摘除

func (monitor *Monitor) checkWorker() {forerr := monitor.doCheck(doCtx)... ... time.Sleep(monitor.interval)}   
}func (monitor *Monitor) doCheck(ctx context.Context) error {switch monitor.checkType {case 「tcp」:return monitor.doTCPCheck(ctx)case 「HTTP」:return monitor.doHTTPCheck(ctx)default:return ErrHealthCheckType}
}func (monitor *Monitor) doTCPCheck(ctx context.Context) error {// if tcp address is not specified, always return nilif monitor.addr == 「」 {return nil}var d net.Dialerconn, err := d.DialContext(ctx, 「tcp」, monitor.addr)if err != nil {return err}conn.Close()return nil
}

6.4. 代理限速

# frpc.toml
[[proxies]]
name = 「ssh」
type = 「tcp」
localPort = 22
remotePort = 6000
transport.bandwidthLimit = 「1MB」

核心代碼,依然是獲取 tcp 連接時,加一個限速的裝飾器:

var limiter *rate.Limiter
limitBytes := pxyConf.GetBaseConfig().Transport.BandwidthLimit.Bytes()
if limitBytes > 0 && pxyConf.GetBaseConfig().Transport.BandwidthLimitMode == types.BandwidthLimitModeClient {limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
}if pxy.GetLimiter() != nil {local = libio.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {return local.Close()})
}

limit 使用的是原生的 rate 包:

func (r *Reader) Read(p []byte) (n int, err error) {// 1. 獲取令牌桶的突發容量b := r.limiter.Burst()// 2. 如果請求的讀取量超過突發容量,調整讀取大小if b < len(p) {p = p[:b]}// 3. 執行實際讀取操作n, err = r.r.Read(p)if err != nil {// 4. 如果讀取過程中出錯,直接返回return}// 5. 根據實際讀取的字節數消耗令牌err = r.limiter.WaitN(context.Background(), n)if err != nil {return}return
}

7. 參考文獻

HTTPS://gofrp.org/zh-cn/docs/

HTTPS://blog.csdn.net/u012175637/article/details/84138925

HTTPS://cloud.tencent.com/developer/article/2093328

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/87952.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/87952.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/87952.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

day048-系統負載高排查流程與前后端分離項目

文章目錄 0. 老男孩思想1. 系統負載高排查流程1.1 進程/線程相關命令1.1.1 jps1.1.2 jstack1.1.3 jmap1.1.4 top -Hp pid 1.2 排查流程圖 2. 前后端分離項目2.1 項目說明2.2 負載均衡2.3 數據庫配置2.3.1 安裝數據庫服務2.3.2 配置數據庫環境 2.4 后端配置2.5 四層負載均衡配置…

Spring Boot 牽手EasyExcel:解鎖高效數據處理姿勢

引言 在日常的 Java 開發中&#xff0c;處理 Excel 文件是一個極為常見的需求。無論是數據的導入導出&#xff0c;還是報表的生成&#xff0c;Excel 都扮演著重要的角色。例如&#xff0c;在企業的財務管理系統中&#xff0c;需要將每月的財務數據導出為 Excel 報表&#xff0…

【ARM AMBA AXI 入門 21 -- AXI partial 訪問和 narrow 訪問的區別】

文章目錄 Overview一、定義區別二、AXI 信號層面對比三、舉例說明示例一:Partial Access示例二:Narrow Access四、硬件/系統處理角度五、AXI 總線接口信號舉例對比Partial Write 事務:Narrow Write 事務(32-bit Master on 64-bit Bus):六、總結對比表七,軟件判斷判斷 Pa…

使用Ideal創建一個spring boot的helloWorld項目

說明&#xff1a;本篇將介紹如何使用Ideal2024.2.1去創建一個spring boot的helloWorld項目&#xff0c;本篇將包含創建的詳細步驟以及spring boot項目的目錄結構說明&#xff0c;創建過程中的選項說明等。詳細步驟如下&#xff1a;第一步&#xff1a;點擊文件——新建——項目&…

國內Ubuntu訪問不了github等外網

各位小伙伴們&#xff0c;大家好呀。 大家是不是經常遇到訪問不了外網的情況呀。 在Ubuntu中可以這樣做。 訪問這個網站網站測速-Ping檢測-Trace查詢-Dig查詢-路由跟蹤查詢-tools.ipip.net&#xff0c; 對于github.com&#xff0c;在這個網站輸入github.com&#xff0c;會返…

PDF轉換工具,即開即用

在辦公室里&#xff0c;這句話被反復驗證。每天面對成堆的Word和Excel文件&#xff0c;將它們轉換成PDF格式是常有的事。可之前用過的工具&#xff0c;不是一次只能轉一個&#xff0c;就是操作繁瑣得讓人頭疼。記得有次趕項目&#xff0c;需要把二十多個文檔轉成PDF&#xff0c…

2. 你可以說一下 http 版本的發展過程嗎

你可以說一下 http 版本的發展過程嗎 總結&#xff1a;0.9&#xff1a;只能發送 get&#xff0c;無狀態。1.0&#xff1a;新增 post&#xff0c;請求頭&#xff0c;狀態碼&#xff0c;cookie。1.1&#xff1a;新增 put/delete/options/patch&#xff0c;keep-alive&#xff0c…

04-Linux驅動模塊的自動加載

概述 上一節&#xff0c;我們講述了Linux驅動開發的基本的模塊代碼編寫和手動執行模塊加載的操作&#xff0c; 這一節&#xff0c;我們講述嵌入式設備上使用Sysvint引導方式下如何開機自動加載模塊的步驟。感興趣的同學看下使用systemd引導方式的開啟自動加載模塊的步驟 操作…

【牛客算法】游游的整數切割

文章目錄 一、題目介紹1.1 題目鏈接1.2 題目描述1.3 輸入描述1.4 輸出描述1.5 示例二、解題思路2.1 核心算法設計2.2 性能優化關鍵2.3 算法流程圖三、解法實現3.1 解法一:基礎遍歷法3.1.1 初級版本分析3.2 解法二:奇偶預統計法(推薦)3.2.1 優化版本分析四、總結與拓展4.1 關…

筆記本電腦忽亮忽暗問題

關于筆記本電腦忽亮忽暗的問題這個問題困擾了我大半年&#xff0c;最后忽然找到解決方法了---主要的話有三種可能性1.關閉顯示器自動調亮的功能2.關閉節能模式自動調亮功能3.調整顯卡的功率&#xff0c;關閉自動調亮功能一開始一直都是嘗試的第一種方法&#xff0c;沒解決。。。…

Qt的頂部工具欄在多個界面使用

Qt的工具欄在多個界面使用1、前言2、創建一個工具欄類2.1 新建一個工具欄類3、提升工具欄類3.1登錄界面添加工具欄3.2 創建工具欄對象4、總結1、前言 今天遇到了個問題&#xff0c;頂部的工具欄&#xff0c;像軟鍵盤&#xff0c;時間顯示和退出按鈕那些&#xff0c;想在多個界…

C#和SQL Server連接常用通訊方式

C#和SQL Server連接通訊 在 C# 中與 SQL Server 建立數據庫連接&#xff0c;主要通過 ADO.NET 技術實現。以下是幾種常見的連接方式及相關實踐&#xff1a; ADO.NET 全面指南&#xff1a;C# 數據庫訪問核心技術 ADO.NET 是 .NET Framework 中用于數據訪問的核心組件&#xf…

安卓10.0系統修改定制化____實現自動開啟 USB 調試?的步驟解析 列舉常用的幾種修改方法

對于安卓開發者、測試人員,甚至是喜歡折騰手機的數碼愛好者來說,USB 調試是一個非常重要的功能。它能讓手機與電腦相連,實現應用安裝、系統調試、數據傳輸等操作。但每次連接手機都要手動去設置里開啟 USB 調試,實在麻煩。其實,通過修改安卓 10.0 的 ROM,就能讓手機自動開…

Redisson詳細教程 - 從入門到精通

目錄 1. 什么是Redisson 2. 為什么要用Redisson 3. 環境準備和配置 4. 基礎使用方法 5. 分布式數據結構 6. 分布式鎖詳解 7. 分布式服務 8. 實際應用場景 9. 最佳實踐 10. 常見問題解答 總結 1. 什么是Redisson 簡單理解 想象一下,Redis就像一個超級強大的"內…

動態規劃VS記憶化搜索(2)

luoguP1434滑雪 題目描述 Michael 喜歡滑雪。這并不奇怪&#xff0c;因為滑雪的確很刺激。可是為了獲得速度&#xff0c;滑的區域必須向下傾斜&#xff0c;而且當你滑到坡底&#xff0c;你不得不再次走上坡或者等待升降機來載你。Michael 想知道在一個區域中最長的滑坡。區域由…

如何將服務守護進程化

進程組 什么是進程組 之前我們提到了進程的概念&#xff0c; 其實每一個進程除了有一個進程 ID(PID)之外 還屬于一個進程組。進程組是一個或者多個進程的集合&#xff0c; 一個進程組可以包含多個進程。 每一個進程組也有一個唯一的進程組 ID(PGID)&#xff0c; 并且這個 PGID …

【跟著PMP學習項目管理】項目管理 之 范圍管理知識點

目錄 一、收集需求 1、知識點匯總 2、輸入 3、工具 4、輸出 二、定義范圍 1、知識點匯總 2、輸入 3、工具 4、輸出 三、創作工作分解結構 1、知識點匯總 2、輸入 3、工具 4、輸出 四、核實范圍 1、知識點匯總 2、輸入 3、工具 4、輸出 五、控制范圍 1、知…

AIX 環境磁盤空間管理指南

AIX 環境磁盤空間管理指南 在AIX環境中&#xff0c;磁盤空間的監控、管理與擴展是運維人員必備的技能。本文通過實際案例&#xff0c;系統地介紹如何查詢磁盤信息、卷組(VG)、邏輯卷(LV)信息&#xff0c;以及在磁盤空間不足時的擴容方案&#xff0c;幫助讀者掌握磁盤空間管理的…

k8s將service的IP對應的不同端口分配到不同的pod上

在Kubernetes中&#xff0c;Service是一種抽象層&#xff0c;它將請求路由到一組Pod。當你需要將Service的不同端口映射到不同的Pod時&#xff0c;可以通過以下兩種主要方式實現&#xff1a; 方法一&#xff1a;使用單個Service的多端口配置 如果不同的Pod提供不同的服務&…

aic8800M40低功耗sdio wifi在arm-linux平臺調試經驗

背景 好多年沒有搞過wifi相關的內容了,最近也被安排上了,把一顆低功耗aic8800M40的芯片在arm-linux開發板上做bring up,記錄一下SDIO wifi調試的過程和經驗,SDIO驅動這里需要改動一些linux內核HOST驅動代碼,會在文章中貼出來: AIC8800M40芯片簡介 這個wifi芯片是一顆低…