Yellowstone gRPC 是一個功能強大、可用于生產環境且經過實戰檢驗的工具,用于流式傳輸實時的 Solana 數據。但在實際條件下,網絡中斷或服務器重啟可能導致連接中斷。如果沒有適當的重連策略,你的應用程序可能會錯過區塊鏈的關鍵更新。?為了防止這種情況,重要的是構建一個系統,該系統不僅自動重連,而且還從特定的 slot 恢復數據流,從而確保一致性、可靠性和零遺漏事件。
開始之前
要開始,我們需要準備一些東西。
認證:gRPC 端點和 gRPC Token?Shyft 的 gRPC?節點遍布歐盟和美國地區的各個位置。要訪問,我們需要一個特定于區域的 gRPC 端點和一個訪問Token,你可以在?Shyft 儀表板?上購買。
服務器端后端(如 NodeJS)用于接收 gRPC 數據?由于 Web 瀏覽器不支持 gRPC 服務,因此你需要一個后端應用程序,例如 C#、Go、Java、Python?等,來接收 gRPC 數據。
代碼示例:實現重連機制
為了確保流從臨時斷開連接中自動恢復,我們實現了一個簡單的重連循環。如果連接因錯誤而斷開,應用程序會等待一段短暫的延遲,然后使用相同的訂閱請求重新啟動流。這確保了連續的數據流,無需手動干預,即使在不穩定的網絡條件下也是如此。
/*** 重連機制在 handle stream 函數中實現* 如果發生任何錯誤,流將等待 1000 毫秒,然后調用* handleStream 函數,該函數反過來將重新啟動流*/
async function subscribeCommand(client: Client, args: SubscribeRequest) {while (true) {try {await handleStream(client, args); // 訂閱并處理流} catch (error) {console.error("Stream error, retrying in 1 second...", error);await new Promise((resolve) => setTimeout(resolve, 1000));// 可以在這里更改超時時間}}
}
該代碼演示了一個 while 循環,其中調用了?handleStream()
?函數。handleStream()
?函數負責訂閱和接收流。一旦流中斷,將從 handle stream 函數拋出一個錯誤,該錯誤在循環內處理。然后,循環等待給定的超時時間并迭代,重新發送訂閱請求。
你可以查看?我們的文檔?,或者直接運行 Repl?此處的代碼?以獲取上面示例的完整代碼。
代碼示例:從特定 Slot 重放更新
為了避免在斷開連接期間丟失任何數據,流會跟蹤從每個交易更新收到的最后一個 slot。當流遇到錯誤時,它會嘗試重新連接,并使用?SubscribeRequest?中的 fromSlot 字段從該確切 slot 恢復。此邏輯確保在重新連接時不會跳過任何交易更新。使用重試計數器來防止無限次嘗試 — 達到限制后,系統會回退到從最新的可用 slot 進行流式傳輸。
require("dotenv").config();
import Client, { CommitmentLevel } from "@triton-one/yellowstone-grpc";
import { SubscribeRequest } from "@triton-one/yellowstone-grpc/dist/types/grpc/geyser";
import * as bs58 from "bs58";const MAX_RETRY_WITH_LAST_SLOT = 30;
const RETRY_DELAY_MS = 1000;
const ADDRESS_TO_STREAM_FROM = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P";type StreamResult = {lastSlot?: string;hasRcvdMSg: boolean;
};async function handleStream(client: Client,args: SubscribeRequest,lastSlot?: string
): Promise<StreamResult> {const stream = await client.subscribe();let hasRcvdMSg = false;return new Promise((resolve, reject) => {stream.on("data", (data) => {const tx = data.transaction?.transaction?.transaction;if (tx?.signatures?.[0]) {const sig = bs58.encode(tx.signatures[0]);console.log("Got tx:", sig);lastSlot = data.transaction.slot;hasRcvdMSg = true;}});stream.on("error", (err) => {stream.end();reject({ error: err, lastSlot, hasRcvdMSg });});const finalize = () => resolve({ lastSlot, hasRcvdMSg });stream.on("end", finalize);stream.on("close", finalize);stream.write(args, (err: any) => {if (err) reject({ error: err, lastSlot, hasRcvdMSg });});});
}async function subscribeCommand(client: Client, args: SubscribeRequest) {let lastSlot: string | undefined;let retryCount = 0;while (true) {try {if (args.fromSlot) {console.log("Starting stream from slot", args.fromSlot);}const result = await handleStream(client, args, lastSlot);lastSlot = result.lastSlot;if (result.hasRcvdMSg) retryCount = 0;} catch (err: any) {console.error(`Stream error, retrying in ${RETRY_DELAY_MS / 1000} second...`);await new Promise((resolve) => setTimeout(resolve, RETRY_DELAY_MS));lastSlot = err.lastSlot;if (err.hasRcvdMSg) retryCount = 0;if (lastSlot && retryCount < MAX_RETRY_WITH_LAST_SLOT) {console.log(`#${retryCount} retrying with last slot ${lastSlot}, remaining retries ${MAX_RETRY_WITH_LAST_SLOT - retryCount}`);args.fromSlot = lastSlot;retryCount++;} else {console.log("Retrying from latest slot (no last slot available)");delete args.fromSlot;retryCount = 0;lastSlot = undefined;}}}
}const client = new Client(process.env.GRPC_URL!, process.env.X_TOKEN!, {"grpc.keepalive_permit_without_calls": 1,"grpc.keepalive_time_ms": 10000,"grpc.keepalive_timeout_ms": 1000,"grpc.default_compression_algorithm": 2,
});const req: SubscribeRequest = {accounts: {},slots: {},transactions: {pumpFun: {vote: false,failed: false,accountInclude: [ADDRESS_TO_STREAM_FROM],accountExclude: [],accountRequired: [],},},transactionsStatus: {},blocks: {},blocksMeta: {},entry: {},accountsDataSlice: [],commitment: CommitmentLevel.CONFIRMED,
};subscribeCommand(client, req);
與上一種方法類似,無限循環確保流在每次斷開連接時都保持重新連接。我們初始化兩個變量,一個用于存儲?lastSlot
,即從流收到的最新 slot,另一個用于?retryCount
,它限制了從先前 slot 重試的次數,以避免卡在錯誤數據或間隙上。
if (args.fromSlot) {console.log("Starting stream from slot", args.fromSlot);
}
在啟動流之前,代碼檢查是否設置了?fromSlot
。如果是,流將從該特定?slot
?恢復,而不是從最新的區塊開始。handleStream
?函數打開流,偵聽傳入的交易數據,并跟蹤收到的最新 slot。如果收到任何數據,它會將流標記為成功(hasRcvdMSg
?=?true
)并重置重試計數器,以便系統可以在需要時繼續從上次已知的 slot 重試。
const result = await handleStream(client, args, lastSlot);
lastSlot = result.lastSlot;
if (result.hasRcvdMSg) retryCount = 0;
- 如果在發生錯誤之前成功記錄了?
lastSlot
,則將在下一次嘗試中重復使用它。 - 如果之前的流確實傳遞了數據,我們將重置?
retryCount
。
智能回退
if (lastSlot && retryCount < MAX_RETRY_WITH_LAST_SLOT) {args.fromSlot = lastSlot;retryCount++;
} else {delete args.fromSlot;retryCount = 0;lastSlot = undefined;
}
這是核心的彈性邏輯:
- 如果我們仍然有有效的?
lastSlot
?并且沒有超過重試限制,我們將嘗試從它恢復。 - 如果我們重試的次數過多或者沒有有效的 slot,我們將清除?
fromSlot
?并讓流從區塊鏈的頂端開始。
本文的完整代碼可在?GitHub?上獲取 — 隨意克隆并進行測試。我們還在?GitHub?上分享了一系列涵蓋 gRPC 和 DeFi 的示例用例,你可以克隆并進行實驗。
結論
構建具有?基于 slot 的重放?的重連策略,可確保你的 Solana 應用程序?保持可靠和實時?— 即使在網絡中斷的情況下也是如此。通過跟蹤上次收到的 slot 并智能地重試,你可以從中斷的地方恢復流式傳輸,避免錯過更新或重復數據。這種方法?增加了彈性,并保證了?任何生產級區塊鏈應用程序的?更順暢的?用戶體驗,更多相關文章,請,https://t.me/gtokentool。