# Hosting-page residue (not part of the program): "Spaces: Running"
# This code is based on the following example: | |
# https://discordpy.readthedocs.io/en/stable/quickstart.html#a-minimal-bot | |
import os | |
import discord | |
from discord import app_commands | |
from discord.ext import commands | |
from threading import Thread | |
import json | |
from horde import HordeAPI | |
import inspect | |
# Maps the type names used in the JSON command definitions ("str", "int",
# "float") to the corresponding Python builtin types.
TYPE_MAPPING = {builtin.__name__: builtin for builtin in (str, int, float)}
# Dispatch table: maps a rule type to the predicate that decides whether a
# message matches. Every predicate is called as predicate(content, message)
# and compares the rule's content against message.content.
check_functions = dict(
    equals=lambda content, message: message.content == content,
    contains=lambda content, message: content in message.content,
    startswith=lambda content, message: message.content.startswith(content),
    endswith=lambda content, message: message.content.endswith(content),
)
# Gateway intents: the defaults plus message content, which on_message needs
# in order to read message.content.
intents = discord.Intents.default()
intents.message_content = True

# Prefix-command bot ('>' prefix); `tree` is an alias for its command tree.
bot = commands.Bot(command_prefix='>', intents=intents)
tree = bot.tree

# Load the rule/command configuration. The encoding is given explicitly so
# that non-ASCII text in the file is decoded the same way on every platform
# instead of depending on the locale's default encoding.
with open("discord.json", "r", encoding="utf-8") as f:
    json_data = json.load(f)
""" | |
@tree.command(name="hello", description="Sends a greeting!") | |
async def hello(interaction: discord.Interaction): | |
await interaction.response.send_message(f"Hello, {interaction.user.mention}!") | |
@tree.command(name="greet", description="Greets the specified user") | |
async def greet(interaction: discord.Interaction, name: str): | |
await interaction.response.send_message(f"Hello, {name}!") | |
@tree.command(name="get-kudos", description="The amount of Kudos this user has.") | |
async def getKudos(interaction: discord.Interaction): | |
await interaction.response.defer() | |
async with HordeAPI.getUserDetails() as details: | |
if "kudos" not in details: | |
await interaction.followup.send(f'Error: {details["code"]} {details["reason"]}') | |
return | |
await interaction.followup.send(f'The amount of Kudos this user has is {details["kudos"]}') | |
@tree.command(name="generate-status", description="Retrieve the status of an Asynchronous generation request.") | |
async def generateStatus(interaction: discord.Interaction, id: str): | |
await interaction.response.defer() | |
async with HordeAPI.generateCheck(id) as details: | |
if "kudos" not in details: | |
await interaction.followup.send(f'Check Error: {details["code"]} {details["reason"]}') | |
return | |
if bool(details["is_possible"]) == False: | |
await interaction.followup.send("This generation is impossible.") | |
return | |
if bool(details["faulted"]) == True: | |
await interaction.followup.send("This generation is faulted.") | |
return | |
if bool(details["done"]) == True: | |
async with HordeAPI.generateStatus(id) as generationDetail: | |
if "generations" not in generationDetail: | |
await interaction.followup.send(f'Status Error: {generationDetail["code"]} {generationDetail["reason"]}') | |
for i in range(len(generationDetail["generations"])): | |
await interaction.followup.send(generationDetail["generations"][i]["img"]) | |
return | |
if int(details["processing"]) > 0: | |
total = int(details["finished"]) + int(details["processing"]) + int(details["queue_position"]) + int(details["restarted"]) + int(details["waiting"]) | |
await interaction.followup.send(f'Processing image: {details["processing"]}/{total}') | |
return | |
await interaction.followup.send(f'Position in queue: {details["queue_position"]}, wait time: {details["wait_time"]}s') | |
""" | |
# Startup diagnostics: the installed discord.py version (structured and
# string form), followed by the constructor signature of app_commands.Command.
print(discord.version_info, discord.__version__, sep="\n")
print(inspect.signature(app_commands.Command))
async def hello(interaction=None):
    """Return a greeting, mentioning the invoking user when one is known.

    The previous body read a name ``interaction`` that was never defined in
    this scope, so every call raised NameError. The interaction is now an
    optional argument, which keeps zero-argument calls working.

    Args:
        interaction: Optional object exposing ``user.mention``
            (e.g. a discord Interaction). When omitted, a generic greeting
            is returned.

    Returns:
        The greeting text.
    """
    if interaction is None:
        return "Hello!"
    return f"Hello, {interaction.user.mention}!"
async def greet(name: str):
    """Return the greeting text for ``name``."""
    greeting = f"Hello, {name}!"
    return greeting
