import asyncio
import logging
import os
import re
from datetime import datetime, timedelta
from typing import List, Tuple, Union

import aiohttp
import requests
from imdb import Cinemagoer
from pyrogram import filters, enums
from pyrogram.errors import InputUserDeactivated, UserNotParticipant, FloodWait, UserIsBlocked, PeerIdInvalid
from pyrogram.types import Message, InlineKeyboardButton

from info import AUTH_CHANNEL, LONG_IMDB_DESCRIPTION, MAX_LIST_ELM, SHORT_URL, SHORT_API

# NOTE(review): `BeautifulSoup` (used by search_gagala) and `db` (used by
# get_settings / save_group_settings) are referenced below but are not
# imported anywhere in this file as reviewed -- confirm where they are
# supposed to come from and add the imports, otherwise those functions
# raise NameError when called.

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Matches "[label](buttonurl:target)" / "[label](buttonalert:text)" markup,
# with an optional ":same" suffix (group 5).
BTN_URL_REGEX = re.compile(r"(\[([^\[]+?)\]\((buttonurl|buttonalert):(?:/{0,2})(.+?)(:same)?\))")
BANNED = {}

SMART_OPEN = '“'
SMART_CLOSE = '”'
# Characters that may open a quoted key in split_quotes().
START_CHAR = ('\'', '"', SMART_OPEN)


# temp db for banned
class temp(object):
    """Process-wide mutable state shared through class attributes.

    Only ``SETTINGS`` is read or written in this file (as a per-group
    settings cache); the remaining attributes are presumably populated by
    other modules -- verify against their users.
    """

    BANNED_USERS = []
    BANNED_CHATS = []
    CURRENT = 0
    CANCEL = False
    MELCOW = {}
    U_NAME = None
    B_NAME = None
    # group_id -> settings mapping, filled lazily by get_settings().
    SETTINGS = {}
    GP_BUTTONS = {}
    PM_BUTTONS = {}
    PM_SPELL = {}
    GP_SPELL = {}


async def is_subscribed(bot, query):
    """Return True if the sender of *query* is a non-banned member of AUTH_CHANNEL.

    Args:
        bot: Client exposing an awaitable ``get_chat_member(chat_id, user_id)``.
        query: Update object exposing ``from_user.id``.

    Returns:
        True when the membership lookup succeeds and the member status is not
        BANNED; False when the user is not a participant, is banned, or the
        lookup fails for any other reason (the error is logged).
    """
    logger.info(f"Checking if user {query.from_user.id} is subscribed to channel {AUTH_CHANNEL}.")
    try:
        user = await bot.get_chat_member(AUTH_CHANNEL, query.from_user.id)
        logger.info(f"User {query.from_user.id} is subscribed to channel {AUTH_CHANNEL}.")
    except UserNotParticipant:
        logger.info(f"User {query.from_user.id} is not subscribed to channel {AUTH_CHANNEL}.")
    except Exception as e:
        # Best effort: any lookup failure is treated as "not subscribed".
        logger.error(f"Error checking subscription for user {query.from_user.id}: {e}")
    else:
        if user.status != enums.ChatMemberStatus.BANNED:
            logger.info(f"User {query.from_user.id} is not banned in channel {AUTH_CHANNEL}.")
            return True
    logger.info(f"User {query.from_user.id} is not subscribed or is banned in channel {AUTH_CHANNEL}.")
    return False


