# Vesperal / app.py
# Hugging Face Space by bhagatsuryainatom - commit a96c89a ("Update app.py"), 12.1 kB
import json
import os
from collections import defaultdict
from datetime import datetime, timedelta, timezone

import discord
import gradio as gr
from discord.ext import commands, tasks
from dotenv import load_dotenv
from huggingface_hub import HfApi, HfFolder
# Load variables from a local .env file first so that every os.getenv() call
# below can see them (load_dotenv does not override variables already set).
load_dotenv()

# Save the Hugging Face token for authentication; skip when it is not configured.
hf_token = os.getenv("HUGGING_FACE_TOKEN")
if hf_token:
    HfFolder.save_token(hf_token)

# Calculate the sleep time until the next midnight in IST.
# IST is a fixed UTC+05:30 offset without daylight saving, so a standard-library
# fixed-offset timezone is sufficient here.
ist_timezone = timezone(timedelta(hours=5, minutes=30), name="IST")
current_time = datetime.now(ist_timezone)
next_midnight = current_time.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
# total_seconds() returns a float; pass whole seconds to the Hub API.
sleep_time = int((next_midnight - current_time).total_seconds())
api = HfApi()
api.set_space_sleep_time(repo_id="bhagatsuryainatom/Vesperal", sleep_time=sleep_time)

# Gateway intents the bot relies on: messages and their content (text
# triggers), voice states (join detection) and reactions (admin prompts).
intents = discord.Intents.default()
intents.messages = True
intents.message_content = True
intents.voice_states = True
intents.reactions = True
bot = commands.Bot(command_prefix='!', intents=intents)

# Bot token and the ID of the text channel used for admin prompts, both read
# from the environment. os.getenv returns None when a variable is unset.
TOKEN = os.getenv('TOKEN')
YOUR_CHANNEL_ID = os.getenv('YOUR_CHANNEL_ID')

# Initialize the sessions and pending_messages dictionaries.
# sessions: voice channel ID -> Session
# pending_messages: prompt message ID -> voice channel ID the prompt is about
sessions = {}
pending_messages = {}
class Session:
    """Camera-enforcement state for one voice channel.

    All attributes are plain containers so that a session can be written to
    JSON with ``to_dict`` and rebuilt with ``from_dict``.
    """

    def __init__(self):
        self.camera_required = False
        # Seconds spent without video, per member.
        self.time_without_video = defaultdict(int)
        # Number of warnings issued, per member.
        self.warnings_given = defaultdict(int)
        # Member IDs excluded from the camera requirement.
        self.exemptions = set()
        self.instructions_sent = False  # Flag to track if instructions have been sent

    @staticmethod
    def _key_to_json(key):
        """Return the JSON object key for a counter entry.

        Counters may be keyed either by a member ID or by an object exposing
        an ``id`` attribute; in both cases the ID is what gets stored.
        """
        return str(getattr(key, "id", key))

    @staticmethod
    def _key_from_json(key):
        """Turn a JSON object key back into an int ID, leaving non-numeric keys as-is."""
        try:
            return int(key)
        except (TypeError, ValueError):
            return key

    def to_dict(self):
        """Return a JSON-serializable snapshot of this session."""
        return {
            "camera_required": self.camera_required,
            "time_without_video": {
                self._key_to_json(member): seconds
                for member, seconds in self.time_without_video.items()
            },
            "warnings_given": {
                self._key_to_json(member): warnings
                for member, warnings in self.warnings_given.items()
            },
            # Sets are not JSON-serializable, so store the IDs as a list.
            "exemptions": list(self.exemptions),
            # Persisted so that from_dict can actually restore it.
            "instructions_sent": self.instructions_sent,
        }

    @classmethod
    def from_dict(cls, data):
        """Build a session from a mapping produced by ``to_dict``.

        Missing keys fall back to the defaults of a fresh session. JSON object
        keys are always strings, so numeric counter keys are converted back to
        ints.
        """
        session = cls()
        session.camera_required = data.get("camera_required", False)
        session.time_without_video = defaultdict(
            int,
            {cls._key_from_json(k): v for k, v in data.get("time_without_video", {}).items()},
        )
        session.warnings_given = defaultdict(
            int,
            {cls._key_from_json(k): v for k, v in data.get("warnings_given", {}).items()},
        )
        session.exemptions = set(data.get("exemptions", []))
        session.instructions_sent = data.get("instructions_sent", False)
        return session
def save_sessions():
    """Write every session in ``sessions`` to ``sessions.json``.

    The data is written to a temporary file first and then moved into place,
    so an interrupted save cannot leave a truncated ``sessions.json`` behind.
    """
    payload = {str(vc_id): session.to_dict() for vc_id, session in sessions.items()}
    tmp_path = 'sessions.json.tmp'
    with open(tmp_path, 'w') as f:
        json.dump(payload, f, ensure_ascii=False, indent=4)
    # os.replace overwrites the destination in a single step.
    os.replace(tmp_path, 'sessions.json')
