# web_scrape / app.py
import json
import re
from typing import Annotated

import html2text
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/linkedin_post_details")
async def linkedin_post_details(post_id: str):
    url = "https://www.linkedin.com/posts/" + post_id
    res = requests.get(url, headers={"user-agent": "Googlebot", "accept-language": "en-US"})
    soup = BeautifulSoup(res.content, "html.parser")

    # LinkedIn embeds post metadata as JSON-LD inside <script> tags; look for
    # the blob that carries a datePublished field. Defaults of None keep the
    # response well-formed when no matching tag is found.
    desc = full_name = username = user_type = date = None
    for script_tag in soup.find_all("script"):
        try:
            data = json.loads(script_tag.string)
            if data.get("datePublished"):
                desc = data.get("articleBody") or data.get("description")
                author = data.get("author", {})
                full_name = author.get("name")
                username = author.get("url", "").rsplit("/", 1)[-1]
                user_type = author.get("@type", "").lower()
                date = data.get("datePublished")
        except Exception:
            continue

    spans = soup.find_all("span", {"data-test-id": "social-actions__reaction-count"})
    reactions = spans[0].text.strip() if spans else "0"

    comments_tag = soup.find("a", {"data-test-id": "social-actions__comments"})
    comments = (comments_tag.get("data-num-comments") if comments_tag else None) or "0"

    return {
        "insights": {
            "likeCount": None,
            "commentCount": int(comments.replace(",", "")),
            "shareCount": None,
            "reactionCount": int(reactions.replace(",", "")),
            "reactions": [],
        },
        "description": desc,
        "username": username,
        "name": full_name,
        "userType": user_type,
        "date": date,
    }
# Earlier html2text-based implementation, kept for reference:
# async def linkedin_post_details(post_id: str):
# url = "https://www.linkedin.com/posts/"+post_id
# res = requests.get(url, headers={"user-agent":"Googlebot", "accept-language": "en-US"})
# text_maker = html2text.HTML2Text()
# text_maker.ignore_links = True
# text_maker.ignore_images = True
# text_maker.bypass_tables = False
# docs = text_maker.handle(res.content.decode("utf-8"))
# chunks = docs.split("\n\n#")
# linkedin_content = chunks[1]
# user = linkedin_content.split("\n\n", 5)
# full_name = user[1]
# bio = user[2]
# try:
# date, edited = user[3].split(" ")
# edited = True
# except:
# date = user[3].strip()
# edited = False
# content = "\n\n".join(user[5:])
# insights = chunks[3].split("\n\n")[2]
# likes = insights.split(" ", 1)[0].strip()
# comments = insights.rsplit(" ", 2)[1].strip()
# username = url.rsplit("/",1)[-1].split("_")[0]
# return {
# "userDetails": {"full_name": full_name, "username":username,"bio": bio},
# "content": content,
# "date": date,
# "is_edited": edited,
# "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount":None},
# "username":username
# }
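
# Example call (hypothetical post id, against a locally running server):
#   curl "http://localhost:8000/linkedin_post_details?post_id=someuser_some-slug-activity-7000000000000000000"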

@app.get("/facebook_post_detail")
async def fb_post_detail(username: str, post_id: str):
    url = f"https://www.facebook.com/{username}/posts/{post_id}"
    res = requests.get(
        url,
        headers={"user-agent": "Googlebot", "accept-language": "en-US"},
        timeout=(10, 27),
    )
    soup = BeautifulSoup(res.content, "html.parser")

    # Facebook inlines engagement data as JSON fragments inside <script> tags;
    # the counts are pulled out with string splits on known JSON keys.
    # Defaults keep the response well-formed when nothing matches.
    total_react, shares, comments, likes = "0", "0", "0", 0
    reactions, desc = [], None
    for script_tag in soup.find_all("script"):
        if not script_tag.string:
            continue
        try:
            if "important_reactors" in script_tag.string:
                splitter = '"reaction_count":{"count":'
                total_react, reaction_split = script_tag.string.split(splitter, 1)[1].split("},", 1)
                total_react = total_react.split(',"')[0]
                match = re.search(r"\[.*?\]", reaction_split)
                reactions = json.loads(match.group(0)) if match else []
                reactions = [
                    dict(
                        name=reaction["node"]["localized_name"].lower(),
                        count=reaction["reaction_count"],
                        is_visible=reaction["visible_in_bling_bar"],
                    )
                    for reaction in reactions
                ]
                splitter = '"share_count":{"count":'
                shares = script_tag.string.split(splitter, 1)[1].split(",")[0]
                splitter = '"comments":{"total_count":'
                comments = script_tag.string.split(splitter, 1)[1].split("}")[0]
                likes = next(
                    (x.get("count") for x in reactions if x.get("name") == "like"), 0
                )
            if '"message":{"text":"' in script_tag.string:
                desc = script_tag.string.split('"message":{"text":"', 1)[-1].split('"},')[0]
        except Exception:
            continue

    name = soup.find("meta", {"property": "og:title"}).get("content")
    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "shareCount": shares,
            "reactionCount": total_react,
            "reactions": reactions,
        },
        "description": desc,
        "username": username,
        "name": name,
        "date": None,
    }
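
# Example call (hypothetical page name and numeric post id):
#   curl "http://localhost:8000/facebook_post_detail?username=somepage&post_id=10150000000000000"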

@app.get("/google_search")
async def google_search(q: str, delimiter: str = "\n---\n", sites: Annotated[list[str] | None, Query()] = None):
    # Restrict results to the given sites, if any, via Google's site: operator.
    query = q
    if sites:
        query += " " + " OR ".join("site:" + site for site in sites)
    res = requests.get("https://www.google.com/search", params={"q": query})
    soup = BeautifulSoup(res.content, "html.parser")

    texts = ""
    # Skip the first 24 divs (page chrome); result snippets sit in divs nested
    # eight <div> levels deep in the bot-rendered markup.
    for div in soup.find_all("div")[24:]:
        if len(div.find_parents("div")) == 8:
            href = div.find(href=True, recursive=True)
            text = div.find(string=True, recursive=False)
            if href and text:
                text = f'[{text}]({href["href"].split("/url?q=")[-1]})'
            if text and text.strip():
                texts += text + delimiter
    return {"results": texts}
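
# Example call (hypothetical query; note `sites` may repeat to search several domains):
#   curl "http://localhost:8000/google_search?q=fastapi+cors&sites=stackoverflow.com&sites=github.com"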

@app.get("/tiktok_video_details")
async def tiktok_video_details(username: str, video_id: str):
    if not username.startswith("@"):
        username = "@" + username
    url = f"https://www.tiktok.com/{username}/video/{video_id}"

    # TikTok serves a server-rendered page to search-engine crawlers, so spoof
    # a crawler user agent rather than a browser one.
    # user_agent = "LinkedInBot"
    user_agent = "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)"
    res = requests.get(url, headers={"user-agent": user_agent})

    # Earlier og:meta-based parsing, kept for reference:
    # soup = BeautifulSoup(res.content, "html.parser")
    # insights = soup.find("meta", {"property": "og:description"}).get("content")
    # likes = insights.split(" ", 1)[0]
    # desc = insights.rsplit(" comments. “", 1)[-1][:-1]
    # comments = insights.split(", ", 1)[-1].split(" ", 1)[0]
    # name = soup.find("meta", {"property": "og:title"}).get("content")[9:]
    # return {
    #     "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount": None},
    #     "description": desc,
    #     "username": username,
    #     "name": name,
    # }

    # Convert the rendered HTML to markdown; the engagement numbers appear as
    # four bolded values, in order: likes, comments, bookmarks, shares.
    text_maker = html2text.HTML2Text()
    text_maker.ignore_links = True
    text_maker.ignore_images = True
    text_maker.bypass_tables = False
    docs = text_maker.handle(res.content.decode("utf-8"))

    content_detail = docs.split("###")[5]
    likes, comments, bookmarks, shares = re.findall(r"\*\*([\w.]+)\*\*", content_detail)
    profile = [x.strip() for x in content_detail.split("\n\nSpeed\n\n", 1)[1].split("\n", 6) if x.strip()]
    username = profile[0]
    date = profile[1].rsplit(" · ", 1)[-1]
    desc = profile[-1][2:].replace("**", "")

    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "bookmarkCount": bookmarks,
            "shareCount": shares,
        },
        "username": username,
        "date": date,
        "description": desc,
    }
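
# Example call (hypothetical handle and video id):
#   curl "http://localhost:8000/tiktok_video_details?username=someuser&video_id=7000000000000000000"

# Minimal local entry point: a sketch assuming uvicorn is installed (FastAPI
# does not bundle a server). Port 7860 matches the Hugging Face Spaces default.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)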