# coding:utf-8
"""Bilibili daily sign-in and live-room check-in, driven through Playwright."""
import sys
import asyncio
from datetime import datetime
from os.path import abspath, dirname, join

from playwright.async_api import Error as PlaywrightError
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from playwright.async_api import async_playwright

# Make the parent directory importable so that `utils` can be resolved.
sys.path.append(join(dirname(abspath(__file__)), "../"))
from utils import push_msg, get_bilibili_live_rooms_from_pi
from __init__ import (
    browser_headless,
    browser_proxy,
    bilibili_url,
    bilibili_sign_url,
    bilibili_live_url,
    bilibili_cookie_file,
    bilibili_title,
    bilibili_logger,
    bilibili_img,
)

# Absolute XPath of the chat input box on a live-room page.
_LIVE_INPUT_XPATH = "xpath=/html/body/div[3]/main/div[1]/section[1]/div[3]/div[12]/div/div[2]/div[2]/textarea"
# Absolute XPath of the chat "send" button on a live-room page.
_LIVE_SEND_XPATH = "xpath=/html/body/div[3]/main/div[1]/section[1]/div[3]/div[12]/div/div[3]/div/button/span"


def _timestamp():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


async def is_login(page):
    """Check whether the stored cookies still give a logged-in session.

    Navigates ``page`` to ``bilibili_url`` and waits up to five seconds for
    the "修改资料" text to become visible.

    Returns:
        True when the text shows up; False otherwise, in which case the
        failure is logged and pushed via ``push_msg``.
    """
    await page.goto(bilibili_url)
    try:
        # Locator.is_visible() returns immediately and ignores its timeout
        # argument, so an explicit wait is needed to give the page time to
        # render before deciding that the cookie has expired.
        await page.get_by_text("修改资料").wait_for(state="visible", timeout=5000)
    except PlaywrightTimeoutError:
        stderr = "Cookie 过期"
        bilibili_logger.error(stderr)
        push_msg(bilibili_title, stderr, bilibili_img)
        return False
    bilibili_logger.info("登录成功")
    return True


async def sign(page):
    """Perform the daily sign-in.

    Returns:
        ``"bilibili_not_login"`` when the login check fails; otherwise a
        human-readable status message (already signed / success / failure).
        A failure is additionally pushed via ``push_msg``.
    """
    if not await is_login(page):
        return "bilibili_not_login"

    await page.goto(bilibili_sign_url)
    if await page.get_by_text("无法重复签到").is_visible():
        sign_res = f"签到时间:{_timestamp()}\n签到状态:今日已签到"
        bilibili_logger.info(sign_res)
    elif await page.get_by_text('"code":0').is_visible():
        sign_res = f"签到时间:{_timestamp()}\n签到状态:签到成功"
        bilibili_logger.info(sign_res)
    else:
        sign_res = "签到失败"
        bilibili_logger.error(sign_res)
        push_msg(bilibili_title, sign_res, bilibili_img)
    return sign_res


async def _send_check_in(page, room_id):
    """Open one live room, type "1" into the chat box and click send.

    Returns:
        True when the send button was visible and clicked, False when it
        was not visible. Playwright errors propagate to the caller.
    """
    await page.goto(f"{bilibili_live_url}/{room_id}")
    # Type the check-in text into the chat input box.
    await page.locator(_LIVE_INPUT_XPATH).fill("1")
    send_button = page.locator(_LIVE_SEND_XPATH)
    if not await send_button.is_visible():
        return False
    await send_button.click()
    return True


async def live(page):
    """Check in to every configured live room.

    The rooms come from ``get_bilibili_live_rooms_from_pi()``, which is
    used here as a mapping of room id to display name.

    Returns:
        One status line per room, joined with newlines.
    """
    results = []
    rooms = await get_bilibili_live_rooms_from_pi()
    for room_id, room_name in rooms.items():
        try:
            sent = await _send_check_in(page, room_id)
        except PlaywrightError as exc:
            # A missing input box or a navigation error in one room must not
            # abort the remaining rooms; record it as a failed check-in.
            bilibili_logger.error(f"直播间:【{room_name}】打卡异常:{exc}")
            sent = False

        if sent:
            room_msg = f"直播间:【{room_name}】打卡成功,打卡时间:【{_timestamp()}】"
            bilibili_logger.info(room_msg)
        else:
            room_msg = f"直播间:【{room_name}】打卡失败,打卡时间:【{_timestamp()}】"
            bilibili_logger.error(room_msg)
        results.append(room_msg)
    return "\n".join(results)


async def bilibili_sign():
    """Run the sign-in and the live-room check-in, then push a summary."""
    async with async_playwright() as playwright:
        browser = await playwright.firefox.launch(headless=browser_headless)
        try:
            context = await browser.new_context(storage_state=bilibili_cookie_file)

            # Daily sign-in.
            sign_page = await context.new_page()
            sign_result = await sign(sign_page)
            await sign_page.close()

            # Live-room check-in. It runs regardless of the sign-in result.
            live_page = await context.new_page()
            live_result = await live(live_page)
            await live_page.close()
        finally:
            # Close the browser even when one of the steps above raised.
            await browser.close()

        # The empty string puts a blank line between the two reports.
        push_msg(
            title=bilibili_title,
            message="\n".join([sign_result, "", live_result]),
            img_url=bilibili_img,
        )


if __name__ == "__main__":
    asyncio.run(bilibili_sign())