"""Shared helpers for the bot: IMDb lookup, button-markup parsing, size and
time formatting, settings caching and admin checks."""

import asyncio
import functools
import logging
import os
import re
from datetime import datetime, timedelta
from typing import List, Optional, Tuple, Union

import aiohttp
import requests
from bs4 import BeautifulSoup
from imdb import Cinemagoer
from pyrogram import filters, enums
from pyrogram.errors import InputUserDeactivated, UserNotParticipant, FloodWait, UserIsBlocked, PeerIdInvalid
from pyrogram.types import Message, InlineKeyboardButton

from database.users_chats_db import db
from info import AUTH_CHANNEL, LONG_IMDB_DESCRIPTION, MAX_LIST_ELM, SHORT_URL, SHORT_API

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info("Imported necessary modules.")

# Matches "[label](buttonurl:target)" / "[label](buttonalert:text)", with an
# optional ":same" suffix. Groups: 1 = whole token, 2 = label, 3 = button kind,
# 4 = URL or alert text, 5 = ":same" marker.
# The brackets and parentheses must be backslash-escaped: an unescaped "$" is
# an end-of-string anchor, with which the pattern could never match.
BTN_URL_REGEX = re.compile(
    r"(\[([^\[]+?)\]\((buttonurl|buttonalert):(?:/{0,2})(.+?)(:same)?\))"
)
logger.info("Defined regex for button URLs.")

SMART_OPEN = '“'
SMART_CLOSE = '”'
START_CHAR = ('\'', '"', SMART_OPEN)
logger.info("Defined smart quote constants.")


class temp(object):
    """Process-wide mutable state shared through class attributes.

    Only ``SETTINGS`` is used inside this module (as a cache keyed by group id,
    see ``get_settings``). The other attributes are not referenced here and are
    presumably populated by other modules -- verify against their users.
    """

    BANNED_USERS = []
    BANNED_CHATS = []
    CURRENT = 0
    CANCEL = False
    MELCOW = {}
    U_NAME = None
    B_NAME = None
    SETTINGS = {}  # group_id -> settings object returned by db.get_settings
    GP_BUTTONS = {}
    PM_BUTTONS = {}
    PM_SPELL = {}
    GP_SPELL = {}


logger.info("Initialized temporary object for caching data.")


