import asyncio
import hashlib
import hmac
import json
import logging
import random
import re
import time
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from http.client import HTTPSConnection
from typing import List, Optional, Tuple, Type, Union

import aiohttp
import aiohttp.client_exceptions
import requests
from asyncache import cached as acached
from bs4 import BeautifulSoup
from cachetools import TTLCache, cached
from duckduckgo_search import DDGS, AsyncDDGS

from lagent.actions.base_action import AsyncActionMixin, BaseAction, tool_api
from lagent.actions.parser import BaseParser, JsonParser
from lagent.utils import async_as_completed


class BaseSearch:

    def __init__(self, topk: int = 3, black_list: List[str] = None):
        self.topk = topk
        self.black_list = black_list

    def _filter_results(self, results: List[tuple]) -> dict:
        filtered_results = {}
        count = 0
        for url, snippet, title in results:
            if all(domain not in url
                   for domain in self.black_list) and not url.endswith('.pdf'):
                filtered_results[count] = {
                    'url': url,
                    'summ': json.dumps(snippet, ensure_ascii=False)[1:-1],
                    'title': title
                }
                count += 1
                if count >= self.topk:
                    break
        return filtered_results


class DuckDuckGoSearch(BaseSearch):

    def __init__(self,
                 topk: int = 3,
                 black_list: List[str] = [
                     'enoN',
                     'youtube.com',
                     'bilibili.com',
                     'researchgate.net',
                 ],
                 **kwargs):
        self.proxy = kwargs.get('proxy')
        self.timeout = kwargs.get('timeout', 30)
        super().__init__(topk, black_list)

    @cached(cache=TTLCache(maxsize=100, ttl=600))
    def search(self, query: str, max_retry: int = 3) -> dict:
        for attempt in range(max_retry):
            try:
                response = self._call_ddgs(
                    query, timeout=self.timeout, proxy=self.proxy)
                return self._parse_response(response)
            except Exception as e:
                logging.exception(str(e))
                warnings.warn(
                    f'Retry {attempt + 1}/{max_retry} due to error: {e}')
                time.sleep(random.randint(2, 5))
        raise Exception(
            'Failed to get search results from DuckDuckGo after retries.')

    @acached(cache=TTLCache(maxsize=100, ttl=600))
    async def asearch(self, query: str, max_retry: int = 3) -> dict:
        for attempt in range(max_retry):
            try:
                ddgs = AsyncDDGS(timeout=self.timeout, proxy=self.proxy)
                response = await ddgs.atext(query.strip("'"), max_results=10)
                return self._parse_response(response)
            except Exception as e:
                if isinstance(e, asyncio.TimeoutError):
                    logging.exception('Request to DDGS timed out.')
                logging.exception(str(e))
                warnings.warn(
                    f'Retry {attempt + 1}/{max_retry} due to error: {e}')
                await asyncio.sleep(random.randint(2, 5))
        raise Exception(
            'Failed to get search results from DuckDuckGo after retries.')

    async def _async_call_ddgs(self, query: str, **kwargs) -> dict:
        ddgs = DDGS(**kwargs)
        try:
            response = await asyncio.wait_for(
                asyncio.to_thread(ddgs.text, query.strip("'"), max_results=10),
                timeout=self.timeout)
            return response
        except asyncio.TimeoutError:
            logging.exception('Request to DDGS timed out.')
            raise

    def _call_ddgs(self, query: str, **kwargs) -> dict:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            response = loop.run_until_complete(
                self._async_call_ddgs(query, **kwargs))
            return response
        finally:
            loop.close()

    def _parse_response(self, response: dict) -> dict:
        raw_results = []
        for item in response:
            raw_results.append(
                (item['href'], item['description']
                 if 'description' in item else item['body'], item['title']))
        return self._filter_results(raw_results)


