from typing import Annotated

from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from bs4 import BeautifulSoup
import html2text
import requests
import re
import json
app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
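
# NOTE: no route decorators appear in this file, so the async handlers below are
# presumably registered elsewhere (or the decorators were lost). A minimal,
# assumed sketch of how one of them could be exposed (the path is a guess):
#
#   @app.get("/linkedin/{post_id}")
#   async def linkedin_route(post_id: str):
#       return await linkedin_post_details(post_id)

# linkedin_post_details fetches the public post page while identifying as
# Googlebot and pulls the post metadata (text, author, date) out of the
# structured-data (JSON-LD) <script> tags embedded in the page.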
async def linkedin_post_details(post_id: str):
    url = "https://www.linkedin.com/posts/" + post_id
    res = requests.get(url, headers={"user-agent": "Googlebot", "accept-language": "en-US"})
    soup = BeautifulSoup(res.content, "html.parser")

    # Defaults in case no structured-data script tag is found, so the return
    # below cannot raise NameError.
    desc = full_name = username = user_type = date = None

    script_tags = soup.find_all("script")
    for script_tag in script_tags:
        try:
            data = json.loads(script_tag.string)
            if data.get("datePublished"):
                desc = data.get("articleBody") or data.get("description")
                author = data.get("author", {})
                full_name = author.get("name")
                username = author.get("url", "").rsplit("/", 1)[-1]
                user_type = author.get("@type", "").lower()
                date = data.get("datePublished")
                break
        except Exception:
            # Most <script> tags are not JSON; skip them.
            continue

    spans = soup.find_all(
        "span", {"data-test-id": "social-actions__reaction-count"}
    )
    reactions = spans[0].text.strip() if spans else "0"

    comments_tag = soup.find("a", {"data-test-id": "social-actions__comments"})
    comments = comments_tag.get("data-num-comments", "0") if comments_tag else "0"

    return {
        "insights": {
            "likeCount": None,
            "commentCount": int(comments.replace(",", "")),
            "shareCount": None,
            "reactionCount": int(reactions.replace(",", "")),
            "reactions": [],
        },
        "description": desc,
        "username": username,
        "name": full_name,
        "userType": user_type,
        "date": date,
    }
# Older html2text-based LinkedIn implementation, kept commented out for reference:
# async def linkedin_post_details(post_id: str):
#     url = "https://www.linkedin.com/posts/" + post_id
#     res = requests.get(url, headers={"user-agent": "Googlebot", "accept-language": "en-US"})
#     text_maker = html2text.HTML2Text()
#     text_maker.ignore_links = True
#     text_maker.ignore_images = True
#     text_maker.bypass_tables = False
#     docs = text_maker.handle(res.content.decode("utf-8"))
#     chunks = docs.split("\n\n#")
#     linkedin_content = chunks[1]
#     user = linkedin_content.split("\n\n", 5)
#     full_name = user[1]
#     bio = user[2]
#     try:
#         date, edited = user[3].split(" ")
#         edited = True
#     except Exception:
#         date = user[3].strip()
#         edited = False
#     content = "\n\n".join(user[5:])
#     insights = chunks[3].split("\n\n")[2]
#     likes = insights.split(" ", 1)[0].strip()
#     comments = insights.rsplit(" ", 2)[1].strip()
#     username = url.rsplit("/", 1)[-1].split("_")[0]
#     return {
#         "userDetails": {"full_name": full_name, "username": username, "bio": bio},
#         "content": content,
#         "date": date,
#         "is_edited": edited,
#         "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount": None},
#         "username": username,
#     }
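
# fb_post_detail fetches the public post page as Googlebot and extracts the
# engagement counters (reactions, shares, comments) and the post text from the
# JSON blobs Facebook embeds in inline <script> tags, using string splitting
# and a small regex rather than a full JSON parse.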
async def fb_post_detail(username: str, post_id: str):
    url = f"https://www.facebook.com/{username}/posts/{post_id}"
    user_agent = "Googlebot"
    res = requests.get(
        url,
        headers={
            "user-agent": user_agent,
            "accept-language": "en-US",
        },
        timeout=(10, 27),
    )
    soup = BeautifulSoup(res.content, "html.parser")
    script_tags = soup.find_all("script")
    print(len(script_tags))

    # Defaults so the return below cannot raise NameError if the markers are
    # not found in any script tag.
    desc = None
    total_react = shares = comments = likes = None
    reactions = []

    for script_tag in script_tags:
        try:
            if "important_reactors" in script_tag.string:
                splitter = '"reaction_count":{"count":'
                total_react, reaction_split = script_tag.string.split(splitter, 2)[1].split("},", 1)
                total_react = total_react.split(',"')[0]
                pattern = r"\[.*?\]"
                match = re.search(pattern, reaction_split)
                reactions = json.loads(match.group(0)) if match else []
                reactions = [
                    dict(
                        name=reaction["node"]["localized_name"].lower(),
                        count=reaction["reaction_count"],
                        is_visible=reaction["visible_in_bling_bar"],
                    )
                    for reaction in reactions
                ]
                splitter = '"share_count":{"count":'
                shares = script_tag.string.split(splitter, 2)[1].split(",")[0]
                splitter = '"comments":{"total_count":'
                comments = script_tag.string.split(splitter, 2)[1].split("}")[0]
                likes = next(
                    (x.get("count") for x in reactions if x.get("name") == "like"),
                    None,
                )
                print(total_react, reactions, shares, comments, likes)
            if '"message":{"text":"' in script_tag.string:
                desc = script_tag.string.split('"message":{"text":"', 1)[-1].split('"},')[0]
        except Exception as e:
            print(e)
            continue

    name_tag = soup.find("meta", {"property": "og:title"})
    name = name_tag.get("content") if name_tag else None

    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "shareCount": shares,
            "reactionCount": total_react,
            "reactions": reactions,
        },
        "description": desc,
        "username": username,
        "name": name,
        "date": None,
    }
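
# google_search scrapes a plain Google results page and keeps the text of the
# result <div>s sitting at a fixed nesting depth in the markup. The depth check
# (len(find_parents("div")) == 8) is tied to Google's current HTML layout and
# may need adjusting if that layout changes.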
async def google_search(q: str, delimiter: str = "\n---\n", sites: Annotated[list[str] | None, Query()] = None):
    print(sites)
    print(type(sites))
    query = q
    if sites:
        # Restrict results to the given domains with Google's site: operator.
        query += " " + " OR ".join("site:" + site for site in sites)
    texts = ""
    # Pass the query via params so requests URL-encodes it instead of
    # concatenating it into the URL raw.
    res = requests.get("https://www.google.com/search", params={"q": query})
    soup = BeautifulSoup(res.content, "html.parser")
    for div in soup.find_all("div")[24:]:
        # Result snippets sit under eight parent <div>s in Google's current markup.
        if len(div.find_parents("div")) == 8:
            href = div.find(href=True, recursive=True)
            text = div.find(string=True, recursive=False)
            if href and text:
                print(text)
                text = f'[{text}]({href["href"].split("/url?q=")[-1]})'
            if text is not None and text.strip():
                texts += text + delimiter
    return {"results": texts}
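
# tiktok_video_details relies on the server-rendered HTML TikTok serves to
# search-engine bots: the page is flattened to text with html2text and the
# like/comment/bookmark/share counters are pulled out of the bold (**...**)
# runs with a regex. This is fragile and tied to TikTok's current markup.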
async def tiktok_video_details(username: str, video_id: str):
    if not username.startswith("@"):
        username = "@" + username
    url = f"https://www.tiktok.com/{username}/video/{video_id}"
    # user_agent = "LinkedInBot"
    user_agent = "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)"
    res = requests.get(url, headers={"user-agent": user_agent})
    # soup = BeautifulSoup(res.content, "html.parser")
    # insights = soup.find("meta", {"property": "og:description"}).get("content")
    # likes = insights.split(" ", 1)[0]
    # desc = insights.rsplit(" comments. “", 1)[-1][:-1]
    # comments = insights.split(", ", 1)[-1].split(" ", 1)[0]
    # name = soup.find("meta", {"property": "og:title"}).get("content")[9:]
    # return {
    #     "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount": None},
    #     "description": desc,
    #     "username": username,
    #     "name": name,
    # }
    text_maker = html2text.HTML2Text()
    text_maker.ignore_links = True
    text_maker.ignore_images = True
    text_maker.bypass_tables = False
    print("RESPONSE DETAIL", res.content.decode("utf-8"))
    docs = text_maker.handle(res.content.decode("utf-8"))
    print("DOCS", docs)
    content_detail = docs.split("###")[5]
    # The four bold counters appear in order: likes, comments, bookmarks, shares.
    likes, comments, bookmarks, shares = re.findall(r"\*\*([\w.]+)\*\*", content_detail)
    profile = [x.strip() for x in content_detail.split("\n\nSpeed\n\n", 1)[1].split("\n", 6) if x.strip()]
    username = profile[0]
    date = profile[1].rsplit(" · ", 1)[-1]
    desc = profile[-1][2:].replace("**", "")
    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "bookmarkCount": bookmarks,
            "shareCount": shares,
        },
        "username": username,
        "date": date,
        "description": desc,
    }
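
# A minimal way to run this app locally (assuming this file is app.py);
# Hugging Face Spaces conventionally serve on port 7860:
#
#   uvicorn app:app --host 0.0.0.0 --port 7860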