# lsp/models.py
from openai import OpenAI
import json
import time
from datetime import datetime
from utils import log_step, SYSTEM_PROMPT, logger


class LSP:
    def __init__(self, api_key: str):
        if not api_key or api_key.strip() == "":
            raise ValueError("API key cannot be empty")
        self.client = OpenAI(api_key=api_key)

    def get_section_map(self, content: str) -> tuple[dict, float]:
        """Map markdown headers to (start_line, end_line) ranges without sending the full content to the model."""
        start_time = time.time()
        log_step("SECTION_MAP", "Starting section map generation")

        sections = {}
        in_code_block = False
        current_section = None
        section_start = 0
        lines = content.split('\n')

        for i, line in enumerate(lines):
            # Ignore '#' lines inside fenced code blocks so they are not treated as headers
            if line.strip().startswith('```'):
                in_code_block = not in_code_block
                continue
            if not in_code_block and line.strip().startswith('#'):
                if current_section:
                    sections[current_section] = (section_start, i - 1)
                current_section = line.strip()
                section_start = i
        if current_section:
            sections[current_section] = (section_start, len(lines) - 1)

        elapsed = time.time() - start_time
        log_step("SECTION_MAP", f"Section map generated in {elapsed:.2f}s", sections)
        return sections, elapsed
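
    # Illustrative example (not from the original source): for
    #     content = "# Intro\nsome text\n# Usage\nmore text"
    # get_section_map returns ({"# Intro": (0, 1), "# Usage": (2, 3)}, elapsed),
    # i.e. each header maps to an inclusive (start_line, end_line) range.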

    def extract_section(self, content: str, start: int, end: int) -> tuple[str, float]:
        """Return just the content of the target section (lines start..end, inclusive)."""
        start_time = time.time()
        log_step("EXTRACT", f"Extracting section from line {start} to {end}")
        lines = content.split('\n')
        section = '\n'.join(lines[start:end + 1])
        elapsed = time.time() - start_time
        log_step("EXTRACT", f"Section extracted in {elapsed:.2f}s", {"length": len(section)})
        return section, elapsed

    def replace_section(self, content: str, start: int, end: int, new_content: str) -> tuple[str, float]:
        """Replace the section (lines start..end, inclusive) and return the full document."""
        start_time = time.time()
        log_step("REPLACE", f"Replacing section from line {start} to {end}")
        lines = content.split('\n')
        lines[start:end + 1] = new_content.split('\n')
        result = '\n'.join(lines)
        elapsed = time.time() - start_time
        log_step("REPLACE", f"Section replaced in {elapsed:.2f}s", {"new_length": len(result)})
        return result, elapsed

    def tools(self):
        """Function-calling tool definitions passed to the OpenAI API by edit_smart."""
        return [
            {
                "type": "function",
                "function": {
                    "name": "identify_section",
                    "description": "Identify which section needs to be modified based on headers map",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "section_name": {
                                "type": "string",
                                "description": "Name of section to edit"
                            },
                            "reason": {
                                "type": "string",
                                "description": "Why this section was chosen"
                            }
                        },
                        "required": ["section_name", "reason"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "modify_section",
                    "description": "Make changes to the section content",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "modified_content": {
                                "type": "string",
                                "description": "New content for the section"
                            },
                            "changes_made": {
                                "type": "string",
                                "description": "Description of changes"
                            }
                        },
                        "required": ["modified_content", "changes_made"]
                    }
                }
            }
        ]
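
    # Illustrative sketch (assumption, not from the original source): when the model
    # invokes identify_section, the arguments arrive as a JSON string, roughly
    #     response.choices[0].message.tool_calls[0].function.arguments
    #     == '{"section_name": "# Usage", "reason": "..."}'
    # which edit_smart below parses with json.loads.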

    async def edit_smart(self, content: str, instruction: str) -> tuple[str, list[str], float]:
        """Smart editing with section targeting: only the relevant section is sent to the model for modification."""
        total_start = time.time()
        timings = {}
        traces = []
        try:
            # Phase 1: Build the section map
            section_map, map_time = self.get_section_map(content)
            timings['section_map'] = map_time
            traces.append(f"[{map_time:.2f}s] Found sections: {json.dumps(section_map, indent=2)}")

            # Phase 2: Ask the model which section to edit
            messages = [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": f"Section map: {json.dumps(section_map)}\nInstruction: {instruction}"}
            ]
            identify_start = time.time()
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                tools=self.tools()
            )
            identify_time = time.time() - identify_start
            timings['identify_section'] = identify_time

            tool_call = response.choices[0].message.tool_calls[0]
            args = json.loads(tool_call.function.arguments)
            target_section = args["section_name"]
            start, end = section_map[target_section]
            traces.append(f"[{identify_time:.2f}s] Selected section: {target_section} (lines {start}-{end})")

            # Phase 3: Extract the target section
            section_content, extract_time = self.extract_section(content, start, end)
            timings['extract_section'] = extract_time
            traces.append(f"[{extract_time:.2f}s] Extracted content:\n{section_content}")

            # Phase 4: Ask the model to modify the section
            modify_start = time.time()
            messages.extend([
                response.choices[0].message,
                {
                    "role": "tool",
                    "content": json.dumps({"success": True, "section": target_section}),
                    "tool_call_id": tool_call.id
                },
                {
                    "role": "user",
                    "content": f"Here's the section to modify:\n{section_content}"
                }
            ])
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                tools=self.tools()
            )
            modify_time = time.time() - modify_start
            timings['modify_section'] = modify_time

            tool_call = response.choices[0].message.tool_calls[0]
            args = json.loads(tool_call.function.arguments)
            traces.append(f"[{modify_time:.2f}s] Modified content:\n{args['modified_content']}")

            # Phase 5: Splice the modified section back into the document
            result, replace_time = self.replace_section(content, start, end, args["modified_content"])
            timings['replace_section'] = replace_time

            total_time = time.time() - total_start
            timings['total'] = total_time

            # Add timing summary
            timing_summary = "\nTiming Summary:\n" + "\n".join([
                f"- {step}: {duration:.2f}s" for step, duration in timings.items()
            ])
            traces.append(timing_summary)

            return result, traces, total_time
        except Exception as e:
            total_time = time.time() - total_start
            error_msg = str(e)
            traces.append(f"Error after {total_time:.2f}s: {error_msg}")
            return error_msg, traces, total_time

    async def edit_naive(self, content: str, instruction: str) -> tuple[str, list[str], float]:
        """Naive approach: send the entire document to the model in a single request."""
        total_start = time.time()
        traces = ["[0.00s] Starting naive edit (sending entire document)"]
        try:
            messages = [
                {"role": "system", "content": "You are a document editor. Edit the provided document according to instructions."},
                {"role": "user", "content": f"Edit this document according to this instruction: {instruction}\n\nDocument:\n{content}"}
            ]
            api_start = time.time()
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages
            )
            api_time = time.time() - api_start
            result = response.choices[0].message.content

            total_time = time.time() - total_start
            traces.extend([
                f"[{api_time:.2f}s] OpenAI API call completed",
                f"[{total_time:.2f}s] Total processing completed"
            ])
            return result, traces, total_time
        except Exception as e:
            total_time = time.time() - total_start
            error_msg = str(e)
            traces.append(f"Error after {total_time:.2f}s: {error_msg}")
            return error_msg, traces, total_time
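

# Minimal usage sketch (illustrative, not part of the original file). It assumes an
# OPENAI_API_KEY environment variable and a small markdown document, and runs the
# section-targeted edit_smart path next to the edit_naive baseline for comparison.
if __name__ == "__main__":
    import asyncio
    import os

    async def _demo():
        doc = "# Intro\nHello world.\n\n# Usage\nRun the tool."
        lsp = LSP(api_key=os.environ["OPENAI_API_KEY"])

        # Smart path: only the matching section is sent to the model
        smart_result, smart_traces, smart_time = await lsp.edit_smart(
            doc, "Expand the Usage section with an example command."
        )
        # Naive path: the whole document is sent in one request
        _, _, naive_time = await lsp.edit_naive(
            doc, "Expand the Usage section with an example command."
        )

        print("\n".join(smart_traces))
        print(f"smart: {smart_time:.2f}s, naive: {naive_time:.2f}s")
        print(smart_result)

    asyncio.run(_demo())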