Spaces:
Runtime error
Runtime error
""" | |
This file contains all the code which defines architectures and | |
architecture components. | |
""" | |
import chromadb | |
import json | |
import os | |
import regex as re | |
import requests | |
import traceback | |
from abc import ABC, abstractmethod | |
from enum import Enum | |
from time import time | |
from typing import List, Optional | |
from better_profanity import profanity | |
from src.common import config_dir, data_dir, hf_api_token, escape_dollars | |
class ArchitectureRequest:
    """
    This class represents a request (chat query) from a user which can then be built up or
    modified through the pipeline process. It also holds the response to the request which again
    is a stack which can be modified through life.

    ``request`` and ``response`` are properties: reading returns the latest value on the
    relevant stack, assigning pushes a new value so the full evolution is kept for display.
    """
    def __init__(self, query: str):
        self._request: List[str] = [query]  # Stack for the request text as it evolves down the pipeline
        self._response: List[str] = []  # Stack for the response text as it evolves down the pipeline
        self.early_exit: bool = False
        self.early_exit_message: Optional[str] = None

    @property
    def request(self) -> str:
        """The most recent version of the request text."""
        return self._request[-1]

    @request.setter
    def request(self, value: str) -> None:
        # Push rather than overwrite so earlier versions remain visible in as_markdown()
        self._request.append(value)

    @property
    def response(self) -> Optional[str]:
        """The most recent response text, or None if no component has set one yet."""
        if self._response:
            return self._response[-1]
        return None

    @response.setter
    def response(self, value: str) -> None:
        self._response.append(value)

    def as_markdown(self) -> str:
        """
        Returns a markdown representation for display / testing
        :return: str - the markdown
        """
        lines = ["- **Request evolution**"]
        lines.extend(f" - {r}" for r in self._request)
        lines.append("- **Response evolution**")
        lines.extend(f" - {r}" for r in self._response)
        return escape_dollars("\n".join(lines))
class ArchitectureTraceOutcome(Enum):
    """
    Enumerates how a single component step of an architecture finished.
    """
    NONE = 0  # Step has been started but not yet ended
    SUCCESS = 1  # Component processed the request without signalling an early exit
    EARLY_EXIT = 2  # Component set the request's early_exit flag
    EXCEPTION = 3  # Component raised while processing the request
class ArchitectureTraceStep:
    """
    Class to hold the details of a single trace step: the component name, start/end
    timestamps in milliseconds, the outcome and any early-exit or exception detail.
    """
    def __init__(self, name: str):
        self.name = name
        self.start_ms = int(time() * 1000)
        self.end_ms: Optional[int] = None  # Set by end()
        self.outcome = ArchitectureTraceOutcome.NONE
        self._exception: Optional[str] = None
        self.early_exit_message: Optional[str] = None

    def end(self, outcome: ArchitectureTraceOutcome) -> None:
        """Record the end time of the step and how it finished."""
        self.end_ms = int(time() * 1000)
        self.outcome = outcome

    @property
    def exception(self) -> Optional[str]:
        """The string form of any exception raised during the step, else None."""
        return self._exception

    @exception.setter
    def exception(self, value: Exception) -> None:
        self._exception = f'{value}'  # Hold any exception as a string in the trace

    def as_markdown(self) -> str:
        """
        Converts the trace to markdown for simple display purposes
        :return: a string of markdown
        """
        md = f"- **Step**: {self.name} \n"
        md += f" - **Start**: {self.start_ms}; **End**: {self.end_ms} \n"
        if self.end_ms is not None:
            md += f" - **Elapsed time**: {self.end_ms - self.start_ms}ms \n"
        else:
            # Step was never ended, so there is no elapsed time to subtract
            md += " - **Elapsed time**: step not ended \n"
        if self.outcome == ArchitectureTraceOutcome.SUCCESS:
            outcome = "Success"
        elif self.outcome == ArchitectureTraceOutcome.EARLY_EXIT:
            outcome = f"Early Exit ({self.early_exit_message})"
        elif self.outcome == ArchitectureTraceOutcome.EXCEPTION:
            outcome = f"Exception ({self._exception})"
        else:
            outcome = "None"
        md += f" - **Outcome**: {outcome}"
        return escape_dollars(md)