async def getKudos():
    """Return a message with the user's Kudos, or the API error.

    Uses ``HordeAPI.getUserDetails()`` as an async context manager. When the
    yielded mapping has a "kudos" entry its value is reported; otherwise the
    mapping's "code" and "reason" entries are reported as an error.
    """
    async with HordeAPI.getUserDetails() as user:
        if "kudos" in user:
            return f'The amount of Kudos this user has is {user["kudos"]}'
        return f'Error: {user["code"]} {user["reason"]}'
async def generateStatus(id: str):
    """Describe the current state of an asynchronous generation request.

    Args:
        id: Identifier of the generation request; passed to
            ``HordeAPI.generateCheck`` and ``HordeAPI.generateStatus``.

    Returns:
        A human-readable string, one of:
        * a "Check Error" / "Status Error" line built from the response's
          "code" and "reason" entries,
        * a fixed message for impossible or faulted generations,
        * the image URLs of a finished request, one per line,
        * a progress line while images are being processed,
        * otherwise the queue position and wait time.
    """
    async with HordeAPI.generateCheck(id) as details:
        if "kudos" not in details:
            return f'Check Error: {details["code"]} {details["reason"]}'
        if not details["is_possible"]:
            return "This generation is impossible."
        if details["faulted"]:
            return "This generation is faulted."

        if details["done"]:
            async with HordeAPI.generateStatus(id) as generation_detail:
                if "generations" not in generation_detail:
                    return f'Status Error: {generation_detail["code"]} {generation_detail["reason"]}'
                # The previous loop returned on its first iteration, so only
                # the first image was ever reported. Report all of them.
                # NOTE(review): a caller that posts this as one Discord
                # message should keep the message length limit in mind.
                images = [str(item["img"]) for item in generation_detail["generations"]]
                if images:
                    return "\n".join(images)
            # A finished request with no generations falls through to the
            # progress/queue report below, as it did before.

        if int(details["processing"]) > 0:
            counters = ("finished", "processing", "queue_position", "restarted", "waiting")
            total = sum(int(details[key]) for key in counters)
            return f'Processing image: {details["processing"]}/{total}'
        return f'Position in queue: {details["queue_position"]}, wait time: {details["wait_time"]}s'
""" | |
# 根据 json 数据动态创建命令 | |
for command in json_data["command"]: | |
@tree.command(name=command["name"], description=command["description"]) | |
async def dynamic_command(interaction: discord.Interaction, **kwargs): | |
print(kwargs) | |
await interaction.response.defer() | |
# 动态调用命令对应的函数 | |
function_name = command["function"] | |
function = globals()[function_name] | |
result = await function() | |
await interaction.followup.send(result) | |
# 动态创建参数 | |
#params = [app_commands.Parameter(name=param["name"], display_name=param['name'], description=param["description"], type=TYPE_MAPPING[param["type"]], autocomplete=False, required=True) | |
# for param in command["parameters"]] | |
# 动态创建命令并注册 | |
tree_command = app_commands.Command( | |
name=command["name"], | |
description=command["description"], | |
callback=dynamic_command | |
) | |
# 将命令添加到 bot 的 command tree | |
bot.tree.add_command(tree_command) | |
""" | |
class MyClient(discord.Client):
    """discord.Client subclass that owns its own application-command tree."""

    def __init__(self, **kwargs):
        """Create the client and attach a CommandTree as ``self.tree``.

        Any keyword arguments are forwarded to ``discord.Client``; when no
        ``intents`` is supplied, ``discord.Intents.default()`` is used.
        """
        kwargs.setdefault('intents', discord.Intents.default())
        super().__init__(**kwargs)
        self.tree = app_commands.CommandTree(self)

    async def setup_hook(self):
        """Register the "test" command on the tree, then sync the tree."""
        test_command = app_commands.Command(
            name="test",
            description="test",
            callback=self.test,
        )
        self.tree.add_command(test_command)
        await self.tree.sync()

    async def test(self, interaction: discord.Interaction, x: str, y: int):
        """Print the call's arguments and reply with the values of x and y."""
        for value in (self, interaction, x, y):
            print(value)
        await interaction.response.send_message(f"x: {x}, y: {y}")
# MyClient.__init__ accepts keyword arguments only, so the former positional
# call `MyClient(bot)` raised TypeError while this module was being imported.
# Construct it with the module's intents instead. The instance is not kept,
# exactly as before.
# NOTE(review): confirm whether this experimental instantiation is needed.
MyClient(intents=intents)
# Without a registration decorator this coroutine was never reachable as a
# prefix command, even though on_message calls bot.process_commands().
@bot.command()
async def ping(ctx):
    """Prefix command: reply 'pong' in the invoking context."""
    await ctx.send('pong')
