File size: 2,781 Bytes
79b7942
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import os

import spacy
import speech_recognition as sr
from loguru import logger
from pydub import AudioSegment


# Load spaCy's English language model for grammar and text analysis
# Loaded pipelines keyed by model name, so the model is read from disk only once.
_SPACY_MODELS = {}


def get_doc(text):
    """Parse ``text`` with spaCy's ``en_core_web_sm`` pipeline.

    The pipeline is loaded on the first call and reused afterwards, because
    ``spacy.load`` is far more expensive than parsing a single text.

    Args:
        text: The string to analyse.

    Returns:
        The spaCy ``Doc`` produced for ``text``, or ``None`` when an
        ``OSError`` is raised (which is how spaCy reports a model that is not
        installed). The error and a download hint are logged in that case.
    """
    model_name = "en_core_web_sm"
    try:
        nlp = _SPACY_MODELS.get(model_name)
        if nlp is None:
            nlp = spacy.load(model_name)
            _SPACY_MODELS[model_name] = nlp
        return nlp(text)
    except OSError as ex:
        # loguru formats messages with str.format-style placeholders; without
        # the "{}" the exception text would be silently dropped.
        logger.exception("Error while getting Spacy doc: {}", ex)
        logger.info("Please download the model with this command: python -m spacy download en_core_web_sm")
        return None


# Convert MP3 to WAV using pydub
def convert_mp3_to_wav(mp3_file, wav_file="converted_comprehension_audio.wav"):
    """Decode an MP3 with pydub and write it back out in WAV format.

    Args:
        mp3_file: Source MP3, passed to ``AudioSegment.from_mp3``.
        wav_file: Destination handed to ``export``; defaults to
            ``converted_comprehension_audio.wav``.

    Returns:
        ``wav_file``, unchanged, so the call can be chained.
    """
    AudioSegment.from_mp3(mp3_file).export(wav_file, format="wav")
    return wav_file


# Convert WAV to text using SpeechRecognition
def transcribe_audio(wav_file):
    """Transcribe a WAV file to text with SpeechRecognition.

    The whole file is read into memory with ``Recognizer.record`` and then
    passed to ``Recognizer.recognize_google``. Any exception raised by the
    recognizer propagates to the caller.

    Args:
        wav_file: Audio file opened through ``sr.AudioFile``.

    Returns:
        The value returned by ``recognize_google`` for the recording.
    """
    engine = sr.Recognizer()
    with sr.AudioFile(wav_file) as audio_source:
        recording = engine.record(audio_source)
        return engine.recognize_google(recording)


def split_audio(audio_path, chunk_length_ms=30000):
    """Cut a WAV file into consecutive chunks and export each one as WAV.

    Chunk ``i`` is written to ``"{audio_path}_chunk_{i}.wav"``. The final
    chunk may be shorter than ``chunk_length_ms``.

    Args:
        audio_path: WAV file to split, loaded with ``AudioSegment.from_wav``.
        chunk_length_ms: Length of each chunk (in milliseconds).

    Returns:
        The list of written chunk paths, in playback order.
    """
    audio = AudioSegment.from_wav(audio_path)
    chunk_paths = []

    # Walk the start offsets directly; each slice is exported as soon as it is cut.
    for idx, start in enumerate(range(0, len(audio), chunk_length_ms)):
        chunk_path = f"{audio_path}_chunk_{idx}.wav"
        segment = audio[start : start + chunk_length_ms]
        segment.export(chunk_path, format="wav")
        chunk_paths.append(chunk_path)

    return chunk_paths


def aggregate_scores(chunk_scores):
    """Aggregate scores across all chunks to produce a final score.

    The four numeric scores are averaged over the chunks and rounded to two
    decimals; the mispronounced-word lists are concatenated and the display
    texts are joined, both in chunk order.

    Args:
        chunk_scores: Iterable of per-chunk score dicts. Missing numeric keys
            count as 0; a missing or ``None`` ``"mispronunced_words"`` or
            ``"display_text"`` is treated as empty.

    Returns:
        A dict with the keys ``accuracy_score``, ``fluency_score``,
        ``completeness_score``, ``pronunciation_score``,
        ``mispronunced_words`` and ``display_text``. When ``chunk_scores`` is
        empty, every score is 0, the word list is empty and the text is "".
    """
    numeric_keys = (
        "accuracy_score",
        "fluency_score",
        "completeness_score",
        "pronunciation_score",
    )
    final_score = {key: 0 for key in numeric_keys}
    # NOTE: the key spelling "mispronunced_words" is part of the data contract
    # with the per-chunk dicts and is kept as-is.
    final_score["mispronunced_words"] = []
    final_score["display_text"] = ""

    chunk_scores = list(chunk_scores)
    num_chunks = len(chunk_scores)
    if num_chunks == 0:
        # Nothing to average; avoid dividing by zero.
        return final_score

    text_parts = []
    for score in chunk_scores:
        for key in numeric_keys:
            final_score[key] += score.get(key, 0)
        # `or` guards against both a missing key and an explicit None.
        final_score["mispronunced_words"].extend(score.get("mispronunced_words") or [])
        text_parts.append(score.get("display_text") or "")
    final_score["display_text"] = "".join(text_parts)

    # Turn the sums into averages.
    for key in numeric_keys:
        final_score[key] = round(final_score[key] / num_chunks, 2)

    return final_score


def remove_files(file_list):
    """Delete every path in ``file_list``, logging instead of raising.

    Files are removed one at a time with ``os.remove`` rather than through a
    shell command, so paths containing spaces or shell metacharacters are
    handled safely and failures are actually detected. Paths are taken
    literally (no shell glob expansion).

    Args:
        file_list: Iterable of file paths to delete.

    Returns:
        None. A failure to remove one file is logged and does not stop the
        remaining files from being removed.
    """
    all_removed = True
    for path in file_list:
        try:
            os.remove(path)
        except OSError as ex:
            # Best-effort cleanup: record the failure and keep going.
            all_removed = False
            logger.exception("Error removing files: {}", ex)

    if all_removed:
        logger.info("All listed files removed.")