How to handle rate limits
This article describes the OpenAI API's rate limits and shares tips for handling rate limit errors. It explains why rate limits exist and how they protect the reliable operation of the API for its users by throttling request volumes and preventing abuse or misuse. It also points to an example script for throttling parallel requests to avoid rate limit errors.
Introduction
When you repeatedly call the OpenAI API, you may encounter error messages that say 429: 'Too Many Requests' or RateLimitError. These error messages come from exceeding the API's rate limits.
This guide shares tips for avoiding and handling rate limit errors.
To see an example script for throttling parallel requests to avoid rate limit errors, see api_request_parallel_processor.py.
Why rate limits exist
Rate limits are a common practice for APIs, and they're put in place for a few different reasons.
First, they help protect against abuse or misuse of the API. For example, a malicious actor could flood the API with requests in an attempt to overload it or cause disruptions in service. By setting rate limits, OpenAI can prevent this kind of activity.
Second, rate limits help ensure that everyone has fair access to the API. If one person or organization makes an excessive number of requests, it could bog down the API for everyone else. By throttling the number of requests that a single user can make, OpenAI ensures that everyone has an opportunity to use the API without experiencing slowdowns.
Lastly, rate limits help OpenAI manage the aggregate load on its infrastructure. If requests to the API increased dramatically, it could tax the servers and cause performance issues. By setting rate limits, OpenAI helps maintain a smooth and consistent experience for all users.
Although hitting rate limits can be frustrating, they exist to protect the reliable operation of the API for its users.
Default rate limits
As of January 2023, the default rate limits are:

Text completion & embedding endpoints
- Free trial users: 20 requests/minute, 150,000 tokens/minute
- Pay-as-you-go users (in your first 48 hours): 60 requests/minute, 250,000 davinci tokens/minute (and proportionally more for cheaper models)
- Pay-as-you-go users (after your first 48 hours): 3,000 requests/minute, 250,000 davinci tokens/minute (and proportionally more for cheaper models)

Code & edit endpoints
- Free trial users: 20 requests/minute, 150,000 tokens/minute
- Pay-as-you-go users (in your first 48 hours): 20 requests/minute, 150,000 tokens/minute
- Pay-as-you-go users (after your first 48 hours): 20 requests/minute, 150,000 tokens/minute

For reference, 1,000 tokens is roughly a page of text.
Other rate limit resources
Read more about OpenAI's rate limits in these other resources:
Guide: Rate limits
Help Center: Is API usage subject to any rate limits?
Help Center: How can I solve 429: 'Too Many Requests' errors?
Requesting a rate limit increase
If you'd like your organization's rate limit increased, please fill out the following form: OpenAI Rate Limit Increase Request form
Example rate limit error
A rate limit error will occur when API requests are sent too quickly. If you're using the OpenAI Python library, it will look something like:
RateLimitError: Rate limit reached for default-codex in organization org-{id} on requests per min. Limit: 20.000000 / min. Current: 24.000000 / min. Contact support@openai.com if you continue to have issues or if you'd like to request an increase.
Below is example code for triggering a rate limit error:
import openai  # for making OpenAI API requests

# request a bunch of completions in a loop
# note: the loop count and max_tokens below are illustrative
for _ in range(100):
    openai.Completion.create(
        model="code-cushman-001",
        prompt="def magic_function():\n\t",
        max_tokens=10,
    )
How to avoid rate limit errors
Retrying with exponential backoff
One easy way to avoid rate limit errors is to automatically retry requests with a random exponential backoff. Retrying with exponential backoff means performing a short sleep when a rate limit error is hit, then retrying the unsuccessful request. If the request is still unsuccessful, the sleep length is increased and the process is repeated. This continues until the request succeeds or until a maximum number of retries is reached.
This approach has many benefits:
Automatic retries mean you can recover from rate limit errors without crashes or missing data
Exponential backoff means that your first retries can be attempted quickly, while still benefiting from longer delays if your first few retries fail
Adding random jitter to the delays helps keep all the retries from hitting at the same time
Note that unsuccessful requests still count toward your per-minute limit, so continuously resending a request won't work.
Here are a few example solutions.
Example 1: Using the Tenacity library
Tenacity is an Apache 2.0 licensed general-purpose retrying library, written in Python, that simplifies the task of adding retry behavior to just about anything.
To add exponential backoff to your requests, you can use the tenacity.retry decorator. The example below uses the tenacity.wait_random_exponential function to add random exponential backoff to a request.
Note that the Tenacity library is a third-party tool, and OpenAI makes no guarantees about its reliability or security.
import openai  # for OpenAI API calls
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
)  # for exponential backoff


@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def completion_with_backoff(**kwargs):
    return openai.Completion.create(**kwargs)


completion_with_backoff(model="text-davinci-002", prompt="Once upon a time,")
<OpenAIObject text_completion id=cmpl-5oowO391reUW8RGVfFyzBM1uBs4A5 at 0x10d8cae00> JSON: {
  "choices": [
    {
      "finish_reason": "length",
      "index": 0,
      "text": " a little girl dreamed of becoming a model.\n\nNowadays, that dream"
    }
  ],
  "id": "cmpl-5oowO391reUW8RGVfFyzBM1uBs4A5",
  "model": "text-davinci-002",
  "object": "text_completion",
  ...
}
Example 2: Using the backoff library
Another library that provides function decorators for backoff and retrying is backoff.
Like Tenacity, the backoff library is a third-party tool, and OpenAI makes no guarantees about its reliability or security.
import backoff  # for exponential backoff
import openai  # for OpenAI API calls


@backoff.on_exception(backoff.expo, openai.error.RateLimitError)
def completions_with_backoff(**kwargs):
    return openai.Completion.create(**kwargs)


completions_with_backoff(model="text-davinci-002", prompt="Once upon a time,")
<OpenAIObject text_completion id=cmpl-5oowPhIdUvshEsF1rBhhwE9KFfI3M at 0x111043680> JSON: {
  "choices": [
    {
      "finish_reason": "length",
      "index": 0,
      "text": " two children lived in a poor country village. In the winter, the temperature would"
    }
  ],
  "id": "cmpl-5oowPhIdUvshEsF1rBhhwE9KFfI3M",
  "model": "text-davinci-002",
  "object": "text_completion",
  ...
}
Example 3: Manual backoff implementation
If you don't want to use third-party libraries, you can implement your own backoff logic.
import random  # for adding jitter to the delay
import time  # for sleeping between retries
import openai  # for OpenAI API calls

# define a retry decorator
def retry_with_exponential_backoff(
    func,
    initial_delay: float = 1,
    exponential_base: float = 2,
    jitter: bool = True,
    max_retries: int = 10,
    errors: tuple = (openai.error.RateLimitError,),
):
    """Retry a function with exponential backoff."""
    def wrapper(*args, **kwargs):
        num_retries = 0
        delay = initial_delay
        # Loop until a successful response or max_retries is hit or an exception is raised
        while True:
            try:
                return func(*args, **kwargs)
            # Retry on specified errors
            except errors:
                # Check if max retries has been reached
                num_retries += 1
                if num_retries > max_retries:
                    raise Exception(f"Maximum number of retries ({max_retries}) exceeded.")
                # Increase the delay with exponential backoff and random jitter, then sleep
                delay *= exponential_base * (1 + jitter * random.random())
                time.sleep(delay)
            # Raise exceptions for any errors not specified
            except Exception:
                raise
    return wrapper

@retry_with_exponential_backoff
def completions_with_backoff(**kwargs):
    return openai.Completion.create(**kwargs)

completions_with_backoff(model="text-davinci-002", prompt="Once upon a time,")
<OpenAIObject text_completion id=cmpl-5oowRsCXv3AkUgVJyyo3TQrVq7hIT at 0x111024220> JSON: {
  "choices": [
    {
      "finish_reason": "length",
      "index": 0,
      "text": " a man decided to greatly improve his karma by turning his life around.\n\n"
    }
  ],
  "id": "cmpl-5oowRsCXv3AkUgVJyyo3TQrVq7hIT",
  "model": "text-davinci-002",
  "object": "text_completion",
  ...
}
How to maximize throughput of batch processing given rate limits
If you're processing real-time requests from users, backoff and retry is a great strategy to minimize latency while avoiding rate limit errors.
However, if you're processing large volumes of batch data, where throughput matters more than latency, there are a few other things you can do in addition to backoff and retry.
Proactively adding delay between requests
If you are constantly hitting the rate limit, then backing off, then hitting the rate limit again, then backing off again, it's possible that a good fraction of your request budget will be wasted on requests that need to be retried. This limits your processing throughput given a fixed rate limit.
Here, one potential solution is to calculate your rate limit and add a delay equal to its reciprocal to each request (e.g., if your rate limit is 20 requests per minute, add a delay of 3-6 seconds to each request). This can help you operate near the rate limit ceiling without hitting it and incurring wasted requests.
Example of adding delay to a request
import time  # for adding a delay between requests
import openai  # for OpenAI API calls

# Define a function that adds a delay to a Completion API call
def delayed_completion(delay_in_seconds: float = 1, **kwargs):
    """Delay a completion by a specified amount of time."""
    # Sleep for the delay
    time.sleep(delay_in_seconds)
    # Call the Completion API and return the result
    return openai.Completion.create(**kwargs)

# Calculate the delay based on your rate limit
rate_limit_per_minute = 20
delay = 60.0 / rate_limit_per_minute

delayed_completion(
    delay_in_seconds=delay,
    model="text-davinci-002",
    prompt="Once upon a time,"
)
<OpenAIObject text_completion id=cmpl-5oowVVZnAzdCPtUJ0rifeamtLcZRp at 0x11b2c7680> JSON: {
  "choices": [
    {
      "finish_reason": "length",
      "index": 0,
      "text": " there was an idyllic little farm that sat by a babbling brook"
    }
  ],
  "id": "cmpl-5oowVVZnAzdCPtUJ0rifeamtLcZRp",
  "model": "text-davinci-002",
  "object": "text_completion",
  ...
}
Batching requests
The OpenAI API has separate limits for requests per minute and tokens per minute.
If you're hitting the limit on requests per minute but have headroom on tokens per minute, you can increase your throughput by batching multiple tasks into each request. This will let you process more tokens per minute, especially with the smaller models.
Sending a batch of prompts works exactly the same as a normal API call, except that you pass in a list of strings to the prompt parameter instead of a single string.
Warning: the response object may not return completions in the order of the prompts, so always remember to match responses back to prompts using the index field.
Example without batching
import openai  # for making OpenAI API requests

num_stories = 10
prompt = "Once upon a time,"

# serial example, with one story completion per request
for _ in range(num_stories):
    response = openai.Completion.create(
        model="curie", prompt=prompt, max_tokens=20  # model and max_tokens are illustrative
    )
    # print the story
    print(prompt + response.choices[0].text)
Once upon a time, before there were grandiloquent tales of the massacre at Fort Mims, there were stories of
Once upon a time, a full-sized search and rescue was created. However, CIDIs are the addition of requiring
Once upon a time, Schubert was hot with the films. “Schubert sings of honey, flowers,
Once upon a time, you could watch these films on your VCR, sometimes years after their initial theatrical release, and there
Once upon a time, there was a forest. In that forest, the forest animals ruled. The forest animals had their homes
Once upon a time, there were two programs that complained about false positive scans. Peacock and Midnight Manager alike, only
Once upon a time, a long, long time ago, tragedy struck. it was the darkest of nights, and there was
Once upon a time, when Adam was a perfect little gentleman, he was presented at Court as a guarantee of good character.
Once upon a time, Adam and Eve made a mistake. They ate the fruit from the tree of immortality and split the consequences
Once upon a time, there was a set of programming fundamental principles known as the “X model.” This is a set of
Example with batching
import openai  # for making OpenAI API requests

num_stories = 10
prompts = ["Once upon a time,"] * num_stories

# batched example, with 10 story completions per request
response = openai.Completion.create(
    model="curie", prompt=prompts, max_tokens=20  # model and max_tokens are illustrative
)

# match completions to prompts by index
stories = [""] * len(prompts)
for choice in response.choices:
    stories[choice.index] = prompts[choice.index] + choice.text

# print the stories
for story in stories:
    print(story)
Once upon a time, there were two sisters, Eliza Pickering and Ariana ‘Ari’ Lucas. When these lovely
Once upon a time, Keene was stung by a worm — actually, probably a python — snaking through his leg
Once upon a time, there was a professor of physics during the depression. It was difficult, during this time, to get
Once upon a time, before you got sick, you told stories to all and sundry, and your listeners believed in you
Once upon a time, there was one very old nice donkey. He was incredibly smart, in a very old, kind of
Once upon a time, the property of a common lodging house was a common cup for all the inhabitants. Betimes a constant
Once upon a time, in an unspecified country, there was a witch who had an illegal product. It was highly effective,
Once upon a time, a long time ago, I turned 13, my beautiful dog Duncan swept me up into his jaws like
Once upon a time, as a thoroughly reformed creature from an army of Nazis, he took On Judgement Day myself and his
Once upon a time, Capcom made a game for the Atari VCS called Missile Command. While it was innovative at the time
Example parallel processing script
We've written an example script for parallel processing of large quantities of API requests: api_request_parallel_processor.py.
The script combines some handy features:
Streams requests from a file, to avoid running out of memory on giant jobs
Makes requests concurrently, to maximize throughput
Throttles both request and token usage, to stay under rate limits
Retries failed requests, to avoid missing data
Logs errors, to diagnose problems with requests
Feel free to use it as is or modify it to suit your needs.
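To give a rough sense of how those pieces fit together, here is a minimal, hypothetical sketch (not the actual api_request_parallel_processor.py) that combines concurrent requests with simple request-rate throttling, reusing the completion_with_backoff helper from Example 1 for retries. The requests_per_minute value, the worker count, and the throttled_completion helper name are assumptions for illustration; adjust them to your own limits and workload.

import concurrent.futures  # for running requests in parallel
import threading  # for the throttle lock
import time  # for spacing out request start times

requests_per_minute = 20  # assumed request limit; set this to your own account's limit
min_interval = 60.0 / requests_per_minute  # minimum seconds between request starts
throttle_lock = threading.Lock()  # serializes access to the shared timestamp
last_request_time = [0.0]  # start time of the most recent request


def throttled_completion(prompt: str) -> str:
    """Issue one completion, waiting long enough to stay under the request limit."""
    with throttle_lock:
        wait = min_interval - (time.monotonic() - last_request_time[0])
        if wait > 0:
            time.sleep(wait)
        last_request_time[0] = time.monotonic()
    # completion_with_backoff is the tenacity-wrapped helper from Example 1,
    # so any rate limit errors that slip through are still retried with backoff
    response = completion_with_backoff(model="text-davinci-002", prompt=prompt)
    return prompt + response.choices[0].text


prompts = ["Once upon a time,"] * 10

# issue the requests concurrently; the throttle keeps their start times spaced out
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    for story in executor.map(throttled_completion, prompts):
        print(story)

A thread pool with a shared throttle keeps request starts spaced out even when many prompts are queued; for token-based limits or very large jobs, the full script linked above is the better starting point.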