async def get_poster(query, bulk=False, id=False, file=None):
    """Fetch movie details from IMDb via Cinemagoer.

    Args:
        query: A search title (optionally ending in a 4-digit year) or, when
            *id* is truthy, the IMDb movie id to load directly.
        bulk: When truthy (and *id* is falsy), return the list of search
            results instead of a single movie's details.
        id: When truthy, *query* is used as the movie id without searching.
        file: Optional file name; searched for a 4-digit year when the query
            itself does not end in one.

    Returns:
        None when the search fails or finds nothing; a list of search results
        when *bulk* is truthy; otherwise a dict of movie fields.
    """
    logger.info(f"Fetching poster for query: {query}, bulk: {bulk}, id: {id}, file: {file}.")
    imdb = Cinemagoer()
    if not id:
        query = (query.strip()).lower()
        title = query
        # A trailing 4-digit number in the query is treated as the year.
        year = re.findall(r'[1-2]\d{3}$', query, re.IGNORECASE)
        if year:
            year = list_to_str(year[:1])
            title = (query.replace(year, "")).strip()
        elif file is not None:
            year = re.findall(r'[1-2]\d{3}', file, re.IGNORECASE)
            if year:
                year = list_to_str(year[:1])
        else:
            year = None
        try:
            movieid = imdb.search_movie(title.lower(), results=10)
            logger.info(f"Found {len(movieid)} movie results for title: {title}.")
        except Exception as e:
            logger.error(f"Error searching movie: {e}")
            return None
        if not movieid:
            logger.info(f"No movie results found for title: {title}.")
            return None
        if year:
            # Prefer results from the requested year; fall back to all results.
            filtered = list(filter(lambda k: str(k.get('year')) == str(year), movieid))
            if not filtered:
                filtered = movieid
        else:
            filtered = movieid
        # Prefer movies / TV series; fall back to the year-filtered list.
        movieid = list(filter(lambda k: k.get('kind') in ['movie', 'tv series'], filtered))
        if not movieid:
            movieid = filtered
        if bulk:
            # NOTE(review): this returns the year-filtered list and ignores the
            # kind filter computed just above -- confirm that is intended.
            logger.info(f"Returning bulk movie results: {filtered}.")
            return filtered
        movieid = movieid[0].movieID
        logger.info(f"Selected movie ID: {movieid}.")
    else:
        movieid = query
        logger.info(f"Using provided movie ID: {movieid}.")
    movie = imdb.get_movie(movieid)
    if movie.get("original air date"):
        date = movie["original air date"]
    elif movie.get("year"):
        date = movie.get("year")
    else:
        date = "N/A"
    plot = ""
    if not LONG_IMDB_DESCRIPTION:
        plot = movie.get('plot')
        if plot and len(plot) > 0:
            plot = plot[0]
    else:
        plot = movie.get('plot outline')
    # Cap very long plots at 800 characters.
    if plot and len(plot) > 800:
        plot = plot[0:800] + "..."
    logger.info(f"Movie details fetched: {movie.get('title')}, Year: {date}, Plot: {plot}.")
    return {
        'title': movie.get('title'),
        'votes': movie.get('votes'),
        "aka": list_to_str(movie.get("akas")),
        "seasons": movie.get("number of seasons"),
        "box_office": movie.get('box office'),
        'localized_title': movie.get('localized title'),
        'kind': movie.get("kind"),
        "imdb_id": f"tt{movie.get('imdbID')}",
        "cast": list_to_str(movie.get("cast")),
        "runtime": list_to_str(movie.get("runtimes")),
        "countries": list_to_str(movie.get("countries")),
        "certificates": list_to_str(movie.get("certificates")),
        "languages": list_to_str(movie.get("languages")),
        "director": list_to_str(movie.get("director")),
        "writer": list_to_str(movie.get("writer")),
        "producer": list_to_str(movie.get("producer")),
        "composer": list_to_str(movie.get("composer")),
        "cinematographer": list_to_str(movie.get("cinematographer")),
        "music_team": list_to_str(movie.get("music department")),
        "distributors": list_to_str(movie.get("distributors")),
        'release_date': date,
        'year': movie.get('year'),
        'genres': list_to_str(movie.get("genres")),
        'poster': movie.get('full-size cover url'),
        'plot': plot,
        'rating': str(movie.get("rating")),
        'url': f'https://www.imdb.com/title/tt{movieid}',
    }