class ArchitectureTrace:
    """
    Instrumentation record for one request passing through an architecture. It keeps an
    ordered list of steps, one per component invoked. Each step carries the component name,
    start/end timestamps and the outcome.
    """
    def __init__(self):
        self.steps: List[ArchitectureTraceStep] = []

    def start_trace(self, name: str):
        new_step = ArchitectureTraceStep(name=name)
        self.steps.append(new_step)

    def end_trace(self, outcome: ArchitectureTraceOutcome, early_exit_message: Optional[str] = None):
        # Ending is only valid when a step is open and has not already been ended
        assert self.steps
        current = self.steps[-1]
        assert current.outcome == ArchitectureTraceOutcome.NONE
        current.end(outcome=outcome)
        if early_exit_message is not None:
            current.early_exit_message = early_exit_message

    def as_markdown(self) -> str:
        """
        Render every recorded step as markdown, joined with a trailing-space line break.
        :return: a string of markdown
        """
        rendered_steps = (step.as_markdown() for step in self.steps)
        return ' \n'.join(rendered_steps)
class ArchitectureComponent(ABC):
    """
    Abstract base for a single step in an Architecture pipeline. Subclasses must implement
    process_request and may override the class-level description and config_description().
    """
    description = "Components should override a description"

    @abstractmethod
    def process_request(self, request: ArchitectureRequest) -> None:
        """
        The principal method that concrete implementations of a component must implement.
        They should signal anything to the pipeline through direct modification of the provided
        request (i.e. amending the request text or response text, or setting the early_exit flag).
        :param request: The request which is flowing down the pipeline
        :return: None
        """
        pass

    def config_description(self) -> str:
        """
        Optional method to override for providing a string of description in markdown format for
        display purposes for the component
        :return: a markdown string (defaulting to empty in the base class)
        """
        return ""
class Architecture:
    """
    An architecture is built as a callable pipeline of steps. An
    ArchitectureRequest object is passed down the pipeline sequentially
    to each component. A component can modify the request if needed, update the response
    or signal an early exit. The Architecture framework also provides trace timing
    and logging, plus exception handling so an individual request cannot
    crash the system.
    """
    # Class-level cache of the architectures loaded from config; filled lazily by
    # load_architectures() and searched by get_architecture().
    architectures = None

    @classmethod
    def load_architectures(cls, force_reload: bool = False) -> None:
        """
        Class method to load the configuration file and try and set up architectures for each
        config entry (a named sequence of components with optional setup params).
        Each step's ``class`` value is looked up by name in this module's global namespace
        and instantiated with the step's optional ``params`` as keyword arguments.
        :param force_reload: A bool of whether to force a reload, defaults to False.
        """
        if cls.architectures is not None and not force_reload:
            return  # Already loaded and no reload requested
        config_file = os.path.join(config_dir, "architectures.json")
        with open(config_file, "r", encoding="utf-8") as f:
            configs = json.load(f)['architectures']
        archs = []
        for c in configs:
            arch_name = c['name']
            arch_description = c['description']
            arch_img = c.get('img')  # Optional display image
            arch_comps = []
            for s in c['steps']:
                component_class = globals()[s['class']]
                component_init_params = s.get('params', {})
                arch_comps.append(component_class(**component_init_params))
            archs.append(Architecture(name=arch_name, description=arch_description,
                                      steps=arch_comps, img=arch_img))
        cls.architectures = archs

    @classmethod
    def get_architecture(cls, name: str) -> "Architecture":
        """
        Lookup an architecture by name, loading the configuration first if needed
        :param name: The name of the architecture to look up
        :return: The architecture object
        :raises ValueError: if no loaded architecture has that name
        """
        if cls.architectures is None:
            cls.load_architectures()
        for a in cls.architectures:
            if a.name == name:
                return a
        raise ValueError(f"Could not find an architecture named {name}")

    def __init__(self,
                 name: str,
                 description: str,
                 steps: List[ArchitectureComponent],
                 img: Optional[str] = None,
                 exception_text: str = "Sorry an internal technical error occurred.",
                 no_response_text: str = "Sorry I can't answer that."):
        self.name = name
        self.description = description
        self.steps = steps
        self.img = img
        # NOTE(review): exception_text / no_response_text are stored but not applied by
        # __call__ in this file - confirm whether callers use them.
        self.exception_text = exception_text
        self.no_response_text = no_response_text

    def __call__(self, request: ArchitectureRequest) -> ArchitectureTrace:
        """
        The main entry point to call the pipeline. Passes the request through each pipeline step
        in sequence, allowing them to amend the request or early exit the processing. An
        exception raised by a component is caught, recorded on that step's trace and printed,
        and stops the pipeline.
        :param request: The request to process; components modify it in place, so the caller
            reads the final response from it after the call
        :return: The trace holding the timing and outcome of every step that ran
        """
        trace = ArchitectureTrace()
        for component in self.steps:
            trace.start_trace(name=component.__class__.__name__)
            try:
                component.process_request(request)
                if request.early_exit:
                    trace.end_trace(outcome=ArchitectureTraceOutcome.EARLY_EXIT,
                                    early_exit_message=request.early_exit_message)
                    break
                else:
                    trace.end_trace(outcome=ArchitectureTraceOutcome.SUCCESS)
            except Exception as err:
                # Boundary handler: one failing component must not crash the whole system
                trace.end_trace(outcome=ArchitectureTraceOutcome.EXCEPTION)
                trace.steps[-1].exception = err
                traceback.print_exc()
                break
        # TODO - save the request / response
        # TODO - save the trace
        return trace
