Spaces:
Sleeping
Sleeping
#!/usr/bin/env python | |
# coding: utf-8 | |
# In[2]: | |
#pip install evernote-sdk-python3 | |
# import evernote.edam.notestore.NoteStore as NoteStore | |
# import evernote.edam.type.ttypes as Types | |
# from evernote.api.client import EvernoteClient | |
# In[3]: | |
import os | |
import yaml | |
import pandas as pd | |
import numpy as np | |
from datetime import datetime, timedelta | |
# perspective generation | |
import openai | |
import os | |
from openai import OpenAI | |
import gradio as gr | |
import json | |
import sqlite3 | |
import uuid | |
import socket | |
import difflib | |
import time | |
import shutil | |
import requests | |
import re | |
import json | |
import markdown | |
from fpdf import FPDF | |
import hashlib | |
from transformers import pipeline | |
from transformers.pipelines.audio_utils import ffmpeg_read | |
from todoist_api_python.api import TodoistAPI | |
# from flask import Flask, request, jsonify | |
from twilio.rest import Client | |
import asyncio | |
import uvicorn | |
import fastapi | |
from fastapi import FastAPI, Request, HTTPException | |
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse | |
from fastapi.staticfiles import StaticFiles | |
from pathlib import Path | |
import nest_asyncio | |
from twilio.twiml.messaging_response import MessagingResponse | |
from requests.auth import HTTPBasicAuth | |
from google.cloud import storage, exceptions # Import exceptions for error handling | |
from google.cloud.exceptions import NotFound | |
from google.oauth2 import service_account | |
from reportlab.pdfgen import canvas | |
from reportlab.lib.pagesizes import letter | |
from reportlab.pdfbase import pdfmetrics | |
from reportlab.lib import colors | |
from reportlab.pdfbase.ttfonts import TTFont | |
import logging | |
# Configure root logging once at import time (DEBUG level, timestamped lines)
# and create the module-level logger used by the rest of this file.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
# In[4]: | |
# Required configuration is read from the environment at import time; a
# missing variable raises KeyError immediately.
openai_api_key = os.environ["OPENAI_API_KEY"]
todoist_api_key = os.environ["TODOIST_API_KEY"]
EVERNOTE_API_TOKEN = os.environ["EVERNOTE_API_TOKEN"]
# NOTE: the Twilio variables are spelled "TWILLO_*" in the environment.
account_sid, auth_token, twilio_phone_number = (
    os.environ[variable]
    for variable in ("TWILLO_ACCOUNT_SID", "TWILLO_AUTH_TOKEN", "TWILLO_PHONE_NUMBER")
)
# Holds the service-account JSON text itself (it is parsed with json.loads
# in save_to_google_storage), not a path to a key file.
google_credentials_json = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
# Twilio REST client used to send WhatsApp replies.
twillo_client = Client(account_sid, auth_token)
# Set the GOOGLE_APPLICATION_CREDENTIALS environment variable | |
# Load Reasoning Graph JSON File | |
def load_reasoning_json(filepath):
    """
    Load a JSON file and return the parsed object.

    Parameters:
    filepath (str): Path of the JSON file to read.
    Returns:
    The deserialized JSON content (a dictionary for the reasoning graph).
    Raises:
    OSError: If the file cannot be opened.
    json.JSONDecodeError: If the file is not valid JSON.
    """
    # JSON is UTF-8 by specification; do not depend on the platform default encoding.
    with open(filepath, "r", encoding="utf-8") as file:
        return json.load(file)
# Load Action Map | |
def load_action_map(filepath):
    """
    Load the action map JSON file and resolve each function name to an object.

    The file maps action names to the names of module-level objects; each name
    is looked up in this module's globals().

    Parameters:
    filepath (str): Path of the action map JSON file.
    Returns:
    dict: Action name -> object found under the configured name.
    Raises:
    KeyError: If a configured name is not defined in this module.
    """
    # JSON is UTF-8 by specification; do not depend on the platform default encoding.
    with open(filepath, "r", encoding="utf-8") as file:
        action_map_raw = json.load(file)

    module_namespace = globals()
    resolved = {}
    for action, func_name in action_map_raw.items():
        try:
            resolved[action] = module_namespace[func_name]
        except KeyError:
            # Same exception type as a plain lookup, but say which entry is broken.
            raise KeyError(
                f"Action '{action}' refers to unknown function '{func_name}'"
            ) from None
    return resolved
# In[5]: | |
# Define all actions as functions | |
def find_reference(task_topic):
    """Placeholder action: announce the lookup and return a canned reference string."""
    print("Finding reference for topic: {}".format(task_topic))
    found = "Reference found for topic: {}".format(task_topic)
    return found
def generate_summary(reference):
    """Placeholder action: announce the step and return a canned summary string."""
    print("Generating summary for reference: {}".format(reference))
    summary_text = "Summary of {}".format(reference)
    return summary_text
def suggest_relevance(summary):
    """Placeholder action: announce the step and return a canned relevance string."""
    print("Suggesting relevance of summary: {}".format(summary))
    relevance_text = "Relevance of {} suggested".format(summary)
    return relevance_text
def tool_research(task_topic):
    """Placeholder action: return a fixed analysis string; task_topic is not used."""
    analysis = "Tool analysis data"
    print("Performing tool research")
    return analysis
def generate_comparison_table(tool_analysis):
    """Placeholder action: announce the step and return a canned comparison-table string."""
    print("Generating comparison table for analysis: {}".format(tool_analysis))
    table_text = "Comparison table for {}".format(tool_analysis)
    return table_text
def generate_integration_memo(tool_analysis):
    """Placeholder action: announce the step and return a canned integration-memo string."""
    print("Generating integration memo for analysis: {}".format(tool_analysis))
    memo_text = "Integration memo for {}".format(tool_analysis)
    return memo_text
def analyze_issue(task_topic):
    """Placeholder action: return a fixed issue-analysis string; task_topic is not used."""
    analysis = "Issue analysis data"
    print("Analyzing issue")
    return analysis
def generate_issue_memo(issue_analysis):
    """Placeholder action: announce the step and return a canned issue-memo string."""
    print("Generating issue memo for analysis: {}".format(issue_analysis))
    memo_text = "Issue memo for {}".format(issue_analysis)
    return memo_text
def list_ideas(task_topic):
    """Placeholder action: return three fixed idea names; task_topic is not used."""
    print("Listing ideas")
    ideas = []
    ideas.append("Idea 1")
    ideas.append("Idea 2")
    ideas.append("Idea 3")
    return ideas
def construct_matrix(ideas):
    """Placeholder action: announce the step and return a fixed impact/effort rating per idea."""
    print("Constructing matrix for ideas: {}".format(ideas))
    matrix = {}
    matrix["Idea 1"] = "High Impact/Low Effort"
    matrix["Idea 2"] = "Low Impact/High Effort"
    matrix["Idea 3"] = "High Impact/High Effort"
    return matrix
