"""Streamlit app that captions images and videos with BLIP and summarizes
multi-frame video captions with BART. Inputs can be local files, direct
URLs, or URLs collected from an uploaded CSV/Excel file."""

import io
import os
import re
import tempfile
import urllib.request

import cv2
import pandas as pd
import requests
import streamlit as st
import yt_dlp
from PIL import Image
from transformers import pipeline

st.markdown(
    """
<div style="background-color: #f8d7da; color: #721c24; padding: 10px; border-radius: 5px; border: 1px solid #f5c6cb;">
<strong>Disclaimer:</strong> Uploading images and videos from your local device is recommended.
For image URLs, copy the image address from your browser (right-click the image and choose "Copy image address").
For video captioning by URL, only Twitter/X video URLs are supported.
</div>
""",
    unsafe_allow_html=True,
)

# Load the models once and cache them across Streamlit reruns; the first
# run downloads the weights from the Hugging Face Hub.
@st.cache_resource
def load_models():
    captioning = pipeline("image-to-text", model="Salesforce/blip-image-captioning-base")
    summarizing = pipeline("summarization", model="facebook/bart-large-cnn")
    return captioning, summarizing


captioning_model, summarizer = load_models()

def extract_urls(text):
    url_pattern = re.compile(r'https?://\S+')
    return url_pattern.findall(text)


def fetch_image_from_url(url):
    """Download an image from a URL; return a PIL Image, or None on failure."""
    try:
        response = urllib.request.urlopen(url, timeout=30)
        image_data = response.read()
        return Image.open(io.BytesIO(image_data))
    except Exception:
        return None


def convert_video_to_30fps(video_path):
    """Rewrite the video with a fixed 30 fps header so later frame sampling is
    predictable. Frames are copied as-is, not resampled, so only the reported
    frame rate changes."""
    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    fps = 30
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    converted_video_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
    out = cv2.VideoWriter(converted_video_path, fourcc, fps, (width, height))

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        out.write(frame)

    cap.release()
    out.release()
    return converted_video_path


def extract_frames(video_stream):
    """Sample roughly one frame per second of video and return the frames as
    RGB PIL Images."""
    frames = []
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video_file:
        temp_video_file.write(video_stream.read())
        temp_video_file_path = temp_video_file.name

    converted_video_path = convert_video_to_30fps(temp_video_file_path)

    cap = cv2.VideoCapture(converted_video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_interval = max(int(fps), 1)  # guard against a reported fps of 0

    while True:
        success, frame = cap.read()
        if not success:
            break
        current_frame_number = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
        if current_frame_number % frame_interval == 0:
            frames.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))

    cap.release()
    os.remove(temp_video_file_path)
    os.remove(converted_video_path)
    return frames


def generate_captions(frames):
    captions = []
    for frame in frames:
        caption = captioning_model(frame)
        if caption and 'generated_text' in caption[0]:
            captions.append(caption[0]['generated_text'])
    return captions


def generate_caption_for_image(image):
    caption = captioning_model(image)
    if caption and 'generated_text' in caption[0]:
        return caption[0]['generated_text']
    return "No caption generated."


def summarize_captions(captions):
    if not captions:
        return "No captions to summarize."
    combined_captions = " ".join(captions)
    summary = summarizer(combined_captions, max_length=150, min_length=30, do_sample=False)
    return summary[0]['summary_text']


def download_twitter_video(url):
    """Resolve a Twitter/X video URL with yt-dlp and return its content as a
    BytesIO stream, or None on failure."""
    url = url.replace("x.com", "twitter.com")
    ydl_opts = {
        'format': 'best',
        'outtmpl': 'downloaded_video.%(ext)s',
        'quiet': True,
        'noplaylist': True,
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(url, download=False)
            video_url = info_dict.get("url")
            if not video_url:
                return None
            response = requests.get(video_url, timeout=60)
            if response.status_code == 200:
                return io.BytesIO(response.content)
            return None
    except Exception as e:
        st.error(f"An error occurred: {e}")
        return None


def process_urls_in_dataframe(df):
    results = []
    for _, row in df.iterrows():
        for cell in row:
            if pd.notna(cell):
                urls = extract_urls(str(cell))
                for url in urls:
                    if url.startswith(("https://x.com", "https://twitter.com")):
                        st.write(f"Processing video URL: {url}")
                        video_stream = download_twitter_video(url)
                        if video_stream:
                            frames = extract_frames(video_stream)
                            if frames:
                                captions = generate_captions(frames)
                                summary = summarize_captions(captions)
                                result = {"URL": url, "Caption": summary}
                                results.append(result)
                                # Save only the new row; appending the whole
                                # accumulated list here would duplicate earlier
                                # rows in the CSV on every save.
                                save_results_to_csv([result])
                            else:
                                st.error(f"Failed to extract frames from video: {url}")
                        else:
                            st.error(f"Failed to fetch video: {url}")
                    else:
                        st.write(f"Processing image URL: {url}")
                        image = fetch_image_from_url(url)
                        if image:
                            caption = generate_caption_for_image(image)
                            result = {"URL": url, "Caption": caption}
                            results.append(result)
                            save_results_to_csv([result])
                        else:
                            st.error(f"Failed to fetch image: {url}")
    return results


def save_results_to_csv(results):
    file_path = "captions_results.csv"
    df = pd.DataFrame(results)
    # Append to the CSV, writing the header only when the file is new.
    df.to_csv(file_path, index=False, mode='a', header=not os.path.isfile(file_path))


st.title("Captioning Application")

st.subheader("Process URLs from File")
uploaded_file = st.file_uploader("Upload a CSV or Excel file", type=["csv", "xlsx"])

if uploaded_file is not None:
    st.write("Processing file...")
    if uploaded_file.name.endswith(".csv"):
        df = pd.read_csv(uploaded_file)
    else:
        df = pd.read_excel(uploaded_file)

    results = process_urls_in_dataframe(df)

    if results:
        st.write(f"Processed {len(results)} URLs from the file.")
        st.write("Results saved to captions_results.csv")
    else:
        st.write("No URLs found or processed.")

st.subheader("Process URLs Directly")

image_url = st.text_input("Enter Image URL:")
if image_url:
    st.write(f"Processing Image URL: {image_url}")
    image = fetch_image_from_url(image_url)
    if image:
        caption = generate_caption_for_image(image)
        st.image(image, caption="Uploaded Image", use_column_width=True)
        st.write(f"Caption: {caption}")

        results = [{"URL": image_url, "Caption": caption}]
        save_results_to_csv(results)
        st.success("Results saved to captions_results.csv")
    else:
        st.error("Failed to fetch image from the URL.")

video_url = st.text_input("Enter Video URL:")
if video_url:
    st.write(f"Processing Video URL: {video_url}")
    if video_url.startswith(("https://x.com", "https://twitter.com")):
        video_stream = download_twitter_video(video_url)
        if video_stream:
            frames = extract_frames(video_stream)
            if frames:
                captions = generate_captions(frames)
                summary = summarize_captions(captions)
                st.write(f"Caption: {summary}")

                results = [{"URL": video_url, "Caption": summary}]
                save_results_to_csv(results)
                st.success("Results saved to captions_results.csv")
            else:
                st.error("Failed to extract frames from video.")
        else:
            st.error("Failed to fetch video.")
    else:
        st.error("Only Twitter/X video URLs are supported.")

st.subheader("Process Local Files")

uploaded_local_file = st.file_uploader("Upload a local image or video file", type=["jpg", "jpeg", "png", "mp4"])

if uploaded_local_file is not None:
    if uploaded_local_file.type.startswith("image"):
        image = Image.open(uploaded_local_file)
        caption = generate_caption_for_image(image)
        st.image(image, caption="Uploaded Image", use_column_width=True)
        st.write(f"Caption: {caption}")
    elif uploaded_local_file.type.startswith("video"):
        # Read the bytes once so the player and the frame extractor don't
        # fight over the uploaded file's read pointer.
        video_bytes = uploaded_local_file.read()
        frames = extract_frames(io.BytesIO(video_bytes))
        if frames:
            captions = generate_captions(frames)
            summary = summarize_captions(captions)
            st.video(video_bytes)
            st.write(f"Summary of Captions: {summary}")
        else:
            st.error("Failed to extract frames from video.")

st.write("Upload a file or enter a URL to start processing.")
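
# Usage note (a sketch, assuming this script is saved as app.py and the
# dependencies are installed: streamlit, transformers, torch, opencv-python,
# yt-dlp, pandas, requests, Pillow, and openpyxl for Excel support):
#
#   streamlit run app.py
#
# The first run downloads the BLIP captioning and BART summarization weights
# from the Hugging Face Hub, so expect it to take a few minutes.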