import os
import pickle as pkl
from pathlib import Path
from threading import Thread
from typing import List, Tuple, Iterator, Optional, Generator
from queue import Queue

import spaces
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

# TODO this is not as fast as it could be using generate function with 1 token at a time
# TODO log prob output scaling highlighting instead?
# TODO make it look nicer
# TODO better examples. 
# TODO streaming output (need custom generation function because of probes)
# TODO add options to switch between models, SLT/TBG, layers?
# TODO full semantic entropy calculation

MAX_MAX_NEW_TOKENS = 1024
DEFAULT_MAX_NEW_TOKENS = 100
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))

DESCRIPTION = """
<h1>Llama-2 7B Chat with Uncertainty Probes</h1>
<p>This Space demonstrates the Llama-2-7b-chat model with a semantic uncertainty probe.</p>
<p>This demo is based on our paper: <a href="https://arxiv.org/abs/2406.15927" target="_blank">"Semantic Entropy Probes: Robust and Cheap Hallucination Detection in LLMs"</a> by Jannik Kossen*, Jiatong Han*, Muhammed Razzak*, Lisa Schut, Shreshth Malik and Yarin Gal.</p>
<p>The highlighted text shows the model's uncertainty in real-time:</p>
<ul>
    <li><span style="background-color: #00FF00; color: black">Green</span> indicates more certain generations</li>
    <li><span style="background-color: #FF0000; color: black">Red</span> indicates more uncertain generations</li>
</ul>
<p>Please see our paper for more details. NOTE: This demo is a work in progress.</p>
"""

EXAMPLES = [
    ["What is the capital of France?", ""],
    ["Who landed on the moon?", ""],
    ["Who is Yarin Gal?", ""],
    ["Explain the theory of relativity in simple terms.", ""],
]

if torch.cuda.is_available():
    model_id = "meta-llama/Llama-2-7b-chat-hf"
    # TODO load the full model not the 8bit one?
    model = AutoModelForCausalLM.from_pretrained(model_id, device_map="auto")
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    tokenizer.use_default_system_prompt = False

    # load the probe data
    with open("./model/20240625-131035_demo.pkl", "rb") as f:
        probe_data = pkl.load(f)
    # take the NQ open one
    probe_data = probe_data[-2]
    se_probe = probe_data['t_bmodel']
    se_layer_range = probe_data['sep_layer_range']
    acc_probe = probe_data['t_amodel']
    acc_layer_range = probe_data['ap_layer_range']
    print(f"Loaded probes with layer ranges: {se_layer_range}, {acc_layer_range}")
else:
    DESCRIPTION += "\n<p>Running on CPU 🥶 This demo does not work on CPU.</p>"

