# AforativeBot / app.py
# measmonysuon's picture
# Update app.py
# 11b6424 verified
# raw
# history blame
# 7.49 kB
import os
import telebot
import google.generativeai as genai
import logging
import requests
import gradio as gr
import httpx
import asyncio
from flask import Flask, request, jsonify
# Suppress experimental warnings
os.environ['HF_HUB_DISABLE_EXPERIMENTAL_WARNING'] = '1'

# Configuration
#
# Credentials are read from the process environment instead of being
# committed to the source file. Any value that was ever hard-coded here must
# be treated as leaked and rotated with the corresponding provider.
#
#   GOOGLE_API_KEY  - API key passed to google.generativeai.
#   BOT_TOKEN       - Telegram bot token, used in Bot API URLs.
#   WEBHOOK_SECRET  - path segment of the webhook route. Keep it to URL-safe
#                     characters so the registered URL and the Flask route
#                     are the same string without percent-encoding. When it
#                     is unset, a random per-process value is generated so
#                     the route is never guessable.
#   POWER_USER_ID   - chat id notified once the webhook is registered.
#   PROXY_URL       - optional outbound HTTP proxy; None means "no proxy".
GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY', '')
BOT_TOKEN = os.environ.get('BOT_TOKEN', '')
WEBHOOK_SECRET = os.environ.get('WEBHOOK_SECRET') or os.urandom(24).hex()
POWER_USER_ID = int(os.environ.get('POWER_USER_ID', '75516649'))
PROXY_URL = os.environ.get('PROXY_URL') or None
# Telegram bot client, built from the configured token.
bot = telebot.TeleBot(token=BOT_TOKEN)
# Configure logging: everything at DEBUG and above goes both to
# bot_debug.log and to the console.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('bot_debug.log'),
        logging.StreamHandler(),
    ],
)
# The HTTP client libraries log request details below WARNING. Telegram Bot
# API URLs embed the bot token, so with the root logger at DEBUG the token
# would be written to the log file and the console. Keep them quiet.
logging.getLogger('httpx').setLevel(logging.WARNING)
logging.getLogger('httpcore').setLevel(logging.WARNING)

logger = logging.getLogger(__name__)
# Set up Google Generative AI
genai.configure(api_key=GOOGLE_API_KEY)

# Sampling settings passed to the model for every generated reply.
generation_config = {
    "temperature": 1,
    "top_p": 0.95,
    "top_k": 64,
    "max_output_tokens": 8192,
    "response_mime_type": "text/plain",
}

model = genai.GenerativeModel(
    model_name="gemini-1.5-pro",
    generation_config=generation_config,
    system_instruction="Please respond to user input",
)

# One module-level chat session, seeded with an example conversation.
# NOTE(review): every caller of generate_response() shares this session, so
# its history is shared between all users - confirm that this is intended.
# The emoji below were stored as mis-decoded bytes and have been restored.
chat_session = model.start_chat(
    history=[
        {"role": "user", "parts": ["hi\n"]},
        {"role": "model", "parts": ["Hello! 👋 How can I help you today? 😊 \n"]},
        {"role": "user", "parts": ["I am looking for photo booth service?"]},
        {"role": "model", "parts": ["That's great! 🎉 I can definitely help you with information about Aforative Media's photo booth services. \n\nTo give you the most relevant information, could you tell me a little more about what you're looking for? ..."]},
        {"role": "user", "parts": ["How much for photo booth services?"]},
        {"role": "model", "parts": ["You're smart to ask about pricing upfront! 😉 \n\nAforative Media's Mr. & Ms. Booth photo booth services start at **USD 390 for a minimum of 2 hours**. ..."]},
        {"role": "user", "parts": ["How about videography service?"]},
        {"role": "model", "parts": ["You're thinking about capturing the memories on film too? Excellent choice! Videography adds a whole other dimension to remembering special events. \n\nAforative Media offers excellent videography services, and just like their photo booths, their videography packages are competitively priced and flexible. ..."]},
    ]
)
def generate_response(user_input):
    """Ask the shared chat session for a reply to ``user_input``.

    The input is wrapped in a short instruction prefix and sent through
    ``chat_session``. Any exception raised while sending or while reading the
    reply text is logged, and a fixed apology string is returned instead.
    """
    apology = "Sorry, I can't answer this query right now but I will improve from time to time."
    try:
        reply = chat_session.send_message(f"Respond to the user: {user_input}")
        # Reading .text stays inside the try block so a failure here is
        # handled the same way as a failed request.
        return reply.text
    except Exception as exc:
        logger.error(f"Error during GenAI processing: {exc}")
        return apology
