discordbot / discord_bot.py
joytou's picture
Test
308a9db
raw
history blame
10.1 kB
# This code is based on the following example:
# https://discordpy.readthedocs.io/en/stable/quickstart.html#a-minimal-bot
import os
import discord
from discord import app_commands
from discord.ext import commands
from threading import Thread
import json
from horde import HordeAPI
import inspect
# Lookup table from a type's name (as written in configuration) to the
# Python type object itself.
TYPE_MAPPING = {kind.__name__: kind for kind in (str, int, float)}
# Predicates used to decide whether a message matches a rule. Each one takes
# the rule's text and the message object, and inspects ``message.content``.
def _content_equals(content, message):
    return message.content == content


def _content_contains(content, message):
    return content in message.content


def _content_startswith(content, message):
    return message.content.startswith(content)


def _content_endswith(content, message):
    return message.content.endswith(content)


# Dictionary mapping each rule type to its matching predicate.
check_functions = {
    "equals": _content_equals,
    "contains": _content_contains,
    "startswith": _content_startswith,
    "endswith": _content_endswith,
}
# Start from the default intents and additionally request message content,
# which the on_message rule checks need in order to read ``message.content``.
intents = discord.Intents.default()
intents.message_content = True

# Prefix-command bot (commands are invoked with ">") and its application
# command tree.
bot = commands.Bot(command_prefix='>', intents=intents)
tree = bot.tree

# Load the rule/command configuration. JSON text is UTF-8, so the encoding is
# stated explicitly instead of relying on the platform's default encoding.
with open("discord.json", "r", encoding="utf-8") as f:
    json_data = json.load(f)
"""
@tree.command(name="hello", description="Sends a greeting!")
async def hello(interaction: discord.Interaction):
await interaction.response.send_message(f"Hello, {interaction.user.mention}!")
@tree.command(name="greet", description="Greets the specified user")
async def greet(interaction: discord.Interaction, name: str):
await interaction.response.send_message(f"Hello, {name}!")
@tree.command(name="get-kudos", description="The amount of Kudos this user has.")
async def getKudos(interaction: discord.Interaction):
await interaction.response.defer()
async with HordeAPI.getUserDetails() as details:
if "kudos" not in details:
await interaction.followup.send(f'Error: {details["code"]} {details["reason"]}')
return
await interaction.followup.send(f'The amount of Kudos this user has is {details["kudos"]}')
@tree.command(name="generate-status", description="Retrieve the status of an Asynchronous generation request.")
async def generateStatus(interaction: discord.Interaction, id: str):
await interaction.response.defer()
async with HordeAPI.generateCheck(id) as details:
if "kudos" not in details:
await interaction.followup.send(f'Check Error: {details["code"]} {details["reason"]}')
return
if bool(details["is_possible"]) == False:
await interaction.followup.send("This generation is impossible.")
return
if bool(details["faulted"]) == True:
await interaction.followup.send("This generation is faulted.")
return
if bool(details["done"]) == True:
async with HordeAPI.generateStatus(id) as generationDetail:
if "generations" not in generationDetail:
await interaction.followup.send(f'Status Error: {generationDetail["code"]} {generationDetail["reason"]}')
for i in range(len(generationDetail["generations"])):
await interaction.followup.send(generationDetail["generations"][i]["img"])
return
if int(details["processing"]) > 0:
total = int(details["finished"]) + int(details["processing"]) + int(details["queue_position"]) + int(details["restarted"]) + int(details["waiting"])
await interaction.followup.send(f'Processing image: {details["processing"]}/{total}')
return
await interaction.followup.send(f'Position in queue: {details["queue_position"]}, wait time: {details["wait_time"]}s')
"""
# Start-up diagnostics: print the installed discord library version (both the
# structured and the string form) and the constructor signature of
# app_commands.Command.
print(discord.version_info)
print(discord.__version__)
print(inspect.signature(app_commands.Command))
async def hello(interaction=None):
    """Build a greeting for whoever triggered the call.

    The previous body read an undefined ``interaction`` name and therefore
    raised ``NameError`` on every call. The source object is now an optional
    argument, so existing zero-argument callers keep working.

    Args:
        interaction: Object identifying the invoker. An object with a
            ``user`` attribute (as interactions have) or an ``author``
            attribute (as messages have) is accepted. May be omitted.

    Returns:
        ``"Hello, <mention>!"`` when an invoker can be determined, otherwise
        the generic ``"Hello!"``.
    """
    invoker = getattr(interaction, "user", None)
    if invoker is None:
        # on_message passes the message itself, which carries ``author``.
        invoker = getattr(interaction, "author", None)
    if invoker is None:
        return "Hello!"
    return f"Hello, {invoker.mention}!"
