import os import json import datetime import subprocess from queue import Queue from threading import Thread import torch import yt_dlp from faster_whisper import WhisperModel from flask import Flask, render_template, request, Response, jsonify from openai import OpenAI import spacy from collections import Counter import time import uuid import logging from logging.handlers import RotatingFileHandler from werkzeug.utils import secure_filename from collections import deque # 設置基本日誌配置 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # 創建一個文件處理器,使用 RotatingFileHandler 來限制日誌文件大小 log_file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'app.log') file_handler = RotatingFileHandler(log_file_path, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8') file_handler.setLevel(logging.DEBUG) # 創建一個控制台處理器 console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) # 創建一個格式器 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) console_handler.setFormatter(formatter) # 將處理器添加到日誌器 logger.addHandler(file_handler) logger.addHandler(console_handler) # 設置其他模塊的日誌級別 logging.getLogger("faster_whisper").setLevel(logging.INFO) os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE' app = Flask(__name__, static_folder='static', static_url_path='/static') # 讀取設定檔 current_directory = os.path.dirname(os.path.realpath(__file__)) config_file_path = os.path.join(current_directory, 'config.json') try: with open(config_file_path, 'r', encoding='utf-8') as f: config = json.load(f) logger.info("成功加載配置文件") except Exception as e: logger.exception("加載配置文件時發生錯誤") raise # 設置 OpenAI API 金鑰 client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) # 初始化 SpaCy nlp = spacy.load(config['spacy_model']) # 初始化 Whisper 模型 model = WhisperModel(config['whisper_model'], device="auto", compute_type=config['whisper_compute_type']) # 設置 FFmpeg 路徑 ffmpeg_path = config['ffmpeg_path'] if ffmpeg_path not in os.environ["PATH"]: os.environ["PATH"] += os.pathsep + ffmpeg_path def send_sse_message(q, data): q.put_nowait(data) def clean_filename(filename): return ''.join(c for c in filename if c.isalnum() or c in (' ', '.', '_')).rstrip() def download_audio(youtube_url, save_directory, q): send_sse_message(q, {"status": "開始下載 YouTube 音頻..."}) unique_id = str(uuid.uuid4())[:8] # 生成一個唯一的識別碼 output_filename = f"audio_{unique_id}" output_path = os.path.join(save_directory, output_filename) ydl_opts = { 'format': 'bestaudio/best', 'outtmpl': output_path + ".%(ext)s", 'postprocessors': [{ 'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '192', }], 'ffmpeg_location': ffmpeg_path, 'quiet': True } try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(youtube_url, download=True) video_title = clean_filename(info.get('title', 'Untitled')) # 等待一小段時間,確保文件已經完全寫入 time.sleep(2) # 檢查生成的文件 for file in os.listdir(save_directory): if file.startswith(output_filename) and file.endswith('.mp3'): converted_output_path = os.path.join(save_directory, file) break else: raise FileNotFoundError("無法找到下載的音頻文件") send_sse_message(q, {"status": f"音頻下載完成: {video_title}"}) return converted_output_path, video_title except Exception as e: send_sse_message(q, {"status": f"下載音頻時發生錯誤: {str(e)}"}) raise def process_local_video(video_path, save_directory, q): send_sse_message(q, {"status": "正在處理本地視頻..."}) video_title = os.path.splitext(os.path.basename(video_path))[0] output_path = os.path.join(save_directory, f"{video_title}_audio.mp3") ffmpeg_command = [ os.path.join(ffmpeg_path, 'ffmpeg'), # 使用完整路徑 '-i', video_path, '-vn', # 禁用視頻 '-acodec', 'libmp3lame', # 使用 MP3 編碼器 '-q:a', '2', # 音頻質量,2 是很好的質量 output_path ] logger.info(f"FFmpeg 命令: {' '.join(ffmpeg_command)}") logger.info(f"輸入視頻路徑: {video_path}") logger.info(f"輸出音頻路徑: {output_path}") try: # 檢查輸入文件是否存在 if not os.path.exists(video_path): raise FileNotFoundError(f"輸入視頻文件不存在: {video_path}") # 檢查輸出目錄是否可寫 if not os.access(os.path.dirname(output_path), os.W_OK): raise PermissionError(f"沒有寫入權限: {os.path.dirname(output_path)}") result = subprocess.run(ffmpeg_command, check=True, capture_output=True, text=True) logger.info(f"FFmpeg 輸出: {result.stdout}") send_sse_message(q, {"status": f"本地視頻處理完成: {video_title}"}) return output_path, video_title except subprocess.CalledProcessError as e: error_message = f"處理本地視頻時出錯: {e}\n\nFFmpeg 輸出:\n{e.stdout}\n\nFFmpeg 錯誤:\n{e.stderr}" logger.error(error_message) send_sse_message(q, {"status": "錯誤", "error": error_message}) raise except Exception as e: error_message = f"處理本地視頻時出現意外錯誤: {str(e)}" logger.error(error_message) send_sse_message(q, {"status": "錯誤", "error": error_message}) raise def generate_transcript(audio_path, video_title, q): send_sse_message(q, {"status": "開始音頻轉錄..."}) segments, info = model.transcribe( audio_path, beam_size=config['whisper_beam_size'], language=config['whisper_language'], temperature=config['whisper_temperature'], initial_prompt=video_title, repetition_penalty=2, condition_on_previous_text=False ) transcript = "\n".join([segment.text for segment in segments]) send_sse_message(q, {"status": f"音頻轉錄完成,檢測到的語言: {info.language}", "transcript": transcript}) return transcript def smart_split_transcript(transcript, q): send_sse_message(q, {"status": "開始智能分割轉錄文本..."}) doc = nlp(transcript) segments = [] current_segment = "" max_length = 1024 for sent in doc.sents: if len(current_segment) + len(sent.text) <= max_length: current_segment += " " + sent.text else: if current_segment: segments.append(current_segment.strip()) current_segment = sent.text if current_segment: segments.append(current_segment.strip()) send_sse_message(q, {"status": f"轉錄文本分割完成,共 {len(segments)} 個段落"}) return segments def extract_keywords_and_entities(text): doc = nlp(text) keywords = [token.lemma_ for token in doc if not token.is_stop and not token.is_punct] keyword_freq = Counter(keywords).most_common(5) entities = [(ent.text, ent.label_) for ent in doc.ents] return [keyword for keyword, _ in keyword_freq], entities def process_youtube_description(description): prompt = f"""請處理以下 YouTube 影片描述,移除所有渠道宣傳內容後,保留原文。 描述內容: {description}""" response = client.chat.completions.create( model=config['openai_model'], messages=[{"role": "system", "content": prompt}], temperature=0.1, max_tokens=500 ) processed_description = response.choices[0].message.content.strip() # 在終端機打印處理後的描述 print("處理後的 YouTube 描述:") print(processed_description) print("------------------------") return processed_description def get_openai_summary(segment, video_title, is_final_summary, keywords, entities, processed_description, q): if is_final_summary: prompt = f"""以下是YouTube視頻'{video_title}'的多個段落摘要。請生成一個深入且全面的最終摘要,盡力保留主要內容、資訊細節、關鍵點和結論。摘要應該是連貫的、有條理的、詳細的,並且避免重複信息。在內容結尾,加入能夠方便搜尋器和 SEO 找到的 3 個 Hash Tag。請用繁體中文(香港)回應。 影片描述提供的可靠資訊 (請特別使用來補充和糾正摘要中的信息,尤其是戈人名或專有名詞): {processed_description} 以下是待處理的摘要內容: {segment}""" else: keywords_str = ", ".join(keywords) entities_str = ", ".join([f"{text}({label})" for text, label in entities]) prompt = f"""以下內容是YouTube視頻的部份字幕文本,每行以短句顯示,閱讀時需要將多行組合一起才是一句完整的句子,偶爾會出現音譯的錯別字,請修正。內容主題是關於:'{video_title}',其中包含的關鍵詞有:{keywords_str},和以下的NER實體:{entities_str}。 影片描述提供的可靠資訊 (請特別使用來補充和糾正摘要中的信息,尤其是戈人名或專有名詞): {processed_description} 請根據每個NER實體的意思,以及上述描述資訊,以不少於 200 字的繁體中文(香港) 重組文章段落。目標是盡量抽取與主題有關的所有觀點、事件、案例、學問、步驟、方法、時間、人物、數據、名詞的基礎資料,建構成一篇連貫的、全面的、詳細的紀錄。請特別注意使用描述資訊來糾正可能的錯誤,尤其是人名和地名。忽略重複的、單純抒發個人情緒的訊息、與 Youtuber 個人宣傳的訊息。 你要處理的內容如下: {segment}""" response = client.chat.completions.create( model=config['openai_model'], messages=[{"role": "system", "content": prompt}], temperature=0.6, max_tokens=1000 ) summary = response.choices[0].message.content.strip() return summary def save_summary(text, video_title, url_or_path, save_directory): current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S") cleaned_title = clean_filename(video_title)[:20] summary_file_name = f"GPT_Summary_{cleaned_title}_{current_time}.txt" summary_file_path = os.path.join(save_directory, summary_file_name) # 移除文本開頭可能存在的影片名稱和 URL/路徑信息 lines = text.split('\n') if lines[0].startswith("影片名稱:") and lines[1].startswith("網址或路徑:"): text = '\n'.join(lines[2:]) summary_text = f"影片名稱:\"{video_title}\"\n網址或路徑:\"{url_or_path}\"\n\n{text}" with open(summary_file_path, "w", encoding="utf-8") as file: file.write(summary_text) def save_transcript(transcript, video_title, url_or_path, save_directory): current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S") cleaned_title = clean_filename(video_title)[:20] transcript_file_name = f"Transcript_{cleaned_title}_{current_time}.txt" transcript_file_path = os.path.join(save_directory, transcript_file_name) with open(transcript_file_path, "w", encoding="utf-8") as file: file.write(f"影片名稱:\"{video_title}\"\n網址或路徑:\"{url_or_path}\"\n\n{transcript}") logger.info(f"轉錄文本已保存至 {transcript_file_path}") def save_segment_summary(summary_text, segment_index, video_title, save_directory): current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S") cleaned_title = clean_filename(video_title)[:20] summary_file_name = f"Segment_Summary_{cleaned_title}_{segment_index}_{current_time}.txt" summary_file_path = os.path.join(save_directory, summary_file_name) with open(summary_file_path, "w", encoding="utf-8") as file: file.write(summary_text) logger.info(f"段落摘要已保存至 {summary_file_path}") def process_video(url_or_path, q, local_video_description=''): try: logger.info(f"開始處理視頻: {url_or_path}") save_directory = config['save_directory'] processed_description = "" if url_or_path.startswith('http'): # YouTube URL 處理邏輯保持不變 logger.info("檢測到 YouTube URL,開始獲取視頻信息") ydl_opts = {'quiet': True} with yt_dlp.YoutubeDL(ydl_opts) as ydl: video_info = ydl.extract_info(url_or_path, download=False) video_data = { 'title': video_info['title'], 'duration': str(datetime.timedelta(seconds=video_info['duration'])), 'view_count': video_info['view_count'], 'like_count': video_info.get('like_count', 'N/A'), 'description': video_info['description'] } send_sse_message(q, {"status": "獲取到視頻信息", "video_info": video_data}) # 處理 YouTube 描述 raw_description = video_info['description'] processed_description = process_youtube_description(raw_description) logger.info("開始下載 YouTube 音頻") audio_path, video_title = download_audio(url_or_path, save_directory, q) else: logger.info("檢測到本地文件路徑,開始處理本地視頻") audio_path, video_title = process_local_video(url_or_path, save_directory, q) processed_description = local_video_description if local_video_description else "這是一個本地視頻文件,用戶沒有提供視頻描述。" if not audio_path or not os.path.exists(audio_path): raise FileNotFoundError(f"音頻文件不存在: {audio_path}") logger.info("開始生成轉錄文本") transcript = generate_transcript(audio_path, video_title, q) # 保存轉錄文本 save_transcript(transcript, video_title, url_or_path, save_directory) logger.info("開始分割轉錄文本") segments = smart_split_transcript(transcript, q) all_summaries = [] for i, segment in enumerate(segments, start=1): logger.info(f"開始為文本段 {i}/{len(segments)} 生成摘要") send_sse_message(q, {"status": f"正在為文本段 {i}/{len(segments)} 生成摘要..."}) keywords, entities = extract_keywords_and_entities(segment) segment_summary = get_openai_summary(segment, video_title, False, keywords, entities, processed_description, q) if segment_summary: all_summaries.append(segment_summary) save_segment_summary(segment_summary, i, video_title, save_directory) send_sse_message(q, {"status": f"段落 {i} 摘要完成", "summary": segment_summary}) logger.info("開始生成最終摘要") send_sse_message(q, {"status": "正在生成最終摘要..."}) all_summaries_text = "\n\n".join(all_summaries) final_summary = get_openai_summary(all_summaries_text, video_title, True, [], [], processed_description, q) # 將最終摘要添加到 summary_versions summary_versions.append(final_summary) # 修改這裡:發送包含版本信息的最終摘要 send_sse_message(q, { "status": "處理完成", "final_summary": final_summary, "version": 0, "total_versions": len(summary_versions) }) # 添加影片名稱和 URL/路徑到最終摘要 final_summary_with_info = f'影片名稱:"{video_title}"\n網址或路徑:"{url_or_path}"\n\n{final_summary}' send_sse_message(q, {"status": "處理完成", "final_summary": final_summary_with_info}) # 保存最終摘要 logger.info("保存最終摘要") save_summary(final_summary_with_info, video_title, url_or_path, save_directory) # 刪除臨時音頻文件 if os.path.exists(audio_path): try: os.remove(audio_path) logger.info("臨時音頻文件已刪除") send_sse_message(q, {"status": "臨時音頻文件已刪除"}) except Exception as e: logger.error(f"無法刪除臨時音頻文件: {str(e)}") send_sse_message(q, {"status": f"無法刪除臨時音頻文件: {str(e)}"}) # 如果是本地上傳的 .mp4 文件,刪除臨時文件 if not url_or_path.startswith('http') and url_or_path.lower().endswith('.mp4'): try: os.remove(url_or_path) logger.info("臨時上傳的 .mp4 文件已刪除") send_sse_message(q, {"status": "臨時上傳的 .mp4 文件已刪除"}) except Exception as e: logger.error(f"無法刪除臨時上傳的 .mp4 文件: {str(e)}") send_sse_message(q, {"status": f"無法刪除臨時上傳的 .mp4 文件: {str(e)}"}) logger.info("視頻處理完成") except Exception as e: logger.exception("處理視頻時發生錯誤") send_sse_message(q, {"status": f"錯誤: {str(e)}"}) # 在全局變量部分添加: refinement_count = 0 max_refinement_count = config.get('max_refinement_count', 5) # 使用 get 方法,如果 config.json 配置中沒有,則使用默認值 5 summary_versions = deque(maxlen=max_refinement_count + 1) # 添加新的函數: def refine_final_summary(original_summary, user_feedback, video_title, processed_description): prompt = f"""你是一個專業的廣東話視頻內容摘要編輯。請根據用戶的反饋,改進以下內容摘要。標題是"{video_title}"。 原始摘要: {original_summary} 用戶反饋: {user_feedback} 請遵循以下指引: 1. 仔細閱讀原始摘要和用戶反饋,以用戶反饋的指示作為優先原則。 2. 根據用戶反饋,補充、修正在原始摘要內,任何錯誤或不準確的資訊,確保摘要全面涵蓋主題內容。 3. 保留原始摘要中準確和重要的部分。 4. 確保摘要邏輯清晰,結構完整,易於閱讀理解。 5. 如有必要,重新組織摘要結構以提高清晰度和連貫性。 6. 保留原有的 Hash Tag(如果有的話),或根據更新後的內容調整 Hash Tag。 請生成最終摘要,確保其準確、全面、連貫,並符合用戶的反饋意見。""" response = client.chat.completions.create( model=config['openai_model'], messages=[{"role": "system", "content": prompt}], temperature=0.8, max_tokens=1000 ) refined_summary = response.choices[0].message.content.strip() return refined_summary # 添加新的路由: @app.route('/refine_summary', methods=['POST']) def refine_summary(): global refinement_count data = request.json #logger.info(f"Received refinement request: {data}") #{'original_summary': .... 'user_feedback': .... 'video_title':...'video_url'...'processed_description'... original_summary = data['original_summary'] user_feedback = data['user_feedback'] video_title = data['video_title'] video_url = data['video_url'] processed_description = data['processed_description'] if refinement_count >= config['max_refinement_count']: return jsonify({"error": "已達到最大重新生成次數"}), 400 refined_summary = refine_final_summary(original_summary, user_feedback, video_title, processed_description) refinement_count += 1 # 添加視頻信息到摘要 refined_summary_with_info = f"影片名稱:{video_title}\n網址或路徑:{video_url}\n\n{refined_summary}" logger.info(f"Sending refined summary: {refined_summary_with_info}") return jsonify({ "refined_summary": refined_summary_with_info, "version": refinement_count, "total_versions": refinement_count + 1 }) @app.route('/') def index(): return render_template('index.html') @app.route('/process', methods=['POST']) def process(): try: url_or_path = request.form.get('url_or_path') if not url_or_path: return jsonify({"error": "No URL or path provided"}), 400 if url_or_path.startswith('http'): # YouTube URL 處理邏輯保持不變 pass else: # 本地文件處理 if 'file' not in request.files: return jsonify({"error": "No file uploaded"}), 400 file = request.files['file'] if file.filename == '': return jsonify({"error": "No file selected"}), 400 if file: filename = secure_filename(file.filename) file_path = os.path.join(config['save_directory'], filename) file.save(file_path) url_or_path = file_path # 獲取本地視頻描述 local_video_description = request.form.get('localVideoDescription', '') logger.info(f"處理文件: {url_or_path}") q = Queue() thread = Thread(target=process_video, args=(url_or_path, q, local_video_description)) thread.start() return Response(event_stream(q), content_type='text/event-stream') except Exception as e: error_message = f"處理請求時出現錯誤: {str(e)}" logger.error(error_message) return jsonify({"error": error_message}), 500 def event_stream(q): while True: message = q.get() yield f"data: {json.dumps(message)}\n\n" if message.get('status') == '處理完成' or message.get('status').startswith('錯誤'): break if __name__ == '__main__': port = int(os.environ.get('PORT', 5000)) app.run(host='0.0.0.0', port=port)