Continually Facing rate limit on OpenAI API

I am buliding a tool that can create blogs with the help of OpenAI API key. And then I apply some techniques to make it less recognizable as AI-generated. But I’m continously facing a problem of rate limit, even after implementing exponential backoff, batching requests, and asynchronous tasks. Can anyone please help me?

https://replit.com/@ShrutiRajpurohi/AI-blogger?s=app

import openai
import requests
import json
import os
from retry import retry
import logging
import concurrent.futures
from ratelimit import limits, sleep_and_retry

from ai_recognition import (
    paraphrase,
    style_transfer,
    introduce_errors,
    reduce_ai_recognition,
    generate_sub_headings
)
NUM_PROCESSES = 4


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

my_secret = os.environ['AIRTABLE_API_KEY']
AIRTABLE_BASE_ID = 'appDdPu44GxMuBEzh'
AIRTABLE_TABLE_NAME = 'tbl6hgDr5863EXxn9'
openai.api_key = os.environ['OPENAI_API_KEY']
AIRTABLE_API_ENDPOINT = 'https://api.airtable.com/v0/appDdPu44GxMuBEzh'

headers = {
    'Authorization': f"Bearer {os.environ['AIRTABLE_API_KEY']}",
    'Content-Type': 'application/json'
}

# Rate limiting configuration
RATE_LIMIT_THRESHOLD = 55
RATE_LIMIT_PERIOD = 60


@retry(tries=5, delay=3, backoff=2)
@sleep_and_retry
@limits(calls=RATE_LIMIT_THRESHOLD, period=RATE_LIMIT_PERIOD)
def api_request(*args, **kwargs):
    response = requests.get(*args, **kwargs)
    response.raise_for_status()
    return response


def get_input_keywords():
    logging.info("Fetching Input Keywords from Airtable")
    url = f'https://api.airtable.com/v0/{AIRTABLE_BASE_ID}/{AIRTABLE_TABLE_NAME}?view=Grid%20view'
    response = api_request(url, headers=headers)
    data = response.json()
    try:
        records = data['records']
        input_data = [
            (record['fields'].get('Input Keyword'), record['fields'].get('Prompt'), record['id'])
            for record in records if 'Input Keyword' in record['fields']
        ]
        return input_data
    except KeyError as e:
        print(f"KeyError: {e}")
        return []


def process_style_and_errors(sub_headings_variations, sub_heading, keyword, num_variations):
    styled_heading = style_transfer(sub_heading, keyword, num_variations)
    error_text = introduce_errors(styled_heading, num_variations)
    sub_headings_variations[sub_heading] = (styled_heading, error_text)


def style_and_error_batch(sub_headings, keyword, num_variations):
    styled_and_error_text_batch = {}

    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_PROCESSES) as executor:
        for sub_heading in sub_headings:
            executor.submit(process_style_and_errors, styled_and_error_text_batch, sub_heading, keyword, num_variations)

    return list(styled_and_error_text_batch.values())


def process_sub_heading(sub_heading, keyword, num_variations, sub_headings_variations): 
    sub_heading_text = f"{sub_heading}" 
    paraphrased_sub_heading = paraphrase(sub_heading_text, num_variations) 
    contents = [] 
    for paraphrased_heading in paraphrased_sub_heading: 
        styled_heading = style_transfer(paraphrased_heading, keyword, num_variations) 
        for styled_text in styled_heading: 
            error_text = introduce_errors(styled_text, num_variations) 
            for error in error_text: 
                reduced_text = reduce_ai_recognition(error, num_variations) 
                content = f"## Sub-heading: {sub_heading}\n\n{reduced_text}\n\n" 
                contents.append(content) 
                
    sub_headings_variations[sub_heading_text] = contents


def generate_content(keyword, prompt, sub_headings, num_variations): 
    logging.info("Starting content generation process") 
    intro = f"Welcome to our blog post on {keyword}! In this article, we will explore the various aspects of {keyword}.\n" 
    contents = [] 
    sub_headings_variations = {} 
  
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_PROCESSES) as executor: 
        for sub_heading in sub_headings: 
            executor.submit(process_sub_heading, sub_heading, keyword, num_variations, sub_headings_variations) 
            
    for sub_heading in sub_headings: 
        contents.extend(sub_headings_variations.get(sub_heading, [])) 
    
    blog_title = f"{keyword}" 
    blog_content = "\n".join(contents) 
    blog = f"# {blog_title}\n\n{intro}{blog_content}" 
          
    response_stream = openai.Completion.create( 
        engine="text-davinci-003", 
        prompt=prompt, 
        max_tokens=200, 
        temperature=0.7, 
        n=1
    ) 

    choices = response_stream.choices[0] 
    text = choices.text 
    blog += text
    return blog, sub_headings_variations


@retry(tries=5, delay=2, backoff=2)
def update_column(content, keyword, prompt, record_id):
    logging.info("Updating Output column in Airtable base")
    endpoint = f"https://api.airtable.com/v0/{AIRTABLE_BASE_ID}/{AIRTABLE_TABLE_NAME}/{record_id}"
    data = {
        "fields": {
            "Keyword": keyword,
            "Prompt": prompt,
            "Output": content
        }
    }
    response = requests.patch(endpoint, headers=headers, data=json.dumps(data))
    response.raise_for_status()
    return response.status_code


def generate():
    logging.info("Putting all the pieces together.")
    input_data = get_input_keywords()
    num_variations = 5

    for input_keyword, prompt, record_id in input_data:
        sub_headings = generate_sub_headings(input_keyword, num_sub_headings=5)
        content = generate_content(input_keyword, prompt, sub_headings, num_variations)
        update_column(content, input_keyword, prompt, record_id)


if __name__ == '__main__':
    generate()

Are you getting the rate limit from OpenAI or AirTable?

Your code has implemented rate limiting for Airtable API requests, but not for OpenAI API requests. You might be hitting the rate limit on OpenAI’s side when generating text completions.