Evilmass commited on
Commit
8786c0f
·
1 Parent(s): 917acfe

change pymysql to aiomysql

Browse files
_playwright/bilibili.py CHANGED
@@ -20,8 +20,6 @@ from __init__ import (
20
  bilibili_img,
21
  )
22
 
23
- bilibili_live_rooms = get_bilibili_live_rooms_from_pi()
24
-
25
 
26
  # 判断登录状态
27
  async def is_login(page):
@@ -68,6 +66,7 @@ async def live(page):
68
  if not await is_login(page):
69
  return "bilibili_not_login"
70
 
 
71
  for room_id in list(bilibili_live_rooms.keys()):
72
  await page.goto(f"{bilibili_live_url}/{str(room_id)}")
73
  # 输入框选择 xpath 绝对定位
@@ -83,10 +82,14 @@ async def live(page):
83
  if await send_button.is_visible():
84
  # 点击发送按钮
85
  await send_button.click()
86
- room_msg = f"直播间:【{bilibili_live_rooms[room_id]}】打卡成功,打卡时间:【{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}】"
 
 
87
  bilibili_logger.info(room_msg)
88
  else:
89
- room_msg = f"直播间:【{bilibili_live_rooms[room_id]}】打卡失败,打卡时间:【{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}】"
 
 
90
  bilibili_logger.error(room_msg)
91
  msg.append(room_msg)
92
 
 
20
  bilibili_img,
21
  )
22
 
 
 
23
 
24
  # 判断登录状态
25
  async def is_login(page):
 
66
  if not await is_login(page):
67
  return "bilibili_not_login"
68
 
69
+ bilibili_live_rooms = await get_bilibili_live_rooms_from_pi()
70
  for room_id in list(bilibili_live_rooms.keys()):
71
  await page.goto(f"{bilibili_live_url}/{str(room_id)}")
72
  # 输入框选择 xpath 绝对定位
 
82
  if await send_button.is_visible():
83
  # 点击发送按钮
84
  await send_button.click()
85
+ room_msg = (
86
+ f"直播间:【{bilibili_live_rooms[room_id]}】打卡成功,打卡时间:【{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}】"
87
+ )
88
  bilibili_logger.info(room_msg)
89
  else:
90
+ room_msg = (
91
+ f"直播间:【{bilibili_live_rooms[room_id]}】打卡失败,打卡时间:【{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}】"
92
+ )
93
  bilibili_logger.error(room_msg)
94
  msg.append(room_msg)
95
 
_requests/aliyundrive.py CHANGED
@@ -1,4 +1,6 @@
1
  # 接口参考:https://github.com/Anonym-w/autoSigninAliyun/blob/main/autoSignin.js
 
 
2
  import sys
3
  import requests
4
 
@@ -29,17 +31,21 @@ query_body = {}
29
 
30
  def get_refresh_token():
31
  # local storage only work in browser
32
- return False
33
 
34
 
35
- def validate_access_token():
36
  # token只有三小时有效期,所以定时任务刷新间隔为三小时
37
  # 从树莓派获取 refresh_token
38
- refresh_token = get_aliyundrive_refresh_token()
39
  # 查询结构体
40
  query_body.update({"grant_type": "refresh_token", "refresh_token": refresh_token})
41
  # validate
42
- resp = session.post(aliyundrive_access_token_url, headers=headers, json=query_body)
 
 
 
 
43
  if resp.status_code != 200:
44
  msg = "阿里云 access_token 失效"
45
  aliyundrive_logger.error(msg)
@@ -47,16 +53,18 @@ def validate_access_token():
47
  else:
48
  aliyundrive_logger.info("阿里云 access_token 有效")
49
  new_access_token = resp.json()["access_token"]
50
- update_aliyundrive_access_token(new_access_token)
51
  headers["Authorization"] = f"Bearer {new_access_token}"
52
- return new_access_token
53
- return False
54
 
55
 
56
  def get_aliyun_nickname():
57
  nickname = False
58
  # 获取用户名
59
- user_res = session.post(aliyundrive_nickname_url, headers=headers, json=query_body)
 
 
 
 
60
  if user_res.status_code == 200:
61
  user_msg = f"获取用户名成功,stdout:{user_res.json()}"
62
  nickname = user_res.json()["nick_name"]
