File size: 9,887 Bytes
709da00
 
 
 
 
 
 
 
 
 
 
 
dd28b0b
 
 
 
 
709da00
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
073094f
 
 
 
 
 
709da00
 
 
 
 
 
 
 
 
 
 
dd28b0b
 
709da00
dd28b0b
 
709da00
 
 
 
dd28b0b
709da00
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
import gradio as gr
import os
import torch
import numpy as np
import random
from huggingface_hub import login, HfFolder
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoModelForCausalLM, TextIteratorStreamer
from scipy.special import softmax
import logging
import spaces
from threading import Thread
from collections.abc import Iterator
import csv

# Raise the csv module's per-field size cap to 1,000,000 characters.
csv.field_size_limit(1000000)  # Or an even larger value if needed


# Send INFO-and-above records through the root logger, prefixed with a timestamp.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

# Seed every random number generator used by the app so runs are repeatable.
seed = 42
np.random.seed(seed)
random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

# Authenticate against the Hugging Face Hub with the token from the `hf_token`
# environment variable. When the variable is missing or empty we skip the login
# instead of handing `None` to the Hub helpers, which would abort the import
# with an unrelated-looking error; model loading below then reports its own,
# clearer failure if a repository really requires authentication.
token = os.getenv("hf_token")
if token:
    HfFolder.save_token(token)
    login(token)
else:
    logging.warning(
        "Environment variable 'hf_token' is not set; skipping Hugging Face login. "
        "Gated or private models may fail to load."
    )

# --- Quality Prediction Model Setup ---
# Hub repository ids of the per-quality binary classifiers, in evaluation order.
model_paths = [
    "karths/binary_classification_train_test",
    "karths/binary_classification_train_process",
    "karths/binary_classification_train_infrastructure",
    "karths/binary_classification_train_documentation",
    "karths/binary_classification_train_design",
    "karths/binary_classification_train_defect",
    "karths/binary_classification_train_code",
    "karths/binary_classification_train_build",
    "karths/binary_classification_train_automation",
    "karths/binary_classification_train_people",
    "karths/binary_classification_train_architecture",
]

# Repository name (the id without its "karths/" owner prefix) -> display label.
# The label is the capitalised last "_"-separated word of the repository name,
# e.g. "binary_classification_train_test" -> "Test".
quality_mapping = {
    repo_id.split("/")[-1]: repo_id.rsplit("_", 1)[-1].capitalize()
    for repo_id in model_paths
}

# Load everything the quality classifiers need once, at import time:
# a single tokenizer shared by all of them, and one sequence-classification
# model per repository id, keyed by that id.
tokenizer = AutoTokenizer.from_pretrained("distilroberta-base")
models = dict(
    zip(model_paths, map(AutoModelForSequenceClassification.from_pretrained, model_paths))
)

def get_quality_name(model_name):
    """Return the display label for a classifier's repository id.

    Only the text after the last ``/`` in ``model_name`` is looked up in
    ``quality_mapping``; names without an entry yield ``"Unknown Quality"``.
    """
    _owner, _sep, repo_name = model_name.rpartition('/')
    return quality_mapping.get(repo_name, "Unknown Quality")

@spaces.GPU
def model_prediction(model, text, device):
    """Score ``text`` with one classifier and return its mean class-1 probability.

    The model is moved to ``device`` and put in eval mode, ``text`` is encoded
    with the shared module-level ``tokenizer`` (padded, truncated to 512
    tokens), and the logits are turned into probabilities with a softmax over
    axis 1. The value returned is the mean of column 1 across the batch
    (presumably the "belongs to this quality" class - confirm against the
    classifiers' label order).
    """
    model.to(device)
    model.eval()

    encoded = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
    batch = {name: tensor.to(device) for name, tensor in encoded.items()}

    # Inference only: no gradients are needed.
    with torch.no_grad():
        logits = model(**batch).logits
        class_probs = softmax(logits.cpu().numpy(), axis=1)

    class_one_probs = class_probs[:, 1]
    return np.mean(class_one_probs)

# --- Llama 3.2 3B Model Setup ---
# Generation-length limits, in tokens.
# NOTE(review): LLAMA_MAX_MAX_NEW_TOKENS is not referenced anywhere else in the
# visible file - confirm whether it is still needed.
LLAMA_MAX_MAX_NEW_TOKENS = 2048
# Default for the `max_new_tokens` parameter of llama_generate.
LLAMA_DEFAULT_MAX_NEW_TOKENS = 1024
# Maximum prompt length in tokens; can be overridden through the
# MAX_INPUT_TOKEN_LENGTH environment variable (defaults to 4096).
LLAMA_MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))
# First CUDA device when one is available, otherwise the CPU.
# NOTE(review): the model below is placed via device_map="auto" and this name
# is not referenced elsewhere in the visible file.
llama_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")  # Explicitly define device
llama_model_id = "meta-llama/Llama-3.2-3B-Instruct"
llama_tokenizer = AutoTokenizer.from_pretrained(llama_model_id)
# Load the causal LM once at import time, in bfloat16.
llama_model = AutoModelForCausalLM.from_pretrained(
    llama_model_id,
    device_map="auto",  # Automatically distribute model across devices
    torch_dtype=torch.bfloat16,
)
# Inference only: switch the model to eval mode.
llama_model.eval()