class BingSearch(BaseSearch):

    def __init__(self,
                 api_key: str,
                 region: str = 'zh-CN',
                 topk: int = 3,
                 black_list: List[str] = [
                     'enoN',
                     'youtube.com',
                     'bilibili.com',
                     'researchgate.net',
                 ],
                 **kwargs):
        self.api_key = api_key
        self.market = region
        self.proxy = kwargs.get('proxy')
        super().__init__(topk, black_list)

    @cached(cache=TTLCache(maxsize=100, ttl=600))
    def search(self, query: str, max_retry: int = 3) -> dict:
        for attempt in range(max_retry):
            try:
                response = self._call_bing_api(query)
                return self._parse_response(response)
            except Exception as e:
                logging.exception(str(e))
                warnings.warn(
                    f'Retry {attempt + 1}/{max_retry} due to error: {e}')
                time.sleep(random.randint(2, 5))
        raise Exception(
            'Failed to get search results from Bing Search after retries.')

    @acached(cache=TTLCache(maxsize=100, ttl=600))
    async def asearch(self, query: str, max_retry: int = 3) -> dict:
        for attempt in range(max_retry):
            try:
                response = await self._async_call_bing_api(query)
                return self._parse_response(response)
            except Exception as e:
                logging.exception(str(e))
                warnings.warn(
                    f'Retry {attempt + 1}/{max_retry} due to error: {e}')
                await asyncio.sleep(random.randint(2, 5))
        raise Exception(
            'Failed to get search results from Bing Search after retries.')

    def _call_bing_api(self, query: str) -> dict:
        endpoint = 'https://api.bing.microsoft.com/v7.0/search'
        params = {'q': query, 'mkt': self.market, 'count': f'{self.topk * 2}'}
        headers = {'Ocp-Apim-Subscription-Key': self.api_key}
        response = requests.get(
            endpoint, headers=headers, params=params, proxies=self.proxy)
        response.raise_for_status()
        return response.json()

    async def _async_call_bing_api(self, query: str) -> dict:
        endpoint = 'https://api.bing.microsoft.com/v7.0/search'
        params = {'q': query, 'mkt': self.market, 'count': f'{self.topk * 2}'}
        headers = {'Ocp-Apim-Subscription-Key': self.api_key}
        async with aiohttp.ClientSession(raise_for_status=True) as session:
            async with session.get(
                    endpoint,
                    headers=headers,
                    params=params,
                    proxy=self.proxy and
                (self.proxy.get('http') or self.proxy.get('https'))) as resp:
                return await resp.json()

    def _parse_response(self, response: dict) -> dict:
        webpages = {
            w['id']: w
            for w in response.get('webPages', {}).get('value', [])
        }
        raw_results = []

        for item in response.get('rankingResponse',
                                 {}).get('mainline', {}).get('items', []):
            if item['answerType'] == 'WebPages':
                webpage = webpages.get(item['value']['id'])
                if webpage:
                    raw_results.append(
                        (webpage['url'], webpage['snippet'], webpage['name']))
            elif item['answerType'] == 'News' and item['value'][
                    'id'] == response.get('news', {}).get('id'):
                for news in response.get('news', {}).get('value', []):
                    raw_results.append(
                        (news['url'], news['description'], news['name']))

        return self._filter_results(raw_results)