class InputRequestScreener(ArchitectureComponent):
    """
    Pipeline component that stops the pipeline early when the request contains profanity.
    """
    description = "Simplistic input screener for demonstration. Screens inputs for profanity."

    def process_request(self, request: ArchitectureRequest) -> None:
        # Clean input: leave the request untouched and let the pipeline continue
        if not profanity.contains_profanity(request.request):
            return
        request.response = "Sorry - I cannot answer this question. Please try and rephrase it."
        request.early_exit = True
        request.early_exit_message = "Profanity detected in request"
class OutputResponseScreener(ArchitectureComponent):
    """
    Pipeline component that asks an LLM on a HuggingFace inference endpoint whether the current
    response is offensive. Unless the model's answer starts with the word "no", it replaces the
    response with a refusal and signals an early exit.
    """
    description = "Screens outputs for offensive responses."

    def __init__(self,
                 endpoint_url: str = "https://yl89ru8gdr1wkbej.eu-west-1.aws.endpoints.huggingface.cloud",
                 timeout: float = 120.0):
        """
        :param endpoint_url: URL of the screening model's inference endpoint
        :param timeout: seconds to wait for the endpoint before giving up (avoids hanging forever)
        """
        self.api_token = hf_api_token()
        self.endpoint_url = endpoint_url
        self.timeout = timeout

    def process_request(self, request: ArchitectureRequest) -> None:
        if request.response is None:
            return  # Nothing has been generated yet, so there is nothing to screen
        system_prompt = "You are screening for offensive content. In a single word (yes or no), is the response offensive?"
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }
        # Llama 2 chat format: the system block is closed with <</SYS>>
        query_input = f"[INST] <<SYS>> {system_prompt} <</SYS>> {request.response} [/INST] "
        payload = {
            "inputs": query_input,
            "parameters": {
                "temperature": 0.1,
                "max_new_tokens": 10
            }
        }
        llm_response = requests.post(self.endpoint_url, headers=headers, json=payload, timeout=self.timeout)
        llm_response.raise_for_status()  # Surface HTTP errors clearly rather than as a parse failure
        generated_text = llm_response.json()[0]['generated_text'].strip()
        print(f"Response screener got LLM response: {generated_text}")
        # Lean cautious: only an answer starting with the word "no" passes ("Not"/"Now" do not)
        if re.match(r"no\b", generated_text, flags=re.IGNORECASE) is None:
            request.response = "Sorry - I cannot answer this question. Please try and rephrase it."
            request.early_exit = True
            request.early_exit_message = "Response screener flagged the response as potentially offensive"
class RetrievalAugmentor(ArchitectureComponent):
    """
    Pipeline component that queries a persistent Chroma vector store for the documents nearest
    to the request and rewrites the request as a JSON "background" block followed by the question.
    """
    description = "Retrieves appropriate documents from the store and then augments the request."

    def __init__(self, vector_store: str, doc_count: int = 5, collection_name: str = 'products'):
        """
        :param vector_store: name of the store; resolved to <data_dir>/vector_stores/<name>_chroma
        :param doc_count: maximum number of documents to retrieve per request
        :param collection_name: Chroma collection to query (defaults to 'products')
        """
        chroma_db = os.path.join(data_dir, 'vector_stores', f'{vector_store}_chroma')
        self.vector_store = chroma_db
        client = chromadb.PersistentClient(path=chroma_db)
        self.collection = client.get_collection(name=collection_name)
        self.doc_count = doc_count

    def process_request(self, request: ArchitectureRequest) -> None:
        # Get the doc_count nearest documents from the doc store
        input_query = request.request
        results = self.collection.query(query_texts=[input_query], n_results=self.doc_count)
        print(results)
        documents = results['documents'][0]  # Index 0 as we are always asking one question
        # Serialise with json so quotes, backslashes or newlines inside documents stay valid JSON;
        # ensure_ascii=False keeps non-ASCII text readable for the LLM.
        background = json.dumps({"background": documents}, ensure_ascii=False)
        # Put the augmented request back into the architecture request
        request.request = f"{background}\n\nQUESTION: {input_query}"

    def config_description(self) -> str:
        """
        Custom config details as markdown
        """
        desc = f"Vector Store: {self.vector_store}; "
        desc += f"Max docs: {self.doc_count}"
        return desc
