# API reference: https://github.com/Anonym-w/autoSigninAliyun/blob/main/autoSignin.js
import asyncio
import sys
import requests
from os.path import abspath, dirname, join

# conf: make the parent directory importable so `utils` can be resolved
sys.path.append(join(dirname(abspath(__file__)), "../"))
from utils import (
    push_msg,
    get_aliyundrive_refresh_token,
    update_aliyundrive_access_token,
)
from __init__ import (
    aliyundrive_nickname_url,
    aliyundrive_signin_url,
    aliyundrive_access_token_url,
    aliyundrive_reward_url,
    aliyundrive_title,
    aliyundrive_logger,
    aliyundrive_img,
)

# session
# NOTE(review): "message-Type" is not a standard HTTP header name; presumably
# "Content-Type" was intended -- confirm. Left unchanged here; requests sets
# Content-Type on its own when a body is passed via `json=`.
headers = {"message-Type": "application/json"}
session = requests.Session()
# Shared request body; mutated in place by validate_access_token() and
# get_reward(), and sent with every request below.
query_body = {}


def _response_detail(resp):
    """Return the decoded JSON body of *resp*, or its raw text if not JSON.

    Used only to build failure messages, so that a non-JSON error page does
    not raise while we are trying to report the original failure.
    """
    try:
        return resp.json()
    except ValueError:
        # requests raises a ValueError subclass when the body is not JSON.
        return resp.text


def get_refresh_token():
    """Placeholder: local storage only works in a browser, so this is a no-op."""
    pass


async def validate_access_token():
    """Exchange the stored refresh_token for a fresh access_token.

    On success the new token is persisted via
    ``update_aliyundrive_access_token`` and installed in the shared
    ``headers`` as a Bearer ``Authorization`` value. On failure the error is
    logged and pushed.

    Returns:
        bool: True when a new access_token was obtained, False otherwise.
    """
    # The token is only valid for three hours, so the scheduled job refreshes
    # it every three hours.
    # Fetch the refresh_token from the Raspberry Pi.
    refresh_token = await get_aliyundrive_refresh_token()
    # Request payload
    query_body.update({"grant_type": "refresh_token", "refresh_token": refresh_token})
    # validate
    resp = session.post(
        aliyundrive_access_token_url,
        headers=headers,
        json=query_body,
    )
    if resp.status_code != 200:
        msg = "阿里云 access_token 失效"
        aliyundrive_logger.error(msg)
        push_msg(aliyundrive_title, msg, aliyundrive_img)
        return False

    aliyundrive_logger.info("阿里云 access_token 有效")
    new_access_token = resp.json()["access_token"]
    await update_aliyundrive_access_token(new_access_token)
    headers["Authorization"] = f"Bearer {new_access_token}"
    return True


def get_aliyun_nickname():
    """Fetch the account nickname.

    Returns:
        The ``nick_name`` field of the response on HTTP 200, otherwise
        ``False`` (after logging and pushing the failure).
    """
    nickname = False
    # Fetch the user name
    user_res = session.post(
        aliyundrive_nickname_url,
        headers=headers,
        json=query_body,
    )
    if user_res.status_code == 200:
        nickname = user_res.json()["nick_name"]
    else:
        user_msg = f"获取用户名失败,stderr:{_response_detail(user_res)}"
        aliyundrive_logger.error(user_msg)
        push_msg(title=aliyundrive_title, message=user_msg, img_url=aliyundrive_img)
    return nickname


def sign():
    """Perform the daily sign-in.

    Returns:
        A tuple ``(is_reward, signin_count, curent_reward)`` on HTTP 200,
        where ``is_reward`` tells whether today's reward was already claimed,
        ``signin_count`` is the number of days signed in so far and
        ``curent_reward`` is the reward name followed by its description.
        ``False`` when the request fails (after logging and pushing the
        failure).
    """
    # Sign in
    sign_res = session.post(
        aliyundrive_signin_url,
        headers=headers,
        json=query_body,
    )
    if sign_res.status_code != 200:
        sign_msg = f"阿里云盘签到失败,stderr:{_response_detail(sign_res)}"
        aliyundrive_logger.error(sign_msg)
        # Push the formatted message, not the Response object itself.
        push_msg(title=aliyundrive_title, message=sign_msg, img_url=aliyundrive_img)
        return False

    sign_msg = "阿里云盘签到成功"
    aliyundrive_logger.info(sign_msg)
    sign_jsondata = sign_res.json()
    # Number of days signed in so far
    signin_count = sign_jsondata["result"]["signInCount"]
    # Today's sign-in entry: the log is indexed from 0, hence the -1
    current_sign_info = sign_jsondata["result"]["signInLogs"][signin_count - 1]
    # Reward obtained by today's sign-in
    reward_name = current_sign_info["reward"]["name"]
    reward_description = current_sign_info["reward"]["description"]
    curent_reward = f"{reward_name}{reward_description}"
    # Whether the reward has already been claimed (bool)
    is_reward = current_sign_info["isReward"]
    return is_reward, signin_count, curent_reward


# Claim the reward
def get_reward(signin_count):
    """Claim the reward for sign-in day *signin_count*.

    Adds ``signInDay`` to the shared ``query_body`` before posting.

    Returns:
        ``False`` when the reward was claimed successfully (HTTP 200),
        otherwise the failure message that was logged and pushed.
    """
    query_body.update({"signInDay": signin_count})
    reward_res = session.post(
        aliyundrive_reward_url,
        headers=headers,
        json=query_body,
    )
    if reward_res.status_code == 200:
        aliyundrive_logger.info("领取奖励成功")
        return False

    reward_msg = f"领取奖励失败,stderr:{_response_detail(reward_res)}"
    aliyundrive_logger.error(reward_msg)
    push_msg(title=aliyundrive_title, message=reward_msg, img_url=aliyundrive_img)
    return reward_msg


async def aliyundrive_sign():
    """Run the full flow: refresh token, sign in, claim reward, push a summary.

    Stops early when the token refresh or the sign-in fails; both steps have
    already logged and pushed their own failure message by then.
    """
    if not await validate_access_token():
        # Without a valid token every following request would fail as well.
        return
    nickname = get_aliyun_nickname()
    sign_result = sign()
    if sign_result is False:
        # sign() signals failure with False; there is nothing to unpack.
        return
    is_reward, signin_count, curent_reward = sign_result
    if not is_reward:
        get_reward(signin_count)
    message = f"{nickname} 签到成功,本月累计签到 {signin_count} 天\n本次签到获得:{curent_reward}"
    aliyundrive_logger.info(message)
    push_msg(title=aliyundrive_title, message=message, img_url=aliyundrive_img)


if __name__ == "__main__":
    asyncio.run(aliyundrive_sign())