async def greet(name: str):
    """Return the greeting text addressed to ``name``."""
    greeting = "Hello, {}!".format(name)
    return greeting
async def getKudos():
    """Return a sentence reporting the user's Kudos, or an error sentence.

    The user details are obtained from ``HordeAPI.getUserDetails()``. When the
    result has no ``"kudos"`` key it is reported as an error using its
    ``"code"`` and ``"reason"`` entries.
    """
    async with HordeAPI.getUserDetails() as user_info:
        if "kudos" in user_info:
            return f'The amount of Kudos this user has is {user_info["kudos"]}'
        return f'Error: {user_info["code"]} {user_info["reason"]}'
async def generateStatus(id: str):
    """Describe the current state of an asynchronous generation request.

    Args:
        id: Identifier of the generation request, passed through to
            ``HordeAPI.generateCheck`` and ``HordeAPI.generateStatus``.

    Returns:
        A human-readable string: an error line, an "impossible"/"faulted"
        notice, the generated image entries (one per line) when the request
        is done, a processing progress line, or the queue position and wait
        time.
    """
    async with HordeAPI.generateCheck(id) as details:
        # A check result without "kudos" is treated as an error payload.
        if "kudos" not in details:
            return f'Check Error: {details["code"]} {details["reason"]}'
        if not details["is_possible"]:
            return "This generation is impossible."
        if details["faulted"]:
            return "This generation is faulted."
        if details["done"]:
            async with HordeAPI.generateStatus(id) as generationDetail:
                if "generations" not in generationDetail:
                    return f'Status Error: {generationDetail["code"]} {generationDetail["reason"]}'
                # Previously a ``return`` inside the loop reported only the
                # first generation; report every image, one per line.
                images = [
                    str(generation["img"])
                    for generation in generationDetail["generations"]
                ]
                if images:
                    return "\n".join(images)
                # No generations listed: fall through to the progress report,
                # as before.
        if int(details["processing"]) > 0:
            total = sum(
                int(details[key])
                for key in ("finished", "processing", "queue_position", "restarted", "waiting")
            )
            return f'Processing image: {details["processing"]}/{total}'
        return f'Position in queue: {details["queue_position"]}, wait time: {details["wait_time"]}s'
"""
# 根据 json 数据动态创建命令
for command in json_data["command"]:
@tree.command(name=command["name"], description=command["description"])
async def dynamic_command(interaction: discord.Interaction, **kwargs):
print(kwargs)
await interaction.response.defer()
# 动态调用命令对应的函数
function_name = command["function"]
function = globals()[function_name]
result = await function()
await interaction.followup.send(result)
# 动态创建参数
#params = [app_commands.Parameter(name=param["name"], display_name=param['name'], description=param["description"], type=TYPE_MAPPING[param["type"]], autocomplete=False, required=True)
# for param in command["parameters"]]
# 动态创建命令并注册
tree_command = app_commands.Command(
name=command["name"],
description=command["description"],
callback=dynamic_command
)
# 将命令添加到 bot 的 command tree
bot.tree.add_command(tree_command)
"""
class MyClient(discord.Client):
    """Client subclass that owns its own application-command tree.

    On set-up it registers a single ``test`` command on that tree and syncs
    the tree.
    """

    def __init__(self, **kwargs):
        # Take the intents from kwargs, falling back to the default intents.
        chosen_intents = kwargs.pop('intents', discord.Intents.default())
        # Hand the intents to discord.Client.__init__ as a keyword argument.
        super().__init__(intents=chosen_intents, **kwargs)
        self.tree = app_commands.CommandTree(self)

    async def setup_hook(self):
        """Register the dynamically built command(s), then sync the tree."""
        test_command = app_commands.Command(
            name="test",
            callback=self.test,
            description="test",
        )
        self.tree.add_command(test_command)
        await self.tree.sync()

    async def test(self, interaction: discord.Interaction, x: str, y: int):
        # Callback of the "test" command: dump the received values to stdout
        # and echo x and y back to the caller.
        # (Documented with comments rather than a docstring so the command's
        # metadata is left exactly as it was.)
        for received in (self, interaction, x, y):
            print(received)
        await interaction.response.send_message(f"x: {x}, y: {y}")
# MyClient.__init__ accepts keyword arguments only, so the former positional
# call ``MyClient(bot)`` raised TypeError while the module was being imported.
# Construct it with the module's intents instead; the instance is still
# discarded, exactly as before.
MyClient(intents=intents)
# Prefix command "ping" (invoked as ">ping" given the bot's prefix): replies
# with "pong" in the invoking context. Documented with comments rather than a
# docstring so the command's help text is unchanged.
@bot.command()
async def ping(ctx):
    await ctx.send('pong')
