File size: 9,421 Bytes
ec32ae2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
from openai import OpenAI
import json
import time
from datetime import datetime
from utils import log_step, SYSTEM_PROMPT, logger
class LSP:
def __init__(self, api_key: str):
if not api_key or api_key.strip() == "":
raise ValueError("API key cannot be empty")
self.client = OpenAI(api_key=api_key)
def get_section_map(self, content: str) -> tuple[dict, float]:
"""Get map of sections without loading full content"""
start_time = time.time()
log_step("SECTION_MAP", "Starting section map generation")
sections = {}
in_code_block = False
current_section = None
section_start = 0
lines = content.split('\n')
for i, line in enumerate(lines):
if line.strip().startswith('```'):
in_code_block = not in_code_block
continue
if not in_code_block and line.strip().startswith('#'):
if current_section:
sections[current_section] = (section_start, i-1)
current_section = line.strip()
section_start = i
if current_section:
sections[current_section] = (section_start, len(lines)-1)
elapsed = time.time() - start_time
log_step("SECTION_MAP", f"Section map generated in {elapsed:.2f}s", sections)
return sections, elapsed
def extract_section(self, content: str, start: int, end: int) -> tuple[str, float]:
"""Get just the content of target section"""
start_time = time.time()
log_step("EXTRACT", f"Extracting section from line {start} to {end}")
lines = content.split('\n')
section = '\n'.join(lines[start:end+1])
elapsed = time.time() - start_time
log_step("EXTRACT", f"Section extracted in {elapsed:.2f}s", {"length": len(section)})
return section, elapsed
def replace_section(self, content: str, start: int, end: int, new_content: str) -> tuple[str, float]:
"""Replace section and return full content"""
start_time = time.time()
log_step("REPLACE", f"Replacing section from line {start} to {end}")
lines = content.split('\n')
lines[start:end+1] = new_content.split('\n')
result = '\n'.join(lines)
elapsed = time.time() - start_time
log_step("REPLACE", f"Section replaced in {elapsed:.2f}s", {"new_length": len(result)})
return result, elapsed
def tools(self):
return [
{
"type": "function",
"function": {
"name": "identify_section",
"description": "Identify which section needs to be modified based on headers map",
"parameters": {
"type": "object",
"properties": {
"section_name": {
"type": "string",
"description": "Name of section to edit"
},
"reason": {
"type": "string",
"description": "Why this section was chosen"
}
},
"required": ["section_name", "reason"]
}
}
},
{
"type": "function",
"function": {
"name": "modify_section",
"description": "Make changes to the section content",
"parameters": {
"type": "object",
"properties": {
"modified_content": {
"type": "string",
"description": "New content for the section"
},
"changes_made": {
"type": "string",
"description": "Description of changes"
}
},
"required": ["modified_content", "changes_made"]
}
}
}
]
async def edit_smart(self, content: str, instruction: str) -> tuple[str, list[str], float]:
"""Smart editing with section targeting"""
total_start = time.time()
timings = {}
traces = []
try:
# Phase 1: Get section map
section_map, map_time = self.get_section_map(content)
timings['section_map'] = map_time
traces.append(f"[{map_time:.2f}s] Found sections: {json.dumps(section_map, indent=2)}")
# Phase 2: Identify section
messages = [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": f"Section map: {json.dumps(section_map)}\nInstruction: {instruction}"}
]
identify_start = time.time()
response = self.client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
tools=self.tools()
)
identify_time = time.time() - identify_start
timings['identify_section'] = identify_time
tool_call = response.choices[0].message.tool_calls[0]
args = json.loads(tool_call.function.arguments)
target_section = args["section_name"]
start, end = section_map[target_section]
traces.append(f"[{identify_time:.2f}s] Selected section: {target_section} (lines {start}-{end})")
# Phase 3: Extract section
section_content, extract_time = self.extract_section(content, start, end)
timings['extract_section'] = extract_time
traces.append(f"[{extract_time:.2f}s] Extracted content:\n{section_content}")
# Phase 4: Modify section
modify_start = time.time()
messages.extend([
response.choices[0].message,
{
"role": "tool",
"content": json.dumps({"success": True, "section": target_section}),
"tool_call_id": tool_call.id
},
{
"role": "user",
"content": f"Here's the section to modify:\n{section_content}"
}
])
response = self.client.chat.completions.create(
model="gpt-4o-mini",
messages=messages,
tools=self.tools()
)
modify_time = time.time() - modify_start
timings['modify_section'] = modify_time
tool_call = response.choices[0].message.tool_calls[0]
args = json.loads(tool_call.function.arguments)
traces.append(f"[{modify_time:.2f}s] Modified content:\n{args['modified_content']}")
# Phase 5: Replace section
result, replace_time = self.replace_section(content, start, end, args["modified_content"])
timings['replace_section'] = replace_time
total_time = time.time() - total_start
timings['total'] = total_time
# Add timing summary
timing_summary = "\nTiming Summary:\n" + "\n".join([
f"- {step}: {time:.2f}s" for step, time in timings.items()
])
traces.append(timing_summary)
return result, traces, total_time
except Exception as e:
total_time = time.time() - total_start
error_msg = str(e)
traces.append(f"Error after {total_time:.2f}s: {error_msg}")
return error_msg, traces, total_time
async def edit_naive(self, content: str, instruction: str) -> tuple[str, list[str], float]:
"""Naive approach - give everything to AI"""
total_start = time.time()
traces = [f"[0.00s] Starting naive edit (sending entire document)"]
try:
messages = [
{"role": "system", "content": "You are a document editor. Edit the provided document according to instructions."},
{"role": "user", "content": f"Edit this document according to this instruction: {instruction}\n\nDocument:\n{content}"}
]
api_start = time.time()
response = self.client.chat.completions.create(
model="gpt-4o-mini",
messages=messages
)
api_time = time.time() - api_start
result = response.choices[0].message.content
total_time = time.time() - total_start
traces.extend([
f"[{api_time:.2f}s] OpenAI API call completed",
f"[{total_time:.2f}s] Total processing completed"
])
return result, traces, total_time
except Exception as e:
total_time = time.time() - total_start
error_msg = str(e)
traces.append(f"Error after {total_time:.2f}s: {error_msg}")
return error_msg, traces, total_time
|