# File: prompts.py
# NOTE(review): the <output>/<response> tag names in these prompts are the ones
# DocumentGenerator parses out of the model replies; the remaining wrapper tags
# (<overall_objective>, <document_layout>) are assumed from the placeholder
# names -- confirm against the original prompt text.
DOCUMENT_OUTLINE_PROMPT_SYSTEM = """You are a document generator. Provide the outline of the document requested in JSON format.
Include sections and subsections if required. Use the "Content" field to provide a specific prompt or instruction for generating content for that particular section or subsection.
OUTPUT IN FOLLOWING JSON FORMAT enclosed in <output> tags
<output>
{
  "Document": {
    "Title": "Document Title",
    "Author": "Author Name",
    "Date": "YYYY-MM-DD",
    "Version": "1.0",
    "Sections": [
      {
        "SectionNumber": "1",
        "Title": "Section Title",
        "Content": "Specific prompt or instruction for generating content for this section",
        "Subsections": [
          {
            "SectionNumber": "1.1",
            "Title": "Subsection Title",
            "Content": "Specific prompt or instruction for generating content for this subsection"
          }
        ]
      }
    ]
  }
}
</output>
"""

DOCUMENT_OUTLINE_PROMPT_USER = """{query}"""

DOCUMENT_SECTION_PROMPT_SYSTEM = """You are a document generator, You need to output only the content requested in the section in the prompt.
FORMAT YOUR OUTPUT AS MARKDOWN ENCLOSED IN <response></response> tags
<overall_objective>{overall_objective}</overall_objective>
<document_layout>{document_layout}</document_layout>"""

DOCUMENT_SECTION_PROMPT_USER = (
    'Output the content for the section "{section_or_subsection_title}" '
    "formatted as markdown. \n"
    "Follow this instruction: {content_instruction}"
)


# File: app.py
import os
import json
import re
import time
import asyncio
import logging
import functools
from typing import List, Dict, Optional, Any, Callable

from openai import OpenAI
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from fastapi_cache.decorator import cache
from starlette.responses import StreamingResponse

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def log_execution(func: Callable) -> Callable:
    """Wrap an async callable so its start, success and failure are logged.

    Exceptions are logged and then re-raised unchanged.
    """
    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        logger.info("Executing %s", func.__name__)
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            logger.error("Error in %s: %s", func.__name__, e)
            raise
        logger.info("%s completed successfully", func.__name__)
        return result
    return wrapper


class AIClient:
    """Thin async wrapper around the OpenAI client pointed at OpenRouter."""

    def __init__(self):
        # Raises KeyError when OPENROUTER_API_KEY is not set in the environment.
        self.client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key="sk-or-v1-" + os.environ['OPENROUTER_API_KEY']
        )

    @log_execution
    async def generate_response(
        self,
        messages: List[Dict[str, str]],
        model: str = "openai/gpt-4o-mini",
        max_tokens: int = 32000
    ) -> Optional[str]:
        """Return the text of a non-streaming chat completion.

        Returns None when ``messages`` is empty. The synchronous client call
        is run in the default executor so the event loop is not blocked.
        """
        if not messages:
            return None

        # We are inside a coroutine, so a running loop is guaranteed.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, functools.partial(
            self.client.chat.completions.create,
            model=model,
            messages=messages,
            max_tokens=max_tokens,
            stream=False
        ))
        return response.choices[0].message.content


