tube / app-backup.py
ginipick's picture
Rename app (2).py to app-backup.py
6170788 verified
raw
history blame
No virus
8.21 kB
import gradio as gr
import requests
import re
import os
import json
import time
import threading
from googleapiclient.discovery import build
from huggingface_hub import InferenceClient
from pytube import YouTube
import whisper
import logging
# Logging configuration
logging.basicConfig(level=logging.INFO)
# Load the Whisper speech-to-text model ("base" size) once at import time
model = whisper.load_model("base")
# YouTube Data API key
# NOTE(review): hardcoded credential committed to source — move to an
# environment variable and rotate the exposed key.
API_KEY = 'AIzaSyDUz3wkGal0ewRtPlzeMit88bV4hS4ZIVY'
# Build the YouTube Data API v3 service client
youtube = build('youtube', 'v3', developerKey=API_KEY)
# Hugging Face Inference API client (Llama-3 70B Instruct); token read from env
client = InferenceClient(model="meta-llama/Meta-Llama-3-70B-Instruct", token=os.getenv("HF_TOKEN"))
# Pabbly webhook endpoint that receives generated replies
WEBHOOK_URL = "https://connect.pabbly.com/workflow/sendwebhookdata/IjU3NjUwNTZhMDYzMDA0MzA1MjZhNTUzMzUxM2Ii_pc"
# Local JSON cache of comments that have already been answered
COMMENTS_FILE = 'comments.json'
# Default system prompt (in Korean): introduce the bot as 'GPTube' and reply
# in Korean within 250 tokens.
DEFAULT_SYSTEM_PROMPT = "λŒ€ν™”μ‹œ λ°˜λ“œμ‹œ λ‚˜μ˜ 이름 'GPTube'λ₯Ό 밝히며 ν•œκΈ€λ‘œ 인사λ₯Όν•˜λΌ. λ°˜λ“œμ‹œ 'ν•œκΈ€'(ν•œκ΅­μ–΄)둜 250 토큰 μ΄λ‚΄λ‘œ 닡변을 μƒμ„±ν•˜κ³  좜λ ₯ν•˜λΌ. Respond to the following YouTube comment in a friendly and helpful manner:"
stop_event = threading.Event()  # Event used to signal the background polling thread to stop
def load_existing_comments():
    """Load previously processed comments from COMMENTS_FILE.

    Returns:
        list: the stored comment dicts, or an empty list when the file is
        missing, unreadable, or contains invalid JSON.
    """
    if not os.path.exists(COMMENTS_FILE):
        return []
    try:
        with open(COMMENTS_FILE, 'r', encoding='utf-8') as file:
            return json.load(file)
    except (json.JSONDecodeError, OSError) as e:
        # A corrupt or unreadable cache should not crash the app; start fresh.
        logging.warning("Could not read %s (%s); starting with an empty comment cache.", COMMENTS_FILE, e)
        return []
def save_comments(comments):
    """Persist the processed-comment list to COMMENTS_FILE as JSON.

    Args:
        comments: list of comment dicts (JSON-serializable).
    """
    # Explicit UTF-8 + ensure_ascii=False keeps the (mostly Korean) comment
    # text human-readable instead of \uXXXX-escaped, and avoids depending on
    # the platform's default encoding.
    with open(COMMENTS_FILE, 'w', encoding='utf-8') as file:
        json.dump(comments, file, ensure_ascii=False)
def download_audio(video_url):
    """Download the audio-only stream of a YouTube video to the current dir.

    Args:
        video_url: full YouTube watch URL.

    Returns:
        str | None: path to the downloaded file (renamed to .mp3), or None
        when the file exceeds the ~30 MB size limit.
    """
    yt = YouTube(video_url)
    audio = yt.streams.filter(only_audio=True).first()
    audio_path = audio.download(output_path=".")
    file_stats = os.stat(audio_path)
    logging.info(f'Size of audio file in Bytes: {file_stats.st_size}')
    if file_stats.st_size <= 30000000:  # size limit (~30 MB, roughly 1.5 h)
        base, ext = os.path.splitext(audio_path)
        new_file = base + '.mp3'
        # NOTE: this only renames the file; no transcoding is performed.
        # Whisper decodes via ffmpeg, which ignores the extension.
        os.rename(audio_path, new_file)
        return new_file
    # Oversized download: remove it so rejected files don't accumulate on disk
    # (the original left the file behind, leaking disk space on every failure).
    os.remove(audio_path)
    logging.error('Videos for transcription on this space are limited to about 1.5 hours. Please contact support for more information.')
    return None
