Professor / plugins /admin_control.py
azils3's picture
Update plugins/admin_control.py
0a690cc verified
raw
history blame
24.4 kB
from pyrogram import Client, filters, enums
from pyrogram.errors import ChatAdminRequired
from pyrogram.types import Message, InlineKeyboardButton, InlineKeyboardMarkup, CallbackQuery
from pyrogram.errors.exceptions.bad_request_400 import MessageTooLong, PeerIdInvalid, UserNotParticipant, MediaEmpty, PhotoInvalidDimensions, WebpageMediaEmpty
from info import ADMINS, LOG_CHANNEL, SUPPORT_CHAT, WELCOM_PIC, WELCOM_TEXT, IMDB_TEMPLATE
from utils import get_size, temp, extract_user, get_file_id, get_poster, humanbytes
from database.users_chats_db import db
from database.ia_filterdb import Media
from datetime import datetime
from Script import script
import logging, re, asyncio, time, shutil, psutil, os, sys
# Module-level logger shared by every handler in this plugin.
logger = logging.getLogger(__name__)
# Only ERROR and above pass this threshold, so the logger.info(...) calls in
# this module are filtered out unless the level is lowered here.
logger.setLevel(logging.ERROR)
@Client.on_message(filters.command('leave') & filters.user(ADMINS))
async def leave_a_chat(bot, message):
    """
    Post a farewell notice in a chat and make the bot leave that chat.

    Usage: ``/leave <chat_id | username>``. Restricted to ``ADMINS``.

    Args:
        bot: Client that received the command.
        message: The command message; ``message.command[1]`` names the
            target chat.

    Returns:
        The result of ``bot.leave_chat`` on success, otherwise the reply
        sent back to the admin (usage hint or error text).
    """
    logger.info("User %s issued 'leave' command.", message.from_user.id)
    if len(message.command) == 1:
        logger.info("Command incomplete, no chat ID provided.")
        return await message.reply('Gɪᴠᴇ Mᴇ A Cʜᴀᴛ Iᴅ')

    chat = message.command[1]
    try:
        chat = int(chat)
    except ValueError:
        # Not numeric: keep the raw string so it is passed on unchanged
        # (e.g. a username).
        logger.info("Invalid chat ID provided.")

    try:
        buttons = [[InlineKeyboardButton('Sᴜᴩᴩᴏʀᴛ', url=f'https://t.me/{SUPPORT_CHAT}')]]
        farewell = await bot.send_message(
            chat_id=chat,
            text='<b>Hᴇʟʟᴏ Fʀɪᴇɴᴅs, \nMʏ Aᴅᴍɪɴ Hᴀs Rᴇsᴛʀɪᴄᴛᴇᴅ Mᴇ Fʀᴏᴍ Wᴏʀᴋɪɴɢ Hᴇʀᴇ ! Iғ Yᴏᴜ Wᴀɴɴᴀ Aᴅᴅ Mᴇ Aɢᴀɪɴ Cᴏɴᴛᴀᴄᴛ Sᴜᴘᴘᴏʀᴛ</b>',
            reply_markup=InlineKeyboardMarkup(buttons),
        )
        # Pinning is best-effort: a failure here must not stop the bot
        # from leaving.
        try:
            await farewell.pin()
        except Exception as pin_error:
            logger.info("Failed to pin message in chat %s: %s", chat, pin_error)
        # Leave the *target* chat, not the chat the command was typed in.
        result = await bot.leave_chat(chat)
        logger.info("Bot left chat %s at user %s's request.", chat, message.from_user.id)
        return result
    except Exception as e:
        logger.error("Error leaving chat %s: %s", chat, e)
        return await message.reply(f'Eʀʀᴏʀ: {e}')