@@ -69,7 +77,11 @@ def get_aliyun_nickname():
69
 
70
  def sign():
71
  # 签到
72
- sign_res = session.post(aliyundrive_signin_url, headers=headers, json=query_body)
 
 
 
 
73
  if sign_res.status_code != 200:
74
  sign_msg = f"阿里云盘签到失败,stderr:{sign_res.json()}"
75
  aliyundrive_logger.error(sign_msg)
@@ -95,7 +107,11 @@ def sign():
95
  # 领取奖励
96
  def get_reward(signin_count):
97
  query_body.update({"signInDay": signin_count})
98
- reward_res = session.post(aliyundrive_reward_url, headers=headers, json=query_body)
 
 
 
 
99
  if reward_res.status_code == 200:
100
  aliyundrive_logger.info("领取奖励成功")
101
  else:
@@ -106,8 +122,8 @@ def get_reward(signin_count):
106
  return False
107
 
108
 
109
- def aliyundrive_sign():
110
- validate_access_token()
111
  nickname = get_aliyun_nickname()
112
  is_reward, signin_count, curent_reward = sign()
113
  if not is_reward:
@@ -115,8 +131,7 @@ def aliyundrive_sign():
115
  message = f"{nickname} 签到成功,本月累计签到 {signin_count} 天\n本次签到获得:{curent_reward}"
116
  aliyundrive_logger.info(message)
117
  push_msg(title=aliyundrive_title, message=message, img_url=aliyundrive_img)
118
- return message
119
 
120
 
121
  if __name__ == "__main__":
122
- aliyundrive_sign()
 
1
  # 接口参考:https://github.com/Anonym-w/autoSigninAliyun/blob/main/autoSignin.js
2
+
3
+ import asyncio
4
  import sys
5
  import requests
6
 
 
31
 
32
  def get_refresh_token():
33
  # local storage only work in browser
34
+ pass
35
 
36
 
37
+ async def validate_access_token():
38
  # token只有三小时有效期,所以定时任务刷新间隔为三小时
39
  # 从树莓派获取 refresh_token
40
+ refresh_token = await get_aliyundrive_refresh_token()
41
  # 查询结构体
42
  query_body.update({"grant_type": "refresh_token", "refresh_token": refresh_token})
43
  # validate
44
+ resp = session.post(
45
+ aliyundrive_access_token_url,
46
+ headers=headers,
47
+ json=query_body,
48
+ )
49
  if resp.status_code != 200:
50
  msg = "阿里云 access_token 失效"
51
  aliyundrive_logger.error(msg)
 
53
  else:
54
  aliyundrive_logger.info("阿里云 access_token 有效")
55
  new_access_token = resp.json()["access_token"]
56
+ await update_aliyundrive_access_token(new_access_token)
57
  headers["Authorization"] = f"Bearer {new_access_token}"
 
 
58
 
59
 
60
  def get_aliyun_nickname():
61
  nickname = False
62
  # 获取用户名
63
+ user_res = session.post(
64
+ aliyundrive_nickname_url,
65
+ headers=headers,
66
+ json=query_body,
67
+ )
68
  if user_res.status_code == 200:
69
  user_msg = f"获取用户名成功,stdout:{user_res.json()}"
70
  nickname = user_res.json()["nick_name"]
 
77
 
78
  def sign():
79
  # 签到
80
+ sign_res = session.post(
81
+ aliyundrive_signin_url,
82
+ headers=headers,
83
+ json=query_body,
84
+ )
85
  if sign_res.status_code != 200:
86
  sign_msg = f"阿里云盘签到失败,stderr:{sign_res.json()}"
87
  aliyundrive_logger.error(sign_msg)
 
107
  # 领取奖励
108
  def get_reward(signin_count):
109
  query_body.update({"signInDay": signin_count})
110
+ reward_res = session.post(
111
+ aliyundrive_reward_url,
112
+ headers=headers,
113
+ json=query_body,
114
+ )
115
  if reward_res.status_code == 200:
116
  aliyundrive_logger.info("领取奖励成功")
117
  else:
 
122
  return False
123
 
124
 
125
+ async def aliyundrive_sign():
126
+ await validate_access_token()
127
  nickname = get_aliyun_nickname()
128
  is_reward, signin_count, curent_reward = sign()
129
  if not is_reward:
 
