Spaces:
Running
Running
import asyncio
import os
import re
import sys
from datetime import date
from urllib.request import urlopen, Request

import fitz
import gradio as gr
import httpx
import nltk
import nltk, spacy, subprocess, torch
import nltk
import nltk
import numpy as np
import plotly.graph_objects as go
import requests
from bs4 import BeautifulSoup
from evaluate import load
from googleapiclient.discovery import build
from scipy.special import softmax
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import GPT2LMHeadModel, GPT2TokenizerFast
from unidecode import unidecode

from utils import cosineSim, googleSearch, getSentences, parallel_scrap, matchingScore
nltk.download('punkt') | |
from writing_analysis import ( | |
normalize, | |
preprocess_text1, | |
preprocess_text2, | |
vocabulary_richness_ttr, | |
calculate_gunning_fog, | |
calculate_average_sentence_length, | |
calculate_average_word_length, | |
calculate_syntactic_tree_depth, | |
calculate_perplexity, | |
) | |
np.set_printoptions(suppress=True) | |
def plagiarism_check(
    plag_option,
    input,
    year_from,
    month_from,
    day_from,
    year_to,
    month_to,
    day_to,
    domains_to_skip,
):
    """Attribute each sentence of ``input`` to the best-matching web page.

    The text is split with ``getSentences``, candidate URLs are collected with
    ``googleSearch`` (restricted to the given date range), the pages are
    scraped with ``parallel_scrap`` and every sentence is scored against every
    scraped page with ``matchingScore``.

    Args:
        plag_option: Search mode, forwarded unchanged to ``googleSearch``.
        input: Text to check.
        year_from, month_from, day_from: Start of the date range
            (combined by ``build_date``).
        year_to, month_to, day_to: End of the date range.
        domains_to_skip: Forwarded unchanged to ``googleSearch``.

    Returns:
        A list of ``(text, label)`` tuples: one ``(sentence, "[k]")`` per
        sentence, three ``("\\n", None)`` separators, then one
        ``("<url> --- Matching Score: <p>%", "[k]")`` entry per source, each
        followed by ``("\\n", None)``. ``k`` is the 1-based rank of the source
        by average matching score. When the search yields no score rows the
        sentences are returned with a ``None`` label.
    """
    # NOTE(security): credentials belong in the environment, not in source.
    # The literals are kept only as a fallback so existing deployments keep
    # working; they have been published and should be rotated.
    api_key = os.environ.get(
        "GOOGLE_API_KEY", "AIzaSyCg1IbevcTAXAPYeYreps6wYWDbU0Kz8tg"
    )
    cse_id = os.environ.get("GOOGLE_CSE_ID", "851813e81162b4ed4")

    sentences = getSentences(input)
    urlCount = {}
    ScoreArray = []
    urlList = []

    date_from = build_date(year_from, month_from, day_from)
    date_to = build_date(year_to, month_to, day_to)
    sort_date = f"date:r:{date_from}:{date_to}"

    # Get the list of URLs to check; urlList is filled in place by the call.
    urlCount, ScoreArray = googleSearch(
        plag_option,
        sentences,
        urlCount,
        ScoreArray,
        urlList,
        sort_date,
        domains_to_skip,
        api_key,
        cse_id,
    )
    print("Number of URLs: ", len(urlCount))
    print(urlList)

    # Scrape the URLs in the list.
    formatted_tokens = []
    soups = asyncio.run(parallel_scrap(urlList))
    print(len(soups))
    scraped_ok = sum(1 for soup in soups if soup is not None)
    print(f"Successful scraping: {scraped_ok} out of {len(urlList)}")

    # Populate matching scores for the scraped pages.
    for i, soup in enumerate(soups):
        print(f"Analyzing {i+1} of {len(soups)} soups........................")
        if soup:
            page_content = soup.text
            for j, sent in enumerate(sentences):
                ScoreArray[i][j] = matchingScore(sent, page_content)

    # Without any score row there is nothing to attribute; indexing
    # ScoreArray below would raise IndexError.
    if not ScoreArray:
        return [(sent, None) for sent in sentences]

    # Pick the best URL per sentence. A sentence starts from the previous
    # sentence's URL and only switches when another URL beats the current
    # best by more than the margin, so consecutive sentences tend to stay
    # on the same source.
    sentenceToMaxURL = [-1] * len(sentences)
    for j in range(len(sentences)):
        if j > 0:
            maxScore = ScoreArray[sentenceToMaxURL[j - 1]][j]
            sentenceToMaxURL[j] = sentenceToMaxURL[j - 1]
        else:
            maxScore = -1
        for i, row in enumerate(ScoreArray):
            # Re-evaluated every iteration: the margin drops to 0 as soon as
            # the sentence has moved away from the previous sentence's URL.
            margin = (
                0.1
                if (j > 0 and sentenceToMaxURL[j] == sentenceToMaxURL[j - 1])
                else 0
            )
            if row[j] - maxScore > margin:
                maxScore = row[j]
                sentenceToMaxURL[j] = i

    # The first sentence has no predecessor; align it with the second one
    # when its own best URL is only marginally better.
    if (
        (len(sentences) > 1)
        and (sentenceToMaxURL[1] != sentenceToMaxURL[0])
        and (
            ScoreArray[sentenceToMaxURL[0]][0]
            - ScoreArray[sentenceToMaxURL[1]][0]
            < 0.1
        )
    ):
        sentenceToMaxURL[0] = sentenceToMaxURL[1]

    # Average score of each selected URL over the sentences assigned to it.
    urlScore = {}
    for url in sorted(set(sentenceToMaxURL)):
        assigned = [
            ScoreArray[url][sen]
            for sen in range(len(sentences))
            if sentenceToMaxURL[sen] == url
        ]
        urlScore[url] = sum(assigned) / len(assigned)

    # Rank the sources: label [1] is the URL with the highest average score.
    index_descending = sorted(urlScore, key=urlScore.get, reverse=True)
    urlMap = {url: rank for rank, url in enumerate(index_descending, start=1)}

    for i, sent in enumerate(sentences):
        formatted_tokens.append(
            (sent, "[" + str(urlMap[sentenceToMaxURL[i]]) + "]")
        )

    formatted_tokens.append(("\n", None))
    formatted_tokens.append(("\n", None))
    formatted_tokens.append(("\n", None))

    print(formatted_tokens)
    print(index_descending)

    for ind in index_descending:
        formatted_tokens.append(
            (
                urlList[ind]
                + " --- Matching Score: "
                + f"{str(round(urlScore[ind] * 100, 2))}%",
                "[" + str(urlMap[ind]) + "]",
            )
        )
        formatted_tokens.append(("\n", None))

    print(f"Formatted Tokens: {formatted_tokens}")
    return formatted_tokens
