at / app.py
deeme's picture
Upload 6 files
75eef29 verified
import requests
import schedule
import time
import os
import threading
from fastapi import FastAPI
app = FastAPI()
# 从环境变量获取基础 URL
BASE_URL = os.getenv('BASE_URL', "")
def check_available_tokens():
"""
检查可用的 Tokens 数量
"""
try:
response = requests.get(f"{BASE_URL}")
if response.status_code == 200:
content = response.text
if '当前可用 Tokens 数量:<span class="text-blue-600">0' in content:
return True
return False
except Exception as e:
print(f"检查 Tokens 时发生错误: {e}")
return False
def read_and_prepare_tokens(file_path):
"""
从文件读取tokens并使用\r\n连接
"""
try:
# 适配 Hugging Face Spaces 的文件路径
if os.path.exists(file_path):
with open(file_path, 'r') as file:
# 去除每行的空白字符,过滤掉空行
tokens = [line.strip() for line in file if line.strip()]
else:
# 如果文件不存在,尝试在 /home/user 目录下查找
alternative_path = os.path.join('/home/user', file_path)
with open(alternative_path, 'r') as file:
tokens = [line.strip() for line in file if line.strip()]
# 使用\r\n连接tokens
combined_token = '\r\n'.join(tokens)
return combined_token
except FileNotFoundError:
print(f"错误:文件 {file_path} 未找到")
return None
except Exception as e:
print(f"读取文件发生错误: {e}")
return None
def post_tokens():
"""
读取tokens并发送单次POST请求
"""
url = f"{BASE_URL}/upload"
# 读取并准备tokens
combined_token = read_and_prepare_tokens('access_token.txt')
if not combined_token:
print("没有可用的tokens")
# 触发一次at.py环境变量读取生成access_token.txt
os.system('python at.py')
return
try:
payload = {"text": combined_token}
response = requests.post(url, data=payload)
#print(f"响应状态码: {response.status_code}")
print(f"响应内容: {response.text}")
except Exception as e:
print(f"请求发生错误: {e}")
@app.get("/")
async def main():
# 首先检查是否需要发送 tokens
if not check_available_tokens():
print("当前仍有可用 Tokens,无需发送")
return
return {"message": post_tokens()}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)