Spaces:
Paused
Paused
import asyncio | |
import os | |
import threading | |
from threading import Event | |
from typing import Optional | |
import datetime | |
import requests | |
import discord | |
import gradio as gr | |
from gradio_client import Client | |
from discord import Permissions | |
from discord.ext import commands | |
from discord.utils import oauth_url | |
from gradio_client.utils import QueueError | |
# Signals the main thread that the bot is logged in (or that it cannot start),
# so the Gradio UI is only launched afterwards.
event = Event()

# Credentials come from the environment; each is None when the variable is unset.
DISCORD_TOKEN = os.environ.get("DISCORD_TOKEN")
# NOTE(review): HF_TOKEN is read here but not referenced anywhere else in the
# visible source.
HF_TOKEN = os.environ.get("HF_TOKEN")
async def wait(job, poll_interval=0.2):
    """Suspend the calling coroutine until ``job`` reports completion.

    Args:
        job: Any object exposing a ``done()`` method that returns a truthy
            value once the work has finished.
        poll_interval: Seconds to sleep between ``done()`` checks.
            Defaults to 0.2.
    """
    while not job.done():
        # Sleep instead of spinning so other tasks on the event loop can run.
        await asyncio.sleep(poll_interval)
# Request every gateway intent and build the bot with "$" as its command
# prefix. The library's built-in help command is switched off
# (help_command=None); this file defines its own `cmds` listing instead.
intents = discord.Intents.all()
bot = commands.Bot(
    command_prefix="$",
    intents=intents,
    help_command=None,
)
# Command: reply with an embed showing how long the bot has been running.
# The elapsed time is measured from `bot.start_time`, which `on_ready` assigns
# using the same naive-UTC clock as below.
# NOTE(review): no command-registration decorator is visible in this view;
# confirm the function is registered with the bot.
async def uptime(ctx):
    """Displays the uptime of the bot."""
    elapsed = datetime.datetime.utcnow() - bot.start_time
    total_seconds = int(elapsed.total_seconds())

    # Peel off the largest unit first, carrying the remainder downwards.
    days, leftover = divmod(total_seconds, 86400)
    hours, leftover = divmod(leftover, 3600)
    minutes, seconds = divmod(leftover, 60)

    summary = f"{days} days, {hours} hours, {minutes} minutes, {seconds} seconds"
    embed = discord.Embed(title="Bot Uptime", color=discord.Color.green())
    embed.add_field(name="Uptime", value=summary, inline=False)
    embed.set_footer(text="Created by Cosmos")
    await ctx.send(embed=embed)
# Command: forward `input_text` to the hosted Gradio chat endpoint and reply
# with the model's answer in an embed.
# The docstring is kept to one line because `cmds` prints each command's
# `help` text (presumably derived from the docstring).
async def ai(ctx, *, input_text: str):
    """Ask our AI model a question. (Session resets every 1 message!)"""

    def _query_model():
        # gradio_client is synchronous: connecting and predicting both block,
        # so this runs on a worker thread instead of the event loop.
        client = Client("https://wop-xxx-opengpt.hf.space/")
        # NOTE(review): the positional numbers are passed straight to the
        # "/chat" endpoint; their meaning (sampling settings, presumably) is
        # defined by the remote Space - confirm against its API page.
        return client.predict(input_text, 0.9, 2000, 0.9, 1.2, api_name="/chat")

    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, _query_model)

    # Drop the last four characters of the reply (presumably an
    # end-of-sequence marker emitted by the model - TODO confirm).
    result = result[:-4]

    # Discord rejects embed descriptions longer than 4096 characters.
    if len(result) > 4096:
        result = result[:4093] + "..."

    # Create an embed with the AI's response
    embed = discord.Embed(title="AI Response", description=result, color=discord.Color.green())
    embed.add_field(name="Info", value="Session resets every `1` message!\nModel Name: `Mistral 7B`", inline=False)
    embed.set_footer(text="Created by Cosmos")
    # Reply with the embed
    await ctx.send(embed=embed)
