sig / _requests /aliyundrive.py
Evilmass
change pymysql to aiomysql
8786c0f
raw
history blame
4.42 kB
# 接口参考:https://github.com/Anonym-w/autoSigninAliyun/blob/main/autoSignin.js
import asyncio
import sys
import requests
from os.path import abspath, dirname, join
# Import-path setup: append the parent of this file's directory to sys.path.
# NOTE(review): presumably this is what lets the `utils` / `__init__` imports
# that follow resolve when the file is run directly — confirm against the
# repository layout. Must stay ahead of those imports.
sys.path.append(join(dirname(abspath(__file__)), "../"))
from utils import (
push_msg,
get_aliyundrive_refresh_token,
update_aliyundrive_access_token,
)
from __init__ import (
aliyundrive_nickname_url,
aliyundrive_signin_url,
aliyundrive_access_token_url,
aliyundrive_reward_url,
aliyundrive_title,
aliyundrive_logger,
aliyundrive_img,
)
# Shared HTTP state, reused by every request helper in this module.
# The header uses the standard "Content-Type" name; it previously read
# "message-Type", which is not an HTTP header any server interprets.
headers = {"Content-Type": "application/json"}
# A single Session so all calls go through the same requests session object.
session = requests.Session()
# JSON body sent with every request; helpers add fields to it in place.
query_body = {}
def get_refresh_token():
    """Placeholder that does nothing and returns ``None``.

    Per the original note, the token sits in browser local storage, which is
    only usable inside a browser, so nothing is implemented here.
    """
    return None
async def validate_access_token():
    """Exchange the stored refresh token for a fresh access token.

    Obtains the refresh token from ``get_aliyundrive_refresh_token``, posts it
    to ``aliyundrive_access_token_url`` and, on HTTP 200, stores the new access
    token via ``update_aliyundrive_access_token`` and installs it in the shared
    ``headers`` as a Bearer credential. Any other status is logged as an error
    and pushed as a notification. Returns ``None`` in both cases.

    Original author's notes: the token is only valid for three hours, so the
    scheduled job refreshes at that interval; the refresh token is fetched
    from a Raspberry Pi.
    """
    token = await get_aliyundrive_refresh_token()

    # Grant fields are written into the shared body, so they remain present
    # on the requests issued by the other helpers afterwards.
    query_body["grant_type"] = "refresh_token"
    query_body["refresh_token"] = token

    response = session.post(
        aliyundrive_access_token_url,
        headers=headers,
        json=query_body,
    )

    if response.status_code == 200:
        aliyundrive_logger.info("阿里云 access_token 有效")
        access_token = response.json()["access_token"]
        await update_aliyundrive_access_token(access_token)
        headers["Authorization"] = f"Bearer {access_token}"
        return

    # Non-200: report the failure; headers are left without a new credential.
    failure = "阿里云 access_token 失效"
    aliyundrive_logger.error(failure)
    push_msg(aliyundrive_title, failure, aliyundrive_img)
def get_aliyun_nickname():
    """Fetch the account nickname.

    Posts the shared ``query_body`` to ``aliyundrive_nickname_url``.

    Returns:
        The ``nick_name`` field of the JSON response on HTTP 200; otherwise
        ``False``, after logging the error and pushing a notification.
    """
    user_res = session.post(
        aliyundrive_nickname_url,
        headers=headers,
        json=query_body,
    )
    # Parse the body once; both the success and failure messages embed it.
    user_data = user_res.json()

    if user_res.status_code != 200:
        user_msg = f"获取用户名失败,stderr:{user_data}"
        aliyundrive_logger.error(user_msg)
        push_msg(title=aliyundrive_title, message=user_msg, img_url=aliyundrive_img)
        return False

    # The success message used to be built and then discarded; log it at info
    # level like the other helpers in this module do on success.
    aliyundrive_logger.info(f"获取用户名成功,stdout:{user_data}")
    return user_data["nick_name"]
def sign():
    """Perform the sign-in request and extract today's reward information.

    Posts the shared ``query_body`` to ``aliyundrive_signin_url``.

    Returns:
        On HTTP 200, a tuple ``(is_reward, signin_count, curent_reward)``:
        the ``isReward`` flag of today's log entry (whether the reward has
        already been claimed), the ``signInCount`` value (days signed in so
        far), and the reward name concatenated with its description.
        On any other status, ``False``, after logging the error and pushing
        a notification.
    """
    sign_res = session.post(
        aliyundrive_signin_url,
        headers=headers,
        json=query_body,
    )

    if sign_res.status_code != 200:
        sign_msg = f"阿里云盘签到失败,stderr:{sign_res.json()}"
        aliyundrive_logger.error(sign_msg)
        # Push the formatted text; the Response object itself was being
        # passed as the message here before.
        push_msg(title=aliyundrive_title, message=sign_msg, img_url=aliyundrive_img)
        return False

    sign_msg = "阿里云盘签到成功"
    aliyundrive_logger.info(sign_msg)

    result = sign_res.json()["result"]
    # Number of days signed in so far.
    signin_count = result["signInCount"]
    # Today's entry sits at index (count - 1) of the sign-in log list.
    current_sign_info = result["signInLogs"][signin_count - 1]

    # Reward granted by today's sign-in.
    reward = current_sign_info["reward"]
    curent_reward = f"{reward['name']}{reward['description']}"

    # Whether the reward has already been claimed (bool).
    is_reward = current_sign_info["isReward"]
    return is_reward, signin_count, curent_reward
# Claim the sign-in reward.
def get_reward(signin_count):
    """Claim the reward for the given sign-in day.

    Adds ``signInDay`` to the shared ``query_body`` and posts it to
    ``aliyundrive_reward_url``.

    Args:
        signin_count: Value sent as ``signInDay``.

    Returns:
        ``False`` when the request returns HTTP 200; otherwise the failure
        message string, after logging it and pushing a notification.
    """
    query_body["signInDay"] = signin_count
    response = session.post(
        aliyundrive_reward_url,
        headers=headers,
        json=query_body,
    )

    if response.status_code != 200:
        failure = f"领取奖励失败,stderr:{response.json()}"
        aliyundrive_logger.error(failure)
        push_msg(title=aliyundrive_title, message=failure, img_url=aliyundrive_img)
        return failure

    aliyundrive_logger.info("领取奖励成功")
    return False
async def aliyundrive_sign():
    """Run the full flow: refresh token, sign in, claim reward, notify.

    Refreshes the access token, looks up the nickname, signs in, claims the
    reward when it has not been claimed yet, then logs and pushes a summary
    message. If the sign-in itself fails, the flow stops after ``sign()`` has
    reported the failure.
    """
    await validate_access_token()
    nickname = get_aliyun_nickname()

    sign_result = sign()
    if not sign_result:
        # sign() returns False on failure (and has already logged and pushed
        # the error); unpacking it into three names would raise TypeError.
        return
    is_reward, signin_count, curent_reward = sign_result

    if not is_reward:
        get_reward(signin_count)

    message = f"{nickname} 签到成功,本月累计签到 {signin_count} 天\n本次签到获得:{curent_reward}"
    aliyundrive_logger.info(message)
    push_msg(title=aliyundrive_title, message=message, img_url=aliyundrive_img)
# Script entry point: run the async sign-in flow to completion.
if __name__ == "__main__":
    asyncio.run(aliyundrive_sign())