from __future__ import unicode_literals import os, requests, asyncio, math, time, wget from pyrogram import filters, Client from pyrogram.types import Message from youtube_search import YoutubeSearch from youtubesearchpython import SearchVideos from yt_dlp import YoutubeDL @Client.on_message(filters.command(['song', 'mp3']) & filters.private) async def song(client, message): user_id = message.from_user.id user_name = message.from_user.first_name rpk = "["+user_name+"](tg://user?id="+str(user_id)+")" query = '' for i in message.command[1:]: query += ' ' + str(i) print(query) m = await message.reply(f"**ѕєαrchíng чσur ѕσng...!\n {query}**") ydl_opts = {"format": "bestaudio[ext=m4a]"} try: results = YoutubeSearch(query, max_results=1).to_dict() link = f"https://youtube.com{results[0]['url_suffix']}" title = results[0]["title"][:40] thumbnail = results[0]["thumbnails"][0] thumb_name = f'thumb{title}.jpg' thumb = requests.get(thumbnail, allow_redirects=True) open(thumb_name, 'wb').write(thumb.content) performer = f"[Mᴋɴ Bᴏᴛᴢ™]" duration = results[0]["duration"] url_suffix = results[0]["url_suffix"] views = results[0]["views"] except Exception as e: print(str(e)) return await m.edit("**𝙵𝙾𝚄𝙽𝙳 𝙽𝙾𝚃𝙷𝙸𝙽𝙶 𝙿𝙻𝙴𝙰𝚂𝙴 𝙲𝙾𝚁𝚁𝙴𝙲𝚃 𝚃𝙷𝙴 𝚂𝙿𝙴𝙻𝙻𝙸𝙽𝙶 𝙾𝚁 𝙲𝙷𝙴𝙲𝙺 𝚃𝙷𝙴 𝙻𝙸𝙽𝙺**") await m.edit("**dσwnlσαdíng чσur ѕσng...!**") try: with YoutubeDL(ydl_opts) as ydl: info_dict = ydl.extract_info(link, download=False) audio_file = ydl.prepare_filename(info_dict) ydl.process_info(info_dict) cap = "**BY›› [Mᴋɴ Bᴏᴛᴢ™](https://t.me/mkn_bots_updates)**" secmul, dur, dur_arr = 1, 0, duration.split(':') for i in range(len(dur_arr)-1, -1, -1): dur += (int(dur_arr[i]) * secmul) secmul *= 60 await message.reply_audio( audio_file, caption=cap, quote=False, title=title, duration=dur, performer=performer, thumb=thumb_name ) await m.delete() except Exception as e: await m.edit("**🚫 𝙴𝚁𝚁𝙾𝚁 🚫**") print(e) try: os.remove(audio_file) os.remove(thumb_name) except Exception as e: print(e) def get_text(message: Message) -> [None,str]: text_to_return = message.text if message.text is None: return None if " " not in text_to_return: return None try: return message.text.split(None, 1)[1] except IndexError: return None @Client.on_message(filters.command(["video", "mp4"])) async def vsong(client, message: Message): urlissed = get_text(message) pablo = await client.send_message(message.chat.id, f"**𝙵𝙸𝙽𝙳𝙸𝙽𝙶 𝚈𝙾𝚄𝚁 𝚅𝙸𝙳𝙴𝙾** `{urlissed}`") if not urlissed: return await pablo.edit("Invalid Command Syntax Please Check help Menu To Know More!") search = SearchVideos(f"{urlissed}", offset=1, mode="dict", max_results=1) mi = search.result() mio = mi["search_result"] mo = mio[0]["link"] thum = mio[0]["title"] fridayz = mio[0]["id"] mio[0]["channel"] kekme = f"https://img.youtube.com/vi/{fridayz}/hqdefault.jpg" await asyncio.sleep(0.6) url = mo sedlyf = wget.download(kekme) opts = { "format": "best", "addmetadata": True, "key": "FFmpegMetadata", "prefer_ffmpeg": True, "geo_bypass": True, "nocheckcertificate": True, "postprocessors": [{"key": "FFmpegVideoConvertor", "preferedformat": "mp4"}], "outtmpl": "%(id)s.mp4", "logtostderr": False, "quiet": True, } try: with YoutubeDL(opts) as ytdl: ytdl_data = ytdl.extract_info(url, download=True) except Exception as e: return await pablo.edit_text(f"**𝙳𝚘𝚠𝚗𝚕𝚘𝚊𝚍 𝙵𝚊𝚒𝚕𝚎𝚍 𝙿𝚕𝚎𝚊𝚜𝚎 𝚃𝚛𝚢 𝙰𝚐𝚊𝚒𝚗..♥️** \n**Error :** `{str(e)}`") file_stark = f"{ytdl_data['id']}.mp4" capy = f"""**𝚃𝙸𝚃𝙻𝙴 :** [{thum}]({mo})\n**𝚁𝙴𝚀𝚄𝙴𝚂𝚃𝙴𝙳 𝙱𝚈 :** {message.from_user.mention}""" await client.send_video( message.chat.id, video=open(file_stark, "rb"), duration=int(ytdl_data["duration"]), file_name=str(ytdl_data["title"]), thumb=sedlyf, caption=capy, supports_streaming=True, reply_to_message_id=message.id ) await pablo.delete() for files in (sedlyf, file_stark): if files and os.path.exists(files): os.remove(files)