# Command: fetch one random verse from the labs.bible.org API and post it as
# an embed. Any failure (network error, non-200 status, unexpected payload)
# falls back to the "Unable to fetch Bible verse" text.
async def verse(ctx):
    """Returns a random Bible verse."""
    bible_api_url = "https://labs.bible.org/api/?passage=random&type=json"
    passage = "Unable to fetch Bible verse"

    loop = asyncio.get_running_loop()
    try:
        # requests is synchronous; run it on a worker thread so the bot's
        # event loop keeps running, and bound the wait with a timeout.
        response = await loop.run_in_executor(
            None, lambda: requests.get(bible_api_url, timeout=10)
        )
        if response.status_code == 200:
            found = response.json()[0]
            passage = f"**{found['bookname']} {found['chapter']}:{found['verse']}** \n{found['text']}"
    except (requests.RequestException, ValueError, LookupError, TypeError) as exc:
        # ValueError covers invalid JSON; LookupError/TypeError cover a
        # payload that is not the expected non-empty list of verse dicts.
        print(f"Bible verse lookup failed: {exc}")

    # Create an embed
    embed = discord.Embed(title="Random Bible Verse", description=passage, color=discord.Color.blue())
    embed.set_footer(text="Created by Cosmos")
    await ctx.send(embed=embed)
# Command: kick `member` from the guild and confirm in the invoking channel.
# `reason` is forwarded to `member.kick` and echoed in the confirmation.
# NOTE(review): no permission check or registration decorator is visible in
# this view; confirm that who may invoke this is restricted elsewhere.
async def kick(ctx, member: discord.Member, *, reason: Optional[str] = "No reason provided"):
    """Kicks a member."""
    success_notice = f"{member.mention} has been kicked. Reason: {reason}"
    try:
        await member.kick(reason=reason)
        await ctx.send(success_notice)
    except discord.Forbidden:
        # Raised when the bot is not allowed to perform the action.
        await ctx.send("I don't have permission to kick members.")
    except discord.HTTPException:
        # Any other failed API request.
        await ctx.send("An error occurred while trying to kick the member. Please try again later.")
# Command: ban `member` from the guild and confirm in the invoking channel.
# `reason` is forwarded to `member.ban` and echoed in the confirmation.
# NOTE(review): no permission check or registration decorator is visible in
# this view; confirm that who may invoke this is restricted elsewhere.
async def ban(ctx, member: discord.Member, *, reason: Optional[str] = "No reason provided"):
    """Bans a member."""
    success_notice = f"{member.mention} has been banned. Reason: {reason}"
    try:
        await member.ban(reason=reason)
        await ctx.send(success_notice)
    except discord.Forbidden:
        # Raised when the bot is not allowed to perform the action.
        await ctx.send("I don't have permission to ban members.")
    except discord.HTTPException:
        # Any other failed API request.
        await ctx.send("An error occurred while trying to ban the member. Please try again later.")
# Command: post an embed listing every registered command with its help text,
# followed by the bot's name and ID.
async def cmds(ctx):
    """Returns a list of commands and bot information."""
    # Sort by name so the listing is stable between calls, and substitute a
    # placeholder when a command has no help text instead of printing "None".
    command_list = [
        f"{command.name}: {command.help or 'No description available.'}"
        for command in sorted(bot.commands, key=lambda command: command.name)
    ]
    commands_text = "\n".join(command_list)
    # Discord rejects embed field values that are empty or longer than 1024
    # characters, so clamp both ends.
    if not commands_text:
        commands_text = "No commands registered."
    elif len(commands_text) > 1024:
        commands_text = commands_text[:1021] + "..."

    # Get bot information
    bot_info = f"Bot Name: {bot.user.name}\nBot ID: {bot.user.id}"

    # Create an embed
    embed = discord.Embed(title="Bot prefix: $", color=discord.Color.blue())
    embed.add_field(name="Commands", value=commands_text, inline=False)
    embed.add_field(name="Bot Information", value=bot_info, inline=False)
    embed.set_footer(text="Created by Cosmos")
    await ctx.send(embed=embed)
async def update_status():
    """Refresh the bot's "watching N users" presence once a minute, forever.

    N is the sum of ``member_count`` over every guild in ``bot.guilds``.
    The coroutine never returns; it is meant to run as a background task.
    """
    await bot.wait_until_ready()  # Wait until the bot is fully ready
    while True:
        # member_count may be None for a guild whose count is not known yet;
        # treat that as 0 so one such guild cannot crash the loop.
        member_count = sum(guild.member_count or 0 for guild in bot.guilds)
        # Update the bot's status
        activity = discord.Activity(
            type=discord.ActivityType.watching,
            name=f"{member_count} users",
        )
        await bot.change_presence(activity=activity)
        # Wait for 60 seconds before updating again
        await asyncio.sleep(60)