# Define Gradio interface
def gradio_handle_update(user_input):
    """Return the bot's reply for text submitted through the Gradio UI.

    Blank input is answered with an empty string without contacting the
    model, so an empty submission neither costs an API call nor adds a
    meaningless turn to the chat history.
    """
    if not user_input or not user_input.strip():
        return ""
    return generate_response(user_input)


# The interface is submit-driven on purpose: in live mode every keystroke
# would send a partial message to the model and record it in the session.
iface = gr.Interface(
    fn=gradio_handle_update,
    inputs=gr.Textbox(label="User Input"),
    outputs=gr.Textbox(label="Bot Response"),
)
# Flask app setup
app = Flask(__name__)


@app.route(f'/webhooks/{WEBHOOK_SECRET}', methods=['POST'])
def handle_update():
    """Handle one incoming Telegram update delivered to the webhook route.

    For updates that carry a text message, a reply is generated with
    ``generate_response`` and sent back to the originating chat. Updates
    without a text message are acknowledged and otherwise ignored.

    Returns:
        A JSON body with HTTP 200 for every well-formed update, including
        those whose processing failed (the failure is logged). A request
        whose body is not a JSON object gets HTTP 400.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        logger.warning("Rejected webhook call whose body is not a JSON object")
        return jsonify({'status': 'ignored'}), 400
    logger.debug(f"Received payload: {payload}")  # Log the incoming payload

    message = payload.get('message')
    if not isinstance(message, dict):
        return jsonify({'status': 'ok'}), 200
    message_text = message.get('text')
    chat_id = (message.get('chat') or {}).get('id')
    if not message_text or chat_id is None:
        return jsonify({'status': 'ok'}), 200

    try:
        # Generate the reply in-process rather than through an HTTP hop to
        # the Gradio server.
        response_text = generate_response(message_text)
        logger.debug(f"Generated response: {response_text}")
        asyncio.run(bot_send_message(chat_id, response_text))
        logger.info(f"Response sent to chat_id {chat_id}")
    except Exception as e:
        logger.error(f"Error during processing: {e}")
        error_message = "Sorry, I can't answer this query right now but I will improve from time to time."
        try:
            asyncio.run(bot_send_message(chat_id, error_message))
            logger.error(f"Error message sent to chat_id {chat_id}")
        except Exception as send_error:
            # The fallback send can fail for the same reason as the first
            # one; the update is still acknowledged below.
            logger.error(f"Could not deliver error message to chat_id {chat_id}: {send_error}")
    return jsonify({'status': 'ok'}), 200
async def set_telegram_webhook(gradio_url):
    """Register ``{gradio_url}/webhooks/{WEBHOOK_SECRET}`` as the bot's webhook.

    The ``setWebhook`` request is sent through ``PROXY_URL``. It is attempted
    up to five times: a network error or an HTTP 429 style answer
    (``error_code == 429``) triggers another attempt after a pause, any other
    rejection stops immediately. On success ``POWER_USER_ID`` is notified.

    Args:
        gradio_url: Base URL (scheme and host, no trailing slash) under which
            the webhook route is reachable.

    Returns:
        None. Every outcome is reported through the module logger.
    """
    webhook_url = f"{gradio_url}/webhooks/{WEBHOOK_SECRET}"
    api_url = f"https://api.telegram.org/bot{BOT_TOKEN}/setWebhook"
    retry_attempts = 5
    retry_delay = 1  # seconds

    for attempt in range(1, retry_attempts + 1):
        try:
            async with httpx.AsyncClient(proxies=PROXY_URL) as client:
                response = await client.post(api_url, data={"url": webhook_url})
            result = response.json()
        except httpx.RequestError as e:
            logger.error(f"Request exception: {e}")
            pause = retry_delay
        except ValueError as e:
            # The body was not JSON (for example a proxy error page).
            logger.error(f"Failed to set webhook: unreadable reply ({e})")
            return
        else:
            if result.get('ok'):
                logger.info("Webhook set successfully.")
                try:
                    # Notify power user
                    await bot_send_message(POWER_USER_ID, "🚀 The webhook has been successfully connected! 🎉")
                except httpx.RequestError as e:
                    # The webhook itself is registered; only the notice failed.
                    logger.error(f"Request exception: {e}")
                return
            if result.get('error_code') != 429:
                logger.error(f"Failed to set webhook: {result}")
                return
            # 'parameters' is optional in the reply, so read it defensively.
            pause = (result.get('parameters') or {}).get('retry_after', retry_delay)
            logger.warning(f"Rate limit exceeded. Retrying after {pause} seconds.")

        if attempt < retry_attempts:
            await asyncio.sleep(pause)

    logger.error(f"Failed to set webhook after {retry_attempts} attempts.")
async def bot_send_message(chat_id, text):
    """Send ``text`` to a Telegram chat through the configured proxy.

    The text is split into pieces that fit into a single Telegram message and
    each piece is posted with ``parse_mode=Markdown``. If Telegram answers a
    piece with HTTP 400 (which is what it does when the Markdown cannot be
    parsed), that piece is sent again as plain text.

    Args:
        chat_id: Identifier of the destination chat.
        text: Message body. Empty or missing text is skipped with a warning.

    Returns:
        None. Success or failure of each piece is logged.
    """
    if not text:
        logger.warning(f"Skipped empty message for chat_id {chat_id}")
        return

    # Telegram caps a text message at 4096 characters; stay a little below
    # that because characters outside the BMP (emoji) may count double.
    max_len = 4000
    pieces = [text[start:start + max_len] for start in range(0, len(text), max_len)]
    send_url = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"

    async with httpx.AsyncClient(proxies=PROXY_URL) as client:
        for piece in pieces:
            response = await client.post(
                send_url,
                data={"chat_id": chat_id, "text": piece, "parse_mode": "Markdown"},
            )
            if response.status_code == 400:
                # Generated text often contains unbalanced '*' or '_';
                # deliver it unformatted rather than not at all.
                response = await client.post(
                    send_url,
                    data={"chat_id": chat_id, "text": piece},
                )
            if response.status_code == 200:
                logger.info(f"Message sent successfully to chat_id {chat_id}")
            else:
                logger.error(f"Failed to send message: {response.text}")
def run_gradio_app():
    """Register the Telegram webhook, then launch the Gradio app.

    The webhook is registered first because ``iface.launch`` keeps the
    calling thread busy for as long as the server runs, so nothing placed
    after it would execute while the app is serving.

    The base URL for the webhook is taken from the ``WEBHOOK_BASE_URL``
    environment variable and falls back to the local testing address.
    NOTE(review): Telegram only delivers to a publicly reachable HTTPS URL,
    and the webhook route is registered on the Flask app - confirm the
    deployed value points at that server.
    """
    gradio_url = os.environ.get("WEBHOOK_BASE_URL", "http://localhost:7860")  # Local URL by default
    try:
        asyncio.run(set_telegram_webhook(gradio_url))  # Set the webhook for Telegram
    except Exception as e:
        # A failed registration must not keep the UI from starting.
        logger.error(f"Could not set the Telegram webhook: {e}")
    iface.launch(server_name="0.0.0.0", server_port=7860, debug=True)
def run_flask_app():
    """Serve the Flask app (the Telegram webhook route) on port 5000.

    This call blocks until the server stops. The debugger and the reloader
    are switched off: the server listens on all interfaces, where the
    interactive debugger must not be exposed, and the reloader cannot be
    started from a background thread.
    """
    app.run(host="0.0.0.0", port=5000, debug=False, use_reloader=False)


if __name__ == "__main__":
    import threading

    # Both servers block their thread, so Flask is served from a daemon
    # thread while Gradio keeps the main thread. The daemon flag lets the
    # process exit once Gradio stops.
    flask_thread = threading.Thread(
        target=run_flask_app,
        name="flask-webhook",
        daemon=True,
    )
    flask_thread.start()
    run_gradio_app()