class BraveSearch(BaseSearch):
    """
    Wrapper around the Brave Search API.

    To use, you should pass your Brave Search API key to the constructor.

    Args:
        api_key (str): API KEY to use Brave Search API.
            You can create a free API key at https://api.search.brave.com/app/keys.
        search_type (str): Brave Search API supports ['web', 'news', 'images', 'videos'],
            currently only supports 'news' and 'web'.
        topk (int): The number of search results returned in response from API search results.
        region (str): The country code string. Specifies the country where the search results come from.
        language (str): The language code string. Specifies the preferred language for the search results.
        extra_snippets (bool): Allows retrieving up to 5 additional snippets, which are alternative excerpts from the search results.
        **kwargs: Any other parameters related to the Brave Search API. Find more details at
            https://api.search.brave.com/app/documentation/web-search/get-started.
    """

    def __init__(self,
                 api_key: str,
                 region: str = 'ALL',
                 language: str = 'zh-hans',
                 extra_snippests: bool = True,
                 topk: int = 3,
                 black_list: List[str] = [
                     'enoN',
                     'youtube.com',
                     'bilibili.com',
                     'researchgate.net',
                 ],
                 **kwargs):
        self.api_key = api_key
        self.market = region
        self.proxy = kwargs.get('proxy')
        self.language = language
        self.extra_snippests = extra_snippests
        self.search_type = kwargs.get('search_type', 'web')
        self.kwargs = kwargs
        super().__init__(topk, black_list)

    @cached(cache=TTLCache(maxsize=100, ttl=600))
    def search(self, query: str, max_retry: int = 3) -> dict:
        for attempt in range(max_retry):
            try:
                response = self._call_brave_api(query)
                return self._parse_response(response)
            except Exception as e:
                logging.exception(str(e))
                warnings.warn(
                    f'Retry {attempt + 1}/{max_retry} due to error: {e}')
                time.sleep(random.randint(2, 5))
        raise Exception(
            'Failed to get search results from Brave Search after retries.')

    @acached(cache=TTLCache(maxsize=100, ttl=600))
    async def asearch(self, query: str, max_retry: int = 3) -> dict:
        for attempt in range(max_retry):
            try:
                response = await self._async_call_brave_api(query)
                return self._parse_response(response)
            except Exception as e:
                logging.exception(str(e))
                warnings.warn(
                    f'Retry {attempt + 1}/{max_retry} due to error: {e}')
                await asyncio.sleep(random.randint(2, 5))
        raise Exception(
            'Failed to get search results from Brave Search after retries.')

    def _call_brave_api(self, query: str) -> dict:
        endpoint = f'https://api.search.brave.com/res/v1/{self.search_type}/search'
        params = {
            'q': query,
            'country': self.market,
            'search_lang': self.language,
            'extra_snippets': self.extra_snippests,
            'count': self.topk,
            **{
                key: value
                for key, value in self.kwargs.items() if value is not None
            },
        }
        headers = {
            'X-Subscription-Token': self.api_key or '',
            'Accept': 'application/json'
        }
        response = requests.get(
            endpoint, headers=headers, params=params, proxies=self.proxy)
        response.raise_for_status()
        return response.json()

    async def _async_call_brave_api(self, query: str) -> dict:
        endpoint = f'https://api.search.brave.com/res/v1/{self.search_type}/search'
        params = {
            'q': query,
            'country': self.market,
            'search_lang': self.language,
            'extra_snippets': self.extra_snippests,
            'count': self.topk,
            **{
                key: value
                for key, value in self.kwargs.items() if value is not None
            },
        }
        headers = {
            'X-Subscription-Token': self.api_key or '',
            'Accept': 'application/json'
        }
        async with aiohttp.ClientSession(raise_for_status=True) as session:
            async with session.get(
                    endpoint,
                    headers=headers,
                    params=params,
                    proxy=self.proxy and
                (self.proxy.get('http') or self.proxy.get('https'))) as resp:
                return await resp.json()

    def _parse_response(self, response: dict) -> dict:
        if self.search_type == 'web':
            filtered_result = response.get('web', {}).get('results', [])
        else:
            filtered_result = response.get('results', {})
        raw_results = []

        for item in filtered_result:
            raw_results.append((
                item.get('url', ''),
                ' '.join(
                    filter(None, [
                        item.get('description'),
                        *item.get('extra_snippets', [])
                    ])),
                item.get('title', ''),
            ))
        return self._filter_results(raw_results)