131
  message = f"{nickname} 签到成功,本月累计签到 {signin_count} 天\n本次签到获得:{curent_reward}"
132
  aliyundrive_logger.info(message)
133
  push_msg(title=aliyundrive_title, message=message, img_url=aliyundrive_img)
 
134
 
135
 
136
  if __name__ == "__main__":
137
+ asyncio.run(aliyundrive_sign())
api.py CHANGED
@@ -17,7 +17,6 @@ from _playwright.vits import vits_sign
17
  from _playwright.tsdm import tsdm_sign
18
  from _playwright.wuaipojie import wuaipojie_sign
19
  from _requests.aliyundrive import aliyundrive_sign
20
- from utils import get_cookie_from_pi
21
 
22
 
23
  def tick():
@@ -26,8 +25,6 @@ def tick():
26
 
27
  def run_scheduler(loop):
28
  asyncio.set_event_loop(loop)
29
- # pi
30
- get_cookie_from_pi()
31
  # add task
32
  sign_scheduler = AsyncIOScheduler()
33
  sign_scheduler.configure(timezone=pytz.timezone("Asia/Shanghai"))
 
17
  from _playwright.tsdm import tsdm_sign
18
  from _playwright.wuaipojie import wuaipojie_sign
19
  from _requests.aliyundrive import aliyundrive_sign
 
20
 
21
 
22
  def tick():
 
25
 
26
  def run_scheduler(loop):
27
  asyncio.set_event_loop(loop)
 
 
28
  # add task
29
  sign_scheduler = AsyncIOScheduler()
30
  sign_scheduler.configure(timezone=pytz.timezone("Asia/Shanghai"))
app.py CHANGED
@@ -10,7 +10,7 @@ from fastapi.responses import Response
10
  from loguru import logger
11
 
12
  from __init__ import DIR_PATH, api_token
13
- from utils import push_msg
14
  from api import start_daemon_scheduler, kill_daemon_scheduler
15
 
16
  schedule_loop = asyncio.new_event_loop()
@@ -26,10 +26,11 @@ def home():
26
 
27
  # 开启定时任务
28
  @app.get("/run")
29
- def do_run(token: str):
30
  if token != api_token:
31
  return {"code": 422}
32
 
 
33
  start_daemon_scheduler(schedule_loop)
34
  res = {"code": 0, "data": "start scheduler"}
35
  return res
@@ -53,9 +54,7 @@ async def do_log(token: str, name: str):
53
  return {"code": 422}
54
 
55
  log_file = join(DIR_PATH, f"log/{name}.log")
56
- with open(log_file, "r", encoding="utf-8") as f:
57
- data = f.read()
58
-
59
  res = {"code": 200, "data": data}
60
  return res
61
 
 
10
  from loguru import logger
11
 
12
  from __init__ import DIR_PATH, api_token
13
+ from utils import push_msg, read_logfile, get_cookie_from_pi
14
  from api import start_daemon_scheduler, kill_daemon_scheduler
15
 
16
  schedule_loop = asyncio.new_event_loop()
 
26
 
27
  # 开启定时任务
28
  @app.get("/run")
29
+ async def do_run(token: str):
30
  if token != api_token:
31
  return {"code": 422}
32
 
33
+ await get_cookie_from_pi()
34
  start_daemon_scheduler(schedule_loop)
35
  res = {"code": 0, "data": "start scheduler"}
36
  return res
 
54
  return {"code": 422}
55
 
56
  log_file = join(DIR_PATH, f"log/{name}.log")
57
+ data = await read_logfile(log_file)
 
 
58
  res = {"code": 200, "data": data}
59
  return res
60
 
config/settings_prod.py CHANGED
@@ -13,7 +13,9 @@ from option_logger import register_logger
13
  # playwright
14
  browser_headless = True
15
  browser_proxy = {"server": None}
16
- user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
 
 
17
  ignore_https_errors = True
18
 
19
  # api
@@ -74,9 +76,7 @@ bilibili_title = "Bilibili"
74
  bilibili_logger = register_logger("bilibili", logfile_mapping["bilibili"])
75
  bilibili_cookie_file = join(PROD_PATH, "cookie/bilibili.json")
76
  bilibili_url = "https://account.bilibili.com/account/home"
