Spaces:
Paused
Paused
import asyncio | |
import os | |
import threading | |
import json | |
import random | |
from threading import Event | |
from typing import Optional | |
import datetime | |
import requests | |
import discord | |
import gradio as gr | |
from gradio_client import Client | |
from discord import Permissions | |
from discord.ext import commands | |
from discord.utils import oauth_url | |
from gradio_client.utils import QueueError | |
event = Event() | |
DISCORD_TOKEN = os.getenv("DISCORD_TOKEN") | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
def ask(input_text): | |
api_url = "https://somerandomapifor-disorc-cause-i-need-it.onrender.com/qa" | |
params = {'question': input_text} | |
response = requests.get(api_url, params=params) | |
if response.status_code == 200: | |
json_response = response.json() | |
answer = json_response.get('answer') # Assuming the JSON response has an 'answer' field | |
# Applying the operation to the answer | |
modified_answer = answer[:-4] if answer else "No answer found." | |
return modified_answer | |
else: | |
return "Error: Unable to fetch response" | |
async def wait(job): | |
while not job.done(): | |
await asyncio.sleep(0.2) | |
intents = discord.Intents.all() | |
bot = commands.Bot(command_prefix="$", intents=intents, help_command=None) | |
async def uptime(ctx): | |
"""Displays the uptime of the bot.""" | |
delta = datetime.datetime.utcnow() - bot.start_time | |
hours, remainder = divmod(int(delta.total_seconds()), 3600) | |
minutes, seconds = divmod(remainder, 60) | |
days, hours = divmod(hours, 24) | |
# Create a fancy embed with emojis | |
embed = discord.Embed(title="Bot Uptime", color=discord.Color.green()) | |
embed.add_field(name="Uptime", value=f"{days} days, {hours} hours, {minutes} minutes, {seconds} seconds", inline=False) | |
embed.set_footer(text="Created by Cosmos") | |
await ctx.reply(embed=embed) | |
#@bot.command() | |
#async def ai(ctx, *, input_text: str): | |
# """Ask our AI model a question.""" | |
# async with ctx.typing(): | |
#result = ask(input_text) | |
# result = "the ai is disabled currently" | |
# Reply with the embed | |
# await ctx.reply(result) | |
async def verse(ctx): | |
"""Get a random Bible verse.""" | |
# Fetch a random Bible verse | |
bible_api_url = "https://labs.bible.org/api/?passage=random&type=json" | |
response = requests.get(bible_api_url) | |
if response.status_code == 200: | |
verse = response.json()[0] | |
passage = f"**{verse['bookname']} {verse['chapter']}:{verse['verse']}** \n{verse['text']}" | |
else: | |
passage = "Unable to fetch Bible verse" | |
# Create an embed | |
embed = discord.Embed(title="Random Bible Verse", description=passage, color=discord.Color.blue()) | |
embed.set_footer(text="Created by Cosmos") | |
await ctx.reply(embed=embed) | |
async def kick(ctx, member: discord.Member, *, reason: Optional[str] = "No reason provided"): | |
"""Kicks a member.""" | |
try: | |
await member.kick(reason=reason) | |
await ctx.reply(f"{member.mention} has been kicked. Reason: {reason}") | |
except discord.Forbidden: | |
await ctx.reply("I don't have permission to kick members.") | |
except discord.HTTPException: | |
await ctx.reply("An error occurred while trying to kick the member. Please try again later.") | |
async def ban(ctx, member: discord.Member, *, reason: Optional[str] = "No reason provided"): | |
"""Bans a member.""" | |
try: | |
await member.ban(reason=reason) | |
await ctx.reply(f"{member.mention} has been banned. Reason: {reason}") | |
except discord.Forbidden: | |
await ctx.reply("I don't have permission to ban members.") | |
except discord.HTTPException: | |
await ctx.reply("An error occurred while trying to ban the member. Please try again later.") | |
async def cmds(ctx): | |
"""Returns a list of commands and bot information.""" | |
# Get list of commands | |
command_list = [f"{command.name}: {command.help}" for command in bot.commands] | |
# Get bot information | |
bot_info = f"Bot Name: {bot.user.name}\nBot ID: {bot.user.id}" | |
# Create an embed | |
embed = discord.Embed(title="Bot prefix: $", color=discord.Color.blue()) | |
embed.add_field(name="Commands", value="\n".join(command_list), inline=False) | |
embed.add_field(name="Bot Information", value=bot_info, inline=False) | |
embed.set_footer(text="Created by Cosmos") | |
await ctx.reply(embed=embed) | |
async def update_status(): | |
await bot.wait_until_ready() # Wait until the bot is fully ready | |
while True: | |
# Fetch the number of members in all guilds the bot is connected to | |
member_count = sum(guild.member_count for guild in bot.guilds) | |
# Update the bot's status | |
await bot.change_presence(activity=discord.Activity(type=discord.ActivityType.watching, name=f"{member_count} users")) | |
# Wait for 60 seconds before updating again | |
await asyncio.sleep(60) | |
async def on_ready(): | |
bot.start_time = datetime.datetime.utcnow() | |
print(f"Logged in as {bot.user} (ID: {bot.user.id})") | |
event.set() | |
print("------") | |
bot.loop.create_task(update_status()) # Create the task within the on_ready event | |
async def on_member_join(member): | |
# Fetch the channels by their IDs | |
channel_ids = [1210566384267034644, 1210987054658494477, 1210578060164866128] | |
channels_info = [] | |
for channel_id in channel_ids: | |
channel = member.guild.get_channel(channel_id) | |
if channel: | |
channels_info.append(f"• {channel.name}: {channel.mention}") | |
# Prepare the message content | |
channels_message = "\n".join(channels_info) | |
message_content = ( | |
f"Welcome to the server, {member.name}!\n\n" | |
f"Hope you have a great stay here, {member.mention}!\n\n" | |
f"Here are some channels you might find interesting:\n{channels_message}\n\n" | |
) | |
# Load Bible verses JSON data | |
verses_json = ''' | |
{ | |
"verses": [ | |
{ | |
"reference": "**Romans 12:10**", | |
"text": "“Be devoted to one another in love. Honor one another above yourselves.”" | |
}, | |
{ | |
"reference": "**John 4:20**", | |
"text": "“If someone says, ‘I love God,’ and hates his brother, he is a liar; for he who does not love his brother whom he has seen, how can he love God whom he has not seen?”" | |
}, | |
{ | |
"reference": "**Ephesians 4:32**", | |
"text": "“And be kind to one another, tenderhearted, forgiving one another, even as God in Christ forgave you.”" | |
}, | |
{ | |
"reference": "**John 13:34-35**", | |
"text": "“A new commandment I give to you, that you love one another; as I have loved you, that you also love one another. By this, all will know that you are My disciples if you have love for one another.”" | |
}, | |
{ | |
"reference": "**Corinthians 12:26**", | |
"text": "“And if one member suffers, all the members suffer with it; or if one member is honored, all the members rejoice with it.”" | |
} | |
] | |
} | |
''' | |
verses_data = json.loads(verses_json) | |
random_verse = random.choice(verses_data["verses"]) # Selecting the first verse for demonstration | |
# Include the Bible verse in the message content | |
message_content += f"\n\nRandom Bible Verse:\n{random_verse['reference']} - {random_verse['text']}" | |
# Send a direct message to the user with the message content including the Bible verse | |
try: | |
await member.send(message_content) | |
except discord.HTTPException: | |
print(f"Failed to send a DM to {member.name}") | |
# Create an embed for the welcome channel | |
welcome_channel = discord.utils.get(member.guild.channels, name="👋wellcome-goodbye") | |
if welcome_channel: | |
embed = discord.Embed( | |
title=f"Welcome to the server, {member.name}!", | |
description=f"Hope you have a great stay here, {member.mention}!", | |
color=discord.Color.blue() | |
) | |
embed.add_field(name="Random Bible Verse", value=f"{random_verse['reference']} - {random_verse['text']}", inline=False) | |
embed.set_footer(text="Created by Cosmos") | |
await welcome_channel.send(embed=embed) | |
async def search(ctx, *, query: str): | |
"""Search for a bible verse.""" | |
bible_api_url = f"http://labs.bible.org/api/?passage={query}&formatting=plain&type=json" | |
try: | |
response = requests.get(bible_api_url) | |
response.raise_for_status() | |
verses = response.json() | |
if verses: | |
# Extract text values from dictionaries | |
verse_texts = [verse["text"] for verse in verses] | |
# If verses are found, concatenate them into a single message | |
passage = "\n".join(verse_texts) | |
#passage = truncate_response(passage) | |
embed = discord.Embed(title=f"Search Results for '{query}'", description=passage, color=discord.Color.blue()) | |
else: | |
embed = discord.Embed(title="Search Results", description="No results found.", color=discord.Color.red()) | |
except requests.RequestException as e: | |
embed = discord.Embed(title="Search Results", description=f"Error: {e}", color=discord.Color.red()) | |
embed.set_footer(text="Created by Cosmos") | |
await ctx.reply(embed=embed) | |
async def purge(ctx, limit: int): | |
"""Removes N amount of messages.""" | |
try: | |
await ctx.channel.purge(limit=limit + 1) # Add 1 to include the command message itself | |
await ctx.reply(f"{limit} messages have been purged.") | |
except discord.Forbidden: | |
await ctx.reply("I don't have permission to manage messages.") | |
except discord.HTTPException: | |
await ctx.reply("An error occurred while trying to purge messages. Please try again later.") | |
async def on_command_error(ctx, error): | |
if isinstance(error, commands.CommandNotFound): | |
await ctx.reply("Sorry, I couldn't find that command. Use `$cmds` to see the list of available commands.") | |
elif isinstance(error, commands.MissingRequiredArgument): | |
await ctx.reply("Oops! Looks like you're missing some required arguments.") | |
elif isinstance(error, commands.CheckFailure): | |
await ctx.reply("You do not have the permissions to execute this command.") | |
else: | |
# Log the error to console or your logging system | |
print(f"An error occurred: {error}") | |
# Additional error handling for unexpected errors | |
async def on_error(event_method, *args, **kwargs): | |
# Log the error to console or your logging system | |
print(f"An error occurred in {event_method}: {sys.exc_info()}") | |
# Error handling for unhandled exceptions | |
async def on_command_error(ctx, error): | |
if isinstance(error, commands.CommandInvokeError): | |
original_error = error.original | |
if isinstance(original_error, discord.Forbidden): | |
await ctx.send("I don't have permissions to do that.") | |
elif isinstance(original_error, discord.HTTPException): | |
await ctx.send("An error occurred while processing the command. Please try again later.") | |
else: | |
# Log the error to console or your logging system | |
print(f"Error: {original_error}") | |
await ctx.send("An unexpected error occurred.") | |
# running in thread | |
def run_bot(): | |
if not DISCORD_TOKEN: | |
print("DISCORD_TOKEN NOT SET") | |
event.set() | |
else: | |
bot.run(DISCORD_TOKEN) | |
threading.Thread(target=run_bot).start() | |
event.wait() | |
with gr.Blocks() as demo: | |
gr.Markdown( | |
f""" | |
# Discord bot is online! | |
""" | |
) | |
demo.launch() |