class GoogleSearch(BaseSearch):
    """
    Wrapper around the Serper.dev Google Search API.

    To use, you should pass your serper API key to the constructor.

    Args:
        api_key (str): API KEY to use serper google search API.
            You can create a free API key at https://serper.dev.
        search_type (str): Serper API supports ['search', 'images', 'news',
            'places'] types of search, currently we only support 'search' and 'news'.
        topk (int): The number of search results returned in response from api search results.
        **kwargs: Any other parameters related to the Serper API. Find more details at
            https://serper.dev/playground
    """

    result_key_for_type = {
        'news': 'news',
        'places': 'places',
        'images': 'images',
        'search': 'organic',
    }

    def __init__(self,
                 api_key: str,
                 topk: int = 3,
                 black_list: List[str] = [
                     'enoN',
                     'youtube.com',
                     'bilibili.com',
                     'researchgate.net',
                 ],
                 **kwargs):
        self.api_key = api_key
        self.proxy = kwargs.get('proxy')
        self.search_type = kwargs.get('search_type', 'search')
        self.kwargs = kwargs
        super().__init__(topk, black_list)

    @cached(cache=TTLCache(maxsize=100, ttl=600))
    def search(self, query: str, max_retry: int = 3) -> dict:
        for attempt in range(max_retry):
            try:
                response = self._call_serper_api(query)
                return self._parse_response(response)
            except Exception as e:
                logging.exception(str(e))
                warnings.warn(
                    f'Retry {attempt + 1}/{max_retry} due to error: {e}')
                time.sleep(random.randint(2, 5))
        raise Exception(
            'Failed to get search results from Google Serper Search after retries.'
        )

    @acached(cache=TTLCache(maxsize=100, ttl=600))
    async def asearch(self, query: str, max_retry: int = 3) -> dict:
        for attempt in range(max_retry):
            try:
                response = await self._async_call_serper_api(query)
                return self._parse_response(response)
            except Exception as e:
                logging.exception(str(e))
                warnings.warn(
                    f'Retry {attempt + 1}/{max_retry} due to error: {e}')
                await asyncio.sleep(random.randint(2, 5))
        raise Exception(
            'Failed to get search results from Google Serper Search after retries.'
        )

    def _call_serper_api(self, query: str) -> dict:
        endpoint = f'https://google.serper.dev/{self.search_type}'
        params = {
            'q': query,
            'num': self.topk,
            **{
                key: value
                for key, value in self.kwargs.items() if value is not None
            },
        }
        headers = {
            'X-API-KEY': self.api_key or '',
            'Content-Type': 'application/json'
        }
        response = requests.get(
            endpoint, headers=headers, params=params, proxies=self.proxy)
        response.raise_for_status()
        return response.json()

    async def _async_call_serper_api(self, query: str) -> dict:
        endpoint = f'https://google.serper.dev/{self.search_type}'
        params = {
            'q': query,
            'num': self.topk,
            **{
                key: value
                for key, value in self.kwargs.items() if value is not None
            },
        }
        headers = {
            'X-API-KEY': self.api_key or '',
            'Content-Type': 'application/json'
        }
        async with aiohttp.ClientSession(raise_for_status=True) as session:
            async with session.get(
                    endpoint,
                    headers=headers,
                    params=params,
                    proxy=self.proxy and
                (self.proxy.get('http') or self.proxy.get('https'))) as resp:
                return await resp.json()

    def _parse_response(self, response: dict) -> dict:
        raw_results = []

        if response.get('answerBox'):
            answer_box = response.get('answerBox', {})
            if answer_box.get('answer'):
                raw_results.append(('', answer_box.get('answer'), ''))
            elif answer_box.get('snippet'):
                raw_results.append(
                    ('', answer_box.get('snippet').replace('\n', ' '), ''))
            elif answer_box.get('snippetHighlighted'):
                raw_results.append(
                    ('', answer_box.get('snippetHighlighted'), ''))

        if response.get('knowledgeGraph'):
            kg = response.get('knowledgeGraph', {})
            description = kg.get('description', '')
            attributes = '. '.join(
                f'{attribute}: {value}'
                for attribute, value in kg.get('attributes', {}).items())
            raw_results.append(
                (kg.get('descriptionLink', ''),
                 f'{description}. {attributes}' if attributes else description,
                 f"{kg.get('title', '')}: {kg.get('type', '')}."))

        for result in response[self.result_key_for_type[
                self.search_type]][:self.topk]:
            description = result.get('snippet', '')
            attributes = '. '.join(
                f'{attribute}: {value}'
                for attribute, value in result.get('attributes', {}).items())
            raw_results.append(
                (result.get('link', ''),
                 f'{description}. {attributes}' if attributes else description,
                 result.get('title', '')))

        return self._filter_results(raw_results)


