Playground-Deepgram / deepgram_transcribe.py
jessica07's picture
Update deepgram_transcribe.py
73b5748 verified
# This script transcribes a downloaded YouTube video using Deepgram.
# The audio should be cleaned with UVR5 first, so the input file is FLAC.
# It uploads the full-length interview or podcast to Deepgram,
# which returns speaker IDs. The user must manually listen to the audio clips to find out which speaker is wanted,
# and discard the remaining speakers and short-length audio.
#
import os
from dotenv import load_dotenv
from pydub import AudioSegment
import math
from os.path import join
import shutil
from deepgram import (
DeepgramClient,
PrerecordedOptions,
FileSource,
)
def write_csv_file(csv_file, csv_data):
    """Write rows to a pipe-delimited text file.

    Each row becomes one line whose fields are converted with ``str`` and
    joined by ``'|'``. An existing file at ``csv_file`` is overwritten.

    Args:
        csv_file: Path of the file to write.
        csv_data: Iterable of rows; each row is an iterable of fields.
    """
    # Transcribed text may contain non-ASCII characters, so the encoding is
    # pinned instead of relying on the platform default.
    with open(csv_file, 'w', encoding='utf-8') as file:
        file.writelines(
            '|'.join(str(item) for item in row) + '\n'
            for row in csv_data
        )
    print(f"Data written to {csv_file}")
def process(audio_file, tag, progress):
    """Transcribe an audio file with Deepgram and export per-speaker clips.

    The whole file is sent to Deepgram with diarization enabled. Every
    sentence lasting at least 2 seconds is cut out of the audio and saved as
    a WAV file under ``output/Speaker_<id>/``, together with a pipe-delimited
    index file per speaker. The ``output`` folder is then zipped.

    Any existing ``output`` folder and ``output.zip`` in the current working
    directory are deleted first.

    Args:
        audio_file: Path to the audio file to transcribe.
        tag: Label embedded in the generated file names (e.g. the source of
            the recording).
        progress: Progress reporter; it is called with a float fraction and
            must provide a ``tqdm(iterable, desc=...)`` method
            (presumably a ``gradio.Progress`` -- confirm against the caller).

    Returns:
        The string ``"output.zip"``, the archive created in the current
        working directory.

    Raises:
        Exception: Whatever the Deepgram request raises is printed and then
            re-raised, since nothing can be produced without a transcript.
    """
    load_dotenv("myenv-variable.env")
    api_key = os.getenv('API_DEEPGRAM')

    # Start from a clean slate so a previous run never leaks into this one.
    output_folder = join(os.getcwd(), "output")
    if os.path.isdir(output_folder):
        shutil.rmtree(output_folder)
    if os.path.exists("output.zip"):
        os.remove("output.zip")
    os.mkdir(output_folder)

    deepgram = DeepgramClient(api_key)
    with open(audio_file, "rb") as file:
        buffer_data = file.read()
    payload: FileSource = {
        "buffer": buffer_data,
    }

    # Configure Deepgram options for audio analysis
    options = PrerecordedOptions(
        model="nova-2",
        smart_format=True,
        filler_words=True,
        diarize=True
    )
    progress(0.20)
    try:
        response = deepgram.listen.prerecorded.v("1").transcribe_file(payload, options)
    except Exception as e:
        # Previously the error was only printed and execution continued into
        # an unbound ``response``; surface the real failure instead.
        print(e)
        raise
    progress(0.30)

    audio = AudioSegment.from_file(audio_file)
    paragraphs = response['results']['channels'][0]['alternatives'][0]['paragraphs']['paragraphs']
    header = ["filename", "speaker", "text", "start_time", "end_time", "duration"]
    csv_data_dict = {}
    clip_index = 1
    progress(0.40)
    for paragraph in progress.tqdm(paragraphs, desc="Generating..."):
        for text in paragraph['sentences']:
            duration_s = round(text['end'] - text['start'], 3)
            if duration_s < 2:
                # Too short to be useful; skip it.
                continue

            # Convert the sentence bounds to ms with a 5 ms buffer on each
            # side. The start is clamped at 0 because a negative slice start
            # would be interpreted by pydub relative to the end of the audio.
            start_time_ms = max(0, math.floor(text['start'] * 1000) - 5)
            end_time_ms = math.ceil(text['end'] * 1000) + 5
            duration_ms = str(end_time_ms - start_time_ms).zfill(6)

            speaker_id = paragraph['speaker']
            # The key (folder name / dict key) always uses the raw id; the
            # label is what appears in file names and in the speaker column.
            # Keeping them separate avoids looking up a renamed speaker under
            # a key that was never registered.
            speaker_key = str(speaker_id)
            speaker_label = "Tayr" if speaker_id == 10 else speaker_id

            folder_path = join(output_folder, "Speaker_" + speaker_key)
            if speaker_key not in csv_data_dict:
                # First clip kept for this speaker: create its folder and
                # start its index with the header row.
                os.makedirs(folder_path, exist_ok=True)
                csv_data_dict[speaker_key] = [list(header)]

            clip_name = f"{tag}_Speaker_{speaker_label}_i{str(clip_index).zfill(3)}_d{duration_ms}.wav"
            # Path recorded in the index file (relative "wavs" folder).
            file_name = join("wavs", clip_name)

            # Slice the audio and export the clip into the speaker's folder.
            segment = audio[start_time_ms:end_time_ms]
            segment.export(join(folder_path, clip_name), format="wav")

            csv_data_dict[speaker_key].append(
                [file_name, speaker_label, text['text'], start_time_ms, end_time_ms, duration_s]
            )
            clip_index += 1
    progress(0.80)

    # Write one index file per speaker next to that speaker's clips.
    for key, value in csv_data_dict.items():
        speaker_folder = join(output_folder, f"Speaker_{key}")
        csv_filename = join(speaker_folder, f"Speaker_{key}_{tag}_output.txt")
        write_csv_file(csv_filename, value)
    progress(0.90)

    shutil.make_archive("output", 'zip', output_folder)
    progress(1.00)
    return "output.zip"