# -*- coding: utf-8 -*-
# Author: 江信宗 (Fiscal Information Agency, Ministry of Finance)
"""Gradio app that turns a topic text or an uploaded PDF/TXT into a podcast.

Pipeline: extract text -> ask an LLM (SambaNova, OpenAI-compatible endpoint)
for a two-speaker JSON script -> synthesize every line with edge-tts ->
concatenate the clips into one mp3 with pydub.
"""
import io
import json
import os
import re
import time
import uuid

import aiofiles
import edge_tts
import gradio as gr
import openai
import pypdf
from pydub import AudioSegment
from zhconv_rs import zhconv

# UI label -> edge-tts voice id. Insertion order is also the dropdown order.
VOICE_NAMES = {
    "家豪 - 中文 (Male)": "zh-TW-YunJheNeural",
    "淑芬 - 中文 (Female)": "zh-TW-HsiaoChenNeural",
    "子晴 - 中文 (Female)": "zh-TW-HsiaoYuNeural",
    "景睿 - 中文 (Male)": "zh-CN-YunxiNeural",
    "品妍 - 中文 (Female)": "zh-CN-XiaoxiaoNeural",
    "志明 - 中文 (Male)": "zh-CN-YunyangNeural",
    "美玲 - 中文 (Female)": "zh-CN-XiaoyiNeural",
    "建宏 - 中文 (Male)": "zh-CN-YunjianNeural",
    "宥廷 - 中文 (Male)": "zh-CN-YunxiaNeural",
    "雨霏 - 中文 (Female)": "zh-CN-liaoning-XiaobeiNeural",
    "Andrew - English (Male)": "en-US-AndrewMultilingualNeural",
    "Ava - English (Female)": "en-US-AvaMultilingualNeural",
    "Brian - English (Male)": "en-US-BrianMultilingualNeural",
    "Emma - English (Female)": "en-US-EmmaMultilingualNeural",
    "Florian - German (Male)": "de-DE-FlorianMultilingualNeural",
    "Seraphina - German (Female)": "de-DE-SeraphinaMultilingualNeural",
    "Remy - French (Male)": "fr-FR-RemyMultilingualNeural",
    "Vivienne - French (Female)": "fr-FR-VivienneMultilingualNeural",
}

# JSON skeleton shown to the model. It is a plain (non-f) string because of the
# JSON braces; the speaker placeholders are filled in with str.replace().
_PODCAST_EXAMPLE_TEMPLATE = """
{
    "topic": "AIF",
    "podcast": [
        {
            "speaker": 1,
            "line": "Welcome to the 財資歐北共 Podcast. I am the host {speaker1_name}. Today we have invited an expert {speaker2_name} to join our program despite his busy schedule."
        },
        {
            "speaker": 2,
            "line": "Hello everyone, I am {speaker2_name}, I am honored to come and chat with you."
        },
        {
            "speaker": 1,
            "line": "Today we will discuss a very interesting topic..."
        },
        {
            "speaker": 2,
            "line": "Yes, this topic is indeed fascinating. Let's start with..."
        },
        …………,
        {
            "speaker": 1,
            "line": "Thank you {speaker2_name} for your professional sharing. Welcome to subscribe to the Wishing Podcast. Thank you and goodbye."
        }
    ]
}
"""


def _strip_trailing_commas(json_text):
    """Remove trailing commas before ``}`` / ``]``, a common LLM JSON slip."""
    json_text = re.sub(r',\s*}', '}', json_text)
    return re.sub(r',\s*]', ']', json_text)


class TextExtractor:
    """Read plain text out of the upload formats the UI accepts."""

    @staticmethod
    async def extract_from_pdf(file_path: str) -> str:
        """Return the text of every non-empty PDF page, joined by blank lines."""
        async with aiofiles.open(file_path, 'rb') as file:
            content = await file.read()
        pdf_reader = pypdf.PdfReader(io.BytesIO(content))
        # Extract each page once; extraction is the expensive step.
        page_texts = (page.extract_text() for page in pdf_reader.pages)
        return "\n\n".join(text for text in page_texts if text)

    @staticmethod
    async def extract_from_txt(file_path: str) -> str:
        """Return the content of a UTF-8 text file."""
        # Explicit encoding: the platform default is not UTF-8 everywhere.
        async with aiofiles.open(file_path, 'r', encoding='utf-8') as file:
            return await file.read()

    @classmethod
    async def extract_text(cls, file_path: str) -> str:
        """Dispatch on the file extension (.pdf or .txt, case-insensitive).

        Raises:
            gr.Error: for any other extension.
        """
        _, file_extension = os.path.splitext(file_path)
        suffix = file_extension.lower()
        if suffix == '.pdf':
            return await cls.extract_from_pdf(file_path)
        if suffix == '.txt':
            return await cls.extract_from_txt(file_path)
        raise gr.Error(f"Unsupported file type: {file_extension}")