class TencentSearch(BaseSearch):
    """Wrapper around the tencentclound Search API.

    To use, you should pass your secret_id and secret_key to the constructor.

    Args:
        secret_id (str): Your Tencent Cloud secret ID for accessing the API.
            For more details, refer to the documentation: https://cloud.tencent.com/document/product/598/40488.
        secret_key (str): Your Tencent Cloud secret key for accessing the API.
        api_key (str, optional): Additional API key, if required.
        action (str): The action for this interface, use `SearchCommon`.
        version (str): The API version, use `2020-12-29`.
        service (str): The service name, use `tms`.
        host (str): The API host, use `tms.tencentcloudapi.com`.
        topk (int): The maximum number of search results to return.
        tsn (int): Time filter for search results. Valid values:
            1 (within 1 day), 2 (within 1 week), 3 (within 1 month),
            4 (within 1 year), 5 (within 6 months), 6 (within 3 years).
        insite (str): Specify a site to search within (supports only a single site).
            If not specified, the entire web is searched. Example: `zhihu.com`.
        category (str): Vertical category for filtering results. Optional values include:
            `baike` (encyclopedia), `weather`, `calendar`, `medical`, `news`, `train`, `star` (horoscope).
        vrid (str): Result card type(s). Different `vrid` values represent different types of result cards.
            Supports multiple values separated by commas. Example: `30010255`.
    """

    def __init__(self,
                 secret_id: str = 'Your SecretId',
                 secret_key: str = 'Your SecretKey',
                 api_key: str = '',
                 action: str = 'SearchCommon',
                 version: str = '2020-12-29',
                 service: str = 'tms',
                 host: str = 'tms.tencentcloudapi.com',
                 topk: int = 3,
                 tsn: int = None,
                 insite: str = None,
                 category: str = None,
                 vrid: str = None,
                 black_list: List[str] = [
                     'enoN',
                     'youtube.com',
                     'bilibili.com',
                     'researchgate.net',
                 ]):
        self.secret_id = secret_id
        self.secret_key = secret_key
        self.api_key = api_key
        self.action = action
        self.version = version
        self.service = service
        self.host = host
        self.tsn = tsn
        self.insite = insite
        self.category = category
        self.vrid = vrid
        super().__init__(topk, black_list=black_list)

    @cached(cache=TTLCache(maxsize=100, ttl=600))
    def search(self, query: str, max_retry: int = 3) -> dict:
        for attempt in range(max_retry):
            try:
                response = self._call_tencent_api(query)
                return self._parse_response(response)
            except Exception as e:
                logging.exception(str(e))
                warnings.warn(
                    f'Retry {attempt + 1}/{max_retry} due to error: {e}')
                time.sleep(random.randint(2, 5))
        raise Exception(
            'Failed to get search results from Bing Search after retries.')

    @acached(cache=TTLCache(maxsize=100, ttl=600))
    async def asearch(self, query: str, max_retry: int = 3) -> dict:
        for attempt in range(max_retry):
            try:
                response = await self._async_call_tencent_api(query)
                return self._parse_response(response)
            except Exception as e:
                logging.exception(str(e))
                warnings.warn(
                    f'Retry {attempt + 1}/{max_retry} due to error: {e}')
                await asyncio.sleep(random.randint(2, 5))
        raise Exception(
            'Failed to get search results from Bing Search after retries.')

    def _get_headers_and_payload(self, query: str) -> tuple:

        def sign(key, msg):
            return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()

        params = dict(Query=query)
        # if self.topk:
        #     params['Cnt'] = self.topk
        if self.tsn:
            params['Tsn'] = self.tsn
        if self.insite:
            params['Insite'] = self.insite
        if self.category:
            params['Category'] = self.category
        if self.vrid:
            params['Vrid'] = self.vrid
        payload = json.dumps(params)
        algorithm = 'TC3-HMAC-SHA256'
        timestamp = int(time.time())
        date = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d')

        # ************* 步骤 1:拼接规范请求串 *************
        http_request_method = 'POST'
        canonical_uri = '/'
        canonical_querystring = ''
        ct = 'application/json; charset=utf-8'
        canonical_headers = f'content-type:{ct}\nhost:{self.host}\nx-tc-action:{self.action.lower()}\n'
        signed_headers = 'content-type;host;x-tc-action'
        hashed_request_payload = hashlib.sha256(
            payload.encode('utf-8')).hexdigest()
        canonical_request = (
            http_request_method + '\n' + canonical_uri + '\n' +
            canonical_querystring + '\n' + canonical_headers + '\n' +
            signed_headers + '\n' + hashed_request_payload)

        # ************* 步骤 2:拼接待签名字符串 *************
        credential_scope = date + '/' + self.service + '/' + 'tc3_request'
        hashed_canonical_request = hashlib.sha256(
            canonical_request.encode('utf-8')).hexdigest()
        string_to_sign = (
            algorithm + '\n' + str(timestamp) + '\n' + credential_scope +
            '\n' + hashed_canonical_request)

        # ************* 步骤 3:计算签名 *************
        secret_date = sign(('TC3' + self.secret_key).encode('utf-8'), date)
        secret_service = sign(secret_date, self.service)
        secret_signing = sign(secret_service, 'tc3_request')
        signature = hmac.new(secret_signing, string_to_sign.encode('utf-8'),
                             hashlib.sha256).hexdigest()

        # ************* 步骤 4:拼接 Authorization *************
        authorization = (
            algorithm + ' ' + 'Credential=' + self.secret_id + '/' +
            credential_scope + ', ' + 'SignedHeaders=' + signed_headers +
            ', ' + 'Signature=' + signature)

        # ************* 步骤 5:构造并发起请求 *************
        headers = {
            'Authorization': authorization,
            'Content-Type': 'application/json; charset=utf-8',
            'Host': self.host,
            'X-TC-Action': self.action,
            'X-TC-Timestamp': str(timestamp),
            'X-TC-Version': self.version
        }
        # if self.region:
        #     headers["X-TC-Region"] = self.region
        if self.api_key:
            headers['X-TC-Token'] = self.api_key
        return headers, payload

    def _call_tencent_api(self, query: str) -> dict:
        headers, payload = self._get_headers_and_payload(query)
        req = HTTPSConnection(self.host)
        req.request('POST', '/', headers=headers, body=payload.encode('utf-8'))
        resp = req.getresponse()
        try:
            resp = json.loads(resp.read().decode('utf-8'))
        except Exception as e:
            logging.warning(str(e))
            import ast
            resp = ast.literal_eval(resp)
        return resp.get('Response', dict())

    async def _async_call_tencent_api(self, query: str):
        headers, payload = self._get_headers_and_payload(query)
        async with aiohttp.ClientSession(raise_for_status=True) as session:
            async with session.post(
                    'https://' + self.host.lstrip('/'),
                    headers=headers,
                    data=payload) as resp:
                return (await resp.json()).get('Response', {})

    def _parse_response(self, response: dict) -> dict:
        raw_results = []
        for item in response.get('Pages', []):
            display = json.loads(item['Display'])
            if not display['url']:
                continue
            raw_results.append((display['url'], display['content']
                                or display['abstract_info'], display['title']))
        return self._filter_results(raw_results)