def prioritize_ideas(matrix):
    """Placeholder action: announce the step and return a fixed priority ordering."""
    print("Prioritizing ideas based on matrix: {}".format(matrix))
    ranking = ["Idea 3", "Idea 1", "Idea 2"]
    return ranking
def setup_action_plan(prioritized_ideas):
    """Placeholder action: announce the step and return a canned action-plan string."""
    print("Setting up action plan for ideas: {}".format(prioritized_ideas))
    plan_text = "Action plan created for {}".format(prioritized_ideas)
    return plan_text
def unsupported_task(task_topic=None, **_ignored_inputs):
    """
    Fallback action for workflow steps that have no registered function.

    The workflow executor calls actions as ``action_func(**inputs)`` with a
    single keyword whose name comes from the workflow node, so this fallback
    accepts any keyword instead of failing with TypeError when the key is not
    ``task_topic``.

    Returns:
    str: The fixed string "Unsupported task".
    """
    print("Task not supported")
    return "Unsupported task"
# In[6]: | |
# Shared Todoist client, authenticated with the key read from the environment.
todoist_api = TodoistAPI(todoist_api_key)
# Fetch recent Todoist task | |
def fetch_todoist_task():
    """
    Return a one-line description of the first task returned by Todoist.

    Returns:
    str: "Recent Task: <content>" for the first returned task, a fixed message
    when no tasks are returned, or an error message if anything fails.
    """
    try:
        tasks = todoist_api.get_tasks()
        if not tasks:
            return "No tasks found in Todoist."
        # NOTE(review): the first element is treated as "most recent" — confirm
        # the ordering guaranteed by the Todoist API.
        first_task = tasks[0]
        return f"Recent Task: {first_task.content}"
    except Exception as e:
        return f"Error fetching tasks: {str(e)}"
def add_to_todoist(task_topic, todoist_priority = 3):
    """
    Create a Todoist task and report the outcome as text.

    Parameters:
    task_topic (str): Content of the task to create.
    todoist_priority (int): Priority passed to the Todoist API.
    Returns:
    str: A confirmation message, or an error message if the call fails.
    """
    try:
        todoist_api.add_task(priority=todoist_priority, content=task_topic)
    except Exception as e:
        # Failures are reported to the caller as text rather than raised.
        return f"An error occurred: {e}"
    msg = f"Task added: {task_topic} with priority {todoist_priority}"
    logger.debug(msg)
    return msg
# def save_todo(reasoning_steps): | |
# """ | |
# Save reasoning steps to Todoist as tasks. | |
# Args: | |
# reasoning_steps (list of dict): A list of steps with "step" and "priority" keys. | |
# """ | |
# try: | |
# # Validate that reasoning_steps is a list | |
# if not isinstance(reasoning_steps, list): | |
# raise ValueError("The input reasoning_steps must be a list.") | |
# # Iterate over the reasoning steps | |
# for step in reasoning_steps: | |
# # Ensure each step is a dictionary and contains required keys | |
# if not isinstance(step, dict) or "step" not in step or "priority" not in step: | |
# logger.error(f"Invalid step data: {step}, skipping.") | |
# continue | |
# task_content = step["step"] | |
# priority_level = step["priority"] | |
# # Map priority to Todoist's priority levels (1 - low, 4 - high) | |
# priority_mapping = {"Low": 1, "Medium": 2, "High": 4} | |
# todoist_priority = priority_mapping.get(priority_level, 1) # Default to low if not found | |
# # Create a task in Todoist using the Todoist API | |
# # Assuming you have a function `todoist_api.add_task()` that handles the API request | |
# todoist_api.add_task( | |
# content=task_content, | |
# priority=todoist_priority | |
# ) | |
# logger.debug(f"Task added: {task_content} with priority {priority_level}") | |
# return "All tasks processed." | |
# except Exception as e: | |
# # Return an error message if something goes wrong | |
# return f"An error occurred: {e}" | |
# In[7]: | |
# evernote_client = EvernoteClient(token=EVERNOTE_API_TOKEN, sandbox=False) | |
# note_store = evernote_client.get_note_store() | |
# def add_to_evernote(task_topic, notebook_title="Inspirations"): | |
# """ | |
# Add a task topic to the 'Inspirations' notebook in Evernote. If the notebook doesn't exist, create it. | |
# Args: | |
# task_topic (str): The content of the task to be added. | |
# notebook_title (str): The title of the Evernote notebook. Default is 'Inspirations'. | |
# """ | |
# try: | |
# # Check if the notebook exists | |
# notebooks = note_store.listNotebooks() | |
# notebook = next((nb for nb in notebooks if nb.name == notebook_title), None) | |
# # If the notebook doesn't exist, create it | |
# if not notebook: | |
# notebook = Types.Notebook() | |
# notebook.name = notebook_title | |
# notebook = note_store.createNotebook(notebook) | |
# # Search for an existing note with the same title | |
# filter = NoteStore.NoteFilter() | |
# filter.notebookGuid = notebook.guid | |
# filter.words = notebook_title | |
# notes_metadata_result = note_store.findNotesMetadata(filter, 0, 1, NoteStore.NotesMetadataResultSpec(includeTitle=True)) | |
# # If a note with the title exists, append to it; otherwise, create a new note | |
# if notes_metadata_result.notes: | |
# note_guid = notes_metadata_result.notes[0].guid | |
# existing_note = note_store.getNote(note_guid, True, False, False, False) | |
# existing_note.content = existing_note.content.replace("</en-note>", f"<div>{task_topic}</div></en-note>") | |
# note_store.updateNote(existing_note) | |
# else: | |
# # Create a new note | |
# note = Types.Note() | |
# note.title = notebook_title | |
# note.notebookGuid = notebook.guid | |
# note.content = f'<?xml version="1.0" encoding="UTF-8"?>' \ | |
# f'<!DOCTYPE en-note SYSTEM "http://xml.evernote.com/pub/enml2.dtd">' \ | |
# f'<en-note><div>{task_topic}</div></en-note>' | |
# note_store.createNote(note) | |
# print(f"Task '{task_topic}' successfully added to Evernote under '{notebook_title}'.") | |
# except Exception as e: | |
# print(f"Error adding task to Evernote: {e}") | |
# Mock Functions for Task Actions | |
def add_to_evernote(task_topic):
    """Mock action: return a confirmation string without contacting Evernote."""
    confirmation = "Task added to Evernote with title '{}'.".format(task_topic)
    return confirmation
