# Source: EasyDetect / pipeline/tool/google_serper.py
# Last commit: "Update pipeline/tool/google_serper.py" (5b8e00b, verified)
# Committed by: sunnychenxiwang
# The following code was adapted from https://github.com/hwchase17/langchain/blob/master/langchain/utilities/google_serper.py
"""Util that calls Google Search using the Serper.dev API."""
import ast
import asyncio
import logging
import os
import pdb

import aiohttp
import requests
import yaml
from openai import OpenAI
# env
# serper_api_key = factool_env_config.serper_api_key
# a6a49bf063005dd814b426f0e925308926fdc08c
class GoogleSerperAPIWrapper():
    """Wrapper around the Serper.dev Google Search API.

    You can create a free API key at https://serper.dev.

    The API key is resolved in this order: the ``serper_api_key``
    constructor argument, the ``SERPER_API_KEY`` environment variable, and
    finally the legacy key that used to be inlined in the request headers.

    Example:
        .. code-block:: python

            google_serper = GoogleSerperAPIWrapper()
            evidence = google_serper.execute(input="", content="['query one', 'query two']")
    """

    # NOTE(review): this key used to be hard-coded in the request headers. It
    # is kept only as a last-resort fallback so existing callers keep working
    # without configuration; a key committed to source control should be
    # rotated and this fallback removed.
    _LEGACY_API_KEY = "f6fec9a06c92981a1734ff670c7d645e56120ad5"
    _SEARCH_URL = "https://google.serper.dev/search"
    _NO_RESULT_MESSAGE = "No good Google Search Result was found"

    def __init__(self, snippet_cnt = 10, serper_api_key = None):
        """Configure the wrapper.

        Args:
            snippet_cnt: Number of organic results inspected per query; the
                parsed output is further truncated to ``int(snippet_cnt / 2)``
                snippets.
            serper_api_key: Optional API key. When omitted (or empty) the
                ``SERPER_API_KEY`` environment variable is used, then the
                legacy built-in key.
        """
        self.k = snippet_cnt  # number of result snippets
        self.gl = "us"  # sent as the `gl` request parameter
        self.hl = "en"  # sent as the `hl` request parameter
        self.serper_api_key = (
            serper_api_key
            or os.environ.get("SERPER_API_KEY")
            or self._LEGACY_API_KEY
        )

    async def _google_serper_search_results(self, session, search_term: str, gl: str, hl: str) -> dict:
        """POST one search to Serper and return the decoded JSON response.

        Args:
            session: Client session exposing an aiohttp-style ``post`` context
                manager.
            search_term: The query string, sent as ``q``.
            gl: Value for the ``gl`` request parameter.
            hl: Value for the ``hl`` request parameter.

        Raises:
            Whatever ``session.post`` raises; ``raise_for_status=True`` is
            passed so HTTP error statuses are raised by the session.
        """
        headers = {
            "X-API-KEY": self.serper_api_key,
            "Content-Type": "application/json",
        }
        params = {"q": search_term, "gl": gl, "hl": hl}
        async with session.post(
            self._SEARCH_URL, headers=headers, params=params, raise_for_status=True
        ) as response:
            return await response.json()

    def _parse_results(self, results):
        """Turn one Serper response into a list of evidence snippets.

        Each snippet is a dict ``{"content": ..., "source": ...}`` where
        ``source`` is the result link, or the string ``"None"`` when the
        snippet does not come from an organic result.

        Args:
            results: Decoded JSON response. Anything that is not a dict (for
                example an exception captured by ``asyncio.gather``) is
                treated as "no result".

        Returns:
            A single-element list when the answer box supplies an answer or
            when nothing was found; otherwise the first ``int(self.k / 2)``
            collected snippets.
        """
        no_result = [{"content": self._NO_RESULT_MESSAGE, "source": "None"}]
        if not isinstance(results, dict):
            return no_result

        # A direct answer short-circuits everything else.
        answer_box = results.get("answerBox")
        if answer_box:
            if answer_box.get("answer"):
                return [{"content": answer_box.get("answer"), "source": "None"}]
            if answer_box.get("snippet"):
                return [{"content": answer_box.get("snippet").replace("\n", " "), "source": "None"}]
            if answer_box.get("snippetHighlighted"):
                return [{"content": answer_box.get("snippetHighlighted"), "source": "None"}]

        snippets = []

        kg = results.get("knowledgeGraph")
        if kg:
            title = kg.get("title")
            entity_type = kg.get("type")
            if entity_type:
                snippets.append({"content": f"{title}: {entity_type}", "source": "None"})
            description = kg.get("description")
            if description:
                snippets.append({"content": description, "source": "None"})
            for attribute, value in kg.get("attributes", {}).items():
                snippets.append({"content": f"{attribute}: {value}", "source": "None"})

        # A response without organic hits must not raise.
        for result in (results.get("organic") or [])[: self.k]:
            link = result.get("link", "None")
            if "snippet" in result:
                # A snippet mentioning "Missing" disqualifies the whole
                # result, including its attributes.
                if "Missing" in result["snippet"]:
                    continue
                snippets.append({"content": result["snippet"], "source": link})
            for attribute, value in result.get("attributes", {}).items():
                content = f"{attribute}: {value}"
                if "Missing" in content:
                    continue
                snippets.append({"content": content, "source": link})

        if not snippets:
            return no_result

        # Keep only the first k/2 snippets.
        # TODO all in
        return snippets[:int(self.k / 2)]

    async def parallel_searches(self, search_queries, gl, hl):
        """Run all queries concurrently over one shared client session.

        Returns:
            One entry per query, in order. Because ``return_exceptions=True``
            is used, a failed query yields its exception object instead of a
            response dict.
        """
        async with aiohttp.ClientSession() as session:
            tasks = [self._google_serper_search_results(session, query, gl, hl) for query in search_queries]
            search_results = await asyncio.gather(*tasks, return_exceptions=True)
            return search_results

    @staticmethod
    def _merge_pairs(snippets_list):
        """Concatenate consecutive pairs of snippet lists; an odd trailing list stands alone."""
        merged = []
        for start in range(0, len(snippets_list), 2):
            group = []
            for snippets in snippets_list[start:start + 2]:
                group.extend(snippets)
            merged.append(group)
        return merged

    async def run(self, queries):
        """Run query through GoogleSearch and parse result.

        Args:
            queries: Iterable of query lists. A ``None`` entry is replaced by
                the two placeholder queries ``'None'``.

        Returns:
            A list with one merged snippet list per consecutive pair of
            flattened queries (so callers are expected to pass two queries per
            entry). A trailing unpaired query produces its own list.
        """
        flattened_queries = []
        for sublist in queries:
            if sublist is None:
                sublist = ['None', 'None']
            flattened_queries.extend(sublist)

        results = await self.parallel_searches(flattened_queries, gl=self.gl, hl=self.hl)

        snippets_list = []
        for query, result in zip(flattened_queries, results):
            if isinstance(result, BaseException):
                # Best effort: a failed query degrades to "no result" instead
                # of aborting the remaining ones, but is not hidden.
                logging.getLogger(__name__).warning(
                    "Serper query %r failed: %r", query, result
                )
            snippets_list.append(self._parse_results(result))

        # TODO review
        return self._merge_pairs(snippets_list)

    @staticmethod
    def _parse_query_list(content):
        """Extract up to two search queries from a list-like string.

        ``content`` is expected to look like ``"['query one', 'query two']"``.
        It is parsed as a Python literal first, which copes with commas inside
        a query and with or without a space after the separating comma. If
        that fails, the text is split on commas and surrounding brackets,
        quotes and whitespace are trimmed.
        """
        text = content.strip()
        try:
            parsed = ast.literal_eval(text)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            parsed = None
        if isinstance(parsed, (list, tuple)) and parsed:
            return [str(item) for item in parsed[:2]]

        parts = text.strip("[]").split(",")
        cleaned = [part.strip().strip("'\"") for part in parts[:2]]
        return [part for part in cleaned if part]

    def execute(self,input,content):
        """Search for the queries encoded in ``content`` and return evidence texts.

        Args:
            input: Unused; kept so the call signature stays unchanged.
            content: String holding a two-element list of queries, e.g.
                ``"['query one', 'query two']"``.

        Returns:
            A flat list with the ``content`` field of every snippet found for
            the queries; an empty list when no query could be extracted.
        """
        query_list = self._parse_query_list(content)
        if not query_list:
            return []

        # A fresh loop is created and installed as the current loop on every
        # call; it is intentionally left as-is (not closed) here.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        search_outputs_for_claims = loop.run_until_complete(self.run([query_list]))
        evidences = [
            [output['content'] for output in search_outputs_for_claim]
            for search_outputs_for_claim in search_outputs_for_claims
        ]
        return evidences[0] if evidences else []
