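# EmailGenie: a Streamlit chat app that walks a user through drafting personalized
# cold-outreach emails with an LLM (OpenAI or Groq), using tool calls to save
# profile details and templates to a Google Sheet and to send the finished email.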
import streamlit as st
import os
import datetime as DT
import pytz
import time
import json
import re
import random
import string

from transformers import AutoTokenizer
from tools import toolsInfo
from dotenv import load_dotenv

load_dotenv()
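
# Expected environment variables (assumed from the usage below):
#   USE_GPT_4=1        -> use OpenAI GPT-4o-mini; anything else falls back to Groq
#   OPENAI_API_KEY=... -> required when USE_GPT_4=1
#   GROQ_API_KEY=...   -> required otherwise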
useGpt4 = os.environ.get("USE_GPT_4") == "1"

if useGpt4:
    from openai import OpenAI
    client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
    MODEL = "gpt-4o-mini"
    TOOLS_MODEL = "gpt-4o-mini"
    MAX_CONTEXT = 128000
    tokenizer = AutoTokenizer.from_pretrained("Xenova/gpt-4o")
else:
    from groq import Groq
    client = Groq(
        api_key=os.environ.get("GROQ_API_KEY"),
    )
    MODEL = "llama-3.1-70b-versatile"
    # MODEL = "llama3-groq-70b-8192-tool-use-preview"
    TOOLS_MODEL = "llama3-groq-70b-8192-tool-use-preview"
    MAX_CONTEXT = 8000
    tokenizer = AutoTokenizer.from_pretrained("Xenova/Meta-Llama-3.1-Tokenizer")
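
# Count tokens with the model-matched tokenizer so the chat can be trimmed to MAX_CONTEXT.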
def countTokens(text):
    text = str(text)
    tokens = tokenizer.encode(text, add_special_tokens=False)
    return len(tokens)
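
# System prompt: the step-by-step cold-outreach workflow the assistant walks the user through.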
SYSTEM_MSG = """ | |
You are a personalized email generator for cold outreach. You take the user through the below workflow. | |
Keep your questions crisp. Ask only one question at a time. | |
# You ask about the purpose of sending email. | |
Give well-formatted numbered options to choose from for the email purpose. | |
Group the options by category as described below. | |
Give numbers only to the options and not categories. | |
Give a line break after every category name. | |
##### Category: Acquiring Customers | |
- Lead Generation (spark interest in possible customers) | |
- Sales Outreach (directly ask the decision-makers) | |
- Partnership and Collaboration (build mutually beneficial relationships) | |
- Event Promotion (invite people to webinars, conferences, or other events) | |
- Case study or testimonial requests (ask satisfied customers for testimonials) | |
##### Category: Learning and Connecting | |
- Networking (establish connections with industry experts) | |
- Market Research (gather information about target audiences or industries) | |
- Career Advice (seek guidance from experienced professionals) | |
##### Category: Jobs and Hiring | |
- Job Application (apply for job openings) | |
- Job Referrals (ask for referrals or recommendations) | |
- Recruitment (reach out to potential candidates) | |
# You then ask sender (user) details. Whatever could be relevant for this type of email | |
# You ask for recipient's Industry, if it's required for this type of email | |
# You ask for recipient's Role in the company, if it's required for this type of email | |
# You ask for any other specific details required to draft this type of mail | |
# Once all these details are received, you save them in a Google Sheet | |
# You then check with the user if they can see details in the sheet. | |
# Once the user ackowledges, you move to the next phase of email generation. | |
Based on the above info, you draft 2 very different variations of email and check the user which one they're liking more. | |
Keep the email body in sections separated by divider "-----". | |
Give numbered options at the end to choose from. | |
Ask them if they want to finalize it. If they don't finalize, repeat doing it till the user finalizes on one. | |
Check with them what they would like to change in the variation. | |
# Once the mail is finalized. You ask the user for all the missing placeholder values to write the final mail. | |
# Once the placeholder values are available in the final email, you ask for the recipient email ID. | |
# Once you have the final mail with placeholder values and recipient exact email ID, you send the email to this id. | |
Dont send email until you have received a valid email from user. | |
# You congratulate the user and ask if he would like to save this email as a template. If he agrees, save this template in Google Sheet. | |
# Repeat the process for more profiles and recipients. | |
""" | |

USER_ICON = "icons/man.png"
ASSISTANT_ICON = "icons/magic-wand-1.png"
TOOL_ICON = "icons/check.png"
IMAGE_LOADER = "icons/ripple.svg"
TEXT_LOADER = "icons/balls.svg"
START_MSG = "Let's start 🚀"

ROLE_TO_AVATAR = {
    "user": USER_ICON,
    "assistant": ASSISTANT_ICON,
    "tool": TOOL_ICON,
}

st.set_page_config(
    page_title="EmailGenie",
    page_icon=ASSISTANT_ICON,
)

ipAddress = st.context.headers.get("x-forwarded-for")
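
# Logging helpers: prefix every line with an IST timestamp and the client's forwarded IP.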
def __nowInIST() -> DT.datetime:
    return DT.datetime.now(pytz.timezone("Asia/Kolkata"))


def pprint(log: str):
    now = __nowInIST()
    now = now.strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{now}] [{ipAddress}] {log}")


pprint("\n")

st.markdown(
    """
    <style>
    @keyframes blinker {
        0% {
            opacity: 1;
        }
        50% {
            opacity: 0.2;
        }
        100% {
            opacity: 1;
        }
    }

    .blinking {
        animation: blinker 3s ease-out infinite;
    }

    .code {
        color: green;
        border-radius: 3px;
        padding: 2px 4px;  /* Padding around the text */
        font-family: 'Courier New', Courier, monospace;  /* Monospace font */
    }
    </style>
    """,
    unsafe_allow_html=True,
)
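
# Heuristics to reject degenerate LLM output: too many lines starting with a lowercase
# letter, runs of repeated words, or an excessive number of paragraphs.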
def __isInvalidResponse(response: str):
    # newline followed by a lowercase char
    if len(re.findall(r'\n[a-z]', response)) > 3:
        return True

    # lots of repeating words
    if len(re.findall(r'\b(\w+)(\s+\1){2,}\b', response)) > 1:
        return True

    # lots of paragraphs
    if len(re.findall(r'\n\n', response)) > 25:
        return True

    return False

def __resetButtonState():
    st.session_state["buttonValue"] = ""


def __setStartMsg(msg):
    st.session_state.startMsg = msg


if "chatHistory" not in st.session_state:
    st.session_state.chatHistory = []

if "messages" not in st.session_state:
    st.session_state.messages = []

if "buttonValue" not in st.session_state:
    __resetButtonState()

if "startMsg" not in st.session_state:
    st.session_state.startMsg = ""

st.session_state.toolResponseDisplay = {}
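
# Drop the oldest messages until the conversation (plus system prompt) fits within MAX_CONTEXT.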
def __getMessages():
    def getContextSize():
        currContextSize = countTokens(SYSTEM_MSG) + countTokens(st.session_state.messages) + 100
        pprint(f"{currContextSize=}")
        return currContextSize

    while getContextSize() > MAX_CONTEXT:
        pprint("Context size exceeded, removing first message")
        st.session_state.messages.pop(0)

    return st.session_state.messages
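
# Tool schemas exposed to the model; the implementations come from the tools module.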
tools = [
    toolsInfo["saveProfileDetailsInGSheet"]["schema"],
    toolsInfo["saveTemplateInGSheet"]["schema"],
    toolsInfo["sendEmail"]["schema"],
]
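
# Render a tool call's result as a small icon plus a one-line status message.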
def __showToolResponse(toolResponseDisplay: dict):
    msg = toolResponseDisplay.get("text")
    icon = toolResponseDisplay.get("icon")

    col1, col2 = st.columns([1, 20])
    with col1:
        st.image(
            icon or TOOL_ICON,
            width=30,
        )
    with col2:
        if "`" not in msg:
            st.markdown(f"`{msg}`")
        else:
            st.markdown(msg)

def __process_stream_chunk(chunk):
    delta = chunk.choices[0].delta
    if delta.content:
        return delta.content
    elif delta.tool_calls:
        return delta.tool_calls[0]
    return None

def __addToolCallToMsgs(toolCall: dict):
    st.session_state.messages.append(
        {
            "role": "assistant",
            "tool_calls": [
                {
                    "id": toolCall.id,
                    "function": {
                        "name": toolCall.function.name,
                        "arguments": toolCall.function.arguments,
                    },
                    "type": toolCall.type,
                }
            ],
        }
    )
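
# Execute each requested tool, show its result in the UI, and append both the
# assistant's tool call and the tool's response to the message history.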
def __processToolCalls(toolCalls):
    for toolCall in toolCalls:
        functionName = toolCall.function.name
        functionToCall = toolsInfo[functionName]["func"]
        functionArgsStr = toolCall.function.arguments
        pprint(f"{functionName=} | {functionArgsStr=}")
        functionArgs = json.loads(functionArgsStr)
        functionResult = functionToCall(**functionArgs)
        functionResponse = functionResult.get("response")
        responseDisplay = functionResult.get("display")
        pprint(f"{functionResponse=}")

        if responseDisplay:
            __showToolResponse(responseDisplay)
            st.session_state.toolResponseDisplay = responseDisplay

        __addToolCallToMsgs(toolCall)
        st.session_state.messages.append(
            {
                "role": "tool",
                "tool_call_id": toolCall.id,
                "name": functionName,
                "content": functionResponse,
            }
        )
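
# Keep only the last call per function name so the same tool isn't invoked twice in one turn.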
def __dedupeToolCalls(toolCalls: list):
    toolCallsDict = {}
    for toolCall in toolCalls:
        toolCallsDict[toolCall.function.name] = toolCall
    dedupedToolCalls = list(toolCallsDict.values())

    if len(toolCalls) != len(dedupedToolCalls):
        pprint("Deduped tool calls!")
        pprint(f"{toolCalls=} -> {dedupedToolCalls=}")

    return dedupedToolCalls

def __getRandomToolId():
    return ''.join(
        random.choices(
            string.ascii_lowercase + string.digits,
            k=4,
        )
    )
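
# Generator that queries the model, yields any text content, and handles tool calls by
# executing them and recursing for a follow-up completion; it switches to TOOLS_MODEL if
# a raw <function=...> string shows up in the content instead of a proper tool call.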
def predict(model: str = None):
    model = model or MODEL
    messagesFormatted = [{"role": "system", "content": SYSTEM_MSG}]
    messagesFormatted.extend(__getMessages())
    contextSize = countTokens(messagesFormatted)
    pprint(f"{contextSize=} | {model}")
    pprint(f"{messagesFormatted=}")

    response = client.chat.completions.create(
        model=model,
        messages=messagesFormatted,
        temperature=0.5,
        max_tokens=4000,
        stream=False,
        tools=tools,
    )
    responseMessage = response.choices[0].message
    # pprint(f"{responseMessage=}")
    responseContent = responseMessage.content
    # pprint(f"{responseContent=}")

    if responseContent and '<function=' in responseContent:
        pprint("Switching to TOOLS_MODEL")
        # predict() is a generator, so delegate with "yield from" instead of returning.
        yield from predict(TOOLS_MODEL)
        return
    # if responseContent and responseContent.startswith('<function='):
    #     function_match = re.match(r'<function=(\w+)>(.*?)</+function>', responseContent)
    #     if function_match:
    #         function_name, function_args = function_match.groups()
    #         toolCalls = [
    #             {
    #                 "id": __getRandomToolId(),
    #                 "type": "function",
    #                 "function": {
    #                     "name": function_name,
    #                     "arguments": function_args
    #                 }
    #             }
    #         ]
    #         responseContent = None  # Set content to None as it's a function call
    #     else:
    #         toolCalls = None
    # else:
    #     toolCalls = responseMessage.tool_calls

    if responseContent:
        yield responseContent

    toolCalls = responseMessage.tool_calls
    # pprint(f"{toolCalls=}")
    if toolCalls:
        pprint(f"{toolCalls=}")
        toolCalls = __dedupeToolCalls(toolCalls)
        try:
            __processToolCalls(toolCalls)
            # Delegate to the follow-up completion; a plain "return" would drop its output.
            yield from predict()
        except Exception as e:
            pprint(e)
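
# Chat UI: replay the stored history, accept new input, and render the assistant's reply.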
st.title("EmailGenie 📧🧞‍♂️")

if not (st.session_state["buttonValue"] or st.session_state["startMsg"]):
    st.button(START_MSG, on_click=lambda: __setStartMsg(START_MSG))

for chat in st.session_state.chatHistory:
    role = chat["role"]
    content = chat["content"]
    imagePath = chat.get("image")
    toolResponseDisplay = chat.get("toolResponseDisplay")
    avatar = ROLE_TO_AVATAR[role]

    with st.chat_message(role, avatar=avatar):
        st.markdown(content)
        if toolResponseDisplay:
            __showToolResponse(toolResponseDisplay)
        if imagePath:
            st.image(imagePath)
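
# Handle new input: a typed message, a clicked option button, or the start button.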
if prompt := (st.chat_input() or st.session_state["buttonValue"] or st.session_state["startMsg"]):
    __resetButtonState()
    __setStartMsg("")

    with st.chat_message("user", avatar=USER_ICON):
        st.markdown(prompt)
        pprint(f"{prompt=}")

    st.session_state.chatHistory.append({"role": "user", "content": prompt})
    st.session_state.messages.append({"role": "user", "content": prompt})

    with st.chat_message("assistant", avatar=ASSISTANT_ICON):
        responseContainer = st.empty()

        def __printAndGetResponse():
            response = ""
            # responseContainer.markdown(".....")
            responseContainer.image(TEXT_LOADER)
            responseGenerator = predict()

            for chunk in responseGenerator:
                response += chunk
                if __isInvalidResponse(response):
                    pprint(f"Invalid_{response=}")
                    return
                responseContainer.markdown(response)

            return response

        response = __printAndGetResponse()
        while not response:
            pprint("Empty response. Retrying..")
            time.sleep(0.5)
            response = __printAndGetResponse()

        pprint(f"{response=}")

    def selectButton(optionLabel):
        st.session_state["buttonValue"] = optionLabel
        pprint(f"Selected: {optionLabel}")

    toolResponseDisplay = st.session_state.toolResponseDisplay
    st.session_state.chatHistory.append({
        "role": "assistant",
        "content": response,
        "toolResponseDisplay": toolResponseDisplay,
    })
    st.session_state.messages.append({
        "role": "assistant",
        "content": response,
    })