from pyrogram import Client, filters
from config import API_ID, API_HASH, BOT_TOKEN
import requests
import os
import asyncio
import json
from datetime import datetime
import edge_tts
import io
from telegraph.aio import Telegraph
import aiohttp
# API endpoints: Whisper transcription on the Hugging Face Inference API and the text-to-text AI service
API_URL = "https://api-inference.huggingface.co/models/openai/whisper-large-v3-turbo"
AI_URL = "https://charan5775-fastest.hf.space/t2t"
AI_HEADERS = {
    "Content-Type": "application/json",
    "Accept": "application/json"
}
# Initialize Telegraph and aiohttp session
telegraph = None
session = None
async def init_telegraph():
    global telegraph, session
    try:
        if telegraph is None:
            session = aiohttp.ClientSession()
            telegraph = Telegraph(session=session)
            await telegraph.create_account(short_name='TelegramAIBot')
        return True
    except Exception as e:
        print(f"Telegraph initialization error: {e}")
        return False
async def cleanup():
    global session
    if session:
        await session.close()
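# Note: init_telegraph() is never awaited anywhere in this script, so `telegraph`
# and `session` stay None and the Telegraph/aiohttp setup is effectively unused.
# A minimal sketch of one way to wire it in (an assumption, not part of the
# original flow) would be to run it once before starting the bot:
#
#   asyncio.get_event_loop().run_until_complete(init_telegraph())
#   myaibot.run()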
# TTS settings
DEFAULT_VOICE = "en-IN-NeerjaNeural"
DEFAULT_RATE = "+25%"
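# Any other edge-tts voice name can be substituted for DEFAULT_VOICE; the
# available names can be listed with `await edge_tts.list_voices()` or the
# `edge-tts --list-voices` command-line tool.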
# Initialize message history in memory only
message_history = []
async def text_to_speech(text, voice=DEFAULT_VOICE, rate=DEFAULT_RATE):
    """Convert text to speech and return the audio data"""
    try:
        communicate = edge_tts.Communicate(text, voice, rate=rate)
        audio_data = bytes()
        async for chunk in communicate.stream():
            if chunk["type"] == "audio":
                audio_data += chunk["data"]
        return audio_data
    except Exception as e:
        print(f"Error in text-to-speech conversion: {str(e)}")
        return None
# Create a new Client instance with a custom session name and no local storage
myaibot = Client(
    "my_bot",
    api_id=API_ID,
    api_hash=API_HASH,
    bot_token=BOT_TOKEN,
    in_memory=True  # This prevents SQLite database locks
)
def save_message_to_history(user_id, username, message_type, content, bot_response=None):
    # Only the message text and the bot's reply are stored; the user metadata
    # arguments are accepted but not persisted here.
    message_data = {
        "content": content,
        "response": bot_response
    }
    message_history.append(message_data)
    print("Message History Update:", json.dumps(message_data, indent=2))
def condense_text(text, max_length=100):
    """Condense text to a maximum length while keeping it readable"""
    if len(text) <= max_length:
        return text
    return text[:max_length-3] + "..."
# Add command to print history
@myaibot.on_message(filters.command("history"))
async def history_command(client, message):
    if not message_history:
        await message.reply_text("No message history available.")
        return
    try:
        total_messages = len(message_history)
        history_text = f"📜 Chat History (Total: {total_messages} messages)\n\n"
        # If we have many messages, summarize older ones
        if total_messages > 10:
            history_text += "Earlier messages:\n"
            for msg in message_history[:-10]:
                history_text += f"• {condense_text(msg['content'], 50)}\n"
            history_text += "\nRecent messages:\n"
            recent_messages = message_history[-10:]
        else:
            recent_messages = message_history
        # Add recent messages with more detail
        for idx, msg in enumerate(recent_messages, 1):
            history_text += f"{idx}. Q: {condense_text(msg['content'], 150)}\n"
            if msg['response']:
                history_text += f"   A: {condense_text(msg['response'], 150)}\n"
            history_text += "\n"
        await message.reply_text(history_text)
    except Exception as e:
        print(f"Error in history command: {str(e)}")
        # Fallback to super condensed version
        short_history = "📜 Last 5 Messages:\n\n"
        for msg in message_history[-5:]:
            short_history += f"• {condense_text(msg['content'], 30)}\n"
        await message.reply_text(short_history)
# Command handler for /start
@myaibot.on_message(filters.command("start"))
async def start_command(client, message):
    await message.reply_text("Hello! I'm your Telegram bot. Nice to meet you!")
# Command handler for /help
@myaibot.on_message(filters.command("help"))
async def help_command(client, message):
    help_text = """
Available commands:
/start - Start the bot
/help - Show this help message
/history - Show chat history
/clear - Clear chat history
/info - Show user information
"""
    await message.reply_text(help_text)
# Add clear command
@myaibot.on_message(filters.command("clear"))
async def clear_command(client, message):
    try:
        message_history.clear()
        await message.reply_text("✨ Chat history has been cleared!")
    except Exception as e:
        await message.reply_text(f"Error clearing history: {str(e)}")
# Message handler for regular text messages
@myaibot.on_message(filters.text & filters.private & ~filters.command(["start", "help", "info"]))
async def echo(client, message):
    try:
        thinking_msg = await message.reply_text("🤔 Thinking about your message...")
        ai_response = await get_ai_response(message.text)
        await thinking_msg.delete()
        await message.reply_text(ai_response)
        # Save message to history
        save_message_to_history(
            message.from_user.id,
            message.from_user.username,
            "text",
            message.text,
            ai_response
        )
    except Exception as e:
        await message.reply_text(f"Sorry, I couldn't process your message: {str(e)}")
# Handle photo messages
@myaibot.on_message(filters.photo)
async def handle_photo(client, message):
    response = "Nice photo!"
    await message.reply_text(response)
    save_message_to_history(
        message.from_user.id,
        message.from_user.username,
        "photo",
        "Photo message",
        response
    )
# Handle sticker messages
@myaibot.on_message(filters.sticker)
async def handle_sticker(client, message):
    response = "Cool sticker!"
    await message.reply_text(response)
    save_message_to_history(
        message.from_user.id,
        message.from_user.username,
        "sticker",
        "Sticker message",
        response
    )
# Custom command example
@myaibot.on_message(filters.command("info"))
async def info_command(client, message):
    user = message.from_user
    info_text = f"""
User Information:
ID: {user.id}
Name: {user.first_name}
Username: @{user.username if user.username else 'None'}
"""
    await message.reply_text(info_text)
# Transcribe an audio file with the Whisper model on the Hugging Face Inference API
async def transcribe_audio(file_path):
    try:
        with open(file_path, "rb") as f:
            data = f.read()
        response = requests.post(API_URL, data=data)
        return response.json().get('text', 'Could not transcribe audio')
    except Exception as e:
        print(f"Error in transcription: {e}")
        return "Error transcribing audio"
# Build a context-aware prompt from recent history and query the AI service
async def get_ai_response(text):
    try:
        # Create context from history
        context = ""
        if message_history:
            # Get last 5 relevant messages for context
            recent_history = message_history[-5:]
            context = (
                "You are a helpful AI assistant. Below is the conversation history. "
                "Use this context to provide a relevant response to the user's latest message. "
                "If the current message is related to previous ones, make sure to reference and build upon that information.\n\n"
                "Previous conversation:\n"
            )
            # Add conversation history with clear markers
            for i, msg in enumerate(recent_history, 1):
                context += f"Message {i}:\n"
                context += f"User: {msg['content']}\n"
                if msg['response']:
                    context += f"Assistant: {msg['response']}\n"
                context += "\n"
            # Add specific instructions for the response
            context += (
                "Instructions:\n"
                "1. Consider the conversation history above\n"
                "2. If the new message relates to previous ones, reference that information\n"
                "3. Maintain consistency with previous responses\n"
                "4. Provide a direct and relevant answer\n\n"
                "New message to respond to:\n"
            )
        # Combine context with current query
        full_prompt = f"{context}User: {text}\nAssistant: Let me provide a relevant response based on our conversation..."
        payload = {
            "query": full_prompt,
            "stream": False
        }
        response = requests.post(AI_URL, json=payload, headers=AI_HEADERS)
        print(f"Raw API Response: {response.text}")  # Debug print
        if response.status_code != 200:
            print(f"API Error: Status {response.status_code}")
            return f"Sorry, the AI service returned an error (Status {response.status_code})"
        response_data = response.json()
        print(f"Parsed Response Data: {response_data}")  # Debug print
        # The API returns the response directly
        if isinstance(response_data, dict) and 'response' in response_data:
            return response_data['response'].replace("Assistant: Let me provide a relevant response based on our conversation...", "").strip()
        else:
            return str(response_data)
    except requests.exceptions.RequestException as e:
        print(f"Network error: {e}")
        return "Sorry, I'm having trouble connecting to the AI service."
    except json.JSONDecodeError as e:
        print(f"JSON parsing error: {e}\nResponse text: {response.text}")
        return "Sorry, I received an invalid response from the AI service."
    except Exception as e:
        print(f"Error getting AI response: {str(e)}\nFull error: {repr(e)}")
        return "Sorry, I couldn't process your message."
# Voice/audio handler: transcribe, reply with the AI's answer, then send it back as TTS audio
@myaibot.on_message(filters.voice | filters.audio)
async def handle_voice(client, message):
    try:
        processing_msg = await message.reply_text("🎡 Processing your voice message...")
        # Download and process voice message
        voice_file = await message.download()
        transcription = await transcribe_audio(voice_file)
        await message.reply_text(f"🗣️ Transcription:\n\n{transcription}")
        # Get AI response
        thinking_msg = await message.reply_text("🤔 Thinking about your message...")
        ai_response = await get_ai_response(transcription)
        await thinking_msg.delete()
        await message.reply_text(ai_response)
        # Convert AI response to speech
        processing_tts = await message.reply_text("🔊 Converting response to speech...")
        audio_data = await text_to_speech(ai_response)
        if audio_data:
            # Create an in-memory file-like object
            audio_file = io.BytesIO(audio_data)
            audio_file.name = "response.mp3"
            # Send audio directly from memory
            await message.reply_voice(
                audio_file,
                caption="🎡 Voice response"
            )
        # Save to history
        save_message_to_history(
            message.from_user.id,
            message.from_user.username,
            "voice",
            transcription,
            ai_response
        )
        # Clean up
        try:
            os.remove(voice_file)  # Still need to remove the downloaded voice file
            await processing_msg.delete()
            await processing_tts.delete()
        except Exception:
            pass
    except Exception as e:
        error_message = f"Sorry, there was an error processing your message: {str(e)}"
        print(error_message)
        await message.reply_text(error_message)
# Run the bot
if __name__ == "__main__":
    print("Bot is running...")
    try:
        myaibot.run()
    finally:
        # Cleanup
        if session:
            loop = asyncio.get_event_loop()
            loop.run_until_complete(cleanup())