File size: 10,189 Bytes
668bc28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
import streamlit as st
import pandas as pd
import re
import requests
import urllib.request
from PIL import Image
from transformers import pipeline
import tempfile
import cv2
import io
import yt_dlp
import os

# Add a styled disclaimer at the top
st.markdown(
    """

    <div style="background-color: #f8d7da; color: #721c24; padding: 10px; border-radius: 5px; border: 1px solid #f5c6cb;">

        **Disclaimer:** You are recommended to give any images and videos from your local device. In case of URLs, give the url of the website's image from chrome by copying image address. And give the URL of twitter videos for video captioning by URL.

    </div>

    """,
    unsafe_allow_html=True
)

# Load the Salesforce BLIP model for image captioning
captioning_model = pipeline("image-to-text", model="Salesforce/blip-image-captioning-base")
# Load the summarization model for summarizing captions
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")

# Function to extract URLs from a text
def extract_urls(text):
    url_pattern = re.compile(r'https?://\S+')
    return url_pattern.findall(text)

# Function to fetch image from URL
def fetch_image_from_url(url):
    try:
        response = urllib.request.urlopen(url)
        image_data = response.read()
        image = Image.open(io.BytesIO(image_data))
        return image
    except Exception as e:
        return None

# Function to convert video to 30 FPS
def convert_video_to_30fps(video_path):
    cap = cv2.VideoCapture(video_path)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Output format
    fps = 30  # Desired FPS
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    # Temporary file to save the 30 FPS video
    converted_video_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
    out = cv2.VideoWriter(converted_video_path, fourcc, fps, (width, height))

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        out.write(frame)  # Write the frame into the new video

    cap.release()
    out.release()
    
    return converted_video_path

# Function to extract frames from a 30 FPS video at 1-second intervals
def extract_frames(video_stream):
    frames = []
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_video_file:
        temp_video_file.write(video_stream.read())
        temp_video_file_path = temp_video_file.name

    # Convert video to 30 FPS
    converted_video_path = convert_video_to_30fps(temp_video_file_path)
    
    cap = cv2.VideoCapture(converted_video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)  # This should now be 30 FPS
    frame_interval = int(fps)  # Frame interval for 1 second

    while True:
        success, frame = cap.read()
        if not success:
            break
        current_frame_number = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
        if current_frame_number % frame_interval == 0:  # Extract one frame per second
            frames.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))

    cap.release()
    return frames

# Function to generate captions for a list of frames
def generate_captions(frames):
    captions = []
    for frame in frames:
        caption = captioning_model(frame)
        if caption and 'generated_text' in caption[0]:
            captions.append(caption[0]['generated_text'])

    return captions

# Function to generate caption for a single image
def generate_caption_for_image(image):
    caption = captioning_model(image)
    if caption and 'generated_text' in caption[0]:
        return caption[0]['generated_text']
    return "No caption generated."

# Function to summarize the captions
def summarize_captions(captions):
    combined_captions = " ".join(captions)
    summary = summarizer(combined_captions, max_length=150, min_length=30, do_sample=False)
    return summary[0]['summary_text']