async def _run_blocking(func, *args, **kwargs):
    """Run a blocking callable in the default executor and await its result.

    Keeps synchronous network clients from stalling the event loop. Exceptions
    raised by ``func`` propagate to the awaiting caller.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


async def is_subscribed(bot, query):
    """Return True when the sender of ``query`` is in AUTH_CHANNEL and not banned.

    Any failure to look the member up (including ``UserNotParticipant``) is
    logged and treated as "not subscribed".
    """
    user_id = query.from_user.id
    logger.debug(f"Checking subscription for user {user_id} in channel {AUTH_CHANNEL}.")
    try:
        user = await bot.get_chat_member(AUTH_CHANNEL, user_id)
    except UserNotParticipant:
        logger.debug(f"User {user_id} is not a participant in {AUTH_CHANNEL}.")
    except Exception as e:
        logger.error(f"Error in is_subscribed: {e}")
    else:
        if user.status != enums.ChatMemberStatus.BANNED:
            logger.debug(f"User {user_id} is subscribed and not banned in {AUTH_CHANNEL}.")
            return True
    logger.debug(f"User {user_id} is either not subscribed or banned in {AUTH_CHANNEL}.")
    return False


async def get_poster(query, bulk=False, id=False, file=None):
    """Look a title up on IMDb and return its details as a dict.

    Args:
        query: Search text, or the IMDb movie id when ``id`` is truthy.
        bulk: When truthy (and ``id`` is falsy), return the filtered list of
            search results instead of a single details dict.
        id: When truthy, ``query`` is used directly as the movie id.
        file: Optional file name; a year is taken from it when ``query`` does
            not end with one.

    Returns:
        A dict of movie fields, the list of search results when ``bulk`` is
        set, or None when nothing was found or IMDb could not be reached.
    """
    logger.debug(f"Fetching poster for query: {query}, bulk: {bulk}, id: {id}, file: {file}")
    imdb = Cinemagoer()
    if not id:
        query = query.strip().lower()
        title = query
        year = None
        trailing_year = re.findall(r'[1-2]\d{3}$', query, re.IGNORECASE)
        if trailing_year:
            year = trailing_year[0]
            # The pattern is anchored at the end, so strip exactly that suffix.
            title = query[:-len(year)].strip()
            if not title:
                # The whole query is a year-like title (e.g. "1917"); searching
                # for an empty string would find nothing.
                title, year = query, None
        elif file is not None:
            year_in_file = re.findall(r'[1-2]\d{3}', file, re.IGNORECASE)
            if year_in_file:
                year = year_in_file[0]

        logger.debug(f"Searching IMDb for title: {title}, year: {year}")
        try:
            movieid = await _run_blocking(imdb.search_movie, title.lower(), results=10)
        except Exception as e:
            logger.error(f"Error searching IMDb: {e}")
            return None
        if not movieid:
            logger.debug("No movies found.")
            return None

        if year:
            logger.debug(f"Filtering movies by year: {year}")
            same_year = [m for m in movieid if str(m.get('year')) == str(year)]
            if not same_year:
                logger.debug("No movies found for specified year. Using all results.")
            filtered = same_year or movieid
        else:
            filtered = movieid

        wanted_kinds = [m for m in filtered if m.get('kind') in ('movie', 'tv series')]
        if not wanted_kinds:
            logger.debug("No movies or TV series found. Using all results.")
        movieid = wanted_kinds or filtered

        if bulk:
            logger.debug("Returning bulk movie IDs.")
            return movieid
        movieid = movieid[0].movieID
    else:
        movieid = query

    logger.debug(f"Fetching movie data for ID: {movieid}")
    try:
        movie = await _run_blocking(imdb.get_movie, movieid)
    except Exception as e:
        # Same contract as a failed search: report "nothing found".
        logger.error(f"Error fetching IMDb data for ID {movieid}: {e}")
        return None

    if movie.get("original air date"):
        date = movie["original air date"]
    elif movie.get("year"):
        date = movie.get("year")
    else:
        date = "N/A"

    if not LONG_IMDB_DESCRIPTION:
        plot = movie.get('plot')
        if plot and len(plot) > 0:
            plot = plot[0]
    else:
        plot = movie.get('plot outline')
    if plot and len(plot) > 800:
        plot = plot[0:800] + "..."

    logger.debug("Constructing poster dictionary.")
    return {
        'title': movie.get('title'),
        'votes': movie.get('votes'),
        "aka": list_to_str(movie.get("akas")),
        "seasons": movie.get("number of seasons"),
        "box_office": movie.get('box office'),
        'localized_title': movie.get('localized title'),
        'kind': movie.get("kind"),
        "imdb_id": f"tt{movie.get('imdbID')}",
        "cast": list_to_str(movie.get("cast")),
        "runtime": list_to_str(movie.get("runtimes")),
        "countries": list_to_str(movie.get("countries")),
        "certificates": list_to_str(movie.get("certificates")),
        "languages": list_to_str(movie.get("languages")),
        "director": list_to_str(movie.get("director")),
        "writer": list_to_str(movie.get("writer")),
        "producer": list_to_str(movie.get("producer")),
        "composer": list_to_str(movie.get("composer")),
        "cinematographer": list_to_str(movie.get("cinematographer")),
        "music_team": list_to_str(movie.get("music department")),
        "distributors": list_to_str(movie.get("distributors")),
        'release_date': date,
        'year': movie.get('year'),
        'genres': list_to_str(movie.get("genres")),
        'poster': movie.get('full-size cover url'),
        'plot': plot,
        'rating': str(movie.get("rating")),
        'url': f'https://www.imdb.com/title/tt{movieid}',
    }


def list_to_str(k):
    """Join a sequence into a comma-separated string.

    Returns "N/A" for an empty or missing sequence and the single element as a
    string for a one-element sequence. When MAX_LIST_ELM is set, only that many
    leading elements are kept.
    """
    logger.debug(f"Converting list to string: {k}")
    if not k:
        return "N/A"
    if len(k) == 1:
        return str(k[0])
    if MAX_LIST_ELM:
        k = k[:int(MAX_LIST_ELM)]
    # ", ".join avoids the trailing ", " and doubled spaces that joining
    # pre-suffixed elements with " " produced.
    result = ", ".join(str(elem) for elem in k)
    logger.debug(f"Joined list: {result}")
    return result


__repo__ = "https://github.com/MrMKN/PROFESSOR-BOT"
__version__ = "PROFESSOR-BOT ᴠ4.5.0"
__license__ = "GNU GENERAL PUBLIC LICENSE V2"
__copyright__ = "Copyright (C) 2023-present MrMKN "

logger.info(f"Repository: {__repo__}")
logger.info(f"Version: {__version__}")
logger.info(f"License: {__license__}")
logger.info(f"Copyright: {__copyright__}")


async def search_gagala(text):
    """Search Google for ``text`` and return the text of every <h3> on the page.

    Returns an empty list when the request fails.
    """
    logger.debug(f"Performing Google search for: {text}")
    usr_agent = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/61.0.3163.100 Safari/537.36'
    }
    url = 'https://www.google.com/search'
    logger.debug(f"Sending GET request to: {url}")
    try:
        # `params` URL-encodes the query (spaces become "+", "&"/"#" are
        # escaped); the timeout keeps a stalled connection from hanging, and
        # the executor keeps the blocking call off the event loop.
        response = await _run_blocking(
            requests.get, url, params={'q': text}, headers=usr_agent, timeout=10
        )
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error(f"Error in search_gagala: {e}")
        return []
    soup = BeautifulSoup(response.text, 'html.parser')
    results = [heading.getText() for heading in soup.find_all('h3')]
    logger.debug(f"Found {len(results)} search results.")
    return results


async def get_settings(group_id):
    """Return the settings for ``group_id``, loading them into the cache on a miss."""
    logger.debug(f"Getting settings for group ID: {group_id}")
    settings = temp.SETTINGS.get(group_id)
    if settings:
        logger.debug("Settings found in cache.")
        return settings
    logger.debug("Settings not found in cache. Retrieving from database.")
    settings = await db.get_settings(group_id)
    temp.SETTINGS[group_id] = settings
    return settings


async def save_group_settings(group_id, key, value):
    """Set ``key`` to ``value`` in a group's settings, in the cache and the database."""
    logger.debug(f"Saving group settings for group ID: {group_id}, key: {key}, value: {value}")
    current = await get_settings(group_id)
    current[key] = value
    temp.SETTINGS[group_id] = current
    await db.update_settings(group_id, current)
    logger.debug("Settings updated successfully.")