def create_client(api_key=None):
    """Build an OpenAI-compatible client for the SambaNova endpoint.

    Uses ``api_key`` when given, otherwise the ``YOUR_API_KEY`` environment
    variable. The key is kept local instead of being written to the global
    ``openai.api_key``, so concurrent requests cannot see each other's key.
    """
    resolved_key = api_key or os.getenv("YOUR_API_KEY")
    return openai.OpenAI(api_key=resolved_key, base_url="https://api.sambanova.ai/v1")


def generate_response(input_text, language, speaker1, speaker2, api_key):
    """Ask the LLM for a podcast script and return it as a JSON string.

    Args:
        input_text: Topic text sent as the user message.
        language: Target language label, or "Auto Detect".
        speaker1, speaker2: Dropdown labels of the form "Name - Language (Sex)".
        api_key: Optional API key; see ``create_client``.

    Returns:
        The JSON object found in the model reply, as text. If it does not
        parse, trailing commas are stripped; the result is not re-validated.

    Raises:
        gr.Error: when the request fails or the reply contains no JSON object.
    """
    speaker1_name = speaker1.split(' - ')[0]
    speaker2_name = speaker2.split(' - ')[0]
    gr.Info("正在生成 Podcast 劇本中......")
    start_time = time.time()
    if language == "Auto Detect":
        language_instruction = "- The podcast MUST be in the same language as the user input."
    else:
        language_instruction = f"- The podcast Must reply to me in {language} language."
    # Fill in the speaker names so the model never sees raw placeholders.
    example = (
        _PODCAST_EXAMPLE_TEMPLATE
        .replace("{speaker1_name}", speaker1_name)
        .replace("{speaker2_name}", speaker2_name)
    )
    # NOTE: "{{input_text}}" deliberately stays a literal "{input_text}" here;
    # the actual input travels in the user message below.
    system_prompt = f"""你的任務是將提供的輸入文字轉換為一個訊息豐富、吸引人且專業的播客對話。輸入文字可能會比較混亂或結構不完整,因為它可能來自不同來源,如PDF檔案或網頁文字等。不要擔心格式問題或任何不相關的訊息;你的目標是超越表面訊息提取可以在播客中討論的關鍵點和知識精華,並突顯有趣的事實。
以下是你將要處理的輸入文字:
{{input_text}}
首先,仔細閱讀輸入文字,並Chain-of-Thought積極找出主要話題、關鍵點、令人印象深刻的細節,以及任何有趣的事實或軼事。思考如何將這些訊息以有趣且吸引人的方式呈現出來,以適合高品質的播客劇本。
頭腦風暴創造性的方法來深度探討你在輸入文字中識別出的主要話題、"key insights"及"golden nuggets of knowledge",儘可能思考使用真實生活的範例、講故事技巧或假設情境來讓內容更能吸引聽眾並讓他們感覺學習到新的知識。
請記住,你的播客應當結構清晰和引人入勝並易於普通聽眾理解,避免使用過多的專業術語或假設聽眾對該話題已有瞭解。發揮你的想像力填補輸入文字中的任何空白,或頭腦風暴提出一些值得深入探討與發人深省的問題,以供播客討論。目標是創造一個訊息豐富且娛樂性強的對話,因此可以在你的方法上大膽盡情自由發揮創意。
將你的頭腦風暴想法和播客對話的大綱寫在這裡,務必讓它有趣且吸引人。確保記錄下你希望在結尾重申的主要見解和觀點。
現在你已經進行頭腦風暴並建立大綱,該開始撰寫實際的播客對話了。目標是主持人與專家之間自然、對話式的交流,融入你在頭腦風暴中得出的最佳想法,並花費精力確保將任何複雜話題以易於理解的方式解釋清楚,現實生活中的例子和相關的軼事對於讓訊息深入人心至關重要。
{language_instruction}
- The podcast should be most long.
- The podcast should be interesting, lively, and engaging, and hook the listener from the start.
- The script must be in JSON format.
Follow this JSON example structure, MUST be in {language} language:
{example}
根據你在頭腦風暴階段提出的關鍵點和創造性想法,撰寫一段引人入勝且訊息豐富的播客對話(至少1000個字)。定義Host({speaker1_name})和Expert({speaker2_name})的角色,Using signposts to guide listeners and avoiding a monotone, robotic tone,Host以熱情的方式突出有趣且吸引人的觀點,而Expert則提供深入分析、背景訊息和更宏觀的見解。內容必須以清晰的概述開始,並包括任何必要的上下文或解釋,使內容對一般聽眾容易理解。使用Host名字 {speaker1_name} 和Expert名字 {speaker2_name},為聽眾營造更吸引人和身臨其境的聆聽體驗。不要包括像[Host]或[Expert]這樣的括號預留位置。設計你的輸出內容必須生動活潑、促進聽眾參與,並避免單調語氣與機器人般的語調,因為它將直接朗讀為音訊。
確保對話儘可能詳細且完整,同時保持在主題之內並維持吸引人的流暢性,避免每句開頭使用"好的"、"是的"。目標是使用你的全部輸出容量,建立儘可能最長的播客節目,同時以娛樂性的方式傳達輸入文字中的關鍵訊息,並追求引人入勝的學習體驗。
在對話結束時,讓主持人和專家自然總結他們討論中的主要見解和要點,這應當是對話的隨機部分,以自然隨意而非明顯刻意的總結 - 目的是在結束前最後一次以自然流暢的方式強化核心思想。
"""
    try:
        # The request lives inside the try so that authentication and
        # rate-limit failures reach the user-facing messages below.
        client = create_client(api_key)
        response = client.chat.completions.create(
            model="Meta-Llama-3.1-405B-Instruct",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": input_text},
            ],
            temperature=1,
        )
        reply = response.choices[0].message.content or ""
        # Greedy match: from the first "{" to the last "}" of the reply.
        podcast_match = re.search(r'{.*}', reply, re.DOTALL)
        if not podcast_match:
            raise gr.Error("生成 Podcast 劇本失敗!請稍後重試或減少話題內容!")
        podcast_json = podcast_match.group(0)
        if language == "繁體中文":
            podcast_json = zhconv(podcast_json, "zh-tw")
        try:
            json.loads(podcast_json)
        except json.JSONDecodeError:
            podcast_json = _strip_trailing_commas(podcast_json)
        end_time = time.time()
        gr.Info(f"已成功生成 Podcast 劇本,執行時間: {(end_time - start_time):.2f} 秒。")
        return podcast_json
    except gr.Error:
        # Already a user-facing message; do not rewrap it as a generic failure.
        raise
    except Exception as e:
        message = str(e)
        if isinstance(e, openai.AuthenticationError) or "API key not valid" in message:
            raise gr.Error("無效的 API 金鑰!!請提供有效的 API 金鑰。") from e
        if isinstance(e, openai.RateLimitError) or "rate limit" in message.lower():
            raise gr.Error("API 金鑰使用額度已超過限制!!請稍後再試或使用其他 API 金鑰。") from e
        raise gr.Error("生成 Podcast 劇本失敗!!請稍後再試。") from e