@Client.on_message(filters.command('disable') & filters.user(ADMINS))
async def disable_chat(bot, message):
    """
    Mark a chat as disabled, notify it, and make the bot leave it.

    Usage: ``/disable <chat_id> [reason ...]``. Restricted to ``ADMINS``.

    Args:
        bot: Client that received the command.
        message: The command message. The second token is the chat id;
            everything after it is used verbatim as the reason.

    Returns:
        The reply message on any early exit (missing or invalid id, chat
        unknown, already disabled); ``None`` once the disable flow has run.
    """
    logger.info("User %s issued 'disable' command.", message.from_user.id)
    if len(message.command) == 1:
        logger.info("Command incomplete, no chat ID provided.")
        return await message.reply('Gɪᴠᴇ Mᴇ A Cʜᴀᴛ Iᴅ')

    # Commands can arrive as a media caption, in which case message.text is
    # None; split once instead of re-splitting for every field.
    parts = (message.text or message.caption or "").split(None, 2)
    if len(parts) > 2:
        chat, reason = parts[1], parts[2]
    else:
        chat = message.command[1]
        reason = "No Reason Provided"

    try:
        chat_ = int(chat)
    except ValueError:
        logger.info("Invalid chat ID provided.")
        return await message.reply('Gɪᴠᴇ Mᴇ A Vᴀʟɪᴅ Cʜᴀᴛ ID')

    cha_t = await db.get_chat(chat_)
    if not cha_t:
        logger.info("Chat %s not found in DB.", chat_)
        return await message.reply("Cʜᴀᴛ Nᴏᴛ Fᴏᴜɴᴅ Iɴ DB")
    if cha_t['is_disabled']:
        logger.info("Chat %s is already disabled.", chat_)
        return await message.reply(f"Tʜɪꜱ Cʜᴀᴛ Is Aʟʀᴇᴅʏ Dɪꜱᴀʙʟᴇᴅ:\nRᴇᴀꜱᴏɴ: <code> {cha_t['reason']} </code>")

    await db.disable_chat(chat_, reason)
    # Keep the in-memory list free of duplicates so a later single
    # remove() fully clears the chat.
    if chat_ not in temp.BANNED_CHATS:
        temp.BANNED_CHATS.append(chat_)
    await message.reply('Cʜᴀᴛ Sᴜᴄᴄᴇꜱꜰᴜʟʟʏ Dɪꜱᴀʙʟᴇᴅ')
    logger.info("Chat %s disabled successfully.", chat_)

    # Notifying and leaving is best-effort; the chat is already disabled.
    try:
        buttons = [[InlineKeyboardButton('Sᴜᴩᴩᴏʀᴛ', url=f'https://t.me/{SUPPORT_CHAT}')]]
        await bot.send_message(
            chat_id=chat_,
            text=f'<b>Hᴇʟʟᴏ Fʀɪᴇɴᴅs, \nᴍʏ Aᴅᴍɪɴ Hᴀs Tᴏʟᴅ Mᴇ Tᴏ Lᴇᴀᴠᴇ Fʀᴏᴍ Gʀᴏᴜᴘ Sᴏ I Gᴏ! Iғ Yᴏᴜ Wᴀɴɴᴀ Aᴅᴅ Mᴇ Aɢᴀɪɴ Cᴏɴᴛᴀᴄᴛ Mʏ Sᴜᴘᴘᴏʀᴛ Gʀᴏᴜᴘ.</b> \nRᴇᴀꜱᴏɴ : <code>{reason}</code>',
            reply_markup=InlineKeyboardMarkup(buttons),
        )
        await bot.leave_chat(chat_)
        logger.info("Bot left chat %s after disabling.", chat_)
    except Exception as e:
        logger.error("Error leaving chat %s: %s", chat_, e)
        await message.reply(f"Eʀʀᴏʀ: {e}")
@Client.on_message(filters.command('enable') & filters.user(ADMINS))
async def re_enable_chat(bot, message):
    """
    Re-enable a chat that was previously disabled.

    Usage: ``/enable <chat_id>``. Restricted to ``ADMINS``.

    Args:
        bot: Client that received the command.
        message: The command message; ``message.command[1]`` is the chat id.

    Returns:
        The reply message on an early exit (missing or invalid id, chat
        unknown, chat not disabled); ``None`` after a successful re-enable.
    """
    logger.info("User %s issued 'enable' command.", message.from_user.id)
    if len(message.command) == 1:
        logger.info("Command incomplete, no chat ID provided.")
        return await message.reply('Gɪᴠᴇ Mᴇ A Cʜᴀᴛ Iᴅ')

    try:
        chat_ = int(message.command[1])
    except ValueError:
        logger.info("Invalid chat ID provided.")
        return await message.reply('Gɪᴠᴇ Mᴇ A Vᴀʟɪᴅ Cʜᴀᴛ ID')

    sts = await db.get_chat(chat_)
    if not sts:
        logger.info("Chat %s not found in DB.", chat_)
        return await message.reply("Cʜᴀᴛ Nᴏᴛ Fᴏᴜɴᴅ Iɴ DB")
    if not sts.get('is_disabled'):
        logger.info("Chat %s is not disabled.", chat_)
        return await message.reply('Tʜɪꜱ Cʜᴀᴛ Iꜱ Nᴏᴛ Yᴇᴛ Dɪꜱᴀʙʟᴇᴅ')

    await db.re_enable_chat(chat_)
    # The in-memory list may not contain the chat even though the DB says
    # it is disabled; list.remove() would raise ValueError in that case.
    if chat_ in temp.BANNED_CHATS:
        temp.BANNED_CHATS.remove(chat_)
    await message.reply("Cʜᴀᴛ Sᴜᴄᴄᴇꜱꜰᴜʟʟʏ Rᴇ-Eɴᴀʙʟᴇᴅ")
    logger.info("Chat %s re-enabled successfully.", chat_)