def get_size(size):
    """Format a byte count as a string such as "1.50 MB" (1024-based units)."""
    logger.debug(f"Converting size: {size}")
    units = ["Bytes", "KB", "MB", "GB", "TB", "PB", "EB"]
    size = float(size)
    i = 0
    # Stop at the last unit so units[i] can never go out of range.
    while size >= 1024.0 and i < len(units) - 1:
        i += 1
        size /= 1024.0
    result = "%.2f %s" % (size, units[i])
    logger.debug(f"Converted size: {result}")
    return result


def get_file_id(msg: Message):
    """Return the first media object found on ``msg``, or None.

    The attribute name it was found under is stored on the returned object as
    ``message_type``.
    """
    if not msg.media:
        logger.debug("Message has no media.")
        return None
    for message_type in ("photo", "animation", "audio", "document",
                         "video", "video_note", "voice", "sticker"):
        obj = getattr(msg, message_type)
        if obj:
            logger.debug(f"Found media type: {message_type}")
            setattr(obj, "message_type", message_type)
            return obj
    logger.debug("No supported media type found.")
    return None


def extract_user(message: Message) -> Tuple[Union[int, str, None], Optional[str]]:
    """Work out which user a command refers to.

    Order of preference: the author of the replied-to message, then the first
    command argument (a text mention or a raw id/username), then the sender.

    Returns:
        ``(user_id, user_first_name)``. ``user_id`` stays a string when the
        command argument is not an integer.
    """
    logger.debug("Extracting user information.")
    command = message.command or []
    if message.reply_to_message:
        logger.debug("Extracting user from replied message.")
        user_id = message.reply_to_message.from_user.id
        user_first_name = message.reply_to_message.from_user.first_name
    elif len(command) > 1:
        logger.debug("Extracting user from command arguments.")
        entities = message.entities or []
        if len(entities) > 1 and entities[1].type == enums.MessageEntityType.TEXT_MENTION:
            mentioned = entities[1].user
            user_id = mentioned.id
            user_first_name = mentioned.first_name
        else:
            user_id = command[1]
            user_first_name = user_id
        try:
            user_id = int(user_id)
        except ValueError:
            logger.warning("User ID is not an integer.")
    else:
        logger.debug("Extracting user from message sender.")
        user_id = message.from_user.id
        user_first_name = message.from_user.first_name
    logger.debug(f"Extracted user ID: {user_id}, first name: {user_first_name}")
    return (user_id, user_first_name)