async def tts_generate(input_text, speaker1, speaker2):
    """Synthesize a podcast script into a single mp3 and return its path.

    Args:
        input_text: JSON script with a "podcast" list of
            ``{"speaker": 1|2, "line": str}`` entries.
        speaker1, speaker2: Dropdown labels; must be keys of ``VOICE_NAMES``.

    Returns:
        Path of the combined mp3, written to the current working directory.

    Raises:
        KeyError: for an unknown speaker label.
        json.JSONDecodeError: if the script is invalid even after
            trailing-comma repair.
    """
    speaker1_voice = VOICE_NAMES[speaker1]
    speaker2_voice = VOICE_NAMES[speaker2]
    gr.Info("正在生成 Podcast 音檔中......")
    start_time = time.time()
    try:
        podcast_dict = json.loads(input_text)
    except json.JSONDecodeError:
        podcast_dict = json.loads(_strip_trailing_commas(input_text))
    speaker1_name = speaker1.split(' - ')[0]
    speaker2_name = speaker2.split(' - ')[0]
    combined = AudioSegment.empty()
    for line in podcast_dict.get("podcast", []):
        text = line.get("line", "")
        if not isinstance(text, str) or not text.strip():
            # Nothing to speak for this entry.
            continue
        # Any speaker value other than 1 falls back to the second voice.
        is_first = line.get("speaker") == 1
        voice = speaker1_voice if is_first else speaker2_voice
        voice_name = speaker1_name if is_first else speaker2_name
        communicate = edge_tts.Communicate(text, voice)
        audio_file = f"{voice_name}_{uuid.uuid4()}.mp3"
        try:
            await communicate.save(audio_file)
            combined += AudioSegment.from_mp3(audio_file)
        finally:
            # Remove the per-line clip even when synthesis or decoding fails.
            if os.path.exists(audio_file):
                os.remove(audio_file)
    output_file = f"Jiangxz_{uuid.uuid4()}.mp3"
    combined.export(output_file, format="mp3")
    end_time = time.time()
    gr.Info(f"已成功生成 Podcast 音檔,執行時間: {(end_time - start_time):.2f} 秒。")
    return output_file


