Fluent Bit針對kafka心跳重連機制詳解(下)

#作者:程宏斌

文章目錄

    • disconnect
    • reconnect

接上篇:https://blog.csdn.net/qq_40477248/article/details/150957571?spm=1001.2014.3001.5501

disconnect

斷開連接的情況主要是兩種:
連接或傳輸過程中有錯誤發生
超時, 比如空閑時間超時

**
* Close and destroy a transport handle
*/
void rd_kafka_transport_close(rd_kafka_transport_t *rktrans) {
...// 清除接收緩沖區if (rktrans->rktrans_recv_buf)rd_kafka_buf_destroy(rktrans->rktrans_recv_buf);
...if (rktrans->rktrans_s != -1) // 自定義close或者socket.close()rd_kafka_transport_close0(rktrans->rktrans_rkb->rkb_rk,rktrans->rktrans_s);
?rd_free(rktrans);
}
/**
* @brief Failure propagation to application.
*
* Will tear down connection to broker and trigger a reconnect.
*
* \p level is the log level, <=LOG_INFO will be logged while =LOG_DEBUG will
* be debug-logged.
*
* @locality broker thread
*/
void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,int level,rd_kafka_resp_err_t err,const char *fmt,...) {
...if (rkb->rkb_transport) {// close socketrd_kafka_transport_close(rkb->rkb_transport);rkb->rkb_transport = NULL;
...// 設置狀態rd_kafka_broker_set_state(rkb, RD_KAFKA_BROKER_STATE_DOWN);
}
?
/**
* @brief Check if connections.max.idle.ms has been exceeded and if so
*        close the connection.
* 空閑時間探查
* @remark Must only be called if connections.max.idle.ms > 0 and
*         the current broker state is UP (or UPDATE).
*
* @locality broker thread
*/
static RD_INLINE void rd_kafka_broker_idle_check(rd_kafka_broker_t *rkb) {
…// 連接空閑時間 是否超過 服務端最大空閑時間, 默認10分鐘if (likely(idle_ms < rkb->rkb_rk->rk_conf.connections_max_idle_ms))return;// 超過, 服務端會斷開連接; client保險起見, 強制關閉連接rd_kafka_broker_fail(rkb, LOG_DEBUG, RD_KAFKA_RESP_ERR__TRANSPORT,"Connection max idle time exceeded ""(%dms since last activity)",idle_ms);

reconnect

連接失敗時, 系統自動發起重連. 重連不會終止, 直到連接成功或者系統退出.
nodename更改時, 會嘗試斷開重連

/**
* @brief Update the reconnect backoff.
*        Should be called when a connection is made, or all addresses
*        a broker resolves to has been exhausted without successful connect.
* 設置更新重試時間
* @locality broker thread
* @locks none
*/
static void
rd_kafka_broker_update_reconnect_backoff(rd_kafka_broker_t *rkb,const rd_kafka_conf_t *conf,rd_ts_t now) {
…/* 重試時間(間隔)已超過最大限制時間reconnect.backoff.max.ms* 重置下次的重試時間. */if (rkb->rkb_ts_reconnect + (conf->reconnect_backoff_max_ms * 1000) < now)rkb->rkb_reconnect_backoff_ms = conf->reconnect_backoff_ms;
?/* 在區間[-25%, +50%]內隨機取一個重試時間*/backoff = rd_jitter((int)((float)rkb->rkb_reconnect_backoff_ms * 0.75),(int)((float)rkb->rkb_reconnect_backoff_ms * 1.5));
?/* 不能超過reconnect.backoff.max.ms. */backoff = RD_MIN(backoff, conf->reconnect_backoff_max_ms);
?/* Set time of next reconnect */rkb->rkb_ts_reconnect         = now + (backoff * 1000);rkb->rkb_reconnect_backoff_ms = RD_MIN(rkb->rkb_reconnect_backoff_ms * 2, conf->reconnect_backoff_max_ms);
}
?
/**
* @brief Calculate time until next reconnect attempt.
*
* @returns the number of milliseconds to the next connection attempt, or 0
*          if immediate.
* @locality broker thread
* @locks none
*/
// 計算距離下次重試的時間間隔
static RD_INLINE int
rd_kafka_broker_reconnect_backoff(const rd_kafka_broker_t *rkb, rd_ts_t now) {
…remains = rkb->rkb_ts_reconnect - now;
…
}
?
static int rd_kafka_broker_thread_main(void *arg) {
...switch (rkb->rkb_state) {
...case RD_KAFKA_BROKER_STATE_TRY_CONNECT:
.../* Throttle & jitter reconnects to avoid* thundering horde of reconnecting clients after* a broker / network outage. Issue #403 */backoff =rd_kafka_broker_reconnect_backoff(rkb, rd_clock());if (backoff > 0) {rd_rkb_dbg(rkb, BROKER, "RECONNECT","Delaying next reconnect by %dms",backoff);rd_kafka_broker_serve(rkb, (int)backoff);continue;}
...case RD_KAFKA_BROKER_STATE_CONNECT:case RD_KAFKA_BROKER_STATE_SSL_HANDSHAKE:case RD_KAFKA_BROKER_STATE_AUTH_LEGACY:case RD_KAFKA_BROKER_STATE_AUTH_REQ:case RD_KAFKA_BROKER_STATE_AUTH_HANDSHAKE:case RD_KAFKA_BROKER_STATE_APIVERSION_QUERY:/* Asynchronous connect in progress. */rd_kafka_broker_serve(rkb, rd_kafka_max_block_ms);
?/* Connect failure.* Try the next resolve result until we've* tried them all, in which case we back off the next* connection attempt to avoid busy looping. */if (rkb->rkb_state == RD_KAFKA_BROKER_STATE_DOWN &&rd_kafka_broker_addresses_exhausted(rkb))rd_kafka_broker_update_reconnect_backoff(rkb, &rkb->rkb_rk->rk_conf, rd_clock());/* If we haven't made progress from the last state, and* if we have exceeded* socket_connection_setup_timeout_ms, then error out.* Don't error out in case this is a reauth, for which* socket_connection_setup_timeout_ms is not* applicable. */else if (rkb->rkb_state == orig_state &&!rkb->rkb_reauth_in_progress &&rd_clock() >=(rkb->rkb_ts_connect +(rd_ts_t)rk->rk_conf.socket_connection_setup_timeout_ms *1000))rd_kafka_broker_fail(rkb, LOG_WARNING,RD_KAFKA_RESP_ERR__TRANSPORT,"Connection setup timed out in state %s",rd_kafka_broker_state_names[rkb->rkb_state]);
?break;
…
}
?
/**
* @brief Update the nodename (address) of broker \p rkb
*        with the nodename from broker \p from_rkb (may be NULL).
*
*        If \p rkb is connected, the connection will be torn down.
*        A new connection may be attempted to the new address
*        if a persistent connection is needed (standard connection rules).
*
*        The broker's logname is also updated to include \p from_rkb's
*        broker id.
*
* @param from_rkb Use the nodename from this broker. If NULL, clear
*                 the \p rkb nodename.
*
* @remark Must only be called for logical brokers.
*
* @locks none
*/
void rd_kafka_broker_set_nodename(rd_kafka_broker_t *rkb,rd_kafka_broker_t *from_rkb) {
…// nodename已更改過, 需要觸發斷線和重連/* Trigger a disconnect & reconnect */rd_kafka_broker_schedule_connection(rkb);
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/97609.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/97609.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/97609.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

React 第七十一節 Router中generatePath的使用詳解及注意事項

前言 generatePath 是 React Router 的一個實用工具函數&#xff0c;用于根據路徑模式和參數對象生成實際的 URL 路徑。它在需要動態構建鏈接的場景中非常有用&#xff0c;比如生成導航鏈接或重定向路徑。 1、基本用法和注意事項 import { generatePath } from react-router-do…

Python 爬蟲案例:爬取豆瓣電影 Top250 數據

一、案例背景與目標 豆瓣電影 Top250 是國內權威的電影評分榜單之一&#xff0c;包含電影名稱、評分、評價人數、導演、主演、上映年份、國家 / 地區、類型等關鍵信息。本案例將使用 Python 編寫爬蟲&#xff0c;實現以下目標&#xff1a; 自動請求豆瓣電影 Top250 的 10 個分…

SPA安全警示:OAuth2.0致命漏洞

OAuth2.0在SPA應用中的安全陷阱SPA&#xff08;單頁應用&#xff09;通常采用隱式授權&#xff08;Implicit Flow&#xff09;或PKCE&#xff08;Proof Key for Code Exchange&#xff09;授權模式&#xff0c;但存在以下安全隱患&#xff1a;隱式授權模式的漏洞訪問令牌直接暴…

table表格字段明細展示

文章目錄1、字段渲染2、異步請求展示明細3、hover展示問題3.1 基本邏輯3.2 hover時長判斷3.3 renderhover表格字段明細展示&#xff0c;屬于比較小的需求&#xff0c;但是也有一定交互細節&#xff0c;本文選取部分場景。 1、字段渲染 render和渲染組件是有區別的。render常見為…

主網上線后生態極速擴張的 Berachain 生態,有哪些值得關注的項目?

Berachain 是典型的將 DeFi 思維嵌入到共識機制中的 Layer1&#xff0c;其核心是 PoL&#xff08;Proof of Liquidity&#xff09;共識。PoL 要求驗證者在獲得區塊獎勵前&#xff0c;必須將流動性導入白名單協議&#xff0c;并由市場決定資金流向。這樣&#xff0c;驗證者的權重…

claude-code對比GitHub-Copilot

Claude Code 文檔日期&#xff1a;2025 年 08 月 20 日 定位 項目級開發助手&#xff0c;專注于全局視野和復雜任務的處理。 特點 超長上下文支持&#xff1a;支持 200k 超長上下文&#xff0c;適合處理復雜項目。豐富的自定義命令&#xff1a;提供靈活的命令配置&#xff0c;滿…

Roo Code自定義Mode(模式)

什么是自定義模式&#xff1f; 簡單來說&#xff0c;自定義模式就像是給Roo Code穿上不同的"職業裝"。你可以創建針對特定任務或工作流程量身定制的模式&#xff0c;讓Roo在不同場景下表現出專業的行為。 這些模式分為兩種類型&#xff1a;全局模式&#xff08;在所有…

Next.js渲染模式:SSR、SSG與ISR揭秘

Next.js 核心渲染模式深度解析&#xff1a;SSR、SSG 與 ISR 在構建現代 Web 應用時&#xff0c;性能和用戶體驗是至關重要的考量。Next.js 作為 React 生態中一個備受推崇的框架&#xff0c;其強大的服務端渲染&#xff08;SSR&#xff09;、靜態站點生成&#xff08;SSG&#…

Veo Videos Generation API 對接說明

本文介紹了如何對接 Veo Videos Generation API&#xff0c;通過輸入自定義參數生成Veo官方視頻。 下面將詳細闡述 Veo Videos Generation API 的對接流程。 申請流程 使用 API 前&#xff0c;需前往 Veo Videos Generation API 頁面申請服務。進入頁面后&#xff0c;點擊「…

YOLO 目標檢測:YOLOv3網絡結構、特征輸出、FPN、多尺度預測

文章目錄一、YOLOV31、網絡結構1.1 整體結構1.2 主干網絡1.3 特征輸出1.4 特征融合FPN&#xff08;Feature Pyramid Networks&#xff09;FPN 融合上采樣融合2、多尺度預測3、損失函數4、性能對比一、YOLOV3 YOLOv3&#xff08;You Only Look Once v3&#xff09;是YOLO系列中…

【GIS圖像處理】有哪些SOTA方法可以用于將1.5米分辨率遙感圖像超分辨率至0.8米精度的?

針對將1.5米分辨率遙感圖像超分辨率至0.8米的需求,當前主流方法可分為以下幾類,結合最新研究進展和實際應用場景,具體技術方案及SOTA方法如下: 一、基于Transformer的高效建模 1. Top-k標記選擇Transformer(TTST) 核心機制:通過動態選擇前k個關鍵標記(token),消除冗…

【電力電子】逆變器控制策略:PQ Droop下垂控制、電壓電流雙環控制與SPWM調制

逆變器中的 PQ Droop 控制。 1. PQ Droop 控制的定義 PQ Droop(有時也稱為功率下垂控制,Power Droop Control)是微電網、并聯系統或逆變器并網運行中常用的一種分布式功率控制方法。 P-Droop(有功下垂):通過調節逆變器輸出頻率與有功功率之間的關系實現功率分配。 Q-Dro…

【LeetCode 熱題 100】5. 最長回文子串——中心擴散法

Problem: 5. 最長回文子串 文章目錄整體思路完整代碼時空復雜度時間復雜度&#xff1a;O(N^2)空間復雜度&#xff1a;O(1)整體思路 這段代碼旨在解決經典的 “最長回文子串” (Longest Palindromic Substring) 問題。問題要求在一個給定的字符串 S 中&#xff0c;找到一個最長…

六、練習3:Gitee平臺操作

練習3&#xff1a;Gitee平臺操作 練習目標 掌握Gitee平臺的基本操作&#xff0c;包括創建倉庫、推送代碼、團隊協作等。 練習步驟 步驟1&#xff1a;Gitee賬號準備 訪問 gitee.com注冊賬號&#xff08;如果還沒有&#xff09;登錄Gitee 步驟2&#xff1a;配置SSH密鑰 # …

Git軟件版本控制

軟件版本控制作用&#xff1a;軟件源碼版本管理、多人協作開發、版本多分支開發、代碼回滾&#xff08;回退&#xff09;等功能。集中式版本控制&#xff1a;將代碼倉庫放在一臺服務器上&#xff0c;開發時要依賴這臺服務器。優點&#xff1a;簡單、方便管理、適合中小型項目缺…

生產環境Spark Structured Streaming實時數據處理應用實踐分享

生產環境Spark Structured Streaming實時數據處理應用實踐分享 一、業務場景描述 我們所在的電商平臺需要實時監控用戶行為數據&#xff08;如點擊、下單、支付等&#xff09;&#xff0c;基于事件級別的流式數據進行實時統計、會話聚合、漏斗分析&#xff0c;并將結果推送到Da…

海康相機開發---HCNetSDK

HCNetSDK&#xff08;Hikvision Network Software Development Kit&#xff09;是海康威視專為旗下安防監控設備打造的二次開發工具包&#xff0c;是連接上層應用與海康設備的核心橋梁。其封裝了設備底層通信協議&#xff08;包括私有協議與部分標準協議&#xff09;&#xff0…

構建無廣告私人圖書館Reader與cpolar讓電子書庫隨身攜帶

文章目錄前言&#xff1a;告別書荒&#xff0c;拯救靈魂的“摸魚神器”1、關于Reader&#xff1a;小而美的開源在線閱讀器2、Docker部署3、簡單使用reader和添加書源4.群暉安裝Cpolar工具5.創建reader閱讀器的公網地址6.配置固定公網地址前言&#xff1a;告別書荒&#xff0c;拯…

amd cpu是x86架構嗎

是的&#xff0c;AMD CPU屬于x86架構?&#xff0c;其64位擴展&#xff08;x86-64&#xff09;最初由AMD設計并成為行業標準。? ?AMD與x86架構的關系? ?技術淵源?&#xff1a;AMD自1976年起通過技術授權成為x86架構的合法制造商&#xff0c;與英特爾共同主導x86市場。2003…

vercel上線資源無法加載

背景&#xff1a;在本地跑開發服務器沒問題&#xff0c;但是部署到 vercel 上就有問題上一次出現類似問題是在更新游戲引擎方法后本地可以跑但是上線沒有成功&#xff0c;當時是因為 runner.html 是在部署時通過腳本從遠端倉庫拉取的&#xff0c;所以解決方案&#xff1a;1.更新…