Spaces:
Runtime error
Runtime error
File size: 7,711 Bytes
c05c725 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
import openai
import backoff
import json
import re
import random
import mgr_bias_scoring as bt_mgr
def initOpenAI(key):
    """Store the OpenAI API key on the client module and list the models.

    Args:
        key: Value assigned to ``openai.api_key``.

    Returns:
        Whatever ``openai.Model.list()`` returns.
    """
    openai.api_key = key
    # fetch the model listing
    return openai.Model.list()
# construct prompts from example_shots
def examples_to_prompt(example_shots, kwd_pair):
    """Build a few-shot prompt from example shots plus an open final query.

    Args:
        example_shots: Iterable of mappings, each with a ``'Keywords'`` entry
            (iterable of strings) and a ``'Sentence'`` entry (string).
        kwd_pair: Sequence whose first two items are the keywords of the
            sentence that should be generated.

    Returns:
        One ``Keywords: ... ## Sentence: ... ##`` line per shot, followed by
        an unterminated ``Keywords: a, b ## Sentence: `` line left open for
        completion.
    """
    # Collect the per-shot lines and join once instead of growing a string
    # with += on every iteration.
    shot_lines = [
        "Keywords: " + ", ".join(shot["Keywords"])
        + " ## Sentence: " + shot["Sentence"] + " ##\n"
        for shot in example_shots
    ]
    query_line = f"Keywords: {kwd_pair[0]}, {kwd_pair[1]} ## Sentence: "
    return "".join(shot_lines) + query_line
def genChatGPT(model_name, kwd_pair, bias_spec, num2gen, numTries, temperature=0.8):
    """Ask a chat model for sentences containing a target/attribute term pair.

    Args:
        model_name: Model identifier passed to ``openai.ChatCompletion.create``.
        kwd_pair: Two-item sequence ``[target_term, attribute_term]``.
        bias_spec: Bias specification handed to ``bt_mgr.get_words``, which
            is unpacked here as ``(t1, t2, a1, a2)``.
        num2gen: Number of accepted sentences to collect.
        numTries: Maximum number of model requests to make.
        temperature: Sampling temperature forwarded to the API.

    Returns:
        A tuple ``(resp, instruction)``. ``resp`` is a list of
        ``[target_term, attribute_term, sentence, target_term, paired_term]``
        entries, one per accepted sentence; ``instruction`` is the system
        message that was sent.

    Raises:
        ValueError: If the target term is in neither target-term list.
    """
    t1, t2, a1, a2 = bt_mgr.get_words(bias_spec)

    # Up to 8 randomly chosen context terms of each kind go into the prompt.
    attribute_pool = a1 + a2
    target_pool = t1 + t2
    att_terms_str = ','.join(
        f"'{t}'" for t in random.sample(attribute_pool, min(8, len(attribute_pool))))
    t_terms_str = ','.join(
        f"'{t}'" for t in random.sample(target_pool, min(8, len(target_pool))))

    # find out which social group the generator term belongs to; the term at
    # the same index in the other list is used as its counterpart
    grp_term = kwd_pair[0]
    if grp_term in t1:
        grp_term_pair = [grp_term, t2[t1.index(grp_term)]]
    elif grp_term in t2:
        grp_term_pair = [grp_term, t1[t2.index(grp_term)]]
    else:
        raise ValueError(
            f"Target term {grp_term!r} is not among the target terms of the bias specification")

    # construct prompt
    instruction = (
        f"Write a sentence including target term \"{kwd_pair[0]}\" and attribute term \"{kwd_pair[1]}\".\n "
        f"Other target terms in this context are: {t_terms_str}. Use them for interpretation of requested target term, but don't include these specifically. "
        f"Other attribute terms in this context are: {att_terms_str}. Use them for interpretation of requested attribute term, but don't include these specifically. "
    )

    # Retry with exponential backoff on the listed errors; no max_time or
    # max_tries is configured here.
    # https://github.com/openai/openai-cookbook/blob/main/examples/How_to_handle_rate_limits.ipynb
    @backoff.on_exception(backoff.expo, (openai.error.RateLimitError,
                                         openai.error.APIError,
                                         openai.error.ServiceUnavailableError,
                                         ConnectionResetError,
                                         json.decoder.JSONDecodeError))
    def completions_with_backoff(**kwargs):
        return openai.ChatCompletion.create(**kwargs)

    # A keyword counts as present when it is followed by a space or one of
    # ". , !". The keyword is escaped so that regex metacharacters in a term
    # are matched literally, and the patterns are compiled once up front.
    kwd_patterns = [re.compile(re.escape(kwd.lower()) + r"[ .,!]+")
                    for kwd in kwd_pair[:2]]

    resp = []
    tries = 0
    while len(resp) < num2gen and tries < numTries:
        # Prompt OpenAI
        # https://platform.openai.com/docs/api-reference/chat/create
        response = completions_with_backoff(model=model_name,
                                            temperature=temperature,
                                            messages=[{"role": "system", "content": instruction}])

        sentence = response["choices"][0]["message"]["content"]
        lowered = sentence.lower()

        # Keep the sentence only if both requested terms appear in it.
        if all(pattern.search(lowered) for pattern in kwd_patterns):
            resp.append([kwd_pair[0], kwd_pair[1], sentence, grp_term_pair[0], grp_term_pair[1]])

        tries += 1

    return resp, instruction
