import os import spacy import speech_recognition as sr from loguru import logger from pydub import AudioSegment # Load spaCy's English language model for grammar and text analysis def get_doc(text): try: nlp = spacy.load("en_core_web_sm") doc = nlp(text) return doc except OSError as ex: logger.exception("Error while getting Spacy doc: ", str(ex)) logger.info("Please download the model with this command: python -m spacy download en_core_web_sm") # Convert MP3 to WAV using pydub def convert_mp3_to_wav(mp3_file, wav_file="converted_comprehension_audio.wav"): sound = AudioSegment.from_mp3(mp3_file) sound.export(wav_file, format="wav") return wav_file # Convert WAV to text using SpeechRecognition def transcribe_audio(wav_file): recognizer = sr.Recognizer() with sr.AudioFile(wav_file) as source: audio_data = recognizer.record(source) text = recognizer.recognize_google(audio_data) return text def split_audio(audio_path, chunk_length_ms=30000): """Splits audio into chunks of specified length (in milliseconds).""" audio = AudioSegment.from_wav(audio_path) chunks = [audio[i : i + chunk_length_ms] for i in range(0, len(audio), chunk_length_ms)] chunk_paths = [] for idx, chunk in enumerate(chunks): chunk_path = f"{audio_path}_chunk_{idx}.wav" chunk.export(chunk_path, format="wav") chunk_paths.append(chunk_path) return chunk_paths def aggregate_scores(chunk_scores): """Aggregate scores across all chunks to produce a final score.""" final_score = { "accuracy_score": 0, "fluency_score": 0, "completeness_score": 0, "pronunciation_score": 0, "mispronunced_words": [], "display_text": "", } num_chunks = len(chunk_scores) # Sum each score across chunks for score in chunk_scores: final_score["accuracy_score"] += score.get("accuracy_score", 0) final_score["fluency_score"] += score.get("fluency_score", 0) final_score["completeness_score"] += score.get("completeness_score", 0) final_score["pronunciation_score"] += score.get("pronunciation_score", 0) final_score["mispronunced_words"].extend(score.get("mispronunced_words")) final_score["display_text"] += score.get("display_text") # Average each score for key in final_score: if type(final_score[key]) in [float, int]: final_score[key] = round(final_score[key] / num_chunks, 2) return final_score def remove_files(file_list): try: os.system("rm " + " ".join(file_list)) logger.info("All listed files removed.") except Exception as ex: logger.exception(f"Error removing files: {ex}")