class DocumentGenerator:
    """Builds a document outline and then fills in each section via the AI client."""

    def __init__(self, ai_client: AIClient):
        self.ai_client = ai_client
        self.document_outline = None
        # Running chat history used while generating section content.
        self.content_messages = []

    @staticmethod
    def extract_between_tags(text: Optional[str], tag: str) -> str:
        """Return the stripped text between ``<tag>`` and ``</tag>``.

        Returns an empty string when ``text`` is empty/None or the tag pair
        is not present.
        """
        if not text:
            return ""
        escaped_tag = re.escape(tag)
        # The closing tag is required: without it the lazy group matches "".
        pattern = f"<{escaped_tag}>(.*?)</{escaped_tag}>"
        match = re.search(pattern, text, re.DOTALL)
        return match.group(1).strip() if match else ""

    @staticmethod
    def remove_duplicate_title(content: str, title: str, section_number: str) -> str:
        """Strip lines-starts that repeat the section heading from ``content``.

        Handles the title with or without leading ``#`` markers and with or
        without the section number prefix. Matching is case-insensitive.
        """
        if not title:
            # With an empty title the patterns degenerate (e.g. "^#+\s*") and
            # would strip every heading marker in the content.
            return content.lstrip()

        number = re.escape(section_number)
        heading = re.escape(title)
        patterns = [
            rf"^#+\s*{number}(?:\s+|\s*:\s*|\.\s*){heading}",
            rf"^#+\s*{heading}",
            rf"^{number}(?:\s+|\s*:\s*|\.\s*){heading}",
            rf"^{heading}",
        ]
        for pattern in patterns:
            content = re.sub(pattern, "", content, flags=re.MULTILINE | re.IGNORECASE)
        return content.lstrip()

    @log_execution
    async def generate_document_outline(self, query: str, max_retries: int = 3) -> Optional[Dict]:
        """Ask the model for a JSON outline, retrying on unparsable output.

        Returns the parsed outline (also stored on ``self.document_outline``),
        or None when every attempt fails to yield valid JSON.
        """
        messages = [
            {"role": "system", "content": DOCUMENT_OUTLINE_PROMPT_SYSTEM},
            {"role": "user", "content": DOCUMENT_OUTLINE_PROMPT_USER.format(query=query)}
        ]

        for attempt in range(max_retries):
            outline_response = await self.ai_client.generate_response(messages, model="openai/gpt-4o")
            outline_json_text = self.extract_between_tags(outline_response, "output")

            try:
                self.document_outline = json.loads(outline_json_text)
                return self.document_outline
            except json.JSONDecodeError as e:
                if attempt < max_retries - 1:
                    logger.warning(f"Failed to parse JSON (attempt {attempt + 1}): {e}")
                    logger.info("Retrying...")
                else:
                    logger.error(f"Failed to parse JSON after {max_retries} attempts: {e}")
        return None

    @log_execution
    async def generate_content(self, title: str, content_instruction: str, section_number: str) -> str:
        """Generate the body for one section and record the exchange in the chat history."""
        self.content_messages.append({
            "role": "user",
            "content": DOCUMENT_SECTION_PROMPT_USER.format(
                section_or_subsection_title=title,
                content_instruction=content_instruction
            )
        })
        section_response = await self.ai_client.generate_response(self.content_messages)
        content = self.extract_between_tags(section_response, "response")
        content = self.remove_duplicate_title(content, title, section_number)
        # Keep the raw reply in the history so later sections see prior output.
        self.content_messages.append({
            "role": "assistant",
            "content": section_response
        })
        return content

    @log_execution
    async def generate_full_document(self, document_outline: Dict, query: str):
        """Yield newline-delimited JSON events while filling in the outline.

        Each section/subsection "Content" instruction is replaced in place by
        the generated text; a final event carries the full Markdown document.
        """
        self.document_outline = document_outline

        overall_objective = query
        document_layout = json.dumps(self.document_outline, indent=2)

        # Reset the history: every document starts from a fresh system prompt.
        self.content_messages = [
            {
                "role": "system",
                "content": DOCUMENT_SECTION_PROMPT_SYSTEM.format(
                    overall_objective=overall_objective,
                    document_layout=document_layout
                )
            }
        ]

        for section in self.document_outline["Document"].get("Sections", []):
            section_title = section.get("Title", "")
            section_number = section.get("SectionNumber", "")
            content_instruction = section.get("Content", "")
            logger.info(f"Generating content for section: {section_title}")
            section["Content"] = await self.generate_content(section_title, content_instruction, section_number)
            yield json.dumps({"type": "document_section", "content": section}) + "\n"

            for subsection in section.get("Subsections", []):
                subsection_title = subsection.get("Title", "")
                subsection_number = subsection.get("SectionNumber", "")
                subsection_content_instruction = subsection.get("Content", "")
                logger.info(f"Generating content for subsection: {subsection_title}")
                subsection["Content"] = await self.generate_content(subsection_title, subsection_content_instruction, subsection_number)
                yield json.dumps({"type": "document_subsection", "content": subsection}) + "\n"

        # Generate the complete markdown document
        full_document = self.document_outline
        markdown_document = MarkdownConverter.convert_to_markdown(full_document["Document"])
        yield json.dumps({"type": "complete_document", "content": markdown_document}) + "\n"