class HFInferenceEndpoint(ArchitectureComponent):
    """
    A concrete pipeline component which sends the user text to a given llama chat based
    inference endpoint on HuggingFace
    """
    description = "Sends the request to a llama chat based HuggingFace inference endpoint and records the generated text as the response."

    def __init__(self, endpoint_url: str, model_name: str, system_prompt: str, max_new_tokens: int,
                 temperature: float = 1.0, timeout: float = 120.0):
        """
        :param endpoint_url: URL of the inference endpoint to POST to
        :param model_name: model name, used for display in config_description()
        :param system_prompt: system prompt placed in the llama <<SYS>> block
        :param max_new_tokens: generation limit passed to the endpoint
        :param temperature: sampling temperature passed to the endpoint
        :param timeout: seconds to wait for the endpoint before giving up (avoids hanging forever)
        """
        self.endpoint_url: str = endpoint_url
        self.model_name: str = model_name
        self.system_prompt: str = system_prompt
        self.max_new_tokens = max_new_tokens
        self.api_token = hf_api_token()
        self.temperature = temperature
        self.timeout = timeout

    def config_description(self) -> str:
        """
        Custom config details as markdown
        """
        desc = f"Model: {self.model_name}; "
        desc += f"Endpoint: {self.endpoint_url}; "
        desc += f"Max tokens: {self.max_new_tokens}; "
        desc += f"Temperature: {self.temperature}; "
        desc += f"System prompt: {self.system_prompt}"
        return desc

    def process_request(self, request: ArchitectureRequest) -> None:
        """
        Main processing method for this component. POSTs the request text (wrapped in the
        llama chat prompt format with the configured system prompt) to the endpoint URL.
        On HTTP 200 the generated text becomes the response; otherwise a message describing
        the HTTP error is set as the response.
        """
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }
        # Llama 2 chat format: the system block is closed with <</SYS>>
        query_input = f"[INST] <<SYS>> {self.system_prompt} <</SYS>> {request.request} [/INST] "
        payload = {
            "inputs": query_input,
            "parameters": {
                "temperature": self.temperature,
                "max_new_tokens": self.max_new_tokens
            }
        }
        llm_response = requests.post(self.endpoint_url, headers=headers, json=payload, timeout=self.timeout)
        if llm_response.status_code == 200:
            generated_text = llm_response.json()[0]['generated_text'].strip()
            request.response = generated_text
        elif llm_response.status_code == 502:
            request.response = "Received 502 error from LLM service - service initialising, try again shortly"
        else:
            request.response = f"Received {llm_response.status_code} - {llm_response.text}"
class ResponseTrimmer(ArchitectureComponent):
    """
    A concrete pipeline component which trims the response based on a regex match,
    then uppercases the first character of what is left.
    """
    description = "Trims the response based on a regex"

    def __init__(self, regexes: List[str]):
        # Human-readable list of the patterns, shown by config_description()
        quoted_regexes = [f'"{r}"' for r in regexes]
        self.regex_display = f"[{', '.join(quoted_regexes)}]"
        # Compiled once, case-insensitively; applied in the order given
        self.regexes = [re.compile(r, re.IGNORECASE) for r in regexes]

    def process_request(self, request: ArchitectureRequest) -> None:
        new_response = request.response
        if new_response is None:
            return  # No response has been produced yet, so there is nothing to trim
        for regex in self.regexes:
            new_response = regex.sub('', new_response)
        new_response = new_response[:1].upper() + new_response[1:]
        request.response = new_response

    def config_description(self) -> str:
        return f"Regexes: {self.regex_display}"