def list_to_str(k):
    """Convert a list to a comma-separated string.

    Args:
        k: A sequence of values, or a falsy value (None / empty).

    Returns:
        "N/A" for a falsy input, ``str(k[0])`` for a single element, otherwise
        the elements joined with ", ". When MAX_LIST_ELM is truthy only the
        first ``int(MAX_LIST_ELM)`` elements are included.
    """
    logger.debug(f"Converting list to string: {k}")
    if not k:
        logger.debug("List is empty, returning 'N/A'.")
        return "N/A"
    if len(k) == 1:
        logger.debug(f"List has one element, returning: {k[0]}")
        return str(k[0])
    if MAX_LIST_ELM:
        k = k[:int(MAX_LIST_ELM)]
        # Join with the separator *between* elements so the result carries no
        # trailing ", " or doubled spaces.
        result = ', '.join(str(elem) for elem in k)
        logger.debug(f"List truncated to {MAX_LIST_ELM} elements, returning: {result}")
        return result
    result = ', '.join(str(elem) for elem in k)
    logger.debug(f"Returning full list as string: {result}")
    return result


__repo__ = "https://github.com/MrMKN/PROFESSOR-BOT"
logger.info(f"Repository URL set to: {__repo__}")
__version__ = "PROFESSOR-BOT ᴠ4.5.0"
logger.info(f"Version set to: {__version__}")
__license__ = "GNU GENERAL PUBLIC LICENSE V2"
logger.info(f"License set to: {__license__}")
__copyright__ = "Copyright (C) 2023-present MrMKN "
logger.info(f"Copyright set to: {__copyright__}")


async def search_gagala(text):
    """Search Google for *text* and return the text of every <h3> in the page.

    Args:
        text: The search phrase; spaces are replaced with "+".

    Returns:
        A list of heading strings, or an empty list when the HTTP request
        fails (the error is logged).
    """
    logger.info(f"Searching Google for text: {text}")
    usr_agent = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/61.0.3163.100 Safari/537.36'
    }
    text = text.replace(" ", '+')
    url = f'https://www.google.com/search?q={text}'
    try:
        # requests is blocking; run it in the default executor so the event
        # loop keeps serving other updates, and bound the wait with a timeout.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None, lambda: requests.get(url, headers=usr_agent, timeout=10)
        )
        response.raise_for_status()
        logger.info(f"Google search successful for text: {text}")
    except Exception as e:
        logger.error(f"Error searching Google: {e}")
        return []
    # NOTE(review): BeautifulSoup is not imported in this file as reviewed.
    soup = BeautifulSoup(response.text, 'html.parser')
    titles = soup.find_all('h3')
    result = [title.getText() for title in titles]
    logger.info(f"Found {len(result)} Google search results.")
    return result


async def get_settings(group_id):
    """Return the settings for *group_id*, using ``temp.SETTINGS`` as a cache.

    On a cache miss (or a falsy cached value) the settings are loaded with
    ``db.get_settings`` and stored in the cache.
    """
    logger.info(f"Getting settings for group ID: {group_id}")
    settings = temp.SETTINGS.get(group_id)
    if not settings:
        # NOTE(review): `db` is not imported in this file as reviewed.
        settings = await db.get_settings(group_id)
        temp.SETTINGS[group_id] = settings
        logger.info(f"Settings retrieved from database for group ID: {group_id}")
    else:
        logger.info(f"Settings retrieved from cache for group ID: {group_id}")
    return settings


async def save_group_settings(group_id, key, value):
    """Set ``settings[key] = value`` for *group_id* in the cache and via ``db``."""
    logger.info(f"Saving setting '{key}' with value '{value}' for group ID: {group_id}")
    current = await get_settings(group_id)
    current[key] = value
    temp.SETTINGS[group_id] = current
    # NOTE(review): `db` is not imported in this file as reviewed.
    await db.update_settings(group_id, current)
    logger.info(f"Setting '{key}' saved for group ID: {group_id}")


def get_size(size):
    """Convert a size in bytes to a string such as "1.50 MB".

    Args:
        size: Anything accepted by ``float()``.

    Returns:
        The size with two decimals and a unit from Bytes up to EB; values
        beyond the largest unit are expressed in EB.
    """
    logger.debug(f"Converting size {size} bytes to human-readable format.")
    units = ["Bytes", "KB", "MB", "GB", "TB", "PB", "EB"]
    size = float(size)
    i = 0
    # Stop at the last unit so units[i] can never index past the list.
    while size >= 1024.0 and i < len(units) - 1:
        i += 1
        size /= 1024.0
    result = "%.2f %s" % (size, units[i])
    logger.debug(f"Converted size to: {result}")
    return result


