"""Gradio app that searches a SQLite index of Tanach phrases by gematria value."""

import html
import json
import logging
import re
import sqlite3
from collections import defaultdict
from contextlib import closing
from typing import Dict, List, Optional, Tuple
from urllib.parse import quote_plus

import gradio as gr
from deep_translator import GoogleTranslator, exceptions
from tqdm import tqdm

# Assuming you have these files in your project
from gematria import calculate_gematria
from util import process_json_files

# Constants
DATABASE_FILE = 'gematria.db'
MAX_PHRASE_LENGTH_LIMIT = 20
BATCH_SIZE = 10000

# Placeholder returned when every translation attempt fails. It is never
# persisted, so a transient outage does not become a permanent "translation".
_TRANSLATION_ERROR = "[Translation Error]"

# Verse clean-up patterns, compiled once instead of once per verse.
_BRACKETED_RE = re.compile(r'\[.*?\]')
_NON_HEBREW_RE = re.compile(r"[^\u05D0-\u05EA ]+")

# Set up logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

# Global variables
conn: Optional[sqlite3.Connection] = None
translator: Optional[GoogleTranslator] = None
book_names: Dict[int, str] = {}
gematria_cache: Dict[Tuple[int, int], List[Tuple[str, str, int, int, int, str]]] = {}
translation_cache: Dict[str, str] = {}
total_word_count: int = 0  # Global counter for word position


def _db_connection():
    """Return a context manager yielding a new connection that is closed on exit.

    ``with sqlite3.connect(...) as c`` only commits or rolls back; it does not
    close the connection, so every call would leak a handle. ``closing`` does.
    """
    return closing(sqlite3.connect(DATABASE_FILE))