async def on_ready():
    """Record the start time, release the main thread and start the status loop.

    The ready event can be delivered more than once (for example after a
    reconnect), so the start time and the background status task are only
    set up on the first call. Later calls just log and re-set ``event``.

    NOTE(review): no ``@bot.event`` decorator is visible in this view;
    confirm the handler is registered with the bot.
    """
    first_ready = getattr(bot, "start_time", None) is None
    if first_ready:
        # Naive UTC, matching the clock `uptime` subtracts from.
        bot.start_time = datetime.datetime.utcnow()
    print(f"Logged in as {bot.user} (ID: {bot.user.id})")
    # Unblock the main thread waiting on `event`.
    event.set()
    print("------")
    if first_ready:
        # Create the task within the on_ready event, exactly once.
        bot.loop.create_task(update_status())
async def on_member_join(member):
    """Greet a new member with a welcome embed containing a random verse.

    The embed is posted to the guild channel named "👋wellcome-goodbye".
    If the guild has no such channel the event is logged and skipped. If the
    verse cannot be fetched, the embed carries a fallback text instead.

    NOTE(review): no ``@bot.event`` decorator is visible in this view;
    confirm the handler is registered with the bot.
    """
    channel = discord.utils.get(member.guild.channels, name="👋wellcome-goodbye")
    if channel is None:
        # discord.utils.get returns None when nothing matches.
        print(f"Welcome channel not found in guild {member.guild}; skipping greeting.")
        return

    # Fetch a random Bible verse
    bible_api_url = "https://labs.bible.org/api/?passage=random&type=json"
    passage = "Unable to fetch Bible verse"
    loop = asyncio.get_running_loop()
    try:
        # requests is synchronous; run it on a worker thread so the bot's
        # event loop keeps running, and bound the wait with a timeout.
        response = await loop.run_in_executor(
            None, lambda: requests.get(bible_api_url, timeout=10)
        )
        if response.status_code == 200:
            found = response.json()[0]
            passage = f"{found['bookname']} {found['chapter']}:{found['verse']} - {found['text']}"
    except (requests.RequestException, ValueError, LookupError, TypeError) as exc:
        # ValueError covers invalid JSON; LookupError/TypeError cover a
        # payload that is not the expected non-empty list of verse dicts.
        print(f"Bible verse lookup failed: {exc}")

    # Create an embed
    embed = discord.Embed(
        title=f"Welcome to the server, {member.name}!",
        description=f"Hope you have a great stay here, <@{member.id}>!",
        color=discord.Color.blue(),
    )
    embed.add_field(name="Random Bible Verse", value=passage, inline=False)
    embed.set_footer(text="Created by Cosmos")
    await channel.send(embed=embed)
# Command: look up `query` as a passage on the labs.bible.org API and reply
# with the matching verses in an embed. Lookup failures produce an
# "Unable to fetch search results." embed; an empty result produces
# "No results found.".
async def search(ctx, *, query: str):
    """Search for a bible verse."""
    loop = asyncio.get_running_loop()
    verses = None  # None means the lookup itself failed
    passage = ""
    try:
        # Pass the query through `params` so requests URL-encodes it; user
        # text can then no longer inject extra query-string parameters.
        # The call runs on a worker thread because requests is synchronous.
        response = await loop.run_in_executor(
            None,
            lambda: requests.get(
                "https://labs.bible.org/api/",
                params={"passage": query, "type": "json"},
                timeout=10,
            ),
        )
        if response.status_code == 200:
            verses = response.json()
            # If verses are found, concatenate them into a single message
            passage = "\n".join(
                f"**{entry['bookname']} {entry['chapter']}:{entry['verse']}** \n{entry['text']}"
                for entry in verses
            )
    except (requests.RequestException, ValueError, LookupError, TypeError) as exc:
        # ValueError covers invalid JSON; LookupError/TypeError cover a
        # payload that is not a list of verse dicts.
        print(f"Bible search failed: {exc}")
        verses = None

    if verses is None:
        embed = discord.Embed(title="Search Results", description="Unable to fetch search results.", color=discord.Color.red())
    elif verses:
        # Truncate inline: no `truncate_response` helper is defined in the
        # visible source. Discord caps descriptions at 4096 characters and
        # titles at 256.
        if len(passage) > 4096:
            passage = passage[:4093] + "..."
        title = f"Search Results for '{query}'"
        if len(title) > 256:
            title = title[:253] + "..."
        embed = discord.Embed(title=title, description=passage, color=discord.Color.blue())
    else:
        embed = discord.Embed(title="Search Results", description="No results found.", color=discord.Color.red())

    embed.set_footer(text="Created by Cosmos")
    await ctx.send(embed=embed)
