|
import os
|
|
import json
|
|
import datetime
|
|
import subprocess
|
|
from queue import Queue
|
|
from threading import Thread
|
|
|
|
import torch
|
|
import yt_dlp
|
|
from faster_whisper import WhisperModel
|
|
from flask import Flask, render_template, request, Response, jsonify
|
|
from openai import OpenAI
|
|
import spacy
|
|
from collections import Counter
|
|
|
|
import time
|
|
import uuid
|
|
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
from werkzeug.utils import secure_filename
|
|
from collections import deque
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
log_file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'app.log')
|
|
file_handler = RotatingFileHandler(log_file_path, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8')
|
|
file_handler.setLevel(logging.DEBUG)
|
|
|
|
|
|
console_handler = logging.StreamHandler()
|
|
console_handler.setLevel(logging.INFO)
|
|
|
|
|
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
file_handler.setFormatter(formatter)
|
|
console_handler.setFormatter(formatter)
|
|
|
|
|
|
logger.addHandler(file_handler)
|
|
logger.addHandler(console_handler)
|
|
|
|
|
|
logging.getLogger("faster_whisper").setLevel(logging.INFO)
|
|
|
|
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
|
|
|
|
app = Flask(__name__, static_folder='static', static_url_path='/static')
|
|
|
|
|
|
current_directory = os.path.dirname(os.path.realpath(__file__))
|
|
config_file_path = os.path.join(current_directory, 'config.json')
|
|
try:
|
|
with open(config_file_path, 'r', encoding='utf-8') as f:
|
|
config = json.load(f)
|
|
logger.info("成功加載配置文件")
|
|
except Exception as e:
|
|
logger.exception("加載配置文件時發生錯誤")
|
|
raise
|
|
|
|
|
|
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
|
|
|
|
|
|
nlp = spacy.load(config['spacy_model'])
|
|
|
|
|
|
model = WhisperModel(config['whisper_model'], device="auto", compute_type=config['whisper_compute_type'])
|
|
|
|
|
|
ffmpeg_path = config['ffmpeg_path']
|
|
if ffmpeg_path not in os.environ["PATH"]:
|
|
os.environ["PATH"] += os.pathsep + ffmpeg_path
|
|
|
|
def send_sse_message(q, data):
|
|
q.put_nowait(data)
|
|
|
|
def clean_filename(filename):
|
|
return ''.join(c for c in filename if c.isalnum() or c in (' ', '.', '_')).rstrip()
|
|
|
|
def download_audio(youtube_url, save_directory, q):
|
|
send_sse_message(q, {"status": "開始下載 YouTube 音頻..."})
|
|
unique_id = str(uuid.uuid4())[:8]
|
|
output_filename = f"audio_{unique_id}"
|
|
output_path = os.path.join(save_directory, output_filename)
|
|
|
|
ydl_opts = {
|
|
'format': 'bestaudio/best',
|
|
'outtmpl': output_path + ".%(ext)s",
|
|
'postprocessors': [{
|
|
'key': 'FFmpegExtractAudio',
|
|
'preferredcodec': 'mp3',
|
|
'preferredquality': '192',
|
|
}],
|
|
'ffmpeg_location': ffmpeg_path,
|
|
'quiet': True
|
|
}
|
|
|
|
try:
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
|
info = ydl.extract_info(youtube_url, download=True)
|
|
video_title = clean_filename(info.get('title', 'Untitled'))
|
|
|
|
|
|
time.sleep(2)
|
|
|
|
|
|
for file in os.listdir(save_directory):
|
|
if file.startswith(output_filename) and file.endswith('.mp3'):
|
|
converted_output_path = os.path.join(save_directory, file)
|
|
break
|
|
else:
|
|
raise FileNotFoundError("無法找到下載的音頻文件")
|
|
|
|
send_sse_message(q, {"status": f"音頻下載完成: {video_title}"})
|
|
return converted_output_path, video_title
|
|
except Exception as e:
|
|
send_sse_message(q, {"status": f"下載音頻時發生錯誤: {str(e)}"})
|
|
raise
|
|
|
|
def process_local_video(video_path, save_directory, q):
|
|
send_sse_message(q, {"status": "正在處理本地視頻..."})
|
|
video_title = os.path.splitext(os.path.basename(video_path))[0]
|
|
output_path = os.path.join(save_directory, f"{video_title}_audio.mp3")
|
|
|
|
ffmpeg_command = [
|
|
os.path.join(ffmpeg_path, 'ffmpeg'),
|
|
'-i', video_path,
|
|
'-vn',
|
|
'-acodec', 'libmp3lame',
|
|
'-q:a', '2',
|
|
output_path
|
|
]
|
|
|
|
logger.info(f"FFmpeg 命令: {' '.join(ffmpeg_command)}")
|
|
logger.info(f"輸入視頻路徑: {video_path}")
|
|
logger.info(f"輸出音頻路徑: {output_path}")
|
|
|
|
try:
|
|
|
|
if not os.path.exists(video_path):
|
|
raise FileNotFoundError(f"輸入視頻文件不存在: {video_path}")
|
|
|
|
|
|
if not os.access(os.path.dirname(output_path), os.W_OK):
|
|
raise PermissionError(f"沒有寫入權限: {os.path.dirname(output_path)}")
|
|
|
|
result = subprocess.run(ffmpeg_command, check=True, capture_output=True, text=True)
|
|
logger.info(f"FFmpeg 輸出: {result.stdout}")
|
|
send_sse_message(q, {"status": f"本地視頻處理完成: {video_title}"})
|
|
return output_path, video_title
|
|
except subprocess.CalledProcessError as e:
|
|
error_message = f"處理本地視頻時出錯: {e}\n\nFFmpeg 輸出:\n{e.stdout}\n\nFFmpeg 錯誤:\n{e.stderr}"
|
|
logger.error(error_message)
|
|
send_sse_message(q, {"status": "錯誤", "error": error_message})
|
|
raise
|
|
except Exception as e:
|
|
error_message = f"處理本地視頻時出現意外錯誤: {str(e)}"
|
|
logger.error(error_message)
|
|
send_sse_message(q, {"status": "錯誤", "error": error_message})
|
|
raise
|
|
|
|
def generate_transcript(audio_path, video_title, q):
|
|
send_sse_message(q, {"status": "開始音頻轉錄..."})
|
|
segments, info = model.transcribe(
|
|
audio_path,
|
|
beam_size=config['whisper_beam_size'],
|
|
language=config['whisper_language'],
|
|
temperature=config['whisper_temperature'],
|
|
initial_prompt=video_title,
|
|
repetition_penalty=2,
|
|
condition_on_previous_text=False
|
|
)
|
|
transcript = "\n".join([segment.text for segment in segments])
|
|
send_sse_message(q, {"status": f"音頻轉錄完成,檢測到的語言: {info.language}", "transcript": transcript})
|
|
return transcript
|
|
|
|
def smart_split_transcript(transcript, q):
|
|
send_sse_message(q, {"status": "開始智能分割轉錄文本..."})
|
|
doc = nlp(transcript)
|
|
segments = []
|
|
current_segment = ""
|
|
max_length = 1024
|
|
|
|
for sent in doc.sents:
|
|
if len(current_segment) + len(sent.text) <= max_length:
|
|
current_segment += " " + sent.text
|
|
else:
|
|
if current_segment:
|
|
segments.append(current_segment.strip())
|
|
current_segment = sent.text
|
|
|
|
if current_segment:
|
|
segments.append(current_segment.strip())
|
|
|
|
send_sse_message(q, {"status": f"轉錄文本分割完成,共 {len(segments)} 個段落"})
|
|
return segments
|
|
|
|
def extract_keywords_and_entities(text):
|
|
doc = nlp(text)
|
|
keywords = [token.lemma_ for token in doc if not token.is_stop and not token.is_punct]
|
|
keyword_freq = Counter(keywords).most_common(5)
|
|
entities = [(ent.text, ent.label_) for ent in doc.ents]
|
|
return [keyword for keyword, _ in keyword_freq], entities
|
|
|
|
def process_youtube_description(description):
|
|
prompt = f"""請處理以下 YouTube 影片描述,移除所有渠道宣傳內容後,保留原文。
|
|
|
|
描述內容:
|
|
{description}"""
|
|
|
|
response = client.chat.completions.create(
|
|
model=config['openai_model'],
|
|
messages=[{"role": "system", "content": prompt}],
|
|
temperature=0.1,
|
|
max_tokens=500
|
|
)
|
|
|
|
processed_description = response.choices[0].message.content.strip()
|
|
|
|
|
|
print("處理後的 YouTube 描述:")
|
|
print(processed_description)
|
|
print("------------------------")
|
|
|
|
return processed_description
|
|
|
|
def get_openai_summary(segment, video_title, is_final_summary, keywords, entities, processed_description, q):
|
|
if is_final_summary:
|
|
prompt = f"""以下是YouTube視頻'{video_title}'的多個段落摘要。請生成一個深入且全面的最終摘要,盡力保留主要內容、資訊細節、關鍵點和結論。摘要應該是連貫的、有條理的、詳細的,並且避免重複信息。在內容結尾,加入能夠方便搜尋器和 SEO 找到的 3 個 Hash Tag。請用繁體中文(香港)回應。
|
|
|
|
影片描述提供的可靠資訊 (請特別使用來補充和糾正摘要中的信息,尤其是戈人名或專有名詞):
|
|
{processed_description}
|
|
|
|
|
|
|
|
以下是待處理的摘要內容:
|
|
{segment}"""
|
|
else:
|
|
keywords_str = ", ".join(keywords)
|
|
entities_str = ", ".join([f"{text}({label})" for text, label in entities])
|
|
prompt = f"""以下內容是YouTube視頻的部份字幕文本,每行以短句顯示,閱讀時需要將多行組合一起才是一句完整的句子,偶爾會出現音譯的錯別字,請修正。內容主題是關於:'{video_title}',其中包含的關鍵詞有:{keywords_str},和以下的NER實體:{entities_str}。
|
|
|
|
影片描述提供的可靠資訊 (請特別使用來補充和糾正摘要中的信息,尤其是戈人名或專有名詞):
|
|
{processed_description}
|
|
|
|
請根據每個NER實體的意思,以及上述描述資訊,以不少於 200 字的繁體中文(香港) 重組文章段落。目標是盡量抽取與主題有關的所有觀點、事件、案例、學問、步驟、方法、時間、人物、數據、名詞的基礎資料,建構成一篇連貫的、全面的、詳細的紀錄。請特別注意使用描述資訊來糾正可能的錯誤,尤其是人名和地名。忽略重複的、單純抒發個人情緒的訊息、與 Youtuber 個人宣傳的訊息。
|
|
|
|
你要處理的內容如下:
|
|
{segment}"""
|
|
|
|
response = client.chat.completions.create(
|
|
model=config['openai_model'],
|
|
messages=[{"role": "system", "content": prompt}],
|
|
temperature=0.6,
|
|
max_tokens=1000
|
|
)
|
|
|
|
summary = response.choices[0].message.content.strip()
|
|
return summary
|
|
|
|
def save_summary(text, video_title, url_or_path, save_directory):
|
|
current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
|
cleaned_title = clean_filename(video_title)[:20]
|
|
summary_file_name = f"GPT_Summary_{cleaned_title}_{current_time}.txt"
|
|
summary_file_path = os.path.join(save_directory, summary_file_name)
|
|
|
|
|
|
lines = text.split('\n')
|
|
if lines[0].startswith("影片名稱:") and lines[1].startswith("網址或路徑:"):
|
|
text = '\n'.join(lines[2:])
|
|
|
|
summary_text = f"影片名稱:\"{video_title}\"\n網址或路徑:\"{url_or_path}\"\n\n{text}"
|
|
|
|
with open(summary_file_path, "w", encoding="utf-8") as file:
|
|
file.write(summary_text)
|
|
|
|
def save_transcript(transcript, video_title, url_or_path, save_directory):
|
|
current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
|
cleaned_title = clean_filename(video_title)[:20]
|
|
transcript_file_name = f"Transcript_{cleaned_title}_{current_time}.txt"
|
|
transcript_file_path = os.path.join(save_directory, transcript_file_name)
|
|
|
|
with open(transcript_file_path, "w", encoding="utf-8") as file:
|
|
file.write(f"影片名稱:\"{video_title}\"\n網址或路徑:\"{url_or_path}\"\n\n{transcript}")
|
|
|
|
logger.info(f"轉錄文本已保存至 {transcript_file_path}")
|
|
|
|
def save_segment_summary(summary_text, segment_index, video_title, save_directory):
|
|
current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
|
cleaned_title = clean_filename(video_title)[:20]
|
|
summary_file_name = f"Segment_Summary_{cleaned_title}_{segment_index}_{current_time}.txt"
|
|
summary_file_path = os.path.join(save_directory, summary_file_name)
|
|
|
|
with open(summary_file_path, "w", encoding="utf-8") as file:
|
|
file.write(summary_text)
|
|
|
|
logger.info(f"段落摘要已保存至 {summary_file_path}")
|
|
|
|
def process_video(url_or_path, q, local_video_description=''):
|
|
try:
|
|
logger.info(f"開始處理視頻: {url_or_path}")
|
|
save_directory = config['save_directory']
|
|
|
|
processed_description = ""
|
|
if url_or_path.startswith('http'):
|
|
|
|
logger.info("檢測到 YouTube URL,開始獲取視頻信息")
|
|
ydl_opts = {'quiet': True}
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
|
video_info = ydl.extract_info(url_or_path, download=False)
|
|
|
|
video_data = {
|
|
'title': video_info['title'],
|
|
'duration': str(datetime.timedelta(seconds=video_info['duration'])),
|
|
'view_count': video_info['view_count'],
|
|
'like_count': video_info.get('like_count', 'N/A'),
|
|
'description': video_info['description']
|
|
}
|
|
send_sse_message(q, {"status": "獲取到視頻信息", "video_info": video_data})
|
|
|
|
|
|
raw_description = video_info['description']
|
|
processed_description = process_youtube_description(raw_description)
|
|
|
|
logger.info("開始下載 YouTube 音頻")
|
|
audio_path, video_title = download_audio(url_or_path, save_directory, q)
|
|
else:
|
|
logger.info("檢測到本地文件路徑,開始處理本地視頻")
|
|
audio_path, video_title = process_local_video(url_or_path, save_directory, q)
|
|
processed_description = local_video_description if local_video_description else "這是一個本地視頻文件,用戶沒有提供視頻描述。"
|
|
|
|
if not audio_path or not os.path.exists(audio_path):
|
|
raise FileNotFoundError(f"音頻文件不存在: {audio_path}")
|
|
|
|
logger.info("開始生成轉錄文本")
|
|
transcript = generate_transcript(audio_path, video_title, q)
|
|
|
|
|
|
save_transcript(transcript, video_title, url_or_path, save_directory)
|
|
|
|
logger.info("開始分割轉錄文本")
|
|
segments = smart_split_transcript(transcript, q)
|
|
|
|
all_summaries = []
|
|
for i, segment in enumerate(segments, start=1):
|
|
logger.info(f"開始為文本段 {i}/{len(segments)} 生成摘要")
|
|
send_sse_message(q, {"status": f"正在為文本段 {i}/{len(segments)} 生成摘要..."})
|
|
keywords, entities = extract_keywords_and_entities(segment)
|
|
segment_summary = get_openai_summary(segment, video_title, False, keywords, entities, processed_description, q)
|
|
if segment_summary:
|
|
all_summaries.append(segment_summary)
|
|
save_segment_summary(segment_summary, i, video_title, save_directory)
|
|
send_sse_message(q, {"status": f"段落 {i} 摘要完成", "summary": segment_summary})
|
|
|
|
logger.info("開始生成最終摘要")
|
|
send_sse_message(q, {"status": "正在生成最終摘要..."})
|
|
all_summaries_text = "\n\n".join(all_summaries)
|
|
final_summary = get_openai_summary(all_summaries_text, video_title, True, [], [], processed_description, q)
|
|
|
|
summary_versions.append(final_summary)
|
|
|
|
send_sse_message(q, {
|
|
"status": "處理完成",
|
|
"final_summary": final_summary,
|
|
"version": 0,
|
|
"total_versions": len(summary_versions)
|
|
})
|
|
|
|
|
|
final_summary_with_info = f'影片名稱:"{video_title}"\n網址或路徑:"{url_or_path}"\n\n{final_summary}'
|
|
|
|
send_sse_message(q, {"status": "處理完成", "final_summary": final_summary_with_info})
|
|
|
|
|
|
logger.info("保存最終摘要")
|
|
save_summary(final_summary_with_info, video_title, url_or_path, save_directory)
|
|
|
|
|
|
if os.path.exists(audio_path):
|
|
try:
|
|
os.remove(audio_path)
|
|
logger.info("臨時音頻文件已刪除")
|
|
send_sse_message(q, {"status": "臨時音頻文件已刪除"})
|
|
except Exception as e:
|
|
logger.error(f"無法刪除臨時音頻文件: {str(e)}")
|
|
send_sse_message(q, {"status": f"無法刪除臨時音頻文件: {str(e)}"})
|
|
|
|
|
|
if not url_or_path.startswith('http') and url_or_path.lower().endswith('.mp4'):
|
|
try:
|
|
os.remove(url_or_path)
|
|
logger.info("臨時上傳的 .mp4 文件已刪除")
|
|
send_sse_message(q, {"status": "臨時上傳的 .mp4 文件已刪除"})
|
|
except Exception as e:
|
|
logger.error(f"無法刪除臨時上傳的 .mp4 文件: {str(e)}")
|
|
send_sse_message(q, {"status": f"無法刪除臨時上傳的 .mp4 文件: {str(e)}"})
|
|
|
|
logger.info("視頻處理完成")
|
|
|
|
except Exception as e:
|
|
logger.exception("處理視頻時發生錯誤")
|
|
send_sse_message(q, {"status": f"錯誤: {str(e)}"})
|
|
|
|
|
|
|
|
refinement_count = 0
|
|
max_refinement_count = config.get('max_refinement_count', 5)
|
|
summary_versions = deque(maxlen=max_refinement_count + 1)
|
|
|
|
|
|
def refine_final_summary(original_summary, user_feedback, video_title, processed_description):
|
|
prompt = f"""你是一個專業的廣東話視頻內容摘要編輯。請根據用戶的反饋,改進以下內容摘要。標題是"{video_title}"。
|
|
|
|
原始摘要:
|
|
{original_summary}
|
|
|
|
用戶反饋:
|
|
{user_feedback}
|
|
|
|
|
|
請遵循以下指引:
|
|
1. 仔細閱讀原始摘要和用戶反饋,以用戶反饋的指示作為優先原則。
|
|
2. 根據用戶反饋,補充、修正在原始摘要內,任何錯誤或不準確的資訊,確保摘要全面涵蓋主題內容。
|
|
3. 保留原始摘要中準確和重要的部分。
|
|
4. 確保摘要邏輯清晰,結構完整,易於閱讀理解。
|
|
5. 如有必要,重新組織摘要結構以提高清晰度和連貫性。
|
|
6. 保留原有的 Hash Tag(如果有的話),或根據更新後的內容調整 Hash Tag。
|
|
|
|
請生成最終摘要,確保其準確、全面、連貫,並符合用戶的反饋意見。"""
|
|
|
|
response = client.chat.completions.create(
|
|
model=config['openai_model'],
|
|
messages=[{"role": "system", "content": prompt}],
|
|
temperature=0.8,
|
|
max_tokens=1000
|
|
)
|
|
|
|
refined_summary = response.choices[0].message.content.strip()
|
|
return refined_summary
|
|
|
|
|
|
@app.route('/refine_summary', methods=['POST'])
|
|
def refine_summary():
|
|
global refinement_count
|
|
data = request.json
|
|
|
|
original_summary = data['original_summary']
|
|
user_feedback = data['user_feedback']
|
|
video_title = data['video_title']
|
|
video_url = data['video_url']
|
|
processed_description = data['processed_description']
|
|
|
|
if refinement_count >= config['max_refinement_count']:
|
|
return jsonify({"error": "已達到最大重新生成次數"}), 400
|
|
|
|
refined_summary = refine_final_summary(original_summary, user_feedback, video_title, processed_description)
|
|
refinement_count += 1
|
|
|
|
|
|
refined_summary_with_info = f"影片名稱:{video_title}\n網址或路徑:{video_url}\n\n{refined_summary}"
|
|
|
|
logger.info(f"Sending refined summary: {refined_summary_with_info}")
|
|
return jsonify({
|
|
"refined_summary": refined_summary_with_info,
|
|
"version": refinement_count,
|
|
"total_versions": refinement_count + 1
|
|
})
|
|
|
|
|
|
|
|
@app.route('/')
|
|
def index():
|
|
return render_template('index.html')
|
|
|
|
@app.route('/process', methods=['POST'])
|
|
def process():
|
|
try:
|
|
url_or_path = request.form.get('url_or_path')
|
|
|
|
if not url_or_path:
|
|
return jsonify({"error": "No URL or path provided"}), 400
|
|
|
|
if url_or_path.startswith('http'):
|
|
|
|
pass
|
|
else:
|
|
|
|
if 'file' not in request.files:
|
|
return jsonify({"error": "No file uploaded"}), 400
|
|
file = request.files['file']
|
|
if file.filename == '':
|
|
return jsonify({"error": "No file selected"}), 400
|
|
if file:
|
|
filename = secure_filename(file.filename)
|
|
file_path = os.path.join(config['save_directory'], filename)
|
|
file.save(file_path)
|
|
url_or_path = file_path
|
|
|
|
|
|
local_video_description = request.form.get('localVideoDescription', '')
|
|
|
|
logger.info(f"處理文件: {url_or_path}")
|
|
|
|
q = Queue()
|
|
thread = Thread(target=process_video, args=(url_or_path, q, local_video_description))
|
|
thread.start()
|
|
return Response(event_stream(q), content_type='text/event-stream')
|
|
except Exception as e:
|
|
error_message = f"處理請求時出現錯誤: {str(e)}"
|
|
logger.error(error_message)
|
|
return jsonify({"error": error_message}), 500
|
|
|
|
def event_stream(q):
|
|
while True:
|
|
message = q.get()
|
|
yield f"data: {json.dumps(message)}\n\n"
|
|
if message.get('status') == '處理完成' or message.get('status').startswith('錯誤'):
|
|
break
|
|
|
|
if __name__ == '__main__':
|
|
port = int(os.environ.get('PORT', 5000))
|
|
app.run(host='0.0.0.0', port=port) |