# coding:utf-8 import sys import asyncio from os.path import abspath, dirname, join from playwright.async_api import async_playwright sys.path.append(join(dirname(abspath(__file__)), "../")) from utils import push_msg from __init__ import ( browser_headless, browser_proxy, aliyundrive_url, aliyundrive_cookie_file, aliyundrive_title, aliyundrive_logger, ) # 判断登录状态 async def is_login(page): await page.goto(aliyundrive_url) # page.locator("li").filter(has_text="回收站").get_by_role("img").click() if page.get_by_text("回收站").is_visible(): # 加 await 就会在页面加载完成前跳过 aliyundrive_logger.info("登录成功") return True else: stderr = "Cookie 过期" aliyundrive_logger.error(stderr) push_msg(aliyundrive_title, stderr) return False # 签到 async def sign(page): msg = [] if not await is_login(page): return "aliyundrive_not_login" # await page.pause() if page.get_by_text("待领取").is_visible(): sign_res = "今日已签到" aliyundrive_logger.info(sign_res) msg.append(sign_res) # 领取奖励 await page.get_by_text("待领取").click() reward_res = "今日领取奖励成功" aliyundrive_logger.info(reward_res) elif page.get_by_text("待签到").is_visible(): await page.get_by_text("待签到").click() sign_res = "签到成功" aliyundrive_logger.info(sign_res) msg.append(sign_res) else: reward_res = "今日奖励已领取" aliyundrive_logger.info(reward_res) return "\n".join(msg) async def main(): async with async_playwright() as playwright: browser = await playwright.chromium.launch(headless=browser_headless) context = await browser.new_context(storage_state=aliyundrive_cookie_file) page = await context.new_page() task = asyncio.create_task(sign(page)) await asyncio.gather(task) # 退出 await page.close() await browser.close() # 推送 result = task.result() push_msg(aliyundrive_title, "\n".join([result])) if __name__ == "__main__": asyncio.run(main())