"""Telegram webhook bot backed by Google Generative AI, with a Gradio test UI.

Two servers run in one process:

* a Flask app (port 5000) that receives Telegram webhook updates, and
* a Gradio app (port 7860) that exposes the same model for manual testing.

All credentials are read from environment variables.  Earlier revisions of
this file embedded the API key, bot token, webhook secret and proxy password
directly in source; those values must be considered leaked and rotated.

Environment variables:
    GOOGLE_API_KEY    (required) Google Generative AI key.
    BOT_TOKEN         (required) Telegram bot token.
    WEBHOOK_SECRET    (required) Raw (NOT URL-encoded) path secret.  It is
                      embedded in a routing rule, so avoid '/' and '<'.
    POWER_USER_ID     (optional) Chat ID notified when the webhook is set.
    PROXY_URL         (optional) Proxy used for Telegram API calls.
    WEBHOOK_BASE_URL  (optional) Public base URL that reaches the Flask app.
"""

import asyncio
import copy
import logging
import os
import threading
from urllib.parse import quote

import google.generativeai as genai
import gradio as gr
import httpx
import requests  # noqa: F401  -- no longer used here; kept for compatibility
import telebot
from flask import Flask, request, jsonify

# Suppress experimental warnings
os.environ['HF_HUB_DISABLE_EXPERIMENTAL_WARNING'] = '1'


