I am building a tool that creates blog posts with the OpenAI API and then applies some techniques to make the output less recognizable as AI-generated. I keep hitting rate limits, even after implementing exponential backoff, batching requests, and running tasks asynchronously. Can anyone please help me?
https://replit.com/@ShrutiRajpurohi/AI-blogger?s=app
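For scale, assuming each ai_recognition helper makes a single API call and returns num_variations results: one sub-heading triggers 1 paraphrase + 5 style_transfer + 25 introduce_errors + 125 reduce_ai_recognition calls (with num_variations = 5), i.e. 156 calls, so 5 sub-headings mean roughly 780 OpenAI requests per keyword before the final Completion call. Here is the full script: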
import openai
import requests
import json
import os
from retry import retry
import logging
import concurrent.futures
from ratelimit import limits, sleep_and_retry
from ai_recognition import (
    paraphrase,
    style_transfer,
    introduce_errors,
    reduce_ai_recognition,
    generate_sub_headings
)
NUM_PROCESSES = 4
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
AIRTABLE_API_KEY = os.environ['AIRTABLE_API_KEY']
AIRTABLE_BASE_ID = 'appDdPu44GxMuBEzh'
AIRTABLE_TABLE_NAME = 'tbl6hgDr5863EXxn9'
openai.api_key = os.environ['OPENAI_API_KEY']
AIRTABLE_API_ENDPOINT = f'https://api.airtable.com/v0/{AIRTABLE_BASE_ID}'
headers = {
    'Authorization': f"Bearer {AIRTABLE_API_KEY}",
    'Content-Type': 'application/json'
}
# Rate limiting configuration
RATE_LIMIT_THRESHOLD = 55
RATE_LIMIT_PERIOD = 60
@retry(tries=5, delay=3, backoff=2)
@sleep_and_retry
@limits(calls=RATE_LIMIT_THRESHOLD, period=RATE_LIMIT_PERIOD)
def api_request(*args, **kwargs):
    response = requests.get(*args, **kwargs)
    response.raise_for_status()
    return response
def get_input_keywords():
    logging.info("Fetching Input Keywords from Airtable")
    url = f'{AIRTABLE_API_ENDPOINT}/{AIRTABLE_TABLE_NAME}?view=Grid%20view'
    response = api_request(url, headers=headers)
    data = response.json()
    try:
        records = data['records']
        input_data = [
            (record['fields'].get('Input Keyword'), record['fields'].get('Prompt'), record['id'])
            for record in records if 'Input Keyword' in record['fields']
        ]
        return input_data
    except KeyError as e:
        logging.error(f"KeyError: {e}")
        return []
def process_style_and_errors(sub_headings_variations, sub_heading, keyword, num_variations):
    styled_heading = style_transfer(sub_heading, keyword, num_variations)
    error_text = introduce_errors(styled_heading, num_variations)
    sub_headings_variations[sub_heading] = (styled_heading, error_text)
def style_and_error_batch(sub_headings, keyword, num_variations):
    styled_and_error_text_batch = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_PROCESSES) as executor:
        for sub_heading in sub_headings:
            executor.submit(process_style_and_errors, styled_and_error_text_batch, sub_heading, keyword, num_variations)
    # Exiting the `with` block waits for all submitted tasks,
    # so the dict is fully populated before we return.
    return list(styled_and_error_text_batch.values())
def process_sub_heading(sub_heading, keyword, num_variations, sub_headings_variations):
    sub_heading_text = f"{sub_heading}"
    paraphrased_sub_headings = paraphrase(sub_heading_text, num_variations)
    contents = []
    # Note: these nested loops fan out to num_variations**3
    # reduce_ai_recognition calls per sub-heading.
    for paraphrased_heading in paraphrased_sub_headings:
        styled_headings = style_transfer(paraphrased_heading, keyword, num_variations)
        for styled_text in styled_headings:
            error_texts = introduce_errors(styled_text, num_variations)
            for error in error_texts:
                reduced_text = reduce_ai_recognition(error, num_variations)
                content = f"## Sub-heading: {sub_heading}\n\n{reduced_text}\n\n"
                contents.append(content)
    sub_headings_variations[sub_heading_text] = contents
def generate_content(keyword, prompt, sub_headings, num_variations):
    logging.info("Starting content generation process")
    intro = f"Welcome to our blog post on {keyword}! In this article, we will explore the various aspects of {keyword}.\n"
    contents = []
    sub_headings_variations = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_PROCESSES) as executor:
        for sub_heading in sub_headings:
            executor.submit(process_sub_heading, sub_heading, keyword, num_variations, sub_headings_variations)
    # The executor's `with` block waits for every task, so the variations
    # dict is complete before the post is assembled.
    for sub_heading in sub_headings:
        contents.extend(sub_headings_variations.get(sub_heading, []))
    blog_title = f"{keyword}"
    blog_content = "\n".join(contents)
    blog = f"# {blog_title}\n\n{intro}{blog_content}"
    response = openai.Completion.create(
        engine="text-davinci-003",
        prompt=prompt,
        max_tokens=200,
        temperature=0.7,
        n=1
    )
    blog += response.choices[0].text
    return blog, sub_headings_variations
@retry(tries=5, delay=2, backoff=2)
def update_column(content, keyword, prompt, record_id):
    logging.info("Updating Output column in Airtable base")
    endpoint = f"{AIRTABLE_API_ENDPOINT}/{AIRTABLE_TABLE_NAME}/{record_id}"
    data = {
        "fields": {
            "Keyword": keyword,
            "Prompt": prompt,
            "Output": content
        }
    }
    response = requests.patch(endpoint, headers=headers, data=json.dumps(data))
    response.raise_for_status()
    return response.status_code
def generate():
    logging.info("Putting all the pieces together.")
    input_data = get_input_keywords()
    num_variations = 5
    for input_keyword, prompt, record_id in input_data:
        sub_headings = generate_sub_headings(input_keyword, num_sub_headings=5)
        # generate_content returns (blog, variations); only the blog text is written back.
        content, _ = generate_content(input_keyword, prompt, sub_headings, num_variations)
        update_column(content, input_keyword, prompt, record_id)

if __name__ == '__main__':
    generate()
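Note that in the script above only the Airtable GET goes through the throttled api_request wrapper; the OpenAI calls happen inside the ai_recognition helpers. Those helpers funnel every request through a single throttled, retried chokepoint roughly like the sketch below (completion() and the paraphrase prompt are simplified stand-ins, not my exact code):

# Simplified stand-in for how the ai_recognition helpers call OpenAI:
# every request goes through one throttled, retried chokepoint,
# shared by all worker threads.
import os
import openai
from retry import retry
from ratelimit import limits, sleep_and_retry

openai.api_key = os.environ['OPENAI_API_KEY']

@retry(openai.error.RateLimitError, tries=5, delay=3, backoff=2)  # exponential backoff on 429s
@sleep_and_retry
@limits(calls=55, period=60)  # client-side throttle, same budget as api_request
def completion(prompt, n=1, max_tokens=200, temperature=0.7):
    response = openai.Completion.create(
        engine="text-davinci-003",
        prompt=prompt,
        max_tokens=max_tokens,
        temperature=temperature,
        n=n,
    )
    return [choice.text for choice in response.choices]

def paraphrase(text, num_variations):
    # One request with n=num_variations instead of num_variations separate requests.
    return completion(f"Paraphrase the following text:\n\n{text}", n=num_variations)

Even with that chokepoint in place, the nested variation loops still dominate the request count, which makes me suspect the problem is volume rather than pacing. Any pointers would be appreciated.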