|
|
|
|
|
import asyncio |
|
import sys |
|
import requests |
|
|
|
from os.path import abspath, dirname, join |
|
|
|
|
|
sys.path.append(join(dirname(abspath(__file__)), "../")) |
|
from utils import ( |
|
push_msg, |
|
get_aliyundrive_refresh_token, |
|
update_aliyundrive_access_token, |
|
) |
|
from __init__ import ( |
|
aliyundrive_nickname_url, |
|
aliyundrive_signin_url, |
|
aliyundrive_access_token_url, |
|
aliyundrive_reward_url, |
|
aliyundrive_title, |
|
aliyundrive_logger, |
|
aliyundrive_img, |
|
) |
|
|
|
|
|
# Default headers sent with every AliyunDrive request. The standard header
# name is "Content-Type"; "message-Type" is not an HTTP header and would be
# ignored by the server. An "Authorization" entry is added to this dict at
# runtime once an access token has been obtained.
headers = {"Content-Type": "application/json"}

# One shared HTTP session reused by every request in this module.
session = requests.Session()

# Shared JSON request body. Functions in this module update it in place
# before posting, so keys accumulate across calls.
query_body = {}
|
|
|
|
|
def get_refresh_token():
    """Placeholder with no implementation; takes no arguments and returns ``None``."""
    return None
|
|
|
|
|
async def validate_access_token():
    """Exchange the stored refresh token for a new access token.

    The refresh token is obtained from ``get_aliyundrive_refresh_token`` and
    written into the shared ``query_body`` together with the grant type, then
    posted to ``aliyundrive_access_token_url``.

    On HTTP 200 the returned ``access_token`` is handed to
    ``update_aliyundrive_access_token`` and installed as a ``Bearer`` value in
    the shared ``headers``. On any other status the failure is logged and
    pushed, and ``headers`` is left unchanged.

    Returns:
        None in both cases.
    """
    query_body.update(
        grant_type="refresh_token",
        refresh_token=await get_aliyundrive_refresh_token(),
    )

    token_response = session.post(
        aliyundrive_access_token_url,
        headers=headers,
        json=query_body,
    )

    if token_response.status_code == 200:
        aliyundrive_logger.info("阿里云 access_token 有效")
        fresh_token = token_response.json()["access_token"]
        await update_aliyundrive_access_token(fresh_token)
        # Later requests authenticate through this shared header.
        headers["Authorization"] = f"Bearer {fresh_token}"
        return

    failure_text = "阿里云 access_token 失效"
    aliyundrive_logger.error(failure_text)
    push_msg(aliyundrive_title, failure_text, aliyundrive_img)
|
|
|
|
|
def get_aliyun_nickname():
    """Fetch the nickname of the current AliyunDrive account.

    Posts the shared ``query_body`` with the shared ``headers`` to
    ``aliyundrive_nickname_url``.

    Returns:
        The ``nick_name`` field of the JSON response on HTTP 200; ``False``
        otherwise, after the failure has been logged and pushed.
    """
    user_res = session.post(
        aliyundrive_nickname_url,
        headers=headers,
        json=query_body,
    )
    # Decode the body once; both the success and the failure path need it.
    payload = user_res.json()

    if user_res.status_code == 200:
        return payload["nick_name"]

    user_msg = f"获取用户名失败,stderr:{payload}"
    aliyundrive_logger.error(user_msg)
    push_msg(title=aliyundrive_title, message=user_msg, img_url=aliyundrive_img)
    return False
|
|
|
|
|
def sign():
    """Perform the daily sign-in request and extract the reward details.

    Posts the shared ``query_body`` with the shared ``headers`` to
    ``aliyundrive_signin_url``.

    Returns:
        On HTTP 200, a tuple ``(is_reward, signin_count, current_reward)``:
        ``is_reward`` is the ``isReward`` field of the latest sign-in log
        entry, ``signin_count`` is ``result.signInCount``, and
        ``current_reward`` is that entry's reward name concatenated with its
        description.
        On any other status, ``False`` after the failure has been logged and
        pushed. Callers must check for this before unpacking.
    """
    sign_res = session.post(
        aliyundrive_signin_url,
        headers=headers,
        json=query_body,
    )

    if sign_res.status_code != 200:
        sign_msg = f"阿里云盘签到失败,stderr:{sign_res.json()}"
        aliyundrive_logger.error(sign_msg)
        # Push the formatted text, not the Response object itself.
        push_msg(title=aliyundrive_title, message=sign_msg, img_url=aliyundrive_img)
        return False

    aliyundrive_logger.info("阿里云盘签到成功")
    result = sign_res.json()["result"]

    signin_count = result["signInCount"]
    # The entry for the latest sign-in is read at index signInCount - 1.
    current_sign_info = result["signInLogs"][signin_count - 1]

    reward = current_sign_info["reward"]
    current_reward = f"{reward['name']}{reward['description']}"

    return current_sign_info["isReward"], signin_count, current_reward
|
|
|
|
|
|
|
def get_reward(signin_count):
    """Claim the reward for the given sign-in day.

    Stores ``signin_count`` under ``"signInDay"`` in the shared ``query_body``
    (the key stays there afterwards) and posts it to
    ``aliyundrive_reward_url``.

    Args:
        signin_count: Value sent as ``signInDay``.

    Returns:
        ``False`` when the request answers HTTP 200; otherwise the failure
        message, which has also been logged and pushed.
    """
    query_body["signInDay"] = signin_count

    claim_response = session.post(
        aliyundrive_reward_url,
        headers=headers,
        json=query_body,
    )

    if claim_response.status_code != 200:
        failure_text = f"领取奖励失败,stderr:{claim_response.json()}"
        aliyundrive_logger.error(failure_text)
        push_msg(title=aliyundrive_title, message=failure_text, img_url=aliyundrive_img)
        return failure_text

    aliyundrive_logger.info("领取奖励成功")
    return False
|
|
|
|
|
async def aliyundrive_sign():
    """Run one complete sign-in pass and push the outcome.

    Refreshes the access token, looks up the account nickname, performs the
    sign-in, claims the reward when the sign-in entry is not yet marked as
    rewarded, then logs and pushes a summary message.

    Returns:
        None. If ``sign()`` reports failure, the function stops before
        building the summary; ``sign()`` does its own error logging and push
        notification on that path.
    """
    await validate_access_token()
    nickname = get_aliyun_nickname()

    sign_result = sign()
    if not sign_result:
        # sign() returns a bare False on failure; unpacking it into three
        # names would raise TypeError, so bail out here instead.
        return
    is_reward, signin_count, curent_reward = sign_result

    if not is_reward:
        get_reward(signin_count)

    message = f"{nickname} 签到成功,本月累计签到 {signin_count} 天\n本次签到获得:{curent_reward}"
    aliyundrive_logger.info(message)
    push_msg(title=aliyundrive_title, message=message, img_url=aliyundrive_img)
|
|
|
|
|
# Script entry point: when the file is executed directly, run the whole
# sign-in coroutine to completion on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main=aliyundrive_sign())
|
|