class MarkdownConverter:
    """Renders a filled-in outline dictionary as a Markdown document."""

    # NOTE(review): the string literals of this class appear to have lost
    # their wrapper markup (page breaks / centering) before review. Plain
    # Markdown headings and explicit anchors are emitted instead; restore the
    # original wrappers here if the renderer depends on them.

    @staticmethod
    def slugify(text: str) -> str:
        """Lower-case ``text`` and replace each run of non-word characters with '-'."""
        return re.sub(r'\W+', '-', text.lower())

    @classmethod
    def _anchor(cls, number: str, title: str) -> str:
        """Return the anchor id shared by a heading and its table-of-contents entry."""
        return cls.slugify(f"{number} {title}").strip("-")

    @classmethod
    def generate_toc(cls, sections: List[Dict]) -> str:
        """Return a "Table of Contents" block linking to every section and subsection."""
        lines = ["## Table of Contents", ""]
        for section in sections:
            number = section.get('SectionNumber', '')
            title = section.get('Title', '')
            lines.append(f"- [{number}. {title}](#{cls._anchor(number, title)})")
            for subsection in section.get('Subsections', []):
                sub_number = subsection.get('SectionNumber', '')
                sub_title = subsection.get('Title', '')
                lines.append(f"  - [{sub_number} {sub_title}](#{cls._anchor(sub_number, sub_title)})")
        return "\n".join(lines) + "\n\n"

    @classmethod
    def convert_to_markdown(cls, document: Dict) -> str:
        """Render ``document`` (the outline's "Document" object) as Markdown.

        Missing metadata or section keys render as empty text instead of
        raising KeyError, matching the ``.get`` access used during generation.
        """
        sections = document.get('Sections', [])

        # Title block
        parts = [
            f"# {document.get('Title', '')}\n\n",
            f"By {document.get('Author', '')}\n\n",
            f"Version {document.get('Version', '')} | {document.get('Date', '')}\n\n",
            # Table of Contents
            cls.generate_toc(sections),
        ]

        # Main content
        for section in sections:
            section_number = section.get('SectionNumber', '')
            section_title = section.get('Title', '')
            parts.append(f'<a id="{cls._anchor(section_number, section_title)}"></a>\n\n')
            parts.append(f"## {section_number}. {section_title}\n\n")
            parts.append(f"{section.get('Content', '')}\n\n")

            for subsection in section.get('Subsections', []):
                subsection_number = subsection.get('SectionNumber', '')
                subsection_title = subsection.get('Title', '')
                parts.append(f'<a id="{cls._anchor(subsection_number, subsection_title)}"></a>\n\n')
                parts.append(f"### {subsection_number} {subsection_title}\n\n")
                parts.append(f"{subsection.get('Content', '')}\n\n")

        return "".join(parts)


router = APIRouter()


class DocumentRequest(BaseModel):
    query: str


class JsonDocumentResponse(BaseModel):
    json_document: Dict


class MarkdownDocumentRequest(BaseModel):
    json_document: Dict
    query: str


class MarkdownDocumentResponse(BaseModel):
    # Referenced by /generate-document-test; it was never declared, which
    # raised NameError when the module was imported.
    markdown_document: str


# The route decorator must be outermost so the router registers the cached
# wrapper rather than the bare function.
# NOTE(review): 600*24*7 seconds is 28 hours; if one week was intended this
# should be 3600*24*7 -- confirm.
@router.post("/generate-document/json", response_model=JsonDocumentResponse)
@cache(expire=600*24*7)
async def generate_document_outline_endpoint(request: DocumentRequest):
    """Generate and return the JSON outline for ``request.query``."""
    ai_client = AIClient()
    document_generator = DocumentGenerator(ai_client)

    try:
        # Generate the document outline
        json_document = await document_generator.generate_document_outline(request.query)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Checked outside the try block so this HTTPException is not re-wrapped.
    if json_document is None:
        raise HTTPException(status_code=500, detail="Failed to generate a valid document outline")

    return JsonDocumentResponse(json_document=json_document)


@router.post("/generate-document/markdown")
async def generate_markdown_document_endpoint(request: MarkdownDocumentRequest):
    """Stream newline-delimited JSON events while the document is generated."""
    ai_client = AIClient()
    document_generator = DocumentGenerator(ai_client)

    async def event_stream():
        try:
            # Generate the full document content and stream it
            async for section in document_generator.generate_full_document(request.json_document, request.query):
                yield section
        except Exception as e:
            # The response has already started, so errors are reported in-band.
            yield json.dumps({"type": "error", "message": str(e)}) + "\n"

    return StreamingResponse(event_stream(), media_type="application/json")


@router.post("/generate-document-test", response_model=MarkdownDocumentResponse)
async def test_generate_document_endpoint(request: DocumentRequest):
    """Return a pre-generated Markdown document read from the test fixture files."""
    try:
        # Load JSON document from file; parsed only to validate the fixture.
        json_path = os.path.join("output/document_generator", "ai-chatbot-prd.json")
        with open(json_path, "r", encoding="utf-8") as json_file:
            json.load(json_file)

        # Load Markdown document from file
        md_path = os.path.join("output/document_generator", "ai-chatbot-prd.md")
        with open(md_path, "r", encoding="utf-8") as md_file:
            markdown_document = md_file.read()

        return MarkdownDocumentResponse(markdown_document=markdown_document)
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Test files not found")
    except json.JSONDecodeError:
        raise HTTPException(status_code=500, detail="Error parsing JSON file")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


class CacheTestResponse(BaseModel):
    result: str
    execution_time: float


@router.get("/test-cache/{test_id}", response_model=CacheTestResponse)
@cache(expire=60)  # Cache for 1 minute
async def test_cache(test_id: int):
    """Sleep for two seconds and report the elapsed time."""
    start_time = time.time()

    # Simulate some time-consuming operation
    await asyncio.sleep(2)

    result = f"Test result for ID: {test_id}"

    end_time = time.time()
    execution_time = end_time - start_time

    return CacheTestResponse(
        result=result,
        execution_time=execution_time
    )