"""Telegram bot that answers messages through a hosted text-to-text AI service.

Private text messages are sent to the AI endpoint together with the most
recent entries of an in-memory conversation history. Helpers for audio
transcription and text-to-speech are defined here as well.
"""
import asyncio
import io
import json
import os
from datetime import datetime

import aiohttp
import edge_tts
import requests
from pyrogram import Client, filters
from telegraph.aio import Telegraph

from config import API_ID, API_HASH, BOT_TOKEN

# Remote endpoints: speech-to-text model and text-to-text chat service.
API_URL = "https://api-inference.huggingface.co/models/openai/whisper-large-v3-turbo"
AI_URL = "https://charan5775-fastest.hf.space/t2t"
AI_HEADERS = {
    "Content-Type": "application/json",
    "Accept": "application/json"
}

# Upper bound (seconds) for any single HTTP call, so a stalled remote service
# cannot hang a handler forever.
REQUEST_TIMEOUT_SECONDS = 60

# Trailing line appended to every prompt; the service sometimes echoes it back,
# so it is stripped from the answer again.
ASSISTANT_PRIMER = "Assistant: Let me provide a relevant response based on our conversation..."

# Telegraph client and its aiohttp session, created lazily by init_telegraph().
telegraph = None
session = None


async def init_telegraph():
    """Create the shared Telegraph client and aiohttp session if needed.

    The module-level ``telegraph`` and ``session`` are only assigned once the
    account has been created successfully, so a failed attempt can be retried
    and never leaves an open session behind.

    Returns:
        True when a Telegraph client is available, False when setup failed.
    """
    global telegraph, session
    if telegraph is not None:
        return True

    new_session = None
    try:
        new_session = aiohttp.ClientSession()
        new_client = Telegraph(session=new_session)
        await new_client.create_account(short_name='TelegramAIBot')
    except Exception as e:
        print(f"Telegraph initialization error: {e}")
        if new_session is not None:
            await new_session.close()
        return False

    session = new_session
    telegraph = new_client
    return True


async def cleanup():
    """Close the shared aiohttp session and forget the objects bound to it."""
    global telegraph, session
    if session:
        await session.close()
        # The Telegraph client wraps the session that was just closed, so both
        # are dropped; a later init_telegraph() call rebuilds them.
        session = None
        telegraph = None


# TTS settings
DEFAULT_VOICE = "en-IN-NeerjaNeural"
DEFAULT_RATE = "+25%"

# Message history is kept in memory only. NOTE: this single list is shared by
# every chat the bot serves; entries do not record which user sent them.
message_history = []


async def text_to_speech(text, voice=DEFAULT_VOICE, rate=DEFAULT_RATE):
    """Convert text to speech and return the audio data.

    Args:
        text: Text to synthesize.
        voice: edge-tts voice name.
        rate: Speaking-rate adjustment string passed to edge-tts.

    Returns:
        The concatenated audio chunks as ``bytes`` (empty if the stream held
        no audio), or ``None`` when synthesis raised an error.
    """
    try:
        communicate = edge_tts.Communicate(text, voice, rate=rate)
        # bytearray.extend avoids re-copying the whole buffer for every chunk.
        audio_buffer = bytearray()
        async for chunk in communicate.stream():
            if chunk["type"] == "audio":
                audio_buffer.extend(chunk["data"])
        return bytes(audio_buffer)
    except Exception as e:
        print(f"Error in text-to-speech conversion: {str(e)}")
        return None


# Create a new Client instance with a custom session name and no local storage
myaibot = Client(
    "my_bot",
    api_id=API_ID,
    api_hash=API_HASH,
    bot_token=BOT_TOKEN,
    in_memory=True  # This prevents SQLite database locks
)


async def _post_in_thread(url, **kwargs):
    """Run ``requests.post`` in the default executor and return its response.

    ``requests`` is blocking; calling it directly from a coroutine would freeze
    every other handler until the remote service answers.
    """
    kwargs.setdefault("timeout", REQUEST_TIMEOUT_SECONDS)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: requests.post(url, **kwargs))


def _sender_identity(message):
    """Return ``(user_id, username)`` for a message, tolerating a missing sender."""
    sender = message.from_user
    if sender is None:
        return None, None
    return sender.id, sender.username


def save_message_to_history(user_id, username, message_type, content, bot_response=None):
    """Append one exchange to the in-memory history and log it.

    Only ``content`` and ``bot_response`` are stored; ``user_id``, ``username``
    and ``message_type`` are accepted for the callers' benefit but not recorded.
    """
    message_data = {
        "content": content,
        "response": bot_response
    }
    message_history.append(message_data)
    print("Message History Update:", json.dumps(message_data, indent=2))


def condense_text(text, max_length=100):
    """Condense text to a maximum length while keeping it readable.

    Text longer than ``max_length`` is cut and suffixed with ``"..."`` so that
    the result is exactly ``max_length`` characters. For limits too small to
    hold the ellipsis the text is simply cut, so the result never exceeds
    ``max_length``.
    """
    if len(text) <= max_length:
        return text
    if max_length <= 3:
        return text[:max(max_length, 0)]
    return text[:max_length - 3] + "..."