def generate_transcript(audio_path):
    """Transcribe an audio file with the module-level Whisper model.

    Args:
        audio_path: path to a local audio file.

    Returns:
        str: the stripped transcript text, or a Korean error message when the
        path is invalid or transcription fails.
    """
    try:
        if audio_path is None or not os.path.exists(audio_path):
            raise ValueError("μœ νš¨ν•œ μ˜€λ””μ˜€ 파일 κ²½λ‘œκ°€ μ•„λ‹™λ‹ˆλ‹€.")
        transcription = model.transcribe(audio_path)
        return transcription['text'].strip()
    except Exception as err:
        logging.error(f"Exception during transcription: {str(err)}")
        return f"전사 쀑 였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€: {str(err)}"
def generate_reply(comment_text, system_prompt):
    """Generate an LLM reply to a YouTube comment via the HF Inference API.

    Args:
        comment_text: the comment to respond to.
        system_prompt: instructions prepended to the comment.

    Returns:
        str: the generated reply text.
    """
    prompt = f"{system_prompt}\n\nComment: {comment_text}\n\nReply:"
    generated = client.text_generation(
        prompt=prompt,
        max_new_tokens=250,
        temperature=0.7,
        top_p=0.9,
    )
    # Some client versions return a dict payload instead of a bare string.
    if isinstance(generated, dict) and 'generated_text' in generated:
        generated = generated['generated_text']
    return generated
def send_webhook(data):
    """POST a JSON payload to the configured Pabbly webhook.

    Args:
        data: JSON-serializable dict to send.

    Returns:
        tuple[int, str]: (HTTP status code, response body text).
    """
    # A timeout prevents the worker from hanging indefinitely if the webhook
    # endpoint stops responding (requests has no default timeout).
    response = requests.post(WEBHOOK_URL, json=data, timeout=30)
    return response.status_code, response.text
def get_video_comments(video_id):
    """Fetch every top-level comment thread of a video, following pagination.

    Args:
        video_id: the 11-character YouTube video ID.

    Returns:
        list[dict]: comment dicts with keys comment_id, author, published_at,
        text, reply_count — or a single-element list [{'error': ...}] when the
        API call fails.
    """
    try:
        collected = []
        page_token = None
        while True:
            params = {
                'part': 'snippet',
                'videoId': video_id,
                'maxResults': 100,  # number of comments fetched per page
                'textFormat': 'plainText',
            }
            if page_token:
                params['pageToken'] = page_token
            response = youtube.commentThreads().list(**params).execute()
            for item in response['items']:
                top_level = item['snippet']['topLevelComment']
                details = top_level['snippet']
                collected.append({
                    'comment_id': top_level['id'],
                    'author': details['authorDisplayName'],
                    'published_at': details['publishedAt'],
                    'text': details['textDisplay'],
                    'reply_count': item['snippet']['totalReplyCount'],
                })
            page_token = response.get('nextPageToken')
            if not page_token:
                return collected
    except Exception as e:
        # Surface the failure in-band; callers check new_comments[0]['error'].
        return [{'error': str(e)}]