def get_file_id(msg: Message):
    """Return the first media object found on *msg*, tagged with its type.

    The attributes photo, animation, audio, document, video, video_note,
    voice and sticker are checked in that order; the first truthy one gets a
    ``message_type`` attribute set to the attribute name and is returned.

    Returns:
        The media object, or None when the message has no media or none of
        the checked attributes is set.
    """
    logger.debug(f"Extracting file ID from message: {msg.id}")
    if not msg.media:
        logger.debug("Message does not contain media.")
        return None
    for message_type in ("photo", "animation", "audio", "document",
                         "video", "video_note", "voice", "sticker"):
        obj = getattr(msg, message_type)
        if obj:
            setattr(obj, "message_type", message_type)
            logger.debug(f"File ID extracted: {obj.file_id}, Type: {message_type}")
            return obj
    logger.debug("No file ID found in message.")
    return None


def extract_user(message: Message) -> Tuple[Union[int, str], str]:
    """Extract the target user of a command message.

    Resolution order: the author of the replied-to message; else, when the
    command has an argument, a text-mention entity at index 1 or the raw
    argument (converted to int when possible); else the message sender.

    Returns:
        A ``(user_id, user_first_name)`` tuple. For a raw command argument
        the "first name" is the argument string itself.
    """
    logger.debug(f"Extracting user from message: {message.id}")
    user_id = None
    user_first_name = None
    if message.reply_to_message:
        user_id = message.reply_to_message.from_user.id
        user_first_name = message.reply_to_message.from_user.first_name
        logger.debug(f"User extracted from reply: ID: {user_id}, First Name: {user_first_name}")
    elif len(message.command) > 1:
        if (len(message.entities) > 1
                and message.entities[1].type == enums.MessageEntityType.TEXT_MENTION):
            required_entity = message.entities[1]
            user_id = required_entity.user.id
            user_first_name = required_entity.user.first_name
            logger.debug(f"User extracted from text mention: ID: {user_id}, First Name: {user_first_name}")
        else:
            user_id = message.command[1]
            user_first_name = user_id
        try:
            user_id = int(user_id)
            logger.debug(f"User ID converted to integer: {user_id}")
        except ValueError:
            # Not numeric (e.g. a username): keep it as a string.
            logger.debug("User ID conversion failed, keeping as string.")
        logger.debug(f"User extracted from command: ID: {user_id}, First Name: {user_first_name}")
    else:
        user_id = message.from_user.id
        user_first_name = message.from_user.first_name
        logger.debug(f"User extracted from sender: ID: {user_id}, First Name: {user_first_name}")
    return (user_id, user_first_name)


def split_quotes(text: str) -> List:
    """Split *text* into a (possibly quoted) key and the remainder.

    If *text* starts with one of START_CHAR and a matching closing quote is
    found, the key is the unescaped quoted content and the rest is whatever
    follows. Otherwise the text is split once on whitespace.

    Returns:
        A list with up to two items. A single-word or empty input yields a
        one-item or empty list rather than raising.
    """
    logger.debug(f"Splitting quotes from text: {text}")
    if not any(text.startswith(char) for char in START_CHAR):
        # Do not unpack into two names: a single word has no "rest".
        parts = text.split(None, 1)
        logger.debug(f"Text does not start with quote, split into: {parts}")
        return parts
    counter = 1  # ignore first char -> is some kind of quote
    while counter < len(text):
        if text[counter] == "\\":
            # Skip the escaped character.
            counter += 1
        elif text[counter] == text[0] or (text[0] == SMART_OPEN and text[counter] == SMART_CLOSE):
            break
        counter += 1
    else:
        # No closing quote found: fall back to a plain whitespace split.
        parts = text.split(None, 1)
        logger.debug(f"Text does not end with quote, split into: {parts}")
        return parts

    # 1 to avoid starting quote, and counter is exclusive so avoids ending
    key = remove_escapes(text[1:counter].strip())
    # index will be in range, or `else` would have been executed and returned
    rest = text[counter + 1:].strip()
    if not key:
        key = text[0] + text[0]
    logger.debug(f"Text split into key: {key}, rest: {rest}")
    return list(filter(None, [key, rest]))


