azils3 commited on
Commit
10cf6e6
·
verified ·
1 Parent(s): 7174173

Update utils.py

Browse files
Files changed (1) hide show
  1. utils.py +219 -275
utils.py CHANGED
@@ -1,5 +1,10 @@
1
- import logging, os, re, asyncio, requests, aiohttp
2
- from pyrogram.errors import InputUserDeactivated, UserNotParticipant, FloodWait, UserIsBlocked, PeerIdInvalid
 
 
 
 
 
3
  from pyrogram.types import Message, InlineKeyboardButton
4
  from pyrogram import filters, enums
5
  from info import AUTH_CHANNEL, LONG_IMDB_DESCRIPTION, MAX_LIST_ELM, SHORT_URL, SHORT_API
@@ -9,22 +14,19 @@ from datetime import datetime, timedelta
9
  from database.users_chats_db import db
10
  from bs4 import BeautifulSoup
11
 
12
- # Initialize logger with enhanced configuration
13
  logger = logging.getLogger(__name__)
14
- logger.setLevel(logging.INFO)
15
 
16
- # Log regex pattern compilation
17
- BTN_URL_REGEX = re.compile(r"(\[([^\[]+?)\]\((buttonurl|buttonalert):(?:/{0,2})(.+?)(:same)?\))")
18
- logger.debug("Button URL regex pattern compiled successfully")
19
 
20
- BANNED = {}
21
  SMART_OPEN = '“'
22
  SMART_CLOSE = '”'
23
  START_CHAR = ('\'', '"', SMART_OPEN)
 
24
 
25
- # Temporary database class with logging
26
  class temp(object):
27
- logger.info("Initializing temporary storage for banned users/chats and settings")
28
  BANNED_USERS = []
29
  BANNED_CHATS = []
30
  CURRENT = 0
@@ -37,355 +39,316 @@ class temp(object):
37
  PM_BUTTONS = {}
38
  PM_SPELL = {}
39
  GP_SPELL = {}
 
40
 
41
  async def is_subscribed(bot, query):
42
- """Check if user is subscribed to auth channel with detailed logging"""
43
- logger.debug(f"Checking subscription status for user {query.from_user.id}")
44
  try:
45
  user = await bot.get_chat_member(AUTH_CHANNEL, query.from_user.id)
46
- logger.info(f"User {query.from_user.id} membership status: {user.status}")
47
-
48
- if user.status != enums.ChatMemberStatus.BANNED:
49
- logger.info(f"User {query.from_user.id} is valid subscriber")
50
- return True
51
- logger.warning(f"User {query.from_user.id} is banned in auth channel")
52
  except UserNotParticipant:
53
- logger.warning(f"User {query.from_user.id} not in auth channel")
 
54
  except Exception as e:
55
- logger.error(f"Subscription check error for {query.from_user.id}: {str(e)}")
 
 
 
 
 
56
  return False
57
 
58
  async def get_poster(query, bulk=False, id=False, file=None):
59
- """Fetch movie details from IMDb with comprehensive logging"""
60
- logger.info(f"Starting IMDb lookup for: {query}")
61
  imdb = Cinemagoer()
62
-
63
- try:
64
- if not id:
65
- logger.debug("Processing title-based search")
66
- query = (query.strip()).lower()
67
- title = query
68
- year = re.findall(r'[1-2]\d{3}$', query, re.IGNORECASE)
69
-
 
70
  if year:
71
- logger.debug(f"Year found in query: {year}")
72
  year = list_to_str(year[:1])
73
- title = (query.replace(year, "")).strip()
74
- elif file is not None:
75
- logger.debug("Extracting year from filename")
76
- year = re.findall(r'[1-2]\d{3}', file, re.IGNORECASE)
77
- if year:
78
- year = list_to_str(year[:1])
79
- else:
80
- year = None
81
- logger.debug("No year information found")
82
-
83
- logger.info(f"Searching IMDb for: {title} ({year if year else 'no year'})")
84
  movieid = imdb.search_movie(title.lower(), results=10)
85
-
86
- if not movieid:
87
- logger.warning("No IMDb results found")
88
- return None
89
-
90
- logger.debug(f"Found {len(movieid)} potential matches")
91
-
92
- if year:
93
- logger.debug("Filtering results by year")
94
- filtered = list(filter(lambda k: str(k.get('year')) == str(year), movieid))
95
- if not filtered:
96
- logger.warning("No year matches, using original results")
97
- filtered = movieid
98
- else:
99
  filtered = movieid
100
-
101
- logger.debug("Filtering by media type")
102
- movieid = list(filter(lambda k: k.get('kind') in ['movie', 'tv series'], filtered))
103
-
104
- if not movieid:
105
- logger.warning("No valid media types found, using raw results")
106
- movieid = filtered
107
-
108
- if bulk:
109
- logger.info("Returning bulk results")
110
- return movieid
111
-
112
- movieid = movieid[0].movieID
113
- logger.info(f"Selected IMDb ID: {movieid}")
114
  else:
115
- logger.info(f"Using provided IMDb ID: {query}")
116
- movieid = query
117
-
118
- logger.debug("Fetching detailed movie information")
119
- movie = imdb.get_movie(movieid)
120
-
121
- logger.debug("Processing movie metadata")
122
- date = movie.get("original air date") or movie.get("year") or "N/A"
123
- plot = ""
124
-
125
- if not LONG_IMDB_DESCRIPTION:
126
- plot = movie.get('plot')
127
- if plot and len(plot) > 0:
128
- plot = plot[0]
129
- logger.debug("Using short plot description")
130
- else:
131
- plot = movie.get('plot outline')
132
- logger.debug("Using long plot outline")
133
-
134
- if plot and len(plot) > 800:
135
- logger.debug("Trimming plot description")
136
- plot = plot[0:800] + "..."
137
-
138
- logger.info("Successfully processed IMDb data")
139
- return {
140
- 'title': movie.get('title'),
141
- 'votes': movie.get('votes'),
142
- "aka": list_to_str(movie.get("akas")),
143
- "seasons": movie.get("number of seasons"),
144
- "box_office": movie.get('box office'),
145
- 'localized_title': movie.get('localized title'),
146
- 'kind': movie.get("kind"),
147
- "imdb_id": f"tt{movie.get('imdbID')}",
148
- "cast": list_to_str(movie.get("cast")),
149
- "runtime": list_to_str(movie.get("runtimes")),
150
- "countries": list_to_str(movie.get("countries")),
151
- "certificates": list_to_str(movie.get("certificates")),
152
- "languages": list_to_str(movie.get("languages")),
153
- "director": list_to_str(movie.get("director")),
154
- "writer":list_to_str(movie.get("writer")),
155
- "producer":list_to_str(movie.get("producer")),
156
- "composer":list_to_str(movie.get("composer")) ,
157
- "cinematographer":list_to_str(movie.get("cinematographer")),
158
- "music_team": list_to_str(movie.get("music department")),
159
- "distributors": list_to_str(movie.get("distributors")),
160
- 'release_date': date,
161
- 'year': movie.get('year'),
162
- 'genres': list_to_str(movie.get("genres")),
163
- 'poster': movie.get('full-size cover url'),
164
- 'plot': plot,
165
- 'rating': str(movie.get("rating")),
166
- 'url':f'https://www.imdb.com/title/tt{movieid}'
167
- }
168
- except Exception as e:
169
- logger.error(f"IMDb processing failed: {str(e)}")
170
- return None
 
 
171
 
172
  def list_to_str(k):
173
- """Convert list to formatted string with logging"""
174
- logger.debug("Converting list to string")
175
- if not k:
176
- logger.debug("Empty list received")
177
  return "N/A"
178
- elif len(k) == 1:
179
- logger.debug("Single-item list converted")
180
  return str(k[0])
181
  elif MAX_LIST_ELM:
182
- logger.debug(f"Truncating list to {MAX_LIST_ELM} elements")
183
  k = k[:int(MAX_LIST_ELM)]
184
- return ' '.join(f'{elem}, ' for elem in k)
 
 
185
  else:
186
- logger.debug("Converting full list")
187
- return ' '.join(f'{elem}, ' for elem in k)
 
188
 
189
  __repo__ = "https://github.com/MrMKN/PROFESSOR-BOT"
190
  __version__ = "PROFESSOR-BOT ᴠ4.5.0"
191
  __license__ = "GNU GENERAL PUBLIC LICENSE V2"
192
  __copyright__ = "Copyright (C) 2023-present MrMKN <https://github.com/MrMKN>"
193
 
 
 
 
 
 
194
  async def search_gagala(text):
195
- """Perform Google search with logging"""
196
- logger.info(f"Initializing search for: {text}")
197
  usr_agent = {
198
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
199
- 'Chrome/61.0.3163.100 Safari/537.36'
200
- }
 
 
201
  try:
202
- text = text.replace(" ", '+')
203
- url = f'https://www.google.com/search?q={text}'
204
- logger.debug(f"Requesting URL: {url}")
205
-
206
  response = requests.get(url, headers=usr_agent)
207
  response.raise_for_status()
208
- logger.info("Google search successful")
209
-
210
- soup = BeautifulSoup(response.text, 'html.parser')
211
- titles = soup.find_all('h3')
212
- logger.debug(f"Found {len(titles)} search results")
213
-
214
- return [title.getText() for title in titles]
215
- except Exception as e:
216
- logger.error(f"Search failed: {str(e)}")
217
  return []
 
 
 
 
 
218
 
219
  async def get_settings(group_id):
220
- """Retrieve group settings with caching and logging"""
221
- logger.debug(f"Fetching settings for group {group_id}")
222
  settings = temp.SETTINGS.get(group_id)
223
-
224
  if not settings:
225
- logger.info("Cache miss - loading from database")
226
  settings = await db.get_settings(group_id)
227
  temp.SETTINGS[group_id] = settings
228
  else:
229
- logger.debug("Using cached settings")
230
-
231
  return settings
232
-
233
  async def save_group_settings(group_id, key, value):
234
- """Update group settings with transaction logging"""
235
- logger.info(f"Updating setting {key} for group {group_id}")
236
  current = await get_settings(group_id)
237
  current[key] = value
238
  temp.SETTINGS[group_id] = current
239
-
240
- try:
241
- await db.update_settings(group_id, current)
242
- logger.debug("Database update successful")
243
- except Exception as e:
244
- logger.error(f"Settings update failed: {str(e)}")
245
- raise
246
 
247
  def get_size(size):