# Add command to print history
@myaibot.on_message(filters.command("history"))
async def history_command(client, message):
    """Reply with the stored history: older entries condensed, last ten in detail."""
    if not message_history:
        await message.reply_text("No message history available.")
        return

    try:
        total_messages = len(message_history)
        parts = [f"📜 Chat History (Total: {total_messages} messages)\n\n"]

        # If we have many messages, summarize older ones
        if total_messages > 10:
            parts.append("Earlier messages:\n")
            parts.extend(
                f"• {condense_text(msg['content'], 50)}\n"
                for msg in message_history[:-10]
            )
            parts.append("\nRecent messages:\n")
            recent_messages = message_history[-10:]
        else:
            recent_messages = message_history

        # Add recent messages with more detail
        for idx, msg in enumerate(recent_messages, 1):
            parts.append(f"{idx}. Q: {condense_text(msg['content'], 150)}\n")
            if msg['response']:
                parts.append(f"   A: {condense_text(msg['response'], 150)}\n")
            parts.append("\n")

        await message.reply_text("".join(parts))
    except Exception as e:
        print(f"Error in history command: {str(e)}")
        # Fallback to super condensed version
        short_history = "📜 Last 5 Messages:\n\n" + "".join(
            f"• {condense_text(msg['content'], 30)}\n"
            for msg in message_history[-5:]
        )
        await message.reply_text(short_history)


# Command handler for /start
@myaibot.on_message(filters.command("start"))
async def start_command(client, message):
    """Greet the user."""
    await message.reply_text("Hello! I'm your Telegram bot. Nice to meet you!")


# Command handler for /help
@myaibot.on_message(filters.command("help"))
async def help_command(client, message):
    """Reply with the list of available commands."""
    help_text = """
Available commands:
/start - Start the bot
/help - Show this help message
/history - Show chat history
/clear - Clear chat history
/info - Show user information
"""
    await message.reply_text(help_text)


# Add clear command
@myaibot.on_message(filters.command("clear"))
async def clear_command(client, message):
    """Empty the in-memory history and confirm to the user."""
    try:
        message_history.clear()
        await message.reply_text("✨ Chat history has been cleared!")
    except Exception as e:
        await message.reply_text(f"Error clearing history: {str(e)}")


# Message handler for regular text messages.
# Every command the bot defines is excluded explicitly so that command
# handling does not depend on the order in which handlers are registered.
@myaibot.on_message(
    filters.text
    & filters.private
    & ~filters.command(["start", "help", "info", "history", "clear"])
)
async def echo(client, message):
    """Answer a private text message with the AI service's response."""
    try:
        thinking_msg = await message.reply_text("🤔 Thinking about your message...")
        ai_response = await get_ai_response(message.text)
        await thinking_msg.delete()
        await message.reply_text(ai_response)

        # Save message to history
        user_id, username = _sender_identity(message)
        save_message_to_history(user_id, username, "text", message.text, ai_response)
    except Exception as e:
        await message.reply_text(f"Sorry, I couldn't process your message: {str(e)}")


# Handle photo messages
@myaibot.on_message(filters.photo)
async def handle_photo(client, message):
    """Acknowledge a photo and record the exchange."""
    response = "Nice photo!"
    await message.reply_text(response)
    user_id, username = _sender_identity(message)
    save_message_to_history(user_id, username, "photo", "Photo message", response)


# Handle sticker messages
@myaibot.on_message(filters.sticker)
async def handle_sticker(client, message):
    """Acknowledge a sticker and record the exchange."""
    response = "Cool sticker!"
    await message.reply_text(response)
    user_id, username = _sender_identity(message)
    save_message_to_history(user_id, username, "sticker", "Sticker message", response)


# Custom command example
@myaibot.on_message(filters.command("info"))
async def info_command(client, message):
    """Reply with the sender's id, first name and username."""
    user = message.from_user
    info_text = f"""
User Information:
ID: {user.id}
Name: {user.first_name}
Username: @{user.username if user.username else 'None'}
"""
    await message.reply_text(info_text)


async def transcribe_audio(file_path):
    """Send an audio file to the speech-to-text endpoint and return its text.

    Args:
        file_path: Path of the audio file to upload.

    Returns:
        The ``text`` field of the JSON response, ``'Could not transcribe
        audio'`` when that field is absent, or ``"Error transcribing audio"``
        when reading, uploading or decoding failed.
    """
    try:
        with open(file_path, "rb") as f:
            data = f.read()
        response = await _post_in_thread(API_URL, data=data)
        return response.json().get('text', 'Could not transcribe audio')
    except Exception as e:
        print(f"Error in transcription: {e}")
        return "Error transcribing audio"