""" | |
AI DETECTION SECTION | |
""" | |
# Run the detectors on the GPU when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Binary classifier; predict_bc's caller reads index 0 as HUMAN and 1 as AI.
text_bc_model_path = "polygraf-ai/v3-bert-3-2m-trun-bc-lighter-spec"
text_bc_tokenizer = AutoTokenizer.from_pretrained(text_bc_model_path)
text_bc_model = AutoModelForSequenceClassification.from_pretrained(
    text_bc_model_path
)
text_bc_model = text_bc_model.to(device)

# Multi-class classifier used to score the individual source models.
text_mc_model_path = "polygraf-ai/ai-text-detection-mc-robert-open-ai-detector-v4"
text_mc_tokenizer = AutoTokenizer.from_pretrained(text_mc_model_path)
text_mc_model = AutoModelForSequenceClassification.from_pretrained(
    text_mc_model_path
)
text_mc_model = text_mc_model.to(device)
def remove_accents(input_str):
    """Return ``input_str`` transliterated by ``unidecode``."""
    return unidecode(input_str)
def remove_special_characters(text):
    """Strip accents from ``text``, then delete characters outside the allow-list.

    Kept characters are word characters, whitespace and the punctuation
    listed in the character class below.
    """
    # NOTE(review): inside the class, `)-;` is a range (U+0029..U+003B), so
    # `*`, `+`, `/` and `:` are kept as well. Confirm whether a literal
    # hyphen was intended before changing it; the detectors are fed text
    # cleaned with exactly this pattern.
    allowed_only = re.compile(r'[^\w\s\d.,!?\'"()-;]+')
    return allowed_only.sub('', remove_accents(text))