248
- """Convert bytes to human-readable format with logging"""
249
- logger.debug(f"Converting size: {size} bytes")
250
  units = ["Bytes", "KB", "MB", "GB", "TB", "PB", "EB"]
251
  size = float(size)
252
  i = 0
253
-
254
  while size >= 1024.0 and i < len(units):
255
  i += 1
256
  size /= 1024.0
257
-
258
  result = "%.2f %s" % (size, units[i])
259
  logger.debug(f"Converted size: {result}")
260
  return result
261
 
262
  def get_file_id(msg: Message):
263
- """Extract file ID from message with media type detection"""
264
- logger.debug("Extracting file ID from message")
265
  if not msg.media:
266
- logger.warning("No media found in message")
267
  return None
268
-
269
  for message_type in ("photo", "animation", "audio", "document", "video", "video_note", "voice", "sticker"):
270
  obj = getattr(msg, message_type)
271
  if obj:
272
- logger.info(f"Extracted media type: {message_type}")
273
  setattr(obj, "message_type", message_type)
274
  return obj
 
 
275
 
276
  def extract_user(message: Message) -> Union[int, str]:
277
- """Extract user ID and name from message with logging"""
278
- logger.debug("Extracting user information from message")
279
  user_id = None
280
  user_first_name = None
281
-
282
  if message.reply_to_message:
 
283
  user_id = message.reply_to_message.from_user.id
284
  user_first_name = message.reply_to_message.from_user.first_name
285
- logger.info(f"User extracted from reply: {user_id} - {user_first_name}")
286
  elif len(message.command) > 1:
 
287
  if (len(message.entities) > 1 and message.entities[1].type == enums.MessageEntityType.TEXT_MENTION):
288
  required_entity = message.entities[1]
289
  user_id = required_entity.user.id
290
  user_first_name = required_entity.user.first_name
291
- logger.info(f"User extracted from mention: {user_id} - {user_first_name}")
292
  else:
293
  user_id = message.command[1]
294
  user_first_name = user_id
295
- try:
296
- user_id = int(user_id)
297
- logger.info(f"User ID converted to int: {user_id}")
298
- except ValueError:
299
- logger.warning("Failed to convert user ID to int")
300
  else:
 
301
  user_id = message.from_user.id
302
  user_first_name = message.from_user.first_name
303
- logger.info(f"User extracted from message sender: {user_id} - {user_first_name}")
304
-
305
  return (user_id, user_first_name)
306
 
307
  def split_quotes(text: str) -> List:
308
- """Split quoted text into key and value with logging"""
309
- logger.debug("Splitting quotes from text")
310
  if not any(text.startswith(char) for char in START_CHAR):
311
- logger.debug("No starting quote found, splitting by space")
312
- return text.split(None, 1)
313
-
 
314
  counter = 1 # ignore first char -> is some kind of quote
315
  while counter < len(text):
316
  if text[counter] == "\\":
 
317
  counter += 1
318
  elif text[counter] == text[0] or (text[0] == SMART_OPEN and text[counter] == SMART_CLOSE):
 
319
  break
320
  counter += 1
321
  else:
322
- logger.debug("No ending quote found, returning split")
323
- return text.split(None, 1)
 
 
324
 
 
325
  key = remove_escapes(text[1:counter].strip())
 
326
  rest = text[counter + 1:].strip()
327
  if not key:
328
  key = text[0] + text[0]
329
-
330
- logger.debug(f"Split result: key='{key}', rest='{rest}'")
331
- return list(filter(None, [key, rest]))
332
 
333
  def parser(text, keyword, cb_data):
334
- """Parse buttons and alerts from text with logging"""
335
- logger.debug("Parsing buttons and alerts from text")
336
- if "buttonalert" in text:
337
  text = (text.replace("\n", "\\n").replace("\t", "\\t"))
338
-
339
  buttons = []
340
  note_data = ""
341
  prev = 0
342
  i = 0
343
  alerts = []
344
-
345
  for match in BTN_URL_REGEX.finditer(text):
346
  n_escapes = 0
347
  to_check = match.start(1) - 1
348
  while to_check > 0 and text[to_check] == "\\":
349
  n_escapes += 1
350
  to_check -= 1
351
-
352
  if n_escapes % 2 == 0:
353
  note_data += text[prev:match.start(1)]
354
  prev = match.end(1)
355
  if match.group(3) == "buttonalert":
 
356
  if bool(match.group(5)) and buttons:
357
  buttons[-1].append(InlineKeyboardButton(match.group(2), callback_data=f"{cb_data}:{i}:{keyword}"))
358
  else:
359
  buttons.append([InlineKeyboardButton(match.group(2), callback_data=f"{cb_data}:{i}:{keyword}")])
360
  i += 1
361
  alerts.append(match.group(4))
362
- logger.debug(f"Button alert added: {match.group(2)}")
363
  elif bool(match.group(5)) and buttons:
 
364
  buttons[-1].append(InlineKeyboardButton(match.group(2), url=match.group(4).replace(" ", "")))
365
- logger.debug(f"Button URL added: {match.group(2)}")
366
  else:
 
367
  buttons.append([InlineKeyboardButton(match.group(2), url=match.group(4).replace(" ", ""))])
368
- logger.debug(f"Button added: {match.group(2)}")
369
  else:
370
  note_data += text[prev:to_check]
371
  prev = match.start(1) - 1
