Spaces:
Paused
Paused
import json | |
import os | |
import traceback | |
from typing import List, Tuple | |
import gradio as gr | |
import requests | |
from huggingface_hub import HfApi | |
hf_api = HfApi() | |
roots_datasets = { | |
dset.id.split("/")[-1]: dset | |
for dset in hf_api.list_datasets( | |
author="bigscience-data", use_auth_token=os.environ.get("bigscience_data_token") | |
) | |
} | |
def get_docid_html(docid): | |
data_org, dataset, docid = docid.split("/") | |
metadata = roots_datasets[dataset] | |
if metadata.private: | |
docid_html = """ | |
<a title="This dataset is private. See the introductory text for more information" | |
style="color:#AA4A44; font-weight: bold; text-decoration:none" | |
onmouseover="style='color:#AA4A44; font-weight: bold; text-decoration:underline'" | |
onmouseout="style='color:#AA4A44; font-weight: bold; text-decoration:none'" | |
href="https://huggingface.co/datasets/bigscience-data/{dataset}" | |
target="_blank"> | |
π{dataset} | |
</a> | |
<span style="color:#7978FF; ">/{docid}</span>""".format( | |
dataset=dataset, docid=docid | |
) | |
else: | |
docid_html = """ | |
<a title="This dataset is licensed {metadata}" | |
style="color:#7978FF; font-weight: bold; text-decoration:none" | |
onmouseover="style='color:#7978FF; font-weight: bold; text-decoration:underline'" | |
onmouseout="style='color:#7978FF; font-weight: bold; text-decoration:none'" | |
href="https://huggingface.co/datasets/bigscience-data/{dataset}" | |
target="_blank"> | |
{dataset} | |
</a> | |
<span style="color:#7978FF; ">/{docid}</span>""".format( | |
metadata=metadata.tags[0].split(":")[-1], dataset=dataset, docid=docid | |
) | |
return docid_html | |
PII_TAGS = {"KEY", "EMAIL", "USER", "IP_ADDRESS", "ID", "IPv4", "IPv6"} | |
PII_PREFIX = "PI:" | |
def process_pii(text): | |
for tag in PII_TAGS: | |
text = text.replace( | |
PII_PREFIX + tag, | |
"""<b><mark style="background: Fuchsia; color: Lime;">REDACTED {}</mark></b>""".format( | |
tag | |
), | |
) | |
return text | |
def flag(query, language, num_results, issue_description): | |
try: | |
post_data = { | |
"query": query, | |
"k": num_results, | |
"flag": True, | |
"description": issue_description, | |
} | |
if language != "detect_language": | |
post_data["lang"] = language | |
output = requests.post( | |
os.environ.get("address"), | |
headers={"Content-type": "application/json"}, | |
data=json.dumps(post_data), | |
timeout=120, | |
) | |
results = json.loads(output.text) | |
except: | |
print("Error flagging") | |
return "" | |
def format_result(result, highlight_terms, exact_search, datasets_filter=None): | |
text, url, docid = result | |
if datasets_filter is not None: | |
datasets_filter = set(datasets_filter) | |
dataset = docid.split("/")[1] | |
if not dataset in datasets_filter: | |
return "" | |
if exact_search: | |
query_start = text.find(highlight_terms) | |
query_end = query_start + len(highlight_terms) | |
tokens_html = text[0:query_start] | |
tokens_html += "<b>{}</b>".format(text[query_start:query_end]) | |
tokens_html += text[query_end:] | |
else: | |
tokens = text.split() | |
tokens_html = [] | |
for token in tokens: | |
if token in highlight_terms: | |
tokens_html.append("<b>{}</b>".format(token)) | |
else: | |
tokens_html.append(token) | |
tokens_html = " ".join(tokens_html) | |
tokens_html = process_pii(tokens_html) | |
meta_html = ( | |
"""<p class='underline-on-hover' style='font-size:12px; font-family: Arial; color:#585858; text-align: left;'> | |
<a href='{}' target='_blank'>{}</a></p>""".format( | |
url, url | |
) | |
if url is not None | |
else "" | |
) | |
docid_html = get_docid_html(docid) | |
language = "FIXME" | |
return """{} | |
<p style='font-size:14px; font-family: Arial; color:#7978FF; text-align: left;'>Document ID: {}</p> | |
<p style='font-size:12px; font-family: Arial; color:MediumAquaMarine'>Language: {}</p> | |
<p style='font-family: Arial;'>{}</p> | |
<br> | |
""".format( | |
meta_html, docid_html, language, tokens_html | |
) | |
def format_result_page( | |
language, results, highlight_terms, num_results, exact_search, datasets_filter=None | |
) -> gr.HTML: | |
filtered_num_results = 0 | |
header_html = "" | |
# FIX lang detection by normalizing format on the backend | |
if language == "detect_language" and not exact_search: | |
header_html += """<p style='font-family: Arial; color:MediumAquaMarine; text-align: center; line-height: 3em'> | |
Detected language: <b> FIX MEEEE !!! </b><hr></p><br>""" | |
results_html = "" | |
for lang, results_for_lang in results.items(): | |
if len(results_for_lang) == 0: | |
if exact_search: | |
results_html += """<p style='font-family: Arial; color:Silver; text-align: left; line-height: 3em'> | |
No results found.<hr></p>""" | |
else: | |
results_html += """<p style='font-family: Arial; color:Silver; text-align: left; line-height: 3em'> | |
No results for language: <b>{}</b><hr></p>""".format( | |
lang | |
) | |
continue | |
results_for_lang_html = "" | |
for result in results_for_lang: | |
result_html = format_result( | |
result, highlight_terms, exact_search, datasets_filter | |
) | |
if result_html != "": | |
filtered_num_results += 1 | |
results_for_lang_html += result_html | |
if language == "all" and not exact_search: | |
results_for_lang_html = f""" | |
<details> | |
<summary style='font-family: Arial; color:MediumAquaMarine; text-align: left; line-height: 3em'> | |
Results for language: <b>{lang}</b><hr> | |
</summary> | |
{results_for_lang_html} | |
</details>""" | |
results_html += results_for_lang_html | |
if num_results is not None: | |
header_html += """<p style='font-family: Arial; color:MediumAquaMarine; text-align: center; line-height: 3em'> | |
Total number of matches: <b>{}</b><hr></p><br>""".format( | |
filtered_num_results | |
) | |
return header_html + results_html | |
def extract_results_from_payload(query, language, payload, exact_search): | |
results = payload["results"] | |
processed_results = dict() | |
datasets = set() | |
highlight_terms = None | |
num_results = None | |
if exact_search: | |
highlight_terms = query | |
num_results = payload["num_results"] | |
results = {language: results} | |
else: | |
highlight_terms = payload["highlight_terms"] | |
# unify format - might be best fixed on server side | |
if language != "all": | |
results = {language: results} | |
for lang, results_for_lang in results.items(): | |
processed_results[lang] = list() | |
for result in results_for_lang: | |
text = result["text"] | |
url = ( | |
result["meta"]["url"] | |
if "meta" in result | |
and result["meta"] is not None | |
and "url" in result["meta"] | |
else None | |
) | |
docid = result["docid"] | |
_, dataset, _ = docid.split("/") | |
datasets.add(dataset) | |
processed_results[lang].append((text, url, docid)) | |
return processed_results, highlight_terms, num_results, list(datasets) | |
def process_error(error_type): | |
if error_type == "unsupported_lang": | |
detected_lang = payload["err"]["meta"]["detected_lang"] | |
return f""" | |
<p style='font-size:18px; font-family: Arial; color:MediumVioletRed; text-align: center;'> | |
Detected language <b>{detected_lang}</b> is not supported.<br> | |
Please choose a language from the dropdown or type another query. | |
</p><br><hr><br>""" | |
def extract_error_from_payload(payload): | |
if "err" in payload: | |
return payload["err"]["type"] | |
return None | |
def request_payload(query, language, exact_search, num_results=10): | |
post_data = {"query": query, "k": num_results} | |
if language != "detect_language": | |
post_data["lang"] = language | |
address = "http://34.105.160.81:8080" if exact_search else os.environ.get("address") | |
output = requests.post( | |
address, | |
headers={"Content-type": "application/json"}, | |
data=json.dumps(post_data), | |
timeout=60, | |
) | |
payload = json.loads(output.text) | |
return payload | |
description = """# <p style="text-align: center;"> πΈ π ROOTS search tool π πΈ </p> | |
The ROOTS corpus was developed during the [BigScience workshop](https://bigscience.huggingface.co/) for the purpose | |
of training the Multilingual Large Language Model [BLOOM](https://huggingface.co/bigscience/bloom). This tool allows | |
you to search through the ROOTS corpus. We serve a BM25 index for each language or group of languages included in | |
ROOTS. You can read more about the details of the tool design | |
[here](https://huggingface.co/spaces/bigscience-data/scisearch/blob/main/roots_search_tool_specs.pdf). For more | |
information and instructions on how to access the full corpus check [this form](https://forms.gle/qyYswbEL5kA23Wu99).""" | |
if __name__ == "__main__": | |
demo = gr.Blocks( | |
css=".underline-on-hover:hover { text-decoration: underline; } .flagging { font-size:12px; color:Silver; }" | |
) | |
with demo: | |
processed_results_state = gr.State([]) | |
highlight_terms_state = gr.State([]) | |
num_results_state = gr.State(0) | |
exact_search_state = gr.State(False) | |
lang_state = gr.State("") | |
with gr.Row(): | |
gr.Markdown(value=description) | |
with gr.Row(): | |
query = gr.Textbox( | |
lines=1, | |
max_lines=1, | |
placeholder="Put your query in double quotes for exact search.", | |
label="Query", | |
) | |
with gr.Row(): | |
lang = gr.Dropdown( | |
choices=[ | |
"ar", | |
"ca", | |
"code", | |
"en", | |
"es", | |
"eu", | |
"fr", | |
"id", | |
"indic", | |
"nigercongo", | |
"pt", | |
"vi", | |
"zh", | |
"detect_language", | |
"all", | |
], | |
value="en", | |
label="Language", | |
) | |
with gr.Row(): | |
k = gr.Slider(1, 100, value=10, step=1, label="Max Results") | |
with gr.Row(): | |
submit_btn = gr.Button("Submit") | |
with gr.Row(visible=False) as datasets_filter: | |
available_datasets = gr.Dropdown( | |
type="value", | |
choices=[], | |
value=[], | |
label="Datasets Filter", | |
multiselect=True, | |
) | |
with gr.Row(): | |
results = gr.HTML(label="Results") | |
with gr.Column(visible=False) as flagging_form: | |
flag_txt = gr.Textbox( | |
lines=1, | |
placeholder="Type here...", | |
label="""If you choose to flag your search, we will save the query, language and the number of results | |
you requested. Please consider adding relevant additional context below:""", | |
) | |
flag_btn = gr.Button("Flag Results") | |
flag_btn.click(flag, inputs=[query, lang, k, flag_txt], outputs=[flag_txt]) | |
def submit(query, lang, k, dropdown_input): | |
print("submitting", query, lang, k) | |
query = query.strip() | |
exact_search = False | |
if query.startswith('"') and query.endswith('"') and len(query) >= 2: | |
exact_search = True | |
query = query[1:-1] | |
else: | |
query = " ".join(query.split()) | |
if query == "" or query is None: | |
return None | |
results_html = "" | |
payload = request_payload(query, lang, exact_search, k) | |
err = extract_error_from_payload(payload) | |
if err is not None: | |
return process_error(err) | |
( | |
processed_results, | |
highlight_terms, | |
num_results, | |
datasets, | |
) = extract_results_from_payload(query, lang, payload, exact_search) | |
results_html = format_result_page( | |
lang, processed_results, highlight_terms, num_results, exact_search | |
) | |
return { | |
processed_results_state: processed_results, | |
highlight_terms_state: highlight_terms, | |
num_results_state: num_results, | |
exact_search_state: exact_search, | |
results: results_html, | |
flagging_form: gr.update(visible=True), | |
datasets_filter: gr.update(visible=True), | |
available_datasets: gr.Dropdown.update( | |
choices=datasets, value=datasets | |
), | |
} | |
def filter_datasets( | |
lang, | |
processed_results, | |
highlight_terms, | |
num_results, | |
exact_search, | |
datasets_filter, | |
): | |
results_html = format_result_page( | |
lang, | |
processed_results, | |
highlight_terms, | |
num_results, | |
exact_search, | |
datasets_filter, | |
) | |
return {results: results_html} | |
query.submit( | |
fn=submit, | |
inputs=[query, lang, k, available_datasets], | |
outputs=[ | |
processed_results_state, | |
highlight_terms_state, | |
num_results_state, | |
exact_search_state, | |
results, | |
flagging_form, | |
datasets_filter, | |
available_datasets, | |
], | |
) | |
submit_btn.click( | |
submit, | |
inputs=[query, lang, k, available_datasets], | |
outputs=[ | |
processed_results_state, | |
highlight_terms_state, | |
num_results_state, | |
exact_search_state, | |
results, | |
flagging_form, | |
datasets_filter, | |
available_datasets, | |
], | |
) | |
available_datasets.change( | |
filter_datasets, | |
inputs=[ | |
lang, | |
processed_results_state, | |
highlight_terms_state, | |
num_results_state, | |
exact_search_state, | |
available_datasets, | |
], | |
outputs=[results], | |
) | |
demo.launch(enable_queue=True, debug=True) | |