def split_quotes(text: str) -> List:
    """Split ``text`` into a leading (optionally quoted) key and the remainder.

    Without an opening quote, or without a matching closing quote, the text is
    split on the first run of whitespace. Otherwise the quoted part (with
    escapes removed) is the key. Empty parts are dropped from the result.
    """
    logger.debug(f"Splitting quotes in text: {text}")
    if not any(text.startswith(char) for char in START_CHAR):
        logger.debug("No starting quote found. Splitting by first space.")
        parts = text.split(None, 1)
        logger.debug(f"Split parts: {parts}")
        return parts

    counter = 1  # ignore first char -> is some kind of quote
    while counter < len(text):
        if text[counter] == "\\":
            logger.debug("Escaped character found.")
            counter += 1  # skip the escaped character as well
        elif text[counter] == text[0] or (text[0] == SMART_OPEN and text[counter] == SMART_CLOSE):
            logger.debug("Ending quote found.")
            break
        counter += 1
    else:
        logger.debug("No ending quote found. Splitting by first space.")
        parts = text.split(None, 1)
        logger.debug(f"Split parts: {parts}")
        return parts

    # 1 to avoid starting quote, and counter is exclusive so avoids ending
    key = remove_escapes(text[1:counter].strip())
    # index will be in range, or `else` would have been executed and returned
    rest = text[counter + 1:].strip()
    if not key:
        key = text[0] + text[0]
    result = list(filter(None, [key, rest]))
    logger.debug(f"Split parts: {result}")
    return result


def parser(text, keyword, cb_data):
    """Extract inline buttons from ``text``.

    Returns:
        ``(note_data, buttons, alerts)``: the text with unescaped button tokens
        removed, the button rows (a ":same" token joins the previous row), and
        the alert texts in the order their buttons were numbered.
    """
    logger.debug(f"Parsing text: {text}, keyword: {keyword}, cb_data: {cb_data}")
    if "buttonalert" in text:
        logger.debug("Found 'buttonalert' in text. Replacing newlines and tabs.")
        text = text.replace("\n", "\\n").replace("\t", "\\t")
    buttons = []
    note_data = ""
    prev = 0
    i = 0
    alerts = []
    for match in BTN_URL_REGEX.finditer(text):
        # Count the backslashes directly before the token.
        n_escapes = 0
        to_check = match.start(1) - 1
        while to_check > 0 and text[to_check] == "\\":
            n_escapes += 1
            to_check -= 1

        if n_escapes % 2 != 0:
            # Odd count: the token is escaped and stays in the text. The slice
            # bounds are kept exactly as in the original implementation.
            note_data += text[prev:to_check]
            prev = match.start(1) - 1
            continue

        # if even, not escaped -> create button
        note_data += text[prev:match.start(1)]
        prev = match.end(1)
        label = match.group(2)
        same_line = bool(match.group(5))
        if match.group(3) == "buttonalert":
            logger.debug(f"Found buttonalert: {label}, {match.group(4)}, same_line: {same_line}")
            button = InlineKeyboardButton(label, callback_data=f"{cb_data}:{i}:{keyword}")
            i += 1
            alerts.append(match.group(4))
        else:
            logger.debug(f"Found URL button: {label}, {match.group(4)}, same_line: {same_line}")
            button = InlineKeyboardButton(label, url=match.group(4).replace(" ", ""))

        if same_line and buttons:
            buttons[-1].append(button)
        else:
            buttons.append([button])

    note_data += text[prev:]
    logger.debug(f"Parsed note_data: {note_data}, buttons: {buttons}, alerts: {alerts}")
    return note_data, buttons, alerts


def remove_escapes(text: str) -> str:
    """Drop each escaping backslash, keeping the character that follows it.

    A trailing lone backslash is dropped.
    """
    logger.debug(f"Removing escapes from text: {text}")
    kept = []
    is_escaped = False
    for char in text:
        if is_escaped:
            kept.append(char)
            is_escaped = False
        elif char == "\\":
            is_escaped = True
        else:
            kept.append(char)
    res = "".join(kept)
    logger.debug(f"Text after removing escapes: {res}")
    return res


def humanbytes(size):
    """Format a byte count with binary prefixes, e.g. "1.5 MiB".

    Returns an empty string for a falsy ``size``.
    """
    logger.debug(f"Converting size to human-readable: {size}")
    if not size:
        logger.debug("Size is zero or empty.")
        return ""
    power = 2**10
    n = 0
    Dic_powerN = {0: ' ', 1: 'Ki', 2: 'Mi', 3: 'Gi', 4: 'Ti'}
    largest = max(Dic_powerN)
    # Stop at the largest known prefix so the lookup below cannot fail.
    while size > power and n < largest:
        size /= power
        n += 1
    result = str(round(size, 2)) + " " + Dic_powerN[n] + 'B'
    logger.debug(f"Human-readable size: {result}")
    return result