def correct_and_load_json(path):
    """Load JSON from *path*, repairing single-quoted content if necessary.

    If the file does not parse, single quotes are replaced with double quotes
    and parsing is retried. The repaired text is written back to *path* only
    after it has been confirmed to parse, so a failed repair never overwrites
    the original file.

    Returns:
        The parsed JSON value.

    Raises:
        json.JSONDecodeError: if the content is invalid even after the repair.
    """
    with open(path, 'r') as file:
        file_content = file.read()
    try:
        return json.loads(file_content)
    except json.JSONDecodeError:
        print("JSON format error detected. Attempting to correct.")

    corrected_content = file_content.replace("'", '"')  # Replace single quotes with double quotes
    # Implement other corrections here as needed

    # Validate before touching the file; this raises if the repair did not help.
    data = json.loads(corrected_content)
    with open(path, 'w') as file:
        file.write(corrected_content)
    return data
def load_sessions():
    """Populate ``sessions`` from ``sessions.json`` when that file has content.

    A missing or empty file is reported and leaves ``sessions`` untouched. A
    file that still fails to parse after the repair attempt is reported too.
    """
    session_file = 'sessions.json'
    if not os.path.exists(session_file) or os.path.getsize(session_file) <= 0:
        print("Session file not found or is empty, starting fresh.")
        return

    try:
        raw_sessions = correct_and_load_json(session_file)
        for channel_key in raw_sessions:
            # Channel IDs are stored as JSON strings; the in-memory dict uses ints.
            sessions[int(channel_key)] = Session.from_dict(raw_sessions[channel_key])
    except json.JSONDecodeError:
        print("Failed to correct JSON file. Please check the file format manually.")
@bot.event
async def on_ready():
    """Load persisted sessions and make sure both background loops are running.

    ``on_ready`` can fire more than once (for example after a reconnect), and
    starting a loop that is already running raises ``RuntimeError``, so each
    loop is started only if it is not running yet.
    """
    load_sessions()
    print(f'Logged in as {bot.user.name}')
    if not check_camera_status.is_running():
        check_camera_status.start()
    # Ensure save_sessions_task starts properly within the on_ready event
    if not save_sessions_task.is_running():
        save_sessions_task.start()
# Smoke-test command: replies with a fixed confirmation message in the channel
# where the command was invoked.
@bot.command()
async def test(ctx):
    await ctx.send("Test successful!")
# Seconds between camera checks; also the amount added to a member's
# "time without video" counter on each check.
_CAMERA_CHECK_INTERVAL = 30


async def _send_dm(member, text):
    """Send *text* to *member* as a direct message, reporting (not raising) Discord errors."""
    try:
        await member.send(text)
    except discord.HTTPException as e:
        # e.g. the member does not accept direct messages
        print(f"Could not DM {member}: {e}")


@tasks.loop(seconds=_CAMERA_CHECK_INTERVAL)
async def check_camera_status():
    """Warn, then disconnect, members who keep their camera off.

    Runs for every session with ``camera_required`` set. A non-exempt member
    without video is warned after the first interval and removed from the
    voice channel once 60 seconds have accumulated. Counters are keyed by
    member ID so that they match the ID-keyed format used by ``Session``.
    """
    # Iterate over snapshots: other event handlers may add sessions or change
    # channel membership while this coroutine is suspended at an await.
    for vc_id, session in list(sessions.items()):
        if not session.camera_required:
            continue
        vc = bot.get_channel(vc_id)
        if vc is None:
            continue
        for member in list(vc.members):
            # Bots are ignored, as in on_voice_state_update.
            if member.bot or member.id in session.exemptions:
                continue

            voice = member.voice
            if voice is None or voice.self_video:
                # Camera is on (or the member already left): reset their data.
                session.time_without_video.pop(member.id, None)
                session.warnings_given.pop(member.id, None)
                continue

            session.time_without_video[member.id] += _CAMERA_CHECK_INTERVAL
            elapsed = session.time_without_video[member.id]
            if elapsed == 30:
                session.warnings_given[member.id] = 1
                await _send_dm(member, "Warning 1: Please turn on your camera within the next 30 seconds.")
            elif elapsed >= 60:
                session.warnings_given[member.id] = 2
                await _send_dm(member, "Final Warning: You are being removed for not turning on your camera.")
                try:
                    await member.move_to(None)
                except discord.HTTPException as e:
                    # Keep the loop alive if the bot is not allowed to move members.
                    print(f"Could not remove {member} from voice: {e}")
                # Reset the member's session data
                session.time_without_video.pop(member.id, None)
                session.warnings_given.pop(member.id, None)