def _build_prompt(text, history):
    """Compose the prompt for ``text``, prefixed by ``history`` when non-empty."""
    current_turn = f"User: {text}\n{ASSISTANT_PRIMER}"
    if not history:
        return current_turn

    parts = [
        "You are a helpful AI assistant. Below is the conversation history. "
        "Use this context to provide a relevant response to the user's latest message. "
        "If the current message is related to previous ones, make sure to reference and build upon that information.\n\n"
        "Previous conversation:\n"
    ]

    # Add conversation history with clear markers
    for i, msg in enumerate(history, 1):
        parts.append(f"Message {i}:\n")
        parts.append(f"User: {msg['content']}\n")
        if msg['response']:
            parts.append(f"Assistant: {msg['response']}\n")
        parts.append("\n")

    # Add specific instructions for the response
    parts.append(
        "Instructions:\n"
        "1. Consider the conversation history above\n"
        "2. If the new message relates to previous ones, reference that information\n"
        "3. Maintain consistency with previous responses\n"
        "4. Provide a direct and relevant answer\n\n"
        "New message to respond to:\n"
    )
    parts.append(current_turn)
    return "".join(parts)


async def get_ai_response(text):
    """Ask the AI service to answer ``text`` in the context of recent history.

    The last five history entries are included in the prompt. This coroutine
    never raises: every failure is logged and turned into an apology string.

    Args:
        text: The user's latest message.

    Returns:
        The service's answer, or a human-readable error message.
    """
    try:
        payload = {
            "query": _build_prompt(text, message_history[-5:]),
            "stream": False
        }

        response = await _post_in_thread(AI_URL, json=payload)
        print(f"Raw API Response: {response.text}")  # Debug print

        if response.status_code != 200:
            print(f"API Error: Status {response.status_code}")
            return f"Sorry, the AI service returned an error (Status {response.status_code})"

        # Decode errors are handled right here: depending on the installed
        # requests version they may also be RequestException subclasses and
        # would otherwise be reported as a connection problem.
        try:
            response_data = response.json()
        except ValueError as e:
            print(f"JSON parsing error: {e}\nResponse text: {response.text}")
            return "Sorry, I received an invalid response from the AI service."
        print(f"Parsed Response Data: {response_data}")  # Debug print

        # The API returns the response directly
        if isinstance(response_data, dict) and 'response' in response_data:
            return response_data['response'].replace(ASSISTANT_PRIMER, "").strip()
        return str(response_data)

    except requests.exceptions.RequestException as e:
        print(f"Network error: {e}")
        return "Sorry, I'm having trouble connecting to the AI service."
    except Exception as e:
        print(f"Error getting AI response: {str(e)}\nFull error: {repr(e)}")
        return "Sorry, I couldn't process your message."
async def _cleanup_voice_artifacts(voice_file, status_messages):
    """Best-effort removal of the downloaded file and transient status messages.

    Each step is attempted independently and failures are only logged, so one
    failed deletion cannot prevent the others.
    """
    if voice_file:
        try:
            os.remove(voice_file)
        except OSError as e:
            print(f"Could not remove downloaded file {voice_file}: {e}")
    for status_message in status_messages:
        try:
            await status_message.delete()
        except Exception as e:
            print(f"Could not delete status message: {e}")


# Voice/audio handler: transcription, text answer, then a spoken answer.
@myaibot.on_message(filters.voice | filters.audio)
async def handle_voice(client, message):
    """Transcribe a voice/audio message and answer it with text and speech.

    Steps: download the file, transcribe it, reply with the transcription,
    reply with the AI answer, then send the answer as a voice message when
    text-to-speech produced audio. The exchange is stored in the history.
    The downloaded file and the progress messages are removed whether or not
    processing succeeded.
    """
    voice_file = None
    status_messages = []
    try:
        processing_msg = await message.reply_text("🎵 Processing your voice message...")
        status_messages.append(processing_msg)

        # Download and process voice message
        voice_file = await message.download()
        transcription = await transcribe_audio(voice_file)
        await message.reply_text(f"🗣️ Transcription:\n\n{transcription}")

        # Get AI response
        thinking_msg = await message.reply_text("🤔 Thinking about your message...")
        ai_response = await get_ai_response(transcription)
        await thinking_msg.delete()
        await message.reply_text(ai_response)

        # Convert AI response to speech
        processing_tts = await message.reply_text("🔊 Converting response to speech...")
        status_messages.append(processing_tts)
        audio_data = await text_to_speech(ai_response)

        if audio_data:
            # Create an in-memory file-like object
            audio_file = io.BytesIO(audio_data)
            audio_file.name = "response.mp3"

            # Send audio directly from memory
            await message.reply_voice(
                audio_file,
                caption="🎵 Voice response"
            )

        # Save to history; the sender can be absent, so do not dereference it
        # unconditionally.
        sender = message.from_user
        save_message_to_history(
            sender.id if sender else None,
            sender.username if sender else None,
            "voice",
            transcription,
            ai_response
        )
    except Exception as e:
        error_message = f"Sorry, there was an error processing your message: {str(e)}"
        print(error_message)
        await message.reply_text(error_message)
    finally:
        # Runs on success and failure alike so the download never accumulates.
        await _cleanup_voice_artifacts(voice_file, status_messages)


# Run the bot
if __name__ == "__main__":
    print("Bot is running...")
    try:
        myaibot.run()
    finally:
        # Close the shared aiohttp session if one was ever created.
        if session:
            loop = asyncio.get_event_loop()
            loop.run_until_complete(cleanup())