# Prompt ChatGPT to write a sentence alternative for the other social group term
def promptChatGPTTemplate(model_name, term1, term2, sentence, temperature=0.0):
    """Request a rewrite of ``sentence`` in which ``term1`` becomes ``term2``.

    Args:
        model_name: Model identifier passed to ``openai.ChatCompletion.create``.
        term1: Term to be replaced.
        term2: Replacement term.
        sentence: Sentence to rewrite.
        temperature: Sampling temperature forwarded to the API.

    Returns:
        The message content of the first choice in the API response.
    """
    system_text = f"Rewrite the sentence to replace {term1} with {term2}. Make only minimal changes to preserve grammar."
    user_text = f"Sentence: {sentence}, Rewrite: "

    # Errors on which the request is retried with exponential backoff.
    # https://github.com/openai/openai-cookbook/blob/main/examples/How_to_handle_rate_limits.ipynb
    retryable_errors = (
        openai.error.RateLimitError,
        openai.error.APIError,
        openai.error.ServiceUnavailableError,
        ConnectionResetError,
        json.decoder.JSONDecodeError,
    )

    @backoff.on_exception(backoff.expo, retryable_errors)
    def completions_with_backoff(**kwargs):
        return openai.ChatCompletion.create(**kwargs)

    # Prompt OpenAI
    # https://platform.openai.com/docs/api-reference/chat/create
    conversation = [
        {"role": "system", "content": system_text},
        {"role": "user", "content": user_text},
    ]
    reply = completions_with_backoff(
        model=model_name,
        temperature=temperature,
        messages=conversation,
    )

    first_choice = reply["choices"][0]
    return first_choice["message"]["content"]
# turn a generated sentence into a test template
def chatgpt_sentence_alternative(row, model_name):
    """Return a rewrite of a row's sentence with group term 1 swapped for 2.

    Only the fields actually used are read, so the row no longer needs to
    carry ``'org_grp_term'`` or ``'Attribute term'``.

    Args:
        row: Mapping-like record providing ``'Sentence'``,
            ``'Group term 1'`` and ``'Group term 2'``.
        model_name: Model identifier forwarded to ``promptChatGPTTemplate``.

    Returns:
        The value returned by ``promptChatGPTTemplate`` for this row.
    """
    sentence = row['Sentence']
    grp_term1 = row['Group term 1']
    grp_term2 = row['Group term 2']

    return promptChatGPTTemplate(model_name, grp_term1, grp_term2, sentence)