@tasks.loop(minutes=5)  # Save sessions every 5 minutes
async def save_sessions_task():
    """Persist all sessions to disk.

    Failures are reported instead of raised: an unhandled exception would stop
    the loop and silently end all further periodic saves.
    """
    try:
        save_sessions()
    except (OSError, TypeError, ValueError) as e:
        print(f"Failed to save sessions: {e}")
    else:
        print("Sessions saved.")
@save_sessions_task.before_loop
async def before_save_sessions():
    """Hold back the first run of save_sessions_task until the bot is ready."""
    await bot.wait_until_ready()
@bot.event
async def on_voice_state_update(member, before, after):
    """Create a session when someone joins a voice channel and prompt the first Admin.

    When a non-bot member joins (or switches to) a voice channel, a ``Session``
    is created for that channel if none exists. If the member has a role named
    "Admin" and no other Admin is already in the channel, a prompt is posted in
    the configured text channel asking whether cameras are required, and the
    prompt's message ID is recorded in ``pending_messages``.
    """
    if member.bot:
        return

    joined_channel = after.channel and (not before.channel or before.channel.id != after.channel.id)
    if not joined_channel:
        return

    if sessions.get(after.channel.id) is None:
        sessions[after.channel.id] = Session()

    if not any(role.name == "Admin" for role in member.roles):
        return
    other_admins_in_channel = [
        m for m in after.channel.members
        if m != member and any(role.name == "Admin" for role in m.roles)
    ]
    if other_admins_in_channel:
        return

    # YOUR_CHANNEL_ID comes from the environment and may be unset or malformed.
    try:
        general_channel_id = int(YOUR_CHANNEL_ID)
    except (TypeError, ValueError):
        print(f"YOUR_CHANNEL_ID is missing or not a valid channel ID: {YOUR_CHANNEL_ID!r}.")
        return

    general_channel = member.guild.get_channel(general_channel_id)
    if not general_channel:
        print(f"Couldn't find the channel with ID: {general_channel_id}.")
        return

    # Written as escapes so the emoji cannot be garbled by a mis-encoded source file.
    camera_emoji = "\U0001F4F7"  # camera
    cross_emoji = "\u274C"  # cross mark
    message_content = (
        f"Admin {member.mention}, do you require cameras to be ON for this session in "
        f"{after.channel.name}? React with {camera_emoji} for YES or {cross_emoji} for NO."
    )
    try:
        message = await general_channel.send(message_content)
        pending_messages[message.id] = after.channel.id
        await message.add_reaction(camera_emoji)
        await message.add_reaction(cross_emoji)
    except discord.DiscordException as e:
        print(f"Failed to send message or add reactions due to: {e}")
# Make a text channel private to the bot: hide it from the default role and
# give the bot read/send/manage-messages/add-reactions access. Defaults to the
# channel where the command was invoked. Requires the caller to have
# manage_channels.
# NOTE(review): Discord may also require the bot to hold manage_roles for
# permission overwrites - confirm against the discord.py documentation.
@bot.command()
@commands.has_permissions(manage_channels=True)
async def setperms(ctx, channel: discord.TextChannel = None):
    channel = channel or ctx.channel
    bot_member = ctx.guild.me
    if not channel.permissions_for(bot_member).manage_channels:
        await ctx.send("I don't have permission to manage channels here.")
        return

    bot_overwrite = discord.PermissionOverwrite(
        read_messages=True,
        send_messages=True,
        manage_messages=True,
        add_reactions=True,
    )
    try:
        # Grant the bot access first: once the default role loses read access
        # the bot may no longer be able to see the channel to edit it.
        await channel.set_permissions(bot_member, overwrite=bot_overwrite)
        await channel.set_permissions(ctx.guild.default_role, read_messages=False)
        await ctx.send(f"Permissions set for {channel.mention} successfully!")
    except discord.DiscordException as e:
        await ctx.send(f"Failed to set permissions for {channel.mention} due to: {e}")
@bot.event
async def on_reaction_add(reaction, user):
    """Forward every added reaction to process_reaction."""
    await process_reaction(reaction, user)
@bot.event
async def on_reaction_remove(reaction, user):
    """Forward every removed reaction to process_reaction.

    Removal goes through the same handler as addition, so it is processed
    exactly like adding that reaction.
    """
    await process_reaction(reaction, user)