class ContentFetcher:

    def __init__(self, timeout: int = 5):
        self.timeout = timeout

    @cached(cache=TTLCache(maxsize=100, ttl=600))
    def fetch(self, url: str) -> Tuple[bool, str]:
        try:
            response = requests.get(url, timeout=self.timeout)
            response.raise_for_status()
            html = response.content
        except requests.RequestException as e:
            return False, str(e)

        text = BeautifulSoup(html, 'html.parser').get_text()
        cleaned_text = re.sub(r'\n+', '\n', text)
        return True, cleaned_text

    @acached(cache=TTLCache(maxsize=100, ttl=600))
    async def afetch(self, url: str) -> Tuple[bool, str]:
        try:
            async with aiohttp.ClientSession(
                    raise_for_status=True,
                    timeout=aiohttp.ClientTimeout(self.timeout)) as session:
                async with session.get(url) as resp:
                    html = await resp.text(errors='ignore')
                    text = BeautifulSoup(html, 'html.parser').get_text()
                    cleaned_text = re.sub(r'\n+', '\n', text)
                    return True, cleaned_text
        except Exception as e:
            return False, str(e)


class WebBrowser(BaseAction):
    """Wrapper around the Web Browser Tool.
    """

    def __init__(self,
                 searcher_type: str = 'DuckDuckGoSearch',
                 timeout: int = 5,
                 black_list: Optional[List[str]] = [
                     'enoN',
                     'youtube.com',
                     'bilibili.com',
                     'researchgate.net',
                 ],
                 topk: int = 20,
                 description: Optional[dict] = None,
                 parser: Type[BaseParser] = JsonParser,
                 **kwargs):
        self.searcher = eval(searcher_type)(
            black_list=black_list, topk=topk, **kwargs)
        self.fetcher = ContentFetcher(timeout=timeout)
        self.search_results = None
        super().__init__(description, parser)

    @tool_api
    def search(self, query: Union[str, List[str]]) -> dict:
        """BING search API
        Args:
            query (List[str]): list of search query strings
        """
        queries = query if isinstance(query, list) else [query]
        search_results = {}

        with ThreadPoolExecutor() as executor:
            future_to_query = {
                executor.submit(self.searcher.search, q): q
                for q in queries
            }

            for future in as_completed(future_to_query):
                query = future_to_query[future]
                try:
                    results = future.result()
                except Exception as exc:
                    warnings.warn(f'{query} generated an exception: {exc}')
                else:
                    for result in results.values():
                        if result['url'] not in search_results:
                            search_results[result['url']] = result
                        else:
                            search_results[
                                result['url']]['summ'] += f"\n{result['summ']}"

        self.search_results = {
            idx: result
            for idx, result in enumerate(search_results.values())
        }
        return self.search_results

    @tool_api
    def select(self, select_ids: List[int]) -> dict:
        """get the detailed content on the selected pages.

        Args:
            select_ids (List[int]): list of index to select. Max number of index to be selected is no more than 4.
        """
        if not self.search_results:
            raise ValueError('No search results to select from.')

        new_search_results = {}
        with ThreadPoolExecutor() as executor:
            future_to_id = {
                executor.submit(self.fetcher.fetch, self.search_results[select_id]['url']): select_id
                for select_id in select_ids if select_id in self.search_results
            }
            for future in as_completed(future_to_id):
                select_id = future_to_id[future]
                try:
                    web_success, web_content = future.result()
                except Exception as exc:
                    warnings.warn(f'{select_id} generated an exception: {exc}')
                else:
                    if web_success:
                        self.search_results[select_id][
                            'content'] = web_content[:8192]
                        new_search_results[select_id] = self.search_results[
                            select_id].copy()
                        new_search_results[select_id].pop('summ')

        return new_search_results

    @tool_api
    def open_url(self, url: str) -> dict:
        print(f'Start Browsing: {url}')
        web_success, web_content = self.fetcher.fetch(url)
        if web_success:
            return {'type': 'text', 'content': web_content}
        else:
            return {'error': web_content}