# Function to download Twitter video using yt-dlp
def download_twitter_video(url):
    url = url.replace("x.com", "twitter.com")  # Convert the URL if needed
    ydl_opts = {
        'format': 'best',
        'outtmpl': 'downloaded_video.%(ext)s',
        'quiet': True,
        'noplaylist': True,
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info_dict = ydl.extract_info(url, download=False)
            video_url = info_dict.get("url", None)
            response = requests.get(video_url)
            if response.status_code == 200:
                return io.BytesIO(response.content)
            else:
                return None
    except Exception as e:
        st.error(f"An error occurred: {e}")
        return None

# Function to process URLs in a DataFrame
def process_urls_in_dataframe(df):
    results = []
    for index, row in df.iterrows():
        for cell in row:
            if pd.notna(cell):
                urls = extract_urls(str(cell))
                for url in urls:
                    if url.startswith("https://x.com"):
                        st.write(f"Processing video URL: {url}")
                        video_stream = download_twitter_video(url)
                        if video_stream:
                            frames = extract_frames(video_stream)
                            if frames:
                                captions = generate_captions(frames)
                                summary = summarize_captions(captions)
                                results.append({"URL": url, "Caption": summary})
                                save_results_to_csv(results)
                            else:
                                st.error(f"Failed to extract frames from video: {url}")
                        else:
                            st.error(f"Failed to fetch video: {url}")
                    else:
                        st.write(f"Processing image URL: {url}")
                        image = fetch_image_from_url(url)
                        if image:
                            caption = generate_caption_for_image(image)
                            results.append({"URL": url, "Caption": caption})
                            save_results_to_csv(results)
    return results

# Function to save results to a CSV file
def save_results_to_csv(results):
    file_path = "captions_results.csv"
    df = pd.DataFrame(results)
    if not os.path.isfile(file_path):
        df.to_csv(file_path, index=False, mode='w', header=True)
    else:
        df.to_csv(file_path, index=False, mode='a', header=False)

# Streamlit app
st.title("Captioning Application")

# Section to process uploaded CSV or Excel files
st.subheader("Process URLs from File")
uploaded_file = st.file_uploader("Upload a CSV or Excel file", type=["csv", "xlsx"])

if uploaded_file is not None:
    st.write("Processing file...")
    if uploaded_file.name.endswith("csv"):
        df = pd.read_csv(uploaded_file)
    else:
        df = pd.read_excel(uploaded_file)
    
    results = process_urls_in_dataframe(df)
    
    if results:
        st.write(f"Processed {len(results)} URLs from the file.")
        st.write("Results saved to captions_results.csv")
    else:
        st.write("No URLs found or processed.")

# Section to process URLs for images and videos
st.subheader("Process URLs Directly")

# Upload image URL
image_url = st.text_input("Enter Image URL:")
if image_url:
    st.write(f"Processing Image URL: {image_url}")
    image = fetch_image_from_url(image_url)
    if image:
        caption = generate_caption_for_image(image)
        st.image(image, caption="Uploaded Image", use_column_width=True)
        st.write(f"Caption: {caption}")
        # Collect results in a list of dictionaries
        results = [{"URL": image_url, "Caption": caption}]
        
        # Save the results to the CSV file
        save_results_to_csv(results)
        st.success("Results saved to captions_results.csv")
        


# Upload video URL
video_url = st.text_input("Enter Video URL:")
if video_url:
    st.write(f"Processing Video URL: {video_url}")
    if video_url.startswith("https://x.com"):
        video_stream = download_twitter_video(video_url)
        if video_stream:
            frames = extract_frames(video_stream)
            if frames:
                captions = generate_captions(frames)
                summary = summarize_captions(captions)
                st.write(f"Caption: {summary}")
                # Collect results in a list of dictionaries
                results = [{"URL": video_url, "Caption": summary}]
                
                # Save the results to the CSV file
                save_results_to_csv(results)
                st.success("Results saved to captions_results.csv")

            else:
                st.error("Failed to extract frames from video.")
        else:
            st.error("Failed to fetch video.")
    else:
        st.error("Only Twitter video URLs are supported.")

# Section to process local files
st.subheader("Process Local Files")

uploaded_local_file = st.file_uploader("Upload a local image or video file", type=["jpg", "jpeg", "png", "mp4"])

if uploaded_local_file is not None:
    if uploaded_local_file.type.startswith("image"):
        image = Image.open(uploaded_local_file)
        caption = generate_caption_for_image(image)
        st.image(image, caption="Uploaded Image", use_column_width=True)
        st.write(f"Caption: {caption}")
    elif uploaded_local_file.type.startswith("video"):
        video_stream = io.BytesIO(uploaded_local_file.read())
        frames = extract_frames(video_stream)
        if frames:
            captions = generate_captions(frames)
            summary = summarize_captions(captions)
            st.video(uploaded_local_file)
            st.write(f"Summary of Captions: {summary}")
        else:
            st.error("Failed to extract frames from video.")

st.write("Upload a file or enter a URL to start processing.")