def initialize_database() -> None:
    """Create the ``results`` and ``processed_books`` tables and the gematria index.

    The connection is kept in the module-level ``conn``; every other function
    opens its own short-lived connection instead of rebinding this one.
    """
    global conn
    conn = sqlite3.connect(DATABASE_FILE)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS results (
            gematria_sum INTEGER,
            words TEXT,
            translation TEXT,
            book TEXT,
            chapter INTEGER,
            verse INTEGER,
            phrase_length INTEGER,
            word_position TEXT,
            PRIMARY KEY (gematria_sum, words, book, chapter, verse, word_position)
        )
    ''')
    cursor.execute('''
        CREATE INDEX IF NOT EXISTS idx_results_gematria
        ON results (gematria_sum)
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS processed_books (
            book TEXT PRIMARY KEY,
            max_phrase_length INTEGER
        )
    ''')
    conn.commit()


def initialize_translator() -> None:
    """Create the module-level Google translator (source ``'iw'``, target ``'en'``)."""
    global translator
    translator = GoogleTranslator(source='iw', target='en')
    logging.info("Translator initialized.")


def _verse_words(verse) -> List[str]:
    """Flatten a verse, drop bracketed spans and non-Hebrew characters, split into words."""
    verse_text = flatten_text(verse)
    verse_text = _BRACKETED_RE.sub('', verse_text)
    verse_text = _NON_HEBREW_RE.sub('', verse_text)
    # str.split() already collapses runs of spaces.
    return verse_text.split()


def process_book(book_id: int, max_phrase_length: int, cursor: sqlite3.Cursor) -> List[Tuple]:
    """Build the ``results`` rows for one book.

    Args:
        book_id: Id passed to ``process_json_files`` and used as the key of
            its result.
        max_phrase_length: Longest phrase, in words, to generate.
        cursor: Cursor used to look the book up in ``processed_books``.

    Returns:
        Tuples ``(gematria_sum, words, None, title, chapter, verse, length,
        "start-end")`` with 1-based chapter, verse and word positions. Empty
        when the book is missing, malformed, or already processed with at
        least ``max_phrase_length``.

    Side effects:
        Records the title in ``book_names`` and advances ``total_word_count``
        by the number of words in the book. The counter also advances for an
        already-processed book, so positions of the following books stay the
        same on a resumed run.
    """
    global total_word_count
    phrases_to_insert: List[Tuple] = []

    book_data = process_json_files(book_id, book_id)
    if book_id not in book_data:
        return phrases_to_insert
    book = book_data[book_id]

    if 'title' not in book or not isinstance(book['title'], str):
        logging.warning(f"Skipping book {book_id} due to missing 'title' field.")
        return phrases_to_insert
    title = book['title']
    book_names[book_id] = title

    # Check if this book has already been processed for this phrase length
    cursor.execute('''SELECT max_phrase_length FROM processed_books WHERE book = ?''', (title,))
    row = cursor.fetchone()
    already_processed = bool(row) and row[0] >= max_phrase_length
    if already_processed:
        logging.info(f"Skipping book {title}: Already processed with max_phrase_length {row[0]}")

    if 'text' not in book or not isinstance(book['text'], list):
        logging.warning(f"Skipping book {book_id} due to missing 'text' field.")
        return phrases_to_insert

    for chapter_id, chapter in enumerate(book['text']):
        for verse_id, verse in enumerate(chapter):
            words = _verse_words(verse)
            if not already_processed:
                base = total_word_count
                for length in range(1, max_phrase_length + 1):
                    for start in range(len(words) - length + 1):
                        chunk = words[start:start + length]
                        phrases_to_insert.append((
                            calculate_gematria("".join(chunk)),
                            " ".join(chunk),
                            None,
                            title,
                            chapter_id + 1,
                            verse_id + 1,
                            length,
                            f"{base + start + 1}-{base + start + length}",
                        ))
            total_word_count += len(words)
    return phrases_to_insert


def populate_database(start_book: int, end_book: int, max_phrase_length: int = 1) -> None:
    """Insert the phrases of books ``start_book``..``end_book`` (inclusive).

    Rows are added with ``INSERT OR IGNORE``; each book is committed together
    with its ``processed_books`` entry. Expects the tables created by
    ``initialize_database`` to exist. ``total_word_count`` is reset to 0
    afterwards, even if a book fails.
    """
    global total_word_count
    logging.info(f"Populating database with books from {start_book} to {end_book}...")
    try:
        with _db_connection() as db:
            cursor = db.cursor()
            for book_id in tqdm(range(start_book, end_book + 1), desc="Processing Books"):
                phrases_to_insert = process_book(book_id, max_phrase_length, cursor)
                if not phrases_to_insert:
                    continue
                cursor.executemany('''
                    INSERT OR IGNORE INTO results
                    (gematria_sum, words, translation, book, chapter, verse, phrase_length, word_position)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ''', phrases_to_insert)
                # Update processed_books after processing each book
                cursor.execute('''
                    INSERT OR REPLACE INTO processed_books (book, max_phrase_length)
                    VALUES (?, ?)
                ''', (book_names[book_id], max_phrase_length))
                db.commit()
    finally:
        total_word_count = 0  # Reset for the next run


def get_translation(phrase: str) -> str:
    """Return the English translation of a Hebrew phrase.

    Lookup order: the in-memory ``translation_cache``, then the ``results``
    table, then the translator. A successful new translation is stored in
    both; the failure placeholder is returned but not stored, so the phrase
    is retried next time.
    """
    cached = translation_cache.get(phrase)
    if cached:
        return cached

    with _db_connection() as db:
        row = db.execute(
            "SELECT translation FROM results WHERE words = ? LIMIT 1", (phrase,)
        ).fetchone()
    # A placeholder persisted by an earlier run is treated as "not translated".
    if row and row[0] and row[0] != _TRANSLATION_ERROR:
        translation_cache[phrase] = row[0]
        return row[0]

    translation = translate_and_store(phrase)
    if not translation or translation == _TRANSLATION_ERROR:
        return translation or ""

    translation_cache[phrase] = translation
    with _db_connection() as db:
        db.execute("UPDATE results SET translation = ? WHERE words = ?", (translation, phrase))
        db.commit()
    return translation


def translate_and_store(phrase: str) -> str:
    """Translate a Hebrew phrase to English, trying up to three times.

    Despite the name this function stores nothing; ``get_translation`` does.
    Returns ``"[Translation Error]"`` when every attempt raises one of the
    handled ``deep_translator`` exceptions.
    """
    if translator is None:
        initialize_translator()

    max_retries = 3
    for attempt in range(1, max_retries + 1):
        try:
            return translator.translate(phrase)
        except (exceptions.TranslationNotFound, exceptions.NotValidPayload,
                exceptions.ServerException, exceptions.RequestError) as e:
            logging.warning(
                f"Error translating phrase '{phrase}': {e}. Retrying... ({attempt}/{max_retries})")

    logging.error(f"Failed to translate phrase '{phrase}' after {max_retries} retries.")
    return _TRANSLATION_ERROR


def search_gematria_in_db(gematria_sum: int, max_words: int) -> List[Tuple[str, str, int, int, int, str]]:
    """Return rows ``(words, book, chapter, verse, phrase_length, word_position)``
    whose gematria equals ``gematria_sum`` and whose length is at most ``max_words``.
    """
    with _db_connection() as db:
        cursor = db.cursor()
        cursor.execute('''
            SELECT words, book, chapter, verse, phrase_length, word_position
            FROM results
            WHERE gematria_sum = ? AND phrase_length <= ?
        ''', (gematria_sum, max_words))
        return cursor.fetchall()


def _book_order() -> Dict[str, int]:
    """Map each title in ``book_names`` to its (first) book id, for sorting."""
    order: Dict[str, int] = {}
    for book_id, title in book_names.items():
        order.setdefault(title, int(book_id))
    return order


def _render_match(book: str, words: str, chapter: int, verse: int, phrase_length: int,
                  word_position: str, translation: Optional[str],
                  phrase_gematria: Optional[int] = None) -> str:
    """Render one matching phrase as an HTML fragment.

    NOTE(review): the markup in this file appears to have been stripped; the
    tags here are a minimal reconstruction around the original labels -
    confirm against the intended layout.
    """
    link = (f"https://www.biblegateway.com/passage/?search={quote_plus(book)}"
            f"+{chapter}%3A{verse}&version=CJB")
    fields = [
        f"Book: {html.escape(book)}",
        f"Chapter: {chapter}, Verse: {verse}",
        f"Hebrew Phrase: {html.escape(words)}",
        f"Translation: {html.escape(translation or '')}",
        f"Phrase Length: {phrase_length} words",
    ]
    if phrase_gematria is not None:
        fields.append(f"Phrase Gematria: {phrase_gematria}")
    fields.append(f"Word Position in the Tanach: {html.escape(str(word_position))}")
    fields.append(f'<a href="{html.escape(link)}">[See on Bible Gateway]</a>')
    return "<div>\n" + "\n".join(f"<p>{field}</p>" for field in fields) + "\n</div>"


def gematria_search_interface(phrases: str, max_words: int, show_translation: bool,
                              auto_adjust_length: bool) -> str:
    """The main function for the Gradio interface, handling multiple phrases.

    Args:
        phrases: One query per line. Digit runs are added to the gematria of
            the remaining text as plain numbers.
        max_words: Maximum word count of matching phrases, capped at
            ``MAX_PHRASE_LENGTH_LIMIT``.
        show_translation: Whether to look up an English translation per match.
        auto_adjust_length: If true, ``max_words`` is replaced per query by
            the number of words in that query (at least 1).

    Returns:
        An HTML string with the matches per query, the average word position
        per query and overall, and, when two or more queries matched, the
        "middle words" of the books they have in common.
    """
    results: List[str] = []
    all_results = []  # Store results for each phrase
    middle_words_results = []  # Store middle word results for all books
    all_names_average_position = 0
    total_name_count = 0

    phrase_lines = [line for line in (phrases or "").strip().splitlines() if line.strip()]
    if not phrase_lines:
        return "Please enter at least one phrase."

    book_order = _book_order()

    def order_of(title: str) -> float:
        # Titles not seen by this process sort last instead of raising.
        return book_order.get(title, float("inf"))

    for phrase in phrase_lines:
        numbers = re.findall(r'\d+', phrase)
        text_without_numbers = re.sub(r'\d+', '', phrase)
        phrase_gematria = calculate_gematria(text_without_numbers.replace(" ", ""))
        phrase_gematria += sum(int(number) for number in numbers)

        # Dynamically adjust max_words if auto_adjust_length is enabled
        if auto_adjust_length:
            # A digits-only query has no words; 0 would never match anything.
            max_words = max(1, len(text_without_numbers.split()))
        max_words = min(max_words, MAX_PHRASE_LENGTH_LIMIT)

        cache_key = (phrase_gematria, max_words)
        matching_phrases = gematria_cache.get(cache_key)
        if matching_phrases is None:
            matching_phrases = search_gematria_in_db(phrase_gematria, max_words)
            gematria_cache[cache_key] = matching_phrases

        safe_phrase = html.escape(phrase)
        if not matching_phrases:
            results.append(f"<p>No matching phrases found for: {safe_phrase}</p>")
            continue

        sorted_phrases = sorted(matching_phrases,
                                key=lambda row: (order_of(row[1]), row[2], row[3]))
        results_by_book = defaultdict(list)
        for words, book, chapter, verse, phrase_length, word_position in sorted_phrases:
            results_by_book[book].append((words, chapter, verse, phrase_length, word_position))

        results.append(f"<h2>Results for: {safe_phrase} (Gematria: {phrase_gematria})</h2>")
        results.append("<div>")
        for book, book_matches in results_by_book.items():
            for words, chapter, verse, phrase_length, word_position in book_matches:
                translation = get_translation(words) if show_translation else ""
                results.append(_render_match(book, words, chapter, verse, phrase_length,
                                             word_position, translation, phrase_gematria))

        # Calculate average position for the current name across all books
        name_average_position = calculate_average_position_for_name(results_by_book)
        if name_average_position is not None:
            results.append(
                f"<p>Average Word Position for '{safe_phrase}' across all books: "
                f"{name_average_position:.2f}</p>")
            all_names_average_position += name_average_position
            total_name_count += 1
        results.append("</div>")

        all_results.append(results_by_book)

    # Calculate the average word position across all names and all their books
    if total_name_count > 0:
        all_names_average_position /= total_name_count
        results.append(
            f"<h3>Average Word Position Across All Names and Books: "
            f"{all_names_average_position:.2f}</h3>")

    # Calculate middle words for all input lines (common books)
    if len(all_results) >= 2:
        results.append("<h2>Middle Words (Common Books):</h2>")
        results.append("<div>")

        common_books = set.intersection(*[set(by_book.keys()) for by_book in all_results])
        logging.debug(f"Common books: {common_books}")

        for book in common_books:
            logging.debug(f"Processing book: {book}")

            # One average position per query for the current book
            nearest_positions = find_nearest_positions([by_book[book] for by_book in all_results])
            logging.debug(f"Nearest positions in {book}: {nearest_positions}")
            if not nearest_positions:
                continue

            middle_word_position = sum(nearest_positions) / len(nearest_positions)
            logging.debug(f"Calculated middle word position in {book}: {middle_word_position}")

            start_position = int(middle_word_position)
            end_position = start_position + 1 if middle_word_position % 1 != 0 else start_position
            logging.debug(f"Middle word position range in {book}: {start_position}-{end_position}")

            middle_words_data = get_words_from_db(book, start_position, end_position)
            logging.debug(f"Middle words data fetched from database: {middle_words_data}")

            if not middle_words_data and start_position != end_position:
                # No phrase spans exactly start-end: fall back to the two single words.
                # (When start == end the query above already was the single-word lookup.)
                logging.debug(f"No middle words found for range {start_position}-{end_position}. "
                              f"Fetching words independently.")
                middle_words_data = (get_words_from_db(book, start_position, start_position)
                                     + get_words_from_db(book, end_position, end_position))

            # Store middle word data along with book name for sorting
            middle_words_results.extend((book, data) for data in middle_words_data)

        # Sort middle words results by book order before displaying
        middle_words_results.sort(key=lambda item: order_of(item[0]))

        for book, (words, chapter, verse, phrase_length, word_position) in middle_words_results:
            translation = get_translation(words) if show_translation else ""
            results.append(_render_match(book, words, chapter, verse, phrase_length,
                                         word_position, translation))
        results.append("</div>")

    # NOTE(review): the stylesheet text is blank in this copy of the file - restore if needed.
    style = """ """
    return style + "\n".join(results)


def _position_midpoint(word_position: str) -> float:
    """Return the midpoint of a ``"start-end"`` word-position string."""
    start, end = map(int, word_position.split('-'))
    return (start + end) / 2


def calculate_average_position_for_name(results_by_book: Dict[str, List[Tuple]]) -> Optional[float]:
    """Return the mean midpoint of every match across all books, or ``None`` if there are none."""
    positions = [
        _position_midpoint(word_position)
        for book_matches in results_by_book.values()
        for _, _, _, _, word_position in book_matches
    ]
    return sum(positions) / len(positions) if positions else None


def find_nearest_positions(results_lists: List[List]) -> List[float]:
    """Return one average position per result list.

    For each list of ``(words, chapter, verse, phrase_length, word_position)``
    tuples, the midpoints of all ``word_position`` ranges are averaged. Lists
    with no entries contribute nothing, so the output may be shorter than the
    input. Despite the name, no nearest-neighbour search is performed.
    """
    nearest_positions = []
    for index, matches in enumerate(results_lists, start=1):
        positions = [_position_midpoint(pos) for _, _, _, _, pos in matches]
        logging.debug(f"Positions for phrase {index}: {positions}")

        average_position = sum(positions) / len(positions) if positions else None
        logging.debug(f"Average position for phrase {index}: {average_position}")

        if average_position is not None:
            nearest_positions.append(average_position)
    return nearest_positions


def get_words_from_db(book: str, start_position: int, end_position: int) -> List[Tuple]:
    """Return rows of ``book`` whose ``word_position`` is exactly ``"start-end"``.

    Rows are ``(words, chapter, verse, phrase_length, word_position)``.
    """
    logging.debug(f"Fetching words from database for {book} at positions {start_position}-{end_position}")
    with _db_connection() as db:
        cursor = db.cursor()
        cursor.execute("""
            SELECT words, chapter, verse, phrase_length, word_position
            FROM results
            WHERE book = ? AND word_position = ?
        """, (book, f"{start_position}-{end_position}"))  # Directly compare word_position
        results = cursor.fetchall()
    logging.debug(f"Words fetched from database: {results}")
    return results


def flatten_text(text) -> str:
    """Join an arbitrarily nested list of strings into one space-separated string.

    A non-list argument is returned unchanged.
    """
    if isinstance(text, list):
        return " ".join(flatten_text(item) for item in text)
    return text


def run_app() -> None:
    """Initializes and launches the Gradio app."""
    initialize_database()
    initialize_translator()

    logging.info("Starting database population...")
    # One pass at the maximum length generates every shorter length as well, so
    # it yields the same rows as calling populate_database for 1, 2, ..., 20.
    populate_database(1, 39, max_phrase_length=MAX_PHRASE_LENGTH_LIMIT)
    logging.info("Database population complete.")

    with gr.Blocks() as iface:  # Use gr.Blocks() for layout control
        with gr.Row():  # Place inputs in a row
            textbox = gr.Textbox(label="Enter word(s) or numbers (one phrase per line)", lines=5)
            slider = gr.Slider(label="Max Word Count in Result Phrases", minimum=1,
                               maximum=MAX_PHRASE_LENGTH_LIMIT, step=1, value=1)
            checkbox = gr.Checkbox(label="Show Translation", value=True)
            auto_adjust_checkbox = gr.Checkbox(label="Auto-Adjust Phrase Length", value=False)
        with gr.Row():  # Place buttons in a row
            clear_button = gr.Button("Clear")
            submit_button = gr.Button("Submit", variant="primary")

        html_output = gr.HTML(label="Results")  # Output for the results

        submit_button.click(fn=gematria_search_interface,
                            inputs=[textbox, slider, checkbox, auto_adjust_checkbox],
                            outputs=html_output)
        clear_button.click(fn=lambda: "", inputs=None, outputs=html_output)  # Clear the output

    iface.launch()


if __name__ == "__main__":
    run_app()