|
"""SERPent: a FastAPI service exposing SERP-scraping and patent-scraping endpoints."""
import asyncio
import logging
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional

import httpx
import uvicorn
from fastapi import APIRouter, FastAPI
from playwright.async_api import Browser, async_playwright
from pydantic import BaseModel, Field

from scrap import PatentScrapBulkResponse, scrap_patent_async, scrap_patent_bulk_async
from serp import (SerpQuery, SerpResults, query_arxiv, query_bing_search,
                  query_brave_search, query_ddg_search, query_google_patents,
                  query_google_scholar)
from utils import log_gathered_exceptions
|
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
|
|
# Playwright handles shared across requests; populated by the lifespan handler below.
playwright = None
pw_browser: Optional[Browser] = None

# Shared HTTP client for the plain-HTTP backends (arXiv queries, patent scraping).
httpx_client = httpx.AsyncClient(timeout=30, limits=httpx.Limits(
    max_connections=30, max_keepalive_connections=20))
|
|
@asynccontextmanager
async def api_lifespan(app: FastAPI):
    """Starts Playwright with a headless Chromium instance on startup and tears both down on shutdown."""
    global playwright, pw_browser
    playwright = await async_playwright().start()
    pw_browser = await playwright.chromium.launch(headless=True)

    yield

    await pw_browser.close()
    await playwright.stop()


app = FastAPI(lifespan=api_lifespan, docs_url="/",
              title="SERPent", description=Path("docs/docs.md").read_text())
|
|
scrap_router = APIRouter(prefix="/scrap", tags=["scraping"])
serp_router = APIRouter(prefix="/serp", tags=["serp scraping"])
|
|
@serp_router.post("/search_scholar") |
|
async def search_google_scholar(params: SerpQuery): |
|
"""Queries google scholar for the specified query""" |
|
logging.info(f"Searching Google Scholar for queries: {params.queries}") |
|
results = await asyncio.gather(*[query_google_scholar(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True) |
|
log_gathered_exceptions(results, "google scholar search", params) |
|
|
|
|
|
filtered_results = [r for r in results if not isinstance(r, Exception)] |
|
flattened_results = [ |
|
item for sublist in filtered_results for item in sublist] |
|
|
|
|
|
if len(filtered_results) == 0: |
|
return SerpResults(results=[], error=str(results[-1])) |
|
|
|
return SerpResults(results=flattened_results, error=None) |
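
# NOTE: every SERP endpoint below repeats the same gather/filter/flatten
# pattern. A shared helper along these lines (a sketch, not wired in) could
# factor it out:
#
#   def collect_serp_results(results: list) -> SerpResults:
#       ok = [r for r in results if not isinstance(r, Exception)]
#       if not ok:
#           return SerpResults(results=[], error=str(results[-1]))
#       return SerpResults(results=[item for sub in ok for item in sub],
#                          error=None)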
|
|
|
|
|
@serp_router.post("/search_arxiv") |
|
async def search_arxiv(params: SerpQuery): |
|
"""Searches arxiv for the specified queries and returns the found documents.""" |
|
logging.info(f"Searching Arxiv for queries: {params.queries}") |
|
results = await asyncio.gather(*[query_arxiv(httpx_client, q, params.n_results) for q in params.queries], return_exceptions=True) |
|
log_gathered_exceptions(results, "arxiv search", params) |
|
|
|
filtered_results = [r for r in results if not isinstance(r, Exception)] |
|
flattened_results = [ |
|
item for sublist in filtered_results for item in sublist] |
|
|
|
if len(filtered_results) == 0: |
|
return SerpResults(results=[], error=str(results[-1])) |
|
|
|
return SerpResults(results=flattened_results, error=None) |
|
|
|
|
|
@serp_router.post("/search_patents") |
|
async def search_patents(params: SerpQuery) -> SerpResults: |
|
"""Searches google patents for the specified queries and returns the found documents.""" |
|
logging.info(f"Searching Google Patents for queries: {params.queries}") |
|
results = await asyncio.gather(*[query_google_patents(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True) |
|
log_gathered_exceptions(results, "google patent search", params) |
|
|
|
|
|
filtered_results = [r for r in results if not isinstance(r, Exception)] |
|
flattened_results = [ |
|
item for sublist in filtered_results for item in sublist] |
|
|
|
|
|
if len(filtered_results) == 0: |
|
return SerpResults(results=[], error=str(results[-1])) |
|
|
|
return SerpResults(results=flattened_results, error=None) |
|
|
|
|
|
@serp_router.post("/search_brave") |
|
async def search_brave(params: SerpQuery) -> SerpResults: |
|
"""Searches brave search for the specified queries and returns the found documents.""" |
|
logging.info(f"Searching Brave Search for queries: {params.queries}") |
|
results = await asyncio.gather(*[query_brave_search(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True) |
|
log_gathered_exceptions(results, "brave search", params) |
|
|
|
|
|
filtered_results = [r for r in results if not isinstance(r, Exception)] |
|
flattened_results = [ |
|
item for sublist in filtered_results for item in sublist] |
|
|
|
|
|
if len(filtered_results) == 0: |
|
return SerpResults(results=[], error=str(results[-1])) |
|
|
|
return SerpResults(results=flattened_results, error=None) |
|
|
|
|
|
@serp_router.post("/search_bing") |
|
async def search_bing(params: SerpQuery) -> SerpResults: |
|
"""Searches Bing search for the specified queries and returns the found documents.""" |
|
logging.info(f"Searching Bing Search for queries: {params.queries}") |
|
results = await asyncio.gather(*[query_bing_search(pw_browser, q, params.n_results) for q in params.queries], return_exceptions=True) |
|
log_gathered_exceptions(results, "bing search", params) |
|
|
|
|
|
filtered_results = [r for r in results if not isinstance(r, Exception)] |
|
flattened_results = [ |
|
item for sublist in filtered_results for item in sublist] |
|
|
|
|
|
if len(filtered_results) == 0: |
|
return SerpResults(results=[], error=str(results[-1])) |
|
|
|
return SerpResults(results=flattened_results, error=None) |
|
|
|
|
|
@serp_router.post("/search_duck") |
|
async def search_duck(params: SerpQuery) -> SerpResults: |
|
"""Searches duckduckgo for the specified queries and returns the found documents""" |
|
logging.info(f"Searching DuckDuckGo for queries: {params.queries}") |
|
results = await asyncio.gather(*[query_ddg_search(q, params.n_results) for q in params.queries], return_exceptions=True) |
|
log_gathered_exceptions(results, "duckduckgo search", params) |
|
|
|
|
|
filtered_results = [r for r in results if not isinstance(r, Exception)] |
|
flattened_results = [ |
|
item for sublist in filtered_results for item in sublist] |
|
|
|
|
|
if len(filtered_results) == 0: |
|
return SerpResults(results=[], error=str(results[-1])) |
|
|
|
return SerpResults(results=flattened_results, error=None) |
|
|
|
|
|
@serp_router.post("/search") |
|
async def search(params: SerpQuery): |
|
"""Attempts to search the specified queries using ALL backends""" |
|
results = [] |
|
|
|
for q in params.queries: |
|
try: |
|
logging.info(f"Querying DDG with query: `{q}`") |
|
res = await query_ddg_search(q, params.n_results) |
|
results.extend(res) |
|
continue |
|
except Exception as e: |
|
logging.error(f"Failed to query DDG with query `{q}`: {e}") |
|
logging.info("Trying with next browser backend.") |
|
|
|
try: |
|
logging.info(f"Querying Brave Search with query: `{q}`") |
|
res = await query_brave_search(pw_browser, q, params.n_results) |
|
results.extend(res) |
|
continue |
|
except Exception as e: |
|
logging.error( |
|
f"Failed to query Brave Search with query `{q}`: {e}") |
|
logging.info("Trying with next browser backend.") |
|
|
|
try: |
|
logging.info(f"Querying Bing with query: `{q}`") |
|
res = await query_bing_search(pw_browser, q, params.n_results) |
|
results.extend(res) |
|
continue |
|
except Exception as e: |
|
logging.error(f"Failed to query Bing search with query `{q}`: {e}") |
|
logging.info("Trying with next browser backend.") |
|
|
|
if len(results) == 0: |
|
return SerpResults(results=[], error="All backends are rate-limited.") |
|
|
|
return SerpResults(results=results, error=None) |
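
# The per-backend try/except chain above could also be driven by a list of
# (name, coroutine factory) pairs; a sketch reusing the query_* signatures
# from above, with this loop replacing the three try/except blocks:
#
#   backends = [
#       ("DDG", lambda q, n: query_ddg_search(q, n)),
#       ("Brave Search", lambda q, n: query_brave_search(pw_browser, q, n)),
#       ("Bing", lambda q, n: query_bing_search(pw_browser, q, n)),
#   ]
#   for name, run in backends:
#       try:
#           results.extend(await run(q, params.n_results))
#           break
#       except Exception as e:
#           logging.error(f"{name} failed for query `{q}`: {e}")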
|
|
|
|
|
|
|
|
|
|
|
@scrap_router.get("/scrap_patent/{patent_id}") |
|
async def scrap_patent(patent_id: str): |
|
"""Scraps the specified patent from Google Patents.""" |
|
try: |
|
patent = await scrap_patent_async(httpx_client, f"https://patents.google.com/patent/{patent_id}/en") |
|
return patent |
|
except Exception as e: |
|
logging.warning(f"Failed to scrap patent {patent_id}: {e}") |
|
return None |
|
|
|
|
|
class ScrapPatentsRequest(BaseModel):
    """Request model for scraping multiple patents."""
    patent_ids: list[str] = Field(...,
                                  description="List of patent IDs to scrape")
|
|
|
|
|
@scrap_router.post("/scrap_patents_bulk", response_model=PatentScrapBulkResponse) |
|
async def scrap_patents(params: ScrapPatentsRequest) -> PatentScrapBulkResponse: |
|
"""Scraps multiple patents from Google Patents.""" |
|
patents = await scrap_patent_bulk_async(httpx_client, params.patent_ids) |
|
return patents |
|
|
|
|
|
|
|
app.include_router(serp_router)
app.include_router(scrap_router)
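
# Example request (assuming SerpQuery serializes its `queries` and `n_results`
# fields under those JSON names, as used in the handlers above):
#
#   curl -X POST http://localhost:7860/serp/search_duck \
#        -H "Content-Type: application/json" \
#        -d '{"queries": ["open source llm"], "n_results": 5}'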
|
|
|
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
|
|