from typing import Annotated, Optional

from fastapi import FastAPI, Header, Query
from fastapi.middleware.cors import CORSMiddleware

import html2text
import requests
import httpx
import re
import json

from bs4 import BeautifulSoup

import googleapiclient
import googleapiclient.discovery

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/linkedin_post_details")
async def linkedin_post_details(post_id: Optional[str] = None, url: Optional[str] = None):
    if not url:
        url = "https://www.linkedin.com/posts/" + post_id
    res = requests.get(url, headers={"user-agent": "Googlebot", "accept-language": "en-US"})
    soup = BeautifulSoup(res.content, "html.parser")

    # Post metadata lives in the JSON-LD <script> blocks of the public post page.
    desc = full_name = username = user_type = date = None  # defaults if parsing fails
    script_tags = soup.find_all("script")
    for script_tag in script_tags:
        try:
            data = json.loads(script_tag.string)
            if data.get("datePublished"):
                desc = data.get("articleBody") or data.get("description")
                author = data.get("author")
                full_name = author.get("name")
                username = author.get("url").rsplit("/", 1)[-1]
                user_type = author.get("@type").lower()
                date = data.get("datePublished")
        except Exception:
            continue

    spans = soup.find_all("span", {"data-test-id": "social-actions__reaction-count"})
    if spans:
        reactions = spans[0].text.strip()
    else:
        reactions = "0"
    try:
        comments = str(
            soup.find("a", {"data-test-id": "social-actions__comments"}).get("data-num-comments")
        )
    except AttributeError:
        comments = "0"

    return {
        "insights": {
            "likeCount": None,
            # "commentCount": int(comments.replace(",", "")),
            "commentCount": comments,
            "shareCount": None,
            # "reactionCount": int(reactions.replace(",", "")),
            "reactionCount": reactions,
            "reactions": [],
        },
        "description": desc,
        "username": username,
        "name": full_name,
        "userType": user_type,
        "date": date,
    }


# Earlier html2text-based implementation, kept for reference:
# async def linkedin_post_details(post_id: str):
#     url = "https://www.linkedin.com/posts/" + post_id
#     res = requests.get(url, headers={"user-agent": "Googlebot", "accept-language": "en-US"})
#     text_maker = html2text.HTML2Text()
#     text_maker.ignore_links = True
#     text_maker.ignore_images = True
#     text_maker.bypass_tables = False
#     docs = text_maker.handle(res.content.decode("utf-8"))
#     chunks = docs.split("\n\n#")
#     linkedin_content = chunks[1]
#     user = linkedin_content.split("\n\n", 5)
#     full_name = user[1]
#     bio = user[2]
#     try:
#         date, edited = user[3].split(" ")
#         edited = True
#     except:
#         date = user[3].strip()
#         edited = False
#     content = "\n\n".join(user[5:])
#     insights = chunks[3].split("\n\n")[2]
#     likes = insights.split(" ", 1)[0].strip()
#     comments = insights.rsplit(" ", 2)[1].strip()
#     username = url.rsplit("/", 1)[-1].split("_")[0]
#     return {
#         "userDetails": {"full_name": full_name, "username": username, "bio": bio},
#         "content": content,
#         "date": date,
#         "is_edited": edited,
#         "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount": None},
#         "username": username,
#     }


@app.get("/instagram_post_details")
async def ig_post_detail(post_id: Optional[str] = None, url: Optional[str] = None):
    if not url:
        url = f"https://www.instagram.com/p/{post_id}"
    res = requests.get(
        url,
        headers={"user-agent": "Googlebot", "accept-language": "en-US"},
        timeout=(10, 27),
    )
    soup = BeautifulSoup(res.content, "html.parser")

    # The description meta tag has the form:
    # "<likes> likes, <comments> comments - <username> on <date>: “<caption>”"
    meta = soup.find("meta", {"name": "description"})
    content = meta.get("content")
    like_split = content.split(" likes, ")
    likes = like_split[0]
    comment_split = like_split[1].split(" comments - ")
    comments = comment_split[0]
    author_split = comment_split[1].split(": “")
    author_date = author_split[0].split(" on ")
    username = author_date[0]
    date = author_date[1].split(":")[0]
    name_desc = (
        soup.find("meta", {"property": "og:title"})
        .get("content")
        .split(" on Instagram: ", 1)
    )
    full_name = name_desc[0]
    desc = name_desc[-1]
    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "shareCount": None,
        },
        "description": desc,
        "username": username,
        "name": full_name,
        "date": date,
    }


@app.get("/facebook_post_details")
async def fb_post_detail(username: Optional[str] = None, post_id: Optional[str] = None, url: Optional[str] = None):
    if not url:
        url = f"https://www.facebook.com/{username}/posts/{post_id}"
    user_agent = "Googlebot"
    res = requests.get(
        url,
        headers={"user-agent": user_agent, "accept-language": "en-US"},
        timeout=(10, 27),
    )
    soup = BeautifulSoup(res.content, "html.parser")
    script_tags = soup.find_all("script")
    print(len(script_tags))

    # Engagement counts are embedded as JSON fragments inside inline <script> tags.
    likes = comments = shares = total_react = desc = None  # defaults if parsing fails
    reactions = []
    for script_tag in script_tags:
        try:
            if "important_reactors" in script_tag.string:
                splitter = '"reaction_count":{"count":'
                total_react, reaction_split = script_tag.string.split(splitter, 2)[1].split("},", 1)
                total_react = total_react.split(',"')[0]
                pattern = r"\[.*?\]"
                match = re.search(pattern, reaction_split)
                reactions = json.loads(match.group(0)) if match else []
                reactions = [
                    dict(
                        name=reaction["node"]["localized_name"].lower(),
                        count=reaction["reaction_count"],
                        is_visible=reaction["visible_in_bling_bar"],
                    )
                    for reaction in reactions
                ]
                splitter = '"share_count":{"count":'
                shares = script_tag.string.split(splitter, 2)[1].split(",")[0]
                splitter = '"comments":{"total_count":'
                comments = script_tag.string.split(splitter, 2)[1].split("}")[0]
                likes = [x.get("count") for x in reactions if x.get("name") == "like"][0]
                print(total_react, reactions, shares, comments, likes)
            if '"message":{"text":"' in script_tag.string:
                desc = script_tag.string.split('"message":{"text":"', 1)[-1].split('"},')[0]
        except Exception as e:
            print(e)
            continue

    name = soup.find("meta", {"property": "og:title"}).get("content")
    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "shareCount": shares,
            "reactionCount": total_react,
            "reactions": reactions,
        },
        "description": desc,
        "username": username,
        "name": name,
        "date": None,
    }


@app.get("/google_search")
async def google_search(q: str, delimiter: str = "\n---\n", sites: Annotated[list[str] | None, Query()] = None):
    print(sites)
    print(type(sites))
    url = f"https://www.google.com/search?q={q} "
    if sites:
        url += " OR ".join(["site:" + site for site in sites])
    texts = ""
    soup = BeautifulSoup(requests.get(url).content, "html.parser")
    for div in soup.find_all("div")[24:]:
        # Result blocks sit at a fixed nesting depth (eight <div> ancestors) in
        # the markup Google serves to this user agent.
        if len(div.find_parents("div")) == 8:
            # print(div.get_text().strip())
            href = div.find(href=True, recursive=True)
            text = div.find(text=True, recursive=False)
            if href and text:
                print(text)
                text = f'[{text}]({href["href"].split("/url?q=")[-1]})'
            if text is not None and text.strip():
                texts += text + delimiter
    return {"results": texts}


@app.get("/google_search_urls")
async def google_search_url(q: str, sites: Annotated[list[str] | None, Query()] = None):
    url = f"https://www.google.com/search?q={q} "
    if sites:
        url += " OR ".join(["site:" + site for site in sites])
    res = requests.get(
        url,
        headers={"user-agent": "Googlebot", "accept-language": "en-US"},
        timeout=(10, 27),
    )
    soup = BeautifulSoup(res.content, "html.parser")
    prefix = "/url?q=h"
    len_prefix = len(prefix)
    docs = []
    for div in soup.find_all(True):
        # Only elements with exactly two ancestors (<html> and <body>) hold the
        # result links in the stripped-down markup served to this user agent.
        if len(div.find_parents()) == 2:
            a_tags = div.find_all("a")
            for a in a_tags:
                doc = a.get("href")
                if (
                    doc[:len_prefix] == prefix
                    and "google.com" not in doc[len_prefix - 1:]
                ):
                    docs.append(
                        doc[len_prefix - 1:]
                        .split("&")[0]
                        .replace("%3F", "?")
                        .replace("%3D", "=")
                    )
    return {"results": docs}


@app.get("/tiktok_video_details")
async def tiktok_video_details(username: Optional[str] = None, video_id: Optional[str] = None, url: Optional[str] = None):
    if not url:
        if username[0] != "@":
            username = "@" + username
        url = f"https://www.tiktok.com/{username}/video/{video_id}"
    # user_agent = "LinkedInBot"
    user_agent = "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)"
    res = requests.get(url, headers={"user-agent": user_agent})

    # Earlier meta-tag-based parsing, kept for reference:
    # soup = BeautifulSoup(res.content, "html.parser")
    # insights = soup.find("meta", {"property": "og:description"}).get("content")
    # likes = insights.split(" ", 1)[0]
    # desc = insights.rsplit(" comments. “", 1)[-1][:-1]
    # comments = insights.split(", ", 1)[-1].split(" ", 1)[0]
    # name = soup.find("meta", {"property": "og:title"}).get("content")[9:]
    # return {
    #     "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount": None},
    #     "description": desc,
    #     "username": username,
    #     "name": name,
    # }

    # Convert the rendered page to Markdown-like text and pull the engagement
    # counts out of the bolded "**<n>**" tokens (likes, comments, bookmarks, shares).
    text_maker = html2text.HTML2Text()
    text_maker.ignore_links = True
    text_maker.ignore_images = True
    text_maker.bypass_tables = False
    print("RESPONSE DETAIL", res.content.decode("utf-8"))
    docs = text_maker.handle(res.content.decode("utf-8"))
    print("DOCS", docs)
    content_detail = docs.split("###")[5]
    likes, comments, bookmarks, shares = re.findall(r"\*\*([\w.]+)\*\*", content_detail)
    profile = [
        x.strip()
        for x in content_detail.split("\n\nSpeed\n\n", 1)[1].split("\n", 6)
        if x.strip()
    ]
    username = profile[0]
    date = profile[1].rsplit(" · ", 1)[-1]
    desc = profile[-1][2:].replace("**", "")
    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "bookmarkCount": bookmarks,
            "shareCount": shares,
        },
        "username": username,
        "date": date,
        "description": desc,
    }


@app.get("/youtube_video_details")
async def yt_vid_detail(api_key: str, video_id: Optional[str] = None, url: Optional[str] = None):
    # yt_ids = [doc.split("?v=")[-1] for doc in docs]
    if url:
        video_id = url.split("?v=")[-1]
    youtube = googleapiclient.discovery.build("youtube", "v3", developerKey=api_key)
    # request = youtube.search().list(part="snippet", q="sari roti", type="video")
    request = youtube.videos().list(
        part="snippet,statistics,topicDetails",
        # id=",".join(yt_ids),
        id=video_id,
    )
    return request.execute()["items"]
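

# Optional local entry point (an addition, not part of the endpoints above):
# assumes `uvicorn` is installed alongside FastAPI, so the scraper API can be
# started with `python <this file>` instead of invoking uvicorn by hand.
if __name__ == "__main__":
    import uvicorn

    # Serve on all interfaces on port 8000; adjust host/port as needed.
    uvicorn.run(app, host="0.0.0.0", port=8000)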