def remove_special_characters_2(text):
    """Return ``text`` with everything except ASCII letters, digits and spaces removed."""
    alnum_and_space_only = re.compile(r'[^a-zA-Z0-9 ]+')
    return alnum_and_space_only.sub('', text)
def update_character_count(text):
    """Return the length of ``text`` formatted as ``"<n> characters"``."""
    count = len(text)
    return str(count) + " characters"
def split_text_allow_complete_sentences_nltk(text, max_length=256, tolerance=40, min_last_segment_length=150, type_det='bc'):
    """Split ``text`` into segments made of whole sentences.

    Sentences (from ``nltk.sent_tokenize``) are packed greedily into a segment
    until adding the next one would exceed ``max_length + tolerance - 2``
    tokens. A short final segment is merged into the previous one when the
    merged text still fits.

    Args:
        text: Text to split.
        max_length: Target token length of a segment.
        tolerance: Extra tokens a segment may use beyond ``max_length``.
        min_last_segment_length: A final segment shorter than this (in
            encoded tokens) is a candidate for merging into its predecessor.
        type_det: ``'bc'`` to measure with ``text_bc_tokenizer``, ``'mc'`` to
            measure with ``text_mc_tokenizer``.

    Returns:
        A list of strings, one per segment, each produced by encoding the
        segment (truncated to ``max_length + tolerance`` tokens) and decoding
        it again.

    Raises:
        ValueError: If ``type_det`` is neither ``'bc'`` nor ``'mc'``.
    """
    if type_det == 'bc':
        tokenizer = text_bc_tokenizer
    elif type_det == 'mc':
        tokenizer = text_mc_tokenizer
    else:
        raise ValueError(f"type_det must be 'bc' or 'mc', got {type_det!r}")

    token_limit = max_length + tolerance

    def encode(sentence_group, truncate=True):
        # Encode the sentences joined by single spaces, special tokens included.
        options = {"max_length": token_limit, "truncation": True} if truncate else {}
        return tokenizer.encode(' '.join(sentence_group), add_special_tokens=True, **options)

    # Greedy packing of whole sentences.
    segments = []
    current_segment = []
    current_length = 0
    for sentence in nltk.sent_tokenize(text):
        sentence_length = len(tokenizer.tokenize(sentence))
        # The -2 leaves room for the special tokens added when encoding.
        if current_length + sentence_length <= token_limit - 2:
            current_segment.append(sentence)
            current_length += sentence_length
        else:
            if current_segment:
                segments.append((current_segment, len(encode(current_segment))))
            current_segment = [sentence]
            current_length = sentence_length
    if current_segment:
        segments.append((current_segment, len(encode(current_segment))))

    # Merge a short trailing segment into its predecessor when it fits.
    final_segments = list(segments)
    if len(final_segments) > 1:
        last_seg, last_length = final_segments[-1]
        if last_length < min_last_segment_length:
            prev_seg, _ = final_segments[-2]
            # Measure the merged text WITHOUT truncation: a truncated encoding
            # can never exceed the limit, which made this check always pass
            # and silently cut off the merged tail.
            combined_length = len(encode(prev_seg + last_seg, truncate=False))
            if combined_length <= token_limit:
                final_segments[-2:] = [(prev_seg + last_seg, combined_length)]

    # NOTE(review): decode() is called with its defaults, so any special
    # tokens added by encode() end up in the returned text. Confirm that
    # this matches how the detectors were trained before changing it.
    return [tokenizer.decode(encode(seg)) for seg, _ in final_segments]