@Client.on_message(filters.command('stats') & filters.incoming)
async def get_ststs(bot, message):
    """
    Reply with bot statistics: file count, user count, chat count, DB usage.

    Not restricted to admins. A placeholder reply is sent first and then
    edited in place with ``script.STATUS_TXT`` once the figures are ready.

    Args:
        bot: Client that received the command.
        message: The incoming command message.
    """
    # from_user is None for channel posts and anonymous admins; this handler
    # has no user filter, so it must not be dereferenced unconditionally.
    requester_id = message.from_user.id if message.from_user else None
    logger.info("User %s issued 'stats' command.", requester_id)

    # 536870912 bytes = 512 MiB; presumably the database storage quota —
    # TODO confirm against the actual deployment.
    db_capacity_bytes = 536870912

    rju = await message.reply('<b>Pʟᴇᴀꜱᴇ Wᴀɪᴛ...</b>')
    total_users = await db.total_users_count()
    totl_chats = await db.total_chat_count()
    files = await Media.count_documents()
    used_bytes = await db.get_db_size()
    free_bytes = db_capacity_bytes - used_bytes

    await rju.edit(
        script.STATUS_TXT.format(
            files, total_users, totl_chats, get_size(used_bytes), get_size(free_bytes)
        )
    )
    logger.info("Bot statistics sent to user %s.", requester_id)
@Client.on_message(filters.command('invite') & filters.user(ADMINS))
async def gen_invite(bot, message):
    """
    Create an invite link for a chat and send it to the requesting admin.

    Usage: ``/invite <chat_id>``. Restricted to ``ADMINS``.

    Args:
        bot: Client that received the command.
        message: The command message; ``message.command[1]`` is the chat id.

    Returns:
        The reply message on any failure path (missing or invalid id,
        insufficient rights, other error); ``None`` after the link is sent.
    """
    logger.info("User %s issued 'invite' command.", message.from_user.id)
    if len(message.command) == 1:
        logger.info("Command incomplete, no chat ID provided.")
        return await message.reply('Gɪᴠᴇ Mᴇ A Cʜᴀᴛ Iᴅ')

    try:
        chat_ = int(message.command[1])
    except ValueError:
        logger.info("Invalid chat ID provided.")
        return await message.reply('Gɪᴠᴇ Mᴇ A Vᴀʟɪᴅ Cʜᴀᴛ ID')

    try:
        link = await bot.create_chat_invite_link(chat_)
    except ChatAdminRequired:
        logger.error("Bot is not an admin in chat %s.", chat_)
        return await message.reply("Iɴᴠɪᴛᴇ Lɪɴᴋ Gᴇɴᴇʀᴀᴛɪᴏɴ Fᴀɪʟᴇᴅ, Iᴀᴍ Nᴏᴛ Hᴀᴠɪɴɢ Sᴜғғɪᴄɪᴇɴᴛ Rɪɢʜᴛꜱ")
    except Exception as e:
        logger.error("Error generating invite link for chat %s: %s", chat_, e)
        return await message.reply(f'Eʀʀᴏʀ: {e}')

    logger.info("Invite link generated for chat %s.", chat_)
    await message.reply(f'Hᴇʀᴇ Iꜱ Yᴏᴜʀ Iɴᴠɪᴛᴇ Lɪɴᴋ: {link.invite_link}')
    logger.info("Invite link sent to user %s.", message.from_user.id)
