| """ | |
| This file contains all the code which defines architectures and | |
| architecture components. | |
| """ | |
| import chromadb | |
| import json | |
| import os | |
| import regex as re | |
| import requests | |
| import shutil | |
| import traceback | |
| from abc import ABC, abstractmethod | |
| from enum import Enum | |
| from huggingface_hub import Repository | |
| from queue import Queue | |
| from threading import Thread, Timer | |
| from time import time | |
| from typing import List, Optional, Dict | |
| from better_profanity import profanity | |
| from src.common import config_dir, data_dir, hf_api_token, escape_dollars | |

class ArchitectureRequest:
    """
    This class represents a request (chat query) from a user which can then be built up or
    modified through the pipeline process. It also holds the response to the request, which
    again is a stack that can be modified through the pipeline's life.
    """
    def __init__(self, query: str):
        self._request: List[str] = [query]  # Stack for the request text as it evolves down the pipeline
        self._response: List[str] = []  # Stack for the response text as it evolves down the pipeline
        self.early_exit: bool = False
        self.early_exit_message: Optional[str] = None

    @property
    def request(self) -> str:
        return self._request[-1]

    @request.setter
    def request(self, value: str) -> None:
        self._request.append(value)

    @property
    def response(self) -> Optional[str]:
        if len(self._response) > 0:
            return self._response[-1]
        return None

    @response.setter
    def response(self, value: str) -> None:
        self._response.append(value)

    def as_markdown(self) -> str:
        """
        Returns a markdown representation for display / testing
        :return: str - the markdown
        """
        md = "- **Request evolution**"
        for r in self._request:
            md += f"\n  - {r}"
        md += "\n- **Response evolution**"
        for r in self._response:
            md += f"\n  - {r}"
        return escape_dollars(md)

    def as_dict(self) -> Dict:
        return {'request_evolution': self._request, 'response_evolution': self._response}
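
# Illustrative sketch of the stacking behaviour above (comment only, not executed):
# every assignment to `request` or `response` pushes onto the history, and the
# property getters always return the most recent value.
#   req = ArchitectureRequest("What is the warranty period?")
#   req.request = "CONTEXT: ...\n\nQUESTION: What is the warranty period?"
#   req.request    # -> the augmented text (top of the stack)
#   req.as_dict()  # -> {'request_evolution': [<original>, <augmented>], 'response_evolution': []}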

class ArchitectureTraceOutcome(Enum):
    """
    Class representing the outcome of a component step in an architecture
    """
    NONE = 0
    SUCCESS = 1
    EARLY_EXIT = 2
    EXCEPTION = 3

class ArchitectureTraceStep:
    """
    Class to hold the details of a single trace step
    """
    def __init__(self, name: str):
        self.name = name
        self.start_ms = int(time() * 1000)
        self.end_ms = None
        self.outcome = ArchitectureTraceOutcome.NONE
        self._exception: Optional[str] = None
        self.early_exit_message: Optional[str] = None

    def end(self, outcome: ArchitectureTraceOutcome):
        self.end_ms = int(time() * 1000)
        self.outcome = outcome

    @property
    def exception(self) -> str:
        return self._exception

    @exception.setter
    def exception(self, value: Exception) -> None:
        self._exception = f'{value}'  # Hold any exception as a string in the trace

    def as_markdown(self) -> str:
        """
        Converts the trace to markdown for simple display purposes
        :return: a string of markdown
        """
        md = f"- **Step**: {self.name} \n"
        md += f"  - **Start**: {self.start_ms}; **End**: {self.end_ms} \n"
        md += f"  - **Elapsed time**: {self.end_ms - self.start_ms}ms \n"
        outcome = "None"
        if self.outcome == ArchitectureTraceOutcome.SUCCESS:
            outcome = "Success"
        elif self.outcome == ArchitectureTraceOutcome.EARLY_EXIT:
            outcome = f"Early Exit ({self.early_exit_message})"
        elif self.outcome == ArchitectureTraceOutcome.EXCEPTION:
            outcome = f"Exception ({self._exception})"
        md += f"  - **Outcome**: {outcome}"
        return escape_dollars(md)

    def as_dict(self) -> Dict:
        return {
            'name': self.name,
            'start_ms': self.start_ms,
            'end_ms': self.end_ms,
            'outcome': str(self.outcome),
            'exception': "" if self._exception is None else f"{self._exception}",
            'early_exit_message': "" if self.early_exit_message is None else self.early_exit_message
        }
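
# Illustrative lifecycle of a single step (comment sketch, not executed):
#   step = ArchitectureTraceStep(name="RetrievalAugmentor")
#   ...component work happens here...
#   step.end(outcome=ArchitectureTraceOutcome.SUCCESS)
#   step.as_dict()  # start_ms/end_ms are wall-clock milliseconds since the epoch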

class ArchitectureTrace:
    """
    This class represents the system instrumentation / trace for a request. It holds the name
    for each component called, the start and end time of the component processing and the
    outcome of the step.
    """
    def __init__(self):
        self.steps: List[ArchitectureTraceStep] = []

    def start_trace(self, name: str):
        self.steps.append(ArchitectureTraceStep(name=name))

    def end_trace(self, outcome: ArchitectureTraceOutcome, early_exit_message: str = None):
        assert len(self.steps) > 0
        assert self.steps[-1].outcome == ArchitectureTraceOutcome.NONE
        self.steps[-1].end(outcome=outcome)
        if early_exit_message is not None:
            self.steps[-1].early_exit_message = early_exit_message

    def as_markdown(self) -> str:
        """
        Converts the trace to markdown for simple display purposes
        :return: a string of markdown
        """
        md = ' \n'.join([s.as_markdown() for s in self.steps])
        return md

    def as_dict(self) -> Dict:
        return {'steps': [s.as_dict() for s in self.steps]}
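
# Comment sketch of how Architecture.__call__ (below) drives the trace:
#   trace = ArchitectureTrace()
#   trace.start_trace(name="InputRequestScreener")              # before the component runs
#   trace.end_trace(outcome=ArchitectureTraceOutcome.SUCCESS)   # after it returns
#   trace.as_markdown()  # one markdown bullet block per step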

class ArchitectureComponent(ABC):
    description = "Components should override a description"

    @abstractmethod
    def process_request(self, request: ArchitectureRequest) -> None:
        """
        The principal method that concrete implementations of a component must implement.
        They should signal anything to the pipeline through direct modification of the provided
        request (i.e. amending the request text or response text, or setting the early_exit flag).
        :param request: The request which is flowing down the pipeline
        :return: None
        """
        pass

    def config_description(self) -> str:
        """
        Optional method to override for providing a string of description in markdown format
        for display purposes for the component
        :return: a markdown string (defaulting to empty in the base class)
        """
        return ""

class LogWorker(Thread):
    instance = None
    architectures = None
    save_repo = None
    save_repo_load_error = False
    save_repo_url = "https://huggingface.co/datasets/alfraser/llm-arch-trace"
    trace_dir = "trace"
    trace_file_name = "trace.json"
    trace_file = os.path.join(trace_dir, trace_file_name)
    queue = Queue()
    commit_time = 10     # Number of seconds after which to commit with no activity
    commit_after = 10    # Number of records after which to commit irrespective of time
    commit_count = 0     # Current uncommitted records
    commit_timer = None  # The actual commit timer - we will schedule the commit on this

    def run(self):
        while True:
            request, trace, trace_tags, trace_comment = LogWorker.queue.get()
            if request is None:
                LogWorker.commit_repo()
            else:
                if LogWorker.commit_timer is not None and LogWorker.commit_timer.is_alive():
                    LogWorker.commit_timer.cancel()
                    LogWorker.commit_timer = None
                try:
                    save_dict = {
                        'architecture': self.name,
                        'request': request.as_dict(),
                        'trace': trace.as_dict(),
                        'test_tags': trace_tags,
                        'test_comment': trace_comment
                    }
                    LogWorker.append_and_save_data_as_json(save_dict)
                    LogWorker.commit_count += 1
                    if LogWorker.commit_count >= LogWorker.commit_after:
                        LogWorker.commit_repo()
                except Exception as err:
                    print(f"Request / trace save failed {err}")
                LogWorker.commit_timer = Timer(LogWorker.commit_time, LogWorker.signal_commit)
                LogWorker.commit_timer.start()

    @classmethod
    def append_and_save_data_as_json(cls, data: Dict):
        print(f"LogWorker logging open record {LogWorker.commit_count + 1}")
        if cls.save_repo is None and not cls.save_repo_load_error:
            try:
                hf_write_token = hf_api_token(write=True)
                cls.save_repo = Repository(local_dir=cls.trace_dir, clone_from=cls.save_repo_url, token=hf_write_token)
            except Exception as err:
                cls.save_repo_load_error = True
                print(f"Error connecting to the save repo {err} - persistence now disabled")
        if cls.save_repo is not None:
            with open(cls.trace_file, 'r') as f:
                test_json = json.load(f)
            test_json['tests'].append(data)
            with open(cls.trace_file, 'w') as f:
                json.dump(test_json, f, indent=2)

    @classmethod
    def commit_repo(cls):
        print(f"LogWorker committing {LogWorker.commit_count} open records")
        cls.save_repo.push_to_hub()
        LogWorker.commit_count = 0

    @classmethod
    def signal_commit(cls):
        print("LogWorker signalling commit based on time elapsed")
        cls.queue.put((None, None, None, None))

    @classmethod
    def write(cls, request: ArchitectureRequest, trace: ArchitectureTrace,
              trace_tags: List[str] = None, trace_comment: str = None):
        trace_tags = [] if trace_tags is None else trace_tags
        trace_comment = "" if trace_comment is None else trace_comment
        cls.queue.put((request, trace, trace_tags, trace_comment))


# Instantiate and run the worker on import
if LogWorker.instance is None:
    LogWorker.instance = LogWorker()
    LogWorker.instance.start()
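
# Callers do not interact with the worker thread directly - they enqueue a
# record via the classmethod, e.g. (comment sketch, tags/comment are arbitrary):
#   LogWorker.write(request, trace, trace_tags=["smoke-test"], trace_comment="manual run")
# Records are batched: the repo is pushed after commit_after records, or after
# commit_time seconds of inactivity via the Timer-driven signal_commit().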

class Architecture:
    """
    An architecture is built as a callable pipeline of steps. An
    ArchitectureRequest object is passed down the pipeline sequentially
    to each component. A component can modify the request if needed, update the response
    or signal an early exit. The Architecture framework also provides trace timing
    and logging, plus exception handling so an individual request cannot
    crash the system.
    """
    architectures = None
    save_repo = None
    save_repo_load_error = False
    save_repo_url = "https://huggingface.co/datasets/alfraser/llm-arch-trace"
    trace_dir = "trace"
    trace_file_name = "trace.json"
    trace_file = os.path.join(trace_dir, trace_file_name)

    @classmethod
    def wipe_trace(cls, hf_write_token: str = None):
        if os.path.exists(cls.trace_dir):
            shutil.rmtree(cls.trace_dir)
        try:
            if hf_write_token is None:
                hf_write_token = hf_api_token(write=True)
            cls.save_repo = Repository(local_dir=cls.trace_dir, clone_from=cls.save_repo_url, token=hf_write_token)
            test_json = {'tests': []}
            with open(cls.trace_file, 'w') as f:
                json.dump(test_json, f, indent=2)
            cls.save_repo.push_to_hub()
        except Exception as err:
            cls.save_repo_load_error = True
            print(f"Error connecting to the save repo {err} - persistence now disabled")

    @classmethod
    def get_trace_records(cls) -> List[Dict]:
        if not os.path.isfile(cls.trace_file):
            hf_write_token = hf_api_token(write=True)
            try:
                cls.save_repo = Repository(local_dir=cls.trace_dir, clone_from=cls.save_repo_url, token=hf_write_token)
            except Exception as err:
                cls.save_repo_load_error = True
                print(f"Error connecting to the save repo {err} - persistence now disabled")
                return []
        with open(cls.trace_file, 'r') as f:
            test_json = json.load(f)
        return test_json['tests']

    @classmethod
    def load_architectures(cls, force_reload: bool = False) -> None:
        """
        Class method to load the configuration file and try to set up architectures for each
        config entry (a named sequence of components with optional setup params).
        :param force_reload: A bool of whether to force a reload, defaults to False.
        """
        if cls.architectures is None or force_reload:
            config_file = os.path.join(config_dir, "architectures.json")
            with open(config_file, "r") as f:
                configs = json.load(f)['architectures']
            archs = []
            for c in configs:
                arch_name = c['name']
                arch_description = c['description']
                arch_img = None
                if 'img' in c:
                    arch_img = c['img']
                arch_comps = []
                for s in c['steps']:
                    component_class_name = s['class']
                    component_init_params = {}
                    if 'params' in s:
                        component_init_params = s['params']
                    arch_comps.append(globals()[component_class_name](**component_init_params))
                arch = Architecture(name=arch_name, description=arch_description, steps=arch_comps, img=arch_img)
                archs.append(arch)
            cls.architectures = archs

    @classmethod
    def get_architecture(cls, name: str):
        """
        Look up an architecture by name
        :param name: The name of the architecture to look up
        :return: The architecture object
        """
        if cls.architectures is None:
            cls.load_architectures()
        for a in cls.architectures:
            if a.name == name:
                return a
        raise ValueError(f"Could not find an architecture named {name}")

    def __init__(self,
                 name: str,
                 description: str,
                 steps: List[ArchitectureComponent],
                 img: Optional[str] = None,
                 exception_text: str = "Sorry an internal technical error occurred.",
                 no_response_text: str = "Sorry I can't answer that."):
        self.name = name
        self.description = description
        self.steps = steps
        self.img = img
        self.exception_text = exception_text
        self.no_response_text = no_response_text

    def __call__(self, request: ArchitectureRequest, trace_tags: List[str] = None, trace_comment: str = None) -> ArchitectureTrace:
        """
        The main entry point to call the pipeline. Passes the request through each pipeline step
        in sequence, allowing them to amend the request or early exit the processing. Also captures
        exceptions and generates the trace, plus saves the request/response and the trace to a store
        for analysis.
        :param request: The ArchitectureRequest to pass down the pipeline
        :param trace_tags: Optional tags to store alongside the trace
        :param trace_comment: Optional comment to store alongside the trace
        :return: The ArchitectureTrace recording each step of the processing
        """
        print(f'{self.name} processing query "{request.request}"')
        trace = ArchitectureTrace()
        for component in self.steps:
            trace.start_trace(name=component.__class__.__name__)
            try:
                component.process_request(request)
                if request.early_exit:
                    trace.end_trace(outcome=ArchitectureTraceOutcome.EARLY_EXIT,
                                    early_exit_message=request.early_exit_message)
                    break
                else:
                    trace.end_trace(outcome=ArchitectureTraceOutcome.SUCCESS)
            except Exception as err:
                trace.end_trace(outcome=ArchitectureTraceOutcome.EXCEPTION)
                trace.steps[-1].exception = err
                traceback.print_exc()
                break
        LogWorker.write(request, trace, trace_tags, trace_comment)
        return trace
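
# End-to-end comment sketch (assumes an architecture with this name is defined
# in the architectures.json config, as in the __main__ block below):
#   arch = Architecture.get_architecture("1. Baseline LLM")
#   req = ArchitectureRequest("What products do you sell?")
#   trace = arch(req)           # runs each component, logs via LogWorker
#   print(req.response)         # latest response pushed by the pipeline, if any
#   print(trace.as_markdown())  # per-step timing and outcomes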

class InputRequestScreener(ArchitectureComponent):
    description = "Simplistic input screener for demonstration. Screens inputs for profanity."

    def process_request(self, request: ArchitectureRequest) -> None:
        if profanity.contains_profanity(request.request):
            request.response = "Sorry - I cannot answer this question. Please try and rephrase it."
            request.early_exit = True
            request.early_exit_message = "Profanity detected in request"

class OutputResponseScreener(ArchitectureComponent):
    description = "Screens outputs for offensive responses."

    def __init__(self):
        self.api_token = hf_api_token()
        self.endpoint_url = "https://yl89ru8gdr1wkbej.eu-west-1.aws.endpoints.huggingface.cloud"

    def process_request(self, request: ArchitectureRequest) -> None:
        system_prompt = "You are screening for offensive content. In a single word (yes or no), is the response offensive?"
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }
        query_input = f"[INST] <<SYS>> {system_prompt} <</SYS>> {request.response} [/INST] "
        payload = {
            "inputs": query_input,
            "parameters": {
                "temperature": 0.1,
                "max_new_tokens": 10
            }
        }
        llm_response = requests.post(self.endpoint_url, headers=headers, json=payload)
        generated_text = json.loads(llm_response.text)[0]['generated_text'].strip()
        if generated_text[0:2].lower() != 'no':  # Lean cautious even if the model fails to return yes/no
            request.response = "Sorry - I cannot answer this question. Please try and rephrase it."
            request.early_exit = True

class RetrievalAugmentor(ArchitectureComponent):
    description = "Retrieves appropriate documents from the store and then augments the request."

    def __init__(self, vector_store: str, doc_count: int = 5):
        chroma_db = os.path.join(data_dir, 'vector_stores', f'{vector_store}_chroma')
        self.vector_store = chroma_db
        client = chromadb.PersistentClient(path=chroma_db)
        self.collection = client.get_collection(name='products')
        self.doc_count = doc_count

    def process_request(self, request: ArchitectureRequest) -> None:
        # Get the doc_count nearest documents from the doc store
        input_query = request.request
        results = self.collection.query(query_texts=[input_query], n_results=self.doc_count)
        documents = results['documents'][0]  # Index 0 as we are always asking one question
        # Update the request to include the retrieved documents
        new_query = '{"background": ['
        new_query += ', '.join([f'"{d}"' for d in documents])
        new_query += ']}\n\nQUESTION: '
        new_query += input_query
        # Put the request back into the architecture request
        request.request = new_query

    def config_description(self) -> str:
        """
        Custom config details as markdown
        """
        desc = f"Vector Store: {self.vector_store}; "
        desc += f"Max docs: {self.doc_count}"
        return desc
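
# For example (illustrative values), a request of "Which TVs have HDR?" with
# doc_count=2 would be rewritten to roughly:
#   {"background": ["<text of doc 1>", "<text of doc 2>"]}
#
#   QUESTION: Which TVs have HDR?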

class HFInferenceEndpoint(ArchitectureComponent):
    """
    A concrete pipeline component which sends the user text to a given llama chat based
    inference endpoint on HuggingFace
    """
    def __init__(self, endpoint_url: str, model_name: str, system_prompt: str, max_new_tokens: int,
                 temperature: float = 1.0, prompt_style: str = "multi_line"):
        self.endpoint_url: str = endpoint_url
        self.prompt_style = prompt_style
        self.model_name: str = model_name
        self.system_prompt: str = system_prompt
        self.max_new_tokens = max_new_tokens
        self.api_token = hf_api_token()
        self.temperature = temperature

    def config_description(self) -> str:
        """
        Custom config details as markdown
        """
        desc = f"Model: {self.model_name}; "
        desc += f"Endpoint: {self.endpoint_url}; "
        desc += f"Max tokens: {self.max_new_tokens}; "
        desc += f"Temperature: {self.temperature}; "
        desc += f"System prompt: {self.system_prompt}"
        return desc

    def process_request(self, request: ArchitectureRequest) -> None:
        """
        Main processing method for this component. Formats the request query according to
        the configured prompt style, calls the configured HTTP inference endpoint, and then
        adds the generated text to the response element of the request.
        """
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }
        if self.prompt_style == "multi_line":
            query_input = f"<s>[INST] <<SYS>>\n{self.system_prompt}\n<</SYS>>\n\n{request.request} [/INST] "
        elif self.prompt_style == "multi_line_no_sys":
            query_input = f"<s>[INST]\n{request.request} [/INST] "
        elif self.prompt_style == "single_line_no_sys":
            query_input = f"<s>[INST] {request.request} [/INST] "
        elif self.prompt_style == "single_line":
            query_input = f"<s>[INST] <<SYS>>\n{self.system_prompt}\n<</SYS>> {request.request} [/INST] "
        elif self.prompt_style == "multi_line_with_roles":
            query_input = f"<<SYS>>\n{self.system_prompt}\n<</SYS>>\n[INST]\nUser:{request.request}\n[/INST]\n\nAssistant:"
        elif self.prompt_style == "raw":
            # No formatting - used to just send things straight through from the front end
            query_input = request.request
        else:
            raise ValueError(f"Config error - Unknown prompt style: {self.prompt_style}")
        payload = {
            "inputs": query_input,
            "parameters": {
                "temperature": self.temperature,
                "max_new_tokens": self.max_new_tokens
            }
        }
        llm_response = requests.post(self.endpoint_url, headers=headers, json=payload)
        if llm_response.status_code == 200:
            generated_text = llm_response.json()[0]['generated_text'].strip()
            request.response = generated_text
        elif llm_response.status_code == 502:
            request.response = "Received 502 error from LLM service - service initialising, try again shortly"
        else:
            request.response = f"Received {llm_response.status_code} - {llm_response.text}"
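
# Illustrative construction (the endpoint URL and values below are placeholders,
# not a real deployment; real values come from the architectures.json config):
#   llm = HFInferenceEndpoint(
#       endpoint_url="https://example.endpoints.huggingface.cloud",
#       model_name="llama-2-7b-chat",
#       system_prompt="You are a helpful assistant.",
#       max_new_tokens=256,
#       temperature=0.7,
#       prompt_style="multi_line")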

class ResponseTrimmer(ArchitectureComponent):
    """
    A concrete pipeline component which trims the response based on a regex match,
    then uppercases the first character of what is left.
    """
    description = "Trims the response based on a regex"

    def __init__(self, regexes: List[str]):
        quoted_regexes = [f'"{r}"' for r in regexes]
        self.regex_display = f"[{', '.join(quoted_regexes)}]"
        self.regexes = [re.compile(r, re.IGNORECASE) for r in regexes]

    def process_request(self, request: ArchitectureRequest):
        new_response = request.response
        for regex in self.regexes:
            new_response = regex.sub('', new_response)
        new_response = new_response[:1].upper() + new_response[1:]
        request.response = new_response

    def config_description(self) -> str:
        return f"Regexes: {self.regex_display}"

if __name__ == "__main__":
    req = ArchitectureRequest("Testing")
    a = Architecture.get_architecture("1. Baseline LLM")
    a(req)
    print("Hold")