def predict_bc(model, tokenizer, text):
    """Return the binary classifier's softmax scores for ``text``.

    Args:
        model: Sequence-classification model, called with the input ids.
        tokenizer: Tokenizer used to encode ``text`` (padded/truncated to 256
            tokens).
        text: Text to classify.

    Returns:
        1-D NumPy array with the softmax of the logits of the single input.
    """
    # Use the tokenizer that was passed in; the parameter used to be ignored
    # in favour of the module-level BC tokenizer.
    tokens = tokenizer(
        text, padding='max_length', truncation=True, max_length=256, return_tensors="pt"
    ).to(device)["input_ids"]
    # NOTE(review): only input_ids are forwarded, so no attention mask is
    # passed for the padded positions. Confirm against the training setup
    # before changing it.
    with torch.no_grad():  # inference only: skip building the autograd graph
        output = model(tokens)
    output_norm = softmax(output.logits.detach().cpu().numpy(), 1)[0]
    print("BC Score: ", output_norm)
    return output_norm
def predict_mc(model, tokenizer, text):
    """Return the multi-class classifier's softmax scores for ``text``.

    Args:
        model: Sequence-classification model, called with the input ids.
        tokenizer: Tokenizer used to encode ``text`` (padded/truncated to 256
            tokens).
        text: Text to classify.

    Returns:
        1-D NumPy array with the softmax of the logits of the single input.
    """
    # Use the tokenizer that was passed in; the parameter used to be ignored
    # in favour of the module-level MC tokenizer.
    tokens = tokenizer(
        text, padding='max_length', truncation=True, return_tensors="pt", max_length=256
    ).to(device)["input_ids"]
    # NOTE(review): only input_ids are forwarded, so no attention mask is
    # passed for the padded positions. Confirm against the training setup
    # before changing it.
    with torch.no_grad():  # inference only: skip building the autograd graph
        output = model(tokens)
    output_norm = softmax(output.logits.detach().cpu().numpy(), 1)[0]
    print("MC Score: ", output_norm)
    return output_norm
def ai_generated_test(ai_option, input):
    """Score ``input`` with the binary and multi-class AI detectors.

    The text is split into segments once per detector, every segment is
    cleaned with ``remove_special_characters`` and scored, and the scores are
    averaged over the segments.

    Args:
        ai_option: ``"Human vs AI"`` suppresses the per-model scores; any
            other value keeps them.
        input: Text to analyse.

    Returns:
        ``(bc_score, mc_score)``. ``bc_score`` maps ``"AI"`` and ``"HUMAN"``
        to the averaged binary scores. ``mc_score`` maps the upper-cased model
        labels to their averaged score scaled by ``1 - HUMAN``; it is empty
        for ``"Human vs AI"`` or when ``1 - HUMAN`` is below 0.01. Both are
        empty when the text yields no segment.
    """
    # Split once per detector, each with its own tokenizer (the MC segments
    # used to be built with the BC tokenizer).
    segments_bc = split_text_allow_complete_sentences_nltk(input, type_det='bc')
    segments_mc = split_text_allow_complete_sentences_nltk(input, type_det='mc')
    if not segments_bc or not segments_mc:
        # Nothing to score (e.g. empty text); averaging would fail.
        return {}, {}

    bc_scores = [
        predict_bc(text_bc_model, text_bc_tokenizer, remove_special_characters(segment))
        for segment in segments_bc
    ]
    mc_scores = [
        predict_mc(text_mc_model, text_mc_tokenizer, remove_special_characters(segment))
        for segment in segments_mc
    ]

    bc_score_list = np.mean(np.array(bc_scores), axis=0).tolist()
    mc_score_list = np.mean(np.array(mc_scores), axis=0).tolist()

    # Report the average over all segments, not just the last segment's score.
    bc_score = {"AI": bc_score_list[1], "HUMAN": bc_score_list[0]}

    # Scale the per-model scores by the overall probability of "not human".
    label_map = ["OpenAI GPT", "Mistral", "CLAUDE", "Gemini", "LLAMA 2"]
    sum_prob = 1 - bc_score["HUMAN"]
    mc_score = {
        label.upper(): score * sum_prob
        for score, label in zip(mc_score_list, label_map)
    }

    if ai_option == "Human vs AI" or sum_prob < 0.01:
        mc_score = {}
    return bc_score, mc_score