@Client.on_message(filters.command('ban_user') & filters.user(ADMINS))
async def ban_a_user(bot, message):
    """
    Ban a user from using the bot.

    Usage: ``/ban_user <user_id | username> [reason ...]``. Restricted to
    ``ADMINS``.

    Args:
        bot: Client that received the command.
        message: The command message. The second token identifies the user;
            everything after it is used verbatim as the ban reason.

    Returns:
        The reply message on an early exit (missing argument, lookup
        failure, already banned); ``None`` after a successful ban.
    """
    logger.info("User %s issued 'ban_user' command.", message.from_user.id)
    if len(message.command) == 1:
        logger.info("Command incomplete, no user ID provided.")
        return await message.reply('Gɪᴠᴇ Mᴇ A Uꜱᴇʀ Iᴅ / Uꜱᴇʀɴᴀᴍᴇ')

    # Commands can arrive as a media caption, in which case message.text is
    # None; split once instead of re-splitting for every field.
    parts = (message.text or message.caption or "").split(None, 2)
    if len(parts) > 2:
        chat, reason = parts[1], parts[2]
    else:
        chat = message.command[1]
        reason = "No reason Provided"

    try:
        chat = int(chat)
    except ValueError:
        # Not numeric: pass the string through unchanged (e.g. a username).
        pass

    try:
        k = await bot.get_users(chat)
    except PeerIdInvalid:
        logger.error("Peer ID invalid for user %s.", chat)
        return await message.reply("Tʜɪs Is Aɴ Iɴᴠᴀʟɪᴅ Usᴇʀ, Mᴀᴋᴇ Sᴜʀᴇ Iᴀ Hᴀᴠᴇ Mᴇᴛ Hɪᴍ Bᴇғᴏʀᴇ")
    except IndexError:
        logger.error("Index error for user %s.", chat)
        return await message.reply("Tʜɪs Mɪɢʜᴛ Bᴇ A Cʜᴀɴɴᴇʟ, Mᴀᴋᴇ Sᴜʀᴇ Iᴛs A Usᴇʀ.")
    except Exception as e:
        logger.error("Error getting user %s: %s", chat, e)
        return await message.reply(f'Eʀʀᴏʀ: {e}')

    logger.info("User %s found: %s.", chat, k.mention)
    jar = await db.get_ban_status(k.id)
    if jar['is_banned']:
        logger.info("User %s is already banned.", k.id)
        return await message.reply(f"{k.mention} Iꜱ Aʟʀᴇᴅʏ Bᴀɴɴᴇᴅ\nRᴇᴀꜱᴏɴ: {jar['ban_reason']}")

    await db.ban_user(k.id, reason)
    # Avoid duplicates so a later single remove() fully clears the user.
    if k.id not in temp.BANNED_USERS:
        temp.BANNED_USERS.append(k.id)
    await message.reply(f"Sᴜᴄᴄᴇꜱꜰᴜʟʟʏ Bᴀɴɴᴇᴅ {k.mention}")
    logger.info("User %s banned successfully by user %s.", k.id, message.from_user.id)
@Client.on_message(filters.command('unban_user') & filters.user(ADMINS))
async def unban_a_user(bot, message):
    """
    Lift a ban previously placed on a user.

    Usage: ``/unban_user <user_id | username>``. Restricted to ``ADMINS``.
    Any text after the user argument is ignored.

    Args:
        bot: Client that received the command.
        message: The command message; ``message.command[1]`` identifies the
            user.

    Returns:
        The reply message on an early exit (missing argument, lookup
        failure, user not banned); ``None`` after a successful unban.
    """
    logger.info("User %s issued 'unban_user' command.", message.from_user.id)
    if len(message.command) == 1:
        logger.info("Command incomplete, no user ID provided.")
        return await message.reply('Gɪᴠᴇ Mᴇ A Uꜱᴇʀ Iᴅ / Uꜱᴇʀɴᴀᴍᴇ')

    chat = message.command[1]
    try:
        chat = int(chat)
    except ValueError:
        # Not numeric: pass the string through unchanged (e.g. a username).
        pass

    try:
        k = await bot.get_users(chat)
    except PeerIdInvalid:
        logger.error("Peer ID invalid for user %s.", chat)
        return await message.reply("Tʜɪs Is Aɴ Iɴᴠᴀʟɪᴅ Usᴇʀ, Mᴀᴋᴇ Sᴜʀᴇ Iᴀ Hᴀᴠᴇ Mᴇᴛ Hɪᴍ Bᴇғᴏʀᴇ")
    except IndexError:
        logger.error("Index error for user %s.", chat)
        return await message.reply("Tʜɪs Mɪɢʜᴛ Bᴇ A Cʜᴀɴɴᴇʟ, Mᴀᴋᴇ Sᴜʀᴇ Iᴛs A Usᴇʀ.")
    except Exception as e:
        logger.error("Error getting user %s: %s", chat, e)
        return await message.reply(f'Eʀʀᴏʀ: {e}')

    logger.info("User %s found: %s.", chat, k.mention)
    jar = await db.get_ban_status(k.id)
    if not jar['is_banned']:
        logger.info("User %s is not banned.", k.id)
        return await message.reply(f"{k.mention} Iꜱ Nᴏᴛ Yᴇᴛ Bᴀɴɴᴇᴅ")

    await db.remove_ban(k.id)
    # The in-memory list may not contain the user even though the DB says
    # they are banned; list.remove() would raise ValueError in that case.
    if k.id in temp.BANNED_USERS:
        temp.BANNED_USERS.remove(k.id)
    await message.reply(f"Sᴜᴄᴄᴇꜱꜰᴜʟʟʏ Uɴʙᴀɴɴᴇᴅ {k.mention}")
    logger.info("User %s unbanned successfully by user %s.", k.id, message.from_user.id)