# Registered as an event handler; without @bot.event the coroutine was only
# defined and never invoked by the bot.
@bot.event
async def on_ready():
    """Sync the application commands once and log the logged-in user."""
    # The module-level `tree` is the same object as `bot.tree`, so the former
    # second sync() only repeated the same request.
    await bot.tree.sync()
    print('We have logged in as {0.user}'.format(bot))
# Registered as an event handler; without @bot.event the coroutine was only
# defined and never invoked by the bot.
@bot.event
async def on_message(message):
    """Apply the configured message rules, then run prefix commands.

    Each rule in ``json_data["message"]`` has a "type" (a key of
    ``check_functions``) and a "content" to compare with the message. The
    first matching rule that has a "function" (name of a module-level
    coroutine, awaited with the message) or a "response" (text sent to the
    channel) is executed and stops rule processing. Messages written by the
    bot itself are ignored.

    Args:
        message: The incoming message; ``author``, ``content`` and
            ``channel`` are read from it.
    """
    if message.author == bot.user:
        return

    # Earlier hard-coded examples, superseded by the configured rules:
    #   if message.content == 'ping':
    #       await message.channel.send('pong')
    #   if message.content.startswith('$hello'):
    #       await message.channel.send('Hello!')

    for rule in json_data["message"]:
        # Unknown rule types fall back to a predicate that never matches.
        matches = check_functions.get(rule["type"], lambda c, m: False)
        if not matches(rule["content"], message):
            continue

        # "response" is optional: it is looked up only after the membership
        # test, so function-only rules no longer raise KeyError.
        if "function" in rule:
            # Look up the handler coroutine by name among module globals.
            handler = globals()[rule["function"]]
            await handler(message)
            break
        if "response" in rule:
            await message.channel.send(rule["response"])
            break

    # Keep the prefix-command system working alongside the rules.
    await bot.process_commands(message)
async def sendMessageToChannelHelper(data):
    """Send ``data`` as an embed to the channel named by CHANNEL_ID.

    Embed layout:
    * title: ``data["log_tag"]``, else ``data["id"]``, else "Empty Title"
    * description: ``data["log_message"]``, else ``data["model"]``,
      else "Empty Description"
    * color: red when the entry is a log entry (has "log_tag" or
      "log_message") whose "log_level" is "assert" or "error";
      green otherwise
    * every key of ``data`` becomes an inline field, except "img", which
      is used as the embed image URL.

    Args:
        data: Mapping of field names to values.
    """
    # NOTE(review): CHANNEL_ID is passed through as read from the
    # environment (a string, or None when unset) - confirm this is intended.
    channel = await bot.fetch_channel(os.environ.get("CHANNEL_ID"))

    # Later keys take precedence over earlier ones.
    title = "Empty Title"
    for key in ("id", "log_tag"):
        if key in data:
            title = data[key]

    description = "Empty Description"
    for key in ("model", "log_message"):
        if key in data:
            description = data[key]

    is_log_entry = "log_tag" in data or "log_message" in data
    # A log entry without "log_level" used to raise KeyError here; it is now
    # treated as a non-error entry.
    log_level = data["log_level"] if "log_level" in data else None
    is_error = is_log_entry and log_level in ("assert", "error")
    color = 0xff0000 if is_error else 0x00ff00

    embed = discord.Embed(title=title, description=description, color=color)

    # Add the data entries to the embed.
    for field in data:
        if field == "img":
            embed.set_image(url=data[field])
        else:
            embed.add_field(name=field, value=data[field], inline=True)

    await channel.send(embed=embed)
def sendMessageToChannel(data):
    """Schedule sendMessageToChannelHelper(data) as a task on the bot's loop.

    Returns immediately; the created task is not returned or awaited.
    """
    schedule = bot.loop.create_task
    schedule(sendMessageToChannelHelper(data))
def run():
    """Start the bot with the token from the TOKEN environment variable.

    Raises:
        Exception: when TOKEN is unset or empty.
        discord.HTTPException: re-raised for any HTTP status other than 429.
            A 429 response is reported on stdout instead of being raised.
    """
    try:
        token = os.environ.get("TOKEN")
        if not token:
            raise Exception("Please add your token to the Secrets pane.")
        bot.run(token)
    except discord.HTTPException as err:
        if err.status != 429:
            raise err
        print("The Discord servers denied the connection for making too many requests")
        print("Get help from https://stackoverflow.com/questions/66724687/in-discord-py-how-to-solve-the-error-for-toomanyrequests")
def discord_bot():
    """Entry point: announce start-up on stdout, then call run()."""
    banner = "Running discord_bot"
    print(banner)
    run()