372
-
373
- else:
374
  note_data += text[prev:]
375
-
376
- try:
377
- logger.info("Parsing completed successfully")
378
  return note_data, buttons, alerts
379
  except Exception as e:
380
- logger.error(f"Parsing error: {str(e)}")
381
  return note_data, buttons, None
382
 
383
  def remove_escapes(text: str) -> str:
384
- """Remove escape characters from text with logging"""
385
- logger.debug("Removing escape characters from text")
386
  res = ""
387
  is_escaped = False
388
-
389
  for counter in range(len(text)):
390
  if is_escaped:
391
  res += text[counter]
@@ -394,76 +357,65 @@ def remove_escapes(text: str) -> str:
394
  is_escaped = True
395
  else:
396
  res += text[counter]
397
-
398
- logger.debug("Escape characters removed successfully")
399
  return res
400
 
401
  def humanbytes(size):
402
- """Convert bytes to human-readable format with logging"""
403
- logger.debug(f"Converting size to human-readable format: {size}")
404
  if not size:
405
- logger.warning("No size provided for conversion")
406
  return ""
407
-
408
  power = 2**10
409
  n = 0
410
  Dic_powerN = {0: ' ', 1: 'Ki', 2: 'Mi', 3: 'Gi', 4: 'Ti'}
411
-
412
  while size > power:
413
  size /= power
414
  n += 1
415
-
416
  result = str(round(size, 2)) + " " + Dic_powerN[n] + 'B'
417
- logger.debug(f"Converted size: {result}")
418
  return result
419
 
420
  def get_time(seconds):
421
- """Convert seconds to human-readable time format with logging"""
422
- logger.debug(f"Converting seconds to time format: {seconds}")
423
  periods = [('ᴅ', 86400), ('ʜ', 3600), ('ᴍ', 60), ('ꜱ', 1)]
424
  result = ''
425
-
426
  for period_name, period_seconds in periods:
427
  if seconds >= period_seconds:
428
  period_value, seconds = divmod(seconds, period_seconds)
429
  result += f'{int(period_value)}{period_name}'
430
-
431
- logger.debug(f"Converted time: {result}")
432
  return result
433
 
434
  async def get_shortlink(link):
435
- """Get shortened link using API with logging"""
436
- logger.info(f"Requesting short link for: {link}")
437
  url = f'{SHORT_URL}/api'
438
  params = {'api': SHORT_API, 'url': link}
439
-
440
  try:
441
  async with aiohttp.ClientSession() as session:
442
  async with session.get(url, params=params, raise_for_status=True, ssl=False) as response:
443
  data = await response.json()
444
  if data["status"] == "success":
445
- logger.info("Short link retrieved successfully")
446
- return data['shortenedUrl']
 
447
  else:
448
- logger.error(f"Error from short link API: {data['message']}")
449
  return link
450
  except Exception as e:
451
- logger.error(f"Short link request failed: {str(e)}")
452
  return link
453
 
454
  def extract_time(time_val):
455
- """Extract time from string with logging"""
456
- logger.debug(f"Extracting time from value: {time_val}")
457
  if any(time_val.endswith(unit) for unit in ("s", "m", "h", "d")):
458
  unit = time_val[-1]
459
  time_num = time_val[:-1] # type: str
460
-
461
  if not time_num.isdigit():
462
- logger.warning("Invalid time number provided")
463
  return None
464
 
465
  if unit == "s":
466
- bantime = datetime.now() + timedelta(seconds=int(time_num))
467
  elif unit == "m":
468
  bantime = datetime.now() + timedelta(minutes=int(time_num))
469
  elif unit == "h":
@@ -471,51 +423,43 @@ def extract_time(time_val):
471
  elif unit == "d":
472
  bantime = datetime.now() + timedelta(days=int(time_num))
473
  else:
474
- logger.error("Unknown time unit provided")
475
  return None
476
-
477
- logger.info(f"Extracted ban time: {bantime}")
478
  return bantime
479
  else:
480
- logger.warning("No valid time unit found")
481
  return None
482
 
483
  async def admin_check(message: Message) -> bool:
484
- """Check if user is an admin with logging"""
485
- logger.debug("Checking admin status for user")
486
- if not message.from_user:
487
- logger.warning("Message has no user information")
488
  return False
489
-
490
- if message.chat.type not in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
491
- logger.warning("Message is not from a group or supergroup")
492
  return False
493
-
494
- if message.from_user.id in [777000, 1087968824]:
495
- logger.info("User is a bot admin")
496
  return True
497
-
498
  client = message._client
499
  chat_id = message.chat.id
500
  user_id = message.from_user.id
501
-
502
  try:
503
  check_status = await client.get_chat_member(chat_id=chat_id, user_id=user_id)
504
- admin_strings = [enums.ChatMemberStatus.OWNER, enums.ChatMemberStatus.ADMINISTRATOR]
505
-
506
- if check_status.status not in admin_strings:
507
- logger.info(f"User {user_id} is not an admin")
508
- return False
509
- else:
510
- logger.info(f"User {user_id} is an admin")
511
- return True
512
  except Exception as e:
513
- logger.error(f"Admin check failed for user {user_id}: {str(e)}")
 
 
 
 
514
  return False
 
 
 
515
 
516
  async def admin_filter(filt, client, message):