# In[8]: | |
# Load the workflow decision tree and the action-name -> function mapping
# from JSON files at import time.
_WORKFLOW_TREE_PATH = 'curify_ideas_reasoning.json'
_ACTION_MAP_PATH = 'action_map.json'
TASK_WORKFLOW_TREE = load_reasoning_json(_WORKFLOW_TREE_PATH)
action_map = load_action_map(_ACTION_MAP_PATH)
# In[9]: | |
def generate_task_hash(task_description):
    """
    Return the MD5 hex digest of a task description.

    Non-string input is converted with str() first. Characters that cannot be
    encoded as UTF-8 are dropped before hashing.

    Returns:
    str: The 32-character hex digest, or the literal 'output' if anything fails.
    """
    try:
        text = task_description
        if not isinstance(text, str):
            logger.warning("task_description is not a string, attempting conversion.")
            text = str(text)
        digest = hashlib.md5(text.encode("utf-8", errors="ignore")).hexdigest()
        logger.debug(f"Generated task hash: {digest}")
        return digest
    except Exception as e:
        # Fall back to a fixed name so document generation can still proceed.
        logger.error(f"Error generating task hash: {e}", exc_info=True)
        return 'output'
def save_to_google_storage(bucket_name, file_path, destination_blob_name, expiration_minutes = 1440):
    """
    Upload a local file to Google Cloud Storage and return a signed download URL.

    Credentials are built from the service-account JSON held in
    ``google_credentials_json``. If the bucket is not found it is created.

    Parameters:
    bucket_name (str): Target bucket.
    file_path (str): Local file to upload.
    destination_blob_name (str): Object name inside the bucket.
    expiration_minutes (int): Lifetime of the signed URL in minutes.
    Returns:
    str: A V4 signed GET URL for the uploaded object.
    """
    service_account_info = json.loads(google_credentials_json)
    creds = service_account.Credentials.from_service_account_info(service_account_info)
    gcs_client = storage.Client(project=creds.project_id, credentials=creds)

    try:
        bucket = gcs_client.get_bucket(bucket_name)
    except NotFound:
        # A missing bucket is not fatal: report it, then create the bucket.
        print(f"❌ Bucket '{bucket_name}' not found. Please check the bucket name.")
        bucket = gcs_client.create_bucket(bucket_name)
        print(f"✅ Bucket '{bucket_name}' created.")
    except Exception as e:
        print(f"❌ An unexpected error occurred: {e}")
        raise

    target_blob = bucket.blob(destination_blob_name)
    target_blob.upload_from_filename(file_path)

    url_lifetime = timedelta(minutes=expiration_minutes)
    signed_url = target_blob.generate_signed_url(
        method="GET",
        expiration=url_lifetime,
        version="v4",
    )
    print(f"✅ File uploaded to Google Cloud Storage. Signed URL: {signed_url}")
    return signed_url
# Function to check if content is Simplified Chinese | |
def is_simplified(text):
    """
    Return True when more than half of the characters in text are Han characters.

    Only the U+4E00–U+9FFF range is tested. That range covers simplified and
    traditional characters alike, so despite its name this function does not
    tell the two apart.
    """
    han_pattern = re.compile('[\u4e00-\u9fff]')
    han_count = sum(1 for character in text if han_pattern.match(character))
    half_length = len(text) * 0.5
    return han_count > half_length
# Function to choose the appropriate font for the content | |
def choose_font_for_content(content):
    """Return 'NotoSansSC' when is_simplified(content) is true, otherwise 'NotoSansTC'."""
    if is_simplified(content):
        return 'NotoSansSC'
    return 'NotoSansTC'
# Function to generate and save a document using ReportLab | |
def generate_document(task_description, md_content, user_name='jayw', bucket_name='curify'):
    """
    Render a dictionary as a simple PDF, upload it to Google Cloud Storage and return its URL.

    Each key of ``md_content`` is drawn as a "# key" heading followed by its
    value: list values become one "- item" line per element; any other value
    is converted to text and drawn one line per text line. A new page is
    started whenever the next line would fall below the bottom margin.

    Parameters:
    task_description: Text hashed by generate_task_hash to derive the file name.
    md_content (dict): Mapping of section titles to values.
    user_name (str): Folder prefix of the uploaded blob.
    bucket_name (str): Target bucket.
    Returns:
    str: The signed URL returned by save_to_google_storage.
    Raises:
    RuntimeError: If either Noto font file cannot be loaded.
    """
    logger.debug("Starting to generate document")

    # The hash becomes the file name; the cap is only a safety net.
    max_hash_length = 64
    truncated_hash = generate_task_hash(task_description)[:max_hash_length]

    # NOTE: the PDF is written to the working directory and left there after upload.
    local_filename = f"{truncated_hash}.pdf"
    c = canvas.Canvas(local_filename, pagesize=letter)

    # TTF files for the Simplified and Traditional Chinese fonts.
    sc_font_path = 'NotoSansSC-Regular.ttf'
    tc_font_path = 'NotoSansTC-Regular.ttf'
    try:
        pdfmetrics.registerFont(TTFont('NotoSansSC', sc_font_path))
        pdfmetrics.registerFont(TTFont('NotoSansTC', tc_font_path))
        c.setFont('NotoSansSC', 12)
    except Exception as e:
        logger.error(f"Error loading font files: {e}")
        raise RuntimeError("Failed to load one or more fonts. Ensure the font files are accessible.") from e

    top_y, bottom_y, left_x = 750, 100, 100
    y_position = top_y

    def draw_line(text, font_name, font_size, line_height):
        """Draw one line at the current position, breaking the page first if needed."""
        nonlocal y_position
        if y_position < bottom_y:
            c.showPage()
            y_position = top_y
        # The font is set for every line because a page break resets canvas state.
        c.setFont(font_name, font_size)
        c.drawString(left_x, y_position, text)
        y_position -= line_height

    for key, value in md_content.items():
        heading = str(key)
        draw_line(f"# {heading}", choose_font_for_content(heading), 14, 20)

        value_font = choose_font_for_content(str(value))
        if isinstance(value, list):
            body_lines = [f"- {item}" for item in value]
        else:
            # drawString needs text and renders a single line, so convert
            # non-string values and split embedded newlines.
            body_lines = str(value).splitlines() or [""]
        for line in body_lines:
            draw_line(line, value_font, 12, 15)

    c.save()

    # Files are grouped per user inside the bucket.
    destination_blob_name = f"{user_name}/{truncated_hash}.pdf"
    public_url = save_to_google_storage(bucket_name, local_filename, destination_blob_name)
    logger.debug("Finished generating document")
    return public_url