def fetch_comments(video_url, system_prompt):
    """Transcribe a video, reply to its unanswered comments, and fire webhooks.

    Args:
        video_url: YouTube watch URL.
        system_prompt: system instructions combined with the transcript to
            steer reply generation.

    Returns:
        str: a human-readable (Korean) log of replies/webhook results, or an
        error message.
    """
    log_entries = []
    video_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', video_url)
    if video_id_match:
        video_id = video_id_match.group(1)
        audio_path = download_audio(video_url)
        if not audio_path:
            return "μ˜€λ””μ˜€λ₯Ό λ‹€μš΄λ‘œλ“œν•  수 μ—†μŠ΅λ‹ˆλ‹€."
        transcript = generate_transcript(audio_path)
        existing_comments = load_existing_comments()
        new_comments = get_video_comments(video_id)
        if not new_comments or 'error' in new_comments[0]:
            return "λŒ“κΈ€μ„ 찾을 수 μ—†κ±°λ‚˜ 였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€."
        # Build the seen-ID set once. The original rebuilt this set inside the
        # filter comprehension's condition, i.e. once per new comment: O(n*m).
        seen_ids = {c['comment_id'] for c in existing_comments}
        # Only reply to comments not seen before and with no replies yet.
        recent_new_comments = [
            c for c in new_comments
            if c['comment_id'] not in seen_ids and c['reply_count'] == 0
        ]
        if recent_new_comments:
            # Loop-invariant: transcript + prompt do not change per comment.
            combined_prompt = f"{transcript}\n\n{system_prompt}"
            for comment in recent_new_comments:
                reply_text = generate_reply(comment['text'], combined_prompt)
                webhook_data = {
                    "comment_id": comment['comment_id'],
                    "author": comment['author'],
                    "published_at": comment['published_at'],
                    "text": comment['text'],
                    "reply_text": reply_text
                }
                webhook_status, webhook_response = send_webhook(webhook_data)
                log_entries.append(f"졜근 λŒ“κΈ€: {comment['text']}\n\nλ‹΅λ³€ 생성: {reply_text}\n\nμ›Ήν›… 응닡: {webhook_status} - {webhook_response}")
                existing_comments.append(comment)
                # Persist after every comment so a mid-loop crash does not
                # cause already-sent replies to be re-sent next run.
                save_comments(existing_comments)
        else:
            log_entries.append("μƒˆλ‘œμš΄ λŒ“κΈ€μ΄ μ—†μŠ΅λ‹ˆλ‹€.")
    else:
        log_entries.append("μœ νš¨ν•˜μ§€ μ•Šμ€ YouTube URLμž…λ‹ˆλ‹€.")
    return "\n\n".join(log_entries)
def background_fetch_comments():
    """Poll a fixed demo video for new comments every 10 s until stopped.

    Runs until stop_event is set; each cycle fetches comments and prints the
    resulting log.
    """
    while not stop_event.is_set():
        result = fetch_comments("https://www.youtube.com/watch?v=dQw4w9WgXcQ", DEFAULT_SYSTEM_PROMPT)  # example URL/prompt actually used
        print(result)
        # wait() instead of time.sleep(): wakes up immediately when stop_event
        # is set, so shutdown is not delayed by up to 10 seconds.
        stop_event.wait(10)
def start_background_fetch():
    """Start the comment-polling loop on a background thread.

    The thread is a daemon so it cannot keep the interpreter alive after the
    main Gradio process exits (the original non-daemon thread could block
    shutdown indefinitely).
    """
    threading.Thread(target=background_fetch_comments, daemon=True).start()
def stop_background_fetch():
    """Signal the background comment-polling thread to stop."""
    stop_event.set()
def get_text(video_url):
    """Download a video's audio track and return its Whisper transcript.

    Args:
        video_url: YouTube watch URL.

    Returns:
        str: transcript text, or a Korean error message when the audio
        could not be downloaded.
    """
    audio_file = download_audio(video_url)
    if audio_file:
        return generate_transcript(audio_file)
    return "μ˜€λ””μ˜€λ₯Ό λ‹€μš΄λ‘œλ“œν•  수 μ—†μŠ΅λ‹ˆλ‹€."
# Gradio μΈν„°νŽ˜μ΄μŠ€ μ •μ˜
demo = gr.Blocks()
with demo:
gr.Markdown("<h1><center>GPTube</center></h1>")
with gr.Row():
input_text_url = gr.Textbox(placeholder='YouTube video URL', label='YouTube URL')
input_text_prompt = gr.Textbox(placeholder='μ‹œμŠ€ν…œ ν”„λ‘¬ν”„νŠΈ', label='μ‹œμŠ€ν…œ ν”„λ‘¬ν”„νŠΈ', value=DEFAULT_SYSTEM_PROMPT, lines=5)
with gr.Row():
result_button_transcribe = gr.Button('Transcribe')
result_button_comments = gr.Button('Fetch Comments and Generate Reply')
with gr.Row():
output_text_transcribe = gr.Textbox(placeholder='Transcript of the YouTube video.', label='Transcript', lines=20)
output_text_prompt = gr.Textbox(placeholder='응닡 ν…μŠ€νŠΈ', label='응닡 ν…μŠ€νŠΈ', lines=20)
result_button_transcribe.click(get_text, inputs=input_text_url, outputs=output_text_transcribe, api_name="transcribe_api")
result_button_comments.click(fetch_comments, inputs=[input_text_url, input_text_prompt], outputs=output_text_prompt, api_name="fetch_comments_api")
# μΈν„°νŽ˜μ΄μŠ€ μ‹€ν–‰
demo.launch()