Spaces:
Running
Running
""" | |
BingChat API Interaction Module | |
This module provides functionality to interact with the Bing Chat Copilot, | |
allowing for dynamic conversations and response streaming. | |
Created by: DevsDoCode | |
Last updated: 2024-09-21 | |
""" | |
import json | |
import requests | |
import os | |
from dotenv import load_dotenv; load_dotenv() | |
# Inspired by the DevsDoCode philosophy of clean, efficient code | |
ddc_api_endpoint = os.environ.get("api_url") | |
def ddc_bing_converse(conv_history=[], convo_tone="", md_format=False, realtime=True): | |
""" | |
Initiates a conversation with the Bing Chat API and retrieves the response. | |
Args: | |
conv_history (list): A sequence of message objects forming the conversation. | |
convo_tone (str): The desired conversational style or tone. | |
md_format (bool): Flag to enable Markdown formatting in the response. | |
realtime (bool): Indicator for streaming the API response in real-time. | |
Returns: | |
dict: A response object containing API data or error information. | |
Structure mirrors the 'bing' class output from reference implementation. | |
Raises: | |
requests.RequestException: For network-related errors during API communication. | |
json.JSONDecodeError: If the API response cannot be parsed as JSON. | |
""" | |
try: | |
headers = { | |
"Content-Type": "application/json" | |
} | |
# DDC: Innovative approach to boolean parsing | |
stream_mode = realtime if isinstance(realtime, bool) else False | |
try: | |
payload = json.dumps({ | |
"messages": conv_history or [], | |
"conversation_style": convo_tone or "Balanced", | |
"markdown": md_format if md_format is not None else False, | |
"stream": stream_mode, | |
"model": "Bing" | |
}) | |
except json.JSONDecodeError: | |
payload = json.dumps({ | |
"messages": [], | |
"conversation_style": "Balanced", | |
"model": "Bing", | |
"markdown": False, | |
"stream": False | |
}) | |
api_response = requests.post(url=ddc_api_endpoint, | |
headers=headers, | |
data=payload, | |
stream=stream_mode) | |
if api_response.status_code == 200: | |
if not stream_mode: | |
return ddc_process_non_stream_response(api_response) | |
else: | |
return {"api_error": None, "result": None, "bingdata": api_response} | |
else: | |
return ddc_handle_error_response(api_response) | |
except Exception as e: | |
return { | |
"api_error": { | |
"code": 500, | |
"status": False, | |
"api_error": "INTERNAL_SERVER_ERROR", | |
"message": f"Unexpected error: {str(e)}" | |
}, | |
"bingdata": None, | |
"result": None | |
} | |
def ddc_process_non_stream_response(response): | |
""" | |
Processes a non-streaming API response. | |
Args: | |
response (requests.Response): The API response object. | |
Returns: | |
dict: Processed response data or error information. | |
""" | |
try: | |
json_start = response.text.index("{") | |
json_data = json.loads(response.text[json_start:]) | |
if json_data.get("code") == 200 and json_data.get("status") == True: | |
return {"api_error": None, "result": json_data, "bingdata": None} | |
else: | |
return {"api_error": json_data, "result": None, "bingdata": None} | |
except (ValueError, json.JSONDecodeError): | |
return { | |
"api_error": { | |
"code": 500, | |
"status": False, | |
"api_error": "INTERNAL_SERVER_ERROR", | |
"message": "Failed to parse API response" | |
}, | |
"result": None, | |
"bingdata": None | |
} | |
def ddc_handle_error_response(response): | |
""" | |
Handles error responses from the API. | |
Args: | |
response (requests.Response): The error response from the API. | |
Returns: | |
dict: Formatted error information. | |
""" | |
try: | |
error_data = response.json() | |
except json.JSONDecodeError: | |
error_data = { | |
"code": 500, | |
"status": False, | |
"api_error": "INTERNAL_SERVER_ERROR", | |
"message": "Unable to parse error response" | |
} | |
return {"api_error": error_data, "result": None, "bingdata": None} | |
if __name__ == "__main__": | |
# DevsDoCode: Example usage demonstration | |
sample_query = [{"role": "user", "content": "Describe India in 10 lines"}] | |
api_result = ddc_bing_converse(conv_history=sample_query, realtime=False, md_format=True) | |
if api_result["api_error"]: | |
print("API Error Encountered:", api_result["api_error"]) | |
else: | |
if api_result["bingdata"]: | |
for data_chunk in api_result["bingdata"].iter_content(chunk_size=1024): | |
print(data_chunk.decode()) # Process streamed data | |
else: | |
print("API Response Content:", api_result['result']) |