import os
import json
import datetime
import subprocess
from queue import Queue
from threading import Thread
import torch
import yt_dlp
from faster_whisper import WhisperModel
from flask import Flask, render_template, request, Response, jsonify
from openai import OpenAI
import spacy
from collections import Counter
import time
import uuid
import logging
from logging.handlers import RotatingFileHandler
from werkzeug.utils import secure_filename
from collections import deque
from dotenv import load_dotenv
# Load variables from a .env file if one is present
if os.path.exists('.env'):
    load_dotenv()
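# Environment variables read elsewhere in this file: OPENAI_API_KEY (OpenAI client),
# YOUTUBE_COOKIES (optional cookies passed to yt-dlp) and PORT (server port, defaults to 5000).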
# Basic logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# File handler: RotatingFileHandler keeps the log file from growing without bound
log_file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'app.log')
file_handler = RotatingFileHandler(log_file_path, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# Shared log formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Attach both handlers to the logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# Log level for third-party modules
logging.getLogger("faster_whisper").setLevel(logging.INFO)
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
app = Flask(__name__, static_folder='static', static_url_path='/static')
# Load the configuration file (config.json sits next to app.py)
current_directory = os.path.dirname(os.path.realpath(__file__))
config_file_path = os.path.join(current_directory, 'config.json')
try:
with open(config_file_path, 'r', encoding='utf-8') as f:
config = json.load(f)
logger.info("成功加載配置文件")
except Exception as e:
logger.exception("加載配置文件時發生錯誤")
raise
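# The code below expects config.json to define at least the following keys.
# The values shown here are only illustrative placeholders, not the project's actual settings:
# {
#     "spacy_model": "zh_core_web_sm",
#     "whisper_model": "medium",
#     "whisper_compute_type": "int8",
#     "whisper_beam_size": 5,
#     "whisper_language": "zh",
#     "whisper_temperature": 0.0,
#     "ffmpeg_path": "/usr/bin",
#     "save_directory": "./downloads",
#     "openai_model": "gpt-4o-mini",
#     "max_refinement_count": 5
# }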
# OpenAI client (reads the API key from the environment)
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Initialise SpaCy for sentence splitting, keyword and entity extraction
nlp = spacy.load(config['spacy_model'])
# Initialise the faster-whisper model for transcription
model = WhisperModel(config['whisper_model'], device="auto", compute_type=config['whisper_compute_type'])
# Make the configured FFmpeg directory available on PATH
ffmpeg_path = config['ffmpeg_path']
if ffmpeg_path not in os.environ["PATH"]:
os.environ["PATH"] += os.pathsep + ffmpeg_path
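# Push a JSON-serialisable dict onto the per-request queue; event_stream (at the
# bottom of this file) forwards each item to the browser as a Server-Sent Event.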
def send_sse_message(q, data):
q.put_nowait(data)
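# Keep only alphanumerics, spaces, dots and underscores so the title can be used
# safely inside a filename.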
def clean_filename(filename):
return ''.join(c for c in filename if c.isalnum() or c in (' ', '.', '_')).rstrip()
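# Download the audio track of a YouTube video with yt-dlp, letting the
# FFmpegExtractAudio postprocessor (configured in the ydl_opts passed in) convert
# it to MP3, then return the path of the resulting file and the cleaned video title.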
def download_audio(youtube_url, save_directory, q, ydl_opts):
send_sse_message(q, {"status": "開始下載 YouTube 音頻..."})
unique_id = str(uuid.uuid4())[:8]
output_filename = f"audio_{unique_id}"
output_path = os.path.join(save_directory, output_filename)
ydl_opts.update({
'outtmpl': output_path + ".%(ext)s",
})
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(youtube_url, download=True)
video_title = clean_filename(info.get('title', 'Untitled'))
            # Give the postprocessor a moment to finish writing the converted file
time.sleep(2)
            # Locate the converted .mp3 produced by the postprocessor
for file in os.listdir(save_directory):
if file.startswith(output_filename) and file.endswith('.mp3'):
converted_output_path = os.path.join(save_directory, file)
break
else:
raise FileNotFoundError("無法找到下載的音頻文件")
send_sse_message(q, {"status": f"音頻下載完成: {video_title}"})
return converted_output_path, video_title
except Exception as e:
send_sse_message(q, {"status": f"下載音頻時發生錯誤: {str(e)}"})
raise
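# Extract the audio track of a local video file with FFmpeg and return the path
# of the resulting MP3 together with the title derived from the filename.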
def process_local_video(video_path, save_directory, q):
send_sse_message(q, {"status": "正在處理本地視頻..."})
video_title = os.path.splitext(os.path.basename(video_path))[0]
output_path = os.path.join(save_directory, f"{video_title}_audio.mp3")
    ffmpeg_command = [
        os.path.join(ffmpeg_path, 'ffmpeg'),  # use the full path to the ffmpeg binary
        '-i', video_path,
        '-vn',                    # drop the video stream
        '-acodec', 'libmp3lame',  # encode the audio as MP3
        '-q:a', '2',              # VBR quality; 2 is high quality
        output_path
    ]
logger.info(f"FFmpeg 命令: {' '.join(ffmpeg_command)}")
logger.info(f"輸入視頻路徑: {video_path}")
logger.info(f"輸出音頻路徑: {output_path}")
try:
        # Make sure the input file exists
if not os.path.exists(video_path):
raise FileNotFoundError(f"輸入視頻文件不存在: {video_path}")
        # Make sure the output directory is writable
if not os.access(os.path.dirname(output_path), os.W_OK):
raise PermissionError(f"沒有寫入權限: {os.path.dirname(output_path)}")
result = subprocess.run(ffmpeg_command, check=True, capture_output=True, text=True)
logger.info(f"FFmpeg 輸出: {result.stdout}")
send_sse_message(q, {"status": f"本地視頻處理完成: {video_title}"})
return output_path, video_title
except subprocess.CalledProcessError as e:
error_message = f"處理本地視頻時出錯: {e}\n\nFFmpeg 輸出:\n{e.stdout}\n\nFFmpeg 錯誤:\n{e.stderr}"
logger.error(error_message)
send_sse_message(q, {"status": "錯誤", "error": error_message})
raise
except Exception as e:
error_message = f"處理本地視頻時出現意外錯誤: {str(e)}"
logger.error(error_message)
send_sse_message(q, {"status": "錯誤", "error": error_message})
raise
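# Transcribe the audio with faster-whisper; the video title is passed as the
# initial prompt to bias the decoder towards the correct names and vocabulary.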
def generate_transcript(audio_path, video_title, q):
send_sse_message(q, {"status": "開始音頻轉錄..."})
segments, info = model.transcribe(
audio_path,
beam_size=config['whisper_beam_size'],
language=config['whisper_language'],
temperature=config['whisper_temperature'],
initial_prompt=video_title,
repetition_penalty=2,
condition_on_previous_text=False
)
transcript = "\n".join([segment.text for segment in segments])
send_sse_message(q, {"status": f"音頻轉錄完成,檢測到的語言: {info.language}", "transcript": transcript})
return transcript
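# Split the transcript into chunks of at most ~1024 characters, cutting only at
# sentence boundaries detected by SpaCy so each chunk stays coherent.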
def smart_split_transcript(transcript, q):
send_sse_message(q, {"status": "開始智能分割轉錄文本..."})
doc = nlp(transcript)
segments = []
current_segment = ""
max_length = 1024
for sent in doc.sents:
if len(current_segment) + len(sent.text) <= max_length:
current_segment += " " + sent.text
else:
if current_segment:
segments.append(current_segment.strip())
current_segment = sent.text
if current_segment:
segments.append(current_segment.strip())
send_sse_message(q, {"status": f"轉錄文本分割完成,共 {len(segments)} 個段落"})
return segments
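# Return the five most frequent lemmas (excluding stop words and punctuation)
# plus the named entities SpaCy finds in the text.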
def extract_keywords_and_entities(text):
doc = nlp(text)
keywords = [token.lemma_ for token in doc if not token.is_stop and not token.is_punct]
keyword_freq = Counter(keywords).most_common(5)
entities = [(ent.text, ent.label_) for ent in doc.ents]
return [keyword for keyword, _ in keyword_freq], entities
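# Ask the OpenAI model to strip channel self-promotion from the YouTube
# description while keeping the rest of the original text.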
def process_youtube_description(description):
prompt = f"""請處理以下 YouTube 影片描述,移除所有渠道宣傳內容後,保留原文。
描述內容:
{description}"""
response = client.chat.completions.create(
model=config['openai_model'],
messages=[{"role": "system", "content": prompt}],
temperature=0.1,
max_tokens=500
)
processed_description = response.choices[0].message.content.strip()
    # Print the processed description to the terminal
print("處理後的 YouTube 描述:")
print(processed_description)
print("------------------------")
return processed_description
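# Build the prompt (per-segment or final-summary variant), call the OpenAI chat
# completion endpoint and return the generated Traditional Chinese summary.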
def get_openai_summary(segment, video_title, is_final_summary, keywords, entities, processed_description, q):
if is_final_summary:
prompt = f"""以下是YouTube視頻'{video_title}'的多個段落摘要。請生成一個深入且全面的最終摘要,盡力保留主要內容、資訊細節、關鍵點和結論。摘要應該是連貫的、有條理的、詳細的,並且避免重複信息。在內容結尾,加入能夠方便搜尋器和 SEO 找到的 3 個 Hash Tag。請用繁體中文(香港)回應。
影片描述提供的可靠資訊 (請特別使用來補充和糾正摘要中的信息,尤其是人名或專有名詞):
{processed_description}
以下是待處理的摘要內容:
{segment}"""
else:
keywords_str = ", ".join(keywords)
entities_str = ", ".join([f"{text}({label})" for text, label in entities])
prompt = f"""以下內容是YouTube視頻的部份字幕文本,每行以短句顯示,閱讀時需要將多行組合一起才是一句完整的句子,偶爾會出現音譯的錯別字,請修正。內容主題是關於:'{video_title}',其中包含的關鍵詞有:{keywords_str},和以下的NER實體:{entities_str}
影片描述提供的可靠資訊 (請特別使用來補充和糾正摘要中的信息,尤其是人名或專有名詞):
{processed_description}
請根據每個NER實體的意思,以及上述描述資訊,以不少於 200 字的繁體中文(香港) 重組文章段落。目標是盡量抽取與主題有關的所有觀點、事件、案例、學問、步驟、方法、時間、人物、數據、名詞的基礎資料,建構成一篇連貫的、全面的、詳細的紀錄。請特別注意使用描述資訊來糾正可能的錯誤,尤其是人名和地名。忽略重複的、單純抒發個人情緒的訊息、與 Youtuber 個人宣傳的訊息。
你要處理的內容如下:
{segment}"""
response = client.chat.completions.create(
model=config['openai_model'],
messages=[{"role": "system", "content": prompt}],
temperature=0.6,
max_tokens=1000
)
summary = response.choices[0].message.content.strip()
return summary
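# Write the final summary to a timestamped text file, prefixed with the video
# title and its URL/path.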
def save_summary(text, video_title, url_or_path, save_directory):
current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
cleaned_title = clean_filename(video_title)[:20]
summary_file_name = f"GPT_Summary_{cleaned_title}_{current_time}.txt"
summary_file_path = os.path.join(save_directory, summary_file_name)
    # Strip a title/URL header if the text already starts with one
    lines = text.split('\n')
    if len(lines) >= 2 and lines[0].startswith("影片名稱:") and lines[1].startswith("網址或路徑:"):
        text = '\n'.join(lines[2:])
summary_text = f"影片名稱:\"{video_title}\"\n網址或路徑:\"{url_or_path}\"\n\n{text}"
with open(summary_file_path, "w", encoding="utf-8") as file:
file.write(summary_text)
def save_transcript(transcript, video_title, url_or_path, save_directory):
current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
cleaned_title = clean_filename(video_title)[:20]
transcript_file_name = f"Transcript_{cleaned_title}_{current_time}.txt"
transcript_file_path = os.path.join(save_directory, transcript_file_name)
with open(transcript_file_path, "w", encoding="utf-8") as file:
file.write(f"影片名稱:\"{video_title}\"\n網址或路徑:\"{url_or_path}\"\n\n{transcript}")
logger.info(f"轉錄文本已保存至 {transcript_file_path}")
def save_segment_summary(summary_text, segment_index, video_title, save_directory):
current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
cleaned_title = clean_filename(video_title)[:20]
summary_file_name = f"Segment_Summary_{cleaned_title}_{segment_index}_{current_time}.txt"
summary_file_path = os.path.join(save_directory, summary_file_name)
with open(summary_file_path, "w", encoding="utf-8") as file:
file.write(summary_text)
logger.info(f"段落摘要已保存至 {summary_file_path}")
def process_video(url_or_path, q, local_video_description=''):
    cookies_file = None  # defined up front so the finally block can always reference it
    try:
        logger.info(f"開始處理視頻: {url_or_path}")
        save_directory = config['save_directory']
        # Prepare the YouTube cookies file (from the YOUTUBE_COOKIES environment variable)
cookies_content = os.environ.get('YOUTUBE_COOKIES')
cookies_file = os.path.join(os.path.dirname(__file__), 'youtube.com_cookies.txt')
if cookies_content:
cookies_content = cookies_content.strip('"').replace('\\n', '\n').replace('\\t', '\t')
with open(cookies_file, 'w') as f:
f.write(cookies_content)
logger.info("已創建 YouTube cookies 文件")
else:
logger.warning("未找到 YouTube cookies 環境變量")
processed_description = ""
if url_or_path.startswith('http'):
logger.info("檢測到 YouTube URL,開始獲取視頻信息")
ydl_opts = {
'format': 'bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}],
'ffmpeg_location': config['ffmpeg_path'],
'outtmpl': os.path.join(save_directory, 'audio_%(id)s.%(ext)s'),
'quiet': True,
'no_warnings': True,
'ignoreerrors': True,
'logtostderr': False,
'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36',
'socket_timeout': 30,
'retries': 5,
'verbose': True,
'extract_flat': 'in_playlist',
'youtube_include_dash_manifest': False,
'source_address': '0.0.0.0',
                'cookiefile': cookies_file,  # pass the cookies file to yt-dlp
}
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
video_info = ydl.extract_info(url_or_path, download=False)
if video_info is None:
raise ValueError("無法獲取視頻信息")
video_data = {
'title': video_info.get('title', 'Unknown Title'),
'duration': str(datetime.timedelta(seconds=video_info.get('duration', 0))),
'view_count': video_info.get('view_count', 'N/A'),
'like_count': video_info.get('like_count', 'N/A'),
'description': video_info.get('description', 'No description available')
}
send_sse_message(q, {"status": "獲取到視頻信息", "video_info": video_data})
                    # Clean up the YouTube description for use in the prompts
raw_description = video_info.get('description', '')
processed_description = process_youtube_description(raw_description)
logger.info("開始下載 YouTube 音頻")
audio_path, video_title = download_audio(url_or_path, save_directory, q, ydl_opts)
except yt_dlp.utils.DownloadError as e:
error_message = str(e)
if "Sign in to confirm you're not a bot" in error_message:
send_sse_message(q, {"status": "錯誤:YouTube 要求人機驗證。請稍後再試或使用其他視頻。"})
else:
send_sse_message(q, {"status": f"下載音頻時發生錯誤: {error_message}"})
logger.error(f"YouTube 下載錯誤: {error_message}")
raise
except ValueError as e:
send_sse_message(q, {"status": f"錯誤:{str(e)}"})
logger.error(f"值錯誤: {str(e)}")
raise
else:
            # Local file path: extract the audio with FFmpeg
logger.info("檢測到本地文件路徑,開始處理本地視頻")
audio_path, video_title = process_local_video(url_or_path, save_directory, q)
processed_description = local_video_description if local_video_description else "這是一個本地視頻文件,用戶沒有提供視頻描述。"
        # From here on, the pipeline is the same for YouTube and local videos
if not audio_path or not os.path.exists(audio_path):
raise FileNotFoundError(f"音頻文件不存在: {audio_path}")
logger.info("開始生成轉錄文本")
transcript = generate_transcript(audio_path, video_title, q)
        # Save the transcript to disk
save_transcript(transcript, video_title, url_or_path, save_directory)
logger.info("開始分割轉錄文本")
segments = smart_split_transcript(transcript, q)
all_summaries = []
for i, segment in enumerate(segments, start=1):
logger.info(f"開始為文本段 {i}/{len(segments)} 生成摘要")
send_sse_message(q, {"status": f"正在為文本段 {i}/{len(segments)} 生成摘要..."})
keywords, entities = extract_keywords_and_entities(segment)
segment_summary = get_openai_summary(segment, video_title, False, keywords, entities, processed_description, q)
if segment_summary:
all_summaries.append(segment_summary)
save_segment_summary(segment_summary, i, video_title, save_directory)
send_sse_message(q, {"status": f"段落 {i} 摘要完成", "summary": segment_summary})
logger.info("開始生成最終摘要")
send_sse_message(q, {"status": "正在生成最終摘要..."})
all_summaries_text = "\n\n".join(all_summaries)
final_summary = get_openai_summary(all_summaries_text, video_title, True, [], [], processed_description, q)
        summary_versions.append(final_summary)
        # Prepend the video title and URL/path to the final summary
        final_summary_with_info = f'影片名稱:"{video_title}"\n網址或路徑:"{url_or_path}"\n\n{final_summary}'
        # Send a single completion event carrying the annotated summary and version info
        send_sse_message(q, {
            "status": "處理完成",
            "final_summary": final_summary_with_info,
            "version": 0,
            "total_versions": len(summary_versions)
        })
        # Save the final summary
logger.info("保存最終摘要")
save_summary(final_summary_with_info, video_title, url_or_path, save_directory)
        # Remove the temporary audio file
if os.path.exists(audio_path):
try:
os.remove(audio_path)
logger.info("臨時音頻文件已刪除")
send_sse_message(q, {"status": "臨時音頻文件已刪除"})
except Exception as e:
logger.error(f"無法刪除臨時音頻文件: {str(e)}")
send_sse_message(q, {"status": f"無法刪除臨時音頻文件: {str(e)}"})
        # If a locally uploaded .mp4 was processed, remove the temporary upload as well
if not url_or_path.startswith('http') and url_or_path.lower().endswith('.mp4'):
try:
os.remove(url_or_path)
logger.info("臨時上傳的 .mp4 文件已刪除")
send_sse_message(q, {"status": "臨時上傳的 .mp4 文件已刪除"})
except Exception as e:
logger.error(f"無法刪除臨時上傳的 .mp4 文件: {str(e)}")
send_sse_message(q, {"status": f"無法刪除臨時上傳的 .mp4 文件: {str(e)}"})
logger.info("視頻處理完成")
except Exception as e:
logger.exception("處理視頻時發生錯誤")
send_sse_message(q, {"status": f"錯誤: {str(e)}"})
    finally:
        # Always remove the cookies file once processing ends
        if cookies_file and os.path.exists(cookies_file):
            os.remove(cookies_file)
            logger.info("已刪除 YouTube cookies 文件")
# Global state for the summary-refinement feature
refinement_count = 0
max_refinement_count = config.get('max_refinement_count', 5)  # default to 5 if config.json does not define it
summary_versions = deque(maxlen=max_refinement_count + 1)
# Re-generate the final summary, incorporating user feedback
def refine_final_summary(original_summary, user_feedback, video_title, processed_description):
prompt = f"""你是一個專業的廣東話視頻內容摘要編輯。請根據用戶的反饋,改進以下內容摘要。標題是"{video_title}"。
原始摘要:
{original_summary}
用戶反饋:
{user_feedback}
請遵循以下指引:
1. 仔細閱讀原始摘要和用戶反饋,以用戶反饋的指示作為優先原則。
2. 根據用戶反饋,補充、修正在原始摘要內,任何錯誤或不準確的資訊,確保摘要全面涵蓋主題內容。
3. 保留原始摘要中準確和重要的部分。
4. 確保摘要邏輯清晰,結構完整,易於閱讀理解。
5. 如有必要,重新組織摘要結構以提高清晰度和連貫性。
6. 保留原有的 Hash Tag(如果有的話),或根據更新後的內容調整 Hash Tag。
請生成最終摘要,確保其準確、全面、連貫,並符合用戶的反饋意見。"""
response = client.chat.completions.create(
model=config['openai_model'],
messages=[{"role": "system", "content": prompt}],
temperature=0.8,
max_tokens=1000
)
refined_summary = response.choices[0].message.content.strip()
return refined_summary
# Route for refining an existing summary based on user feedback
@app.route('/refine_summary', methods=['POST'])
def refine_summary():
global refinement_count
data = request.json
    # logger.info(f"Received refinement request: {data}")  # keys: original_summary, user_feedback, video_title, video_url, processed_description
original_summary = data['original_summary']
user_feedback = data['user_feedback']
video_title = data['video_title']
video_url = data['video_url']
processed_description = data['processed_description']
    if refinement_count >= max_refinement_count:
return jsonify({"error": "已達到最大重新生成次數"}), 400
refined_summary = refine_final_summary(original_summary, user_feedback, video_title, processed_description)
refinement_count += 1
    # Prepend the video info to the refined summary
refined_summary_with_info = f"影片名稱:{video_title}\n網址或路徑:{video_url}\n\n{refined_summary}"
logger.info(f"Sending refined summary: {refined_summary_with_info}")
return jsonify({
"refined_summary": refined_summary_with_info,
"version": refinement_count,
"total_versions": refinement_count + 1
})
@app.route('/')
def index():
return render_template('index.html')
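# Accept a YouTube URL or an uploaded video file, start process_video in a
# background thread and stream its progress back as Server-Sent Events.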
@app.route('/process', methods=['POST'])
def process():
try:
url_or_path = request.form.get('url_or_path')
if not url_or_path:
return jsonify({"error": "No URL or path provided"}), 400
if url_or_path.startswith('http'):
            # YouTube URL: nothing to do here; handled inside process_video
pass
else:
            # Local file upload handling
if 'file' not in request.files:
return jsonify({"error": "No file uploaded"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "No file selected"}), 400
if file:
filename = secure_filename(file.filename)
file_path = os.path.join(config['save_directory'], filename)
file.save(file_path)
url_or_path = file_path
        # Optional description supplied for locally uploaded videos
local_video_description = request.form.get('localVideoDescription', '')
logger.info(f"處理文件: {url_or_path}")
q = Queue()
thread = Thread(target=process_video, args=(url_or_path, q, local_video_description))
thread.start()
return Response(event_stream(q), content_type='text/event-stream')
except Exception as e:
error_message = f"處理請求時出現錯誤: {str(e)}"
logger.error(error_message)
return jsonify({"error": error_message}), 500
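# Generator backing the SSE response: relay queued messages until processing
# finishes ("處理完成") or an error status ("錯誤...") is reported.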
def event_stream(q):
    while True:
        message = q.get()
        yield f"data: {json.dumps(message)}\n\n"
        status = message.get('status', '')
        if status == '處理完成' or status.startswith('錯誤'):
            break
if __name__ == '__main__':
port = int(os.environ.get('PORT', 5000))
app.run(host='0.0.0.0', port=port)