# In[10]: | |
def execute_with_retry(sql, params=(), attempts=5, delay=1, db_name = 'curify_ideas.db'):
    """
    Execute one SQL statement and commit it, retrying while the database is locked.

    Parameters:
    sql (str): Statement to execute.
    params (tuple): Values bound to the statement's placeholders.
    attempts (int): Maximum number of tries.
    delay (float): Seconds to wait between tries.
    db_name (str): Path of the SQLite database file.
    Raises:
    sqlite3.OperationalError: For any operational error other than
    "database is locked", or when the database is still locked on the last attempt.
    """
    for attempt in range(attempts):
        conn = None
        try:
            conn = sqlite3.connect(db_name)
            # "with conn" commits on success and rolls back on error, but it
            # does NOT close the connection; that is done in the finally clause.
            with conn:
                conn.execute(sql, params)
            return
        except sqlite3.OperationalError as e:
            is_last_attempt = attempt >= attempts - 1
            if "database is locked" not in str(e) or is_last_attempt:
                raise
        finally:
            if conn is not None:
                conn.close()
        # Only reached after a "database is locked" error with tries remaining.
        time.sleep(delay)
# def enable_wal_mode(db_name = 'curify_ideas.db'): | |
# with sqlite3.connect(db_name) as conn: | |
# cursor = conn.cursor() | |
# cursor.execute("PRAGMA journal_mode=WAL;") | |
# conn.commit() | |
# # Create SQLite DB and table | |
# def create_db(db_name = 'curify_ideas.db'): | |
# with sqlite3.connect(db_name, timeout=30) as conn: | |
# c = conn.cursor() | |
# c.execute('''CREATE TABLE IF NOT EXISTS sessions ( | |
# session_id TEXT, | |
# ip_address TEXT, | |
# project_desc TEXT, | |
# idea_desc TEXT, | |
# idea_analysis TEXT, | |
# prioritization_steps TEXT, | |
# timestamp DATETIME, | |
# PRIMARY KEY (session_id, timestamp) | |
# ) | |
# ''') | |
# conn.commit() | |
# # Function to insert session data into the SQLite database | |
# def insert_session_data(session_id, ip_address, project_desc, idea_desc, idea_analysis, prioritization_steps, db_name = 'curify_ideas.db'): | |
# execute_with_retry(''' | |
# INSERT INTO sessions (session_id, ip_address, project_desc, idea_desc, idea_analysis, prioritization_steps, timestamp) | |
# VALUES (?, ?, ?, ?, ?, ?, ?) | |
# ''', (session_id, ip_address, project_desc, idea_desc, json.dumps(idea_analysis), json.dumps(prioritization_steps), datetime.now()), db_name) | |
# In[11]: | |
def convert_to_listed_json(input_string):
    """
    Extract and parse the JSON array embedded in a string.

    Everything from the first '[' to the last ']' is parsed, so surrounding
    text (for example, prose around a model response) is ignored.

    Parameters:
    input_string (str): Text expected to contain a JSON array.
    Returns:
    list | None: The parsed list, or None when the input is not a string,
    contains no bracketed span, or the span is not valid JSON.
    """
    if not isinstance(input_string, str):
        return None
    start = input_string.find('[')
    end = input_string.rfind(']')
    if start == -1 or end < start:
        return None
    try:
        return json.loads(input_string[start:end + 1])
    except json.JSONDecodeError:
        return None
def validate_and_extract_json(json_string):
    """
    Extract and parse the JSON object embedded in a string.

    Everything from the first '{' to the last '}' is parsed, so surrounding
    text is ignored.

    Args:
    - json_string (str): Text expected to contain a JSON object.
    Returns:
    - dict | None: The parsed object, or None when the input is not a string,
      contains no braced span, or the span is not valid JSON.
    """
    if not isinstance(json_string, str):
        return None
    start = json_string.find('{')
    end = json_string.rfind('}')
    if start == -1 or end < start:
        return None
    try:
        return json.loads(json_string[start:end + 1])
    except json.JSONDecodeError:
        return None
def json_to_pandas(dat_json, dat_schema = {'name':"", 'description':""}):
    """
    Build a DataFrame from parsed JSON, falling back to a one-row schema frame.

    Parameters:
    dat_json: Data accepted by the pandas.DataFrame constructor.
    dat_schema (dict): Row used when dat_json cannot be converted.
    Returns:
    pd.DataFrame: The converted data, or a single row holding dat_schema if
    the conversion raises.
    """
    try:
        return pd.DataFrame(dat_json)
    except Exception:
        return pd.DataFrame([dat_schema])
# In[12]: | |
# Shared OpenAI client. Passing the key explicitly is optional: the client
# reads OPENAI_API_KEY from the environment by default.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
# Function to call OpenAI API with compact error handling | |
def call_openai_api(prompt, model="gpt-4o", max_tokens=5000, retries=3, backoff_factor=2):
    """
    Send a single-user-message prompt to the OpenAI chat API, with retries.

    Parameters:
    prompt (str): The user input or task prompt to send to the model.
    model (str): The OpenAI model to use (default is "gpt-4o").
    max_tokens (int): The maximum number of tokens in the response.
    retries (int): Number of attempts before giving up.
    backoff_factor (int): Seconds multiplied by the attempt number to get the
    wait before the next attempt.
    Returns:
    str: The stripped content of the first choice.
    Raises:
    RuntimeError: If no attempt succeeds, or immediately after a bad-request
    or authentication error; the last underlying error is attached as the cause.
    """
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=max_tokens,
            )
            return response.choices[0].message.content.strip()
        except (openai.RateLimitError, openai.APIConnectionError) as e:
            last_error = e
            logger.warning(f"Transient error: {e}. Attempt {attempt} of {retries}. Retrying...")
        except (openai.BadRequestError, openai.AuthenticationError) as e:
            # Retrying cannot fix a malformed request or a bad key.
            last_error = e
            logger.error(f"Unrecoverable error: {e}. Check your inputs or API key.")
            break
        except Exception as e:
            last_error = e
            logger.error(f"Unexpected error: {e}. Attempt {attempt} of {retries}. Retrying...")
        # Linear backoff: wait backoff_factor * attempt seconds before retrying.
        if attempt < retries:
            time.sleep(backoff_factor * attempt)
    raise RuntimeError(f"Failed to fetch response from OpenAI API after {retries} attempts.") from last_error
