mckabue commited on
Commit
261a211
Β·
1 Parent(s): bd9ee95

Add utility functions for Instagram username validation and availability checking

Browse files
Files changed (3) hide show
  1. app.py +27 -206
  2. index.html β†’ templates/index.html +0 -0
  3. utils.py +130 -0
app.py CHANGED
@@ -1,216 +1,37 @@
1
- import sys
2
- import os
 
 
3
 
4
- # sys.path.append(os.path.abspath("../../../"))
5
 
6
- import inspect
7
- from typing import Callable, Literal
8
- from quart import Quart, send_from_directory
9
- from quart import Quart, Response, request
10
- import requests
11
- from bs4 import BeautifulSoup
12
- import re
13
- import asyncio
14
- from typing import Any, Callable, Coroutine
15
- # from python_utils.get_browser import get_browser_page_async
16
- import re
17
- from requests_tor import RequestsTor
18
 
19
- app = Quart(__name__)
20
 
21
-
22
- # Your custom JavaScript to inject
23
- CUSTOM_SCRIPT = """
24
- <script>
25
- // Add your custom logic here
26
- console.log('Custom script loaded in website B');
27
- // Example: Send message to parent window
28
- window.parent.postMessage('Hello from website B', '*');
29
- </script>
30
- """
31
-
32
- @app.route('/instagram/<path:path>', methods=['GET'])
33
- async def proxy(path: str):
34
- # Construct the full URL for website B
35
- site_b_url = f"https://www.instagram.com/{path}/" # f"https://websiteB.com/{path}" # Replace with actual domain
36
-
37
- try:
38
- # Forward the request to website B
39
- response = requests.get(
40
- site_b_url,
41
- # headers={k: v for k, v in request.headers if k.lower() != 'host'},
42
- # cookies=request.cookies,
43
- allow_redirects=False
44
- )
45
- resp = Response(
46
- response.content,
47
- status=response.status_code
48
- )
49
-
50
- # Forward original headers while maintaining CORS
51
- excluded_headers = ['content-encoding', 'content-length', 'transfer-encoding', 'connection']
52
- headers = [(k, v) for k, v in response.headers.items()
53
- if k.lower() not in excluded_headers]
54
-
55
- # Preserve CORS headers from original response
56
- cors_headers = ['access-control-allow-origin',
57
- 'access-control-allow-methods',
58
- 'access-control-allow-headers',
59
- 'access-control-allow-credentials']
60
-
61
- for header in headers:
62
- if header[0].lower() in cors_headers:
63
- resp.headers[header[0]] = header[1]
64
-
65
- return resp
66
-
67
- except requests.RequestException as e:
68
- return f"Error fetching content: {str(e)}", 500
69
-
70
- @app.route('/')
71
  async def index():
72
- """Route handler for the home page"""
73
  try:
74
- return await send_from_directory('.', 'index.html')
75
  except Exception as e:
76
  return str(e)
77
 
78
- @app.route('/check/<platform>/<username>', methods=['GET'])
79
  async def check_social_media_handle(platform: str, username: str):
