# Spaces: Sleeping
import asyncio
import hmac
import logging
import os
import threading
from urllib.parse import unquote

import google.generativeai as genai
import gradio as gr
import httpx
import requests
import telebot
from flask import Flask, request, jsonify
# Suppress experimental warnings
os.environ['HF_HUB_DISABLE_EXPERIMENTAL_WARNING'] = '1'

# Configuration
# Credentials are read from the environment rather than committed to the
# source file. Any key or token that was previously hard-coded here must be
# treated as leaked and rotated.
GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY', '')
BOT_TOKEN = os.environ.get('BOT_TOKEN', '')
# Stored URL-encoded because it is embedded verbatim in the webhook URL path.
WEBHOOK_SECRET = os.environ.get('WEBHOOK_SECRET', '')  # URL-encoded secret
# Chat that receives operational notifications; overridable per deployment.
POWER_USER_ID = int(os.environ.get('POWER_USER_ID', '75516649'))
# Optional outbound proxy (may embed credentials); None means "no proxy".
PROXY_URL = os.environ.get('PROXY_URL') or None
# Telegram bot client bound to the configured token
bot = telebot.TeleBot(BOT_TOKEN)

# Logging: DEBUG and above, written both to bot_debug.log and to the console
logging.basicConfig(
    handlers=[logging.FileHandler('bot_debug.log'), logging.StreamHandler()],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
# Google Generative AI: authenticate, then build the model used for replies
genai.configure(api_key=GOOGLE_API_KEY)

# Sampling / output settings passed to the model
generation_config = dict(
    temperature=1,
    top_p=0.95,
    top_k=64,
    max_output_tokens=8192,
    response_mime_type="text/plain",
)

model = genai.GenerativeModel(
    system_instruction="Please respond to user input",
    generation_config=generation_config,
    model_name="gemini-1.5-pro",
)
# Seed conversation (alternating user/model turns) that primes the chat.
_seed_turns = (
    ("user", "hi\n"),
    ("model", "Hello! π How can I help you today? π \n"),
    ("user", "I am looking for photo booth service?"),
    ("model", "That's great! π I can definitely help you with information about Aforative Media's photo booth services. \n\nTo give you the most relevant information, could you tell me a little more about what you're looking for? ..."),
    ("user", "How much for photo booth services?"),
    ("model", "You're smart to ask about pricing upfront! π \n\nAforative Media's Mr. & Ms. Booth photo booth services start at **USD 390 for a minimum of 2 hours**. ..."),
    ("user", "How about videography service?"),
    ("model", "You're thinking about capturing the memories on film too? Excellent choice! Videography adds a whole other dimension to remembering special events. \n\nAforative Media offers excellent videography services, and just like their photo booths, their videography packages are competitively priced and flexible. ..."),
)

# One module-level session: every caller that uses chat_session shares this
# same history object.
chat_session = model.start_chat(
    history=[{"role": role, "parts": [text]} for role, text in _seed_turns],
)
def generate_response(user_input):
    """Return the model's reply to *user_input*.

    The input is wrapped in a short instruction and sent through the
    module-level ``chat_session``. Any exception raised while talking to the
    model is logged and replaced by a fixed apology string, so callers always
    get text back.
    """
    try:
        reply = chat_session.send_message(f"Respond to the user: {user_input}")
        return reply.text
    except Exception as e:
        logger.error(f"Error during GenAI processing: {e}")
    return "Sorry, I can't answer this query right now but I will improve from time to time."
# Gradio callback
def gradio_handle_update(user_input):
    """Gradio entry point: delegate the textbox content to generate_response."""
    bot_reply = generate_response(user_input)
    return bot_reply
# Gradio UI: one text input, one text output.
# `live=True` is deliberately not set: in live mode the function reruns on
# every input change, which would send a partial prompt to the stateful chat
# session (and a billable model call) for each keystroke. Without it the user
# submits explicitly.
iface = gr.Interface(
    fn=gradio_handle_update,
    inputs=gr.Textbox(label="User Input"),
    outputs=gr.Textbox(label="Bot Response"),
)
# Flask app setup
app = Flask(__name__)


@app.route('/webhooks/<secret>', methods=['POST'])
def handle_update(secret=None):
    """Handle an incoming Telegram update delivered to the webhook route.

    Args:
        secret: Path segment following ``/webhooks/``. When supplied (i.e. when
            invoked through the route) it must equal the decoded
            ``WEBHOOK_SECRET``; otherwise the call is rejected with 403.
            ``None`` (a direct call) skips the check.

    Returns:
        A ``(json_response, status_code)`` tuple. Updates that carry no text
        message, and updates whose processing failed, are still acknowledged
        with 200 so Telegram does not re-deliver them.
    """
    if secret is not None:
        # WEBHOOK_SECRET is kept URL-encoded, while Flask passes the decoded
        # path segment, so decode before the constant-time comparison.
        expected = unquote(WEBHOOK_SECRET)
        if not hmac.compare_digest(secret.encode('utf-8'), expected.encode('utf-8')):
            logger.warning("Rejected webhook call with an invalid secret")
            return jsonify({'status': 'forbidden'}), 403

    # silent=True: a missing or malformed JSON body yields None instead of
    # aborting the request.
    payload = request.get_json(silent=True)
    logger.debug(f"Received payload: {payload}")  # Log the incoming payload
    if not isinstance(payload, dict):
        return jsonify({'status': 'ok'}), 200

    message = payload.get('message')
    if not isinstance(message, dict):
        return jsonify({'status': 'ok'}), 200

    message_text = message.get('text')
    chat = message.get('chat')
    chat_id = chat.get('id') if isinstance(chat, dict) else None
    if not message_text or chat_id is None:
        # Nothing to answer (non-text update) or nowhere to send the answer.
        return jsonify({'status': 'ok'}), 200

    try:
        # Call the model helper directly; there is no HTTP endpoint on the
        # Gradio server that accepts this payload.
        response_text = generate_response(message_text)
        logger.debug(f"Generated response: {response_text}")
        asyncio.run(bot_send_message(chat_id, response_text))
        logger.info(f"Response sent to chat_id {chat_id}")
    except Exception as e:
        logger.error(f"Error during processing: {e}")
        error_message = "Sorry, I can't answer this query right now but I will improve from time to time."
        try:
            asyncio.run(bot_send_message(chat_id, error_message))
            logger.error(f"Error message sent to chat_id {chat_id}")
        except Exception as send_error:
            # Never let a failed apology turn into a 500 for Telegram.
            logger.error(f"Could not deliver error message to chat_id {chat_id}: {send_error}")

    return jsonify({'status': 'ok'}), 200
async def set_telegram_webhook(gradio_url):
    """Register ``{gradio_url}/webhooks/{WEBHOOK_SECRET}`` as the bot's webhook.

    Makes up to five attempts through the configured proxy. A rate-limit reply
    (error code 429) waits for the server-suggested ``retry_after``; a network
    error or an unparsable reply waits ``retry_delay`` seconds. Any other API
    error stops immediately. On success the power user is notified.

    Args:
        gradio_url: Base URL (scheme + host, no trailing slash) under which
            the webhook path is reachable.

    Returns:
        None. Outcomes are reported through the logger only.
    """
    webhook_url = f"{gradio_url}/webhooks/{WEBHOOK_SECRET}"
    endpoint = f"https://api.telegram.org/bot{BOT_TOKEN}/setWebhook"
    retry_attempts = 5
    retry_delay = 1  # seconds

    webhook_set = False
    # NOTE(review): the `proxies=` keyword is kept as in the rest of the file;
    # confirm it is supported by the installed httpx version.
    async with httpx.AsyncClient(proxies=PROXY_URL) as client:
        for attempt in range(1, retry_attempts + 1):
            try:
                response = await client.post(endpoint, data={"url": webhook_url})
                result = response.json()
            except httpx.RequestError as e:
                logger.error(f"Request exception: {e}")
                wait = retry_delay
            except ValueError as e:
                # e.g. an HTML error page from the proxy instead of JSON
                logger.error(f"Unparsable setWebhook reply: {e}")
                wait = retry_delay
            else:
                if result.get('ok'):
                    logger.info("Webhook set successfully.")
                    webhook_set = True
                    break
                if result.get('error_code') != 429:
                    logger.error(f"Failed to set webhook: {result}")
                    return
                # 'parameters' is optional in the reply; fall back to the
                # default delay when it is missing.
                retry_after = (result.get('parameters') or {}).get('retry_after', retry_delay)
                logger.warning(f"Rate limit exceeded. Retrying after {retry_after} seconds.")
                wait = retry_after

            if attempt < retry_attempts:
                await asyncio.sleep(wait)

    if not webhook_set:
        logger.error(f"Giving up setting webhook after {retry_attempts} attempts.")
        return

    # Notify power user. Kept outside the retry loop so a failed notification
    # is never mistaken for a failed webhook registration.
    try:
        await bot_send_message(POWER_USER_ID, "π The webhook has been successfully connected! π")
    except httpx.RequestError as e:
        logger.error(f"Request exception: {e}")
def _split_message(text, limit=4096):
    """Split *text* into pieces of at most *limit* characters, cutting at a newline when one is available."""
    chunks = []
    remaining = text
    while len(remaining) > limit:
        cut = remaining.rfind('\n', 0, limit + 1)
        if cut <= 0:
            # No usable newline in range: hard cut at the limit.
            cut = limit
        chunks.append(remaining[:cut])
        remaining = remaining[cut:].lstrip('\n')
    if remaining:
        chunks.append(remaining)
    return chunks


async def bot_send_message(chat_id, text):
    """Send *text* to *chat_id* via the Bot API, using the proxy configuration.

    Text longer than the Bot API's 4096-character message limit is sent as
    several consecutive messages. Each piece is first sent with
    ``parse_mode=Markdown``; if the API answers 400 (which is what it returns
    for text whose Markdown cannot be parsed) the piece is re-sent as plain
    text so the reply is not lost.

    Args:
        chat_id: Target chat identifier.
        text: Message body.

    Returns:
        None. Per-message success or failure is logged.
    """
    if not text:
        logger.warning(f"Nothing to send to chat_id {chat_id}: empty text")
        return

    endpoint = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
    async with httpx.AsyncClient(proxies=PROXY_URL) as client:
        for chunk in _split_message(text):
            response = await client.post(
                endpoint,
                data={"chat_id": chat_id, "text": chunk, "parse_mode": "Markdown"},
            )
            if response.status_code == 400:
                # Most likely unbalanced Markdown from the model; retry raw.
                response = await client.post(
                    endpoint,
                    data={"chat_id": chat_id, "text": chunk},
                )
            if response.status_code == 200:
                logger.info(f"Message sent successfully to chat_id {chat_id}")
            else:
                logger.error(f"Failed to send message: {response.text}")
def run_gradio_app():
    """Launch the Gradio app, register the Telegram webhook, then block.

    The server is started with ``prevent_thread_lock=True`` so that
    ``launch()`` returns and the webhook can be registered while the server
    is running; the function then blocks the calling thread until the
    server is stopped.

    The webhook base URL is taken from the ``WEBHOOK_BASE_URL`` environment
    variable and falls back to the local testing URL.
    """
    iface.launch(server_name="0.0.0.0", server_port=7860, prevent_thread_lock=True)

    # Local URL is only useful for testing; a real deployment must supply a
    # publicly reachable base URL through WEBHOOK_BASE_URL.
    gradio_url = os.environ.get("WEBHOOK_BASE_URL", "http://localhost:7860")
    try:
        asyncio.run(set_telegram_webhook(gradio_url))  # Set the webhook for Telegram
    except Exception as e:
        # A failed registration must not take the UI down with it.
        logger.error(f"Webhook registration failed: {e}")

    # Keep the process alive and serving, as a blocking launch() would.
    iface.block_thread()
def run_flask_app():
    """Run the Flask app on 0.0.0.0:5000. Blocks until the server stops.

    Debug mode and the reloader are disabled: the interactive debugger must
    not be exposed on a publicly bound address, and the reloader cannot be
    started from a non-main thread.
    """
    app.run(host="0.0.0.0", port=5000, debug=False, use_reloader=False)


if __name__ == "__main__":
    # Run Gradio app and Flask app concurrently: Flask serves from a daemon
    # thread (it exits with the process), Gradio occupies the main thread.
    flask_thread = threading.Thread(
        target=run_flask_app,
        name="flask-webhook",
        daemon=True,
    )
    flask_thread.start()
    run_gradio_app()