def fn_analyze_task(project_context, task_description):
    """
    Ask the model to classify a task and return its JSON answer as a dictionary.

    The prompt requests the fields description, project_association, is_task,
    is_document, task_category and task_topic.

    Parameters:
    project_context (str): Description of the known projects.
    task_description (str): The task or idea to analyze.
    Returns:
    The value returned by validate_and_extract_json for the model response,
    or None if that call raises ValueError.
    """
    header = (
        f"You are working in the context of {project_context}. "
        f"Your task is to analyze the task: {task_description} "
    )
    aspects = "".join((
        "Please analyze the following aspects: ",
        "1) Determine which project this item belongs to. If the idea does not belong to any existing project, categorize it under 'Other'. ",
        "2) Assess whether this idea can be treated as a concrete task. ",
        "3) Evaluate whether a document can be generated as an intermediate result. ",
        "4) Identify the appropriate category of the task. Possible categories are: 'Blogs/Papers', 'Tools', 'Brainstorming', 'Issues', and 'Others'. ",
        "5) Extract the topic of the task. ",
    ))
    output_format = "".join((
        "Please provide the output in JSON format using the structure below: ",
        "{",
        ' "description": "", ',
        ' "project_association": "", ',
        ' "is_task": "Yes/No", ',
        ' "is_document": "Yes/No", ',
        ' "task_category": "", ',
        ' "task_topic": "" ',
        "}",
    ))
    prompt = header + aspects + output_format

    raw_analysis = call_openai_api(prompt)
    try:
        return validate_and_extract_json(raw_analysis)
    except ValueError as e:
        logger.debug("ValueError occurred: %s", str(e), exc_info=True)  # Log the exception details
        return None
# In[13]: | |
# Recursive Task Executor | |
def fn_process_task(project_desc_table, task_description, bucket_name='curify'):
    """
    Analyze a task, run the matching workflow actions and generate a result document.

    The workflow tree (TASK_WORKFLOW_TREE) is walked from its "start" node.
    A node with "check" branches on the value stored under that key in the
    results; a node with "action" calls the mapped function with one keyword
    argument and stores its output.

    Parameters:
    project_desc_table (pd.DataFrame): Known projects, rendered as prompt context.
    task_description (str): The task or idea to process.
    bucket_name (str): Bucket the generated document is uploaded to.
    Returns:
    tuple: (task_analysis dict, DataFrame of executed actions, execution_results
    dict). Partial results are returned if the workflow or the document
    generation fails; ({}, empty DataFrame, {}) if the analysis is empty.
    """
    project_context = project_desc_table.to_string(index=False)
    task_analysis = fn_analyze_task(project_context, task_description)
    if not task_analysis:
        logger.error("Empty task analysis.")
        return {}, pd.DataFrame(), {}

    execution_status = []
    execution_results = task_analysis.copy()
    execution_results['deliverables'] = ''

    def traverse(node, previous_output=None):
        """Recursively execute the workflow starting at node."""
        if not node:
            return

        if "check" in node:
            check_key = node["check"]
            if check_key not in execution_results:
                # Stop this branch but keep whatever has been produced so far.
                logger.error(f"Key '{check_key}' not found in execution_results.")
                return
            value = execution_results[check_key]
            traverse(node.get(value, node.get("default")), previous_output)

        elif "action" in node:
            action_name = node["action"]
            input_key = node.get("input", 'task_topic')
            if input_key not in execution_results:
                logger.error(f"Workflow action {action_name} input key {input_key} not in execution_results.")
                return
            inputs = {input_key: execution_results[input_key]}
            logger.debug(f"Executing: {action_name} with inputs: {inputs}")

            action_func = action_map.get(action_name, unsupported_task)
            try:
                output = action_func(**inputs)
            except Exception as e:
                logger.error(f"Error executing action '{action_name}': {e}")
                return

            execution_status.append({"action": action_name, "output": output})
            if 'output' in node:
                # Named outputs become inputs for later nodes.
                execution_results[node['output']] = output
            else:
                # Actions may return lists or dicts; deliverables is text.
                execution_results['deliverables'] += str(output)

        # NOTE(review): "next" is followed for any node type that reaches this
        # point — confirm against the workflow JSON.
        if "next" in node and node["next"]:
            traverse(node["next"], previous_output)

    try:
        traverse(TASK_WORKFLOW_TREE["start"])
    except Exception as e:
        logger.error(f"Traverse Error: {e}")

    # The document is attempted even after a workflow error so that partial
    # results are still delivered.
    try:
        execution_results['doc_url'] = generate_document(
            task_description, execution_results, bucket_name=bucket_name
        )
    except Exception as e:
        logger.error(f"Document generation error: {e}")

    return task_analysis, pd.DataFrame(execution_status), execution_results
# In[14]: | |
# Initialize dataframes for the schema | |
# Empty table describing the idea schema: one column per field.
_IDEA_COLUMNS = ["Idea ID", "Content", "Tags"]
ideas_df = pd.DataFrame(columns=_IDEA_COLUMNS)
def extract_ideas(context, text):
    """
    Ask the model to extract ongoing projects (name and description) from text.

    Parameters:
    context (str): Context of the extraction. Can be empty.
    text (str): Text to extract projects from.
    Returns:
    str: The raw model response; it is not parsed here.
    """
    # The opening sentence depends on whether a context was supplied; the
    # rest of the prompt is shared.
    if context:
        opening = f"You are working in the context of {context}. "
    else:
        opening = "Given the following information about the user."
    shared_request = (
        "Please extract the ongoing projects with project name and description."
        "Please only the listed JSON as output string."
        f"Ongoing projects: {text}"
    )
    return call_openai_api(opening + shared_request)
def df_to_string(df, empty_message = ''):
    """
    Render a DataFrame as text without the index column.

    Parameters:
    df (pd.DataFrame): The DataFrame to be converted.
    empty_message (str): Value returned when the DataFrame is empty.
    Returns:
    str: The rendered table, or empty_message for an empty DataFrame.
    """
    return empty_message if df.empty else df.to_string(index=False)
# In[15]: | |
# Shared state variables | |
# In-process state shared between the UI callbacks and the webhook handler.
shared_state = {
    "project_desc_table": pd.DataFrame(),
    "task_analysis_txt": "",
    "execution_status": pd.DataFrame(),
    "execution_results": {},
}
# Button Action: Fetch State | |
def fetch_updated_state():
    """
    Log every entry of the shared state and return its four values.

    Returns:
    tuple: (project_desc_table, task_analysis_txt, execution_status, execution_results).
    """
    for key, value in shared_state.items():
        # Each supported type gets its own log layout.
        if isinstance(value, pd.DataFrame):
            description = f"{key}: DataFrame:\n{value.to_string()}"
        elif isinstance(value, dict):
            description = f"{key}: Dictionary: {value}"
        elif isinstance(value, str):
            description = f"{key}: String: {value}"
        else:
            description = f"{key}: Unsupported type: {value}"
        logger.debug(description)

    ordered_keys = ('project_desc_table', 'task_analysis_txt', 'execution_status', 'execution_results')
    return tuple(shared_state[name] for name in ordered_keys)
