#Using codes from killerz3/PodGen & eswardivi/Podcastify
"""Gradio demo that turns a web page into a short Chinese podcast.

Pipeline: fetch the page as plain text through the r.jina.ai reader, ask a
Cohere chat model for a JSON dialogue script, synthesize every line with
edge-tts, and concatenate the segments into a single MP3 file.
"""
import asyncio
import json
import os
import random
import re
import tempfile

import cohere
import edge_tts
import feedparser
import gradio as gr
import httpx
from moviepy.editor import AudioFileClip, concatenate_audioclips
from pydub import AudioSegment

# System prompt for the two-host (Xiao + Yang) show.
double_prompt = '''
[SYSTEM_INSTRUCT]
You are an insightful podcast generator for The Daily Show.
You have to create short conversations between Xiao and Yang that gives an overview of the Info given by the user.
Please provide the script and output strictly in the following JSON format:
{
    "title": "[string]",
    "content": {
        "Xiao_0": "[string]",
        "Yang_0": "[string]",
        ...
    }
}
#Be concise, keep Show style. No less than five rounds of conversation.
#Please note that the [string] you generate now must be in native Chinese.
'''

# System prompt for the single-host (Yang only) show.
single_prompt = """
[SYSTEM_INSTRUCT]
You are a podcast generator in The Daily Show.
You have to create short scripts for Yang to comment on current events in the Info given by the user.
Please provide the script and output strictly in the following JSON format:
{
    "title": "[string]",
    "content": {
        "Yang_0": "[string]",
        "Yang_1": "[string]",
        ...
    }
}
#Be concise, keep Show style. No less than five rounds of conversation.
#Please note that the [string] you generate now must be in native Chinese.
"""

DESCRIPTION = '''

📻听说demo

一个轻量的中文播客

🔎 输入完整的网页链接发送即可。

🦕 部分网址可能无法解析,请尝试更换。

🍀 点击随机将随机获取资讯。

'''

css = """
h1 {
    text-align: center;
    display: block;
}
p {
    text-align: center;
}
footer {
    display:none !important
}
"""

# RSS feed used by the "random" button.
rss_feed = 'https://www.scmp.com/rss/4/feed'

apikey = os.environ.get("API_KEY")
co = cohere.Client(api_key=apikey)

# Reader service that converts an arbitrary web page into plain text.
_READER_PREFIX = "https://r.jina.ai/"

# Compiled once at import time instead of on every is_url() call.
_URL_PATTERN = re.compile(
    r'^(?:http|ftp)s?://'  # http:// or https://
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
    r'localhost|'  # localhost...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
    r'(?::\d+)?'  # optional port
    r'(?:/?|[/?]\S+)$', re.IGNORECASE)


def is_url(string):
    """Return True if *string* looks like an http(s)/ftp(s) URL."""
    return _URL_PATTERN.match(string) is not None


def _request_text(url):
    """GET *url* and return ``(body, error)``; exactly one of the two is None.

    Keeping the error separate from the body lets callers tell a failed
    request apart from a page whose text merely contains the word "error".
    """
    try:
        response = httpx.get(url, timeout=60.0)
        response.raise_for_status()
    except httpx.RequestError as e:
        return None, f"An error occurred while requesting {url}: {str(e)}"
    except httpx.HTTPStatusError as e:
        return None, f"Error response {e.response.status_code} while requesting {url}"
    except Exception as e:
        return None, f"An unexpected error occurred: {str(e)}"
    return response.text, None


def validate_url(url):
    """Fetch *url* and return its body, or a human-readable error message.

    The return value is always a string; use ``_request_text`` when the
    caller must distinguish success from failure.
    """
    body, error = _request_text(url)
    return body if error is None else error


def fetch_text(url):
    """Return the plain-text rendering of *url* via the r.jina.ai reader.

    On failure the returned string is the error message produced by
    ``validate_url``.
    """
    print("Entered Webpage Extraction")
    full_url = _READER_PREFIX + url
    print(full_url)
    print("Exited Webpage Extraction")
    return validate_url(full_url)


async def text_to_speech(text, voice, filename):
    """Synthesize *text* with the edge-tts *voice* and save it to *filename*."""
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(filename)


async def gen_show(script):
    """Render a dialogue script to one MP3 file and return its path.

    Args:
        script: Mapping with a ``"content"`` mapping of ``"<Speaker>_<n>"``
            keys to the text of each line. Lines are voiced in mapping
            order; speaker ``Xiao`` gets the Xiaoxiao voice, every other
            speaker gets the Yunyang voice.

    Returns:
        Path of a temporary ``.mp3`` file holding the combined audio. The
        caller owns that file.

    Raises:
        ValueError: If the script has no dialogue lines.
    """
    content = script['content']
    if not content:
        raise ValueError("The script contains no dialogue lines.")

    segment_paths = []
    labels = []
    tasks = []
    for key, text in content.items():
        # partition() tolerates keys without an underscore.
        speaker, _, index = key.partition('_')
        voice = "zh-CN-XiaoxiaoNeural" if speaker == "Xiao" else "zh-CN-YunyangNeural"
        # Only the unique name is needed; close the handle right away so it
        # is not leaked and edge-tts can reopen the path for writing.
        with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as segment:
            segment_paths.append(segment.name)
        labels.append(f"{speaker}_{index}")
        tasks.append(text_to_speech(text, voice, segment_paths[-1]))

    audio_clips = []
    try:
        await asyncio.gather(*tasks)
        for label, path in zip(labels, segment_paths):
            print(f"Generated audio for {label}: {path}")

        # Combine the audio files using moviepy
        for path in segment_paths:
            audio_clips.append(AudioFileClip(path))
        combined = concatenate_audioclips(audio_clips)

        with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as merged:
            output_filename = merged.name
        combined.write_audiofile(output_filename)
        print(f"Combined audio saved as: {output_filename}")
    finally:
        # Release the readers before deleting the segments, and clean up
        # even when synthesis or concatenation failed.
        for clip in audio_clips:
            clip.close()
        for path in segment_paths:
            try:
                os.remove(path)
                print(f"Deleted temporary file: {path}")
            except OSError as e:
                print(f"Could not delete temporary file {path}: {e}")

    return output_filename


