from langchain.tools import tool
from crewai_tools import ScrapeWebsiteTool
from gtts import gTTS
from pydub import AudioSegment
from groq import Groq
from PIL import Image, ImageDraw, ImageFont
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips, ImageClip
import requests
import os
import shutil
import tempfile
import re
import base64
import pypandoc
import cv2
import numpy as np
import warnings
warnings.filterwarnings('ignore')
from pathlib import Path
from openai import OpenAI
# pypandoc requires the pandoc binary: sudo apt-get install pandoc
@tool
def scrape_website(website_url):
"""Scrapes all the information from the given website.
Args:
website_url: A url of a company website.
Returns:
Scraped information from the given website.
"""
scrapper = ScrapeWebsiteTool()
data = scrapper.run(website_url=website_url)
return data
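# A minimal usage sketch (the URL is illustrative); tools built with LangChain's
# @tool decorator can be invoked directly via .run() for local testing:
#   scrape_website.run("https://example.com")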
def convert_md_to_docx(md_file_path, docx_file_path):
output = pypandoc.convert_file(md_file_path, 'docx', outputfile=docx_file_path)
assert output == "", "Conversion failed"
print(f"Converted {md_file_path} to {docx_file_path}")
# def generate_image(text, num):
# engine_id = "stable-diffusion-v1-6"
# api_host = os.getenv('API_HOST', 'https://api.stability.ai')
#     api_key = os.getenv('STABILITY_API_KEY')  # read from the environment; never hard-code secrets
# if api_key is None:
# raise Exception("Missing Stability API key.")
# response = requests.post(
# f"{api_host}/v1/generation/{engine_id}/text-to-image",
# headers={
# "Content-Type": "application/json",
# "Accept": "application/json",
# "Authorization": f"Bearer {api_key}"
# },
# json={
# "text_prompts": [
# {
# "text": text
# }
# ],
# "cfg_scale": 7,
# "height": 512,
# "width": 512,
# "samples": 1,
# "steps": 10,
# },
# )
# print(response.status_code)
# if response.status_code != 200:
# raise Exception("Non-200 response: " + str(response.text))
# data = response.json()
# # base64_image = None
# for image in data["artifacts"]:
# with open(f"image_{num}.png", "wb") as f:
# f.write(base64.b64decode(image["base64"]))
# # if base64_image is None:
# # raise Exception("No image was generated.")
# return f'image_{num}.png'
# def generate_image_core(text, num):
# response = requests.post(
# f"https://api.stability.ai/v2beta/stable-image/generate/core",
# headers={
# "authorization": f"sk-6iUj0Jg2eeKDOpRJuDmCDSvPJdUJ6oP6qrQY3sujqR8h4ycF",
# "accept": "image/*"
# },
# files={"none": ''},
# data={
# "prompt": text,
# "output_format": "png",
# 'aspect_ratio': "3:2"
# },
# )
# print(response.status_code)
# if response.status_code == 200:
# with open(f"image_{num}.png", 'wb') as file:
# file.write(response.content)
# else:
# raise Exception(str(response.json()))
# return f'image_{num}.png'
def generate_image_openai(text, num):
temp_output_file = tempfile.NamedTemporaryFile(delete=False, suffix='.png')
output_image = temp_output_file.name
client = OpenAI()
try:
response = client.images.generate(
model="dall-e-2",
prompt=text,
size="512x512",
quality="standard",
n=1
)
image_url = response.data[0].url
        image_response = requests.get(image_url)
        if image_response.status_code == 200:
            with open(output_image, 'wb') as file:
                file.write(image_response.content)
            print(f'image {num} saved to {output_image}')
else:
raise Exception(f"Failed to download image with status code {image_response.status_code} and message: {image_response.text}")
except Exception as e:
raise Exception(f"Image generation failed: {e}")
return output_image
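# Sketch of direct use (assumes OPENAI_API_KEY is set in the environment,
# which the OpenAI() client reads automatically):
#   path = generate_image_openai("a watercolor fox in a misty forest", 0)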
@tool
def generate_images_and_add_to_blog(blog_content):
"""This tool is used to generate images and add them to blog
Args:
blog_content: A complete blog with prompts enclosed in prompt tag.
Returns:
A complete blog"""
    blog_content = str(blog_content)
    image_descriptions = re.findall(r'<prompt>(.*?)</prompt>', blog_content)
    for i, text in enumerate(image_descriptions):
        try:
            img_path = generate_image_openai(text, i)
            print(img_path)
            # Embed the generated image; a raw <img> tag survives pandoc's md -> docx conversion.
            blog_content = blog_content.replace(f'<prompt>{text}</prompt>', f'<img src="{img_path}">')
        except Exception as e:
            print(e)
            raise Exception(f"Image generation failed: {e}")
    try:
        with open('blog_post.md', 'w') as f:
            f.write(blog_content)
        convert_md_to_docx('blog_post.md', 'blog_post.docx')
    except Exception as e:
        print(e)
return blog_content
def process_script(script):
    """Splits the script into image prompts and narration texts."""
    result = {}
    result['text_for_image_generation'] = re.findall(r'<image>(.*?)</image>', script, re.DOTALL)
    result['text_for_speech_generation'] = re.findall(r'<narration>(.*?)</narration>', script, re.DOTALL)
    return result
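# Example of the expected script format and the resulting split:
#   script = "<image>A sunrise over mountains</image><narration>Every day starts fresh.</narration>"
#   process_script(script)
#   # -> {'text_for_image_generation': ['A sunrise over mountains'],
#   #     'text_for_speech_generation': ['Every day starts fresh.']}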
def generate_speech(text, lang='en', speed=1.0, num=0):
"""
Generates speech for the given script using gTTS and adjusts the speed.
"""
temp_speech_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3')
temp_speech_path = temp_speech_file.name
    client = OpenAI()
    response = client.audio.speech.create(
        model="tts-1",
        voice="echo",
        input=text
    )
    response.stream_to_file(temp_speech_path)
# tts = gTTS(text=text, lang=lang)
# tts.save(temp_speech_path)
sound = AudioSegment.from_file(temp_speech_path)
if speed != 1.0:
sound_with_altered_speed = sound._spawn(sound.raw_data, overrides={
"frame_rate": int(sound.frame_rate * speed)
}).set_frame_rate(sound.frame_rate)
sound_with_altered_speed.export(temp_speech_path, format="mp3")
else:
sound.export(temp_speech_path, format="mp3")
temp_speech_file.close()
return temp_speech_path
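# Example: narration at 1.15x speed; returns the path to a temporary .mp3:
#   mp3_path = generate_speech("Welcome to our channel!", speed=1.15, num=0)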
def image_generator(script):
"""Generates images for the given script.
Saves it to a temporary directory and returns the path.
Args:
script: a complete script containing narrations and image descriptions."""
# remove_temp_files('/tmp')
images_dir = tempfile.mkdtemp()
client = OpenAI()
    parts = process_script(script)  # avoid shadowing the dict builtin
    for i, text in enumerate(parts['text_for_image_generation']):
try:
response = client.images.generate(
model="dall-e-2",
prompt=text,
size="512x512",
quality="standard",
n=1
)
image_url = response.data[0].url
print(f'image {i} generated')
# Download the image
image_response = requests.get(image_url)
if image_response.status_code == 200:
with open(os.path.join(images_dir, f'image_{i}.png'), 'wb') as file:
file.write(image_response.content)
else:
raise Exception(f"Failed to download image with status code {image_response.status_code} and message: {image_response.text}")
except Exception as e:
raise Exception(f"Image generation failed: {e}")
return images_dir
def speech_generator(script):
    """
    Generates speech files for the given script using the OpenAI TTS API.
    Saves them to a temporary directory and returns the path.
    Args:
        script: a complete script containing narrations and image descriptions.
    """
    speeches_dir = tempfile.mkdtemp()
    parts = process_script(script)
    for i, text in enumerate(parts['text_for_speech_generation']):
        speech_path = generate_speech(text, num=i)
        print(f'speech {i} generated')
        # shutil.move works across filesystems, unlike os.rename
        shutil.move(speech_path, os.path.join(speeches_dir, f'speech_{i}.mp3'))
    return speeches_dir, parts['text_for_speech_generation']
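# Example: one mp3 per <narration> tag, plus the raw narration texts as captions:
#   speeches_dir, captions = speech_generator(script)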
def split_text_into_chunks(text, chunk_size):
words = text.split()
return [' '.join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]
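# Example:
#   split_text_into_chunks("one two three four five", 2)
#   # -> ['one two', 'three four', 'five']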
def add_text_to_video(input_video, text, duration=1, fontsize=40, fontcolor=(255, 255, 255),
outline_thickness=2, outline_color=(0, 0, 0), delay_between_chunks=0.3,
font_path='Montserrat-Bold.ttf'):
temp_output_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
output_video = temp_output_file.name
chunks = split_text_into_chunks(text, 3) # Adjust chunk size as needed
cap = cv2.VideoCapture(input_video)
if not cap.isOpened():
raise ValueError("Error opening video file.")
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(output_video, fourcc, fps, (width, height))
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
chunk_duration_frames = duration * fps
delay_frames = int(delay_between_chunks * fps)
if not os.path.exists(font_path):
raise FileNotFoundError(f"Font file not found: {font_path}")
try:
font = ImageFont.truetype(font_path, fontsize)
except Exception as e:
raise RuntimeError(f"Error loading font: {e}")
current_frame = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(frame_pil)
chunk_index = current_frame // (chunk_duration_frames + delay_frames)
if current_frame % (chunk_duration_frames + delay_frames) < chunk_duration_frames and chunk_index < len(chunks):
chunk = chunks[chunk_index]
text_bbox = draw.textbbox((0, 0), chunk, font=font)
text_width, text_height = text_bbox[2] - text_bbox[0], text_bbox[3] - text_bbox[1]
text_x = (width - text_width) // 2
text_y = height - 100 # Position text at the bottom
if text_width > width:
words = chunk.split()
half = len(words) // 2
line1 = ' '.join(words[:half])
line2 = ' '.join(words[half:])
                # Pillow 10 removed ImageDraw.textsize; measure via textbbox instead
                bbox_line1 = draw.textbbox((0, 0), line1, font=font)
                bbox_line2 = draw.textbbox((0, 0), line2, font=font)
                text_size_line1 = (bbox_line1[2] - bbox_line1[0], bbox_line1[3] - bbox_line1[1])
                text_size_line2 = (bbox_line2[2] - bbox_line2[0], bbox_line2[3] - bbox_line2[1])
                text_x_line1 = (width - text_size_line1[0]) // 2
                text_x_line2 = (width - text_size_line2[0]) // 2
                text_y = height - 250 - text_size_line1[1]  # Adjust vertical position for two lines
for dx in range(-outline_thickness, outline_thickness + 1):
for dy in range(-outline_thickness, outline_thickness + 1):
if dx != 0 or dy != 0:
draw.text((text_x_line1 + dx, text_y + dy), line1, font=font, fill=outline_color)
draw.text((text_x_line2 + dx, text_y + text_size_line1[1] + dy), line2, font=font, fill=outline_color)
draw.text((text_x_line1, text_y), line1, font=font, fill=fontcolor)
draw.text((text_x_line2, text_y + text_size_line1[1]), line2, font=font, fill=fontcolor)
else:
for dx in range(-outline_thickness, outline_thickness + 1):
for dy in range(-outline_thickness, outline_thickness + 1):
if dx != 0 or dy != 0:
draw.text((text_x + dx, text_y + dy), chunk, font=font, fill=outline_color)
draw.text((text_x, text_y), chunk, font=font, fill=fontcolor)
frame = cv2.cvtColor(np.array(frame_pil), cv2.COLOR_RGB2BGR)
out.write(frame)
current_frame += 1
# Ensure loop breaks after processing all frames
if current_frame >= frame_count:
break
cap.release()
out.release()
cv2.destroyAllWindows()
return output_video
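# Example (assumes Montserrat-Bold.ttf sits next to this script; any TTF path works):
#   captioned_path = add_text_to_video('clip.mp4', 'Hello and welcome to the demo', fontsize=30)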
def apply_zoom_in_effect(clip, zoom_factor=1.2):
width, height = clip.size
duration = clip.duration
def zoom_in_effect(get_frame, t):
frame = get_frame(t)
zoom = 1 + (zoom_factor - 1) * (t / duration)
new_width, new_height = int(width * zoom), int(height * zoom)
resized_frame = cv2.resize(frame, (new_width, new_height))
x_start = (new_width - width) // 2
y_start = (new_height - height) // 2
cropped_frame = resized_frame[y_start:y_start + height, x_start:x_start + width]
return cropped_frame
return clip.fl(zoom_in_effect, apply_to=['mask'])
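# Example: zoom from 1.0x to 1.3x over the clip's duration:
#   clip = ImageClip('image_0.png').set_duration(3)
#   zoomed = apply_zoom_in_effect(clip, zoom_factor=1.3)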
def create_video_from_images_and_audio(images_dir, speeches_dir, final_video_filename, all_captions):
"""Creates video using images and audios.
Args:
images_dir: path to images folder
speeches_dir: path to speeches folder
final_video_filename: the topic name which will be used as final video file name"""
    # The Groq client is only needed for the optional Whisper captioning below;
    # read the key from the environment rather than hard-coding it.
    # client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
images_paths = sorted([os.path.join(images_dir, img) for img in os.listdir(images_dir) if img.endswith('.png') or img.endswith('.jpg')])
audio_paths = sorted([os.path.join(speeches_dir, speech) for speech in os.listdir(speeches_dir) if speech.endswith('.mp3')])
clips = []
temp_files = []
video_dir = tempfile.mkdtemp()
    for i in range(min(len(images_paths), len(audio_paths))):
        # images_paths and audio_paths already contain full paths
        img_clip = ImageClip(images_paths[i])
        audioclip = AudioFileClip(audio_paths[i])
videoclip = img_clip.set_duration(audioclip.duration)
zoomed_clip = apply_zoom_in_effect(videoclip, 1.3)
# with open(os.path.join(speeches_dir, audio_paths[i]), "rb") as file:
# transcription = client.audio.transcriptions.create(
# file=(audio_paths[i], file.read()),
# model="whisper-large-v3",
# response_format="verbose_json",
# )
# caption = transcription.text
temp_video_path = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4').name
zoomed_clip.write_videofile(temp_video_path, codec='libx264', fps=24)
temp_files.append(temp_video_path)
caption = all_captions[i]
final_video_path = add_text_to_video(temp_video_path, caption, duration=1, fontsize=20)
temp_files.append(final_video_path)
final_clip = VideoFileClip(final_video_path)
final_clip = final_clip.set_audio(audioclip)
print(f'create small video {i}')
clips.append(final_clip)
final_clip = concatenate_videoclips(clips)
if not final_video_filename.endswith('.mp4'):
final_video_filename = final_video_filename + '.mp4'
final_clip.write_videofile(os.path.join(video_dir, final_video_filename), codec='libx264', fps=24)
# Close all video files properly
for clip in clips:
clip.close()
# Remove all temporary files
for temp_file in temp_files:
try:
os.remove(temp_file)
except Exception as e:
print(f"Error removing file {temp_file}: {e}")
return os.path.join(video_dir, final_video_filename)
@tool
def generate_video(pairs, final_video_filename):
""" Generates video using narration and image prompt pairs.
Args:
pairs:A string of arration and image prompt pairs enclosed in and tags.
final_video_filename: the topic name which will be used as final video file name
Returns:
Generated video path"""
images_dir = image_generator(pairs)
print(images_dir)
speeches_dir, all_captions = speech_generator(pairs)
print(speeches_dir)
video_path = create_video_from_images_and_audio(images_dir, speeches_dir, final_video_filename, all_captions)
print('video', video_path)
    # Copy the finished video out of the temp directory
    shutil.copy(video_path, 'video.mp4')
return video_path
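# End-to-end sketch (the pairs string is illustrative; each <image> prompt is
# paired with the <narration> spoken over it). Multi-input tools take a dict:
#   pairs = ("<image>A rocket launching at dawn</image>"
#            "<narration>Our journey begins with a single launch.</narration>")
#   generate_video.run({"pairs": pairs, "final_video_filename": "rocket_story"})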