517
- """Filter for admin messages with logging"""
518
- logger.debug("Applying admin filter")
519
- is_admin = await admin_check(message)
520
- logger.info(f"Admin filter result: {is_admin}")
521
- return is_admin
 
1
+ import logging
2
+ import os
3
+ import re
4
+ import asyncio
5
+ import requests
6
+ import aiohttp
7
+ from pyrogram.errors import InputUserDeactivated, UserNotParticipant, FloodWait, UserIsBlocked, PeerIdInvalid
8
  from pyrogram.types import Message, InlineKeyboardButton
9
  from pyrogram import filters, enums
10
  from info import AUTH_CHANNEL, LONG_IMDB_DESCRIPTION, MAX_LIST_ELM, SHORT_URL, SHORT_API
 
14
  from database.users_chats_db import db
15
  from bs4 import BeautifulSoup
16
 
17
+ logging.basicConfig(level=logging.INFO)
18
  logger = logging.getLogger(__name__)
19
+ logger.info("Imported necessary modules.")
20
 
21
+ BTN_URL_REGEX = re.compile(r"($$([^\[]+?)$$$$(buttonurl|buttonalert):(?:/{0,2})(.+?)(:same)?$$)")
22
+ logger.info("Defined regex for button URLs.")
 
23
 
 
24
  SMART_OPEN = '“'
25
  SMART_CLOSE = '”'
26
  START_CHAR = ('\'', '"', SMART_OPEN)
27
+ logger.info("Defined smart quote constants.")
28
 
 
29
  class temp(object):
 
30
  BANNED_USERS = []
31
  BANNED_CHATS = []
32
  CURRENT = 0
 
39
  PM_BUTTONS = {}
40
  PM_SPELL = {}
41
  GP_SPELL = {}
42
+ logger.info("Initialized temporary object for caching data.")
43
 
44
  async def is_subscribed(bot, query):
45
+ logger.debug(f"Checking subscription for user {query.from_user.id} in channel {AUTH_CHANNEL}.")
 
46
  try:
47
  user = await bot.get_chat_member(AUTH_CHANNEL, query.from_user.id)
 
 
 
 
 
 
48
  except UserNotParticipant:
49
+ logger.debug(f"User {query.from_user.id} is not a participant in {AUTH_CHANNEL}.")
50
+ pass
51
  except Exception as e:
52
+ logger.error(f"Error in is_subscribed: {e}")
53
+ else:
54
+ if user.status != enums.ChatMemberStatus.BANNED:
55
+ logger.debug(f"User {query.from_user.id} is subscribed and not banned in {AUTH_CHANNEL}.")
56
+ return True
57
+ logger.debug(f"User {query.from_user.id} is either not subscribed or banned in {AUTH_CHANNEL}.")
58
  return False
59
 
60
  async def get_poster(query, bulk=False, id=False, file=None):
61
+ logger.debug(f"Fetching poster for query: {query}, bulk: {bulk}, id: {id}, file: {file}")
 
62
  imdb = Cinemagoer()
63
+ if not id:
64
+ query = (query.strip()).lower()
65
+ title = query
66
+ year = re.findall(r'[1-2]\d{3}$', query, re.IGNORECASE)
67
+ if year:
68
+ year = list_to_str(year[:1])
69
+ title = (query.replace(year, "")).strip()
70
+ elif file is not None:
71
+ year = re.findall(r'[1-2]\d{3}', file, re.IGNORECASE)
72
  if year:
 
73
  year = list_to_str(year[:1])
74
+ else:
75
+ year = None
76
+ logger.debug(f"Searching IMDb for title: {title}, year: {year}")
77
+ try:
 
 
 
 
 
 
 
78
  movieid = imdb.search_movie(title.lower(), results=10)
79
+ except Exception as e:
80
+ logger.error(f"Error searching IMDb: {e}")
81
+ return None
82
+ if not movieid:
83
+ logger.debug("No movies found.")
84
+ return None
85
+ if year:
86
+ logger.debug(f"Filtering movies by year: {year}")
87
+ filtered = list(filter(lambda k: str(k.get('year')) == str(year), movieid))
88
+ if not filtered:
89
+ logger.debug("No movies found for specified year. Using all results.")
 
 
 
90
  filtered = movieid
 
 
 
 
 
 
 
 
 
 
 
 
 
 
91
  else:
92
+ filtered = movieid
93
+ movieid = list(filter(lambda k: k.get('kind') in ['movie', 'tv series'], filtered))
94
+ if not movieid:
95
+ logger.debug("No movies or TV series found. Using all results.")
96
+ movieid = filtered
97
+ if bulk:
98
+ logger.debug("Returning bulk movie IDs.")
99
+ return movieid
100
+ movieid = movieid[0].movieID
101
+ else:
102
+ movieid = query
103
+ logger.debug(f"Fetching movie data for ID: {movieid}")
104
+ movie = imdb.get_movie(movieid)
105
+ if movie.get("original air date"):
106
+ date = movie["original air date"]
107
+ elif movie.get("year"):
108
+ date = movie.get("year")
109
+ else:
110
+ date = "N/A"
111
+ plot = ""
112
+ if not LONG_IMDB_DESCRIPTION:
113
+ plot = movie.get('plot')
114
+ if plot and len(plot) > 0:
115
+ plot = plot[0]
116
+ else:
117
+ plot = movie.get('plot outline')
118
+ if plot and len(plot) > 800:
119
+ plot = plot[0:800] + "..."
120
+ logger.debug("Constructing poster dictionary.")
121
+ return {
122
+ 'title': movie.get('title'),
123
+ 'votes': movie.get('votes'),
124
+ "aka": list_to_str(movie.get("akas")),
125
+ "seasons": movie.get("number of seasons"),
126
+ "box_office": movie.get('box office'),
127
+ 'localized_title': movie.get('localized title'),
128
+ 'kind': movie.get("kind"),
129
+ "imdb_id": f"tt{movie.get('imdbID')}",
130
+ "cast": list_to_str(movie.get("cast")),
131
+ "runtime": list_to_str(movie.get("runtimes")),
132
+ "countries": list_to_str(movie.get("countries")),
133
+ "certificates": list_to_str(movie.get("certificates")),
134
+ "languages": list_to_str(movie.get("languages")),
135
+ "director": list_to_str(movie.get("director")),
136
+ "writer": list_to_str(movie.get("writer")),
137
+ "producer": list_to_str(movie.get("producer")),
138
+ "composer": list_to_str(movie.get("composer")),
139
+ "cinematographer": list_to_str(movie.get("cinematographer")),
140
+ "music_team": list_to_str(movie.get("music department")),
141
+ "distributors": list_to_str(movie.get("distributors")),
142
+ 'release_date': date,
143
+ 'year': movie.get('year'),
144
+ 'genres': list_to_str(movie.get("genres")),
145
+ 'poster': movie.get('full-size cover url'),
146
+ 'plot': plot,
147
+ 'rating': str(movie.get("rating")),
148
+ 'url': f'https://www.imdb.com/title/tt{movieid}'
149
+ }
150
 
151
  def list_to_str(k):
152
+ logger.debug(f"Converting list to string: {k}")
153
+ if not k:
 
 
154
  return "N/A"
155
+ elif len(k) == 1:
 
156
  return str(k[0])
157
  elif MAX_LIST_ELM:
 
158
  k = k[:int(MAX_LIST_ELM)]
159
+ result = ' '.join(f'{elem}, ' for elem in k)
160
+ logger.debug(f"Trimmed and joined list: {result}")
161
+ return result
162
  else:
163
+ result = ' '.join(f'{elem}, ' for elem in k)
164
+ logger.debug(f"Joined list: {result}")
165
+ return result
166
 
167
  __repo__ = "https://github.com/MrMKN/PROFESSOR-BOT"
168
  __version__ = "PROFESSOR-BOT ᴠ4.5.0"
169
  __license__ = "GNU GENERAL PUBLIC LICENSE V2"
170
  __copyright__ = "Copyright (C) 2023-present MrMKN <https://github.com/MrMKN>"
171
 
172
+ logger.info(f"Repository: {__repo__}")
173
+ logger.info(f"Version: {__version__}")
174
+ logger.info(f"License: {__license__}")
175
+ logger.info(f"Copyright: {__copyright__}")
176
+
177
  async def search_gagala(text):
178
+ logger.debug(f"Performing Google search for: {text}")
 
179
  usr_agent = {
180
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'
181
+ }
182
+ text = text.replace(" ", '+')
183
+ url = f'https://www.google.com/search?q={text}'
184
+ logger.debug(f"Sending GET request to: {url}")
185
  try:
 
 
 
 
186
  response = requests.get(url, headers=usr_agent)
187
  response.raise_for_status()
188
+ except requests.RequestException as e:
189
+ logger.error(f"Error in search_gagala: {e}")
 
 
 
 
 
 
 
190
  return []
191
+ soup = BeautifulSoup(response.text, 'html.parser')
192
+ titles = soup.find_all('h3')
193
+ results = [title.getText() for title in titles]
194
+ logger.debug(f"Found {len(results)} search results.")
195
+ return results
196
 
197
  async def get_settings(group_id):
198
+ logger.debug(f"Getting settings for group ID: {group_id}")
 
199
  settings = temp.SETTINGS.get(group_id)
 
200
  if not settings:
201
+ logger.debug("Settings not found in cache. Retrieving from database.")
202
  settings = await db.get_settings(group_id)
203
  temp.SETTINGS[group_id] = settings
204
  else:
205
+ logger.debug("Settings found in cache.")
 
206
  return settings
207
+
208
  async def save_group_settings(group_id, key, value):
209
+ logger.debug(f"Saving group settings for group ID: {group_id}, key: {key}, value: {value}")
 
210
  current = await get_settings(group_id)
211
  current[key] = value
212
  temp.SETTINGS[group_id] = current
213
+ await db.update_settings(group_id, current)
214
+ logger.debug("Settings updated successfully.")
 
 
 
 
 
215
 
216
  def get_size(size):
217
+ logger.debug(f"Converting size: {size}")
 
218
  units = ["Bytes", "KB", "MB", "GB", "TB", "PB", "EB"]
219
  size = float(size)
220
  i = 0
 
221
  while size >= 1024.0 and i < len(units):
222
  i += 1
223
  size /= 1024.0
 
224
  result = "%.2f %s" % (size, units[i])
225
  logger.debug(f"Converted size: {result}")
226
  return result
227
 
228
  def get_file_id(msg: Message):
 
 
229
  if not msg.media:
230
+ logger.debug("Message has no media.")
231
  return None
 
232
  for message_type in ("photo", "animation", "audio", "document", "video", "video_note", "voice", "sticker"):
233
  obj = getattr(msg, message_type)
234
  if obj:
235
+ logger.debug(f"Found media type: {message_type}")
236
  setattr(obj, "message_type", message_type)
237
  return obj
238
+ logger.debug("No supported media type found.")
239
+ return None
240
 
241
  def extract_user(message: Message) -> Union[int, str]:
242
+ logger.debug("Extracting user information.")
 
243
  user_id = None
244
  user_first_name = None
 
245
  if message.reply_to_message:
246
+ logger.debug("Extracting user from replied message.")
247
  user_id = message.reply_to_message.from_user.id
248
  user_first_name = message.reply_to_message.from_user.first_name
 
249
  elif len(message.command) > 1:
250
+ logger.debug("Extracting user from command arguments.")
251
  if (len(message.entities) > 1 and message.entities[1].type == enums.MessageEntityType.TEXT_MENTION):
252
  required_entity = message.entities[1]
253
  user_id = required_entity.user.id
254
  user_first_name = required_entity.user.first_name
 
255
  else:
256
  user_id = message.command[1]
257
  user_first_name = user_id
258
+ try:
259
+ user_id = int(user_id)
260
+ except ValueError:
261
+ logger.warning("User ID is not an integer.")
262
+ pass
263
  else:
264
+ logger.debug("Extracting user from message sender.")
265
  user_id = message.from_user.id
266
  user_first_name = message.from_user.first_name
267
+ logger.debug(f"Extracted user ID: {user_id}, first name: {user_first_name}")
 
268
  return (user_id, user_first_name)
269
 
270
  def split_quotes(text: str) -> List:
271
+ logger.debug(f"Splitting quotes in text: {text}")
 
272
  if not any(text.startswith(char) for char in START_CHAR):
273
+ logger.debug("No starting quote found. Splitting by first space.")
274
+ parts = text.split(None, 1)
275
+ logger.debug(f"Split parts: {parts}")
276
+ return parts
277
  counter = 1 # ignore first char -> is some kind of quote
278
  while counter < len(text):
279
  if text[counter] == "\\":
280
+ logger.debug("Escaped character found.")
281
  counter += 1
282
  elif text[counter] == text[0] or (text[0] == SMART_OPEN and text[counter] == SMART_CLOSE):
283
+ logger.debug("Ending quote found.")
284
  break
285
  counter += 1
286
  else:
287
+ logger.debug("No ending quote found. Splitting by first space.")
288
+ parts = text.split(None, 1)
289
+ logger.debug(f"Split parts: {parts}")
290
+ return parts
291
 
292
+ # 1 to avoid starting quote, and counter is exclusive so avoids ending
293
  key = remove_escapes(text[1:counter].strip())
294
+ # index will be in range, or `else` would have been executed and returned
295
  rest = text[counter + 1:].strip()
296
  if not key:
297
  key = text[0] + text[0]
298
+ result = list(filter(None, [key, rest]))
299
+ logger.debug(f"Split parts: {result}")
300
+ return result
301
 
302
  def parser(text, keyword, cb_data):
303
+ logger.debug(f"Parsing text: {text}, keyword: {keyword}, cb_data: {cb_data}")
304
+ if "buttonalert" in text:
305
+ logger.debug("Found 'buttonalert' in text. Replacing newlines and tabs.")
306
  text = (text.replace("\n", "\\n").replace("\t", "\\t"))
 
307
  buttons = []
308
  note_data = ""
309
  prev = 0
310
  i = 0
311
  alerts = []
 
312
  for match in BTN_URL_REGEX.finditer(text):
313
  n_escapes = 0
314
  to_check = match.start(1) - 1
315
  while to_check > 0 and text[to_check] == "\\":
316
  n_escapes += 1
317
  to_check -= 1
318
+ # if even, not escaped -> create button
319
  if n_escapes % 2 == 0:
320
  note_data += text[prev:match.start(1)]
321
  prev = match.end(1)
322
  if match.group(3) == "buttonalert":
323
+ logger.debug(f"Found buttonalert: {match.group(2)}, {match.group(4)}, same_line: {bool(match.group(5))}")
324
  if bool(match.group(5)) and buttons:
325
  buttons[-1].append(InlineKeyboardButton(match.group(2), callback_data=f"{cb_data}:{i}:{keyword}"))
326
  else:
327
  buttons.append([InlineKeyboardButton(match.group(2), callback_data=f"{cb_data}:{i}:{keyword}")])
328
  i += 1
329
  alerts.append(match.group(4))
 
330
  elif bool(match.group(5)) and buttons:
331
+ logger.debug(f"Found URL button: {match.group(2)}, {match.group(4)}, same_line: {bool(match.group(5))}")
332
  buttons[-1].append(InlineKeyboardButton(match.group(2), url=match.group(4).replace(" ", "")))
 
333
  else:
334
+ logger.debug(f"Found URL button: {match.group(2)}, {match.group(4)}, new line.")
335
  buttons.append([InlineKeyboardButton(match.group(2), url=match.group(4).replace(" ", ""))])
 
336
  else:
337
  note_data += text[prev:to_check]
338
  prev = match.start(1) - 1
339
+ else:
 
340
  note_data += text[prev:]