def extract_content(text):
    """Return the first JSON-object-looking substring of *text*, or None.

    The pattern matches a brace pair with at most one level of nested
    braces, which is the shape requested from the model.
    """
    match = re.search(r'\{(?:[^{}]|\{[^{}]*\})*\}', text, re.DOTALL)
    return match.group(0) if match else None


async def main(link, peoples):
    """Generate a podcast for the page at *link* and return the MP3 path.

    Args:
        link: Full URL of the page; must start with http:// or https://.
        peoples: "双人" selects the two-host prompt; any other value
            selects the single-host prompt.

    Returns:
        Path of the generated audio file.

    Raises:
        gr.Error: If the link is not an http(s) URL or the page cannot be
            fetched. Gradio shows the message to the user.
        ValueError: If the model does not return a usable JSON script.
    """
    link = (link or "").strip()
    if not link.startswith(("http://", "https://")):
        raise gr.Error("URL must start with 'http://' or 'https://'")

    print("Entered Webpage Extraction")
    full_url = _READER_PREFIX + link
    print(full_url)
    text, error = _request_text(full_url)
    print("Exited Webpage Extraction")
    if error is not None:
        raise gr.Error(error)

    prompt = f"Info: {text}"
    system_prompt = double_prompt if peoples == "双人" else single_prompt
    messages = system_prompt + "\n\n\n" + prompt

    completion = co.chat(
        model="command-r",
        message=messages
    )
    print(completion)

    generated_script = extract_content(completion.text)
    if not generated_script:
        raise ValueError("Failed to generate a valid script.")
    try:
        script_json = json.loads(generated_script)
    except json.JSONDecodeError as err:
        raise ValueError("Failed to generate a valid script.") from err
    if not isinstance(script_json, dict) or not isinstance(script_json.get('content'), dict):
        raise ValueError("Failed to generate a valid script.")

    # Use the generated script as input
    output_filename = await gen_show(script_json)
    print("Output File:" + output_filename)

    return output_filename


async def random_news(peoples):
    """Pick a random entry from the RSS feed and generate a podcast for it.

    Raises:
        ValueError: If ``rss_feed`` is not a URL or the feed has no links.
    """
    if not is_url(rss_feed):
        raise ValueError(f"{rss_feed} is not a valid RSS feed.")

    feed = feedparser.parse(rss_feed)
    news = [entry.get("link") for entry in feed.entries if entry.get("link")]
    if not news:
        # random.choice() would otherwise fail with a bare IndexError.
        raise ValueError(f"{rss_feed} returned no entries.")

    random_url = random.choice(news)
    print(random_url)
    return await main(random_url, peoples)


Examples = [
    ["https://www.yahoo.com/news/shes-worlds-most-expensive-cow-040156493.html", "单人"],
    ["https://www.yahoo.com/entertainment/kevin-spacey-says-owes-many-220432469.html", "双人"],
    ["https://www.yahoo.com/tech/super-hornet-armed-sm-6-180853983.html", "双人"],
    ["https://www.yahoo.com/news/harvard-scientists-may-unknown-technologically-150917239.html", "单人"],
]

with gr.Blocks(theme='soft', css=css, title="听说") as iface:
    with gr.Accordion(""):
        gr.Markdown(DESCRIPTION)
    with gr.Row():
        # Audio player that receives the generated podcast.
        output_box = gr.Audio(label="播客", type="filepath", interactive=False, autoplay=True, elem_classes="audio")
    with gr.Row():
        input_box = gr.Textbox(label="网址", placeholder="请输入https开头的网址")
    with gr.Row():
        peoples = gr.Radio(["单人", "双人"], value="双人", label="播音员人数")
    with gr.Row():
        submit_btn = gr.Button("🚀 发送")  # Create a submit button
        random_btn = gr.Button("🤙 随机")
        clear_btn = gr.ClearButton(output_box, value="🗑️ 清除")  # Create a clear button
    gr.Examples(examples=Examples, inputs=[input_box, peoples], outputs=output_box, fn=main, label="示例", cache_examples="lazy")
    # Set up the event listeners
    submit_btn.click(main, inputs=[input_box, peoples], outputs=output_box)
    random_btn.click(fn=random_news, inputs=peoples, outputs=output_box)

iface.queue().launch(show_api=False)  # Launch the Gradio interface