@bot.event
async def on_ready():
    """Sync the application commands and report the logged-in user.

    The module-level ``tree`` is ``bot.tree``, so the previous two sync calls
    pushed the same command tree twice on every ready event; one call is
    enough.
    """
    await bot.tree.sync()
    print('We have logged in as {0.user}'.format(bot))
@bot.event
async def on_message(message):
    """Answer a message according to the configured rules, then run commands.

    Rules come from ``json_data["message"]``. Each rule has a ``"type"``
    (a key of ``check_functions``) and a ``"content"`` to match against, plus
    either a ``"function"`` (name of a module-level coroutine function called
    with the message) or a ``"response"`` (text sent to the channel). The
    first matching rule that defines one of those two is applied and ends the
    search. Messages sent by the bot itself are ignored.
    """
    if message.author == bot.user:
        return

    for rule in json_data["message"]:
        # Unknown rule types never match.
        matches = check_functions.get(rule["type"])
        if matches is None or not matches(rule["content"], message):
            continue

        # "response" is looked up only when it is needed: rules that define
        # just a "function" used to fail with KeyError on an unconditional
        # rule["response"] read.
        if "function" in rule:
            # NOTE: the handler is resolved by name from module globals, so
            # the configuration file must be trusted.
            handler = globals()[rule["function"]]
            await handler(message)
            break
        if "response" in rule:
            await message.channel.send(rule["response"])
            break

    # Make sure the prefix-command system still sees the message.
    await bot.process_commands(message)
async def sendMessageToChannelHelper(data):
    """Send ``data`` as an embed to the channel named by ``CHANNEL_ID``.

    Args:
        data: Mapping of field names to values (assumed to be a dict). The
            title is taken from ``"log_tag"``, else ``"id"``; the description
            from ``"log_message"``, else ``"model"``. An ``"img"`` entry
            becomes the embed image; every other entry becomes an inline
            field.

    The embed is red when the payload is a log entry (has ``"log_tag"`` or
    ``"log_message"``) whose ``"log_level"`` is ``"assert"`` or ``"error"``,
    and green otherwise.
    """
    channel = await bot.fetch_channel(os.environ.get("CHANNEL_ID"))

    # Later keys take precedence over earlier ones, as in the original chain
    # of ``if`` statements.
    title = "Empty Title"
    for key in ("id", "log_tag"):
        if key in data:
            title = data[key]
    description = "Empty Description"
    for key in ("model", "log_message"):
        if key in data:
            description = data[key]

    # A log payload without "log_level" used to raise KeyError here; it is now
    # treated as non-error.
    is_log_entry = "log_tag" in data or "log_message" in data
    is_error = is_log_entry and data.get("log_level") in ("assert", "error")
    color = 0xff0000 if is_error else 0x00ff00

    embed = discord.Embed(title=title, description=description, color=color)
    # Add the payload entries to the embed.
    for field, value in data.items():
        if field == "img":
            embed.set_image(url=value)
        else:
            embed.add_field(name=field, value=value, inline=True)

    await channel.send(embed=embed)
def sendMessageToChannel(data):
    """Schedule sendMessageToChannelHelper(data) on the bot's event loop.

    Returns immediately; the created task is not returned or awaited.
    """
    # NOTE(review): if this is ever called from a thread other than the one
    # running the bot's loop, create_task is not the thread-safe way to
    # schedule work - confirm how callers invoke this.
    bot.loop.create_task(sendMessageToChannelHelper(data))
def run():
    """Start the bot with the token from the ``TOKEN`` environment variable.

    Raises:
        Exception: when ``TOKEN`` is unset or empty.
        discord.HTTPException: re-raised for any HTTP failure other than
            status 429, which is reported on stdout instead.
    """
    try:
        token = os.environ.get("TOKEN")
        if not token:
            raise Exception("Please add your token to the Secrets pane.")
        bot.run(token)
    except discord.HTTPException as e:
        if e.status != 429:
            raise
        # Status 429: too many requests. Print guidance instead of failing.
        print(
            "The Discord servers denied the connection for making too many requests"
        )
        print(
            "Get help from https://stackoverflow.com/questions/66724687/in-discord-py-how-to-solve-the-error-for-toomanyrequests"
        )
def discord_bot():
    """Announce start-up on stdout, then start the bot via run()."""
    print("Running discord_bot")
    run()