80
- logs = []
81
- logger = lambda key, value: logs.append({ "key": key, "value": value })
82
- response = {}
83
- match platform.lower():
84
- case "instagram":
85
- response = await async_availability_status(
86
- resolve = resolve_instagram_username(username, logger),
87
- logger = logger,
88
- message = (
89
- f'username <b>"{username}"</b> is βœ… Available.'
90
- ' However, usernames from disabled or deleted accounts may also appear'
91
- ' available but you can\'t choose them, eg: usernames like <b>"we"</b>, <b>"us"</b>, and <b>"the"</b>.'
92
- ' Go to <a target="_blank" href="https://accountscenter.instagram.com/">accounts center</a>'
93
- " and try changing the username of an existing account and see if it it's available"))
94
- case "linkedin-user":
95
- response = await async_availability_status(
96
- resolve = resolve_linkedin_username(username, "in", logger),
97
- logger = logger,
98
- message = (
99
- f'username <b>"{username}"</b> is βœ… Available.'
100
- ' However, usernames from private or deleted accounts will appear'
101
- " available, login into LinkedIn and go to"
102
- f" \"https://www.linkedin.com/in/{username}\" and see if it it's available"))
103
- case "linkedin-page":
104
- response = await async_availability_status(
105
- resolve = resolve_linkedin_username(username, "company", logger),
106
- logger = logger)
107
- case _:
108
- response = {
109
- "message": f'❌ The platform "{platform}" is not supported'
110
- }
111
- return {**response, "logs": logs}
112
-
113
- def resolve_instagram_username(
114
- username: str, logger: Callable[[str, str], None]) -> tuple[str, bool, str]:
115
- def get_json_value(page_source, key, value_pattern):
116
- pattern = rf'[\'"]?{key}[\'"]?\s*:\s*[\'"]?({value_pattern})[\'"]?'
117
- match = re.search(pattern, page_source, flags=re.IGNORECASE)
118
- return match.group(1) if match else None
119
- def is_valid_instagram_username(username):
120
- """
121
- Validates an Instagram username based on their username rules:
122
- - 1 to 30 characters long
123
- - Can contain letters (a-z), numbers (0-9), and periods/underscores
124
- - Cannot start or end with a period
125
- - Cannot have consecutive periods
126
- - Cannot have periods next to underscores
127
- - Can start or end with underscore
128
- """
129
- # Regex pattern for Instagram username validation
130
- pattern = r'^(?!.*\.\.)(?!.*\._)(?!.*_\.)[a-zA-Z0-9._][a-zA-Z0-9._]{0,28}[a-zA-Z0-9._]$'
131
- # Additional length check since regex alone might not handle it perfectly
132
- if len(username) < 1 or len(username) > 30:
133
- return False
134
- return re.match(pattern, username) is not None
135
- def resolve() -> bool:
136
- if not is_valid_instagram_username(username):
137
- raise Exception(f'"{username}" is not a valid instagram username')
138
- profile_uri = f"https:/www.instagram.com/{username}/"
139
- profile_response = requests.get(profile_uri, allow_redirects = False)
140
- profile_response_username = get_json_value(profile_response.text, "username", "\w+") or ""
141
- logger("profile_response_username", profile_response_username)
142
- _return_result = lambda is_available: (username, is_available, profile_uri)
143
- # if there is a username in the page, then this is likely an existing account
144
- if profile_response_username.lower().strip() == username.lower().strip():
145
- return _return_result(True)
146
- x_ig_app_id = get_json_value(profile_response.text, "X-IG-App-ID", "\d+")
147
- web_profile_response = requests.get(
148
- url=f"https://www.instagram.com/api/v1/users/web_profile_info/?username={username}",
149
- headers={
150
- "x-ig-app-id": x_ig_app_id,
151
- },
152
- allow_redirects = False)
153
- logger("web_profile_response.status_code", web_profile_response.status_code)
154
- # if status is 404, then the account doesnt exist!
155
- is_html = re.match(r'.*(\w+)/html', web_profile_response.headers.get("Content-Type"))
156
- if web_profile_response.status_code == 404 and is_html:
157
- return _return_result(False)
158
- # if status is 200, check status of the json
159
- is_json = re.match(r'.*(\w+)/json', web_profile_response.headers.get("Content-Type"))
160
- json_status = (web_profile_response.json() or {}).get('status') == 'ok' if is_json else False
161
- return _return_result(web_profile_response.status_code == 200 and json_status)
162
- return resolve
163
-
164
- def resolve_linkedin_username(
165
- username: str, company_or_user: Literal["company", "in"],
166
- logger: Callable[[str, str], None],) -> tuple[str, bool, str]:
167
- async def resolve() -> tuple[str, bool, str]:
168
- # can replace "www." with "de.", ".ke", ".ug", etc
169
- # inkedin private user => kamau
170
- uri: str = f"https://www.linkedin.com/{company_or_user}/{username}"
171
- page, close = await get_browser_page_async()
172
- response = None
173
- async def capture_response(resp):
174
- nonlocal response
175
- if uri in resp.url:
176
- response = resp
177
- page.on("response", capture_response)
178
- await page.goto("https://www.linkedin.com/")
179
- await page.evaluate(f"""
180
- fetch("{uri}", {{ "mode": "no-cors", "credentials": "include" }})
181
- """)
182
- await close()
183
- return (username, response.ok, uri)
184
- return resolve
185
-
186
- async def async_availability_status(
187
- resolve: Callable[[str], Coroutine[Any, Any, bool]],
188
- logger: Callable[[str, str], None],
189
- message: str = None):
190
- try:
191
- username_is_available_uri: tuple[str, bool, str] = await resolve()\
192
- if inspect.iscoroutinefunction(resolve) or inspect.isawaitable(resolve)\
193
- else await asyncio.to_thread(resolve)
194
- logger("username_is_available_uri", username_is_available_uri)
195
- username, is_available, uri = username_is_available_uri
196
- if is_available == True:
197
- return {
198
- 'available': False,
199
- 'message': f"{username}: ❌ Taken",
200
- 'url': uri
201
- }
202
- if message:
203
- return {
204
- 'available': True,
205
- 'message': message,
206
- 'url': uri
207
- }
208
- else:
209
- return {
210
- 'available': True,
211
- 'message': f"{username}: βœ… Available",
212
- 'url': uri
213
- }
214
- except Exception as e:
215
- logger(f"{async_availability_status.__name__}:Exception", str(e))
216
- return { 'message': f"❌ {str(e)}" }
 