def _require_env(name):
    """Return the value of environment variable *name* or fail loudly."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Configuration
GOOGLE_API_KEY = _require_env('GOOGLE_API_KEY')
BOT_TOKEN = _require_env('BOT_TOKEN')
# Raw secret: used verbatim in the Flask route, because routing matches the
# percent-DECODED request path.
WEBHOOK_SECRET = _require_env('WEBHOOK_SECRET')
# Percent-encoded form: used only when building the URL handed to Telegram.
WEBHOOK_SECRET_QUOTED = quote(WEBHOOK_SECRET, safe='')
POWER_USER_ID = int(os.environ.get('POWER_USER_ID', '75516649'))
PROXY_URL = os.environ.get('PROXY_URL') or None
# Base URL of the Flask webhook receiver.  The localhost default is only
# useful for local testing; Telegram itself needs a public HTTPS address and
# will reject anything else (the rejection is logged).
WEBHOOK_BASE_URL = os.environ.get('WEBHOOK_BASE_URL', 'http://localhost:5000')

# Telegram rejects sendMessage texts longer than this.
# NOTE(review): chunking below counts Python code points; confirm whether the
# limit is measured differently for text with many astral-plane characters.
TELEGRAM_MAX_MESSAGE_LENGTH = 4096

FALLBACK_REPLY = (
    "Sorry, I can't answer this query right now but I will improve from "
    "time to time."
)

# Initialize the Telegram bot
bot = telebot.TeleBot(BOT_TOKEN)

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('bot_debug.log'),
        logging.StreamHandler(),
    ],
)
# Telegram API URLs contain the bot token; keep the HTTP client libraries
# from writing request URLs into the log file.
logging.getLogger('httpx').setLevel(logging.WARNING)
logging.getLogger('httpcore').setLevel(logging.WARNING)
logger = logging.getLogger(__name__)

# Set up Google Generative AI
genai.configure(api_key=GOOGLE_API_KEY)

generation_config = {
    "temperature": 1,
    "top_p": 0.95,
    "top_k": 64,
    "max_output_tokens": 8192,
    "response_mime_type": "text/plain",
}

model = genai.GenerativeModel(
    model_name="gemini-1.5-pro",
    generation_config=generation_config,
    system_instruction="Please respond to user input",
)

# Example exchange used to prime every conversation.
_SEED_HISTORY = [
    {"role": "user", "parts": ["hi\n"]},
    {"role": "model", "parts": ["Hello! 👋 How can I help you today? 😊 \n"]},
    {"role": "user", "parts": ["I am looking for photo booth service?"]},
    {"role": "model", "parts": [
        "That's great! 🎉 I can definitely help you with information about "
        "Aforative Media's photo booth services. \n\nTo give you the most "
        "relevant information, could you tell me a little more about what "
        "you're looking for? ..."
    ]},
    {"role": "user", "parts": ["How much for photo booth services?"]},
    {"role": "model", "parts": [
        "You're smart to ask about pricing upfront! 😉 \n\nAforative Media's "
        "Mr. & Ms. Booth photo booth services start at **USD 390 for a "
        "minimum of 2 hours**. ..."
    ]},
    {"role": "user", "parts": ["How about videography service?"]},
    {"role": "model", "parts": [
        "You're thinking about capturing the memories on film too? Excellent "
        "choice! Videography adds a whole other dimension to remembering "
        "special events. \n\nAforative Media offers excellent videography "
        "services, and just like their photo booths, their videography "
        "packages are competitively priced and flexible. ..."
    ]},
]


def _new_chat_session():
    """Start a fresh chat primed with a private copy of the seed history."""
    return model.start_chat(history=copy.deepcopy(_SEED_HISTORY))


# Default session: used by the Gradio UI and by callers that pass no chat_id.
chat_session = _new_chat_session()

# One session per Telegram chat so that users never see each other's context.
# NOTE: this mapping grows with the number of distinct chats and is never
# pruned; add eviction if the bot is exposed to many users.
_chat_sessions = {}
_chat_sessions_lock = threading.Lock()


def _session_for(chat_id):
    """Return the chat session bound to *chat_id*, creating it on first use."""
    if chat_id is None:
        return chat_session
    # Both Flask's dev server and Gradio handle requests on worker threads.
    with _chat_sessions_lock:
        session = _chat_sessions.get(chat_id)
        if session is None:
            session = _chat_sessions[chat_id] = _new_chat_session()
        return session


def generate_response(user_input, chat_id=None):
    """Generate a response using Google Generative AI.

    Args:
        user_input: Text typed by the user.
        chat_id: Optional conversation key.  Each distinct value gets its own
            chat history; ``None`` uses the shared default session.

    Returns:
        The model's reply text, or a fixed apology string if the call fails
        for any reason (the failure is logged with its traceback).
    """
    try:
        prompt = f"Respond to the user: {user_input}"
        response = _session_for(chat_id).send_message(prompt)
        return response.text
    except Exception as e:  # boundary: never let a model failure escape
        logger.exception(f"Error during GenAI processing: {e}")
        return FALLBACK_REPLY


# Define Gradio interface
def gradio_handle_update(user_input):
    """Gradio callback: answer *user_input* from the default chat session."""
    return generate_response(user_input)


iface = gr.Interface(
    fn=gradio_handle_update,
    inputs=gr.Textbox(label="User Input"),
    outputs=gr.Textbox(label="Bot Response"),
    # live=True would fire a model request (and append to the chat history)
    # on every keystroke; require an explicit submit instead.
    live=False,
)

# Flask app setup
app = Flask(__name__)


@app.route(f'/webhooks/{WEBHOOK_SECRET}', methods=['POST'])
def handle_update():
    """Handle an incoming Telegram update.

    Extracts the message text and chat ID, generates a reply in-process and
    sends it back through the Telegram API.  Always answers 200 so that
    Telegram does not keep redelivering an update we cannot process.
    """
    payload = request.get_json(silent=True)
    logger.debug(f"Received payload: {payload}")  # Log the incoming payload

    message = payload.get('message') if isinstance(payload, dict) else None
    if not isinstance(message, dict):
        return jsonify({'status': 'ok'}), 200

    message_text = message.get('text')
    chat = message.get('chat')
    chat_id = chat.get('id') if isinstance(chat, dict) else None

    if message_text and chat_id is not None:
        try:
            response_text = generate_response(message_text, chat_id=chat_id)

            # Log and send the response
            logger.debug(f"Generated response: {response_text}")
            asyncio.run(bot_send_message(chat_id, response_text))
            logger.info(f"Response sent to chat_id {chat_id}")
        except Exception as e:  # boundary: a webhook must not answer 500
            logger.exception(f"Error during processing: {e}")
            try:
                asyncio.run(bot_send_message(chat_id, FALLBACK_REPLY))
                logger.error(f"Error message sent to chat_id {chat_id}")
            except Exception:
                logger.exception(
                    f"Could not deliver error message to chat_id {chat_id}"
                )

    return jsonify({'status': 'ok'}), 200


def _new_http_client():
    """Create an ``httpx.AsyncClient`` that honours ``PROXY_URL`` if set.

    Newer httpx releases take ``proxy=``; older ones only know ``proxies=``.
    Try the current spelling first and fall back for old installations.
    """
    if not PROXY_URL:
        return httpx.AsyncClient()
    try:
        return httpx.AsyncClient(proxy=PROXY_URL)
    except TypeError:
        return httpx.AsyncClient(proxies=PROXY_URL)


async def set_telegram_webhook(gradio_url):
    """Set the webhook for the Telegram bot with proxy configuration.

    Args:
        gradio_url: Base URL (scheme + host[:port]) under which the
            ``/webhooks/<secret>`` route is reachable.  The parameter name is
            historical; the route is served by the Flask app.

    Retries up to five times on rate limiting (honouring Telegram's
    ``retry_after``) and on transport errors.  Any other API error aborts
    immediately.  Failures are logged, never raised.
    """
    webhook_url = f"{gradio_url.rstrip('/')}/webhooks/{WEBHOOK_SECRET_QUOTED}"
    retry_attempts = 5
    retry_delay = 1  # seconds

    async with _new_http_client() as client:
        for attempt in range(1, retry_attempts + 1):
            is_last_attempt = attempt == retry_attempts
            try:
                response = await client.post(
                    f"https://api.telegram.org/bot{BOT_TOKEN}/setWebhook",
                    data={"url": webhook_url},
                )
                result = response.json()
            except (httpx.RequestError, ValueError) as e:
                # ValueError covers a non-JSON body (e.g. a proxy error page).
                logger.error(f"Request exception: {e}")
                if not is_last_attempt:
                    await asyncio.sleep(retry_delay)
                continue

            if result.get('ok'):
                logger.info("Webhook set successfully.")
                # Notify power user; a failed notification must not undo the
                # successful registration.
                try:
                    await bot_send_message(
                        POWER_USER_ID,
                        "🚀 The webhook has been successfully connected! 🎉",
                    )
                except httpx.RequestError as e:
                    logger.error(f"Request exception: {e}")
                return

            if result.get('error_code') == 429:
                parameters = result.get('parameters') or {}
                retry_after = parameters.get('retry_after', retry_delay)
                logger.warning(
                    f"Rate limit exceeded. Retrying after {retry_after} seconds."
                )
                if not is_last_attempt:
                    await asyncio.sleep(retry_after)
                continue

            logger.error(f"Failed to set webhook: {result}")
            return

    logger.error(f"Giving up on setWebhook after {retry_attempts} attempts.")


def _split_message(text, limit=TELEGRAM_MAX_MESSAGE_LENGTH):
    """Split *text* into consecutive chunks of at most *limit* characters."""
    text = "" if text is None else str(text)
    # An empty text still yields one (empty) chunk so the caller's request --
    # and Telegram's resulting error -- are logged rather than skipped.
    return [text[i:i + limit] for i in range(0, len(text), limit)] or [text]


async def bot_send_message(chat_id, text):
    """Send a message to the specified chat ID using the proxy configuration.

    Long texts are sent as several messages.  Each chunk is first sent with
    Markdown formatting; if Telegram answers 400 (typically unbalanced
    Markdown in model output) the chunk is resent as plain text.

    Args:
        chat_id: Telegram chat identifier.
        text: Message body.

    Raises:
        httpx.RequestError: On transport-level failures.
    """
    endpoint = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
    async with _new_http_client() as client:
        for chunk in _split_message(text):
            response = await client.post(
                endpoint,
                data={"chat_id": chat_id, "text": chunk, "parse_mode": "Markdown"},
            )
            if response.status_code == 400:
                response = await client.post(
                    endpoint,
                    data={"chat_id": chat_id, "text": chunk},
                )
            if response.status_code == 200:
                logger.info(f"Message sent successfully to chat_id {chat_id}")
            else:
                logger.error(f"Failed to send message: {response.text}")


def run_gradio_app():
    """Set the Telegram webhook, then launch the Gradio app (blocking)."""
    # Register the webhook first: iface.launch(debug=True) blocks the calling
    # thread, so nothing placed after it would ever run.
    try:
        asyncio.run(set_telegram_webhook(WEBHOOK_BASE_URL))
    except Exception as e:  # the UI should still come up without a webhook
        logger.exception(f"Unexpected error while setting webhook: {e}")

    iface.launch(server_name="0.0.0.0", server_port=7860, debug=True)


def run_flask_app():
    """Launch the Flask app that receives Telegram webhooks (blocking)."""
    # debug=False: the Werkzeug debugger allows code execution and must not be
    # exposed on 0.0.0.0.  use_reloader=False: the reloader needs the main
    # thread, but this function runs in a background thread.
    app.run(host="0.0.0.0", port=5000, debug=False, use_reloader=False)


if __name__ == "__main__":
    # Run Gradio app and Flask app concurrently: Flask in a daemon thread,
    # Gradio (which blocks) in the main thread.
    flask_thread = threading.Thread(
        target=run_flask_app, name="flask-webhook", daemon=True
    )
    flask_thread.start()
    run_gradio_app()