77
- bilibili_sign_url = (
78
- "https://api.live.bilibili.com/xlive/web-ucenter/v1/sign/DoSign" # 签到地址
79
- )
80
  bilibili_live_url = "https://live.bilibili.com" # 直播间地址
81
  bilibili_live_rooms = os.environ.get("bilibili_live_rooms")
82
  bilibili_img = "https://img.evimo.top/bilibili.png"
 
13
  # playwright
14
  browser_headless = True
15
  browser_proxy = {"server": None}
16
+ user_agent = (
17
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
18
+ )
19
  ignore_https_errors = True
20
 
21
  # api
 
76
  bilibili_logger = register_logger("bilibili", logfile_mapping["bilibili"])
77
  bilibili_cookie_file = join(PROD_PATH, "cookie/bilibili.json")
78
  bilibili_url = "https://account.bilibili.com/account/home"
79
+ bilibili_sign_url = "https://api.live.bilibili.com/xlive/web-ucenter/v1/sign/DoSign" # 签到地址
 
 
80
  bilibili_live_url = "https://live.bilibili.com" # 直播间地址
81
  bilibili_live_rooms = os.environ.get("bilibili_live_rooms")
82
  bilibili_img = "https://img.evimo.top/bilibili.png"
option_mysql.py CHANGED
@@ -1,54 +1,67 @@
1
- import pymysql
2
-
3
-
4
- class OptionMysql(object):
5
- def __init__(self, options):
6
- host = options["HOST"]
7
- user = options["USERNAME"]
8
- password = options["PASSWORD"]
9
- database = options["DATABASE"]
10
- port = options["PORT"]
11
- charset = "utf8"
12
- # 连接数据库
13
- self.conn = pymysql.connect(
14
- host=host,
15
- port=port,
16
- user=user,
17
- password=password,
18
- database=database,
19
- charset=charset,
20
- )
21
- # 创建游标
22
- self.cur = self.conn.cursor()
23
- self.dict_cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
24
-
25
- def __exit__(self):
26
- self.cur.close()
27
- self.dict_cur.close()
28
- self.conn.close()
29
-
30
- def get_dict_data(self, sql, params=[]):
31
- """查询,返回字典类型"""
 
 
 
32
  try:
33
- if params:
34
- self.dict_cur.execute(sql, params)
35
- else:
36
- self.dict_cur.execute(sql)
37
- data = self.dict_cur.fetchall()
38
- return data
39
- except Exception as e:
40
- self.conn.rollback()
41
- raise e
42
-
43
- def update_data(self, sql, params=[]):
44
- """更新"""
 
 
45
  try:
46
- if not params:
47
- self.cur.execute(sql)
48
- else:
49
- self.cur.execute(sql, params)
50
- self.conn.commit()
51
- except Exception as e:
52
- self.conn.rollback()
53
- raise e
54
- return True
 
 
 
 
 
 
 
 
 
1
+ import aiomysql
2
+
3
+ from __init__ import mysql_option
4
+
5
+
6
+ class AIOMYSQL:
7
+ def __init__(self) -> None:
8
+ self.pool = None
9
+
10
+ async def init_pool(self):
11
+ try:
12
+ __pool = await aiomysql.create_pool(
13
+ host=mysql_option["HOST"],
14
+ port=mysql_option["PORT"],
15
+ user=mysql_option["USERNAME"],
16
+ password=mysql_option["PASSWORD"],
17
+ db=mysql_option["DATABASE"],
18
+ charset="utf8",
19
+ autocommit=False,
20
+ minsize=5,
21
+ maxsize=10,
22
+ cursorclass=aiomysql.DictCursor,
23
+ )
24
+ return __pool
25
+ except:
26
+ raise ("aiomysql create_pool error")
27
+
28
+ async def get_cursor(self):
29
+ conn = await self.pool.acquire()
30
+ cur = await conn.cursor()
31
+ return conn, cur
32
+
33
+ async def query(self, sql, param=None):
34
+ conn, cur = await self.get_cursor()
35
  try:
36
+ await cur.execute(sql, param)
37
+ except:
38
+ await cur.rollback()
39
+ print("aiomysql query error")
40
+ else:
41
+ return await cur.fetchall()
42
+ finally:
43
+ if cur:
44
+ await cur.close()
45
+ # 释放 conn 连接回连接池
46
+ await self.pool.release(conn)
47
+
48
+ async def update(self, sql, param=None):
49
+ conn, cur = await self.get_cursor()
50
  try:
51
+ await cur.execute(sql, param)
52
+ except:
53
+ await cur.rollback()
54
+ print("aiomysql query error")
55
+ else:
56
+ await conn.commit()
57
+ finally:
58
+ if cur:
59
+ await cur.close()
60
+ # 释放 conn 连接回连接池
61
+ await self.pool.release(conn)
62
+
63
+
64
+ async def get_aiomysql_instance():
65
+ instance = AIOMYSQL()
66
+ instance.pool = await instance.init_pool()
67
+ return instance
requirements.txt CHANGED
@@ -10,6 +10,8 @@ uvicorn
10
  fastapi
11
  pytz
12
  pymysql
 
 
13
  #ddddocr
14
  #Pillow
15
  #selenium
 
10
  fastapi
11
  pytz
12
  pymysql
13
+ aiomysql
14
+ aiofiles
15
  #ddddocr
16
  #Pillow
17
  #selenium
utils.py CHANGED
@@ -1,8 +1,7 @@
1
  # coding:utf-8
2
- import os
3
- import sys
4
  import socket
5
  import asyncio
 
6
  import psutil
7
  import requests
8
 
@@ -12,13 +11,12 @@ from loguru import logger
12
  from playwright.async_api import async_playwright, Page
13
  from playwright_stealth import stealth_async
14
 
