diff --git "a/fastapi_app.py" "b/fastapi_app.py" --- "a/fastapi_app.py" +++ "b/fastapi_app.py" @@ -30,6 +30,7 @@ import shutil import psutil import wget from datetime import datetime, timedelta +from urllib.parse import quote from telegraph import upload_file from PIL import Image, ImageDraw, ImageFont import textwrap @@ -100,6 +101,7 @@ class GlobalFilter(BaseModel): alert: str = Field(default=None) class BroadcastMessage(BaseModel): + chat_id: int message_id: int class ShortenURL(BaseModel): @@ -146,6 +148,7 @@ class GroupManagerAction(BaseModel): action: str user_id: int = Field(default=None) time: str = Field(default=None) + message_id: int = Field(default=None) # Endpoints @app.get("/uptime") @@ -224,8 +227,8 @@ async def del_all_gfilters_endpoint(): @app.post("/broadcast") async def broadcast_endpoint(broadcast_data: BroadcastMessage): users = await db.get_all_users() - b_msg = await bot_instance.get_messages(broadcast_data.message_id) - sts = await send_message(bot_instance, broadcast_data.message_id, "Broadcasting your messages...") + b_msg = await bot_instance.get_messages(broadcast_data.chat_id, broadcast_data.message_id) + sts = await send_message(bot_instance, broadcast_data.chat_id, "Broadcasting your messages...") start_time = time.time() total_users = await db.total_users_count() done = 0 @@ -249,20 +252,19 @@ async def broadcast_endpoint(broadcast_data: BroadcastMessage): await edit_message(bot_instance, sts, f"Broadcast in progress:\n\nTotal Users {total_users}\nCompleted: {done} / {total_users}\nSuccess: {success}\nBlocked: {blocked}\nDeleted: {deleted}") time_taken = datetime.timedelta(seconds=int(time.time() - start_time)) await delete_message(bot_instance, sts) - await send_message(bot_instance, broadcast_data.message_id, f"Broadcast completed:\nTime taken: {time_taken}\n\nTotal Users: {total_users}\nCompleted: {done} / {total_users}\nSuccess: {success}\nBlocked: {blocked}\nDeleted: {deleted}") + await send_message(bot_instance, broadcast_data.chat_id, f"Broadcast completed:\nTime taken: {time_taken}\n\nTotal Users: {total_users}\nCompleted: {done} / {total_users}\nSuccess: {success}\nBlocked: {blocked}\nDeleted: {deleted}") return {"status": "Broadcast completed"} @app.post("/clear_junk") -async def clear_junk_endpoint(): - users = await db.get_all_users() - sts = await send_message(bot_instance, broadcast_data.message_id, "Clearing junk users...") +async def clear_junk_endpoint(user_id: int): + sts = await send_message(bot_instance, user_id, "Clearing junk users...") start_time = time.time() total_users = await db.total_users_count() blocked = 0 deleted = 0 failed = 0 done = 0 - async for user in users: + async for user in db.get_all_users(): pti, sh = await clear_junk(int(user['id']), None) if pti == False: if sh == "Blocked": @@ -276,14 +278,14 @@ async def clear_junk_endpoint(): await edit_message(bot_instance, sts, f"Clearing junk in progress:\n\nTotal Users {total_users}\nCompleted: {done} / {total_users}\nBlocked: {blocked}\nDeleted: {deleted}") time_taken = datetime.timedelta(seconds=int(time.time() - start_time)) await delete_message(bot_instance, sts) - await send_message(bot_instance, broadcast_data.message_id, f"Clearing junk completed:\nTime taken: {time_taken}\n\nTotal Users: {total_users}\nCompleted: {done} / {total_users}\nBlocked: {blocked}\nDeleted: {deleted}") + await send_message(bot_instance, user_id, f"Clearing junk completed:\nTime taken: {time_taken}\n\nTotal Users: {total_users}\nCompleted: {done} / {total_users}\nBlocked: {blocked}\nDeleted: {deleted}") return {"status": "Junk clearing completed"} @app.post("/group_broadcast") async def group_broadcast_endpoint(broadcast_data: BroadcastMessage): groups = await db.get_all_chats() - b_msg = await bot_instance.get_messages(broadcast_data.message_id) - sts = await send_message(bot_instance, broadcast_data.message_id, "Broadcasting to groups...") + b_msg = await bot_instance.get_messages(broadcast_data.chat_id, broadcast_data.message_id) + sts = await send_message(bot_instance, broadcast_data.chat_id, "Broadcasting to groups...") start_time = time.time() total_groups = await db.total_chat_count() done = 0 @@ -308,13 +310,13 @@ async def group_broadcast_endpoint(broadcast_data: BroadcastMessage): await edit_message(bot_instance, sts, f"Broadcast in progress:\n\nTotal Groups {total_groups}\nCompleted: {done} / {total_groups}\nSuccess: {success}\nDeleted: {deleted}") time_taken = datetime.timedelta(seconds=int(time.time() - start_time)) await delete_message(bot_instance, sts) - await send_message(bot_instance, broadcast_data.message_id, f"Broadcast completed:\nTime taken: {time_taken}\n\nTotal Groups: {total_groups}\nCompleted: {done} / {total_groups}\nSuccess: {success}\nDeleted: {deleted}\n\nFailed reasons: {failed}") + await send_message(bot_instance, broadcast_data.chat_id, f"Broadcast completed:\nTime taken: {time_taken}\n\nTotal Groups: {total_groups}\nCompleted: {done} / {total_groups}\nSuccess: {success}\nDeleted: {deleted}\n\nFailed reasons: {failed}") return {"status": "Group broadcast completed"} @app.post("/junk_group") -async def junk_group_endpoint(): +async def junk_group_endpoint(user_id: int): groups = await db.get_all_chats() - sts = await send_message(bot_instance, broadcast_data.message_id, "Clearing junk groups...") + sts = await send_message(bot_instance, user_id, "Clearing junk groups...") start_time = time.time() total_groups = await db.total_chat_count() done = 0 @@ -335,7 +337,7 @@ async def junk_group_endpoint(): await edit_message(bot_instance, sts, f"Clearing junk in progress:\n\nTotal Groups {total_groups}\nCompleted: {done} / {total_groups}\nDeleted: {deleted}") time_taken = datetime.timedelta(seconds=int(time.time() - start_time)) await delete_message(bot_instance, sts) - await send_message(bot_instance, broadcast_data.message_id, f"Clearing junk completed:\nTime taken: {time_taken}\n\nTotal Groups: {total_groups}\nCompleted: {done} / {total_groups}\nDeleted: {deleted}\n\nFailed reasons: {failed}") + await send_message(bot_instance, user_id, f"Clearing junk completed:\nTime taken: {time_taken}\n\nTotal Groups: {total_groups}\nCompleted: {done} / {total_groups}\nDeleted: {deleted}\n\nFailed reasons: {failed}") return {"status": "Junk group clearing completed"} @app.post("/add_connection") @@ -376,12 +378,14 @@ async def delete_connection_endpoint(connection_request: ConnectionRequest): @app.post("/search") async def search_endpoint(search_query: SearchQuery): files, next_offset, total_results = await get_search_results(search_query.query, file_type=search_query.file_type, offset=search_query.offset, max_results=search_query.max_results) - return {"files": files, "next_offset": next_offset, "total_results": total_results} + return {"files": [file.dict() for file in files], "next_offset": next_offset, "total_results": total_results} @app.post("/get_file_details") async def get_file_details_endpoint(file_id: str): file_details = await get_file_details(file_id) - return {"file_details": file_details} + if file_details: + return {"file_details": [file.dict() for file in file_details]} + return {"file_details": None} @app.post("/get_settings") async def get_settings_endpoint(group_id: int): @@ -394,8 +398,8 @@ async def save_group_settings_endpoint(settings_update: SettingsUpdate): return {"status": "Settings updated successfully"} @app.post("/send_cached_media") -async def send_cached_media_endpoint(user_id: int, file_id: str, caption: str = None, protect_content: bool = False): - await bot_instance.send_cached_media(chat_id=user_id, file_id=file_id, caption=caption, protect_content=protect_content) +async def send_cached_media_endpoint(chat_id: int, file_id: str, caption: str = None, protect_content: bool = False): + await bot_instance.send_cached_media(chat_id=chat_id, file_id=file_id, caption=caption, protect_content=protect_content) return {"status": "Media sent successfully"} @app.post("/get_poster") @@ -436,7 +440,7 @@ async def auto_filter_endpoint(query: str, chat_id: int, user_id: int, offset: i ]) btn.append([InlineKeyboardButton(text="πŸ”— How to download πŸ”—", callback_data="howdl")]) reply_markup = InlineKeyboardMarkup(btn) - return {"files": files, "next_offset": next_offset, "total_results": total_results, "reply_markup": reply_markup} + return {"files": [file.dict() for file in files], "next_offset": next_offset, "total_results": total_results, "reply_markup": reply_markup.dict()} @app.post("/send_telegram_message") async def send_telegram_message_endpoint(chat_id: int, text: str): @@ -444,13 +448,13 @@ async def send_telegram_message_endpoint(chat_id: int, text: str): return {"message_id": message.id} @app.post("/edit_telegram_message") -async def edit_telegram_message_endpoint(message_id: int, chat_id: int, text: str): +async def edit_telegram_message_endpoint(chat_id: int, message_id: int, text: str): message = await bot_instance.get_messages(chat_id, message_id) edited_message = await edit_message(bot_instance, message, text) return {"edited_message_id": edited_message.id} @app.post("/delete_telegram_message") -async def delete_telegram_message_endpoint(message_id: int, chat_id: int): +async def delete_telegram_message_endpoint(chat_id: int, message_id: int): message = await bot_instance.get_messages(chat_id, message_id) await delete_message(bot_instance, message) return {"status": "Message deleted successfully"} @@ -625,6 +629,7 @@ async def download_youtube_video_endpoint(yt_request: YouTubeDownloadRequest): mio[0]["channel"] kekme = f"https://img.youtube.com/vi/{fridayz}/hqdefault.jpg" await asyncio.sleep(0.6) + url = mo sedlyf = wget.download(kekme) opts = { "format": "best", @@ -655,6 +660,7 @@ async def group_manager_action_endpoint(action_request: GroupManagerAction): action = action_request.action user_id = action_request.user_id time_val = action_request.time + message_id = action_request.message_id if action == "ban": await bot_instance.ban_chat_member(chat_id, user_id) @@ -681,17 +687,20 @@ async def group_manager_action_endpoint(action_request: GroupManagerAction): await bot_instance.restrict_chat_member(chat_id, user_id, permissions=enums.ChatPermissions(), until_date=until_date_val) return {"status": "User temporarily muted successfully"} elif action == "pin": - await bot_instance.pin_chat_message(chat_id, action_request.message_id) + if not message_id: + return {"error": "Message ID is required for pin action"} + await bot_instance.pin_chat_message(chat_id, message_id) return {"status": "Message pinned successfully"} elif action == "unpin": - await bot_instance.unpin_chat_message(chat_id, action_request.message_id) + if not message_id: + return {"error": "Message ID is required for unpin action"} + await bot_instance.unpin_chat_message(chat_id, message_id) return {"status": "Message unpinned successfully"} elif action == "purge": - if action_request.message_id: - await purge_messages(bot_instance, chat_id, action_request.message_id) - return {"status": "Messages purged successfully"} - else: + if not message_id: return {"error": "Message ID is required for purge action"} + await purge_messages(bot_instance, chat_id, message_id) + return {"status": "Messages purged successfully"} else: return {"error": "Invalid action"} @@ -713,1969 +722,935 @@ async def purge_messages(client, chat_id, message_id): logger.error(f"Error purging messages: {e}") return {"error": str(e)} -@app.post("/group_leave") -async def group_leave_endpoint(group_id: int): - await bot_instance.leave_chat(group_id) - return {"status": "Left the group successfully"} - -@app.post("/group_disable") -async def group_disable_endpoint(group_id: int, reason: str = "No Reason"): - await db.disable_chat(group_id, reason) - temp.BANNED_CHATS.append(group_id) - return {"status": "Group disabled successfully"} - -@app.post("/group_enable") -async def group_enable_endpoint(group_id: int): - await db.re_enable_chat(group_id) - temp.BANNED_CHATS.remove(group_id) - return {"status": "Group enabled successfully"} - -@app.post("/get_user_info") -async def get_user_info_endpoint(user_id: int): - user_info = await bot_instance.get_users(user_id) - return {"user_info": user_info} - -@app.post("/get_chat_info") -async def get_chat_info_endpoint(chat_id: int): - chat_info = await bot_instance.get_chat(chat_id) - return {"chat_info": chat_info} - -@app.post("/send_photo") -async def send_photo_endpoint(chat_id: int, photo_url: str, caption: str = None): +@app.post("/get_chat") +async def get_chat_endpoint(chat_id: int): + chat = await bot_instance.get_chat(chat_id) + return {"chat": chat.dict()} + +@app.post("/get_chat_member") +async def get_chat_member_endpoint(chat_id: int, user_id: int): + chat_member = await bot_instance.get_chat_member(chat_id, user_id) + return {"chat_member": chat_member.dict()} + +@app.post("/get_chat_members_count") +async def get_chat_members_count_endpoint(chat_id: int): + count = await bot_instance.get_chat_members_count(chat_id) + return {"members_count": count} + +@app.post("/kick_chat_member") +async def kick_chat_member_endpoint(chat_id: int, user_id: int): + await bot_instance.kick_chat_member(chat_id, user_id) + return {"status": "Chat member kicked successfully"} + +@app.post("/promote_chat_member") +async def promote_chat_member_endpoint(chat_id: int, user_id: int, can_change_info: bool = False, can_post_messages: bool = False, can_edit_messages: bool = False, can_delete_messages: bool = False, can_manage_video_chats: bool = False, can_restrict_members: bool = False, can_promote_members: bool = False, can_invite_users: bool = False, can_pin_messages: bool = False, can_manage_chat: bool = False): + await bot_instance.promote_chat_member( + chat_id, + user_id, + can_change_info=can_change_info, + can_post_messages=can_post_messages, + can_edit_messages=can_edit_messages, + can_delete_messages=can_delete_messages, + can_manage_video_chats=can_manage_video_chats, + can_restrict_members=can_restrict_members, + can_promote_members=can_promote_members, + can_invite_users=can_invite_users, + can_pin_messages=can_pin_messages, + can_manage_chat=can_manage_chat + ) + return {"status": "Chat member promoted successfully"} + +@app.post("/set_chat_permissions") +async def set_chat_permissions_endpoint(chat_id: int, permissions: dict): + chat_permissions = enums.ChatPermissions(**permissions) + await bot_instance.set_chat_permissions(chat_id, chat_permissions) + return {"status": "Chat permissions set successfully"} + +@app.post("/set_chat_photo") +async def set_chat_photo_endpoint(chat_id: int, photo_url: str): response = requests.get(photo_url) photo = BytesIO(response.content) photo.name = "photo.jpg" - message = await bot_instance.send_photo(chat_id, photo, caption=caption) + await bot_instance.set_chat_photo(chat_id, photo) + return {"status": "Chat photo set successfully"} + +@app.post("/delete_chat_photo") +async def delete_chat_photo_endpoint(chat_id: int): + await bot_instance.delete_chat_photo(chat_id) + return {"status": "Chat photo deleted successfully"} + +@app.post("/set_chat_title") +async def set_chat_title_endpoint(chat_id: int, title: str): + await bot_instance.set_chat_title(chat_id, title) + return {"status": "Chat title set successfully"} + +@app.post("/set_chat_description") +async def set_chat_description_endpoint(chat_id: int, description: str): + await bot_instance.set_chat_description(chat_id, description) + return {"status": "Chat description set successfully"} + +@app.post("/pin_chat_message") +async def pin_chat_message_endpoint(chat_id: int, message_id: int, disable_notification: bool = False): + await bot_instance.pin_chat_message(chat_id, message_id, disable_notification=disable_notification) + return {"status": "Message pinned successfully"} + +@app.post("/unpin_chat_message") +async def unpin_chat_message_endpoint(chat_id: int, message_id: int): + await bot_instance.unpin_chat_message(chat_id, message_id) + return {"status": "Message unpinned successfully"} + +@app.post("/unpin_all_chat_messages") +async def unpin_all_chat_messages_endpoint(chat_id: int): + await bot_instance.unpin_all_chat_messages(chat_id) + return {"status": "All messages unpinned successfully"} + +@app.post("/answer_callback_query") +async def answer_callback_query_endpoint(callback_query_id: str, text: str = None, show_alert: bool = False, url: str = None, cache_time: int = 0): + await bot_instance.answer_callback_query(callback_query_id, text=text, show_alert=show_alert, url=url, cache_time=cache_time) + return {"status": "Callback query answered successfully"} + +@app.post("/copy_message") +async def copy_message_endpoint(chat_id: int, from_chat_id: int, message_id: int, caption: str = None, parse_mode: str = None, caption_entities: list = None, reply_to_message_id: int = None, allow_sending_without_reply: bool = False, reply_markup: dict = None): + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + message = await bot_instance.copy_message( + chat_id, + from_chat_id, + message_id, + caption=caption, + parse_mode=parse_mode, + caption_entities=caption_entities, + reply_to_message_id=reply_to_message_id, + allow_sending_without_reply=allow_sending_without_reply, + reply_markup=reply_markup_obj + ) return {"message_id": message.id} -@app.post("/send_document") -async def send_document_endpoint(chat_id: int, document_url: str, caption: str = None): - response = requests.get(document_url) - document = BytesIO(response.content) - document.name = "document.pdf" - message = await bot_instance.send_document(chat_id, document, caption=caption) - return {"message_id": message.id} +@app.post("/forward_messages") +async def forward_messages_endpoint(chat_id: int, from_chat_id: int, message_ids: list, disable_notification: bool = False): + messages = await bot_instance.forward_messages( + chat_id, + from_chat_id, + message_ids, + disable_notification=disable_notification + ) + return {"messages": [msg.id for msg in messages]} -@app.post("/send_video") -async def send_video_endpoint(chat_id: int, video_url: str, caption: str = None): - response = requests.get(video_url) - video = BytesIO(response.content) - video.name = "video.mp4" - message = await bot_instance.send_video(chat_id, video, caption=caption) - return {"message_id": message.id} +@app.post("/send_poll") +async def send_poll_endpoint(chat_id: int, question: str, options: list, is_anonymous: bool = False, type: str = "regular", allows_multiple_answers: bool = False, correct_option_id: int = None, explanation: str = None, explanation_parse_mode: None, open_period: int = None, close_date: int = None, is_closed: bool = False): + try: + message = await bot_instance.send_poll( + chat_id, + question, + options, + is_anonymous=is_anonymous, + type=type, + allows_multiple_answers=allows_multiple_answers, + correct_option_id=correct_option_id, + explanation=explanation, + explanation_parse_mode=explanation_parse_mode, + open_period=open_period, + close_date=close_date, + is_closed=is_closed + ) + return {"message_id": message.id} + except Exception as e: + logger.error(f"Error sending poll: {e}") + return {"error": str(e)} -@app.post("/send_audio") -async def send_audio_endpoint(chat_id: int, audio_url: str, caption: str = None): - response = requests.get(audio_url) - audio = BytesIO(response.content) - audio.name = "audio.mp3" - message = await bot_instance.send_audio(chat_id, audio, caption=caption) - return {"message_id": message.id} +@app.post("/send_dice") +async def send_dice_endpoint(chat_id: int, reply_to_message_id: int = None, reply_markup: dict = None): + try: + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + message = await bot_instance.send_dice( + chat_id, + reply_to_message_id=reply_to_message_id, + reply_markup=reply_markup_obj + ) + return {"message_id": message.id} + except Exception as e: + logger.error(f"Error sending dice: {e}") + return {"error": str(e)} -@app.post("/send_voice") -async def send_voice_endpoint(chat_id: int, voice_url: str, caption: str = None): - response = requests.get(voice_url) - voice = BytesIO(response.content) - voice.name = "voice.ogg" - message = await bot_instance.send_voice(chat_id, voice, caption=caption) - return {"message_id": message.id} +@app.post("/send_game") +async def send_game_endpoint(chat_id: int, game_short_name: str, reply_to_message_id: int = None, reply_markup: dict = None): + try: + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + message = await bot_instance.send_game( + chat_id, + game_short_name, + reply_to_message_id=reply_to_message_id, + reply_markup=reply_markup_obj + ) + return {"message_id": message.id} + except Exception as e: + logger.error(f"Error sending game: {e}") + return {"error": str(e)} -@app.post("/send_sticker") -async def send_sticker_endpoint(chat_id: int, sticker_url: str): - response = requests.get(sticker_url) - sticker = BytesIO(response.content) - sticker.name = "sticker.webp" - message = await bot_instance.send_sticker(chat_id, sticker) - return {"message_id": message.id} +@app.post("/send_media_group") +async def send_media_group_endpoint(chat_id: int, media: list, reply_to_message_id: int = None, reply_markup: dict = None): + try: + media_group = [] + for media_item in media: + if media_item["type"] == "photo": + response = requests.get(media_item["media"]) + photo = BytesIO(response.content) + photo.name = "photo.jpg" + media_group.append(types.InputMediaPhoto(media=photo)) + elif media_item["type"] == "video": + response = requests.get(media_item["media"]) + video = BytesIO(response.content) + video.name = "video.mp4" + media_group.append(types.InputMediaVideo(media=video)) + else: + return {"error": "Unsupported media type"} + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + messages = await bot_instance.send_media_group( + chat_id, + media_group, + reply_to_message_id=reply_to_message_id, + reply_markup=reply_markup_obj + ) + return {"messages": [msg.id for msg in messages]} + except Exception as e: + logger.error(f"Error sending media group: {e}") + return {"error": str(e)} + +@app.post("/send_invoice") +async def send_invoice_endpoint(chat_id: int, title: str, description: str, payload: str, provider_token: str, currency: str, prices: list, start_parameter: str = None, provider_data: str = None, photo_url: str = None, photo_size: int = None, photo_width: int = None, photo_height: int = None, need_name: bool = False, need_phone_number: bool = False, need_email: bool = False, need_shipping_address: bool = False, send_phone_number_to_provider: bool = False, send_email_to_provider: bool = False, is_flexible: bool = False, disable_notification: bool = False, protect_content: bool = False, reply_to_message_id: int = None, allow_sending_without_reply: bool = False, reply_markup: dict = None): + try: + prices_obj = [types.LabeledPrice(label=price["label"], amount=price["amount"]) for price in prices] + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + message = await bot_instance.send_invoice( + chat_id, + title, + description, + payload, + provider_token, + currency, + prices_obj, + start_parameter=start_parameter, + provider_data=provider_data, + photo_url=photo_url, + photo_size=photo_size, + photo_width=photo_width, + photo_height=photo_height, + need_name=need_name, + need_phone_number=need_phone_number, + need_email=need_email, + need_shipping_address=need_shipping_address, + send_phone_number_to_provider=send_phone_number_to_provider, + send_email_to_provider=send_email_to_provider, + is_flexible=is_flexible, + disable_notification=disable_notification, + protect_content=protect_content, + reply_to_message_id=reply_to_message_id, + allow_sending_without_reply=allow_sending_without_reply, + reply_markup=reply_markup_obj + ) + return {"message_id": message.id} + except Exception as e: + logger.error(f"Error sending invoice: {e}") + return {"error": str(e)} + +@app.post("/send_location") +async def send_location_endpoint(chat_id: int, latitude: float, longitude: float, reply_to_message_id: int = None, reply_markup: dict = None): + try: + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + message = await bot_instance.send_location( + chat_id, + latitude, + longitude, + reply_to_message_id=reply_to_message_id, + reply_markup=reply_markup_obj + ) + return {"message_id": message.id} + except Exception as e: + logger.error(f"Error sending location: {e}") + return {"error": str(e)} + +@app.post("/send_contact") +async def send_contact_endpoint(chat_id: int, phone_number: str, first_name: str, last_name: str = None, reply_to_message_id: int = None, reply_markup: dict = None): + try: + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + message = await bot_instance.send_contact( + chat_id, + phone_number, + first_name, + last_name=last_name, + reply_to_message_id=reply_to_message_id, + reply_markup=reply_markup_obj + ) + return {"message_id": message.id} + except Exception as e: + logger.error(f"Error sending contact: {e}") + return {"error": str(e)} @app.post("/send_animation") -async def send_animation_endpoint(chat_id: int, animation_url: str, caption: str = None): - response = requests.get(animation_url) - animation = BytesIO(response.content) - animation.name = "animation.gif" - message = await bot_instance.send_animation(chat_id, animation, caption=caption) - return {"message_id": message.id} +async def send_animation_endpoint(chat_id: int, animation_url: str, reply_to_message_id: int = None, reply_markup: dict = None): + try: + response = requests.get(animation_url) + animation = BytesIO(response.content) + animation.name = "animation.gif" + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + message = await bot_instance.send_animation( + chat_id, + animation, + reply_to_message_id=reply_to_message_id, + reply_markup=reply_markup_obj + ) + return {"message_id": message.id} + except Exception as e: + logger.error(f"Error sending animation: {e}") + return {"error": str(e)} @app.post("/send_video_note") -async def send_video_note_endpoint(chat_id: int, video_note_url: str): - response = requests.get(video_note_url) - video_note = BytesIO(response.content) - video_note.name = "video_note.mp4" - message = await bot_instance.send_video_note(chat_id, video_note) - return {"message_id": message.id} +async def send_video_note_endpoint(chat_id: int, video_note_url: str, reply_to_message_id: int = None, reply_markup: dict = None): + try: + response = requests.get(video_note_url) + video_note = BytesIO(response.content) + video_note.name = "video_note.mp4" + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + message = await bot_instance.send_video_note( + chat_id, + video_note, + reply_to_message_id=reply_to_message_id, + reply_markup=reply_markup_obj + ) + return {"message_id": message.id} + except Exception as e: + logger.error(f"Error sending video note: {e}") + return {"error": str(e)} -@app.post("/send_location") -async def send_location_endpoint(chat_id: int, latitude: float, longitude: float): - message = await bot_instance.send_location(chat_id, latitude, longitude) - return {"message_id": message.id} +@app.post("/send_voice") +async def send_voice_endpoint(chat_id: int, voice_url: str, reply_to_message_id: int = None, reply_markup: dict = None): + try: + response = requests.get(voice_url) + voice = BytesIO(response.content) + voice.name = "voice.ogg" + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + message = await bot_instance.send_voice( + chat_id, + voice, + reply_to_message_id=reply_to_message_id, + reply_markup=reply_markup_obj + ) + return {"message_id": message.id} + except Exception as e: + logger.error(f"Error sending voice: {e}") + return {"error": str(e)} -@app.post("/send_contact") -async def send_contact_endpoint(chat_id: int, phone_number: str, first_name: str, last_name: str = None): - message = await bot_instance.send_contact(chat_id, phone_number, first_name, last_name=last_name) - return {"message_id": message.id} +@app.post("/send_video") +async def send_video_endpoint(chat_id: int, video_url: str, reply_to_message_id: int = None, reply_markup: dict = None): + try: + response = requests.get(video_url) + video = BytesIO(response.content) + video.name = "video.mp4" + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + message = await bot_instance.send_video( + chat_id, + video, + reply_to_message_id=reply_to_message_id, + reply_markup=reply_markup_obj + ) + return {"message_id": message.id} + except Exception as e: + logger.error(f"Error sending video: {e}") + return {"error": str(e)} -@app.post("/send_dice") -async def send_dice_endpoint(chat_id: int): - message = await bot_instance.send_dice(chat_id) - return {"message_id": message.id} +@app.post("/send_document") +async def send_document_endpoint(chat_id: int, document_url: str, reply_to_message_id: int = None, reply_markup: dict = None): + try: + response = requests.get(document_url) + document = BytesIO(response.content) + document.name = "document.pdf" + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + message = await bot_instance.send_document( + chat_id, + document, + reply_to_message_id=reply_to_message_id, + reply_markup=reply_markup_obj + ) + return {"message_id": message.id} + except Exception as e: + logger.error(f"Error sending document: {e}") + return {"error": str(e)} -@app.post("/send_poll") -async def send_poll_endpoint(chat_id: int, question: str, options: list, is_anonymous: bool = False, type: str = "regular", allows_multiple_answers: bool = False, correct_option_id: int = None, explanation: str = None, explanation_parse_mode: str = None, open_period: int = None, close_date: int = None, is_closed: bool = False): - message = await bot_instance.send_poll( - chat_id, - question, - options, - is_anonymous=is_anonymous, - type=type, - allows_multiple_answers=allows_multiple_answers, - correct_option_id=correct_option_id, - explanation=explanation, - explanation_parse_mode=explanation_parse_mode, - open_period=open_period, - close_date=close_date, - is_closed=is_closed - ) - return {"message_id": message.id} - - - - @app.post("/create_chat_invite_link") - async def create_chat_invite_link_endpoint(chat_id: int, name: str = None, expire_date: int = None, member_limit: int = None, creates_join_request: bool = False): - invite_link = await bot_instance.create_chat_invite_link( - chat_id, - name=name, - expire_date=expire_date, - member_limit=member_limit, - creates_join_request=creates_join_request - ) - return {"invite_link": invite_link.invite_link} - - @app.post("/set_chat_photo") - async def set_chat_photo_endpoint(chat_id: int, photo_url: str): - response = requests.get(photo_url) - photo = BytesIO(response.content) - photo.name = "photo.jpg" - await bot_instance.set_chat_photo(chat_id, photo) - return {"status": "Chat photo set successfully"} - - @app.post("/delete_chat_photo") - async def delete_chat_photo_endpoint(chat_id: int): - await bot_instance.delete_chat_photo(chat_id) - return {"status": "Chat photo deleted successfully"} - - @app.post("/set_chat_title") - async def set_chat_title_endpoint(chat_id: int, title: str): - await bot_instance.set_chat_title(chat_id, title) - return {"status": "Chat title set successfully"} - - @app.post("/set_chat_description") - async def set_chat_description_endpoint(chat_id: int, description: str): - await bot_instance.set_chat_description(chat_id, description) - return {"status": "Chat description set successfully"} - - @app.post("/pin_chat_message") - async def pin_chat_message_endpoint(chat_id: int, message_id: int, disable_notification: bool = False): - await bot_instance.pin_chat_message(chat_id, message_id, disable_notification=disable_notification) - return {"status": "Message pinned successfully"} - - @app.post("/unpin_chat_message") - async def unpin_chat_message_endpoint(chat_id: int, message_id: int): - await bot_instance.unpin_chat_message(chat_id, message_id) - return {"status": "Message unpinned successfully"} - - @app.post("/unpin_all_chat_messages") - async def unpin_all_chat_messages_endpoint(chat_id: int): - await bot_instance.unpin_all_chat_messages(chat_id) - return {"status": "All messages unpinned successfully"} - - @app.post("/restrict_chat_member") - async def restrict_chat_member_endpoint(chat_id: int, user_id: int, permissions: dict, until_date: int = None): - chat_permissions = enums.ChatPermissions(**permissions) - await bot_instance.restrict_chat_member(chat_id, user_id, chat_permissions, until_date=until_date) - return {"status": "Chat member restricted successfully"} - - @app.post("/promote_chat_member") - async def promote_chat_member_endpoint(chat_id: int, user_id: int, can_change_info: bool = False, can_post_messages: bool = False, can_edit_messages: bool = False, can_delete_messages: bool = False, can_manage_video_chats: bool = False, can_restrict_members: bool = False, can_promote_members: bool = False, can_invite_users: bool = False, can_pin_messages: bool = False, can_manage_chat: bool = False): - await bot_instance.promote_chat_member( - chat_id, - user_id, - can_change_info=can_change_info, - can_post_messages=can_post_messages, - can_edit_messages=can_edit_messages, - can_delete_messages=can_delete_messages, - can_manage_video_chats=can_manage_video_chats, - can_restrict_members=can_restrict_members, - can_promote_members=can_promote_members, - can_invite_users=can_invite_users, - can_pin_messages=can_pin_messages, - can_manage_chat=can_manage_chat - ) - return {"status": "Chat member promoted successfully"} - - @app.post("/kick_chat_member") - async def kick_chat_member_endpoint(chat_id: int, user_id: int): - await bot_instance.kick_chat_member(chat_id, user_id) - return {"status": "Chat member kicked successfully"} - - @app.post("/unban_chat_member") - async def unban_chat_member_endpoint(chat_id: int, user_id: int): - await bot_instance.unban_chat_member(chat_id, user_id) - return {"status": "Chat member unbanned successfully"} - - @app.post("/set_chat_administrator_custom_title") - async def set_chat_administrator_custom_title_endpoint(chat_id: int, user_id: int, custom_title: str): - await bot_instance.set_chat_administrator_custom_title(chat_id, user_id, custom_title) - return {"status": "Custom title set successfully"} - - @app.post("/set_chat_permissions") - async def set_chat_permissions_endpoint(chat_id: int, permissions: dict): - chat_permissions = enums.ChatPermissions(**permissions) - await bot_instance.set_chat_permissions(chat_id, chat_permissions) - return {"status": "Chat permissions set successfully"} - - @app.post("/set_chat_sticker_set") - async def set_chat_sticker_set_endpoint(chat_id: int, sticker_set_name: str): - await bot_instance.set_chat_sticker_set(chat_id, sticker_set_name) - return {"status": "Chat sticker set set successfully"} - - @app.post("/delete_chat_sticker_set") - async def delete_chat_sticker_set_endpoint(chat_id: int): - await bot_instance.delete_chat_sticker_set(chat_id) - return {"status": "Chat sticker set deleted successfully"} - - @app.post("/approve_chat_join_request") - async def approve_chat_join_request_endpoint(chat_id: int, user_id: int): - await bot_instance.approve_chat_join_request(chat_id, user_id) - return {"status": "Join request approved successfully"} - - @app.post("/decline_chat_join_request") - async def decline_chat_join_request_endpoint(chat_id: int, user_id: int): - await bot_instance.decline_chat_join_request(chat_id, user_id) - return {"status": "Join request declined successfully"} - - @app.post("/ban_chat_sender_chat") - async def ban_chat_sender_chat_endpoint(chat_id: int, sender_chat_id: int): - await bot_instance.ban_chat_sender_chat(chat_id, sender_chat_id) - return {"status": "Sender chat banned successfully"} - - @app.post("/unban_chat_sender_chat") - async def unban_chat_sender_chat_endpoint(chat_id: int, sender_chat_id: int): - await bot_instance.unban_chat_sender_chat(chat_id, sender_chat_id) - return {"status": "Sender chat unbanned successfully"} - - @app.post("/set_chat_menu_button") - async def set_chat_menu_button_endpoint(chat_id: int, menu_button: dict = None): - if menu_button: - menu_button_obj = types.MenuButton(**menu_button) - else: - menu_button_obj = None - await bot_instance.set_chat_menu_button(chat_id, menu_button_obj) - return {"status": "Chat menu button set successfully"} - - @app.post("/get_chat_menu_button") - async def get_chat_menu_button_endpoint(chat_id: int): - menu_button = await bot_instance.get_chat_menu_button(chat_id) - return {"menu_button": menu_button} - - @app.post("/set_my_commands") - async def set_my_commands_endpoint(commands: list): - bot_commands = [types.BotCommand(command=cmd["command"], description=cmd["description"]) for cmd in commands] - await bot_instance.set_my_commands(bot_commands) - return {"status": "Commands set successfully"} - - @app.post("/get_my_commands") - async def get_my_commands_endpoint(): - commands = await bot_instance.get_my_commands() - return {"commands": [cmd.dict() for cmd in commands]} - - @app.post("/answer_callback_query") - async def answer_callback_query_endpoint(callback_query_id: str, text: str = None, show_alert: bool = False, url: str = None, cache_time: int = 0): - await bot_instance.answer_callback_query(callback_query_id, text=text, show_alert=show_alert, url=url, cache_time=cache_time) - return {"status": "Callback query answered successfully"} - - @app.post("/send_copy") - async def send_copy_endpoint(chat_id: int, from_chat_id: int, message_id: int, caption: str = None, parse_mode: str = None, caption_entities: list = None, reply_to_message_id: int = None, allow_sending_without_reply: bool = False, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.copy_message( - chat_id, - from_chat_id, - message_id, - caption=caption, - parse_mode=parse_mode, - caption_entities=caption_entities, - reply_to_message_id=reply_to_message_id, - allow_sending_without_reply=allow_sending_without_reply, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/forward_messages") - async def forward_messages_endpoint(chat_id: int, from_chat_id: int, message_ids: list, disable_notification: bool = False): - messages = await bot_instance.forward_messages( - chat_id, - from_chat_id, - message_ids, - disable_notification=disable_notification - ) - return {"messages": [msg.id for msg in messages]} - - @app.post("/send_sticker") - async def send_sticker_endpoint(chat_id: int, sticker_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(sticker_url) - sticker = BytesIO(response.content) - sticker.name = "sticker.webp" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_sticker( - chat_id, - sticker, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_animation") - async def send_animation_endpoint(chat_id: int, animation_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(animation_url) - animation = BytesIO(response.content) - animation.name = "animation.gif" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_animation( - chat_id, - animation, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_video_note") - async def send_video_note_endpoint(chat_id: int, video_note_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(video_note_url) - video_note = BytesIO(response.content) - video_note.name = "video_note.mp4" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_video_note( - chat_id, - video_note, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_voice") - async def send_voice_endpoint(chat_id: int, voice_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(voice_url) - voice = BytesIO(response.content) - voice.name = "voice.ogg" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_voice( - chat_id, - voice, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_video") - async def send_video_endpoint(chat_id: int, video_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(video_url) - video = BytesIO(response.content) - video.name = "video.mp4" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_video( - chat_id, - video, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_document") - async def send_document_endpoint(chat_id: int, document_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(document_url) - document = BytesIO(response.content) - document.name = "document.pdf" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_document( - chat_id, - document, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_photo") - async def send_photo_endpoint(chat_id: int, photo_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(photo_url) - photo = BytesIO(response.content) - photo.name = "photo.jpg" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_photo( - chat_id, - photo, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_audio") - async def send_audio_endpoint(chat_id: int, audio_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(audio_url) - audio = BytesIO(response.content) - audio.name = "audio.mp3" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_audio( - chat_id, - audio, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_location") - async def send_location_endpoint(chat_id: int, latitude: float, longitude: float, reply_to_message_id: int = None, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_location( - chat_id, - latitude, - longitude, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_contact") - async def send_contact_endpoint(chat_id: int, phone_number: str, first_name: str, last_name: str = None, reply_to_message_id: int = None, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_contact( - chat_id, - phone_number, - first_name, - last_name=last_name, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_dice") - async def send_dice_endpoint(chat_id: int, reply_to_message_id: int = None, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_dice( - chat_id, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_game") - async def send_game_endpoint(chat_id: int, game_short_name: str, reply_to_message_id: int = None, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_game( - chat_id, - game_short_name, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_media_group") - async def send_media_group_endpoint(chat_id: int, media: list, reply_to_message_id: int = None, reply_markup: dict = None): - media_group = [types.InputMediaPhoto(media=media_item["media"]) if media_item["type"] == "photo" else types.InputMediaVideo(media=media_item["media"]) for media_item in media] - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - messages = await bot_instance.send_media_group( - chat_id, - media_group, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"messages": [msg.id for msg in messages]} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/send_invoice") - async def send_invoice_endpoint(chat_id: int, title: str, description: str, payload: str, provider_token: str, currency: str, prices: list, start_parameter: str = None, provider_data: str = None, photo_url: str = None, photo_size: int = None, photo_width: int = None, photo_height: int = None, need_name: bool = False, need_phone_number: bool = False, need_email: bool = False, need_shipping_address: bool = False, send_phone_number_to_provider: bool = False, send_email_to_provider: bool = False, is_flexible: bool = False, disable_notification: bool = False, protect_content: bool = False, reply_to_message_id: int = None, allow_sending_without_reply: bool = False, reply_markup: dict = None): - prices_obj = [types.LabeledPrice(label=price["label"], amount=price["amount"]) for price in prices] - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_invoice( - chat_id, - title, - description, - payload, - provider_token, - currency, - prices_obj, - start_parameter=start_parameter, - provider_data=provider_data, - photo_url=photo_url, - photo_size=photo_size, - photo_width=photo_width, - photo_height=photo_height, - need_name=need_name, - need_phone_number=need_phone_number, - need_email=need_email, - need_shipping_address=need_shipping_address, - send_phone_number_to_provider=send_phone_number_to_provider, - send_email_to_provider=send_email_to_provider, - is_flexible=is_flexible, - disable_notification=disable_notification, - protect_content=protect_content, - reply_to_message_id=reply_to_message_id, - allow_sending_without_reply=allow_sending_without_reply, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_game") - async def send_game_endpoint(chat_id: int, game_short_name: str, reply_to_message_id: int = None, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_game( - chat_id, - game_short_name, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_poll") - async def send_poll_endpoint(chat_id: int, question: str, options: list, is_anonymous: bool = False, type: str = "regular", allows_multiple_answers: bool = False, correct_option_id: int = None, explanation: str = None, explanation_parse_mode: str = None, open_period: int = None, close_date: int = None, is_closed: bool = False): - message = await bot_instance.send_poll( - chat_id, - question, - options, - is_anonymous=is_anonymous, - type=type, - allows_multiple_answers=allows_multiple_answers, - correct_option_id=correct_option_id, - explanation=explanation, - explanation_parse_mode=explanation_parse_mode, - open_period=open_period, - close_date=close_date, - is_closed=is_closed - ) - return {"message_id": message.id} - - @app.post("/send_dice") - async def send_dice_endpoint(chat_id: int, reply_to_message_id: int = None, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_dice( - chat_id, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/send_poll") - async def send_poll_endpoint(chat_id: int, question: str, options: list, is_anonymous: bool = False, type: str = "regular", allows_multiple_answers: bool = False, correct_option_id: int = None, explanation: str = None, explanation_parse_mode: str = None, open_period: int = None, close_date: int = None, is_closed: bool = False): - message = await bot_instance.send_poll( - chat_id, - question, - options, - is_anonymous=is_anonymous, - type=type, - allows_multiple_answers=allows_multiple_answers, - correct_option_id=correct_option_id, - explanation=explanation, - explanation_parse_mode=explanation_parse_mode, - open_period=open_period, - close_date=close_date, - is_closed=is_closed - ) - return {"message_id": message.id} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} +@app.post("/send_photo") +async def send_photo_endpoint(chat_id: int, photo_url: str, reply_to_message_id: int = None, reply_markup: dict = None): + try: + response = requests.get(photo_url) + photo = BytesIO(response.content) + photo.name = "photo.jpg" + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + message = await bot_instance.send_photo( + chat_id, + photo, + reply_to_message_id=reply_to_message_id, + reply_markup=reply_markup_obj + ) + return {"message_id": message.id} + except Exception as e: + logger.error(f"Error sending photo: {e}") + return {"error": str(e)} + +@app.post("/send_audio") +async def send_audio_endpoint(chat_id: int, audio_url: str, reply_to_message_id: int = None, reply_markup: dict = None): + try: + response = requests.get(audio_url) + audio = BytesIO(response.content) + audio.name = "audio.mp3" + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + message = await bot_instance.send_audio( + chat_id, + audio, + reply_to_message_id=reply_to_message_id, + reply_markup=reply_markup_obj + ) + return {"message_id": message.id} + except Exception as e: + logger.error(f"Error sending audio: {e}") + return {"error": str(e)} +@app.post("/send_sticker") +async def send_sticker_endpoint(chat_id: int, sticker_url: str, reply_to_message_id: int = None, reply_markup: dict = None): + try: + response = requests.get(sticker_url) + sticker = BytesIO(response.content) + sticker.name = "sticker.webp" + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + message = await bot_instance.send_sticker( + chat_id, + sticker, + reply_to_message_id=reply_to_message_id, + reply_markup=reply_markup_obj + ) + return {"message_id": message.id} + except Exception as e: + logger.error(f"Error sending sticker: {e}") + return {"error": str(e)} + +@app.post("/send_chat_action") async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/get_chat") - async def get_chat_endpoint(chat_id: int): - chat = await bot_instance.get_chat(chat_id) - return {"chat": chat.dict()} - - @app.post("/get_chat_member") - async def get_chat_member_endpoint(chat_id: int, user_id: int): - chat_member = await bot_instance.get_chat_member(chat_id, user_id) - return {"chat_member": chat_member.dict()} - - @app.post("/get_chat_members_count") - async def get_chat_members_count_endpoint(chat_id: int): - count = await bot_instance.get_chat_members_count(chat_id) - return {"members_count": count} - - @app.post("/kick_chat_member") - async def kick_chat_member_endpoint(chat_id: int, user_id: int): - await bot_instance.kick_chat_member(chat_id, user_id) - return {"status": "Chat member kicked successfully"} - - @app.post("/unban_chat_member") - async def unban_chat_member_endpoint(chat_id: int, user_id: int): - await bot_instance.unban_chat_member(chat_id, user_id) - return {"status": "Chat member unbanned successfully"} - - @app.post("/promote_chat_member") - async def promote_chat_member_endpoint(chat_id: int, user_id: int, can_change_info: bool = False, can_post_messages: bool = False, can_edit_messages: bool = False, can_delete_messages: bool = False, can_manage_video_chats: bool = False, can_restrict_members: bool = False, can_promote_members: bool = False, can_invite_users: bool = False, can_pin_messages: bool = False, can_manage_chat: bool = False): - await bot_instance.promote_chat_member( - chat_id, - user_id, - can_change_info=can_change_info, - can_post_messages=can_post_messages, - can_edit_messages=can_edit_messages, - can_delete_messages=can_delete_messages, - can_manage_video_chats=can_manage_video_chats, - can_restrict_members=can_restrict_members, - can_promote_members=can_promote_members, - can_invite_users=can_invite_users, - can_pin_messages=can_pin_messages, - can_manage_chat=can_manage_chat - ) - return {"status": "Chat member promoted successfully"} - - @app.post("/set_chat_permissions") - async def set_chat_permissions_endpoint(chat_id: int, permissions: dict): - chat_permissions = enums.ChatPermissions(**permissions) - await bot_instance.set_chat_permissions(chat_id, chat_permissions) - return {"status": "Chat permissions set successfully"} - - @app.post("/set_chat_photo") - async def set_chat_photo_endpoint(chat_id: int, photo_url: str): - response = requests.get(photo_url) - photo = BytesIO(response.content) - photo.name = "photo.jpg" - await bot_instance.set_chat_photo(chat_id, photo) - return {"status": "Chat photo set successfully"} - - @app.post("/delete_chat_photo") - async def delete_chat_photo_endpoint(chat_id: int): - await bot_instance.delete_chat_photo(chat_id) - return {"status": "Chat photo deleted successfully"} - - @app.post("/set_chat_title") - async def set_chat_title_endpoint(chat_id: int, title: str): - await bot_instance.set_chat_title(chat_id, title) - return {"status": "Chat title set successfully"} - - @app.post("/set_chat_description") - async def set_chat_description_endpoint(chat_id: int, description: str): - await bot_instance.set_chat_description(chat_id, description) - return {"status": "Chat description set successfully"} - - @app.post("/pin_chat_message") - async def pin_chat_message_endpoint(chat_id: int, message_id: int, disable_notification: bool = False): - await bot_instance.pin_chat_message(chat_id, message_id, disable_notification=disable_notification) - return {"status": "Message pinned successfully"} - - @app.post("/unpin_chat_message") - async def unpin_chat_message_endpoint(chat_id: int, message_id: int): - await bot_instance.unpin_chat_message(chat_id, message_id) - return {"status": "Message unpinned successfully"} - - @app.post("/unpin_all_chat_messages") - async def unpin_all_chat_messages_endpoint(chat_id: int): - await bot_instance.unpin_all_chat_messages(chat_id) - return {"status": "All messages unpinned successfully"} - - @app.post("/answer_callback_query") - async def answer_callback_query_endpoint(callback_query_id: str, text: str = None, show_alert: bool = False, url: str = None, cache_time: int = 0): - await bot_instance.answer_callback_query(callback_query_id, text=text, show_alert=show_alert, url=url, cache_time=cache_time) - return {"status": "Callback query answered successfully"} - - @app.post("/copy_message") - async def copy_message_endpoint(chat_id: int, from_chat_id: int, message_id: int, caption: str = None, parse_mode: str = None, caption_entities: list = None, reply_to_message_id: int = None, allow_sending_without_reply: bool = False, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.copy_message( - chat_id, - from_chat_id, - message_id, - caption=caption, - parse_mode=parse_mode, - caption_entities=caption_entities, - reply_to_message_id=reply_to_message_id, - allow_sending_without_reply=allow_sending_without_reply, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/forward_messages") - async def forward_messages_endpoint(chat_id: int, from_chat_id: int, message_ids: list, disable_notification: bool = False): - messages = await bot_instance.forward_messages( - chat_id, - from_chat_id, - message_ids, - disable_notification=disable_notification - ) - return {"messages": [msg.id for msg in messages]} - - @app.post("/send_poll") - async def send_poll_endpoint(chat_id: int, question: str, options: list, is_anonymous: bool = False, type: str = "regular", allows_multiple_answers: bool = False, correct_option_id: int = None, explanation: str = None, explanation_parse_mode: str = None, open_period: int = None, close_date: int = None, is_closed: bool = False): - message = await bot_instance.send_poll( - chat_id, - question, - options, - is_anonymous=is_anonymous, - type=type, - allows_multiple_answers=allows_multiple_answers, - correct_option_id=correct_option_id, - explanation=explanation, - explanation_parse_mode=explanation_parse_mode, - open_period=open_period, - close_date=close_date, - is_closed=is_closed - ) - return {"message_id": message.id} - - @app.post("/send_dice") - async def send_dice_endpoint(chat_id: int, reply_to_message_id: int = None, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_dice( - chat_id, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_game") - async def send_game_endpoint(chat_id: int, game_short_name: str, reply_to_message_id: int = None, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_game( - chat_id, - game_short_name, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_media_group") - async def send_media_group_endpoint(chat_id: int, media: list, reply_to_message_id: int = None, reply_markup: dict = None): - media_group = [types.InputMediaPhoto(media=media_item["media"]) if media_item["type"] == "photo" else types.InputMediaVideo(media=media_item["media"]) for media_item in media] - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - messages = await bot_instance.send_media_group( - chat_id, - media_group, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"messages": [msg.id for msg in messages]} - - @app.post("/send_invoice") - async def send_invoice_endpoint(chat_id: int, title: str, description: str, payload: str, provider_token: str, currency: str, prices: list, start_parameter: str = None, provider_data: str = None, photo_url: str = None, photo_size: int = None, photo_width: int = None, photo_height: int = None, need_name: bool = False, need_phone_number: bool = False, need_email: bool = False, need_shipping_address: bool = False, send_phone_number_to_provider: bool = False, send_email_to_provider: bool = False, is_flexible: bool = False, disable_notification: bool = False, protect_content: bool = False, reply_to_message_id: int = None, allow_sending_without_reply: bool = False, reply_markup: dict = None): - prices_obj = [types.LabeledPrice(label=price["label"], amount=price["amount"]) for price in prices] - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_invoice( - chat_id, - title, - description, - payload, - provider_token, - currency, - prices_obj, - start_parameter=start_parameter, - provider_data=provider_data, - photo_url=photo_url, - photo_size=photo_size, - photo_width=photo_width, - photo_height=photo_height, - need_name=need_name, - need_phone_number=need_phone_number, - need_email=need_email, - need_shipping_address=need_shipping_address, - send_phone_number_to_provider=send_phone_number_to_provider, - send_email_to_provider=send_email_to_provider, - is_flexible=is_flexible, - disable_notification=disable_notification, - protect_content=protect_content, - reply_to_message_id=reply_to_message_id, - allow_sending_without_reply=allow_sending_without_reply, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_location") - async def send_location_endpoint(chat_id: int, latitude: float, longitude: float, reply_to_message_id: int = None, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_location( - chat_id, - latitude, - longitude, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_contact") - async def send_contact_endpoint(chat_id: int, phone_number: str, first_name: str, last_name: str = None, reply_to_message_id: int = None, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_contact( - chat_id, - phone_number, - first_name, - last_name=last_name, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_animation") - async def send_animation_endpoint(chat_id: int, animation_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(animation_url) - animation = BytesIO(response.content) - animation.name = "animation.gif" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_animation( - chat_id, - animation, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_video_note") - async def send_video_note_endpoint(chat_id: int, video_note_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(video_note_url) - video_note = BytesIO(response.content) - video_note.name = "video_note.mp4" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_video_note( - chat_id, - video_note, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_voice") - async def send_voice_endpoint(chat_id: int, voice_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(voice_url) - voice = BytesIO(response.content) - voice.name = "voice.ogg" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_voice( - chat_id, - voice, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_video") - async def send_video_endpoint(chat_id: int, video_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(video_url) - video = BytesIO(response.content) - video.name = "video.mp4" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_video( - chat_id, - video, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_document") - async def send_document_endpoint(chat_id: int, document_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(document_url) - document = BytesIO(response.content) - document.name = "document.pdf" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_document( - chat_id, - document, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_photo") - async def send_photo_endpoint(chat_id: int, photo_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(photo_url) - photo = BytesIO(response.content) - photo.name = "photo.jpg" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_photo( - chat_id, - photo, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_audio") - async def send_audio_endpoint(chat_id: int, audio_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(audio_url) - audio = BytesIO(response.content) - audio.name = "audio.mp3" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_audio( - chat_id, - audio, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_sticker") - async def send_sticker_endpoint(chat_id: int, sticker_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(sticker_url) - sticker = BytesIO(response.content) - sticker.name = "sticker.webp" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_sticker( - chat_id, - sticker, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_video_note") - async def send_video_note_endpoint(chat_id: int, video_note_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(video_note_url) - video_note = BytesIO(response.content) - video_note.name = "video_note.mp4" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_video_note( - chat_id, - video_note, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_voice") - async def send_voice_endpoint(chat_id: int, voice_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(voice_url) - voice = BytesIO(response.content) - voice.name = "voice.ogg" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_voice( - chat_id, - voice, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_video") - async def send_video_endpoint(chat_id: int, video_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(video_url) - video = BytesIO(response.content) - video.name = "video.mp4" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_video( - chat_id, - video, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_document") - async def send_document_endpoint(chat_id: int, document_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(document_url) - document = BytesIO(response.content) - document.name = "document.pdf" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_document( - chat_id, - document, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_photo") - async def send_photo_endpoint(chat_id: int, photo_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(photo_url) - photo = BytesIO(response.content) - photo.name = "photo.jpg" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_photo( - chat_id, - photo, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_audio") - async def send_audio_endpoint(chat_id: int, audio_url: str, reply_to_message_id: int = None, reply_markup: dict = None): - response = requests.get(audio_url) - audio = BytesIO(response.content) - audio.name = "audio.mp3" - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_audio( - chat_id, - audio, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_location") - async def send_location_endpoint(chat_id: int, latitude: float, longitude: float, reply_to_message_id: int = None, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_location( - chat_id, - latitude, - longitude, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_contact") - async def send_contact_endpoint(chat_id: int, phone_number: str, first_name: str, last_name: str = None, reply_to_message_id: int = None, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_contact( - chat_id, - phone_number, - first_name, - last_name=last_name, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_dice") - async def send_dice_endpoint(chat_id: int, reply_to_message_id: int = None, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_dice( - chat_id, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_game") - async def send_game_endpoint(chat_id: int, game_short_name: str, reply_to_message_id: int = None, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_game( - chat_id, - game_short_name, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_media_group") - async def send_media_group_endpoint(chat_id: int, media: list, reply_to_message_id: int = None, reply_markup: dict = None): - media_group = [types.InputMediaPhoto(media=media_item["media"]) if media_item["type"] == "photo" else types.InputMediaVideo(media=media_item["media"]) for media_item in media] - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - messages = await bot_instance.send_media_group( - chat_id, - media_group, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"messages": [msg.id for msg in messages]} - - @app.post("/send_invoice") - async def send_invoice_endpoint(chat_id: int, title: str, description: str, payload: str, provider_token: str, currency: str, prices: list, start_parameter: str = None, provider_data: str = None, photo_url: str = None, photo_size: int = None, photo_width: int = None, photo_height: int = None, need_name: bool = False, need_phone_number: bool = False, need_email: bool = False, need_shipping_address: bool = False, send_phone_number_to_provider: bool = False, send_email_to_provider: bool = False, is_flexible: bool = False, disable_notification: bool = False, protect_content: bool = False, reply_to_message_id: int = None, allow_sending_without_reply: bool = False, reply_markup: dict = None): - prices_obj = [types.LabeledPrice(label=price["label"], amount=price["amount"]) for price in prices] - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_invoice( - chat_id, - title, - description, - payload, - provider_token, - currency, - prices_obj, - start_parameter=start_parameter, - provider_data=provider_data, - photo_url=photo_url, - photo_size=photo_size, - photo_width=photo_width, - photo_height=photo_height, - need_name=need_name, - need_phone_number=need_phone_number, - need_email=need_email, - need_shipping_address=need_shipping_address, - send_phone_number_to_provider=send_phone_number_to_provider, - send_email_to_provider=send_email_to_provider, - is_flexible=is_flexible, - disable_notification=disable_notification, - protect_content=protect_content, - reply_to_message_id=reply_to_message_id, - allow_sending_without_reply=allow_sending_without_reply, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_poll") - async def send_poll_endpoint(chat_id: int, question: str, options: list, is_anonymous: bool = False, type: str = "regular", allows_multiple_answers: bool = False, correct_option_id: int = None, explanation: str = None, explanation_parse_mode: str = None, open_period: int = None, close_date: int = None, is_closed: bool = False): - message = await bot_instance.send_poll( - chat_id, - question, - options, - is_anonymous=is_anonymous, - type=type, - allows_multiple_answers=allows_multiple_answers, - correct_option_id=correct_option_id, - explanation=explanation, - explanation_parse_mode=explanation_parse_mode, - open_period=open_period, - close_date=close_date, - is_closed=is_closed - ) - return {"message_id": message.id} - - @app.post("/send_dice") - async def send_dice_endpoint(chat_id: int, reply_to_message_id: int = None, reply_markup: dict = None): - reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None - message = await bot_instance.send_dice( - chat_id, - reply_to_message_id=reply_to_message_id, - reply_markup=reply_markup_obj - ) - return {"message_id": message.id} - - @app.post("/send_chat_action") - async def send_chat_action_endpoint(chat_id: int, action: str): - actions = { - "typing": enums.ChatAction.TYPING, - "upload_photo": enums.ChatAction.UPLOAD_PHOTO, - "record_video": enums.ChatAction.RECORD_VIDEO, - "upload_video": enums.ChatAction.UPLOAD_VIDEO, - "record_voice": enums.ChatAction.RECORD_VOICE, - "upload_voice": enums.ChatAction.UPLOAD_VOICE, - "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, - "find_location": enums.ChatAction.FIND_LOCATION, - "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, - "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, - "playing": enums.ChatAction.PLAYING - } - action_enum = actions.get(action) - if not action_enum: - return {"error": "Invalid action"} - await bot_instance.send_chat_action(chat_id, action_enum) - return {"status": "Chat action sent successfully"} - - @app.post("/text_to_speech") - async def text_to_speech_endpoint(tts_request: TextToSpeechRequest): - audio = await convert(tts_request.text) - return {"audio_url": audio.name} - - async def convert(text): - audio = BytesIO() - i = Translator().translate(text, dest="en") - lang = i.src - tts = gTTS(text, lang=lang) - audio.name = lang + ".mp3" - tts.write_to_fp(audio) - return audio - - @app.post("/download_youtube_song") - async def download_youtube_song_endpoint(yt_request: YouTubeDownloadRequest): - try: - results = YoutubeSearch(yt_request.url, max_results=1).to_dict() - link = f"https://youtube.com{results[0]['url_suffix']}" - title = results[0]["title"][:40] - thumbnail = results[0]["thumbnails"][0] - thumb_name = f'thumb{title}.jpg' - thumb = requests.get(thumbnail, allow_redirects=True) - open(thumb_name, 'wb').write(thumb.content) - performer = f"[Mα΄‹Ι΄ Bᴏᴛᴒℒ]" - duration = results[0]["duration"] - url_suffix = results[0]["url_suffix"] - views = results[0]["views"] - except Exception as e: - logger.error(f"Error downloading YouTube song: {e}") - return {"error": str(e)} - - ydl_opts = {"format": "bestaudio[ext=m4a]"} - try: - with YoutubeDL(ydl_opts) as ydl: - info_dict = ydl.extract_info(link, download=False) - audio_file = ydl.prepare_filename(info_dict) - ydl.process_info(info_dict) - except Exception as e: - logger.error(f"Error processing YouTube song: {e}") - return {"error": str(e)} - - cap = "**BYβ€Ίβ€Ί [Mα΄‹Ι΄ Bᴏᴛᴒℒ](https://t.me/mkn_bots_updates)**" - secmul, dur, dur_arr = 1, 0, duration.split(':') - for i in range(len(dur_arr) - 1, -1, -1): - dur += (int(dur_arr[i]) * secmul) - secmul *= 60 - - return {"audio_file": audio_file, "caption": cap, "title": title, "duration": dur, "performer": performer, "thumb": thumb_name} - - @app.post("/download_youtube_video") - async def download_youtube_video_endpoint(yt_request: YouTubeDownloadRequest): - url = yt_request.url - try: - search = SearchVideos(f"{url}", offset=1, mode="dict", max_results=1) - mi = search.result() - mio = mi["search_result"] - mo = mio[0]["link"] - thum = mio[0]["title"] - fridayz = mio[0]["id"] - mio[0]["channel"] - kekme = f"https://img.youtube.com/vi/{fridayz}/hqdefault.jpg" - await asyncio.sleep(0.6) - url = mo - sedlyf = wget.download(kekme) - opts = { - "format": "best", - "addmetadata": True, - "key": "FFmpegMetadata", - "prefer_ffmpeg": True, - "geo_bypass": True, - "nocheckcertificate": True, - "postprocessors": [{"key": "FFmpegVideoConvertor", "preferedformat": "mp4"}], - "outtmpl": "%(id)s.mp4", - "logtostderr": False, - "quiet": True, - } - with YoutubeDL(opts) as ytdl: - ytdl_data = ytdl.extract_info(url, download=True) - except Exception as e: - logger.error(f"Error downloading YouTube video: {e}") - return {"error": str(e)} - - file_stark = f"{ytdl_data['id']}.mp4" - capy = f"""**πšƒπ™Έπšƒπ™»π™΄ :** [{thum}]({mo})\n**πšπ™΄πš€πš„π™΄πš‚πšƒπ™΄π™³ π™±πšˆ :** {await bot_instance.get_me().mention}""" - - return {"video_file": file_stark, "caption": capy, "title": ytdl_data["title"], "duration": int(ytdl_data["duration"]), "thumb": sedlyf} - - @app.post("/carbon") - async def carbon_endpoint(carbon_request: CarbonRequest): - carbon_image = await make_carbon(carbon_request.text, True) - return {"carbon_url": carbon_image} - - async def make_carbon(text, tele=False): - url = "https://carbonara.solopov.dev/api/cook" - async with aiohttp.ClientSession() as session: - async with session.post(url, json={"code": text}) as resp: - image = BytesIO(await resp.read()) - image.name = "carbon.png" - if tele: - uf = upload_file(image) - image.close() - return f"https://graph.org{uf[0]}" - return image - - @app.post("/font") - async def font_endpoint(font_request: FontRequest): - fonts = { - "typewriter": Fonts.typewriter, - "outline": Fonts.outline, - "serif": Fonts.serief, - "bold_cool": Fonts.bold_cool, - "cool": Fonts.cool, - "small_cap": Fonts.smallcap, - "script": Fonts.script, - "bold_script": Fonts.bold_script, - "tiny": Fonts.tiny, - "comic": Fonts.comic, - "san": Fonts.san, - "slant_san": Fonts.slant_san, - "slant": Fonts.slant, - "sim": Fonts.sim, - "circles": Fonts.circles, - "dark_circle": Fonts.dark_circle, - "gothic": Fonts.gothic, - "bold_gothic": Fonts.bold_gothic, - "cloud": Fonts.cloud, - "happy": Fonts.happy, - "sad": Fonts.sad, - "special": Fonts.special, - "square": Fonts.square, - "dark_square": Fonts.dark_square, - "andalucia": Fonts.andalucia, - "manga": Fonts.manga, - "stinky": Fonts.stinky, - "bubbles": Fonts.bubbles, - "underline": Fonts.underline, - "ladybug": Fonts.ladybug, - "rays": Fonts.rays, - "birds": Fonts.birds, - "slash": Fonts.slash, - "stop": Fonts.stop, - "skyline": Fonts.skyline, - "arrows": Fonts.arrows, - "rvnes": Fonts.rvnes, - "strike": Fonts.strike, - "frozen": Fonts.frozen - } - cls = fonts.get(font_request.style) - if not cls: - return {"error": "Invalid style"} - new_text = cls(font_request.text) - return {"new_text": new_text} - - @app.post("/paste") - async def paste_endpoint(paste_request: PasteRequest): - paste_response = await p_paste(paste_request.text) - return paste_response - - async def p_paste(code, extension=None): - siteurl = "https://pasty.lus.pm/api/v1/pastes" - data = {"content": code} - try: - response = requests.post(url=siteurl, data=json.dumps(data), headers={"User-Agent": "Mozilla/5.0", "content-type": "application/json"}) - except Exception as e: - return {"error": str(e)} - if response.ok: - response = response.json() - purl = ( - f"https://pasty.lus.pm/{response['id']}.{extension}" - if extension - else f"https://pasty.lus.pm/{response['id']}.txt" - ) - return { - "url": purl, - "raw": f"https://pasty.lus.pm/{response['id']}/raw", - "bin": "Pasty", - } - return {"error": "Unable to reach pasty.lus.pm"} - - @app.post("/share_text") - async def share_text_endpoint(share_request: ShareTextRequest): - share_url = f"https://t.me/share/url?url={quote(share_request.text)}" - return {"share_url": share_url} - - @app.post("/telegraph_upload") - async def telegraph_upload_endpoint(upload_request: TelegraphUploadRequest): - try: - response = upload_file(upload_request.file_path) - except Exception as e: - logger.error(f"Error uploading to Telegraph: {e}") - return {"error": str(e)} - telegraph_url = f"https://graph.org{response[0]}" - return {"telegraph_url": telegraph_url} - - @app.post("/group_manager_action") - async def group_manager_action_endpoint(action_request: GroupManagerAction): - chat_id = action_request.group_id - action = action_request.action - user_id = action_request.user_id - time_val = action_request.time - - if action == "ban": - await bot_instance.ban_chat_member(chat_id, user_id) - return {"status": "User banned successfully"} - elif action == "unban": - await bot_instance.unban_chat_member(chat_id, user_id) - return {"status": "User unbanned successfully"} - elif action == "tban": - until_date_val = extract_time(time_val) - if until_date_val is None: - return {"error": "Invalid time type specified. Expected m, h, or d, Got it: {time_val[-1]}"} - await bot_instance.ban_chat_member(chat_id, user_id, until_date=until_date_val) - return {"status": "User temporarily banned successfully"} - elif action == "mute": - - - - - - - - - - - - - -await bot_instance.restrict_chat_member(chat_id, user_id, permissions=enums.ChatPermissions()) - return {"status": "User muted successfully"} - elif action == "unmute": - await bot_instance.restrict_chat_member(chat_id, user_id, permissions=enums.ChatPermissions(can_send_messages=True)) - return {"status": "User unmuted successfully"} - elif action == "tmute": - until_date_val = extract_time(time_val) - if until_date_val is None: - return {"error": "Invalid time type specified. Expected m, h, or d, Got it: {time_val[-1]}"} - await bot_instance.restrict_chat_member(chat_id, user_id, permissions=enums.ChatPermissions(), until_date=until_date_val) - return {"status": "User temporarily muted successfully"} - elif action == "pin": - await bot_instance.pin_chat_message(chat_id, action_request.message_id) - return {"status": "Message pinned successfully"} - elif action == "unpin": - await bot_instance.unpin_chat_message(chat_id, action_request.message_id) - return {"status": "Message unpinned successfully"} - elif action == "purge": - if action_request.message_id: - await purge_messages(bot_instance, chat_id, action_request.message_id) - return {"status": "Messages purged successfully"} - else: - return {"error": "Message ID is required for purge action"} - else: - return {"error": "Invalid action"} - - async def purge_messages(client, chat_id, message_id): - try: - message_ids = [] - count_del_etion_s = 0 - if message_id: - for a_s_message_id in range(message_id, message_id + 100): - message_ids.append(a_s_message_id) - if len(message_ids) == 100: - await client.delete_messages(chat_id=chat_id, message_ids=message_ids, revoke=True) - count_del_etion_s += len(message_ids) - message_ids = [] - if len(message_ids) > 0: - await client.delete_messages(chat_id=chat_id, message_ids=message_ids, revoke=True) - count_del_etion_s += len(message_ids) - except Exception as e: - logger.error(f"Error purging messages: {e}") - return {"error": str(e)} - - @app.post("/broadcast_messages") - async def broadcast_messages_endpoint(user_id: int, message_id: int): - b_msg = await bot_instance.get_messages(user_id, message_id) - sts = await send_message(bot_instance, user_id, "Broadcasting your messages...") - start_time = time.time() - total_users = await db.total_users_count() - done = 0 - blocked = 0 - deleted = 0 - failed = 0 - success = 0 - async for user in db.get_all_users(): - pti, sh = await broadcast_messages(int(user['id']), b_msg) - if pti: - success += 1 - elif pti == False: - if sh == "Blocked": - blocked += 1 - elif sh == "Deleted": - deleted += 1 - elif sh == "Error": - failed += 1 - done += 1 - if not done % 20: - await edit_message(bot_instance, sts, f"Broadcast in progress:\n\nTotal Users {total_users}\nCompleted: {done} / {total_users}\nSuccess: {success}\nBlocked: {blocked}\nDeleted: {deleted}") - time_taken = datetime.timedelta(seconds=int(time.time() - start_time)) - await delete_message(bot_instance, sts) - await send_message(bot_instance, user_id, f"Broadcast completed:\nTime taken: {time_taken}\n\nTotal Users: {total_users}\nCompleted: {done} / {total_users}\nSuccess: {success}\nBlocked: {blocked}\nDeleted: {deleted}") - return {"status": "Broadcast completed"} - - async def broadcast_messages(user_id, message): - try: - await message.copy(chat_id=user_id) - return True, "Success" - except FloodWait as e: - await asyncio.sleep(e.value) - return await broadcast_messages(user_id, message) - except InputUserDeactivated: - await db.delete_user(int(user_id)) - logger.info(f"{user_id}-Removed from Database, since deleted account.") - return False, "Deleted" - except UserIsBlocked: - logger.info(f"{user_id} -Blocked the bot.") - return False, "Blocked" - except PeerIdInvalid: - await db.delete_user(int(user_id)) - logger.info(f"{user_id} - PeerIdInvalid") - return False, "Error" - except Exception as e: - return False, "Error" - - @app.post("/clear_junk") - async def clear_junk_endpoint(user_id: int): - sts = await send_message(bot_instance, user_id, "Clearing junk users...") - start_time = time.time() - total_users = await db.total_users_count() - blocked = 0 - deleted = 0 - failed = 0 - done = 0 - async for user in db.get_all_users(): - pti, sh = await clear_junk(int(user['id']), None) - if pti == False: - if sh == "Blocked": - blocked += 1 - elif sh == "Deleted": - deleted += 1 - elif sh == "Error": - failed += 1 - done += 1 - if not done % 20: - await edit_message(bot_instance, sts, f"Clearing junk in progress:\n\nTotal Users {total_users}\nCompleted: {done} / {total_users}\nBlocked: {blocked}\nDeleted: {deleted}") - time_taken = datetime.timedelta(seconds=int(time.time() - start_time)) - await delete_message(bot_instance, sts) - await send_message(bot_instance, user_id, f"Clearing junk completed:\nTime taken: {time_taken}\n\nTotal Users: {total_users}\nCompleted: {done} / {total_users}\nBlocked: {blocked}\nDeleted: {deleted}") - return {"status": "Junk clearing completed"} - - async def clear_junk(user_id, message): - try: - key = await message.copy(chat_id=user_id) - await key.delete(True) - return True, "Success" - except FloodWait as e: - await asyncio.sleep(e.value) - return await clear_junk(user_id, message) - except InputUserDeactivated: - await db.delete_user(int(user_id)) - logger.info(f"{user_id}-Removed from Database, since deleted account.") - return False, "Deleted" - except UserIsBlocked: - logger.info(f"{user_id} -Blocked the bot.") - return False, "Blocked" - except PeerIdInvalid: - await db.delete_user(int(user_id)) - logger.info(f"{user_id} - PeerIdInvalid") - return False, "Error" - except Exception as e: - return False, "Error" - - @app.post("/group_broadcast") - async def group_broadcast_endpoint(user_id: int, message_id: int): - groups = await db.get_all_chats() - b_msg = await bot_instance.get_messages(user_id, message_id) - sts = await send_message(bot_instance, user_id, "Broadcasting to groups...") - start_time = time.time() - total_groups = await db.total_chat_count() - done = 0 - failed = "" - success = 0 - deleted = 0 - async for group in groups: - pti, sh, ex = await broadcast_messages_group(int(group['id']), b_msg) - if pti == True: - if sh == "Success": - success += 1 - elif pti == False: - if sh == "deleted": - deleted += 1 - failed += ex - try: - await bot_instance.leave_chat(int(group['id'])) - except Exception as e: - logger.error(f"{e} > {group['id']}") - done += 1 - if not done % 20: - await edit_message(bot_instance, sts, f"Broadcast in progress:\n\nTotal Groups {total_groups}\nCompleted: {done} / {total_groups}\nSuccess: {success}\nDeleted: {deleted}") - time_taken = datetime.timedelta(seconds=int(time.time() - start_time)) - await delete_message(bot_instance, sts) - await send_message(bot_instance, user_id, f"Broadcast completed:\nTime taken: {time_taken}\n\nTotal Groups: {total_groups}\nCompleted: {done} / {total_groups}\nSuccess: {success}\nDeleted: {deleted}\n\nFailed reasons: {failed}") - return {"status": "Group broadcast completed"} - - async def broadcast_messages_group(chat_id, message): - try: - await message.copy(chat_id=chat_id) - return True, "Success", 'mm' - except FloodWait as e: - await asyncio.sleep(e.value) - return await broadcast_messages_group(chat_id, message) - except Exception as e: - await db.delete_chat(int(chat_id)) - logger.info(f"{chat_id} - PeerIdInvalid") - return False, "deleted", f'{e}\n\n' - - @app.post("/junk_group") - async def junk_group_endpoint(user_id: int): - groups = await db.get_all_chats() - sts = await send_message(bot_instance, user_id, "Clearing junk groups...") - start_time = time.time() - total_groups = await db.total_chat_count() - done = 0 - failed = "" - deleted = 0 - async for group in groups: - pti, sh, ex = await junk_group(int(group['id']), None) - if pti == False: - if sh == "deleted": - deleted += 1 - failed += ex - try: - await bot_instance.leave_chat(int(group['id'])) - except Exception as e: - logger.error(f"{e} > {group['id']}") - done += 1 - if not done % 20: - await edit_message(bot_instance, sts, f"Clearing junk in progress:\n\nTotal Groups {total_groups}\nCompleted: {done} / {total_groups}\nDeleted: {deleted}") - time_taken = datetime.timedelta(seconds=int(time.time() - start_time)) - await delete_message(bot_instance, sts) - await send_message(bot_instance, user_id, f"Clearing junk completed:\nTime taken: {time_taken}\n\nTotal Groups: {total_groups}\nCompleted: {done} / {total_groups}\nDeleted: {deleted}\n\nFailed reasons: {failed}") - return {"status": "Junk group clearing completed"} - - async def junk_group(chat_id, message): - try: - kk = await message.copy(chat_id=chat_id) - await kk.delete(True) - return True, "Success", 'mm' - except FloodWait as e: - await asyncio.sleep(e.value) - return await junk_group(chat_id, message) - except Exception as e: - await db.delete_chat(int(chat_id)) - logger.info(f"{chat_id} - PeerIdInvalid") - return False, "deleted", f'{e}\n\n' - - @app.post("/group_leave") - async def group_leave_endpoint(chat_id: int): - await bot_instance.leave_chat(chat_id) - return {"status": "Left the group successfully"} - - @app.post("/group_disable") - async def group_disable_endpoint(chat_id: int, reason: str = "No Reason"): - await db.disable_chat(chat_id, reason) - temp.BANNED_CHATS.append(chat_id) - return {"status": "Group disabled successfully"} - - @app.post("/group_enable") - async def group_enable_endpoint(chat_id: int): - await db.re_enable_chat(chat_id) - temp.BANNED_CHATS.remove(chat_id) - return {"status": "Group enabled successfully"} - - @app.post("/get_user_info") - async def get_user_info_endpoint(user_id: int): - user_info = await bot_instance.get_users(user_id) - return {"user_info": user_info.dict()} - - @app.post("/get_chat_info") - async def get_chat_info_endpoint(chat_id: int): - chat_info = await bot_instance.get_chat(chat_id) - return {"chat_info": chat_info.dict()} - - @app.post("/send_telegram_message") - async def send_telegram_message_endpoint(chat_id: int, text: str): - message = await send_message(bot_instance, chat_id, text) - return {"message_id": message.id} - - @app.post("/edit_telegram_message") - async def edit_telegram_message_endpoint(chat_id: int, message_id: int, text: str): - message = await bot_instance.get_messages(chat_id, message_id) - edited_message = await edit_message(bot_instance, message, text) - return {"edited_message_id": edited_message.id} - - @app.post("/delete_telegram_message") - async def delete_telegram_message_endpoint(chat_id: int, message_id: int): - message = await bot_instance.get_messages(chat_id, message_id) - await delete_message(bot_instance, message) - return {"status": "Message deleted successfully"} - - if __name__ == "__main__": - import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file + actions = { + "typing": enums.ChatAction.TYPING, + "upload_photo": enums.ChatAction.UPLOAD_PHOTO, + "record_video": enums.ChatAction.RECORD_VIDEO, + "upload_video": enums.ChatAction.UPLOAD_VIDEO, + "record_voice": enums.ChatAction.RECORD_VOICE, + "upload_voice": enums.ChatAction.UPLOAD_VOICE, + "upload_document": enums.ChatAction.UPLOAD_DOCUMENT, + "find_location": enums.ChatAction.FIND_LOCATION, + "record_video_note": enums.ChatAction.RECORD_VIDEO_NOTE, + "upload_video_note": enums.ChatAction.UPLOAD_VIDEO_NOTE, + "playing": enums.ChatAction.PLAYING + } + action_enum = actions.get(action) + if not action_enum: + return {"error": "Invalid action"} + try: + await bot_instance.send_chat_action(chat_id, action_enum) + return {"status": "Chat action sent successfully"} + except Exception as e: + logger.error(f"Error sending chat action: {e}") + return {"error": str(e)} + +@app.post("/create_chat_invite_link") +async def create_chat_invite_link_endpoint(chat_id: int, name: str = None, expire_date: int = None, member_limit: int = None, creates_join_request: bool = False): + try: + invite_link = await bot_instance.create_chat_invite_link( + chat_id, + name=name, + expire_date=expire_date, + member_limit=member_limit, + creates_join_request=creates_join_request + ) + return {"invite_link": invite_link.invite_link} + except Exception as e: + logger.error(f"Error creating chat invite link: {e}") + return {"error": str(e)} + +@app.post("/set_chat_photo") +async def set_chat_photo_endpoint(chat_id: int, photo_url: str): + try: + response = requests.get(photo_url) + photo = BytesIO(response.content) + photo.name = "photo.jpg" + await bot_instance.set_chat_photo(chat_id, photo) + return {"status": "Chat photo set successfully"} + except Exception as e: + logger.error(f"Error setting chat photo: {e}") + return {"error": str(e)} + +@app.post("/delete_chat_photo") +async def delete_chat_photo_endpoint(chat_id: int): + try: + await bot_instance.delete_chat_photo(chat_id) + return {"status": "Chat photo deleted successfully"} + except Exception as e: + logger.error(f"Error deleting chat photo: {e}") + return {"error": str(e)} + +@app.post("/set_chat_title") +async def set_chat_title_endpoint(chat_id: int, title: str): + try: + await bot_instance.set_chat_title(chat_id, title) + return {"status": "Chat title set successfully"} + except Exception as e: + logger.error(f"Error setting chat title: {e}") + return {"error": str(e)} + +@app.post("/set_chat_description") +async def set_chat_description_endpoint(chat_id: int, description: str): + try: + await bot_instance.set_chat_description(chat_id, description) + return {"status": "Chat description set successfully"} + except Exception as e: + logger.error(f"Error setting chat description: {e}") + return {"error": str(e)} + +@app.post("/pin_chat_message") +async def pin_chat_message_endpoint(chat_id: int, message_id: int, disable_notification: bool = False): + try: + await bot_instance.pin_chat_message(chat_id, message_id, disable_notification=disable_notification) + return {"status": "Message pinned successfully"} + except Exception as e: + logger.error(f"Error pinning chat message: {e}") + return {"error": str(e)} + +@app.post("/unpin_chat_message") +async def unpin_chat_message_endpoint(chat_id: int, message_id: int): + try: + await bot_instance.unpin_chat_message(chat_id, message_id) + return {"status": "Message unpinned successfully"} + except Exception as e: + logger.error(f"Error unpinning chat message: {e}") + return {"error": str(e)} + +@app.post("/unpin_all_chat_messages") +async def unpin_all_chat_messages_endpoint(chat_id: int): + try: + await bot_instance.unpin_all_chat_messages(chat_id) + return {"status": "All messages unpinned successfully"} + except Exception as e: + logger.error(f"Error unpinning all chat messages: {e}") + return {"error": str(e)} + +@app.post("/restrict_chat_member") +async def restrict_chat_member_endpoint(chat_id: int, user_id: int, permissions: dict, until_date: int = None): + try: + chat_permissions = enums.ChatPermissions(**permissions) + await bot_instance.restrict_chat_member(chat_id, user_id, chat_permissions, until_date=until_date) + return {"status": "Chat member restricted successfully"} + except Exception as e: + logger.error(f"Error restricting chat member: {e}") + return {"error": str(e)} + +@app.post("/promote_chat_member") +async def promote_chat_member_endpoint(chat_id: int, user_id: int, can_change_info: bool = False, can_post_messages: bool = False, can_edit_messages: bool = False, can_delete_messages: bool = False, can_manage_video_chats: bool = False, can_restrict_members: bool = False, can_promote_members: bool = False, can_invite_users: bool = False, can_pin_messages: bool = False, can_manage_chat: bool = False): + try: + await bot_instance.promote_chat_member( + chat_id, + user_id, + can_change_info=can_change_info, + can_post_messages=can_post_messages, + can_edit_messages=can_edit_messages, + can_delete_messages=can_delete_messages, + can_manage_video_chats=can_manage_video_chats, + can_restrict_members=can_restrict_members, + can_promote_members=can_promote_members, + can_invite_users=can_invite_users, + can_pin_messages=can_pin_messages, + can_manage_chat=can_manage_chat + ) + return {"status": "Chat member promoted successfully"} + except Exception as e: + logger.error(f"Error promoting chat member: {e}") + return {"error": str(e)} + +@app.post("/kick_chat_member") +async def kick_chat_member_endpoint(chat_id: int, user_id: int): + try: + await bot_instance.kick_chat_member(chat_id, user_id) + return {"status": "Chat member kicked successfully"} + except Exception as e: + logger.error(f"Error kicking chat member: {e}") + return {"error": str(e)} + +@app.post("/unban_chat_member") +async def unban_chat_member_endpoint(chat_id: int, user_id: int): + try: + await bot_instance.unban_chat_member(chat_id, user_id) + return {"status": "Chat member unbanned successfully"} + except Exception as e: + logger.error(f"Error unbanning chat member: {e}") + return {"error": str(e)} + +@app.post("/set_chat_administrator_custom_title") +async def set_chat_administrator_custom_title_endpoint(chat_id: int, user_id: int, custom_title: str): + try: + await bot_instance.set_chat_administrator_custom_title(chat_id, user_id, custom_title) + return {"status": "Custom title set successfully"} + except Exception as e: + logger.error(f"Error setting custom title: {e}") + return {"error": str(e)} + +@app.post("/set_chat_permissions") +async def set_chat_permissions_endpoint(chat_id: int, permissions: dict): + try: + chat_permissions = enums.ChatPermissions(**permissions) + await bot_instance.set_chat_permissions(chat_id, chat_permissions) + return {"status": "Chat permissions set successfully"} + except Exception as e: + logger.error(f"Error setting chat permissions: {e}") + return {"error": str(e)} + +@app.post("/set_chat_sticker_set") +async def set_chat_sticker_set_endpoint(chat_id: int, sticker_set_name: str): + try: + await bot_instance.set_chat_sticker_set(chat_id, sticker_set_name) + return {"status": "Chat sticker set set successfully"} + except Exception as e: + logger.error(f"Error setting chat sticker set: {e}") + return {"error": str(e)} + +@app.post("/delete_chat_sticker_set") +async def delete_chat_sticker_set_endpoint(chat_id: int): + try: + await bot_instance.delete_chat_sticker_set(chat_id) + return {"status": "Chat sticker set deleted successfully"} + except Exception as e: + logger.error(f"Error deleting chat sticker set: {e}") + return {"error": str(e)} + +@app.post("/approve_chat_join_request") +async def approve_chat_join_request_endpoint(chat_id: int, user_id: int): + try: + await bot_instance.approve_chat_join_request(chat_id, user_id) + return {"status": "Join request approved successfully"} + except Exception as e: + logger.error(f"Error approving chat join request: {e}") + return {"error": str(e)} + +@app.post("/decline_chat_join_request") +async def decline_chat_join_request_endpoint(chat_id: int, user_id: int): + try: + await bot_instance.decline_chat_join_request(chat_id, user_id) + return {"status": "Join request declined successfully"} + except Exception as e: + logger.error(f"Error declining chat join request: {e}") + return {"error": str(e)} + +@app.post("/ban_chat_sender_chat") +async def ban_chat_sender_chat_endpoint(chat_id: int, sender_chat_id: int): + try: + await bot_instance.ban_chat_sender_chat(chat_id, sender_chat_id) + return {"status": "Sender chat banned successfully"} + except Exception as e: + logger.error(f"Error banning chat sender chat: {e}") + return {"error": str(e)} + +@app.post("/unban_chat_sender_chat") +async def unban_chat_sender_chat_endpoint(chat_id: int, sender_chat_id: int): + try: + await bot_instance.unban_chat_sender_chat(chat_id, sender_chat_id) + return {"status": "Sender chat unbanned successfully"} + except Exception as e: + logger.error(f"Error unbanning chat sender chat: {e}") + return {"error": str(e)} + +@app.post("/set_chat_menu_button") +async def set_chat_menu_button_endpoint(chat_id: int, menu_button: dict = None): + try: + if menu_button: + menu_button_obj = types.MenuButton(**menu_button) + else: + menu_button_obj = None + await bot_instance.set_chat_menu_button(chat_id, menu_button_obj) + return {"status": "Chat menu button set successfully"} + except Exception as e: + logger.error(f"Error setting chat menu button: {e}") + return {"error": str(e)} + +@app.post("/get_chat_menu_button") +async def get_chat_menu_button_endpoint(chat_id: int): + try: + menu_button = await bot_instance.get_chat_menu_button(chat_id) + return {"menu_button": menu_button.dict() if menu_button else None} + except Exception as e: + logger.error(f"Error getting chat menu button: {e}") + return {"error": str(e)} + +@app.post("/set_my_commands") +async def set_my_commands_endpoint(commands: list): + try: + bot_commands = [types.BotCommand(command=cmd["command"], description=cmd["description"]) for cmd in commands] + await bot_instance.set_my_commands(bot_commands) + return {"status": "Commands set successfully"} + except Exception as e: + logger.error(f"Error setting my commands: {e}") + return {"error": str(e)} + +@app.post("/get_my_commands") +async def get_my_commands_endpoint(): + try: + commands = await bot_instance.get_my_commands() + return {"commands": [cmd.dict() for cmd in commands]} + except Exception as e: + logger.error(f"Error getting my commands: {e}") + return {"error": str(e)} + +@app.post("/send_copy") +async def send_copy_endpoint(chat_id: int, from_chat_id: int, message_id: int, caption: str = None, parse_mode: str = None, caption_entities: list = None, reply_to_message_id: int = None, allow_sending_without_reply: bool = False, reply_markup: dict = None): + try: + reply_markup_obj = InlineKeyboardMarkup(**reply_markup) if reply_markup else None + message = await bot_instance.copy_message( + chat_id, + from_chat_id, + message_id, + caption=caption, + parse_mode=parse_mode, + caption_entities=caption_entities, + reply_to_message_id=reply_to_message_id, + allow_sending_without_reply=allow_sending_without_reply, + reply_markup=reply_markup_obj + ) + return {"message_id": message.id} + except Exception as e: + logger.error(f"Error sending copy: {e}") + return {"error": str(e)} + +@app.post("/forward_messages") +async def forward_messages_endpoint(chat_id: int, from_chat_id: int, message_ids: list, disable_notification: bool = False): + try: + messages = await bot_instance.forward_messages( + chat_id, + from_chat_id, + message_ids, + disable_notification=disable_notification + ) + return {"messages": [msg.id for msg in messages]} + except Exception as e: + logger.error(f"Error forwarding messages: {e}") + return {"error": str(e)} + +@app.post("/text_to_speech") +async def text_to_speech_endpoint(tts_request: TextToSpeechRequest): + try: + audio = await convert(tts_request.text) + return {"audio_url": audio.name} + except Exception as e: + logger.error(f"Error converting text to speech: {e}") + return {"error": str(e)} + +async def convert(text): + audio = BytesIO() + i = Translator().translate(text, dest="en") + lang = i.src + tts = gTTS(text, lang=lang) + audio.name = lang + ".mp3" + tts.write_to_fp(audio) + return audio + +@app.post("/download_youtube_song") +async def download_youtube_song_endpoint(yt_request: YouTubeDownloadRequest): + try: + results = YoutubeSearch(yt_request.url, max_results=1).to_dict() + link = f"https://youtube.com{results[0]['url_suffix']}" + title = results[0]["title"][:40] + thumbnail = results[0]["thumbnails"][0] + thumb_name = f'thumb{title}.jpg' + thumb = requests.get(thumbnail, allow_redirects=True) + open(thumb_name, 'wb').write(thumb.content) + performer = f"[Mα΄‹Ι΄ Bᴏᴛᴒℒ]" + duration = results[0]["duration"] + url_suffix = results[0]["url_suffix"] + views = results[0]["views"] + except Exception as e: + logger.error(f"Error downloading YouTube song: {e}") + return {"error": str(e)} + + ydl_opts = {"format": "bestaudio[ext=m4a]"} + try: + with YoutubeDL(ydl_opts) as ydl: + info_dict = ydl.extract_info(link, download=False) + audio_file = ydl.prepare_filename(info_dict) + ydl.process_info(info_dict) + except Exception as e: + logger.error(f"Error processing YouTube song: {e}") + return {"error": str(e)} + + cap = "**BYβ€Ίβ€Ί [Mα΄‹Ι΄ Bᴏᴛᴒℒ](https://t.me/mkn_bots_updates)**" + secmul, dur, dur_arr = 1, 0, duration.split(':') + for i in range(len(dur_arr) - 1, -1, -1): + dur += (int(dur_arr[i]) * secmul) + secmul *= 60 + + return {"audio_file": audio_file, "caption": cap, "title": title, "duration": dur, "performer": performer, "thumb": thumb_name} + +@app.post("/download_youtube_video") +async def download_youtube_video_endpoint(yt_request: YouTubeDownloadRequest): + try: + url = yt_request.url + search = SearchVideos(f"{url}", offset=1, mode="dict", max_results=1) + mi = search.result() + mio = mi["search_result"] + mo = mio[0]["link"] + thum = mio[0]["title"] + fridayz = mio[0]["id"] + mio[0]["channel"] + kekme = f"https://img.youtube.com/vi/{fridayz}/hqdefault.jpg" + await asyncio.sleep(0.6) + url = mo + sedlyf = wget.download(kekme) + opts = { + "format": "best", + "addmetadata": True, + "key": "FFmpegMetadata", + "prefer_ffmpeg": True, + "geo_bypass": True, + "nocheckcertificate": True, + "postprocessors": [{"key": "FFmpegVideoConvertor", "preferedformat": "mp4"}], + "outtmpl": "%(id)s.mp4", + "logtostderr": False, + "quiet": True, + } + with YoutubeDL(opts) as ytdl: + ytdl_data = ytdl.extract_info(url, download=True) + except Exception as e: + logger.error(f"Error downloading YouTube video: {e}") + return {"error": str(e)} + + file_stark = f"{ytdl_data['id']}.mp4" + capy = f"""**πšƒπ™Έπšƒπ™»π™΄ :** [{thum}]({mo})\n**πšπ™΄πš€πš„π™΄πš‚πšƒπ™΄π™³ π™±πšˆ :** {await bot_instance.get_me().mention}""" + + return {"video_file": file_stark, "caption": capy, "title": ytdl_data["title"], "duration": int(ytdl_data["duration"]), "thumb": sedlyf} + +@app.post("/carbon") +async def carbon_endpoint(carbon_request: CarbonRequest): + try: + carbon_image = await make_carbon(carbon_request.text, True) + return {"carbon_url": carbon_image} + except Exception as e: + logger.error(f"Error generating carbon image: {e}") + return {"error": str(e)} + +async def make_carbon(text, tele=False): + url = "https://carbonara.solopov.dev/api/cook" + async with aiohttp.ClientSession() as session: + async with session.post(url, json={"code": text}) as resp: + image = BytesIO(await resp.read()) + image.name = "carbon.png" + if tele: + uf = upload_file(image) + image.close() + return f"https://graph.org{uf[0]}" + return image + +@app.post("/font") +async def font_endpoint(font_request: FontRequest): + fonts = { + "typewriter": Fonts.typewriter, + "outline": Fonts.outline, + "serif": Fonts.serief, + "bold_cool": Fonts.bold_cool, + "cool": Fonts.cool, + "small_cap": Fonts.smallcap, + "script": Fonts.script, + "bold_script": Fonts.bold_script, + "tiny": Fonts.tiny, + "comic": Fonts.comic, + "san": Fonts.san, + "slant_san": Fonts.slant_san, + "slant": Fonts.slant, + "sim": Fonts.sim, + "circles": Fonts.circles, + "dark_circle": Fonts.dark_circle, + "gothic": Fonts.gothic, + "bold_gothic": Fonts.bold_gothic, + "cloud": Fonts.cloud, + "happy": Fonts.happy, + "sad": Fonts.sad, + "special": Fonts.special, + "square": Fonts.square, + "dark_square": Fonts.dark_square, + "andalucia": Fonts.andalucia, + "manga": Fonts.manga, + "stinky": Fonts.stinky, + "bubbles": Fonts.bubbles, + "underline": Fonts.underline, + "ladybug": Fonts.ladybug, + "rays": Fonts.rays, + "birds": Fonts.birds, + "slash": Fonts.slash, + "stop": Fonts.stop, + "skyline": Fonts.skyline, + "arrows": Fonts.arrows, + "rvnes": Fonts.rvnes, + "strike": Fonts.strike, + "frozen": Fonts.frozen + } + cls = fonts.get(font_request.style) + if not cls: + return {"error": "Invalid style"} + try: + new_text = cls(font_request.text) + return {"new_text": new_text} + except Exception as e: + logger.error(f"Error applying font style: {e}") + return {"error": str(e)} + +@app.post("/paste") +async def paste_endpoint(paste_request: PasteRequest): + try: + paste_response = await p_paste(paste_request.text) + return paste_response + except Exception as e: + logger.error(f"Error pasting text: {e}") + return {"error": str(e)} + +async def p_paste(code, extension=None): + siteurl = "https://pasty.lus.pm/api/v1/pastes" + data = {"content": code} + try: + response = requests.post(url=siteurl, data=json.dumps(data), headers={"User-Agent": "Mozilla/5.0", "content-type": "application/json"}) + except Exception as e: + return {"error": str(e)} + if response.ok: + response = response.json() + purl = ( + f"https://pasty.lus.pm/{response['id']}.{extension}" + if extension + else f"https://pasty.lus.pm/{response['id']}.txt" + ) + return { + "url": purl, + "raw": f"https://pasty.lus.pm/{response['id']}/raw", + "bin": "Pasty", + } + return {"error": "Unable to reach pasty.lus.pm"} + +@app.post("/share_text") +async def share_text_endpoint(share_request: ShareTextRequest): + try: + share_url = f"https://t.me/share/url?url={quote(share_request.text)}" + return {"share_url": share_url} + except Exception as e: + logger.error(f"Error sharing text: {e}") + return {"error": str(e)} + +@app.post("/telegraph_upload") +async def telegraph_upload_endpoint(upload_request: TelegraphUploadRequest): + try: + response = upload_file(upload_request.file_path) + telegraph_url = f"https://graph.org{response[0]}" + return {"telegraph_url": telegraph_url} + except Exception as e: + logger.error(f"Error uploading to Telegraph: {e}") + return {"error": str(e)} + +# Ensure the Fonts class is imported from the appropriate module +# from image.font_string import Fonts + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file