@spaces.GPU
def generate(
    message: str,
    system_prompt: str,
    max_new_tokens: int = DEFAULT_MAX_NEW_TOKENS,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Generator[Tuple[str, str], None, None]:
    conversation = []
    if system_prompt:
        conversation.append({"role": "system", "content": system_prompt})
    conversation.append({"role": "user", "content": message})

    input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt")
    if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
        input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
    input_ids = input_ids.to(model.device)

    generation_kwargs = dict(
        do_sample=True,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        repetition_penalty=repetition_penalty,
        output_hidden_states=True,
        return_dict_in_generate=True,
    )
    sentence_start_idx = input_ids.shape[1]
    sentence_token_count = 0
    finished = False


    with torch.no_grad():
        # highlight and return the prompt
        outputs = model.generate(**generation_kwargs, input_ids=input_ids, max_new_tokens=1)
        prompt_tokens = outputs.sequences[0, :input_ids.shape[1]]
        prompt_text = tokenizer.decode(prompt_tokens, skip_special_tokens=True)
        print(prompt_tokens, prompt_text)
        # hidden states
        hidden = outputs.hidden_states
        # last token embeddings (note this is the same as the token before generation given this is the prompt)
        token_embeddings = torch.stack([generated_token[0, -1, :].cpu() for generated_token in hidden[0]]).numpy()
        se_concat_layers = token_embeddings[se_layer_range[0]:se_layer_range[1]].reshape(-1)
        se_probe_pred = se_probe.predict_proba(se_concat_layers.reshape(1, -1))[0][1] * 2 - 1
        acc_concat_layers = token_embeddings[acc_layer_range[0]:acc_layer_range[1]].reshape(-1)
        acc_probe_pred = acc_probe.predict_proba(acc_concat_layers.reshape(1, -1))[0][0] * 2 - 1    # accuracy probe is inverted wrt uncertainty
        se_new_highlighted_text = highlight_text(prompt_text, se_probe_pred)
        acc_new_highlighted_text = highlight_text(prompt_text, acc_probe_pred)
        se_highlighted_text = f"{se_new_highlighted_text}<br>"
        acc_highlighted_text = f"{acc_new_highlighted_text}<br>"

        while not finished:
            outputs = model.generate(**generation_kwargs, input_ids=input_ids, max_new_tokens=1) 
            # this should only be the one extra token (equivalent to -1)
            generated_tokens = outputs.sequences[0, input_ids.shape[1]:]
            print(f"generated_tokens {generated_tokens}" )
            # add to the conversation
            input_ids = torch.cat([input_ids, generated_tokens.unsqueeze(0)], dim=-1)
            # stop at the end of a sequence
            if generated_tokens[-1] == tokenizer.eos_token_id or input_ids.shape[1] > max_new_tokens:
                print("Finished")
                finished = True
                if generated_text != "":
                    # do final prediction on the last generated text (one before the eos token)
                    print("Predicting probes")
                    hidden = outputs.hidden_states  # hidden states = (num generated tokens, num layers, batch size, num tokens, hidden size)
                    # last token embeddings
                    token_embeddings = torch.stack([generated_token[0, -2, :].cpu() for generated_token in hidden[-1]]).numpy()
                    
                    se_concat_layers = token_embeddings[se_layer_range[0]:se_layer_range[1]].reshape(-1)
                    se_probe_pred = se_probe.predict_proba(se_concat_layers.reshape(1, -1))[0][1] * 2 - 1
                    
                    acc_concat_layers = token_embeddings[acc_layer_range[0]:acc_layer_range[1]].reshape(-1)
                    acc_probe_pred = acc_probe.predict_proba(acc_concat_layers.reshape(1, -1))[0][0] * 2 - 1
                    print(f"se_probe_pred {se_probe_pred}, acc_probe_pred {acc_probe_pred}")

                    se_new_highlighted_text = highlight_text(generated_text, se_probe_pred)
                    acc_new_highlighted_text = highlight_text(generated_text, acc_probe_pred)
                    se_highlighted_text += f" {se_new_highlighted_text}"
                    acc_highlighted_text += f" {acc_new_highlighted_text}"
                    sentence_start_idx += sentence_token_count
                    sentence_token_count = 0
            
            # decode the full generated text
            generated_text = tokenizer.decode(outputs.sequences[0, sentence_start_idx:], skip_special_tokens=True)
            print(f"generated_text: {generated_text}")
            sentence_token_count += 1

            # TODO this should be when a factoid is detected rather than just punctuation. Is the SLT token always basically a period for the probes?
            if generated_text.endswith(('.', '!', '?', ';', '."', '!"', '?"')):
                print("Predicting probes")
                hidden = outputs.hidden_states  # hidden states = (num generated tokens, num layers, batch size, num tokens, hidden size)
                # last token embeddings
                token_embeddings = torch.stack([generated_token[0, -1, :].cpu() for generated_token in hidden[-1]]).numpy()
                
                se_concat_layers = token_embeddings[se_layer_range[0]:se_layer_range[1]].reshape(-1)
                se_probe_pred = se_probe.predict_proba(se_concat_layers.reshape(1, -1))[0][1] * 2 - 1
                
                acc_concat_layers = token_embeddings[acc_layer_range[0]:acc_layer_range[1]].reshape(-1)
                acc_probe_pred = acc_probe.predict_proba(acc_concat_layers.reshape(1, -1))[0][0] * 2 - 1
                print(f"se_probe_pred {se_probe_pred}, acc_probe_pred {acc_probe_pred}")

                se_new_highlighted_text = highlight_text(generated_text, se_probe_pred)
                acc_new_highlighted_text = highlight_text(generated_text, acc_probe_pred)
                se_highlighted_text += f" {se_new_highlighted_text}"
                acc_highlighted_text += f" {acc_new_highlighted_text}"
                sentence_start_idx += sentence_token_count
                sentence_token_count = 0
                generated_text = ""

            # yield se_highlighted_text + generated_text, acc_highlighted_text + generated_text
            yield se_highlighted_text + generated_text #, acc_highlighted_text + generated_text


def highlight_text(text: str, uncertainty_score: float) -> str:
    if uncertainty_score > 0:
        html_color = "#%02X%02X%02X" % (
            255,
            int(255 * (1 - uncertainty_score)),
            int(255 * (1 - uncertainty_score)),
        )
    else:
        html_color = "#%02X%02X%02X" % (
            int(255 * (1 + uncertainty_score)),
            255,
            int(255 * (1 + uncertainty_score)),
        )
    return '<span style="background-color: {}; color: black">{}</span>'.format(
        html_color, text
    )


with gr.Blocks(title="Llama-2 7B Chat with Semantic Uncertainty Probes", css="footer {visibility: hidden}") as demo:
    gr.HTML(DESCRIPTION)
    
    with gr.Row():
        with gr.Column():
            message = gr.Textbox(label="Message")
            system_prompt = gr.Textbox(label="System prompt", lines=2)
        
        with gr.Column():
            max_new_tokens = gr.Slider(label="Max new tokens", minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, value=DEFAULT_MAX_NEW_TOKENS)
            temperature = gr.Slider(label="Temperature", minimum=0.01, maximum=2.0, step=0.1, value=0.01)
            top_p = gr.Slider(label="Top-p (nucleus sampling)", minimum=0.05, maximum=1.0, step=0.05, value=0.9)
            top_k = gr.Slider(label="Top-k", minimum=1, maximum=1000, step=1, value=50)
            repetition_penalty = gr.Slider(label="Repetition penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.2)
    
    with gr.Row():
        generate_btn = gr.Button("Generate")
        stop_btn = gr.Button("Stop")
    # Add spacing between probes
    gr.HTML("<br><br>")

    # with gr.Row():
    with gr.Column():
        title = gr.HTML("<h2>Semantic Uncertainty Probe</h2>")
        se_output = gr.HTML(label="Semantic Uncertainty Probe") 
        # with gr.Column():
            # make a box
            # title = gr.HTML("<h2>Semantic Uncertainty Probe</h2>")
            # se_output = gr.HTML(label="Semantic Uncertainty Probe")

        # Add spacing between columns
        # gr.HTML("<div style='width: 20px;'></div>")

        # with gr.Column():
            # title = gr.HTML("<h2>Accuracy Probe</h2>")
            # acc_output = gr.HTML(label="Accuracy Probe")
    
    gr.Examples(
        examples=EXAMPLES,
        inputs=[message, system_prompt],
        # outputs=[se_output, acc_output],
        outputs=[se_output],
        fn=generate,
    )
    
    generate_event = generate_btn.click(
        generate,
        inputs=[message, system_prompt, max_new_tokens, temperature, top_p, top_k, repetition_penalty],
        # outputs=[se_output, acc_output]
        outputs=[se_output]
    )
    stop_btn.click(fn=None, inputs=None, outputs=None, cancels=[generate_event])


if __name__ == "__main__":
    demo.launch()