Spaces:
Sleeping
Sleeping
from pyrogram import Client, filters | |
from config import API_ID, API_HASH, BOT_TOKEN | |
import requests | |
import os | |
import asyncio | |
import json | |
from datetime import datetime | |
import edge_tts | |
import io | |
from telegraph.aio import Telegraph | |
import aiohttp | |
# Add these constants at the top | |
API_URL = "https://api-inference.huggingface.co/models/openai/whisper-large-v3-turbo" | |
AI_URL = "https://charan5775-fastest.hf.space/t2t" | |
AI_HEADERS = { | |
"Content-Type": "application/json", | |
"Accept": "application/json" | |
} | |
# Initialize Telegraph and aiohttp session | |
telegraph = None | |
session = None | |
async def init_telegraph(): | |
global telegraph, session | |
try: | |
if telegraph is None: | |
session = aiohttp.ClientSession() | |
telegraph = Telegraph(session=session) | |
await telegraph.create_account(short_name='TelegramAIBot') | |
return True | |
except Exception as e: | |
print(f"Telegraph initialization error: {e}") | |
return False | |
async def cleanup(): | |
global session | |
if session: | |
await session.close() | |
# TTS settings | |
DEFAULT_VOICE = "en-IN-NeerjaNeural" | |
DEFAULT_RATE = "+25%" | |
# Initialize message history in memory only | |
message_history = [] | |
async def text_to_speech(text, voice=DEFAULT_VOICE, rate=DEFAULT_RATE): | |
"""Convert text to speech and return the audio data""" | |
try: | |
communicate = edge_tts.Communicate(text, voice, rate=rate) | |
audio_data = bytes() | |
async for chunk in communicate.stream(): | |
if chunk["type"] == "audio": | |
audio_data += chunk["data"] | |
return audio_data | |
except Exception as e: | |
print(f"Error in text-to-speech conversion: {str(e)}") | |
return None | |
# Create a new Client instance with a custom session name and no local storage | |
myaibot = Client( | |
"my_bot", | |
api_id=API_ID, | |
api_hash=API_HASH, | |
bot_token=BOT_TOKEN, | |
in_memory=True # This prevents SQLite database locks | |
) | |
def save_message_to_history(user_id, username, message_type, content, bot_response=None): | |
message_data = { | |
"content": content, | |
"response": bot_response | |
} | |
message_history.append(message_data) | |
print("Message History Update:", json.dumps(message_data, indent=2)) | |
def condense_text(text, max_length=100): | |
"""Condense text to a maximum length while keeping it readable""" | |
if len(text) <= max_length: | |
return text | |
return text[:max_length-3] + "..." | |
# Add command to print history | |
async def history_command(client, message): | |
if not message_history: | |
await message.reply_text("No message history available.") | |
return | |
try: | |
total_messages = len(message_history) | |
history_text = f"π Chat History (Total: {total_messages} messages)\n\n" | |
# If we have many messages, summarize older ones | |
if total_messages > 10: | |
history_text += "Earlier messages:\n" | |
for msg in message_history[:-10]: | |
history_text += f"β’ {condense_text(msg['content'], 50)}\n" | |
history_text += "\nRecent messages:\n" | |
recent_messages = message_history[-10:] | |
else: | |
recent_messages = message_history | |
# Add recent messages with more detail | |
for idx, msg in enumerate(recent_messages, 1): | |
history_text += f"{idx}. Q: {condense_text(msg['content'], 150)}\n" | |
if msg['response']: | |
history_text += f" A: {condense_text(msg['response'], 150)}\n" | |
history_text += "\n" | |
await message.reply_text(history_text) | |
except Exception as e: | |
print(f"Error in history command: {str(e)}") | |
# Fallback to super condensed version | |
short_history = "π Last 5 Messages:\n\n" | |
for msg in message_history[-5:]: | |
short_history += f"β’ {condense_text(msg['content'], 30)}\n" | |
await message.reply_text(short_history) | |
# Command handler for /start | |
async def start_command(client, message): | |
await message.reply_text("Hello! I'm your Telegram bot. Nice to meet you!") | |
# Command handler for /help | |
async def help_command(client, message): | |
help_text = """ | |
Available commands: | |
/start - Start the bot | |
/help - Show this help message | |
/history - Show chat history | |
/clear - Clear chat history | |
/info - Show user information | |
""" | |
await message.reply_text(help_text) | |
# Add clear command | |
async def clear_command(client, message): | |
try: | |
message_history.clear() | |
await message.reply_text("β¨ Chat history has been cleared!") | |
except Exception as e: | |
await message.reply_text(f"Error clearing history: {str(e)}") | |
# Message handler for regular text messages | |
async def echo(client, message): | |
try: | |
thinking_msg = await message.reply_text("π€ Thinking about your message...") | |
ai_response = await get_ai_response(message.text) | |
await thinking_msg.delete() | |
await message.reply_text(ai_response) | |
# Save message to history | |
save_message_to_history( | |
message.from_user.id, | |
message.from_user.username, | |
"text", | |
message.text, | |
ai_response | |
) | |
except Exception as e: | |
await message.reply_text(f"Sorry, I couldn't process your message: {str(e)}") | |
# Handle photo messages | |
async def handle_photo(client, message): | |
response = "Nice photo!" | |
await message.reply_text(response) | |
save_message_to_history( | |
message.from_user.id, | |
message.from_user.username, | |
"photo", | |
"Photo message", | |
response | |
) | |
# Handle sticker messages | |
async def handle_sticker(client, message): | |
response = "Cool sticker!" | |
await message.reply_text(response) | |
save_message_to_history( | |
message.from_user.id, | |
message.from_user.username, | |
"sticker", | |
"Sticker message", | |
response | |
) | |
# Custom command example | |
async def info_command(client, message): | |
user = message.from_user | |
info_text = f""" | |
User Information: | |
ID: {user.id} | |
Name: {user.first_name} | |
Username: @{user.username if user.username else 'None'} | |
""" | |
await message.reply_text(info_text) | |
# Add this function after your existing imports | |
async def transcribe_audio(file_path): | |
try: | |
with open(file_path, "rb") as f: | |
data = f.read() | |
response = requests.post(API_URL,data=data) | |
return response.json().get('text', 'Could not transcribe audio') | |
except Exception as e: | |
print(f"Error in transcription: {e}") | |
return "Error transcribing audio" | |
# Add this new function after transcribe_audio function | |
async def get_ai_response(text): | |
try: | |
# Create context from history | |
context = "" | |
if message_history: | |
# Get last 5 relevant messages for context | |
recent_history = message_history[-5:] | |
context = ( | |
"You are a helpful AI assistant. Below is the conversation history. " | |
"Use this context to provide a relevant response to the user's latest message. " | |
"If the current message is related to previous ones, make sure to reference and build upon that information.\n\n" | |
"Previous conversation:\n" | |
) | |
# Add conversation history with clear markers | |
for i, msg in enumerate(recent_history, 1): | |
context += f"Message {i}:\n" | |
context += f"User: {msg['content']}\n" | |
if msg['response']: | |
context += f"Assistant: {msg['response']}\n" | |
context += "\n" | |
# Add specific instructions for the response | |
context += ( | |
"Instructions:\n" | |
"1. Consider the conversation history above\n" | |
"2. If the new message relates to previous ones, reference that information\n" | |
"3. Maintain consistency with previous responses\n" | |
"4. Provide a direct and relevant answer\n\n" | |
"New message to respond to:\n" | |
) | |
# Combine context with current query | |
full_prompt = f"{context}User: {text}\nAssistant: Let me provide a relevant response based on our conversation..." | |
payload = { | |
"query": full_prompt, | |
"stream": False | |
} | |
response = requests.post(AI_URL, json=payload) | |
print(f"Raw API Response: {response.text}") # Debug print | |
if response.status_code != 200: | |
print(f"API Error: Status {response.status_code}") | |
return f"Sorry, the AI service returned an error (Status {response.status_code})" | |
response_data = response.json() | |
print(f"Parsed Response Data: {response_data}") # Debug print | |
# The API returns the response directly | |
if isinstance(response_data, dict) and 'response' in response_data: | |
return response_data['response'].replace("Assistant: Let me provide a relevant response based on our conversation...", "").strip() | |
else: | |
return str(response_data) | |
except requests.exceptions.RequestException as e: | |
print(f"Network error: {e}") | |
return "Sorry, I'm having trouble connecting to the AI service." | |
except json.JSONDecodeError as e: | |
print(f"JSON parsing error: {e}\nResponse text: {response.text}") | |
return "Sorry, I received an invalid response from the AI service." | |
except Exception as e: | |
print(f"Error getting AI response: {str(e)}\nFull error: {repr(e)}") | |
return "Sorry, I couldn't process your message." | |
# Update the voice message handler with TTS response | |
async def handle_voice(client, message): | |
try: | |
processing_msg = await message.reply_text("π΅ Processing your voice message...") | |
# Download and process voice message | |
voice_file = await message.download() | |
transcription = await transcribe_audio(voice_file) | |
await message.reply_text(f"π£οΈ Transcription:\n\n{transcription}") | |
# Get AI response | |
thinking_msg = await message.reply_text("π€ Thinking about your message...") | |
ai_response = await get_ai_response(transcription) | |
await thinking_msg.delete() | |
await message.reply_text(ai_response) | |
# Convert AI response to speech | |
processing_tts = await message.reply_text("π Converting response to speech...") | |
audio_data = await text_to_speech(ai_response) | |
if audio_data: | |
# Create an in-memory file-like object | |
audio_file = io.BytesIO(audio_data) | |
audio_file.name = "response.mp3" | |
# Send audio directly from memory | |
await message.reply_voice( | |
audio_file, | |
caption="π΅ Voice response" | |
) | |
# Save to history | |
save_message_to_history( | |
message.from_user.id, | |
message.from_user.username, | |
"voice", | |
transcription, | |
ai_response | |
) | |
# Clean up | |
try: | |
os.remove(voice_file) # Still need to remove the downloaded voice file | |
await processing_msg.delete() | |
await processing_tts.delete() | |
except: | |
pass | |
except Exception as e: | |
error_message = f"Sorry, there was an error processing your message: {str(e)}" | |
print(error_message) | |
await message.reply_text(error_message) | |
# Run the bot | |
if __name__ == "__main__": | |
print("Bot is running...") | |
try: | |
myaibot.run() | |
finally: | |
# Cleanup | |
if session: | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(cleanup()) |