# response = requests.get("http://localhost:5000/state") | |
# # Check the status code and the raw response | |
# if response.status_code == 200: | |
# try: | |
# state = response.json() # Try to parse JSON | |
# return pd.DataFrame(state["project_desc_table"]), state["task_analysis_txt"], pd.DataFrame(state["execution_status"]), state["execution_results"] | |
# except ValueError as e: | |
# logger.error(f"JSON decoding failed: {e}") | |
# logger.debug("Raw response body:", response.text) | |
# else: | |
# logger.error(f"Error: {response.status_code} - {response.text}") | |
# """Fetch the updated shared state from FastAPI.""" | |
# return pd.DataFrame(), "", pd.DataFrame(), {} | |
def update_gradio_state(project_desc_table, task_analysis_txt, execution_status, execution_results):
    """Overwrite the four entries of the shared state and return True."""
    shared_state.update(
        project_desc_table=project_desc_table,
        task_analysis_txt=task_analysis_txt,
        execution_status=execution_status,
        execution_results=execution_results,
    )
    return True
# In[16]: | |
# # Initialize the database | |
# new_db = 'curify.db' | |
# # Copy the old database to a new one | |
# shutil.copy("curify_idea.db", new_db) | |
#create_db(new_db) | |
#enable_wal_mode(new_db) | |
def project_extraction(project_description):
    """
    Extract projects from a free-text description and reset the shared state.

    The description is sent to the model, the JSON list in the answer is
    converted to a DataFrame, and the shared state is replaced with that table
    and empty task results.

    Returns:
    pd.DataFrame: The extracted project table.
    """
    raw_projects = extract_ideas('AI-powered tools for productivity', project_description)
    project_desc_table = json_to_pandas(convert_to_listed_json(raw_projects))
    update_gradio_state(project_desc_table, "", pd.DataFrame(), {})
    return project_desc_table
# In[17]: | |
# project_description = 'work on a number of projects including curify (digest, ideas, careers, projects etc), and writing a book on LLM for recommendation system, educating my 3.5-year-old boy and working on a paper for LLM reasoning.' | |
# # convert_to_listed_json(extract_ideas('AI-powered tools for productivity', project_description)) | |
# task_description = 'Build an interview bot for the curify digest project.' | |
# task_analysis, reasoning_path = generate_reasoning_path(project_description, task_description) | |
# steps = store_and_execute_task(task_description, reasoning_path) | |
def message_back(task_message, execution_status, doc_url, from_whatsapp):
    """
    Send the task summary back to the user through Twilio.

    Parameters:
    task_message (str): The processed user input.
    execution_status (pd.DataFrame): Executed steps with "action" and optional "output".
    doc_url (str): Link to the generated document, or a failure notice.
    from_whatsapp (str): Address the reply is sent to.
    Returns:
    dict: {"status": "success"} once the message has been handed to Twilio.
    Raises:
    HTTPException: With status 500 if Twilio rejects the message.
    """
    # Number the executed steps starting from 1.
    numbered_steps = []
    for position, step in enumerate(execution_status.to_dict(orient="records"), start=1):
        numbered_steps.append(f"{position}. {step['action']} - {step.get('output', '')}")
    task_steps_list = "\n".join(numbered_steps)

    body_message = "".join((
        f"*Task Message:*\n{task_message}\n\n",
        f"*Execution Status:*\n{task_steps_list}\n\n",
        f"*Doc URL:*\n{doc_url}\n\n",
    ))

    try:
        twillo_client.messages.create(
            body=body_message,
            to=from_whatsapp,
            from_=twilio_phone_number,
        )
    except Exception as e:
        logger.error(f"Twilio Error: {e}")
        raise HTTPException(status_code=500, detail=f"Error sending WhatsApp message: {str(e)}")
    return {"status": "success"}
# Initialize the Whisper pipeline | |
# Speech-to-text pipeline, created once at import time.
whisper_pipeline = pipeline(
    "automatic-speech-recognition",
    model="openai/whisper-medium",
)
# Function to transcribe audio from a media URL | |
def transcribe_audio_from_media_url(media_url):
    """
    Download an audio attachment and transcribe it with the Whisper pipeline.

    The download is authenticated with the Twilio account credentials. The
    audio is stored in a uniquely named temporary file, so simultaneous
    requests do not overwrite each other, and the file is removed afterwards.

    Parameters:
    media_url (str): URL of the media file to download.
    Returns:
    str | None: The transcribed text, or None if the download or the
    transcription fails.
    """
    import tempfile  # local import: not among this file's top-level imports

    audio_file_path = None
    try:
        # The timeout keeps a stalled download from blocking the request forever.
        media_response = requests.get(
            media_url,
            auth=HTTPBasicAuth(account_sid, auth_token),
            timeout=30,
        )
        media_response.raise_for_status()

        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as audio_file:
            audio_file_path = audio_file.name
            audio_file.write(media_response.content)

        transcription = whisper_pipeline(audio_file_path, return_timestamps=True)
        logger.debug(f"Transcription: {transcription['text']}")
        return transcription["text"]
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        return None
    finally:
        if audio_file_path is not None:
            try:
                os.remove(audio_file_path)
            except OSError as cleanup_error:
                logger.debug(f"Could not remove temporary audio file: {cleanup_error}")
# In[18]: | |
# FastAPI application object.
# NOTE(review): no route decorators are visible on the handlers below —
# confirm how they are registered.
app = FastAPI()
async def fetch_state():
    """
    Return the shared state dictionary itself (not a copy).

    NOTE(review): the state holds pandas DataFrames; confirm they are
    converted before being serialized as an HTTP response.
    """
    current_state = shared_state
    return current_state
