import os
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from threading import Thread

import gradio as gr
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from llama_cpp import Llama
from pydantic import BaseModel

load_dotenv()

HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")
# Llama.from_pretrained downloads GGUF files through huggingface_hub, which reads
# the access token from the HF_TOKEN environment variable, so export the token if
# it was only provided via .env.
if HUGGINGFACE_TOKEN:
    os.environ.setdefault("HF_TOKEN", HUGGINGFACE_TOKEN)


global_data = {
    'models': {},
    'tokens': {
        'eos': 'eos_token',
        'pad': 'pad_token',
        'padding': 'padding_token',
        'unk': 'unk_token',
        'bos': 'bos_token',
        'sep': 'sep_token',
        'cls': 'cls_token',
        'mask': 'mask_token'
    },
    'model_metadata': {},
    'max_tokens': 256,
    'tokenizers': {},
    'model_params': {},
    'model_size': {},
    'model_ftype': {},
    'n_ctx_train': {},
    'n_embd': {},
    'n_layer': {},
    'n_head': {},
    'n_head_kv': {},
    'n_rot': {},
    'n_swa': {},
    'n_embd_head_k': {},
    'n_embd_head_v': {},
    'n_gqa': {},
    'n_embd_k_gqa': {},
    'n_embd_v_gqa': {},
    'f_norm_eps': {},
    'f_norm_rms_eps': {},
    'f_clamp_kqv': {},
    'f_max_alibi_bias': {},
    'f_logit_scale': {},
    'n_ff': {},
    'n_expert': {},
    'n_expert_used': {},
    'causal_attn': {},
    'pooling_type': {},
    'rope_type': {},
    'rope_scaling': {},
    'freq_base_train': {},
    'freq_scale_train': {},
    'n_ctx_orig_yarn': {},
    'rope_finetuned': {},
    'ssm_d_conv': {},
    'ssm_d_inner': {},
    'ssm_d_state': {},
    'ssm_dt_rank': {},
    'ssm_dt_b_c_rms': {},
    'vocab_type': {},
    'model_type': {}
}


model_configs = [
    {"repo_id": "Hjgugugjhuhjggg/testing_semifinal-Q2_K-GGUF", "filename": "testing_semifinal-q2_k.gguf", "name": "testing"}
]
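
# Further GGUF checkpoints can be registered by appending entries of the same shape,
# e.g. (hypothetical repo id and filename):
# model_configs.append({"repo_id": "<org>/<model>-GGUF", "filename": "<model>-q4_k_m.gguf", "name": "<alias>"})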


class ModelManager:
    def __init__(self):
        self.models = {}

    def load_model(self, model_config):
        if model_config['name'] not in self.models:
            try:
                # Fetch the GGUF file from the Hugging Face Hub (the token is read
                # from the environment) and load it for CPU-only inference.
                self.models[model_config['name']] = Llama.from_pretrained(
                    repo_id=model_config['repo_id'],
                    filename=model_config['filename'],
                    n_threads=8,
                    n_gpu_layers=0  # keep every layer on the CPU
                )
            except Exception as e:
                print(f"Failed to load model '{model_config['name']}': {e}")

    def load_all_models(self):
        # The executor's context manager waits for every submitted load to finish
        # before the models dict is returned.
        with ThreadPoolExecutor() as executor:
            for config in model_configs:
                executor.submit(self.load_model, config)
        return self.models


# Load every configured model eagerly at import time; the FastAPI route and the
# Gradio UI then share the same Llama instances.
model_manager = ModelManager()
global_data['models'] = model_manager.load_all_models()


class ChatRequest(BaseModel):
    message: str


def normalize_input(input_text):
    return input_text.strip()


def remove_duplicates(text):
    lines = text.split('\n')
    unique_lines = []
    seen_lines = set()
    for line in lines:
        if line not in seen_lines:
            unique_lines.append(line)
            seen_lines.add(line)
    return '\n'.join(unique_lines)
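
# Example: remove_duplicates("a\nb\na") -> "a\nb" (keeps the first occurrence of each line).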


_response_cache = {}


def cache_response(func):
    # Lightweight in-memory memoisation: identical (args, kwargs) combinations
    # return the previously generated response instead of re-running the model.
    @wraps(func)
    def wrapper(*args, **kwargs):
        cache_key = f"{args}-{kwargs}"
        if cache_key in _response_cache:
            return _response_cache[cache_key]
        response = func(*args, **kwargs)
        _response_cache[cache_key] = response
        return response
    return wrapper
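
# Cached entries persist for the lifetime of the process; clear _response_cache
# to force fresh generations.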


@cache_response
def generate_model_response(model, inputs):
    try:
        # Plain text completion, capped at the configured token budget.
        response = model(inputs, max_tokens=global_data['max_tokens'])
        return remove_duplicates(response['choices'][0]['text'])
    except Exception as e:
        print(f"Model inference failed: {e}")
        return ""


def remove_repetitive_responses(responses):
    unique_responses = {}
    for response in responses:
        if response['model'] not in unique_responses:
            unique_responses[response['model']] = response['response']
    return unique_responses


async def process_message(message):
    inputs = normalize_input(message)
    # Query every loaded model in parallel, collecting each reply under the name
    # of the model that produced it.
    with ThreadPoolExecutor() as executor:
        futures = {
            model_name: executor.submit(generate_model_response, model, inputs)
            for model_name, model in global_data['models'].items()
        }
        responses = [
            {'model': model_name, 'response': future.result()}
            for model_name, future in futures.items()
        ]
    unique_responses = remove_repetitive_responses(responses)
    formatted_response = ""
    for model, response in unique_responses.items():
        formatted_response += f"**{model}:**\n{response}\n\n"
    return formatted_response


app = FastAPI()


@app.post("/generate")
async def generate(request: ChatRequest):
    response = await process_message(request.message)
    return JSONResponse(content={"response": response})
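
# Example request against the JSON API (assuming the server below is running on
# localhost:7860):
#   curl -X POST http://localhost:7860/generate \
#        -H "Content-Type: application/json" \
#        -d '{"message": "Hello"}'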


def run_uvicorn():
    uvicorn.run(app, host="0.0.0.0", port=7860)


iface = gr.Interface(
    fn=process_message,
    inputs=gr.Textbox(lines=2, placeholder="Enter your message here..."),
    outputs=gr.Markdown(),
    title="Multi-Model LLM API (CPU Optimized)",
    description="Enter a message and get responses from multiple LLMs running on CPU."
)
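
# Gradio awaits coroutine event handlers, so the async process_message above can be
# passed as fn directly (assumes a reasonably recent Gradio release).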


def run_gradio():
    iface.launch(server_port=7861, prevent_thread_lock=True)


if __name__ == "__main__":
    # Serve the JSON API (port 7860) and the Gradio UI (port 7861) side by side.
    Thread(target=run_uvicorn).start()
    Thread(target=run_gradio).start()