def parser(text, keyword, cb_data):
    """Parse button markup out of *text*.

    Every BTN_URL_REGEX match that is not backslash-escaped becomes an
    InlineKeyboardButton: "buttonalert" entries get callback data
    ``"{cb_data}:{index}:{keyword}"`` and their text is collected in the
    alerts list; other entries become URL buttons (spaces stripped from the
    URL). A ":same" suffix appends the button to the previous row.

    Returns:
        ``(note_data, buttons, alerts)`` -- the text with button markup
        removed, a list of button rows, and the list of alert texts.
    """
    logger.debug(f"Parsing text for buttons and alerts: {text}")
    if "buttonalert" in text:
        text = (text.replace("\n", "\\n").replace("\t", "\\t"))
    buttons = []
    note_data = ""
    prev = 0
    i = 0
    alerts = []
    for match in BTN_URL_REGEX.finditer(text):
        # Count the backslashes directly before the match.
        n_escapes = 0
        to_check = match.start(1) - 1
        while to_check > 0 and text[to_check] == "\\":
            n_escapes += 1
            to_check -= 1

        # if even, not escaped -> create button
        if n_escapes % 2 == 0:
            note_data += text[prev:match.start(1)]
            prev = match.end(1)
            if match.group(3) == "buttonalert":
                callback_data = f"{cb_data}:{i}:{keyword}"
                if bool(match.group(5)) and buttons:
                    buttons[-1].append(InlineKeyboardButton(match.group(2), callback_data=callback_data))
                else:
                    buttons.append([InlineKeyboardButton(match.group(2), callback_data=callback_data)])
                # Log before advancing the index so the logged callback data
                # matches what was attached to the button.
                logger.debug(f"Button alert added: Label: {match.group(2)}, Callback Data: {callback_data}")
                i += 1
                alerts.append(match.group(4))
            elif bool(match.group(5)) and buttons:
                buttons[-1].append(InlineKeyboardButton(match.group(2), url=match.group(4).replace(" ", "")))
                logger.debug(f"URL button added: Label: {match.group(2)}, URL: {match.group(4)}")
            else:
                buttons.append([InlineKeyboardButton(match.group(2), url=match.group(4).replace(" ", ""))])
                logger.debug(f"URL button added: Label: {match.group(2)}, URL: {match.group(4)}")
        else:
            # Escaped markup: keep it as text.
            note_data += text[prev:to_check]
            prev = match.start(1) - 1
    else:
        note_data += text[prev:]

    logger.debug("Parsed note data: %s, buttons: %s, alerts: %s", note_data, buttons, alerts)
    return note_data, buttons, alerts


def remove_escapes(text: str) -> str:
    """Drop each backslash in *text* and keep the character that follows it.

    A trailing lone backslash is dropped.
    """
    logger.debug(f"Removing escapes from text: {text}")
    kept = []
    is_escaped = False
    for char in text:
        if is_escaped:
            kept.append(char)
            is_escaped = False
        elif char == "\\":
            is_escaped = True
        else:
            kept.append(char)
    res = "".join(kept)
    logger.debug(f"Escapes removed, resulting text: {res}")
    return res


def humanbytes(size):
    """Convert a size in bytes to a string with a binary prefix (KiB .. TiB).

    Returns:
        An empty string for a falsy size; otherwise the value rounded to two
        decimals followed by the unit. Values beyond the largest prefix are
        expressed in TiB.
    """
    logger.debug(f"Converting size {size} bytes to human-readable format.")
    if not size:
        logger.debug("Size is zero, returning empty string.")
        return ""
    power = 2**10
    n = 0
    Dic_powerN = {0: ' ', 1: 'Ki', 2: 'Mi', 3: 'Gi', 4: 'Ti'}
    largest = max(Dic_powerN)
    # Stop at the largest known prefix so the lookup below cannot fail.
    while size > power and n < largest:
        size /= power
        n += 1
    result = str(round(size, 2)) + " " + Dic_powerN[n] + 'B'
    logger.debug(f"Converted size to: {result}")
    return result