async def process_podcast(input_text, input_file, language, speaker1, speaker2, api_key):
    """Gradio handler: produce the readable script and the podcast audio.

    An uploaded file, when present, replaces the typed text.

    Returns:
        ``(podcast_text, audio_file)``, or ``(None, None)`` when there is no
        usable input text.
    """
    gr.Info("開始生成 Podcast 節目及音檔......")
    start_time = time.time()
    # The textbox value can be None after the clear button resets it.
    input_text = (input_text or "").strip()
    if input_file:
        # Depending on the Gradio version the upload arrives as a path string
        # or as an object exposing ``.name``.
        file_path = getattr(input_file, "name", input_file)
        input_text = await TextExtractor.extract_text(file_path)
    if not input_text.strip():
        gr.Warning("PDF檔案不得為掃描圖片檔,請您確認正確輸入文字或上傳PDF文字檔。")
        return None, None
    podcast_script = generate_response(input_text, language, speaker1, speaker2, api_key)
    speaker1_name = speaker1.split(' - ')[0]
    speaker2_name = speaker2.split(' - ')[0]
    try:
        podcast_data = json.loads(podcast_script)
        rendered_lines = []
        for line in podcast_data.get("podcast", []):
            speaker_id = line.get("speaker")
            if isinstance(speaker_id, int):
                speaker = speaker1_name if speaker_id == 1 else speaker2_name
            else:
                speaker = speaker_id
            rendered_lines.append(f"{speaker}:{line.get('line', '')}\n")
        podcast_text = "".join(rendered_lines)
    except json.JSONDecodeError:
        podcast_text = "Error: Unable to parse the podcast script."
    audio_file = await tts_generate(podcast_script, speaker1, speaker2)
    end_time = time.time()
    gr.Info(f"已成功完成 Podcast 節目及音檔,總執行時間: {(end_time - start_time):.2f} 秒。")
    gr.Info("請等待本訊息自動消失後即可播放或下載 Podcast 音檔!!")
    return podcast_text, audio_file


custom_css = """
.center-aligned {
    text-align: center !important;
    color: #ff4081;
    text-shadow: 2px 2px 4px rgba(0,0,0,0.1);
    margin-bottom: 0 !important;
}
.gen-button {
    border-radius: 10px !important;
    background-color: #ff4081 !important;
    color: white !important;
    font-weight: bold !important;
    transition: all 0.3s ease !important;
    margin: 0 !important;
}
.gen-button:hover {
    background-color: #f50057 !important;
    transform: scale(1.05);
}
.gr-input, .gr-box, .gr-dropdown {
    border-radius: 10px !important;
    border: 2px solid #ff4081 !important;
    margin: 0 !important;
}
.gr-input:focus, .gr-box:focus, .gr-dropdown:focus {
    border-color: #f50057 !important;
    box-shadow: 0 0 0 2px rgba(245,0,87,0.2) !important;
}
.input-background {
    background-color: #B7E0FF !important;
    padding: 15px !important;
    border-radius: 10px !important;
    margin: 0 !important;
}
.input-background textarea {
    font-size: 18px !important;
    background-color: #ffffff;
    border: 1px solid #f0f8ff;
    border-radius: 8px;
}
.file-background {
    background-color: #B7E0FF !important;
    padding: 15px !important;
    border-radius: 10px !important;
    margin: 0 !important;
    height: auto;
}
.lng-background {
    background-color: #FFF5CD !important;
    padding: 10px !important;
    border-radius: 10px !important;
    margin: 0 !important;
}
.lng-background select {
    background-color: #ffffff;
    border: 1px solid #f0f8ff;
    border-radius: 8px;
}
.sk1-background {
    background-color: #FFF5CD !important;
    padding: 10px !important;
    border-radius: 10px !important;
    margin: 0 !important;
}
.sk1-background select {
    background-color: #ffffff;
    border: 1px solid #f0f8ff;
    border-radius: 8px;
}
.sk2-background {
    background-color: #FFF5CD !important;
    padding: 10px !important;
    border-radius: 10px !important;
    margin: 0 !important;
}
.sk2-background select {
    background-color: #ffffff;
    border: 1px solid #f0f8ff;
    border-radius: 8px;
}
.clear-button {
    color: black !important;
    background-color: #FFCFB3 !important;
    padding: 10px !important;
    border-radius: 10px !important;
    margin: 0 !important;
}
.clear-button:hover {
    background-color: #FFA07A !important;
    transform: scale(1.05);
}
.api-background {
    background-color: #FFCFB3 !important;
    padding: 15px !important;
    border-radius: 10px !important;
    margin: 0 !important;
}
.audio-background {
    background-color: #FFF4B5 !important;
    padding: 5px !important;
    border-radius: 10px !important;
    margin: 0 !important;
}
.script-background {
    background-color: #FEF9D9 !important;
    padding: 15px !important;
    border-radius: 10px !important;
    margin: 0 !important;
}
.script-background textarea {
    font-size: 18px !important;
    background-color: #ffffff;
    border: 1px solid #f0f8ff;
    border-radius: 8px;
}
"""