# COMBINED | |
def main(
    ai_option,
    plag_option,
    input,
    year_from,
    month_from,
    day_from,
    year_to,
    month_to,
    day_to,
    domains_to_skip,
):
    """Run source detection, writing analysis and AI detection on ``input``.

    Returns:
        ``(bc_score, mc_score, formatted_tokens, depth_analysis_plot)`` in the
        order expected by the "Full Check" button outputs.
    """
    search_filters = (
        year_from,
        month_from,
        day_from,
        year_to,
        month_to,
        day_to,
        domains_to_skip,
    )
    source_tokens = plagiarism_check(plag_option, input, *search_filters)
    writing_plot = depth_analysis(input)
    human_vs_ai, per_model = ai_generated_test(ai_option, input)
    return human_vs_ai, per_model, source_tokens, writing_plot
def build_date(year, month, day):
    """Return the date as a ``YYYYMMDD`` string.

    Args:
        year: Year, e.g. ``"2000"``.
        month: Month name used as a key of the module-level ``months`` table.
        day: Day of the month; padded to two digits so that ``"5"`` and
            ``"05"`` give the same result.

    Raises:
        KeyError: If ``month`` is not a key of ``months``.
    """
    # The day comes from a free-text box; without padding a single digit
    # would produce a 7-character date in the ``date:r:`` restriction.
    day_part = str(day).strip().zfill(2)
    return f"{str(year).strip()}{months[month]}{day_part}"
def len_validator(text):
    """Return a message telling whether ``text`` is long enough to analyse.

    The length is measured in tokens of ``text_bc_tokenizer`` and compared
    with a minimum of 128 tokens.
    """
    min_tokens = 128
    # The previous code referenced an undefined global ``tokenizer``; the
    # binary detector's tokenizer is the one defined in this module.
    lengt = len(text_bc_tokenizer.tokenize(text or ""))
    if lengt < min_tokens:
        return f"Warning! Input length is {lengt}. Please input a text that is greater than {min_tokens} tokens long. Recommended length {min_tokens*2} tokens."
    return "Input length is satisified."
def extract_text_from_pdf(pdf_path):
    """Return the concatenated text of every page of the PDF at ``pdf_path``."""
    # The context manager closes the document even if a page fails to parse.
    with fitz.open(pdf_path) as doc:
        return "".join(page.get_text() for page in doc)
# DEPTH ANALYSIS
print("loading depth analysis")
nltk.download('stopwords')
nltk.download('punkt')

# Download the spaCy model only when it is missing, using the interpreter
# that runs this app (a bare "python3" may point at another environment),
# and fail loudly if the download does not succeed.
command = [sys.executable, '-m', 'spacy', 'download', 'en_core_web_sm']
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    subprocess.run(command, check=True)
    nlp = spacy.load("en_core_web_sm")

