Spaces:
Runtime error
Runtime error
File size: 7,814 Bytes
31fd2e3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
from email.mime.text import MIMEText
from google.oauth2 import service_account
from googleapiclient.http import MediaFileUpload
import base64
import os
import base64
import streamlit as st
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from email.mime.multipart import MIMEMultipart
import requests
import json
from langchain.tools import tool
# Constants
# OAuth scope list passed to the service-account credentials in authenticate_drive().
SCOPES_DRIVE = ['https://www.googleapis.com/auth/drive']
# Path of the service-account key file read by authenticate_drive().
SERVICE_ACCOUNT_FILE = 'service_account.json'
# Drive folder ID used as the upload destination by send_email_with_company_details().
PARENT_FOLDER_ID = "1REXfwxk9dcPdpZXJOFZSur3880soVN9y"
# Authenticate and return credentials for Google Drive
def authenticate_drive():
    """Build service-account credentials for Google Drive.

    Loads the key file named by ``SERVICE_ACCOUNT_FILE`` and requests the
    scopes listed in ``SCOPES_DRIVE``.

    Returns:
        The object returned by
        ``service_account.Credentials.from_service_account_file``.
    """
    return service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE,
        scopes=SCOPES_DRIVE,
    )
# Upload file to Google Drive
def upload_file(filepath, filename, parent_folder_id):
    """Upload a local file to Google Drive and return the new file's ID.

    Args:
        filepath: Path of the local file handed to ``MediaFileUpload``.
        filename: Value stored as the ``name`` field of the Drive file.
        parent_folder_id: Drive folder ID placed in the ``parents`` list.

    Returns:
        The ``id`` field of the create response (also printed to stdout).
    """
    drive = build('drive', 'v3', credentials=authenticate_drive())

    metadata = {'name': filename, 'parents': [parent_folder_id]}
    payload = MediaFileUpload(filepath, resumable=True)

    # Only the ``id`` field is requested back from the API.
    create_request = drive.files().create(
        body=metadata,
        media_body=payload,
        fields='id',
    )
    created = create_request.execute()

    file_id = created.get('id')
    print(f'File ID: {file_id}')
    return file_id
# OAuth scope list used for the Gmail credentials in authenticate_gmail().
SCOPES = ["https://www.googleapis.com/auth/gmail.send"]
def authenticate_gmail():
    """Authenticate and return the Gmail service.

    Credentials are loaded from ``token.json`` when that file exists. If they
    are missing or not valid they are either refreshed (expired with a refresh
    token) or obtained through the installed-app flow configured by
    ``credentials.json``; the resulting credentials are written back to
    ``token.json``.

    Returns:
        The client returned by ``build("gmail", "v1", ...)``.
    """
    token_path = "token.json"

    creds = (
        Credentials.from_authorized_user_file(token_path, SCOPES)
        if os.path.exists(token_path)
        else None
    )

    if not creds or not creds.valid:
        refreshable = creds and creds.expired and creds.refresh_token
        if refreshable:
            creds.refresh(Request())
        else:
            login_flow = InstalledAppFlow.from_client_secrets_file(
                "credentials.json", SCOPES
            )
            creds = login_flow.run_local_server(port=0)

        # NOTE(review): the token is persisted only after a refresh/login,
        # matching the usual Google quickstart layout - confirm this nesting
        # against the original file.
        with open(token_path, "w") as token_file:
            token_file.write(creds.to_json())

    return build("gmail", "v1", credentials=creds)
def send_email_with_company_details(user_email, company_name, topic):
    """Upload the blog post and video to Drive and email their share links.

    ``blog_post.docx`` and ``video.mp4`` are uploaded to ``PARENT_FOLDER_ID``
    first; any error raised by those uploads propagates to the caller. The
    message is then built and sent through the Gmail API. Failures while
    building or sending the message are reported on stdout and not re-raised
    (best-effort delivery).

    Args:
        user_email: Recipient address placed in the ``to`` header.
        company_name: Sender name shown in the message body.
        topic: Topic used in the subject line and as the link text.

    Returns:
        None.
    """
    # Local import: only this function needs HTML escaping.
    from html import escape

    blog_id = upload_file('blog_post.docx', 'blog post', PARENT_FOLDER_ID)
    video_id = upload_file('video.mp4', 'video', PARENT_FOLDER_ID)

    # Prepare email content
    blog_link = f"https://drive.google.com/file/d/{blog_id}/view?usp=sharing"
    video_link = f"https://drive.google.com/file/d/{video_id}/view?usp=sharing"
    email_subject = f"Blog and Video for the topic {topic}"

    # Caller-supplied text is escaped before being embedded in markup so that
    # characters such as "<" or "&" cannot break or inject HTML.
    safe_company = escape(str(company_name))
    safe_topic = escape(str(topic))

    html_body = f"""
<html>
<body>
<p>Hello,</p>
<p>The requested blog and video has been shared with you by <b>{safe_company}</b>.</p>
<p>You can view the files using the following links:</p>
<b>Blog:</b>
<a href="{blog_link}"> {safe_topic}</a>
<br>
<br>
<b>Video:</b>
<a href="{video_link}"> {safe_topic}</a>
<br>
<p>Best regards,<br>{safe_company}</p>
</body>
</html>
"""

    # Real plain-text alternative: clients that cannot render HTML should see
    # readable text and bare links, not raw markup.
    text_body = (
        "Hello,\n\n"
        f"The requested blog and video has been shared with you by {company_name}.\n\n"
        "You can view the files using the following links:\n"
        f"Blog ({topic}): {blog_link}\n"
        f"Video ({topic}): {video_link}\n\n"
        f"Best regards,\n{company_name}\n"
    )

    try:
        # Create message container - the correct MIME type is multipart/alternative.
        msg = MIMEMultipart('alternative')
        msg['to'] = user_email
        msg['subject'] = email_subject

        # Parts are attached from least to most preferred: plain text first,
        # HTML last, so HTML-capable clients pick the HTML version.
        msg.attach(MIMEText(text_body, 'plain'))
        msg.attach(MIMEText(html_body, 'html'))

        # The Gmail API expects the whole RFC 2822 message as URL-safe base64.
        raw_string = base64.urlsafe_b64encode(msg.as_bytes()).decode()
        service = authenticate_gmail()
        service.users().messages().send(
            userId='me', body={'raw': raw_string}
        ).execute()
        print('Email sent successfully!')
    except Exception as e:
        # Deliberate best-effort: report the failure without raising.
        print(f'Error sending email: {str(e)}')
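# Legacy driver (disabled): upload the blog post and video, then email the links.
# NOTE: the commented-out call below passes four arguments, but
# send_email_with_company_details() takes (user_email, company_name, topic)
# and performs the uploads itself.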
# def main():
# blog_filepath = 'blog_post.docx'
# video_filepath = 'video.mp4'
# user_email = receiver_email
# company_name = 'digiotai'
# # Upload blog post
# blog_file_id = upload_file(blog_filepath, 'blog post', PARENT_FOLDER_ID)
# # Upload video
# video_file_id = upload_file(video_filepath, 'video', PARENT_FOLDER_ID)
# send_email_with_company_details(blog_file_id, video_file_id, user_email, company_name)
# main()
def get_urn(token):
    """Look up the ``sub`` identifier of the LinkedIn user owning ``token``.

    Args:
        token: OAuth access token sent as a ``Bearer`` Authorization header.

    Returns:
        The ``sub`` field of the userinfo response when the request answers
        HTTP 200 (it is also printed). For any other status the status code
        and response body are printed and ``None`` is returned.
    """
    response = requests.get(
        'https://api.linkedin.com/v2/userinfo',
        headers={'Authorization': f'Bearer {token}'},
    )

    if response.status_code != 200:
        print(f'Failed to fetch user info: {response.status_code}')
        print(response.text)
        return None

    member_id = response.json()['sub']
    print(member_id)
    return member_id
@tool
def post_image_and_text(
    token: str, title: str, image_path: str, text_content: str
):
    """
    Posts an article on LinkedIn with an image.
    :param token: str. LinkedIn OAuth token.
    :param title: str. Article title.
    :param text_content: str. Article content.
    :param image_path: str. Local file path of the image to be used as a thumbnail.
    """
    # The docstring above is kept unchanged on purpose: langchain's @tool
    # uses it as the tool description, so editing it changes runtime behavior.
    #
    # Flow: resolve the member id -> initialize an image upload -> upload the
    # file -> create the post referencing the uploaded image.
    # Returns a success message string when the post request answers 200/201.
    # Raises Exception when the member id cannot be resolved, or when the
    # initialize, upload or post request answers with an unexpected status.
    owner = get_urn(token)
    if not owner:
        # get_urn() returns None on failure; without this guard the requests
        # below would be sent with the bogus author "urn:li:person:None".
        raise Exception("Failed to resolve LinkedIn member id for the given token")
    author_urn = f'urn:li:person:{owner}'

    # Initialize the upload to get the upload URL and image URN
    init_url = "https://api.linkedin.com/rest/images?action=initializeUpload"
    headers = {
        "LinkedIn-Version": "202401",
        "X-RestLi-Protocol-Version": "2.0.0",
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }
    init_data = json.dumps({"initializeUploadRequest": {"owner": author_urn}})
    init_response = requests.post(init_url, headers=headers, data=init_data)
    print(init_response.content)
    if init_response.status_code != 200:
        raise Exception(f"Failed to initialize upload: {init_response.text}")

    init_response_data = init_response.json()["value"]
    upload_url = init_response_data["uploadUrl"]
    image_urn = init_response_data["image"]

    # Upload the file
    # NOTE(review): this sends a multipart/form-data POST; LinkedIn's Images
    # API documentation shows a raw-bytes upload with the bearer token -
    # confirm this request shape is accepted.
    with open(image_path, "rb") as f:
        upload_response = requests.post(upload_url, files={"file": f})
    if upload_response.status_code not in [200, 201]:
        raise Exception(f"Failed to upload file: {upload_response.text}")

    # Create the post with the uploaded image URN as thumbnail
    post_url = "https://api.linkedin.com/rest/posts"
    post_data = json.dumps(
        {
            "author": author_urn,
            "commentary": text_content,
            "visibility": "PUBLIC",
            "distribution": {
                "feedDistribution": "MAIN_FEED",
                "targetEntities": [],
                "thirdPartyDistributionChannels": [],
            },
            "content": {
                "media": {
                    "title": title,
                    "id": image_urn,
                }
            },
            "lifecycleState": "PUBLISHED",
            "isReshareDisabledByAuthor": False,
        }
    )
    post_response = requests.post(post_url, headers=headers, data=post_data)
    print(post_response.content)

    if post_response.status_code not in [200, 201]:
        raise Exception(f"Failed to post article: {post_response.text}")
    return "Linkedin article has been posted successfully!"