OpenAI API接口請求速率限制
速率限制以五種方式衡量:RPM(每分鐘請求數)、RPD(每天請求數)、TPM(每分鐘令牌數)、TPD(每天令牌數)和IPM(每分鐘圖像數)。
任何選項都可能會達到速率限制,具體取決于首先發生的情況。例如,向 ChatCompletions 端點發送僅包含 100 個令牌的 20 個請求,這將達到限制(如果RPM 為 20),即使在這 20 個請求中沒有發送 150k 令牌(如果TPM 限制為 150k) 。
其他值得注意的重要事項:
- 速率限制是在組織級別而不是用戶級別施加的。
- 速率限制因所使用的模型而異。
- 組織每月可以在 API 上花費的總金額也受到限制。這些也稱為“使用限制”。
解決方法
OpenAI Cookbook 有一個Python 筆記本,解釋了如何避免速率限制錯誤,以及一個用于在批處理 API 請求時保持速率限制的示例Python 腳本。
在提供編程訪問、批量處理功能和自動社交媒體發布時,考慮只為部分用戶啟用這些功能。
為了防止自動和大量濫用,請在指定時間范圍內(每日、每周或每月)為單個用戶設置使用限制。考慮對超出限制的用戶實施硬上限或手動審核流程。
方法一:使用指數退避重試
避免速率限制錯誤的一種簡單方法是使用隨機指數退避自動重試請求。使用指數退避重試意味著在遇到速率限制錯誤時執行短暫睡眠,然后重試不成功的請求。如果請求仍然不成功,則增加睡眠長度并重復該過程。這將持續到請求成功或達到最大重試次數為止。這種方法有很多好處:
- 自動重試意味著您可以從速率限制錯誤中恢復,而不會崩潰或丟失數據
- 指數退避意味著您可以快速嘗試第一次重試,同時如果前幾次重試失敗,仍然可以從更長的延遲中受益
- 在延遲中添加隨機抖動有助于同時重試所有命中。
請注意,不成功的請求會影響您的每分鐘限制,因此連續重新發送請求將不起作用。
下面是一些使用指數退避的Python解決方案示例。
示例 1:使用 Tenacity 庫
Tenacity 是一個 Apache 2.0 許可的通用重試庫,用 Python 編寫,用于簡化向任何事物添加重試行為的任務。要為您的請求添加指數退避,您可以使用tenacity.retry裝飾器。下面的示例使用該tenacity.wait_random_exponential函數向請求添加隨機指數退避。
from openai import OpenAI
client = OpenAI()from tenacity import (retry,stop_after_attempt,wait_random_exponential,
) # 指數退避@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def completion_with_backoff(**kwargs):return client.completions.create(**kwargs)completion_with_backoff(model="gpt-3.5-turbo-instruct", prompt="Once upon a time,")
請注意,Tenacity 庫是第三方工具,OpenAI 不保證其可靠性或安全性。
示例 2:使用backoff庫
另一個為退避和重試提供函數裝飾器的 python 庫是backoff:
import backoff
import openai
from openai import OpenAI
client = OpenAI()@backoff.on_exception(backoff.expo, openai.RateLimitError)
def completions_with_backoff(**kwargs):return client.completions.create(**kwargs)completions_with_backoff(model="gpt-3.5-turbo-instruct", prompt="Once upon a time,")
與 Tenacity 一樣,backoff 庫是第三方工具,OpenAI 不保證其可靠性或安全性。
示例 3:手動退避實現
如果您不想使用第三方庫,您可以按照以下示例實現自己的退避邏輯:
import random
import timeimport openai
from openai import OpenAI
client = OpenAI()# 定義一個重試裝飾器
def retry_with_exponential_backoff(func,initial_delay: float = 1,exponential_base: float = 2,jitter: bool = True,max_retries: int = 10,errors: tuple = (openai.RateLimitError,),
):"""Retry a function with exponential backoff."""def wrapper(*args, **kwargs):# 初始化變量num_retries = 0delay = initial_delay# 循環直到成功響應或達到 max_retries 或引發異常while True:try:return func(*args, **kwargs)# 重試特定錯誤except errors as e:# 增量重試num_retries += 1# 檢查是否已達到最大重試次數if num_retries > max_retries:raise Exception(f"Maximum number of retries ({max_retries}) exceeded.")# 增加延遲delay *= exponential_base * (1 + jitter * random.random())time.sleep(delay)# 針對任何未指定的錯誤引發異常except Exception as e:raise ereturn wrapper@retry_with_exponential_backoff
def completions_with_backoff(**kwargs):return client.completions.create(**kwargs)
同樣,OpenAI 不保證該解決方案的安全性或效率,但它可以成為您自己的解決方案的良好起點。
方法二:充分利用max_tokens以匹配您完成的規模
max_tokens您的速率限制是根據您的請求的字符數計算的令牌的最大值和估計數量。嘗試將該max_tokens值設置為盡可能接近您的預期響應大小。
批量請求
OpenAI API 對每分鐘請求數和每分鐘令牌數有單獨的限制。
如果您達到了每分鐘的請求限制,但每分鐘的令牌有可用容量,則可以通過將多個任務批處理到每個請求中來提高吞吐量。這將使您每分鐘處理更多令牌,特別是對于我們較小的模型。
發送一批提示的工作方式與普通 API 調用完全相同,只不過您將字符串列表而不是單個字符串傳遞給提示參數。
- 沒有批處理的示例
from openai import OpenAI
client = OpenAI()num_stories = 10
prompt = "Once upon a time,"# 示例,每個請求完成一個故事
for _ in range(num_stories):response = client.completions.create(model="curie",prompt=prompt,max_tokens=20,)# 輸出故事print(prompt + response.choices[0].text)
- 批處理示例
from openai import OpenAI
client = OpenAI()num_stories = 10
prompts = ["Once upon a time,"] * num_stories# 批量示例,每個請求完成 10 個故事
response = client.completions.create(model="curie",prompt=prompts,max_tokens=20,
)# 按索引將完成與提示進行匹配
stories = [""] * len(prompts)
for choice in response.choices:stories[choice.index] = prompts[choice.index] + choice.text# 輸出故事
for story in stories:print(story)