# Command: bulk-delete the most recent `limit` messages in the invoking
# channel (plus the command message itself) and report how many were removed.
# NOTE(review): no permission check or registration decorator is visible in
# this view; confirm that who may invoke this is restricted elsewhere.
async def purge(ctx, limit: int):
    """Removes N amount of messages."""
    if limit < 1:
        # A zero or negative limit would delete nothing yet still be reported.
        await ctx.send("Please provide a number of messages greater than 0.")
        return
    try:
        # Add 1 to include the command message itself
        deleted = await ctx.channel.purge(limit=limit + 1)
        # Report what was actually removed (the channel may hold fewer
        # messages than requested), not counting the command message.
        removed = max(len(deleted) - 1, 0)
        await ctx.send(f"{removed} messages have been purged.")
    except discord.Forbidden:
        await ctx.send("I don't have permission to manage messages.")
    except discord.HTTPException:
        await ctx.send("An error occurred while trying to purge messages. Please try again later.")
async def on_command_error(ctx, error):
    """Translate command failures into a user-facing reply.

    This is the single handler for command errors. Two functions with this
    name cannot coexist in one module: the later definition replaces the
    earlier one, so all cases are handled here.

    Args:
        ctx: Invocation context; replies are sent with ``ctx.send``.
        error: The exception raised while parsing or running the command.

    NOTE(review): no ``@bot.event`` decorator is visible in this view;
    confirm the handler is registered with the bot.
    """
    if isinstance(error, commands.CommandNotFound):
        await ctx.send("Sorry, I couldn't find that command. Use `$cmds` to see the list of available commands.")
    elif isinstance(error, commands.MissingRequiredArgument):
        await ctx.send("Oops! Looks like you're missing some required arguments.")
    elif isinstance(error, commands.CheckFailure):
        await ctx.send("You do not have the permissions to execute this command.")
    elif isinstance(error, commands.CommandInvokeError):
        # The command body itself raised; inspect the underlying exception.
        original_error = error.original
        # Forbidden is tested before the more general HTTPException.
        if isinstance(original_error, discord.Forbidden):
            await ctx.send("I don't have permissions to do that.")
        elif isinstance(original_error, discord.HTTPException):
            await ctx.send("An error occurred while processing the command. Please try again later.")
        else:
            # Log the error to console or your logging system
            print(f"Error: {original_error}")
            await ctx.send("An unexpected error occurred.")
    else:
        # Log the error to console or your logging system
        print(f"An error occurred: {error}")


# Additional error handling for unexpected errors
async def on_error(event_method, *args, **kwargs):
    """Log an exception raised inside an event handler.

    Args:
        event_method: Name of the event whose handler failed.
        *args: Positional arguments of the failed event (unused).
        **kwargs: Keyword arguments of the failed event (unused).
    """
    # Imported locally: `sys` is not among the module-level imports.
    import sys

    # Log the error to console or your logging system
    print(f"An error occurred in {event_method}: {sys.exc_info()}")
# running in thread
def run_bot():
    """Run the Discord bot; intended as the target of a background thread.

    ``event`` is always set before this function finishes: immediately when
    no token is configured, and otherwise when ``bot.run`` returns or raises.
    Without that, a failed login would leave the thread waiting on ``event``
    blocked forever, because ``on_ready`` (which normally sets it) would
    never fire.
    """
    if not DISCORD_TOKEN:
        print("DISCORD_TOKEN NOT SET")
        event.set()
        return
    try:
        bot.run(DISCORD_TOKEN)
    finally:
        # Setting an already-set Event is harmless, so this is safe even
        # after a normal start-up.
        event.set()
# Start the bot on a background thread, then block until `event` is set
# (by `on_ready`, or by `run_bot` when no token is configured).
threading.Thread(target=run_bot).start()
event.wait()

# Minimal Gradio page that only displays a status heading.
with gr.Blocks() as demo:
    gr.Markdown("\n# Discord bot is online!\n")

demo.launch()