341
+ try:
342
+ logger.debug(f"Parsed note_data: {note_data}, buttons: {buttons}, alerts: {alerts}")
 
343
  return note_data, buttons, alerts
344
  except Exception as e:
345
+ logger.error(f"Error in parser: {e}")
346
  return note_data, buttons, None
347
 
348
  def remove_escapes(text: str) -> str:
349
+ logger.debug(f"Removing escapes from text: {text}")
 
350
  res = ""
351
  is_escaped = False
 
352
  for counter in range(len(text)):
353
  if is_escaped:
354
  res += text[counter]
 
357
  is_escaped = True
358
  else:
359
  res += text[counter]
360
+ logger.debug(f"Text after removing escapes: {res}")
 
361
  return res
362
 
363
  def humanbytes(size):
364
+ logger.debug(f"Converting size to human-readable: {size}")
 
365
  if not size:
366
+ logger.debug("Size is zero or empty.")
367
  return ""
 
368
  power = 2**10
369
  n = 0
370
  Dic_powerN = {0: ' ', 1: 'Ki', 2: 'Mi', 3: 'Gi', 4: 'Ti'}
 
371
  while size > power:
372
  size /= power
373
  n += 1
 
374
  result = str(round(size, 2)) + " " + Dic_powerN[n] + 'B'
375
+ logger.debug(f"Human-readable size: {result}")
376
  return result
377
 
378
  def get_time(seconds):
379
+ logger.debug(f"Formatting time: {seconds} seconds")
 
380
  periods = [('ᴅ', 86400), ('ʜ', 3600), ('ᴍ', 60), ('ꜱ', 1)]
381
  result = ''
 
382
  for period_name, period_seconds in periods:
383
  if seconds >= period_seconds:
384
  period_value, seconds = divmod(seconds, period_seconds)
385
  result += f'{int(period_value)}{period_name}'
386
+ logger.debug(f"Formatted time: {result}")
 
387
  return result
388
 
389
  async def get_shortlink(link):
390
+ logger.debug(f"Generating shortlink for: {link}")
 
391
  url = f'{SHORT_URL}/api'
392
  params = {'api': SHORT_API, 'url': link}
 
393
  try:
394
  async with aiohttp.ClientSession() as session:
395
  async with session.get(url, params=params, raise_for_status=True, ssl=False) as response:
396
  data = await response.json()
397
  if data["status"] == "success":
398
+ shortened_url = data['shortenedUrl']
399
+ logger.debug(f"Shortlink generated: {shortened_url}")
400
+ return shortened_url
401
  else:
402
+ logger.error(f"Error generating shortlink: {data['message']}")
403
  return link
404
  except Exception as e:
405
+ logger.error(f"Exception in get_shortlink: {e}")
406
  return link
407
 
408
  def extract_time(time_val):
409
+ logger.debug(f"Extracting time from: {time_val}")
 
410
  if any(time_val.endswith(unit) for unit in ("s", "m", "h", "d")):
411
  unit = time_val[-1]
412
  time_num = time_val[:-1] # type: str
 
413
  if not time_num.isdigit():
414
+ logger.debug("Time number is not a digit.")
415
  return None
416
 
417
  if unit == "s":
418
+ bantime = datetime.now() + timedelta(seconds=int(time_num))
419
  elif unit == "m":
420
  bantime = datetime.now() + timedelta(minutes=int(time_num))
421
  elif unit == "h":
 
423
  elif unit == "d":
424
  bantime = datetime.now() + timedelta(days=int(time_num))
425
  else:
426
+ logger.debug("Invalid unit.")
427
  return None
428
+ logger.debug(f"Calculated bantime: {bantime}")
 
429
  return bantime
430
  else:
431
+ logger.debug("No valid unit found in time_val.")
432
  return None
433
 
434
  async def admin_check(message: Message) -> bool:
435
+ logger.debug(f"Checking if user {message.from_user.id} is admin in chat {message.chat.id}")
436
+ if not message.from_user:
437
+ logger.debug("Message does not have a from_user.")
 
438
  return False
439
+ if message.chat.type not in [enums.ChatType.GROUP, enums.ChatType.SUPERGROUP]:
440
+ logger.debug("Chat is not a group or supergroup.")
 
441
  return False
442
+ if message.from_user.id in [777000, 1087968824]:
443
+ logger.debug("User is a special bot ID.")
 
444
  return True
 
445
  client = message._client
446
  chat_id = message.chat.id
447
  user_id = message.from_user.id
 
448
  try:
449
  check_status = await client.get_chat_member(chat_id=chat_id, user_id=user_id)
 
 
 
 
 
 
 
 
450
  except Exception as e:
451
+ logger.error(f"Error checking chat member: {e}")
452
+ return False
453
+ admin_strings = [enums.ChatMemberStatus.OWNER, enums.ChatMemberStatus.ADMINISTRATOR]
454
+ if check_status.status not in admin_strings:
455
+ logger.debug("User is not an admin or owner.")
456
  return False
457
+ else:
458
+ logger.debug("User is an admin or owner.")
459
+ return True
460
 
461
  async def admin_filter(filt, client, message):
462
+ logger.debug(f"Admin filter checking for message: {message.id}")
463
+ result = await admin_check(message)
464
+ logger.debug(f"Admin filter result: {result}")
465
+ return result