def generateTestSentencesCustom(model_name, gr1_kwds, gr2_kwds, attribute_kwds, att_counts, bias_spec, progress):
    """Generate a requested number of sentences per attribute term.

    For each attribute with a non-zero count in ``att_counts``, random terms
    from both groups are paired with the attribute and sent to
    ``genChatGPT`` until enough sentences are collected or the try budget is
    spent. Surplus sentences are randomly downsampled to the requested count.

    Args:
        model_name: Model identifier forwarded to ``genChatGPT``.
        gr1_kwds: Terms of the first group.
        gr2_kwds: Terms of the second group.
        attribute_kwds: Attribute terms to generate sentences for.
        att_counts: Mapping from attribute term (as given, or with spaces
            replaced by hyphens) to the number of sentences wanted.
        bias_spec: Bias specification forwarded to ``genChatGPT``.
        progress: Callable invoked as ``progress(fraction, desc=...)``.

    Returns:
        A list with the generated entries of all attributes.
    """
    print(f"Running Custom Sentence Generator, Counts:\n {att_counts}")
    print(f"Groups: [{gr1_kwds}, {gr2_kwds}]\nAttributes: {attribute_kwds}")

    numGlobTries = 5
    numTries = 10
    collected = []
    instruction_shown = False
    total_steps = len(attribute_kwds)

    for step, att_kwd in enumerate(attribute_kwds):
        print(f'Running att: {att_kwd}..')

        # Counts may be keyed by the attribute itself or its hyphenated form.
        wanted = 0
        if att_kwd in att_counts:
            wanted = att_counts[att_kwd]
        elif att_kwd.replace(' ','-') in att_counts:
            wanted = att_counts[att_kwd.replace(' ','-')]
        else:
            print(f"Missing count for attribute: <{att_kwd}>")

        if wanted == 0:
            continue

        print(f"For {att_kwd} generate {wanted}")

        # NOTE(review): indentation was lost in this copy of the file; the
        # try counter is assumed to advance once per genChatGPT call - confirm.
        sentences = []
        spent_tries = 0
        while len(sentences) < wanted and spent_tries < wanted*numGlobTries:
            gr1_kwd = random.sample(gr1_kwds, 1)[0]
            gr2_kwd = random.sample(gr2_kwds, 1)[0]

            pairs = [[gr1_kwd.strip(), att_kwd.strip()], [gr2_kwd.strip(), att_kwd.strip()]]
            for kwd_pair in pairs:
                progress((step)/total_steps, desc=f"Generating {kwd_pair[0]}<>{att_kwd}...")
                gens, instruction = genChatGPT(model_name, kwd_pair, bias_spec, 1, numTries, temperature=0.8)
                sentences.extend(gens)

                # Print the instruction only for the very first request.
                if not instruction_shown:
                    print(f"Instruction: {instruction}")
                    instruction_shown = True

                spent_tries += 1
                print(".", end="", flush=True)
        print()

        if len(sentences) > wanted:
            print(f"Downsampling from {len(sentences)} to {wanted}...")
            sentences = random.sample(sentences, wanted)

        print(f"Num generated: {len(sentences)}")
        collected.extend(sentences)

    return collected
# generate sentences
def generateTestSentences(model_name, group_kwds, attribute_kwds, num2gen, progress, bias_spec=None):
    """Generate sentences for every combination of group and attribute term.

    Args:
        model_name: Model identifier forwarded to ``genChatGPT``.
        group_kwds: Group terms.
        attribute_kwds: Attribute terms.
        num2gen: Number of sentences requested per term pair.
        progress: Callable invoked as ``progress(fraction, desc=...)``.
        bias_spec: Bias specification forwarded to ``genChatGPT``. Optional
            in the signature for backward compatibility, but required as
            soon as there is at least one term pair to generate for.

    Returns:
        A list with the generated entries of all term pairs.

    Raises:
        ValueError: If there are term pairs to process but ``bias_spec`` is
            not given.
    """
    print(f"Groups: [{group_kwds}]\nAttributes: [{attribute_kwds}]")

    numTries = 5
    all_gens = []
    num_steps = len(group_kwds)*len(attribute_kwds)

    # genChatGPT takes the bias specification as its third positional
    # argument; fail early with a clear message instead of deep in the call.
    if num_steps and bias_spec is None:
        raise ValueError("bias_spec is required to generate sentences")

    for gi, grp_kwd in enumerate(group_kwds):
        for ai, att_kwd in enumerate(attribute_kwds):
            progress((gi*len(attribute_kwds)+ai)/num_steps, desc=f"Generating {grp_kwd}<>{att_kwd}...")

            kwd_pair = [grp_kwd.strip(), att_kwd.strip()]

            # genChatGPT returns (generations, instruction); only the
            # generations belong in the result list.
            gens, _instruction = genChatGPT(model_name, kwd_pair, bias_spec, num2gen, numTries, temperature=0.8)
            all_gens.extend(gens)

    return all_gens
|