with gr.Blocks(theme=gr.themes.Monochrome(), css=custom_css) as iface:
    gr.Markdown("""
    # 🎙️ 聲音經濟 - 財資歐北共 Podcast 🎙️
    > ### **※ 玩轉聲音魅力,開拓更多可能性,自動生成 Podcast 節目及音檔,系統布署:江信宗,LLM:Llama-3.1-405B-Instruct。**
    """, elem_classes="center-aligned")
    with gr.Row():
        input_text = gr.Textbox(
            label="請輸入 Podcast 話題(建議50至1000字)",
            placeholder="受限 LLM Context Length,建議2000字以內......",
            autofocus=True,
            max_lines=20,
            scale=4,
            elem_classes="input-background"
        )
        fileName = gr.File(
            file_types=[".pdf", ".txt"],
            label="或上傳 PDF 檔",
            scale=1,
            elem_classes="file-background"
        )

    def check_input_length(text):
        """Warn when the typed topic is very short (1-3 chars) or over 4096 chars."""
        length = len(text or "")  # the value may be None after a reset
        if 0 < length < 4:
            return gr.Warning("輸入內容過短,請提供明確的話題內容。")
        elif length > 4096:
            return gr.Warning("輸入內容已超過 max tokens,請縮短話題內容。")

    input_text.change(fn=check_input_length, inputs=[input_text])
    with gr.Row():
        Language = gr.Dropdown(
            choices=["繁體中文", "Auto Detect", "English", "日本語", "한국어", "Deutsch", "Français"],
            value="繁體中文",
            label="節目語言",
            interactive=True,
            scale=1,
            elem_classes="lng-background"
        )
        # Single source of truth: the dropdown lists exactly the known voices.
        speaker_choices = list(VOICE_NAMES)
        Speaker_1 = gr.Dropdown(
            choices=speaker_choices,
            value="景睿 - 中文 (Male)",
            label="播客#1語音",
            interactive=True,
            scale=2,
            elem_classes="sk1-background"
        )
        Speaker_2 = gr.Dropdown(
            choices=speaker_choices,
            value="品妍 - 中文 (Female)",
            label="播客#2語音",
            interactive=True,
            scale=2,
            elem_classes="sk2-background"
        )
        clear_input_text_button = gr.Button("清除Podcast話題", scale=1, elem_classes="clear-button")
        clear_input_text_button.click(fn=lambda: (None, None), inputs=None, outputs=[input_text, fileName])
    with gr.Row():
        generate_button = gr.Button("生成 Podcast 節目及音檔", scale=2, elem_classes="gen-button")
        api_key = gr.Textbox(
            label="請輸入您的 API Key",
            type="password",
            placeholder="API authentication key for large language models",
            scale=1,
            elem_classes="api-background"
        )
    audio_output = gr.Audio(label="Generated Podcast Audio", elem_classes="audio-background")
    podcast_script = gr.Textbox(label="Generated Podcast 劇本", elem_classes="script-background")
    generate_button.click(
        fn=process_podcast,
        inputs=[input_text, fileName, Language, Speaker_1, Speaker_2, api_key],
        outputs=[podcast_script, audio_output]
    )

if __name__ == "__main__":
    # On Hugging Face Spaces (SPACE_ID set) use the default launch; elsewhere
    # create a public share link and hide the API page.
    if "SPACE_ID" in os.environ:
        iface.launch()
    else:
        iface.launch(share=True, show_api=False)