# for perplexity
model_id = "gpt2"
gpt2_model = GPT2LMHeadModel.from_pretrained(model_id).to(device)
gpt2_tokenizer = GPT2TokenizerFast.from_pretrained(model_id)
def depth_analysis(input_text):
    """Build a radar chart of writing-style features of ``input_text``.

    Six features are computed with the ``writing_analysis`` helpers and
    plotted on a polar axis ranging from 0 to 100.

    Returns:
        A ``plotly.graph_objects.Figure`` holding one ``Scatterpolar`` trace.
    """
    # Vocabulary richness (type-token ratio); plotted as returned.
    ttr_value = vocabulary_richness_ttr(preprocess_text1(input_text))

    # Readability.
    gunning_fog_norm = normalize(
        calculate_gunning_fog(input_text), min_value=0, max_value=20
    )

    # Average sentence length and average word length.
    words, sentences = preprocess_text2(input_text)
    sentence_length = calculate_average_sentence_length(sentences)
    word_length = calculate_average_word_length(words)
    average_sentence_length_norm = normalize(sentence_length, min_value=0, max_value=40)
    average_word_length_norm = normalize(word_length, min_value=0, max_value=8)

    # Syntactic tree depth.
    average_tree_depth_norm = normalize(
        calculate_syntactic_tree_depth(nlp, input_text), min_value=0, max_value=10
    )

    # Perplexity under GPT-2.
    perplexity_norm = normalize(
        calculate_perplexity(input_text, gpt2_model, gpt2_tokenizer, device),
        min_value=0,
        max_value=30,
    )

    # Insertion order fixes the order of the axes around the chart.
    features = {
        "readability": gunning_fog_norm,
        "syntactic tree depth": average_tree_depth_norm,
        "vocabulary richness": ttr_value,
        "perplexity": perplexity_norm,
        "average sentence length": average_sentence_length_norm,
        "average word length": average_word_length_norm,
    }
    print(features)

    radar_trace = go.Scatterpolar(
        r=list(features.values()),
        theta=list(features.keys()),
        fill='toself',
        name='Radar Plot',
    )
    fig = go.Figure()
    fig.add_trace(radar_trace)
    fig.update_layout(
        polar={"radialaxis": {"visible": True, "range": [0, 100]}},
        showlegend=False,
        margin={"l": 10, "r": 20, "b": 10, "t": 10},
    )
    return fig
# START OF GRADIO
title = "Copyright Checker"