def get_time(seconds):
    """Format a duration in seconds as days/hours/minutes/seconds.

    Zero-valued units are left out, so a duration below one second gives an
    empty string.
    """
    logger.debug(f"Formatting time: {seconds} seconds")
    periods = [('ᴅ', 86400), ('ʜ', 3600), ('ᴍ', 60), ('ꜱ', 1)]
    result = ''
    for period_name, period_seconds in periods:
        if seconds >= period_seconds:
            period_value, seconds = divmod(seconds, period_seconds)
            result += f'{int(period_value)}{period_name}'
    logger.debug(f"Formatted time: {result}")
    return result


async def get_shortlink(link):
    """Shorten ``link`` through the configured shortener API.

    Returns the original ``link`` when the API reports a failure or the request
    raises.
    """
    logger.debug(f"Generating shortlink for: {link}")
    url = f'{SHORT_URL}/api'
    params = {'api': SHORT_API, 'url': link}
    try:
        async with aiohttp.ClientSession() as session:
            # NOTE(review): ssl=False turns off certificate verification; kept
            # from the original -- confirm the shortener really needs it.
            async with session.get(url, params=params, raise_for_status=True, ssl=False) as response:
                data = await response.json()
        # .get() so a response without the expected keys is reported as an API
        # failure instead of surfacing as a bare KeyError.
        if data.get("status") == "success" and data.get("shortenedUrl"):
            shortened_url = data['shortenedUrl']
            logger.debug(f"Shortlink generated: {shortened_url}")
            return shortened_url
        logger.error(f"Error generating shortlink: {data.get('message')}")
        return link
    except Exception as e:
        logger.error(f"Exception in get_shortlink: {e}")
        return link


def extract_time(time_val):
    """Turn a duration such as "30m" into the datetime that far from now.

    Accepts a non-negative integer followed by "s", "m", "h" or "d".

    Returns:
        ``datetime.now()`` plus the duration, or None when ``time_val`` is not
        in that form.
    """
    logger.debug(f"Extracting time from: {time_val}")
    unit_kwargs = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}
    if not time_val or time_val[-1] not in unit_kwargs:
        logger.debug("No valid unit found in time_val.")
        return None
    unit = time_val[-1]
    time_num = time_val[:-1]
    if not time_num.isdigit():
        logger.debug("Time number is not a digit.")
        return None
    try:
        # int() rejects some characters isdigit() accepts (e.g. "²"), and a
        # huge value overflows timedelta/datetime.
        bantime = datetime.now() + timedelta(**{unit_kwargs[unit]: int(time_num)})
    except (ValueError, OverflowError):
        logger.debug("Time number is not a digit.")
        return None
    logger.debug(f"Calculated bantime: {bantime}")
    return bantime


async def admin_check(message: Message) -> bool:
    """Return True when the sender of ``message`` is an owner or administrator.

    Only group and supergroup chats qualify. The ids 777000 and 1087968824 are
    always accepted. A failed member lookup counts as "not admin".
    """
    # Must come before anything that reads message.from_user.id.
    if not message.from_user:
        logger.debug("Message does not have a from_user.")
        return False
    logger.debug(f"Checking if user {message.from_user.id} is admin in chat {message.chat.id}")
    if message.chat.type not in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
        logger.debug("Chat is not a group or supergroup.")
        return False
    if message.from_user.id in [777000, 1087968824]:
        logger.debug("User is a special bot ID.")
        return True

    client = message._client
    chat_id = message.chat.id
    user_id = message.from_user.id
    try:
        check_status = await client.get_chat_member(chat_id=chat_id, user_id=user_id)
    except Exception as e:
        logger.error(f"Error checking chat member: {e}")
        return False

    admin_strings = [enums.ChatMemberStatus.OWNER, enums.ChatMemberStatus.ADMINISTRATOR]
    if check_status.status not in admin_strings:
        logger.debug("User is not an admin or owner.")
        return False
    logger.debug("User is an admin or owner.")
    return True


async def admin_filter(filt, client, message):
    """Filter callback that returns the result of ``admin_check(message)``."""
    logger.debug(f"Admin filter checking for message: {message.id}")
    result = await admin_check(message)
    logger.debug(f"Admin filter result: {result}")
    return result