if __name__ == "__main__":
    # Manual smoke test: runs live searches for two sample queries and prints
    # the evidence snippets that come back.
    wrapper = GoogleSerperAPIWrapper()
    print(wrapper.execute(input="", content="['yu xiang rou si','Volkswagen logo']"))
# search.execute("Samsung is a South Korean multinational conglomerate headquartered in Suwon, South Korea")
# loop = asyncio.get_event_loop()
# What is the capital of the United States?
# Could you provide information on the focus of AMGTV as a television network?
# "Could you please inform me whether Tata Motors is included in the BSE SENSEX index?", "Tata Motors"
# "Who is the CEO of twitter?", "CEO Twitter"
# Could you please provide some information about Sancho Panza and his role in the poem by Don Miguel de Cervantes Saavedra?
# search_outputs_for_claims = loop.run_until_complete(search.run([["Is Samsung a South Korean multinational conglomerate headquartered in Suwon, South Korea?"]]))
# #print(search_outputs_for_claims)
# evidences = [[output['content'] for output in search_outputs_for_claim] for search_outputs_for_claim in
# search_outputs_for_claims]
# print(evidences)
# print(loop.run_until_complete(sea.run([["Could you please inform me whether Tata Motors is included in the BSE SENSEX index?", "Tata Motors"]])))
# print(asyncio.run(sea.run("What is the capital of the United States?")))
# [["On the contrary, Dr Reddy's Labs, with a weightage of 0.8% on the index, is likely to have seen an outflow of $90 million, according to Nuvama ...", "Check Tata Motors Ltd live BSE/NSE stock price along with it's performance analysis, share price history, market capitalization, shareholding & financial ...", "Asia Index Private Limited on Friday announced reconstitution S&P BSE Sensex with auto major Tata Motors to replace pharma stock Dr Reddy's ...", 'Tata Motors Share Price: Find the latest news on Tata Motors Stock Price. Get all the information on Tata Motors with historic price charts for NSE / BSE.', "Stock exchange BSE today announced that auto major Tata Motors will replace pharma stock Dr Reddy's Laboratories in Sensex from next month.", 'ATA Motorcars: Used car dealer in Lilburn, Georgia', 'Address: 3945 Lawrenceville Hwy RM 6, Lilburn, GA 30047', 'Hours: Closed ⋅ Opens 10\u202fAM Mon', 'Phone: (470) 268-7745', 'ATA Motorcars is your #1 source for buying a quality pre-owned vehicle. We have extensive relationships in the dealer community allowing us to purchase a wide ...']]