@Client.on_message(filters.command('users') & filters.user(ADMINS))
async def list_users(bot, message):
    """
    Send the admin a list of every user stored in the database.

    The list is edited into a placeholder reply. If Telegram rejects it
    with ``MessageTooLong``, it is written to ``users.txt`` and uploaded as
    a document instead. Restricted to ``ADMINS``.

    Args:
        bot: Client that received the command.
        message: The command message.
    """
    logger.info("User %s issued 'users' command.", message.from_user.id)
    sps = await message.reply('Gᴇᴛᴛɪɴɢ Lɪꜱᴛ Oꜰ Uꜱᴇʀꜱ')
    users = await db.get_all_users()

    # Collect the pieces and join once instead of growing a string in a loop.
    pieces = ["Uꜱᴇʀꜱ Sᴀᴠᴇᴅ Iɴ DB Aʀᴇ:\n\n"]
    async for user in users:
        pieces.append(f"<a href=tg://user?id={user['id']}>{user['name']}</a>\n")
    out = "".join(pieces)

    try:
        await sps.edit_text(out)
    except MessageTooLong:
        try:
            # Explicit UTF-8: names may contain characters the platform's
            # default encoding cannot represent.
            with open('users.txt', 'w', encoding='utf-8') as outfile:
                outfile.write(out)
            await message.reply_document('users.txt', caption="Lɪꜱᴛ Oꜰ Uꜱᴇʀꜱ")
            logger.info("User list too long, sent as document to user %s.", message.from_user.id)
        finally:
            # Remove the temp file even when the upload fails.
            if os.path.exists('users.txt'):
                os.remove('users.txt')
    else:
        logger.info("User list sent to user %s.", message.from_user.id)
@Client.on_message(filters.command('chats') & filters.user(ADMINS))
async def list_chats(bot, message):
    """
    Send the admin a list of every chat stored in the database.

    The list is edited into a placeholder reply. If Telegram rejects it
    with ``MessageTooLong``, it is written to ``chats.txt`` and uploaded as
    a document instead. Restricted to ``ADMINS``.

    Args:
        bot: Client that received the command.
        message: The command message.
    """
    logger.info("User %s issued 'chats' command.", message.from_user.id)
    sps = await message.reply('Gᴇᴛᴛɪɴɢ Lɪꜱᴛ Oꜰ Cʜᴀᴛꜱ')
    chats = await db.get_all_chats()

    # Collect the pieces and join once instead of growing a string in a loop.
    pieces = ["Cʜᴀᴛꜱ Sᴀᴠᴇᴅ Iɴ DB Aʀᴇ:\n\n"]
    async for chat in chats:
        username = chat['username']
        username = "private" if not username else "@" + username
        pieces.append(
            f"**- Tɪᴛʟᴇ:** `{chat['title']}`\n**- ID:** `{chat['id']}`\n**Uꜱᴇʀɴᴀᴍᴇ:** {username}\n"
        )
    out = "".join(pieces)

    try:
        await sps.edit_text(out)
    except MessageTooLong:
        try:
            # Explicit UTF-8: titles may contain characters the platform's
            # default encoding cannot represent.
            with open('chats.txt', 'w', encoding='utf-8') as outfile:
                outfile.write(out)
            await message.reply_document('chats.txt', caption="Lɪꜱᴛ Oꜰ Cʜᴀᴛꜱ")
            logger.info("Chat list too long, sent as document to user %s.", message.from_user.id)
        finally:
            # Remove the temp file even when the upload fails.
            if os.path.exists('chats.txt'):
                os.remove('chats.txt')
    else:
        logger.info("Chat list sent to user %s.", message.from_user.id)
