# web_scrape / app.py
from typing import Annotated, Optional

from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from bs4 import BeautifulSoup
import googleapiclient.discovery
import html2text
import json
import re
import requests

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
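
# CORS is left wide open so the endpoints can be called from any origin.
# Most scraper endpoints below accept either platform-specific IDs or a full
# post URL via the `url` query parameter.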
@app.get("/linkedin_post_details")
async def linkedin_post_details(post_id: Optional[str] = None, url: Optional[str] = None):
if not url:
url = "https://www.linkedin.com/posts/"+post_id
res = requests.get(url, headers={"user-agent":"Googlebot", "accept-language": "en-US"})
soup = BeautifulSoup(res.content, "html.parser")
script_tags = soup.find_all("script")
for script_tag in script_tags:
try:
script_tag = json.loads(script_tag.string)
if script_tag.get("datePublished"):
desc = script_tag.get("articleBody")
if not desc:
desc = script_tag.get("description")
author = script_tag.get("author")
full_name = author.get("name")
username = author.get("url").rsplit("/", 1)[-1]
user_type = author.get("@type").lower()
date = script_tag.get("datePublished")
except Exception as e:
continue
spans = soup.find_all(
"span", {"data-test-id": "social-actions__reaction-count"}
)
if spans:
reactions = spans[0].text.strip()
else:
reactions = '0'
try:
comments = str(soup.find("a", {"data-test-id": "social-actions__comments"}).get(
"data-num-comments"
))
except:
comments = '0'
return {
"insights": {
"likeCount": None,
# "commentCount": int(comments.replace(",", "")),
"commentCount": comments,
"shareCount": None,
# "reactionCount": int(reactions.replace(",", "")),
"reactionCount":reactions,
"reactions": [],
},
"description": desc,
"username": username,
"name": full_name,
"userType": user_type,
"date": date,
}
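
# Example (hypothetical post ID, for illustration only):
#   GET /linkedin_post_details?post_id=someuser_some-post-slug-activity-1234567890
# returns reaction/comment counts plus the author's name, username, type, and
# publication date scraped from the post's public page.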
# async def linkedin_post_details(post_id: str):
# url = "https://www.linkedin.com/posts/"+post_id
# res = requests.get(url, headers={"user-agent":"Googlebot", "accept-language": "en-US"})
# text_maker = html2text.HTML2Text()
# text_maker.ignore_links = True
# text_maker.ignore_images = True
# text_maker.bypass_tables = False
# docs = text_maker.handle(res.content.decode("utf-8"))
# chunks = docs.split("\n\n#")
# linkedin_content = chunks[1]
# user = linkedin_content.split("\n\n", 5)
# full_name = user[1]
# bio = user[2]
# try:
# date, edited = user[3].split(" ")
# edited = True
# except:
# date = user[3].strip()
# edited = False
# content = "\n\n".join(user[5:])
# insights = chunks[3].split("\n\n")[2]
# likes = insights.split(" ", 1)[0].strip()
# comments = insights.rsplit(" ", 2)[1].strip()
# username = url.rsplit("/",1)[-1].split("_")[0]
# return {
# "userDetails": {"full_name": full_name, "username":username,"bio": bio},
# "content": content,
# "date": date,
# "is_edited": edited,
# "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount":None},
# "username":username
# }
@app.get("/instagram_post_details")
async def ig_post_detail(post_id: Optional[str] = None, url: Optional[str] = None):
if not url:
url = f"https://www.instagram.com/p/{post_id}"
res = requests.get(
url,
headers={
"user-agent": "Googlebot",
"accept-language": "en-US"
},
timeout=(10, 27),
)
soup = BeautifulSoup(res.content, "html.parser")
meta = soup.find("meta", {"name": "description"})
content = meta.get("content")
like_split = content.split(" likes, ")
likes = like_split[0]
comment_split = like_split[1].split(" comments - ")
comments = comment_split[0]
author_split = comment_split[1].split(": "")
author_date = author_split[0].split(" on ")
username = author_date[0]
date = author_date[1].split(":")[0]
name_desc = (
soup.find("meta", {"property": "og:title"})
.get("content")
.split(" on Instagram: ", 1)
)
full_name = name_desc[0]
desc = name_desc[-1]
return {
"insights": {
"likeCount": likes,
"commentCount": comments,
"shareCount": None,
},
"description": desc,
"username": username,
"name": full_name,
"username": username,
"date": date,
}
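
# Example (hypothetical shortcode, for illustration only):
#   GET /instagram_post_details?post_id=Cxyz123AbCd
# parses like/comment counts, username, and date out of the page's meta tags.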
@app.get("/facebook_post_details")
async def fb_post_detail(username: Optional[str] = None, post_id: Optional[str] = None, url: Optional[str] = None):
if not url:
url = f"https://www.facebook.com/{username}/posts/{post_id}"
user_agent = "Googlebot"
res = requests.get(
url,
headers={
"user-agent": user_agent,
"accept-language": "en-US"
},
timeout=(10, 27),
)
soup = BeautifulSoup(res.content, "html.parser")
script_tags = soup.find_all("script")
print(len(script_tags))
for script_tag in script_tags:
try:
if "important_reactors" in script_tag.string:
splitter = '"reaction_count":{"count":'
total_react, reaction_split = script_tag.string.split(splitter, 2)[1].split("},", 1)
total_react = total_react.split(',"')[0]
pattern = r"\[.*?\]"
reactions = re.search(pattern, reaction_split)
if reactions:
reactions = json.loads(reactions.group(0))
else:
reactions = []
reactions = [
dict(
name=reaction["node"]["localized_name"].lower(),
count=reaction["reaction_count"],
is_visible=reaction["visible_in_bling_bar"],
)
for reaction in reactions
]
splitter = '"share_count":{"count":'
shares = script_tag.string.split(splitter, 2)[1].split(",")[0]
splitter = '"comments":{"total_count":'
comments = script_tag.string.split(splitter, 2)[1].split("}")[0]
likes = [x.get("count") for x in reactions if x.get("name") == "like"][0]
print(total_react, reactions, shares, comments, likes)
if '"message":{"text":"' in script_tag.string:
desc = script_tag.string.split('"message":{"text":"', 1)[-1].split('"},')[0]
except Exception as e:
print(e)
continue
name = soup.find("meta", {"property": "og:title"}).get("content")
return {
"insights": {
"likeCount": likes,
"commentCount": comments,
"shareCount": shares,
"reactionCount": total_react,
"reactions": reactions,
},
"description": desc,
"username": username,
"name": name,
"date": None,
}
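
# Example (hypothetical page and post ID, for illustration only):
#   GET /facebook_post_details?username=somepage&post_id=1234567890
# or pass a full post URL via the `url` query parameter instead.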
@app.get("/google_search")
async def google_search(q: str, delimiter: str = "\n---\n", sites: Annotated[list[str] | None, Query()] = None):
print(sites)
print(type(sites))
url = f"https://www.google.com/search?q={q} "
if sites:
url += " OR ".join(["site:"+site for site in sites])
texts = ""
soup = BeautifulSoup(requests.get(url).content, "html.parser")
for div in soup.find_all("div")[24:]:
if len(div.find_parents("div")) == 8: # Depth 4 means 3 parent divs (0-indexed)
# print(div.get_text().strip())
href = div.find(href=True, recursive=True)
text = div.find(text=True, recursive=False)
if href and text:
print(text)
text = f'[{text}]({href["href"].split("/url?q=")[-1]})'
if text != None and text.strip():
texts += text + delimiter
return {"results":texts}
@app.get("/google_search_urls")
async def google_search_url(q: str, sites: Annotated[list[str] | None, Query()] = None):
url = f"https://www.google.com/search?q={q} "
if sites:
url += " OR ".join(["site:"+site for site in sites])
res = requests.get(
url,
headers={
"user-agent": "Googlebot",
"accept-language": "en-US"
},
timeout=(10, 27),
)
soup = BeautifulSoup(res.content, "html.parser")
prefix = "/url?q=h"
len_prefix = len(prefix)
docs = []
for div in soup.find_all(True):
if len(div.find_parents()) == 2: # Depth 4 means 3 parent divs (0-indexed)
a_tags = div.find_all("a")
for a in a_tags:
doc = a.get("href")
if (
doc[:len_prefix] == prefix
and "google.com" not in doc[len_prefix - 1 :]
):
docs.append(
doc[len_prefix - 1 :]
.split("&")[0]
.replace("%3F", "?")
.replace("%3D", "=")
)
return {"results":docs}
@app.get("/tiktok_video_details")
async def tiktok_video_details(username: Optional[str] = None, video_id:Optional[str] = None, url: Optional[str] = None):
if not url:
if username[0] != "@":
username = "@" + username
url = f"https://www.tiktok.com/{username}/video/{video_id}"
# user_agent = "LinkedInBot"
user_agent = "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)"
res = requests.get(url, headers={"user-agent": user_agent})
# soup = BeautifulSoup(res.content, "html.parser")
# insights = soup.find("meta", {"property": "og:description"}).get("content")
# likes = insights.split(" ", 1)[0]
# desc = insights.rsplit(" comments. “", 1)[-1][:-1]
# comments = insights.split(", ", 1)[-1].split(" ", 1)[0]
# name = soup.find("meta", {"property": "og:title"}).get("content")[9:]
# return {
# "insights": {"likeCount": likes, "commentCount": comments, "shareCount":None, "viewCount":None},
# "description": desc,
# "username": username,
# "name": name,
# }
    text_maker = html2text.HTML2Text()
    text_maker.ignore_links = True
    text_maker.ignore_images = True
    text_maker.bypass_tables = False
    docs = text_maker.handle(res.content.decode("utf-8"))

    # The rendered text splits into sections on "###"; section 5 holds the
    # video details. Counts appear as bold "**1234**" tokens in the order
    # likes, comments, bookmarks, shares.
    content_detail = docs.split("###")[5]
    likes, comments, bookmarks, shares = re.findall(r"\*\*([\w.]+)\*\*", content_detail)
    profile = [
        x.strip()
        for x in content_detail.split("\n\nSpeed\n\n", 1)[1].split("\n", 6)
        if x.strip()
    ]
    username = profile[0]
    date = profile[1].rsplit(" · ", 1)[-1]
    desc = profile[-1][2:].replace("**", "")

    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "bookmarkCount": bookmarks,
            "shareCount": shares,
        },
        "username": username,
        "date": date,
        "description": desc,
    }
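
# Example (hypothetical handle and video ID, for illustration only):
#   GET /tiktok_video_details?username=someuser&video_id=7123456789012345678
# converts the rendered page to text with html2text and parses counts from it.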
@app.get("/youtube_video_details")
async def yt_vid_detail(api_key:str, video_id: Optional[str] = None, url: Optional[str] = None):
# yt_ids = [doc.split("?v=")[-1] for doc in docs]
if url:
video_id = url.split("?v=")[-1]
youtube = googleapiclient.discovery.build(
"youtube", "v3", developerKey=api_key
)
# request = youtube.search().list(part="snippet", q="sari roti", type="video")
request = youtube.videos().list(
part="snippet,statistics,topicDetails",
# id=",".join(yt_ids),
id = video_id,
)
return request.execute()["items"]