# Month name -> two-digit month number, in calendar order.
months = {
    month_name: f"{month_number:02d}"
    for month_number, month_name in enumerate(
        (
            "January",
            "February",
            "March",
            "April",
            "May",
            "June",
            "July",
            "August",
            "September",
            "October",
            "November",
            "December",
        ),
        start=1,
    )
}
# Gradio UI: inputs, option selectors, action buttons, outputs and wiring.
with gr.Blocks() as demo:
    # Today's date as [day, month name, year]; pre-fills the "to" date fields.
    today = date.today()
    d1 = today.strftime("%d/%B/%Y").split("/")

    model_list = ["OpenAI GPT", "Mistral", "CLAUDE", "Gemini", "LLAMA2"]
    domain_list = ["com", "org", "net", "int", "edu", "gov", "mil"]
    # Dropdown choices are passed as an explicit list of month names rather
    # than the name -> number dict itself.
    month_choices = list(months)

    gr.Markdown("\n# Copyright Checker\n")

    # --- Input ---
    with gr.Row():
        input_text = gr.Textbox(label="Input text", lines=6, placeholder="")
        file_input = gr.File(label="Upload PDF")
        file_input.change(fn=extract_text_from_pdf, inputs=file_input, outputs=input_text)

    char_count = gr.Textbox(label="Minumum Character Limit Check")
    input_text.change(fn=len_validator, inputs=input_text, outputs=char_count)

    # --- Options ---
    with gr.Row():
        with gr.Column():
            ai_option = gr.Radio(["Human vs AI", "Human vs AI Source Models"], label="Choose an option please.")
        with gr.Column():
            plag_option = gr.Radio(["Standard", "Advanced"], label="Choose an option please.")

    # --- Actions ---
    with gr.Row():
        with gr.Column():
            only_ai_btn = gr.Button("AI Check")
        with gr.Column():
            only_plagiarism_btn = gr.Button("Source Detection")
    with gr.Row():
        depth_analysis_btn = gr.Button("Detailed Writing Analysis")
    with gr.Row():
        full_check_btn = gr.Button("Full Check")

    gr.Markdown("\n## Output\n")

    # models = gr.Dropdown(
    #     model_list,
    #     value=model_list,
    #     multiselect=True,
    #     label="Models to test against",
    # )

    # --- AI detection outputs ---
    with gr.Row():
        with gr.Column():
            bcLabel = gr.Label(label="Source")
        with gr.Column():
            mcLabel = gr.Label(label="Creator")

    # --- Search filters for source detection ---
    with gr.Group():
        with gr.Row():
            month_from = gr.Dropdown(
                choices=month_choices,
                label="From Month",
                value="January",
                interactive=True,
            )
            day_from = gr.Textbox(label="From Day", value="01")
            year_from = gr.Textbox(label="From Year", value="2000")
            # from_date_button = gr.Button("Submit")
        with gr.Row():
            month_to = gr.Dropdown(
                choices=month_choices,
                label="To Month",
                value=d1[1],
                interactive=True,
            )
            day_to = gr.Textbox(label="To Day", value=d1[0])
            year_to = gr.Textbox(label="To Year", value=d1[2])
            # to_date_button = gr.Button("Submit")
        with gr.Row():
            domains_to_skip = gr.Dropdown(
                domain_list,
                multiselect=True,
                label="Domain To Skip",
            )

    # --- Source detection output ---
    with gr.Row():
        with gr.Column():
            sentenceBreakdown = gr.HighlightedText(
                label="Source Detection Sentence Breakdown",
                combine_adjacent=True,
                color_map={
                    "[1]": "red",
                    "[2]": "orange",
                    "[3]": "yellow",
                    "[4]": "green",
                },
            )

    # --- Writing analysis output ---
    with gr.Row():
        with gr.Column():
            writing_analysis_plot = gr.Plot(
                label="Writing Analysis Plot"
            )

    # --- Event wiring; input order must match each handler's parameters ---
    full_check_btn.click(
        fn=main,
        inputs=[
            ai_option,
            plag_option,
            input_text,
            # models,
            year_from,
            month_from,
            day_from,
            year_to,
            month_to,
            day_to,
            domains_to_skip,
        ],
        outputs=[
            bcLabel,
            mcLabel,
            sentenceBreakdown,
            writing_analysis_plot,
        ],
        api_name="main",
    )

    only_ai_btn.click(
        fn=ai_generated_test,
        inputs=[ai_option, input_text],
        outputs=[
            bcLabel,
            mcLabel,
        ],
        api_name="ai_check",
    )

    only_plagiarism_btn.click(
        fn=plagiarism_check,
        inputs=[
            plag_option,
            input_text,
            year_from,
            month_from,
            day_from,
            year_to,
            month_to,
            day_to,
            domains_to_skip,
        ],
        outputs=[
            sentenceBreakdown,
        ],
        api_name="plagiarism_check",
    )

    depth_analysis_btn.click(
        fn=depth_analysis,
        inputs=[input_text],
        outputs=[writing_analysis_plot],
        api_name="depth_analysis",
    )
date_from = ""
date_to = ""

# NOTE(security): login credentials should come from the environment. The
# literals remain only as a fallback so the current deployment keeps working;
# they have been published in source and should be rotated.
auth_username = os.environ.get("GRADIO_AUTH_USERNAME", "polygraf-admin")
auth_password = os.environ.get("GRADIO_AUTH_PASSWORD", "test@aisd")

demo.launch(share=True, server_name="0.0.0.0", auth=(auth_username, auth_password))