@Client.on_message(filters.command(["id"]))
async def show_id(client, message):
    """
    Reply with identifiers for the current context.

    In a private chat: the sender's names, username, Telegram id and DC id.
    In a group or supergroup: the chat id, the sender's id, the replied-to
    sender's id when the command is a reply, and the file id of any media
    found by ``get_file_id``. Other chat types get no reply. Not restricted
    to admins.

    Args:
        client: Client that received the command.
        message: The command message.
    """
    # from_user is None for anonymous admins and channel posts; resolve the
    # id once so logging cannot crash before the reply is sent.
    sender = message.from_user
    sender_id = sender.id if sender else 'Anonymous'
    logger.info("User %s issued 'id' command.", sender_id)

    chat_type = message.chat.type
    if chat_type == enums.ChatType.PRIVATE:
        user_id = message.chat.id
        first = message.from_user.first_name
        last = message.from_user.last_name or ""
        username = message.from_user.username
        dc_id = message.from_user.dc_id or ""
        await message.reply_text(
            f"<b>➲ ꜰɪʀꜱᴛ ɴᴀᴍᴇ:</b> {first}\n<b>➲ ʟᴀꜱᴛ ɴᴀᴍᴇ:</b> {last}\n<b>➲ ᴜꜱᴇʀɴᴀᴍᴇ:</b> {username}\n<b>➲ ᴛᴇʟᴇɢʀᴀᴍ ɪᴅ:</b> <code>{user_id}</code>\n<b>➲ ᴅᴄ ɪᴅ:</b> <code>{dc_id}</code>",
            quote=True,
        )
        logger.info("ID information sent to user %s.", sender_id)
    elif chat_type in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
        parts = [
            f"<b>➲ ᴄʜᴀᴛ ɪᴅ</b>: <code>{message.chat.id}</code>\n",
            "<b>➲ ᴜꜱᴇʀ ɪᴅ</b>: ",
            f"<code>{sender_id}</code>\n",
        ]
        replied = message.reply_to_message
        if replied:
            replied_id = replied.from_user.id if replied.from_user else 'Anonymous'
            parts.append("<b>➲ ʀᴇᴩʟɪᴇᴅ ᴜꜱᴇʀ ɪᴅ</b>: ")
            parts.append(f"<code>{replied_id}</code>\n")
            # When replying, report the media of the replied-to message.
            file_info = get_file_id(replied)
        else:
            file_info = get_file_id(message)
        if file_info:
            parts.append(f"<b>{file_info.message_type}</b>: ")
            parts.append(f"<code>{file_info.file_id}</code>\n")
        await message.reply_text("".join(parts), quote=True)
        logger.info("ID information sent to user %s in chat %s.", sender_id, message.chat.id)
@Client.on_message(filters.command(["info"]))
async def user_info(client, message):
    """
    Reply with a profile summary of a user.

    The target is whatever ``extract_user(message)`` resolves. The summary
    lists names, id, DC id, username and a profile link; in supergroups and
    channels it also lists the join date when the user is a member. If the
    user has a profile photo it is downloaded and sent with the summary as
    caption, otherwise the summary is sent as text. Not restricted to
    admins.

    Args:
        client: Client that received the command.
        message: The command message.

    Returns:
        The edited status message on a lookup failure; ``None`` otherwise.
    """
    # from_user is None for anonymous admins and channel posts.
    requester_id = message.from_user.id if message.from_user else None
    logger.info("User %s issued 'info' command.", requester_id)

    status_message = await message.reply_text("`ᴩʟᴇᴀꜱᴇ ᴡᴀɪᴛ....`")
    from_user_id, _ = extract_user(message)
    try:
        from_user = await client.get_users(from_user_id)
    except Exception as error:
        logger.error("Error getting user %s: %s", from_user_id, error)
        return await status_message.edit(str(error))
    if from_user is None:
        logger.info("User %s not found.", from_user_id)
        return await status_message.edit("ɴᴏ ᴠᴀʟɪᴅ ᴜꜱᴇʀ_ɪᴅ / ᴍᴇꜱꜱᴀɢᴇ sᴘᴇᴄɪꜰɪᴇᴅ")
    logger.info("User %s found: %s.", from_user_id, from_user.first_name)

    last_name = from_user.last_name or "<b>ɴᴏɴᴇ</b>"
    username = from_user.username or "<b>ɴᴏɴᴇ</b>"
    dc_id = from_user.dc_id or "[ᴜꜱᴇʀ ᴅᴏꜱᴇ'ᴛ ʜᴀᴠᴇ ᴀ ᴠᴀʟɪᴅ ᴅᴩ]"
    info_lines = [
        f"<b>➲ꜰɪʀꜱᴛ ɴᴀᴍᴇ:</b> {from_user.first_name}\n",
        f"<b>➲ʟᴀꜱᴛ ɴᴀᴍᴇ:</b> {last_name}\n",
        f"<b>➲ᴛɢ-ɪᴅ:</b> <code>{from_user.id}</code>\n",
        f"<b>➲ᴅᴄ-ɪᴅ:</b> <code>{dc_id}</code>\n",
        f"<b>➲ᴜꜱᴇʀɴᴀᴍᴇ:</b> @{username}\n",
        f"<b>➲ᴜꜱᴇʀ ʟɪɴᴋ:</b> <a href='tg://user?id={from_user.id}'><b>ᴄʟɪᴄᴋ ʜᴇʀᴇ</b></a>\n",
    ]

    if message.chat.type in (enums.ChatType.SUPERGROUP, enums.ChatType.CHANNEL):
        try:
            chat_member_p = await message.chat.get_member(from_user_id)
            # Fall back to "now" when no join date is reported.
            joined_date = (chat_member_p.joined_date or datetime.now()).strftime("%Y.%m.%d %H:%M:%S")
            info_lines.append(f"<b>➲ᴊᴏɪɴᴇᴅ ᴛʜɪꜱ ᴄʜᴀᴛ ᴏɴ:</b> <code>{joined_date}</code>\n")
            logger.info("User %s joined chat %s on %s.", from_user_id, message.chat.id, joined_date)
        except UserNotParticipant:
            logger.info("User %s is not a participant in chat %s.", from_user_id, message.chat.id)

    message_out_str = "".join(info_lines)
    buttons = [[InlineKeyboardButton('ᴄʟᴏꜱᴇ ✘', callback_data='close_data')]]
    chat_photo = from_user.photo
    if chat_photo:
        local_user_photo = await client.download_media(message=chat_photo.big_file_id)
        try:
            await message.reply_photo(
                photo=local_user_photo,
                quote=True,
                reply_markup=InlineKeyboardMarkup(buttons),
                caption=message_out_str,
                parse_mode=enums.ParseMode.HTML,
                disable_notification=True
            )
        finally:
            # Remove the downloaded photo even when sending fails.
            if local_user_photo and os.path.exists(local_user_photo):
                os.remove(local_user_photo)
        logger.info("User photo sent to user %s.", requester_id)
    else:
        await message.reply_text(
            text=message_out_str,
            reply_markup=InlineKeyboardMarkup(buttons),
            quote=True,
            parse_mode=enums.ParseMode.HTML,
            disable_notification=True
        )
        logger.info("User information sent to user %s.", requester_id)

    # The "please wait" placeholder has served its purpose once the result
    # has been delivered.
    await status_message.delete()
