import aiohttp import asyncio import certifi import httpx import json import pprint import ssl import urllib from chathub_request_constructor import ChathubRequestConstructor from cookies_constructor import CookiesConstructor ssl_context = ssl.create_default_context() ssl_context.load_verify_locations(certifi.where()) http_proxy = "http://localhost:11111" class ConversationCreator: conversation_create_url = "https://www.bing.com/turing/conversation/create" def __init__(self, cookies={}): self.cookies = cookies self.construct_cookies() def construct_cookies(self): self.httpx_cookies = httpx.Cookies() for key, val in self.cookies.items(): self.httpx_cookies.set(key, val) def create(self, proxy=None): self.response = httpx.get( self.conversation_create_url, proxies=http_proxy if proxy is None else proxy, cookies=self.httpx_cookies, ) self.response_content = json.loads(self.response.content.decode("utf-8")) self.response_headers = dict(self.response.headers) pprint.pprint(self.response_content) # pprint.pprint(self.response_headers) def serialize_websockets_message(msg: dict) -> str: return json.dumps(msg, ensure_ascii=False) + "\x1e" class ConversationChatter: def __init__( self, sec_access_token=None, client_id=None, conversation_id=None, invocation_id=0, cookies={}, ): self.sec_access_token = sec_access_token self.client_id = client_id self.conversation_id = conversation_id self.invocation_id = invocation_id self.cookies = cookies self.ws_url = ( "wss://sydney.bing.com/sydney/ChatHub" + f"?sec_access_token={urllib.parse.quote(self.sec_access_token)}" ) async def _init_handshake(self, wss): await wss.send_str( serialize_websockets_message({"protocol": "json", "version": 1}) ) await wss.receive_str() await wss.send_str(serialize_websockets_message({"type": 6})) async def stream_chat(self, prompt=""): self.aio_session = aiohttp.ClientSession(cookies=self.cookies) request_headers = { "Accept-Encoding": " gzip, deflate, br", "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7", "Cache-Control": "no-cache", "Connection": "Upgrade", "Host": "sydney.bing.com", "Origin": "https://www.bing.com", "Pragma": "no-cache", "Sec-Websocket-Extensions": "permessage-deflate; client_max_window_bits", # "Sec-Websocket-Key": "**********************==", "Sec-Websocket-Version": "13", "Upgrade": "websocket", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", } wss = await self.aio_session.ws_connect( self.ws_url, headers=request_headers, ssl=ssl_context, proxy=http_proxy, ) await self._init_handshake(wss) chathub_request_construtor = ChathubRequestConstructor( prompt=prompt, conversation_style="precise", client_id=self.client_id, conversation_id=self.conversation_id, invocation_id=self.invocation_id, ) chathub_request_construtor.construct() await wss.send_str( serialize_websockets_message(chathub_request_construtor.request_message) ) delta_content_pointer = 0 while not wss.closed: response_lines_str = await wss.receive_str() if isinstance(response_lines_str, str): response_lines = response_lines_str.split("\x1e") else: continue for line in response_lines: if not line: continue data = json.loads(line) if data.get("type") == 1: arguments = data["arguments"][0] if arguments.get("throttling"): throttling = arguments.get("throttling") # pprint.pprint(throttling) if arguments.get("messages"): for message in arguments.get("messages"): message_type = message.get("messageType") if message_type is None: # Displayed message does not contain 'messageType' message_html = message["adaptiveCards"][0]["body"][0][ "text" ] delta_content = message_html[delta_content_pointer:] print(delta_content, end="", flush=True) delta_content_pointer = len(message_html) if message.get("suggestedResponses"): print("\nSuggested Questions: ", flush=True) for suggestion in message.get("suggestedResponses"): suggestion_text = suggestion.get("text") print(f"- {suggestion_text}", flush=True) elif message_type in ["InternalSearchQuery"]: message_hidden_text = message["hiddenText"] print( f"\n[Searching: [{message_hidden_text}]]", flush=True, ) elif message_type in [ "InternalSearchResult", ]: print("[Analyzing search results ...]", flush=True) elif message_type in ["InternalLoaderMessage"]: print("[Generating answers ...]\n", flush=True) elif message_type in ["RenderCardRequest"]: continue else: raise NotImplementedError( f"Not Supported Message Type: {message_type}" ) elif data.get("type") == 2: if data.get("item"): item = data.get("item") print("\n[Saving chat messages ...]") # for message in item.get("messages"): # author = message["author"] # message_text = message["text"] elif data.get("type") == 3: print("[Finished]") await wss.close() break elif data.get("type") == 6: continue else: # pprint.pprint(data) continue if __name__ == "__main__": creator = ConversationCreator() creator.create() chatter = ConversationChatter( sec_access_token=creator.response_headers[ "x-sydney-encryptedconversationsignature" ], client_id=creator.response_content["clientId"], conversation_id=creator.response_content["conversationId"], ) prompt = "Today's weather of California" print(f"\n[User]: {prompt}\n") print(f"[Bing]:") loop = asyncio.get_event_loop() loop.run_until_complete(chatter.stream_chat(prompt=prompt)) loop.close()