Spaces:
Running
Running
from pytubefix import YouTube | |
from pytubefix.cli import on_progress | |
import time | |
import math | |
import gradio as gr | |
import ffmpeg | |
from faster_whisper import WhisperModel | |
import requests | |
import json | |
import arabic_reshaper # pip install arabic-reshaper | |
from bidi.algorithm import get_display # pip install python-bidi | |
from moviepy import VideoFileClip, TextClip, CompositeVideoClip, AudioFileClip, ColorClip | |
import pysrt | |
import instaloader | |
import time | |
import re | |
import concurrent.futures | |
import os | |
api_key = "268976:66f4f58a2a905" | |
def fetch_data(url): | |
try: | |
response = requests.get(url) | |
response.raise_for_status() | |
return response.json() | |
except requests.exceptions.RequestException as e: | |
print(f"An error occurred: {e}") | |
return None | |
def download_file(url): | |
try: | |
response = requests.get(url.split("#")[0], stream=True) | |
response.raise_for_status() | |
print(url.split("#")[1]) | |
with open(url.split("#")[1], 'wb') as file: | |
for chunk in response.iter_content(chunk_size=8192): | |
if chunk: | |
file.write(chunk) | |
print(f"Downloaded successfully: {url.split('#')[1]}") | |
except requests.exceptions.RequestException as e: | |
print(f"An error occurred: {e}") | |
def download_chunk(url, start, end, filename, index): | |
headers = {'Range': f'bytes={start}-{end}'} | |
response = requests.get(url, headers=headers, stream=True) | |
response.raise_for_status() | |
chunk_filename = f'{filename}.part{index}' | |
with open(chunk_filename, 'wb') as file: | |
for chunk in response.iter_content(chunk_size=8192): | |
if chunk: | |
file.write(chunk) | |
return chunk_filename | |
def merge_files(filename, num_parts): | |
with open(filename, 'wb') as output_file: | |
for i in range(num_parts): | |
part_filename = f'{filename}.part{i}' | |
with open(part_filename, 'rb') as part_file: | |
output_file.write(part_file.read()) | |
# Optionally, delete the part file after merging | |
# os.remove(part_filename) | |
def download_file_in_parallel(link, size, num_threads=4): | |
url = link.split("#")[0] | |
filename = link.split("#")[1] | |
print(url+" filename: "+filename) | |
response = requests.head(url) | |
#file_size = int(response.headers['Content-Length']) | |
chunk_size = size // num_threads | |
ranges = [(i * chunk_size, (i + 1) * chunk_size - 1) for i in range(num_threads)] | |
ranges[-1] = (ranges[-1][0], size - 1) # Adjust the last range to the end of the file | |
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: | |
futures = [ | |
executor.submit(download_chunk, url, start, end, filename, i) | |
for i, (start, end) in enumerate(ranges) | |
] | |
for future in concurrent.futures.as_completed(futures): | |
future.result() # Ensure all threads complete | |
merge_files(filename, num_threads) | |
print(f'Downloaded successfully: {filename}') | |
def one_youtube(link, api_key): | |
# Fetch video ID | |
video_id_url = f"https://one-api.ir/youtube/?token={api_key}&action=getvideoid&link={link}" | |
video_data = fetch_data(video_id_url) | |
if not video_data: | |
return None, None | |
video_id = video_data["result"] | |
# Fetch video data | |
filter_option = "" # Replace with your filter option | |
video_data_url = f"https://youtube.one-api.ir/?token={api_key}&action=fullvideo&id={video_id}&filter={filter_option}" | |
video_data_2 = fetch_data(video_data_url) | |
if not video_data_2: | |
return None, None | |
formats_list = video_data_2["result"]["formats"] | |
file_name = video_data_2["result"]["title"] | |
video_name = f'{file_name}.mp4' | |
audio_name = f'{file_name}.mp3' | |
for f in formats_list: | |
if f["format_note"] == "360p": | |
download_id = f["id"] | |
video_size = f["filesize"] | |
for f in formats_list: | |
if f["format_note"] == "medium": | |
audio_id = f["id"] | |
audio_size = f["filesize"] | |
if not download_id or not audio_id: | |
return None, None | |
# Fetch video and audio links | |
video_link_url = f"https://youtube.one-api.ir/?token={api_key}&action=download&id={download_id}" | |
audio_link_url = f"https://youtube.one-api.ir/?token={api_key}&action=download&id={audio_id}" | |
video_link_data = fetch_data(video_link_url) | |
audio_link_data = fetch_data(audio_link_url) | |
if not video_link_data or not audio_link_data: | |
return None, None | |
video_link = video_link_data["result"]["link"] | |
audio_link = audio_link_data["result"]["link"] | |
vid_str=video_link+"#"+video_name | |
audio_str=audio_link+"#"+audio_name | |
# Download video and audio files | |
print(video_size , audio_size) | |
download_file_in_parallel(vid_str, video_size) | |
download_file_in_parallel(audio_str, audio_size) | |
return video_name, audio_name | |
# Define your functions here | |
def yt_download(url): | |
yt = YouTube(url) | |
print(yt.title) | |
video_path = f"{yt.title}.mp4" | |
ys = yt.streams.get_highest_resolution() | |
print(ys) | |
ys.download() | |
return video_path, yt.title | |
def insta_oneapi(url, api_key): | |
shortcode = url.split("/")[-1] | |
print(shortcode) | |
url_one="https://api.one-api.ir/instagram/v1/post/?shortcode="+shortcode | |
request_body = [{"shortcode": shortcode},] | |
headers = {"one-api-token": api_key, "Content-Type": "application/json"} | |
response = requests.get(url_one, headers=headers) | |
print(response) | |
if response.status_code == 200: | |
result = response.json() | |
try: | |
time.sleep(10) | |
response = requests.get(result["result"]['media'][0]["url"], stream=True) | |
response.raise_for_status() | |
with open("video.mp4", 'wb') as file: | |
for chunk in response.iter_content(chunk_size=8192): | |
if chunk: | |
file.write(chunk) | |
print(f"Downloaded successfully") | |
return "video.mp4" | |
except requests.exceptions.RequestException as e: | |
print(f"An error occurred: {e}") | |
else: | |
print(f"Error: {response.status_code}, {response.text}") | |
return None | |
def insta_download(permalink): | |
# Create an instance of Instaloader | |
L = instaloader.Instaloader() | |
try: | |
# Extract the shortcode from the permalink | |
if "instagram.com/reel/" in permalink: | |
shortcode = permalink.split("instagram.com/reel/")[-1].split("/")[0] | |
elif "instagram.com/p/" in permalink: | |
shortcode = permalink.split("instagram.com/p/")[-1].split("/")[0] | |
else: | |
raise ValueError("Invalid permalink format") | |
# Load the post using the shortcode | |
post = instaloader.Post.from_shortcode(L.context, shortcode) | |
# Check if the post is a video | |
if not post.is_video: | |
raise ValueError("The provided permalink is not a video.") | |
# Get the video URL | |
video_url = post.video_url | |
# Extract the filename from the URL | |
filename = video_url.split("/")[-1] | |
# Remove query parameters | |
filename = filename.split("?")[0] | |
# Download the video using requests | |
response = requests.get(video_url, stream=True) | |
response.raise_for_status() # Raise an error for bad responses | |
# Save the content to a file | |
with open(filename, 'wb') as file: | |
for chunk in response.iter_content(chunk_size=8192): | |
file.write(chunk) | |
print(f"Downloaded video {filename} successfully.") | |
return filename | |
except Exception as e: | |
print(f"Failed to download video from {permalink}: {e}") | |
def extract_audio(input_video_name): | |
# Define the input video file and output audio file | |
mp3_file = "audio.mp3" | |
# Load the video clip | |
video_clip = VideoFileClip(input_video_name) | |
# Extract the audio from the video clip | |
audio_clip = video_clip.audio | |
# Write the audio to a separate file | |
audio_clip.write_audiofile(mp3_file) | |
# Close the video and audio clips | |
audio_clip.close() | |
video_clip.close() | |
print("Audio extraction successful!") | |
return mp3_file | |
def transcribe(audio): | |
model = WhisperModel("tiny") | |
segments, info = model.transcribe(audio) | |
segments = list(segments) | |
for segment in segments: | |
print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text)) | |
return segments | |
def format_time(seconds): | |
hours = math.floor(seconds / 3600) | |
seconds %= 3600 | |
minutes = math.floor(seconds / 60) | |
seconds %= 60 | |
milliseconds = round((seconds - math.floor(seconds)) * 1000) | |
seconds = math.floor(seconds) | |
formatted_time = f"{hours:02d}:{minutes:02d}:{seconds:01d},{milliseconds:03d}" | |
return formatted_time | |
def generate_subtitle_file(language, segments, input_video_name): | |
subtitle_file = f"sub-{input_video_name}.{language}.srt" | |
text = "" | |
for index, segment in enumerate(segments): | |
segment_start = format_time(segment.start) | |
segment_end = format_time(segment.end) | |
text += f"{str(index+1)} \n" | |
text += f"{segment_start} --> {segment_end} \n" | |
text += f"{segment.text} \n" | |
text += "\n" | |
f = open(subtitle_file, "w", encoding='utf8') | |
f.write(text) | |
f.close() | |
return subtitle_file | |
def read_srt_file(file_path): | |
try: | |
with open(file_path, 'r', encoding='utf-8') as file: | |
srt_content = file.read() | |
return srt_content | |
except FileNotFoundError: | |
print(f"The file {file_path} was not found.") | |
except Exception as e: | |
print(f"An error occurred: {e}") | |
def write_srt(subtitle_text, output_file="edited_srt.srt"): | |
with open(output_file+".srt", 'w', encoding="utf-8") as file: | |
file.write(subtitle_text) | |
return output_file+".srt" | |
def generate_translated_subtitle(language, segments, input_video_name): | |
input_video_name=input_video_name.split('/')[-1] | |
subtitle_file = f"{input_video_name}.srt" | |
text = "" | |
lines = segments.split('\n') | |
new_list = [item for item in lines if item != ''] | |
segment_number = 1 | |
for index, segment in enumerate(new_list): | |
if (index+1) % 3 == 1 or (index+1)==1: | |
text += f"{segment}\n" | |
segment_number += 1 | |
if (index+1) % 3 == 2 or (index+1)==2: | |
text += segment + "\n" | |
if (index+1) % 3 == 0: | |
text += f"\u200F{segment}\n\n" | |
with open(subtitle_file, "w", encoding='utf8') as f: | |
f.write(text) | |
return subtitle_file | |
def clean_text(text): | |
# Remove 'srt ' from the start of each line | |
# Remove ''' from the start and end | |
text = re.sub(r"^```|```$", '', text) | |
text = re.sub(r'^srt', '', text, flags=re.MULTILINE) | |
return text | |
def split_srt_file(input_file, max_chars=3000): | |
# Read the contents of the SRT file | |
with open(input_file, 'r', encoding='utf-8') as file: | |
content = file.read() | |
# Split the content into individual subtitles | |
subtitles = content.strip().split('\n\n') | |
# Prepare to write the split files | |
output_files = [] | |
current_file_content = '' | |
current_file_index = 1 | |
for subtitle in subtitles: | |
# Check if adding this subtitle would exceed the character limit | |
if len(current_file_content) + len(subtitle) + 2 > max_chars: # +2 for \n\n | |
# Write the current file | |
output_file_name = f'split_{current_file_index}.srt' | |
with open(output_file_name, 'w', encoding='utf-8') as output_file: | |
output_file.write(current_file_content.strip()) | |
output_files.append(output_file_name) | |
# Prepare for the next file | |
current_file_index += 1 | |
current_file_content = subtitle + '\n\n' | |
else: | |
# If it fits, add the subtitle | |
current_file_content += subtitle + '\n\n' | |
# Write any remaining content to a new SRT file | |
if current_file_content: | |
output_file_name = f'split_{current_file_index}.srt' | |
with open(output_file_name, 'w', encoding='utf-8') as output_file: | |
output_file.write(current_file_content.strip()) | |
output_files.append(output_file_name) | |
return output_files | |
def translate_text(api_key, source_lang, target_lang, text): | |
url = "https://api.one-api.ir/translate/v1/google/" | |
request_body = {"source": source_lang, "target": target_lang, "text": text} | |
headers = {"one-api-token": api_key, "Content-Type": "application/json"} | |
response = requests.post(url, headers=headers, json=request_body) | |
if response.status_code == 200: | |
result = response.json() | |
enhanced_text = enhance_text(api_key, text, result['result']) | |
return enhanced_text | |
else: | |
print(f"Error: {response.status_code}, {response.text}") | |
return None | |
def enhance_text(api_key, text): | |
url = "https://api.one-api.ir/chatbot/v1/gpt4o/" | |
# Prepare the request body | |
request_body = [{ | |
"role": "user", | |
"content": "Translate the following English text into Persian, specifically for a voice-over. Prioritize a natural and engaging tone that resonates with a Persian-speaking audience. Make sure the translation is:Concise and Time-Conscious: It needs to be spoken fluently within approximately the same time it takes to say the original English. Avoid overly verbose phrasing.Clear and Understandable: Use vocabulary and sentence structures that are easily understood by a general Persian-speaking audience. Avoid jargon or overly technical terms unless absolutely necessary and always provide a culturally appropriate equivalent.Culturally Relevant: Adapt the translation to ensure it is culturally appropriate and resonates with Iranian sensibilities. Consider nuances of politeness, humor, and common expressions. Adapt phrases as necessary to better reflect the culture.Natural Sounding: Read the Persian translation aloud to ensure it sounds natural and flows well when spoken.Engaging: Use phrasing that keeps the listener interested and attentive. Consider using rhetorical devices or active voice where appropriate to make the delivery more impactful.[Optional: Include the intended audience demographics here, e.g., "Targeting a younger audience" or "Intended for a professional context". This helps tailor the language.][Optional: Mention the overall tone you're aiming for, e.g., "maintain a formal tone" or "use an informal and friendly tone".]Essentially, I need a translation that doesn't sound like a translation, but a message written originally in Persian that is easy to grasp and will keep the audience captivated.the text will send in next message . | |
},] | |
{ | |
"role": "assistant", | |
"content": "okay" | |
}, | |
{ | |
"role": "user", | |
"content": text | |
} | |
] | |
# Add the API key to the request | |
headers = { | |
"one-api-token": api_key, | |
"Content-Type": "application/json" | |
} | |
# Make the POST request | |
attempts = 0 | |
max_attempts = 3 | |
while attempts < max_attempts: | |
response = requests.post(url, headers=headers, json=request_body) | |
if response.status_code == 200: | |
result = response.json() | |
if result["status"] == 200: | |
print("status: ", result["status"]) | |
te = clean_text(result["result"][0]) | |
print("result: ", te) | |
return te | |
else: | |
print(f"Error: status {result['status']}, retrying in 30 seconds...") | |
else: | |
print(f"Error: {response.status_code}, {response.text}, retrying in 30 seconds...") | |
attempts += 1 | |
time.sleep(30) | |
print("Error Max attempts reached. Could not retrieve a successful response.") | |
return 0 | |
def write_google(google_translate): | |
google = "google_translate.srt" | |
with open(google, 'a', encoding="utf-8") as f: | |
f.write(google_translate) | |
def time_to_seconds(time_obj): | |
return time_obj.hours * 3600 + time_obj.minutes * 60 + time_obj.seconds + time_obj.milliseconds / 1000 | |
def create_subtitle_clips(subtitles, videosize, fontsize, font, color, debug): | |
subtitle_clips = [] | |
color_clips=[] | |
for subtitle in subtitles: | |
start_time = time_to_seconds(subtitle.start) # Add 2 seconds offset | |
end_time = time_to_seconds(subtitle.end) | |
duration = end_time - start_time | |
video_width, video_height = videosize | |
max_width = video_width * 0.8 | |
max_height = video_height * 0.2 | |
text_clip = TextClip(font, subtitle.text, font_size=fontsize, size=(int(video_width * 0.8), int(video_height * 0.2)) ,text_align="center" ,color=color, method='caption').with_start(start_time).with_duration(duration) | |
myclip = ColorClip(size=(int(video_width * 0.8), int(video_height * 0.2)) , color=(0, 0, 0)).with_opacity(0.4).with_start(start_time).with_duration(duration) | |
subtitle_x_position = 'center' | |
subtitle_y_position = video_height * 0.68 | |
text_position = (subtitle_x_position, subtitle_y_position) | |
subtitle_clips.append(text_clip.with_position(text_position)) | |
color_clips.append(myclip.with_position(text_position)) | |
return subtitle_clips, color_clips | |
def process_video(video, url, type): | |
if isinstance(video, str): | |
input_video = video.split("/")[-1] | |
print(input_video) | |
input_audio = extract_audio(video) | |
input_video_name = input_video.replace(".mp4", "") | |
else: | |
if type=="insta": | |
input_video=insta_oneapi(url, api_key) | |
input_video_name = input_video.replace(".mp4", "") | |
input_audio = extract_audio(input_video) | |
elif type=="youtube": | |
#input_video, input_audio = one_youtube(url, api_key) | |
input_video, title = yt_download(url) | |
input_video_name = input_video.replace(".mp4", "") | |
input_audio = extract_audio(input_video) | |
segments = transcribe(audio=input_audio) | |
language = "fa" | |
subtitle_file = generate_subtitle_file(language=language, segments=segments, input_video_name=input_video_name) | |
source_language = "en" | |
target_language = "fa" | |
#srt_string = read_srt_file(subtitle_file) | |
srt_files=split_srt_file(subtitle_file) | |
for i in srt_files: | |
srt_string = read_srt_file(f"{i}") | |
#google_translate = translate_text(api_key, source_language, target_language, srt_string) | |
google_translate = enhance_text(api_key, srt_string) | |
if google_translate == 0 : | |
google_translate = translate_text(api_key, source_language, target_language, srt_string) | |
write_google(google_translate) | |
time.sleep(15) | |
srt = read_srt_file("google_translate.srt") | |
os.remove("google_translate.srt") | |
return srt, video, input_audio | |
def video_edit(srt, input_video, input_audio= 'audio.mp3'): | |
input_video_name = input_video.replace(".mp4", "") | |
srt_name=generate_translated_subtitle("fa", srt, input_video_name) | |
return input_video, srt_name | |
""" input_video_name = input_video.replace(''.mp4', '') | |
video = VideoFileClip(input_video) | |
audio = AudioFileClip(input_audio) | |
video = video.with_audio(audio) | |
print(video) | |
output_video_file = input_video_name + '_subtitled' + '.mp4' | |
write_srt(srt) | |
subtitles = pysrt.open('edited_srt.srt', encoding='utf-8') | |
subtitle_clips = create_subtitle_clips(subtitles, video.size, 32, 'arial.ttf', 'white', False) | |
final_video = CompositeVideoClip([video] + subtitle_clips) | |
final_video.write_videofile(output_video_file, codec='libx264', audio_codec='aac', logger=None) | |
os.remove('google_translate.srt') | |
print('final')""" | |
with gr.Blocks() as demo: | |
gr.Markdown("Start typing below and then click **Run** to see the output.") | |
with gr.Row(): | |
inp = gr.Textbox(placeholder="Enter URL or upload") | |
drp = gr.Dropdown(["insta", "youtube"]) | |
btn = gr.Button("transcribe") | |
out = gr.Textbox(interactive=True) | |
video_file_input = gr.Video(label="Upload Video File") | |
video_path_output = gr.Textbox(label="Video Path", visible=False) | |
audio_path_output = gr.Textbox(label="Video Path", visible=False) | |
btn.click(fn=process_video, inputs=[video_file_input, inp, drp], outputs=[out, video_path_output, audio_path_output]) | |
with gr.Row(): | |
vid_out = gr.Video() | |
srt_file = gr.File() | |
btn2 = gr.Button("transcribe") | |
gr.on( | |
triggers=[btn2.click], | |
fn=write_google, | |
inputs=out, | |
).then(video_edit, [out, video_path_output, audio_path_output], outputs=[vid_out, srt_file]) | |
demo.launch(debug=True) |