|
from openai import OpenAI |
|
import json |
|
import time |
|
from datetime import datetime |
|
from utils import log_step, SYSTEM_PROMPT, logger |
|
|
|
class LSP: |
|
def __init__(self, api_key: str): |
|
if not api_key or api_key.strip() == "": |
|
raise ValueError("API key cannot be empty") |
|
self.client = OpenAI(api_key=api_key) |
|
|
|
def get_section_map(self, content: str) -> tuple[dict, float]: |
|
"""Get map of sections without loading full content""" |
|
start_time = time.time() |
|
log_step("SECTION_MAP", "Starting section map generation") |
|
|
|
sections = {} |
|
in_code_block = False |
|
current_section = None |
|
section_start = 0 |
|
|
|
lines = content.split('\n') |
|
for i, line in enumerate(lines): |
|
if line.strip().startswith('```'): |
|
in_code_block = not in_code_block |
|
continue |
|
|
|
if not in_code_block and line.strip().startswith('#'): |
|
if current_section: |
|
sections[current_section] = (section_start, i-1) |
|
current_section = line.strip() |
|
section_start = i |
|
|
|
if current_section: |
|
sections[current_section] = (section_start, len(lines)-1) |
|
|
|
elapsed = time.time() - start_time |
|
log_step("SECTION_MAP", f"Section map generated in {elapsed:.2f}s", sections) |
|
return sections, elapsed |
|
|
|
def extract_section(self, content: str, start: int, end: int) -> tuple[str, float]: |
|
"""Get just the content of target section""" |
|
start_time = time.time() |
|
log_step("EXTRACT", f"Extracting section from line {start} to {end}") |
|
|
|
lines = content.split('\n') |
|
section = '\n'.join(lines[start:end+1]) |
|
|
|
elapsed = time.time() - start_time |
|
log_step("EXTRACT", f"Section extracted in {elapsed:.2f}s", {"length": len(section)}) |
|
return section, elapsed |
|
|
|
def replace_section(self, content: str, start: int, end: int, new_content: str) -> tuple[str, float]: |
|
"""Replace section and return full content""" |
|
start_time = time.time() |
|
log_step("REPLACE", f"Replacing section from line {start} to {end}") |
|
|
|
lines = content.split('\n') |
|
lines[start:end+1] = new_content.split('\n') |
|
result = '\n'.join(lines) |
|
|
|
elapsed = time.time() - start_time |
|
log_step("REPLACE", f"Section replaced in {elapsed:.2f}s", {"new_length": len(result)}) |
|
return result, elapsed |
|
|
|
def tools(self): |
|
return [ |
|
{ |
|
"type": "function", |
|
"function": { |
|
"name": "identify_section", |
|
"description": "Identify which section needs to be modified based on headers map", |
|
"parameters": { |
|
"type": "object", |
|
"properties": { |
|
"section_name": { |
|
"type": "string", |
|
"description": "Name of section to edit" |
|
}, |
|
"reason": { |
|
"type": "string", |
|
"description": "Why this section was chosen" |
|
} |
|
}, |
|
"required": ["section_name", "reason"] |
|
} |
|
} |
|
}, |
|
{ |
|
"type": "function", |
|
"function": { |
|
"name": "modify_section", |
|
"description": "Make changes to the section content", |
|
"parameters": { |
|
"type": "object", |
|
"properties": { |
|
"modified_content": { |
|
"type": "string", |
|
"description": "New content for the section" |
|
}, |
|
"changes_made": { |
|
"type": "string", |
|
"description": "Description of changes" |
|
} |
|
}, |
|
"required": ["modified_content", "changes_made"] |
|
} |
|
} |
|
} |
|
] |
|
|
|
async def edit_smart(self, content: str, instruction: str) -> tuple[str, list[str], float]: |
|
"""Smart editing with section targeting""" |
|
total_start = time.time() |
|
timings = {} |
|
traces = [] |
|
|
|
try: |
|
|
|
section_map, map_time = self.get_section_map(content) |
|
timings['section_map'] = map_time |
|
traces.append(f"[{map_time:.2f}s] Found sections: {json.dumps(section_map, indent=2)}") |
|
|
|
|
|
messages = [ |
|
{"role": "system", "content": SYSTEM_PROMPT}, |
|
{"role": "user", "content": f"Section map: {json.dumps(section_map)}\nInstruction: {instruction}"} |
|
] |
|
|
|
identify_start = time.time() |
|
response = self.client.chat.completions.create( |
|
model="gpt-4o-mini", |
|
messages=messages, |
|
tools=self.tools() |
|
) |
|
identify_time = time.time() - identify_start |
|
timings['identify_section'] = identify_time |
|
|
|
tool_call = response.choices[0].message.tool_calls[0] |
|
args = json.loads(tool_call.function.arguments) |
|
target_section = args["section_name"] |
|
start, end = section_map[target_section] |
|
|
|
traces.append(f"[{identify_time:.2f}s] Selected section: {target_section} (lines {start}-{end})") |
|
|
|
|
|
section_content, extract_time = self.extract_section(content, start, end) |
|
timings['extract_section'] = extract_time |
|
traces.append(f"[{extract_time:.2f}s] Extracted content:\n{section_content}") |
|
|
|
|
|
modify_start = time.time() |
|
messages.extend([ |
|
response.choices[0].message, |
|
{ |
|
"role": "tool", |
|
"content": json.dumps({"success": True, "section": target_section}), |
|
"tool_call_id": tool_call.id |
|
}, |
|
{ |
|
"role": "user", |
|
"content": f"Here's the section to modify:\n{section_content}" |
|
} |
|
]) |
|
|
|
response = self.client.chat.completions.create( |
|
model="gpt-4o-mini", |
|
messages=messages, |
|
tools=self.tools() |
|
) |
|
modify_time = time.time() - modify_start |
|
timings['modify_section'] = modify_time |
|
|
|
tool_call = response.choices[0].message.tool_calls[0] |
|
args = json.loads(tool_call.function.arguments) |
|
traces.append(f"[{modify_time:.2f}s] Modified content:\n{args['modified_content']}") |
|
|
|
|
|
result, replace_time = self.replace_section(content, start, end, args["modified_content"]) |
|
timings['replace_section'] = replace_time |
|
|
|
total_time = time.time() - total_start |
|
timings['total'] = total_time |
|
|
|
|
|
timing_summary = "\nTiming Summary:\n" + "\n".join([ |
|
f"- {step}: {time:.2f}s" for step, time in timings.items() |
|
]) |
|
traces.append(timing_summary) |
|
|
|
return result, traces, total_time |
|
|
|
except Exception as e: |
|
total_time = time.time() - total_start |
|
error_msg = str(e) |
|
traces.append(f"Error after {total_time:.2f}s: {error_msg}") |
|
return error_msg, traces, total_time |
|
|
|
async def edit_naive(self, content: str, instruction: str) -> tuple[str, list[str], float]: |
|
"""Naive approach - give everything to AI""" |
|
total_start = time.time() |
|
traces = [f"[0.00s] Starting naive edit (sending entire document)"] |
|
|
|
try: |
|
messages = [ |
|
{"role": "system", "content": "You are a document editor. Edit the provided document according to instructions."}, |
|
{"role": "user", "content": f"Edit this document according to this instruction: {instruction}\n\nDocument:\n{content}"} |
|
] |
|
|
|
api_start = time.time() |
|
response = self.client.chat.completions.create( |
|
model="gpt-4o-mini", |
|
messages=messages |
|
) |
|
api_time = time.time() - api_start |
|
|
|
result = response.choices[0].message.content |
|
total_time = time.time() - total_start |
|
|
|
traces.extend([ |
|
f"[{api_time:.2f}s] OpenAI API call completed", |
|
f"[{total_time:.2f}s] Total processing completed" |
|
]) |
|
|
|
return result, traces, total_time |
|
|
|
except Exception as e: |
|
total_time = time.time() - total_start |
|
error_msg = str(e) |
|
traces.append(f"Error after {total_time:.2f}s: {error_msg}") |
|
return error_msg, traces, total_time |
|
|