# --- IMPORTANT: Set Pad Token ---
# llama_generate tokenizes with padding enabled, so the tokenizer must have a
# pad token. When it does not define one, reuse its EOS token as the pad token.
if llama_tokenizer.pad_token is None:
    llama_tokenizer.pad_token = llama_tokenizer.eos_token


@spaces.GPU(duration=90)
def llama_generate(
    message: str,
    max_new_tokens: int = LLAMA_DEFAULT_MAX_NEW_TOKENS,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Iterator[str]:
    """Stream a sampled completion for ``message`` from the Llama model.

    Args:
        message: Prompt text to complete.
        max_new_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature.
        top_p: Nucleus-sampling probability mass.
        top_k: Number of highest-probability tokens kept for sampling.
        repetition_penalty: Penalty applied to already generated tokens.

    Yields:
        The text generated *so far* each time a new piece arrives. Every
        yielded value is cumulative (it already contains all earlier pieces),
        so a caller that only wants the final answer should keep the last
        value rather than concatenating the yielded values.

    Prompts longer than ``LLAMA_MAX_INPUT_TOKEN_LENGTH`` tokens are trimmed to
    their last ``LLAMA_MAX_INPUT_TOKEN_LENGTH`` tokens and a Gradio warning is
    shown.
    """
    # Tokenize WITHOUT truncation so an over-long prompt can actually be
    # detected; truncating here would make the length check below unreachable.
    encoded = llama_tokenizer(message, return_tensors="pt", padding=True)

    too_long = encoded["input_ids"].shape[1] > LLAMA_MAX_INPUT_TOKEN_LENGTH
    # Trim every tensor (input ids AND attention mask) the same way so they
    # stay aligned, keeping the most recent tokens, then move them to the
    # device the model lives on.
    model_inputs = {
        name: (tensor[:, -LLAMA_MAX_INPUT_TOKEN_LENGTH:] if too_long else tensor).to(llama_model.device)
        for name, tensor in encoded.items()
    }
    if too_long:
        gr.Warning(f"Trimmed input from conversation as it was longer than {LLAMA_MAX_INPUT_TOKEN_LENGTH} tokens.")

    streamer = TextIteratorStreamer(llama_tokenizer, timeout=60.0, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = dict(
        model_inputs,  # input_ids and attention_mask
        streamer=streamer,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        num_beams=1,
        repetition_penalty=repetition_penalty,
    )

    # generate() blocks until it is done, so it runs in a worker thread while
    # this generator drains the streamer.
    worker = Thread(target=llama_model.generate, kwargs=generate_kwargs)
    worker.start()

    pieces = []
    for text in streamer:
        pieces.append(text)
        yield "".join(pieces)

    # The streamer is exhausted only once generation has ended; reap the thread.
    worker.join()



def generate_explanation(issue_text, top_qualities):
    """Ask the Llama model why ``issue_text`` matches the predicted qualities.

    Args:
        issue_text: The issue description entered by the user.
        top_qualities: Sequence of ``(quality_name, probability)`` pairs; only
            the names are placed in the prompt.

    Returns:
        The generated explanation, a fixed notice when ``top_qualities`` is
        empty, or a fixed error message when generation raises.
    """
    if not top_qualities:
        return "No explanation available as no quality tags were predicted."

    prompt = f"""
    Given the following issue description:
    ---
    {issue_text}
    ---
    Explain why this issue might be classified under the following quality categories: {', '.join([q[0] for q in top_qualities])}. 
    Provide a concise explanation for each category, relating it back to the issue description.
    """
    explanation = ""
    try:
        # llama_generate yields the full text generated so far on every step,
        # so the latest value replaces (rather than extends) the previous one;
        # appending would repeat every earlier prefix in the result.
        for partial_text in llama_generate(prompt):
            explanation = partial_text
    except Exception as e:
        # Boundary handler: the UI should show a message instead of crashing.
        logging.exception("Error during Llama generation: %s", e)
        return "An error occurred while generating the explanation."

    return explanation


def main_interface(text):
    """Classify an issue description and explain the predicted quality tags.

    Returns a 3-tuple ``(html, "", explanation)`` matching the three Gradio
    outputs. On rejected input (blank, or shorter than 30 characters) or when
    no classifier reaches the 0.95 threshold, ``html`` is a red error message
    and the other two items are empty strings.
    """
    # Guard clauses: blank input first, then input that is too short.
    if text.strip() == "":
        return (
            "<div style='color: red;'>No text provided. Please enter a valid issue description.</div>",
            "",
            "",
        )
    if len(text) < 30:
        return ("<div style='color: red;'>Text is less than 30 characters.</div>", "", "")

    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"

    # Score the text with every classifier; keep only confident predictions.
    confident = []
    for model_path, model in models.items():
        label = get_quality_name(model_path)
        score = model_prediction(model, text, device)
        if score >= 0.95:
            confident.append((label, score))
        logging.info(f"Model: {model_path}, Quality: {label}, Average Probability: {score:.3f}")

    if len(confident) == 0:
        return (
            "<div style='color: red;'>No recommendation. Prediction probability is below the threshold. </div>",
            "",
            "",
        )

    # Highest probability first; at most three tags are reported.
    confident.sort(key=lambda pair: pair[1], reverse=True)
    top_qualities = confident[:3]

    badges_html = render_html_output(top_qualities)
    # The explanation is generated from the original text and the chosen tags.
    explanation = generate_explanation(text, top_qualities)

    # The middle slot feeds a hidden textbox and is always empty.
    return badges_html, "", explanation

def render_html_output(top_qualities):
    """Render the ranked quality predictions as an HTML snippet.

    ``top_qualities`` is a sequence of ``(quality_name, probability)`` pairs,
    best first. At most three are rendered, each as a "Top N Prediction" badge
    next to the quality name; the probability itself is not displayed. The
    returned string is an inline ``<style>`` block followed by one container
    ``<div>`` per rendered prediction.
    """
    stylesheet = """
    <style>
        .quality-container {
            font-family: Arial, sans-serif;
            text-align: center;
            margin-top: 20px;
        }
        .quality-label, .ranking {
            display: inline-block;
            padding: 0.5em 1em;
            font-size: 18px;
            font-weight: bold;
            color: white;
            background-color: #007bff;
            border-radius: 0.5rem;
            margin-right: 10px;
            box-shadow: 0 2px 4px rgba(0, 0, 0, 0.2);
        }
        .probability {
            display: block;
            margin-top: 10px;
            font-size: 16px;
            color: #007bff;
        }
    </style>
    """
    ranking_labels = ['Top 1 Prediction', 'Top 2 Prediction', 'Top 3 Prediction']

    # zip stops at the shorter sequence, which caps the output at three badges.
    badges = []
    for rank_label, (quality, _prob) in zip(ranking_labels, top_qualities):
        badges.append(f"""
        <div class="quality-container">
            <span class="ranking">{rank_label}</span>
            <span class="quality-label">{quality}</span>
        </div>
        """)
    return stylesheet + "".join(badges)

# Sample issue descriptions offered as clickable examples in the UI.
# Each inner list holds the value for the single input textbox.
example_texts = [
    ["The algorithm does not accurately distinguish between the positive and negative classes during edge cases.\n\nEnvironment: Production\nReproduction: Run the classifier on the test dataset with known edge cases."],
    ["The regression tests do not cover scenarios involving concurrent user sessions.\n\nEnvironment: Test automation suite\nReproduction: Update the test scripts to include tests for concurrent sessions."],
    ["There is frequent miscommunication between the development and QA teams regarding feature specifications.\n\nEnvironment: Inter-team meetings\nReproduction: Audit recent communication logs and meeting notes between the teams."],
    ["The service-oriented architecture does not effectively isolate failures, leading to cascading failures across services.\n\nEnvironment: Microservices architecture\nReproduction: Simulate a service failure and observe the impact on other services."]
]

# One input textbox wired to main_interface, whose 3-tuple result fills the
# three outputs in order: prediction HTML, a hidden textbox, the explanation.
interface = gr.Interface(
    fn=main_interface,
    inputs=gr.Textbox(lines=7, label="Issue Description", placeholder="Enter your issue text here"),
    outputs=[
        gr.HTML(label="Prediction Output"),
        # Hidden slot: main_interface always returns "" for it.
        gr.Textbox(label="Predictions", visible=False),
        gr.Textbox(label="Explanation", lines=5)  # Added Textbox for explanation
    ],
    title="QualityTagger",
    # The description names categories the classifiers actually predict
    # (see quality_mapping) rather than unrelated ones.
    description="This tool classifies issue text into different quality domains such as Test, Design, Defect, etc., and provides explanations.",
    examples=example_texts
)
interface.launch(share=True)