async def whatsapp_webhook(request: Request):
    """Process an incoming WhatsApp message posted as form data.

    Reads the ``Body``, ``From``, ``MediaUrl0`` and ``MediaContentType0`` form
    fields. Audio media is transcribed and the transcription takes precedence
    over the text body. The resulting text is run through ``fn_process_task``
    against the current project table, the Gradio state is refreshed, and a
    summary is sent back to the sender with ``message_back``.

    Args:
        request: The incoming request carrying the form payload.

    Returns:
        A ``JSONResponse``:
        * ``{"status": "success"}`` after the reply was sent;
        * ``{}`` when the project table is empty (nothing is processed);
        * ``{"error": ...}`` with status 400 when the message has neither
          text nor a usable transcription, or 500 on a processing failure.
    """
    # NOTE(review): no route decorator is visible on this coroutine; confirm
    # it is registered with `app` at the URL configured for the webhook.
    # NOTE(review): transcription and task processing look synchronous and
    # would block the event loop while they run; confirm that is acceptable.
    form_data = await request.form()
    # Log the form data to debug
    logger.debug(f"Received data: {form_data}")

    # Extract message and user information
    incoming_msg = form_data.get("Body", "").strip()
    from_number = form_data.get("From", "")
    media_url = form_data.get("MediaUrl0", "")
    media_type = form_data.get("MediaContentType0", "")

    # Only audio attachments are transcribed; other media types are ignored.
    transcription = None
    if media_type.startswith("audio"):
        try:
            transcription = transcribe_audio_from_media_url(media_url)
        except Exception as e:
            return JSONResponse(
                {"error": f"Failed to process voice input: {str(e)}"},
                status_code=500,
            )

    # Use the transcription if available, otherwise the text message
    processed_input = transcription if transcription else incoming_msg
    logger.debug(f"Processed input: {processed_input}")

    # Nothing to act on (e.g. failed transcription with no caption): stop
    # before spending a task-processing call on an empty string.
    if not processed_input:
        return JSONResponse(
            content={"error": "No text or transcribable audio in the message"},
            status_code=400,
        )

    try:
        project_desc_table, _, _, _ = fetch_updated_state()
        # Without any projects there is nothing to match the task against
        if project_desc_table.empty:
            return JSONResponse(content={})

        task_analysis_txt, execution_status, execution_results = fn_process_task(
            project_desc_table, processed_input
        )
        update_gradio_state(
            project_desc_table, task_analysis_txt, execution_status, execution_results
        )

        # Tolerate missing/empty results instead of failing on the `in` test
        doc_url = 'Fail to generate doc'
        if execution_results and 'doc_url' in execution_results:
            doc_url = execution_results['doc_url']

        # Respond to the user on WhatsApp with the processed idea
        response = message_back(processed_input, execution_status, doc_url, from_number)
        logger.debug(response)
        # Return the dict itself so the payload is a JSON object, not the
        # Python repr of a dict wrapped in a JSON string.
        return JSONResponse(content=response)
    except Exception as e:
        logger.error(f"Error during task processing: {e}")
        return JSONResponse(content={"error": str(e)}, status_code=500)
# In[19]: | |
# Mock Gmail Login Function
def mock_login(email):
    """Accept any address ending in ``@gmail.com``; no real authentication.

    Returns:
        A 3-tuple ``(status_text, update, update)``. On success the updates
        are ``visible=False`` then ``visible=True``; on failure both are
        no-op updates so nothing changes visibility.
    """
    if not email.endswith("@gmail.com"):
        return "❌ Invalid Gmail address. Please try again.", gr.update(), gr.update()
    return f"✅ Logged in as {email}", gr.update(visible=False), gr.update(visible=True)
# User Onboarding Function
def onboarding_survey(role, industry, project_description):
    """Extract projects from the survey's description and advance one page.

    ``role`` and ``industry`` are accepted but not used here.

    Returns:
        A 3-tuple: the result of ``project_extraction(project_description)``,
        an update with ``visible=False`` and an update with ``visible=True``.
    """
    extracted_projects = project_extraction(project_description)
    hide_current_page = gr.update(visible=False)
    show_next_page = gr.update(visible=True)
    return extracted_projects, hide_current_page, show_next_page
# Mock Integration Functions
def _connection_success(service_name):
    """Build the success message reported by the mock integrations."""
    return f"✅ Successfully connected to {service_name}!"


def integrate_todoist():
    """Mock Todoist connection: always reports success."""
    return _connection_success("Todoist")


def integrate_evernote():
    """Mock Evernote connection: always reports success."""
    return _connection_success("Evernote")


def integrate_calendar():
    """Mock Google Calendar connection: always reports success."""
    return _connection_success("Google Calendar")
def load_svg_with_size(file_path, width="600px", height="400px"):
    """Read an SVG file and wrap its markup in a fixed-size, scrollable div.

    Args:
        file_path: Path of the SVG file, read as UTF-8 text.
        width: CSS width placed on the wrapping ``<div>``.
        height: CSS height placed on the wrapping ``<div>``.

    Returns:
        An HTML string: the wrapper div with the SVG markup inside it.
    """
    svg_markup = Path(file_path).read_text(encoding="utf-8")
    # Inline styles on the wrapper control the displayed size
    wrapper_open = f'<div style="width: {width}; height: {height}; overflow: auto;">'
    return "\n".join(["", wrapper_open, svg_markup, "</div>", ""])