15
- from option_mysql import OptionMysql
16
  from __init__ import (
17
  gotify_host,
18
  gotify_port,
19
  gotify_token,
20
  gotify_img,
21
- mysql_option,
22
  DIR_PATH,
23
  DEBUG,
24
  browser_headless,
@@ -52,7 +50,7 @@ async def init_page(storage_state=None, browser_type: str = "chromium") -> Page:
52
  page = await context.new_page()
53
  # 隐藏 webdriver 特征
54
  await stealth_async(page)
55
- # await page.pause() # async 跳出上下文就会 playwright._impl._api_types.Error: Connection closed
56
  await goto_github(page)
57
  await dump_cookie(page) # 创建函数:一个 browser 对应一个 context、page,在 with 内完成多个任务
58
  await browser.close()
@@ -63,22 +61,14 @@ async def goto_github(page):
63
 
64
 
65
  async def dump_cookie(page):
66
- # url = "https://www.52pojie.cn/home.php"
67
- # url = "https://v2ex.com"
68
- # url = "https://www.bilibili.com"
69
- # url = "https://www.tsdm39.com/forum.php?mobile=no"
70
- # url = "https://www.aliyundrive.com/drive"
71
  url = "https://huggingface.co/login"
72
  save_cookie = join(DIR_PATH, f"{get_domain(url)}.json")
73
  await page.goto(url)
74
  logger.debug(DIR_PATH)
75
- await page.screenshot(
76
- path=join(DIR_PATH, "screenshot.png"),
77
- full_page=True,
78
- ) # 截图
79
- while input("input c to continue: ") != "c": # 阻塞等待填写登录信息
80
  continue
81
- await page.context.storage_state(path=save_cookie) # 保存 Cookie 到文件
82
 
83
 
84
  # 获取容器内 IP
@@ -101,91 +91,88 @@ def get_domain(url: str) -> str:
101
 
102
 
103
  # 从域名获取 IP
104
- def get_domain_ip(host: str) -> str:
105
  domain_ip = socket.gethostbyname(host)
106
  return domain_ip
107
 
108
 
109
  # gotify 推送
110
- def push_msg(title: str = "无标题", message: str = "无内容", img_url: str = gotify_img):
111
- gotify_ip = get_domain_ip(gotify_host)
112
  url = f"http://{gotify_ip}:{gotify_port}/message?token={gotify_token}"
113
- resp = requests.post(
114
- url,
115
- json={
116
- "title": title,
117
- "message": message,
118
- "priority": 10,
119
- "extras": {
120
- "client::display": {"contentType": "text/markdown"},
121
- "client::notification": {"bigImageUrl": img_url},
122
- },
123
  },
124
- )
125
- return resp.json()
 
126
 
127
 
128
- def get_bilibili_live_rooms_from_pi() -> dict:
129
- logger.info("getting bilibili_live_rooms from pi")
130
- bilibili_live_rooms = {}
131
- mysql = OptionMysql(mysql_option)
132
  sql = f"SELECT room_id, username FROM live_rooms"
133
- data = mysql.get_dict_data(sql)
134
- if data:
135
- for d in data:
136
- bilibili_live_rooms[d["room_id"]] = d["username"]
137
  return bilibili_live_rooms
138
 
139
 
140
- def get_cookie_from_pi() -> dict:
141
- logger.info("getting cookie from pi")
142
- mysql = OptionMysql(mysql_option)
143
  sql = f"SELECT cookie_name, cookie_value FROM cookie"
144
- data = mysql.get_dict_data(sql)
 
145
  for d in data:
146
  log_file = join(DIR_PATH, f"cookie/{d['cookie_name']}.json")
147
- with open(log_file, "w") as lf:
148
- lf.write(d["cookie_value"])
149
  return data[0]["cookie_name"]
150
 
151
 
152
- def get_aliyundrive_refresh_token():
153
- mysql = OptionMysql(mysql_option)
154
  sql = f"SELECT token FROM aliyundrive WHERE id=1"
155
- data = mysql.get_dict_data(sql)
 
156
  if data:
157
  logger.info("获取阿里云盘 refresh_token 成功")
158
  return data[0]["token"]
159
  return False
160
 
161
 
162
- def update_aliyundrive_access_token(access_token):
163
- mysql = OptionMysql(mysql_option)
164
  sql = f"UPDATE aliyundrive SET token=%s WHERE id=2"
165
- data = mysql.update_data(sql, [access_token])
 
166
  if data:
167
  logger.info("update aliyundrive access token")
168
  return True
169
  return False
170
 
171
 
172
- def get_log(log_file: str) -> str:
173
- with open(log_file, "r", encoding="utf-8") as f:
174
- data = f.read()
175
  return data
176
 
177
 
 
 
 
 
 
 
 
 
 
 
 
178
  if __name__ == "__main__":
179
  logger.debug(f"DEBUG Mode: {DEBUG}")
180
  logger.debug(f"DIR_PATH: {DIR_PATH}")
181
- # asyncio.run(init_page())
182
- # logger.debug(f"get_domain_ip: {get_domain_ip(gotify_host)}")
183
- # logger.debug(f'get_domain: {get_domain("https://github.com")}')
184
- # logger.debug(f"list_path: {list_path()}")
185
  logger.debug(f'push_msg: {push_msg("utils.py", "hello world")}')
186
- # logger.debug(f"get_aliyun_token_from_pi(): {get_aliyun_token_from_pi()}")
187
- # logger.debug(f"get_bilibili_live_rooms_from_pi(): {get_bilibili_live_rooms_from_pi()}")
188
- # logger.debug(f"get_cookie_from_pi(): {get_cookie_from_pi()}")
189
- # logger.debug(f'get_cron_log(): {get_cron_log("log/tsdm.log")}')
190
- # logger.debug(f"get_aliyundrive_access_token: {get_aliyundrive_access_token()}")
191
- # logger.debug(f"update_aliyundrive_access_token: {update_aliyundrive_access_token('123')}")
 
1
  # coding:utf-8
 
 
2
  import socket
3
  import asyncio
4
+ import aiofiles
5
  import psutil
6
  import requests
7
 
 
11
  from playwright.async_api import async_playwright, Page
12
  from playwright_stealth import stealth_async
13
 
14
+ from option_mysql import get_aiomysql_instance
15
  from __init__ import (
16
  gotify_host,
17
  gotify_port,
18
  gotify_token,
19
  gotify_img,
 
20
  DIR_PATH,
21
  DEBUG,
22
  browser_headless,
 
50
  page = await context.new_page()
51
  # 隐藏 webdriver 特征
52
  await stealth_async(page)
53
+ # await page.pause() # async_playwright 跳出上下文就会 playwright._impl._api_types.Error: Connection closed
54
  await goto_github(page)
55
  await dump_cookie(page) # 创建函数:一个 browser 对应一个 context、page,在 with 内完成多个任务
56
  await browser.close()
 
61
 
62
 
63
  async def dump_cookie(page):
 
 
 
 
 
64
  url = "https://huggingface.co/login"
65
  save_cookie = join(DIR_PATH, f"{get_domain(url)}.json")
66
  await page.goto(url)
67
  logger.debug(DIR_PATH)
68
+ await page.screenshot(path=join(DIR_PATH, "screenshot.png"), full_page=True) # 截图
69
+ while input("input c to continue: ") != "c": # 阻塞等待填完登录信息
 
 
 
70
  continue
71
+ await page.context.storage_state(path=save_cookie) # 保存 Cookie 到本地
72
 
73
 
74
  # 获取容器内 IP
 
91
 
92
 
93
  # 从域名获取 IP
94
+ def get_pi_ip(host: str) -> str:
95
  domain_ip = socket.gethostbyname(host)
96
  return domain_ip
97
 
98
 
99
  # gotify 推送
100
+ def push_msg(title: str = "无标题", message: str = "无内容", img_url: str = gotify_img) -> dict:
101
+ gotify_ip = get_pi_ip(gotify_host)
102
  url = f"http://{gotify_ip}:{gotify_port}/message?token={gotify_token}"
103
+ data = {
104
+ "title": title,
105
+ "message": message,
106
+ "priority": 10,
107
+ "extras": {
108
+ "client::display": {"contentType": "text/markdown"},
109
+ "client::notification": {"bigImageUrl": img_url},
 
 
 
110
  },
111
+ }
112
+ resp = requests.post(url=url, json=data, timeout=20)
113
+ return {"code": resp.status_code, "res": resp.content}
114
 
115
 
116
+ async def get_bilibili_live_rooms_from_pi() -> dict:
 
 
 
117
  sql = f"SELECT room_id, username FROM live_rooms"
118
+ pool = await get_aiomysql_instance()
119
+ data = await pool.query(sql)
120
+ bilibili_live_rooms = data[0]
 
121
  return bilibili_live_rooms
122
 
123
 
124
+ async def get_cookie_from_pi() -> dict:
 
 
125
  sql = f"SELECT cookie_name, cookie_value FROM cookie"
126
+ pool = await get_aiomysql_instance()
127
+ data = await pool.query(sql)
128
  for d in data:
129
  log_file = join(DIR_PATH, f"cookie/{d['cookie_name']}.json")
130
+ async with aiofiles.open(log_file, mode="w") as handle:
131
+ await handle.write(d["cookie_value"])
132
  return data[0]["cookie_name"]
133
 
134
 
135
+ async def get_aliyundrive_refresh_token():
 
136
  sql = f"SELECT token FROM aliyundrive WHERE id=1"
137
+ pool = await get_aiomysql_instance()
138
+ data = await pool.query(sql)
139
  if data:
140
  logger.info("获取阿里云盘 refresh_token 成功")
141
  return data[0]["token"]
142
  return False
143
 
144
 
145
+ async def update_aliyundrive_access_token(access_token):
 
146
  sql = f"UPDATE aliyundrive SET token=%s WHERE id=2"
147
+ pool = await get_aiomysql_instance()
148
+ data = await pool.update(sql, [access_token])
149
  if data:
150
  logger.info("update aliyundrive access token")
151
  return True
152
  return False
153
 
154
 
155
+ async def read_logfile(log_file: str) -> str:
156
+ async with aiofiles.open(log_file, mode="r", encoding="utf-8") as f:
157
+ data = await f.read()
158
  return data
159
 
160
 
161
+ async def main():
162
+ # await init_page()
163
+ # res = await get_bilibili_live_rooms_from_pi()
164
+ # res = await get_cookie_from_pi()
165
+ # res = await get_aliyundrive_refresh_token()
166
+ # res = await update_aliyundrive_access_token("no use")
167
+ # res = await get_bilibili_live_rooms_from_pi()
168
+ res = await read_logfile("log/bilibili.log")
169
+ logger.debug(f"utils test: {res}")
170
+
171
+
172
  if __name__ == "__main__":
173
  logger.debug(f"DEBUG Mode: {DEBUG}")
174
  logger.debug(f"DIR_PATH: {DIR_PATH}")
175
+ logger.debug(f"get_pi_ip: {get_pi_ip(gotify_host)}")
176
+ logger.debug(f'get_domain: {get_domain("https://github.com")}')
 
 
177
  logger.debug(f'push_msg: {push_msg("utils.py", "hello world")}')
178
+ # asyncio.run(main())