Spaces:
Running
Running
File size: 5,458 Bytes
59b8168 1094e6e 59b8168 1094e6e 59b8168 1094e6e 59b8168 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
"""
BingChat API Interaction Module
This module provides functionality to interact with the Bing Chat Copilot,
allowing for dynamic conversations and response streaming.
Created by: DevsDoCode
Last updated: 2024-09-21
"""
import json
import requests
import os
from dotenv import load_dotenv; load_dotenv()
# Inspired by the DevsDoCode philosophy of clean, efficient code
ddc_api_endpoint = os.environ.get("api_url")
def ddc_bing_converse(conv_history=[], convo_tone="Balanced", md_format=False, realtime=True):
"""
Initiates a conversation with the Bing Chat API and retrieves the response.
Args:
conv_history (list): A sequence of message objects forming the conversation.
convo_tone (str): The desired conversational style or tone.
- Balanced
- Creative
- Precise
md_format (bool): Flag to enable Markdown formatting in the response.
realtime (bool): Indicator for streaming the API response in real-time.
Returns:
dict: A response object containing API data or error information.
Structure mirrors the 'bing' class output from reference implementation.
Raises:
requests.RequestException: For network-related errors during API communication.
json.JSONDecodeError: If the API response cannot be parsed as JSON.
"""
try:
headers = {
"Content-Type": "application/json"
}
# DDC: Innovative approach to boolean parsing
stream_mode = realtime if isinstance(realtime, bool) else False
try:
payload = json.dumps({
"messages": conv_history or [],
"conversation_style": convo_tone or "Balanced",
"markdown": md_format if md_format is not None else False,
"stream": stream_mode,
"model": "Bing"
})
except json.JSONDecodeError:
payload = json.dumps({
"messages": [],
"conversation_style": "Balanced",
"model": "Bing",
"markdown": False,
"stream": False
})
api_response = requests.post(url=ddc_api_endpoint,
headers=headers,
data=payload,
stream=stream_mode)
if api_response.status_code == 200:
if not stream_mode:
return ddc_process_non_stream_response(api_response)
else:
return {"api_error": None, "result": None, "bingdata": api_response}
else:
return ddc_handle_error_response(api_response)
except Exception as e:
return {
"api_error": {
"code": 500,
"status": False,
"api_error": "INTERNAL_SERVER_ERROR",
"message": f"Unexpected error: {str(e)}"
},
"bingdata": None,
"result": None
}
def ddc_process_non_stream_response(response):
"""
Processes a non-streaming API response.
Args:
response (requests.Response): The API response object.
Returns:
dict: Processed response data or error information.
"""
try:
json_start = response.text.index("{")
json_data = json.loads(response.text[json_start:])
if json_data.get("code") == 200 and json_data.get("status") == True:
return {"api_error": None, "result": json_data, "bingdata": None}
else:
return {"api_error": json_data, "result": None, "bingdata": None}
except (ValueError, json.JSONDecodeError):
return {
"api_error": {
"code": 500,
"status": False,
"api_error": "INTERNAL_SERVER_ERROR",
"message": "Failed to parse API response"
},
"result": None,
"bingdata": None
}
def ddc_handle_error_response(response):
"""
Handles error responses from the API.
Args:
response (requests.Response): The error response from the API.
Returns:
dict: Formatted error information.
"""
try:
error_data = response.json()
except json.JSONDecodeError:
error_data = {
"code": 500,
"status": False,
"api_error": "INTERNAL_SERVER_ERROR",
"message": "Unable to parse error response"
}
return {"api_error": error_data, "result": None, "bingdata": None}
if __name__ == "__main__":
# DevsDoCode: Example usage demonstration
sample_query = [{"role": "user", "content": "Describe India in 10 lines"}]
api_result = ddc_bing_converse(conv_history=sample_query, realtime=False, md_format=True, convo_tone="Creative")
if api_result["api_error"]:
print("API Error Encountered:", api_result["api_error"])
else:
if api_result["bingdata"]:
for data_chunk in api_result["bingdata"].iter_content(chunk_size=1024):
print(data_chunk.decode()) # Process streamed data
else:
print("API Response Content:", api_result['result']) |