@Client.on_message(filters.command(["imdb", 'search']))
async def imdb_search(client, message):
    """
    Search IMDb for a title and offer the matches as inline buttons.

    Usage: ``/imdb <title>`` or ``/search <title>``. Each button carries
    ``imdb#<movieID>`` as callback data. Not restricted to admins.

    Args:
        client: Client that received the command.
        message: The command message; everything after the command word is
            the search title.

    Returns:
        The reply or edited message on the early-exit paths (no title, no
        results); ``None`` after the results are shown.
    """
    # from_user is None for anonymous admins and channel posts.
    requester_id = message.from_user.id if message.from_user else None
    logger.info("User %s issued 'imdb' or 'search' command.", requester_id)

    # Commands can arrive as a media caption, in which case message.text is
    # None. Splitting (instead of testing for a space) also avoids an unpack
    # error when only whitespace follows the command.
    parts = (message.text or message.caption or "").split(None, 1)
    if len(parts) < 2:
        logger.info("Command 'imdb' or 'search' issued without a title.")
        return await message.reply('Gɪᴠᴇ Mᴇ A Mᴏᴠɪᴇ / Sᴇʀɪᴇꜱ Nᴀᴍᴇ')
    title = parts[1]

    k = await message.reply('ꜱᴇᴀʀᴄʜɪɴɢ ɪᴍᴅʙ..')
    movies = await get_poster(title, bulk=True)
    if not movies:
        logger.info("No IMDb results found for title: %s.", title)
        # Replace the "searching" placeholder rather than leaving it behind.
        return await k.edit("ɴᴏ ʀᴇꜱᴜʟᴛ ꜰᴏᴜɴᴅ")

    btn = [
        [InlineKeyboardButton(f"{movie.get('title')} - {movie.get('year')}", callback_data=f"imdb#{movie.movieID}")]
        for movie in movies
    ]
    await k.edit('Hᴇʀᴇ Is Wʜᴀᴛ I Fᴏᴜɴᴅ Oɴ Iᴍᴅʙ', reply_markup=InlineKeyboardMarkup(btn))
    logger.info("IMDb results found and sent to user %s.", requester_id)