1
+ from fastapi import FastAPI
2
+ from fastapi.staticfiles import StaticFiles
3
+ from fastapi.responses import FileResponse
4
+ from .utils import get_socials, get_logger, availability_response
5
 
6
+ app = FastAPI()
7
 
8
+ # Mount the entire static directory
9
+ app.mount("/static", StaticFiles(directory="static"), name="static")
 
 
 
 
 
 
 
 
 
 
10
 
11
+ socials = get_socials()
12
 
13
+ @app.get('/')
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
  async def index():
 
15
  try:
16
+ return FileResponse('static/index.html')
17
  except Exception as e:
18
  return str(e)
19
 
20
+ @app.get('/check/{platform}/{username}')
21
  async def check_social_media_handle(platform: str, username: str):
22
+ social = next(i for i in socials if i.get('id') == platform)
23
+ if social is None:
24
+ return {
25
+ "message": f'❌ The platform "{platform}" is not supported'
26
+ }
27
+ return await _resolve(username, **social)
28
+
29
+ async def _resolve(platform: str, username: str, *, validate, resolve, message = None):
30
+ if not validate(username):
31
+ raise Exception(f'"{username}" is not a valid {platform} handle/username')
32
+ logs, logger = get_logger()
33
+ response = await availability_response(
34
+ resolve = resolve(username, logger),
35
+ logger = logger,
36
+ message = message(username) if message else None)
37
+ return { **response, "logs": logs }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
index.html β†’ templates/index.html RENAMED
File without changes
utils.py ADDED
@@ -0,0 +1,130 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import re
2
+ import inspect
3
+ import requests
4
+ import re
5
+ import asyncio
6
+ from typing import Any, Callable, Coroutine
7
+
8
+ def get_socials():
9
+ return [
10
+ {
11
+ "id": "instagram",
12
+ "name": "Instagram",
13
+ "img": "https://cdn.jsdelivr.net/npm/simple-icons@v6/icons/instagram.svg",
14
+ "validate": is_valid_instagram_username,
15
+ "resolve": resolve_instagram_username,
16
+ "message": lambda username: (
17
+ f'username <b>"{username}"</b> is βœ… Available.'
18
+ ' However, usernames from disabled or deleted accounts may also appear'
19
+ ' available but you can\'t choose them, eg: usernames like <b>"we"</b>, <b>"us"</b>, and <b>"the"</b>.'
20
+ ' Go to <a target="_blank" href="https://accountscenter.instagram.com/">accounts center</a>'
21
+ " and try changing the username of an existing account and see if it it's available")
22
+ },
23
+ # {
24
+ # id: "x",
25
+ # name: "X (formerly Twitter)",
26
+ # img: "https://cdn.jsdelivr.net/npm/simple-icons@v6/icons/twitter.svg"
27
+ # },
28
+ # {
29
+ # id: "linkedin-user",
30
+ # name: "LinkedIn User",
31
+ # img: "https://cdn.jsdelivr.net/npm/simple-icons@v6/icons/linkedin.svg",
32
+ # message = (
33
+ # f'username <b>"{username}"</b> is βœ… Available.'
34
+ # ' However, usernames from private or deleted accounts will appear'
35
+ # " available, login into LinkedIn and go to"
36
+ # f" \"https://www.linkedin.com/in/{username}\" and see if it it's available"))
37
+ # },
38
+ # {
39
+ # id: "linkedin-page",
40
+ # name: "LinkedIn Company",
41
+ # img: "https://cdn.jsdelivr.net/npm/simple-icons@v6/icons/linkedin.svg"
42
+ # }
43
+ ]
44
+
45
+ def is_valid_instagram_username(username):
46
+ """
47
+ Validates an Instagram username based on their username rules:
48
+ - 1 to 30 characters long
49
+ - Can contain letters (a-z), numbers (0-9), and periods/underscores
50
+ - Cannot start or end with a period
51
+ - Cannot have consecutive periods
52
+ - Cannot have periods next to underscores
53
+ - Can start or end with underscore
54
+ """
55
+ # Regex pattern for Instagram username validation
56
+ pattern = r'^(?!.*\.\.)(?!.*\._)(?!.*_\.)[a-zA-Z0-9._][a-zA-Z0-9._]{0,28}[a-zA-Z0-9._]$'
57
+ # Additional length check since regex alone might not handle it perfectly
58
+ if len(username) < 1 or len(username) > 30:
59
+ return False
60
+ return re.match(pattern, username) is not None
61
+
62
+ def get_logger() -> tuple[list[dict[str, str]], Callable[[str, str], None]]:
63
+ logs = []
64
+ return logs, lambda key, value: logs.append({ "key": key, "value": value })
65
+
66
+ def get_json_value(page_source, key, value_pattern):
67
+ pattern = rf'[\'"]?{key}[\'"]?\s*:\s*[\'"]?({value_pattern})[\'"]?'
68
+ match = re.search(pattern, page_source, flags=re.IGNORECASE)
69
+ return match.group(1) if match else None
70
+
71
+ async def availability_response(
72
+ resolve: Callable[[str], Coroutine[Any, Any, bool]],
73
+ logger: Callable[[str, str], None],
74
+ message: str = None):
75
+ try:
76
+ username_is_available_uri: tuple[str, bool, str] = await resolve()\
77
+ if inspect.iscoroutinefunction(resolve) or inspect.isawaitable(resolve)\
78
+ else await asyncio.to_thread(resolve)
79
+ logger("username_is_available_uri", username_is_available_uri)
80
+ username, is_available, uri = username_is_available_uri
81
+ if is_available == True:
82
+ return {
83
+ 'available': False,
84
+ 'message': f"{username}: ❌ Taken",
85
+ 'url': uri
86
+ }
87
+ if message:
88
+ return {
89
+ 'available': True,
90
+ 'message': message,
91
+ 'url': uri
92
+ }
93
+ else:
94
+ return {
95
+ 'available': True,
96
+ 'message': f"{username}: βœ… Available",
97
+ 'url': uri
98
+ }
99
+ except Exception as e:
100
+ logger(f"{availability_response.__name__}:Exception", str(e))
101
+ return { 'message': f"❌ {str(e)}" }
102
+
103
+ def resolve_instagram_username(
104
+ username: str, logger: Callable[[str, str], None]) -> tuple[str, bool, str]:
105
+ def resolve() -> bool:
106
+ profile_uri = f"https://www.instagram.com/{username}/"
107
+ profile_response = requests.get(profile_uri, allow_redirects = False)
108
+ profile_response_username = get_json_value(profile_response.text, "username", "\w+") or ""
109
+ logger("profile_response_username", profile_response_username)
110
+ _return_result = lambda is_available: (username, is_available, profile_uri)
111
+ # if there is a username in the page, then this is likely an existing account
112
+ if profile_response_username.lower().strip() == username.lower().strip():
113
+ return _return_result(True)
114
+ x_ig_app_id = get_json_value(profile_response.text, "X-IG-App-ID", "\d+")
115
+ web_profile_response = requests.get(
116
+ url=f"https://www.instagram.com/api/v1/users/web_profile_info/?username={username}",
117
+ headers={
118
+ "x-ig-app-id": x_ig_app_id,
119
+ },
120
+ allow_redirects = False)
121
+ logger("web_profile_response.status_code", web_profile_response.status_code)
122
+ # if status is 404, then the account doesnt exist!
123
+ is_html = re.match(r'.*(\w+)/html', web_profile_response.headers.get("Content-Type"))
124
+ if web_profile_response.status_code == 404 and is_html:
125
+ return _return_result(False)
126
+ # if status is 200, check status of the json
127
+ is_json = re.match(r'.*(\w+)/json', web_profile_response.headers.get("Content-Type"))
128
+ json_status = (web_profile_response.json() or {}).get('status') == 'ok' if is_json else False
129
+ return _return_result(web_profile_response.status_code == 200 and json_status)
130
+ return resolve