class AsyncWebBrowser(AsyncActionMixin, WebBrowser):
    """Wrapper around the Web Browser Tool.
    """

    @tool_api
    async def search(self, query: Union[str, List[str]]) -> dict:
        """BING search API
        
        Args:
            query (List[str]): list of search query strings
        """
        queries = query if isinstance(query, list) else [query]
        search_results = {}

        tasks = []
        for q in queries:
            task = asyncio.create_task(self.searcher.asearch(q))
            task.query = q
            tasks.append(task)
        async for future in async_as_completed(tasks):
            query = future.query
            try:
                results = await future
            except Exception as exc:
                warnings.warn(f'{query} generated an exception: {exc}')
            else:
                for result in results.values():
                    if result['url'] not in search_results:
                        search_results[result['url']] = result
                    else:
                        search_results[
                            result['url']]['summ'] += f"\n{result['summ']}"

        self.search_results = {
            idx: result
            for idx, result in enumerate(search_results.values())
        }
        return self.search_results

    @tool_api
    async def select(self, select_ids: List[int]) -> dict:
        """get the detailed content on the selected pages.

        Args:
            select_ids (List[int]): list of index to select. Max number of index to be selected is no more than 4.
        """
        if not self.search_results:
            raise ValueError('No search results to select from.')

        new_search_results = {}
        tasks = []
        for select_id in select_ids:
            if select_id in self.search_results:
                task = asyncio.create_task(
                    self.fetcher.afetch(self.search_results[select_id]['url']))
                task.select_id = select_id
                tasks.append(task)
        async for future in async_as_completed(tasks):
            select_id = future.select_id
            try:
                web_success, web_content = await future
            except Exception as exc:
                warnings.warn(f'{select_id} generated an exception: {exc}')
            else:
                if web_success:
                    self.search_results[select_id][
                        'content'] = web_content[:8192]
                    new_search_results[select_id] = self.search_results[
                        select_id].copy()
                    new_search_results[select_id].pop('summ')
        return new_search_results

    @tool_api
    async def open_url(self, url: str) -> dict:
        print(f'Start Browsing: {url}')
        web_success, web_content = await self.fetcher.afetch(url)
        if web_success:
            return {'type': 'text', 'content': web_content}
        else:
            return {'error': web_content}