@Client.on_callback_query(filters.regex('^imdb'))
async def imdb_callback(bot: Client, quer_y: CallbackQuery):
    """
    Show the IMDb details for the title chosen from a search result list.

    The callback data has the form ``imdb#<movie id>``. The details are
    rendered with ``IMDB_TEMPLATE``; when a poster is available it is sent
    as a photo with the details as caption and the result list is deleted,
    otherwise the result list is edited in place.

    Args:
        bot: Client that received the callback.
        quer_y: The callback query raised by the pressed button.
    """
    logger.info("Callback query received for IMDb: %s", quer_y.data)
    # Split once so a '#' inside the id cannot break the two-way unpack.
    i, movie = quer_y.data.split('#', 1)
    imdb = await get_poster(query=movie, id=True)
    if not imdb:
        # No lookup result: nothing to build a button or caption from.
        logger.info("No IMDb results found for movie ID: %s.", movie)
        await quer_y.message.edit("ɴᴏ ʀᴇꜱᴜʟᴛꜱ")
        return await quer_y.answer()

    btn = [[InlineKeyboardButton(f"{imdb.get('title')}", url=imdb['url'])]]
    message = quer_y.message.reply_to_message or quer_y.message
    # NOTE: **locals() exposes every local defined so far (bot, quer_y, i,
    # movie, imdb, btn, message) to IMDB_TEMPLATE. Do not rename them, and do
    # not add a local above this call whose name matches one of the explicit
    # keywords below (that would raise a duplicate-keyword TypeError).
    caption = IMDB_TEMPLATE.format(
        query = imdb['title'],
        title = imdb['title'],
        votes = imdb['votes'],
        aka = imdb["aka"],
        seasons = imdb["seasons"],
        box_office = imdb['box_office'],
        localized_title = imdb['localized_title'],
        kind = imdb['kind'],
        imdb_id = imdb["imdb_id"],
        cast = imdb["cast"],
        runtime = imdb["runtime"],
        countries = imdb["countries"],
        certificates = imdb["certificates"],
        languages = imdb["languages"],
        director = imdb["director"],
        writer = imdb["writer"],
        producer = imdb["producer"],
        composer = imdb["composer"],
        cinematographer = imdb["cinematographer"],
        music_team = imdb["music_team"],
        distributors = imdb["distributors"],
        release_date = imdb['release_date'],
        year = imdb['year'],
        genres = imdb['genres'],
        poster = imdb['poster'],
        plot = imdb['plot'],
        rating = imdb['rating'],
        url = imdb['url'],
        **locals()
    )
    logger.info("IMDb caption generated for movie: %s.", imdb['title'])

    if imdb.get('poster'):
        try:
            await quer_y.message.reply_photo(photo=imdb['poster'], caption=caption, reply_markup=InlineKeyboardMarkup(btn))
            logger.info("IMDb poster sent to user %s.", quer_y.from_user.id)
        except (MediaEmpty, PhotoInvalidDimensions, WebpageMediaEmpty):
            # Retry with a rewritten poster URL when the original poster is
            # rejected.
            pic = imdb.get('poster')
            poster = pic.replace('.jpg', "._V1_UX360.jpg")
            await quer_y.message.reply_photo(photo=poster, caption=caption, reply_markup=InlineKeyboardMarkup(btn))
            logger.info("IMDb poster (fallback) sent to user %s.", quer_y.from_user.id)
        except Exception as e:
            # Last resort: send the details as plain text.
            logger.error("Error sending IMDb poster: %s", e)
            await quer_y.message.reply(caption, reply_markup=InlineKeyboardMarkup(btn), disable_web_page_preview=False)
            logger.info("IMDb caption sent to user %s.", quer_y.from_user.id)
        await quer_y.message.delete()
        logger.info("Original message deleted after sending IMDb results.")
    else:
        await quer_y.message.edit(caption, reply_markup=InlineKeyboardMarkup(btn), disable_web_page_preview=False)
        logger.info("IMDb caption edited in message for user %s.", quer_y.from_user.id)

    # Acknowledge the callback so the client stops showing its progress
    # indicator on the pressed button.
    await quer_y.answer()
@Client.on_message(filters.command('logs') & filters.user(ADMINS))
async def log_file(bot, msg):
    """
    Upload ``BotLog.txt`` to the admin who asked for it.

    Restricted to ``ADMINS``. If the upload raises, the error is logged and
    its text is sent back as a reply instead.
    """
    requester = msg.from_user.id
    logger.info(f"User {requester} issued 'logs' command.")
    try:
        await msg.reply_document('BotLog.txt')
    except Exception as e:
        logger.error(f"Error sending bot logs: {e}")
        await msg.reply(str(e))
    else:
        logger.info(f"Bot logs sent to user {requester}.")
@Client.on_message(filters.command("restart") & filters.user(ADMINS))
async def restart_bot(bot, msg):
    """
    Restart the bot process in place.

    Sends a short notice, waits two seconds, removes the notice, then
    replaces the current process with a fresh interpreter running the same
    command line. Restricted to ``ADMINS``.

    Args:
        bot: Client that received the command.
        msg: The command message.
    """
    logger.info("User %s issued 'restart' command.", msg.from_user.id)
    # Keep a handle on the notice so it can be deleted before restarting.
    sts = await msg.reply("Rᴇꜱᴛᴀᴛɪɴɢ........")
    await asyncio.sleep(2)
    try:
        await sts.delete()
    except Exception as e:
        # Cleaning up the notice is best-effort; it must not block the restart.
        logger.error("Error deleting restart notice: %s", e)
    # os.execl replaces the running process and never returns, so this must
    # be logged before the call.
    logger.info("Bot restarted by user %s.", msg.from_user.id)
    os.execl(sys.executable, sys.executable, *sys.argv)