def get_time(seconds):
    """Format *seconds* as concatenated day/hour/minute/second parts.

    Periods whose value would be zero are omitted; 0 seconds yields "".
    """
    logger.debug(f"Converting {seconds} seconds to human-readable time format.")
    periods = [('ᴅ', 86400), ('ʜ', 3600), ('ᴍ', 60), ('ꜱ', 1)]
    result = ''
    for period_name, period_seconds in periods:
        if seconds >= period_seconds:
            period_value, seconds = divmod(seconds, period_seconds)
            result += f'{int(period_value)}{period_name}'
    logger.debug(f"Converted time to: {result}")
    return result


async def get_shortlink(link):
    """Shorten *link* through the ``{SHORT_URL}/api`` endpoint.

    Returns:
        The shortened URL when the service reports success; otherwise the
        original *link* (service error or any exception, both logged).
    """
    logger.info(f"Generating short link for: {link}")
    url = f'{SHORT_URL}/api'
    params = {'api': SHORT_API, 'url': link}
    try:
        async with aiohttp.ClientSession() as session:
            # NOTE(review): ssl=False disables certificate verification.
            async with session.get(url, params=params, raise_for_status=True, ssl=False) as response:
                data = await response.json()
                if data["status"] == "success":
                    logger.info(f"Short link generated: {data['shortenedUrl']}")
                    return data['shortenedUrl']
                else:
                    logger.error(f"Error generating short link: {data['message']}")
                    return link
    except Exception as e:
        logger.error(f"Exception occurred while generating short link: {e}")
        return link


# from Midukki-RoBoT
def extract_time(time_val):
    """Turn a duration such as "30m" into an absolute ``datetime``.

    Args:
        time_val: Digits followed by one of "s", "m", "h" or "d".

    Returns:
        ``datetime.now()`` plus the duration, or None when the unit is not
        recognised or the numeric part is not all digits.
    """
    logger.debug(f"Extracting time from value: {time_val}")
    unit_to_kwarg = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}
    unit = time_val[-1:]
    if unit not in unit_to_kwarg:
        logger.debug("Time value does not end with valid unit, returning None.")
        return None
    time_num = time_val[:-1]  # type: str
    if not time_num.isdigit():
        logger.debug("Time value is not a digit, returning None.")
        return None
    bantime = datetime.now() + timedelta(**{unit_to_kwarg[unit]: int(time_num)})
    logger.debug(f"Extracted time: {bantime}")
    return bantime


async def admin_check(message: Message) -> bool:
    """Return True if the sender of *message* is an owner/administrator of the chat.

    Returns False when the message has no sender, the chat is not a group or
    supergroup, or the membership lookup fails. The ids 777000 and
    1087968824 (treated here as Telegram service accounts) always pass.
    """
    # Check for a sender before touching message.from_user.id anywhere,
    # including in log messages.
    if not message.from_user:
        logger.debug("Message does not contain a user, returning False.")
        return False
    logger.debug(f"Checking if user {message.from_user.id} is an admin in chat {message.chat.id}.")
    if message.chat.type not in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
        logger.debug("Chat type is not GROUP or SUPERGROUP, returning False.")
        return False
    if message.from_user.id in [777000, 1087968824]:
        logger.debug("User is a Telegram service account, returning True.")
        return True

    client = message._client
    chat_id = message.chat.id
    user_id = message.from_user.id
    try:
        check_status = await client.get_chat_member(chat_id=chat_id, user_id=user_id)
        logger.debug(f"User {user_id} status in chat {chat_id}: {check_status.status}")
    except Exception as e:
        logger.error(f"Error checking user status: {e}")
        return False

    admin_strings = [enums.ChatMemberStatus.OWNER, enums.ChatMemberStatus.ADMINISTRATOR]
    if check_status.status not in admin_strings:
        logger.debug(f"User {user_id} is not an admin, returning False.")
        return False
    logger.debug(f"User {user_id} is an admin, returning True.")
    return True


async def admin_filter(filt, client, message):
    """Filter callback that delegates to admin_check(message).

    *filt* and *client* are accepted but not used.
    """
    logger.debug(f"Applying admin filter for message {message.id}.")
    return await admin_check(message)