# In[20]: | |
# Gradio Demo
def create_gradio_interface(state=None):
    """Build the multi-page Gradio Blocks UI and return it without launching.

    Four groups act as pages and are switched by toggling ``visible``:
    login -> onboarding survey -> integrations -> QR code / task workspace.

    Args:
        state: Unused; kept so existing callers can keep passing it.

    Returns:
        The configured ``gr.Blocks`` instance.
    """
    custom_css = """
    .gradio-table td {
        white-space: normal !important;
        word-wrap: break-word !important;
    }
    .gradio-table {
        width: 100% !important; /* Adjust to 100% to fit the container */
        table-layout: fixed !important; /* Fixed column widths */
        overflow-x: hidden !important; /* Disable horizontal scrolling */
    }
    .gradio-container {
        overflow-x: hidden !important; /* Disable horizontal scroll for entire container */
        padding: 0 !important; /* Remove any default padding */
    }
    .gradio-column {
        max-width: 100% !important; /* Ensure columns take up full width */
        overflow: hidden !important; /* Hide overflow to prevent horizontal scroll */
    }
    .gradio-row {
        overflow-x: hidden !important; /* Prevent horizontal scroll on rows */
    }
    """

    def login_and_show_status(email):
        """Run mock_login and reveal the status box so the message is seen."""
        status, login_page_update, onboarding_page_update = mock_login(email)
        return (
            gr.update(value=status, visible=True),
            login_page_update,
            onboarding_page_update,
        )

    def revealing(connect):
        """Wrap a connect callback so its hidden status box is shown with the result."""
        def handler():
            return gr.update(value=connect(), visible=True)
        return handler

    def show_qr_code_page():
        """Hide the integrations page and show the QR code page."""
        return gr.update(visible=False), gr.update(visible=True)

    with gr.Blocks(css=custom_css) as demo:
        # Page 1: Mock Gmail Login
        with gr.Group(visible=True) as login_page:
            gr.Markdown("### **1️⃣ Login with Gmail**")
            email_input = gr.Textbox(label="Enter your Gmail Address", placeholder="example@gmail.com")
            login_button = gr.Button("Login")
            # Hidden until a login attempt produces a status message
            login_result = gr.Textbox(label="Login Status", interactive=False, visible=False)

        # Page 2: User Onboarding
        with gr.Group(visible=False) as onboarding_page:
            gr.Markdown("### **2️⃣ Tell Us About Yourself**")
            role = gr.Textbox(label="What is your role?", placeholder="e.g. Developer, Designer")
            industry = gr.Textbox(label="Which industry are you in?", placeholder="e.g. Software, Finance")
            project_description = gr.Textbox(label="Describe your project", placeholder="e.g. A task management app")
            submit_survey = gr.Button("Submit")

        # Page 3: Mock Integrations with Separate Buttons
        with gr.Group(visible=False) as integrations_page:
            gr.Markdown("### **3️⃣ Connect Integrations**")
            gr.Markdown("Click on the buttons below to connect each tool:")
            # Separate buttons and (initially hidden) results for each integration
            todoist_button = gr.Button("Connect to Todoist")
            todoist_result = gr.Textbox(label="Todoist Status", interactive=False, visible=False)
            evernote_button = gr.Button("Connect to Evernote")
            evernote_result = gr.Textbox(label="Evernote Status", interactive=False, visible=False)
            calendar_button = gr.Button("Connect to Google Calendar")
            calendar_result = gr.Textbox(label="Google Calendar Status", interactive=False, visible=False)
            # Both buttons lead to the next page
            skip_integrations = gr.Button("Skip ➡️")
            next_button = gr.Button("Proceed to QR Code")

        # Page 4: QR Code and Curify Ideas
        with gr.Group(visible=False) as qr_code_page:
            gr.Markdown("## Curify: Unified AI Tools for Productivity")
            with gr.Tab("Curify Idea"):
                with gr.Row():
                    # Left column: QR code rendered from a local SVG file
                    with gr.Column():
                        gr.Markdown("#### ** QR Code**")
                        svg_file_path = "qr.svg"
                        svg_content = load_svg_with_size(svg_file_path, width="200px", height="200px")
                        gr.HTML(svg_content)
                    # Middle column: project table and task input
                    with gr.Column():
                        gr.Markdown("## Projects Overview")
                        project_desc_table = gr.DataFrame(
                            type="pandas"
                        )
                        gr.Markdown("## Enter task message.")
                        idea_input = gr.Textbox(
                            label=None,
                            placeholder="Describe the task you want to execute (e.g., Research Paper Review)")
                        task_btn = gr.Button("Generate Task Steps")
                        fetch_state_btn = gr.Button("Fetch Updated State")
                    # Right column: analysis, per-step status and raw output
                    with gr.Column():
                        gr.Markdown("## Task analysis")
                        task_analysis_txt = gr.Textbox(
                            label=None,
                            placeholder="Here is the execution status of your task...")
                        gr.Markdown("## Execution status")
                        execution_status = gr.DataFrame(
                            type="pandas"
                        )
                        gr.Markdown("## Execution output")
                        execution_results = gr.JSON(
                            label=None
                        )
                state_output = gr.State()  # Add a state output to hold the state

                task_btn.click(
                    fn_process_task,
                    inputs=[project_desc_table, idea_input],
                    outputs=[task_analysis_txt, execution_status, execution_results]
                )
                fetch_state_btn.click(
                    fetch_updated_state,
                    inputs=None,
                    outputs=[project_desc_table, task_analysis_txt, execution_status, execution_results]
                )

        # Page 1 -> Page 2 Transition; the wrapper makes the status text
        # visible, otherwise a failed login shows no feedback at all.
        login_button.click(
            login_and_show_status,
            inputs=email_input,
            outputs=[login_result, login_page, onboarding_page]
        )

        # Page 2 -> Page 3 Transition
        submit_survey.click(
            onboarding_survey,
            inputs=[role, industry, project_description],
            outputs=[project_desc_table, onboarding_page, integrations_page]
        )

        # Integration buttons: each result box is revealed together with its message
        todoist_button.click(revealing(integrate_todoist), outputs=todoist_result)
        evernote_button.click(revealing(integrate_evernote), outputs=evernote_result)
        calendar_button.click(revealing(integrate_calendar), outputs=calendar_result)

        # Page 3 -> Page 4 Transition: "Skip" and "Proceed to QR Code" both
        # advance (the latter previously had no handler attached).
        for advance_button in (skip_integrations, next_button):
            advance_button.click(
                show_qr_code_page,
                outputs=[integrations_page, qr_code_page]
            )

    return demo
# Load function to initialize the state | |
# demo.load(load_fn, inputs=None, outputs=[state]) # Initialize the state when the page is loaded | |
# Function to launch Gradio | |
# def launch_gradio(): | |
# demo = create_gradio_interface() | |
# demo.launch(share=True, inline=False) # Gradio in the foreground | |
# # Function to run FastAPI server using uvicorn in the background | |
# async def run_fastapi(): | |
# config = uvicorn.Config(app, host="0.0.0.0", port=5000, reload=True, log_level="debug") | |
# server = uvicorn.Server(config) | |
# await server.serve() | |
# # FastAPI endpoint to display a message | |
# @app.get("/", response_class=HTMLResponse) | |
# async def index(): | |
# return "FastAPI is running. Visit Gradio at the provided public URL." | |
# # Main entry point for the asynchronous execution | |
# async def main(): | |
# # Run Gradio in the foreground and FastAPI in the background | |
# loop = asyncio.get_event_loop() | |
# # Run Gradio in a separate thread (non-blocking) | |
# loop.run_in_executor(None, launch_gradio) | |
# # Run FastAPI in the background (asynchronous) | |
# await run_fastapi() | |
# if __name__ == "__main__": | |
# import nest_asyncio | |
# nest_asyncio.apply() # Allow nested use of asyncio event loops in Jupyter notebooks | |
# # Run the main function to launch both services concurrently | |
# asyncio.run(main()) | |
# In[21]: | |
# Build the Gradio UI once, at import time.
demo = create_gradio_interface()

# Log the installed Gradio and FastAPI versions at debug level.
logging.debug("Gradio version: %s", gr.__version__)
logging.debug("FastAPI version: %s", fastapi.__version__)

# Serve the Gradio Blocks app from the FastAPI app under "/gradio".
gr.mount_gradio_app(app, demo, path="/gradio")
# # create a static directory to store the static files | |
# static_dir = Path('./static') | |
# static_dir.mkdir(parents=True, exist_ok=True) | |
# # mount FastAPI StaticFiles server | |
# app.mount("/static", StaticFiles(directory=static_dir), name="static") | |
# Dynamically check for the Gradio asset directory | |
# gradio_assets_path = os.path.join(os.path.dirname(gr.__file__), "static") | |
# if os.path.exists(gradio_assets_path): | |
# # If assets exist, mount them | |
# app.mount("/assets", StaticFiles(directory=gradio_assets_path), name="assets") | |
# else: | |
# logging.error(f"Gradio assets directory not found at: {gradio_assets_path}") | |
# Redirect from the root endpoint to the Gradio app | |
# Register the handler on the root path: without a route decorator the
# coroutine is never attached to `app`, so "/" would not redirect.
@app.get("/")
async def index():
    """Redirect requests for the site root to the Gradio UI at ``/gradio``."""
    return RedirectResponse(url="/gradio", status_code=307)
# Run the FastAPI server using uvicorn when executed as a script.
if __name__ == "__main__":
    # Listen on all interfaces; the port is fixed at 7860 (not read from PORT).
    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)