async def process_reaction(reaction, user):
    """Apply an Admin's camera choice made by reacting to a pending prompt.

    Only reactions on messages recorded in ``pending_messages``, made by a
    non-bot user with a role named "Admin", are considered. The camera emoji
    turns the requirement on, the cross mark turns it off; any other emoji is
    ignored and leaves the prompt pending. After a valid choice the prompt is
    edited to show the result, the admin instructions are sent once per
    session, and the prompt is removed from ``pending_messages``.
    """
    # Written as escapes so the emoji cannot be garbled by a mis-encoded source file.
    camera_emoji = "\U0001F4F7"  # camera
    cross_emoji = "\u274C"  # cross mark

    message = reaction.message
    if user.bot or message.id not in pending_messages:
        return

    channel_id = pending_messages[message.id]
    session = sessions.get(channel_id)
    # A reacting user that is not a guild member has no roles attribute.
    user_roles = getattr(user, "roles", ())
    if not session or not any(role.name == "Admin" for role in user_roles):
        return

    emoji = str(reaction.emoji)
    if emoji == camera_emoji:
        session.camera_required = True
    elif emoji == cross_emoji:
        session.camera_required = False
    else:
        # Not one of the two choices: leave the prompt pending.
        return
    session.exemptions.clear()

    # Cleanup. Done before the awaits below so that a second reaction event
    # arriving meanwhile cannot process the same prompt again.
    pending_messages.pop(message.id, None)

    # Edit the message to reflect the updated session status
    voice_channel = message.channel.guild.get_channel(channel_id)
    channel_name = voice_channel.name if voice_channel else f"channel {channel_id}"
    await message.edit(content=f"Camera requirement for {channel_name} has been {'UPDATED to ON' if session.camera_required else 'UPDATED to OFF'}.")

    # Send instructions only if they haven't been sent before
    if not session.instructions_sent:
        follow_up_message = (
            "**Next Steps for Admins:**\n"
            "- Use `#camoff @user` to exempt any specific user from the camera requirement.\n"
            "- Use `#camoff` to exempt yourself if actively participating in the session.\n"
            "- Use `#end` to conclude the session and reset all settings.\n"
            "**Note:** These commands work within the context of the current voice session."
        )
        await message.channel.send(follow_up_message)
        session.instructions_sent = True  # Update the flag to prevent re-sending
def dummy_function(input_text):
    # Placeholder callback for the Gradio UI: the input is ignored and the
    # same fixed message is returned every time.
    placeholder_reply = "This is a dummy function for the Gradio UI."
    return placeholder_reply
# Minimal Gradio front end: one text box in, one text box out, backed by
# dummy_function.
iface = gr.Interface(
    fn=dummy_function,
    inputs=gr.Textbox(label="Input"),
    outputs=gr.Textbox(label="Output"),
    title="Discord Bot",
    description="This Space runs a Discord bot in the background.",
)
@bot.event
async def on_message(message):
    """Handle the ``#camoff`` / ``#end`` admin keywords, then run prefix commands.

    The keywords are honoured only for non-bot authors with a role named
    "Admin" who are currently in a voice channel; they act on that channel's
    session. ``#camoff`` exempts the mentioned users (or the author when
    nobody is mentioned); ``#end`` clears the exemptions and the camera
    requirement. Every message is passed to ``bot.process_commands`` exactly
    once, so prefix commands are not executed twice.
    """
    author = message.author
    # Authors of direct messages are not guild members and have neither roles
    # nor a voice state.
    author_roles = getattr(author, "roles", ())
    is_admin = not author.bot and any(role.name == "Admin" for role in author_roles)
    voice = getattr(author, "voice", None)

    wants_camoff = "#camoff" in message.content
    wants_end = "#end" in message.content

    # Only react to messages that actually contain an admin keyword, so
    # ordinary chat from an Admin never triggers the "no session" notice.
    if is_admin and voice and voice.channel and (wants_camoff or wants_end):
        session = sessions.get(voice.channel.id)
        if not session:
            await message.channel.send("No active session found for this voice channel.")
        elif wants_camoff:
            # If no users are mentioned, exempt the author; otherwise, exempt mentioned users
            users_to_exempt = message.mentions if message.mentions else [author]
            for user in users_to_exempt:
                session.exemptions.add(user.id)
                await message.channel.send(f"{user.display_name} is now exempt from the camera requirement.")
        else:
            # Clear exemptions and reset camera requirement
            session.exemptions.clear()
            session.camera_required = False
            await message.channel.send("Session ended: Camera requirements and exemptions have been reset.")

    # Process prefix commands
    await bot.process_commands(message)
@check_camera_status.before_loop
async def before_camera_check():
    """Hold back the first run of check_camera_status until the bot is ready."""
    await bot.wait_until_ready()
# bot.run() blocks until the bot shuts down, so the Gradio UI has to be started
# first. prevent_thread_lock=True makes launch() return immediately instead of
# blocking the main thread, which is then handed to the bot.
iface.launch(inline=False, share=True, prevent_thread_lock=True)
bot.run(TOKEN)