import base64 import concurrent import os from concurrent.futures import thread from datetime import datetime import datetime as dt import requests from bs4 import BeautifulSoup from flask import Flask, app, jsonify, request from flask_cors import CORS, cross_origin from flask_mail import Mail, Message from physicsaqa import PhysicsAQA from config import Config from flask_jwt_extended import JWTManager, jwt_required, create_access_token, get_jwt_identity from csv_to_db import ImportCSV from models import Users from bson.objectid import ObjectId # import hashlib import random from datetime import datetime from PIL import Image, ImageOps from io import BytesIO import base64 import json import stripe import jwt #import cv2 import re import jwt from fastapi import FastAPI, Header from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import Optional from typing import Generic, TypeVar,Dict,List,AnyStr,Any,Union import asyncio import uvicorn import pytesseract from forgotpassemail import forgotpasswordemail from RevisionBankModels import * from fastapi_utils.tasks import repeat_every from raspsendemail import RaspEmail from revisionbankscheduler import RevisionBankScheduler app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) importcsv = ImportCSV("RevisionBankDB",maindb=0) importcsvqp = ImportCSV("RevisionBankDB",maindb= 1) importcsvqp1 = ImportCSV("RevisionBankQPs1",maindb=2) revisionbankschedule = RevisionBankScheduler(importcsv) JWT_SECRET = "Peter Piper picked a peck of pickled peppers, A peck of pickled peppers Peter Piper picked, If Peter Piper picked a peck of pickled peppers,Where's the peck of pickled peppers Peter Piper picked" #'super-secret' # IRL we should NEVER hardcode the secret: it should be an evironment variable!!! JWT_ALGORITHM = "HS256" JSONObject = Dict[Any, Any] JSONArray = List[Any] JSONStructure = Union[JSONArray, JSONObject] time_hour = 60 * 60 # 1 hour, def secure_encode(token): # if we want to sign/encrypt the JSON object: {"hello": "world"}, we can do it as follows # encoded = jwt.encode({"hello": "world"}, JWT_SECRET, algorithm=JWT_ALGORITHM) encoded_token = jwt.encode(token, JWT_SECRET, algorithm=JWT_ALGORITHM) # this is often used on the client side to encode the user's email address or other properties return encoded_token def secure_decode(token): # if we want to sign/encrypt the JSON object: {"hello": "world"}, we can do it as follows # encoded = jwt.encode({"hello": "world"}, JWT_SECRET, algorithm=JWT_ALGORITHM) decoded_token = jwt.decode(token, JWT_SECRET, algorithms=JWT_ALGORITHM) # this is often used on the client side to encode the user's email address or other properties return decoded_token def getendsubscription(current_user): user_from_db = list(importcsv.db.users.find({"email": current_user}))[0] end_date = user_from_db["end_date_subscription"] return end_date # Sending Emails from Heroku: https://blairconrad.com/2020/03/05/libraryhippo-2020-sending-email-from-heroku/ # Send Email API: https://app.sendgrid.com/ # Signin and Signup page: https://shayff.medium.com/building-your-first-flask-rest-api-with-mongodb-and-jwt-e03f2d317f04 # SetUp Tesseract: https://towardsdatascience.com/deploy-python-tesseract-ocr-on-heroku-bbcc39391a8d def check_user_from_db(current_user): #test email_exists = importcsv.db.users.find_one({"email":current_user}) student_email_exists = importcsv.db.studentsubscriptions.find_one({"email":current_user}) if email_exists: user_from_db = list(importcsv.db.users.find({"email": current_user}))[0] # Gets wanted data for user return user_from_db elif student_email_exists: user_from_db = list(importcsv.db.studentsubscriptions.find({"email": current_user}))[0] return user_from_db @app.get('/')# GET # allow all origins all methods. async def index(): return "Hello World" @app.on_event("startup") @repeat_every(seconds= time_hour * 1 ) # hours # 24 hours 24 async def revisionbankschedulerevisioncardsrepeat() -> None: revisionbankschedule.runschedule() print("All Cards sent.") @app.post("/revisionbanksendemail") # POST # allow all origins all methods. async def revisionbanksendemail(data : JSONStructure = None): try: data = dict(data)#request.get_json() try: attachment = data["attachment"] except KeyError as kex: attachment = None RaspEmail.send(**{"email":data["email"],"message":data["message"],"subject":data["subject"],"attachment":attachment}) return {"message":"email has been sent."} except Exception as ex: return {"error":f"{type(ex)}-{ex}"} @app.post("/revisionbankstripepayment") # POST # allow all origins all methods. async def revisionbankstripepayment(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: data = dict(data)#request.get_json() price = data["price"] stripe.api_key = "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" #"sk_test_51La4WnLpfbhhIhYRjP1w036wUwBoatAgqNRYEoj9u6jMd7GvSmBioKgmwJsabjgAY8V5W8i2r3QdelOPe5VNOueB00zDxeXtDQ" striperesponse = stripe.PaymentIntent.create( amount=round(price*100), currency="gbp", payment_method_types=["card"], ) clientsecret= striperesponse["client_secret"] #print(clientsecret) return {"clientsecret":clientsecret} except Exception as ex: return {"error":f"{type(ex)}-{ex}"} @app.post('/revisionbanktranslate') # POST # allow all origins all methods. async def revisionbanktranslate(data : JSONStructure = None, authorization: str = Header(None)): async def read_img(img): pytesseract.pytesseract.tesseract_cmd = "/app/.apt/usr/bin/tesseract" text = pytesseract.image_to_string(img, lang="eng", config='--dpi 300 --psm 6 --oem 2 -c tessedit_char_blacklist=][|~_}{=!#%&«§><:;—?¢°*@,') return(text) try: # TODO Use Tesseract OCR to get the text from the image hello data = dict(data)#request.get_json() img = data["revisioncardscreenshot"].replace("data:image/png;base64,","").replace("data:image/jpeg;base64,","") # TODO Next Remove Blurriness and Noise from the image with cv2 #https://pyimagesearch.com/2017/07/10/using-tesseract-ocr-python/ img_obj =ImageOps.grayscale(Image.open(BytesIO(base64.b64decode(img)))) text = read_img(img_obj) return {"revisioncardscreenshotext":text } #return {"result":data} except Exception as e: return {f"error":f"{type(e)},{str(e)}"} @app.post('/getedexcelqp') # POST # allow all origins all methods. async def getedexcelqp(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: data = dict(data)#request.get_json() edexcelpapers = list(importcsvqp.db.edexcelpapers.find({data["edexcelpaper"]:{"$exists":"true"}}))[0] del edexcelpapers["_id"] return {"edexcelpaper":edexcelpapers} except Exception as e: return {f"error":f"{type(e)},{str(e)}"} @app.post('/getcomputerscienceqp') # POST # allow all origins all methods. async def getcomputerscienceqp(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: data = dict(data)#request.get_json() edexcelpapers = list(importcsvqp1.db.computerscienceqp.find({data["aqacomputerscience"]:{"$exists":"true"}}))[0] del edexcelpapers["_id"] return edexcelpapers # {"aqacomputerscience":edexcelpapers} except Exception as e: return {f"error":f"{type(e)},{str(e)}"} @app.post('/getcomputersciencems') # POST # allow all origins all methods. async def getcomputersciencems(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: data = dict(data)#request.get_json() edexcelpapers = list(importcsvqp1.db.computersciencems.find({data["aqacomputerscience"]:{"$exists":"true"}}))[0] del edexcelpapers["_id"] return edexcelpapers # {"aqacomputerscience":edexcelpapers} except Exception as e: return {f"error":f"{type(e)},{str(e)}"} @app.post('/getphysicsocrqp') # POST # allow all origins all methods. async def getphysicsocrqp(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: data = dict(data)#request.get_json() if data["subject"] == "physics": edexcelpapers = list(importcsvqp1.db.physicsocrqp.find({data["questionpapersubject"]:{"$exists":"true"}}))[0] elif data["subject"] == "chemistry": edexcelpapers = list(importcsvqp.db.chemistryaqaqp.find({data["questionpapersubject"]:{"$exists":"true"}}))[0] elif data["subject"] == "biology": edexcelpapers = list(importcsvqp.db.biologyaqaqp.find({data["questionpapersubject"]:{"$exists":"true"}}))[0] del edexcelpapers["_id"] #print(edexcelpapers) if data["scheme"] == "qp": return {"questionpapersubject":edexcelpapers[data["questionpapersubject"]]["questionpaper"]} elif data["scheme"] == "ms": return {"questionpapersubject":edexcelpapers[data["questionpapersubject"]]["markscheme"]} except Exception as e: return {f"error":f"{type(e)},{str(e)}"} @app.post('/storeocrrevisioncards') # POST # allow all origins all methods. async def storeocrrevisioncards(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: data_json = dict(data)#request.get_json() print(data_json) data = data_json["revisioncardscheduler"] email_exists = importcsv.db.accountrevisioncards.find_one({"email":current_user}) if email_exists: # Checks if email exists cards_not_exist = [] user_revision_cards = list(importcsv.db.accountrevisioncards.find({"email": current_user}))[0] # Gets the email. #print(user_revision_cards) for card in data["revisioncards"]: # Checks if the revision card exists in the database. if card not in user_revision_cards["revisioncards"]: cards_not_exist.append(card) # If not, add it to the list. #cards_that_exist.append(card) if cards_not_exist != []: new_cards = cards_not_exist + user_revision_cards["revisioncards"] # adds new cards to the list. user_revision_cards["revisioncards"] = new_cards # Updates the list. del user_revision_cards["_id"] user_revision_cards["email"] = current_user # Sets the email to the current user. #importcsv.db.accountrevisioncards.delete_many({"email":current_user}) # Allows data to be updated. #importcsv.db.accountrevisioncards.insert_one(user_revision_cards) # Inserts the new data. importcsv.db.accountrevisioncards.replace_one( {"email":current_user},user_revision_cards ) return {"message":"revision cards updated"} elif cards_not_exist == []: # If the cards are already in the database, return a message. return {"message":"No new cards"} elif not email_exists: return {"message": "account doesn't exist"} except Exception as ex: print(type(ex),ex) @app.post('/storerevisioncards') # POST # allow all origins all methods. async def storerevisioncards(data : JSONStructure = None, authorization: str = Header(None)): try: current_user = secure_decode(authorization.replace("Bearer ",""))["email"] if current_user: data_json = dict(data)#request.get_json() # test data = data_json["revisioncardscheduler"] email_exists = importcsv.db.accountrevisioncards.find_one({"email":current_user}) if email_exists: # Checks if email exists cards_not_exist = [] user_revision_cards = list(importcsv.db.accountrevisioncards.find({"email": current_user}))[0] # Gets the email. #print(user_revision_cards) for card in data["revisioncards"]: # Checks if the revision card exists in the database. if card not in user_revision_cards["revisioncards"]: cards_not_exist.append(card) # If not, add it to the list. #cards_that_exist.append(card) if cards_not_exist != []: new_cards = cards_not_exist + user_revision_cards["revisioncards"] # adds new cards to the list. user_revision_cards["revisioncards"] = new_cards # Updates the list. del user_revision_cards["_id"] user_revision_cards["email"] = current_user # Sets the email to the current user. #importcsv.db.accountrevisioncards.delete_many({"email":current_user}) # Allows data to be updated. #importcsv.db.accountrevisioncards.insert_one(user_revision_cards) # Inserts the new data. importcsv.db.accountrevisioncards.replace_one( {"email":current_user},user_revision_cards ) return {"message":"revision cards updated"} elif cards_not_exist == []: # If the cards are already in the database, return a message. return {"message":"No new cards"} elif not email_exists: data["email"] = current_user importcsv.db.accountrevisioncards.insert_one(data) return {"message": "revision card stored"} except Exception as ex: print(type(ex),ex) @app.put('/changesendtoemail')# PUT # allow all origins all methods. async def changesendtoemail(data : JSONStructure = None, authorization: str = Header(None)): # TODO current_user = secure_decode(authorization.replace("Bearer ",""))["email"] if current_user: try: data = dict(data)#request.get_json() email_exists = importcsv.db.accountrevisioncards.find_one({"email":current_user}) if email_exists: scheduled_exists = importcsv.db.scheduledcards.find_one({"email":current_user}) if scheduled_exists: user_scheduled_cards = list(importcsv.db.scheduledcards.find({"email": current_user}))[0] #importcsv.db.scheduledcards.delete_many(user_scheduled_cards) del user_scheduled_cards["sendtoemail"] sendtoemailscheduled = user_scheduled_cards["sendtoemail"] user_scheduled_cards.update({"sendtoemail": sendtoemailscheduled}) #importcsv.db.scheduledcards.insert_one(user_scheduled_cards) importcsv.db.scheduledcards.replace_one( {"email":current_user},user_scheduled_cards ) user_revision_cards = list(importcsv.db.accountrevisioncards.find({"email": current_user}))[0] #importcsv.db.accountrevisioncards.delete_many(user_revision_cards) del user_revision_cards["sendtoemail"] sendtoemail = data["sendtoemail"] user_revision_cards.update({"sendtoemail": sendtoemail}) importcsv.db.accountrevisioncards.replace_one( {"email":current_user},user_revision_cards ) #importcsv.db.accountrevisioncards.insert_one(user_revision_cards) return {"message": "Send to email changed."} elif not email_exists: return {"message":"email does not exist"} except Exception as ex: return {f"error":f"{type(ex)},{str(ex)}"} @app.post('/changerevisioncard') # POST # allow all origins all methods. async def changerevisioncard(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] if current_user: try: data = dict(data)#request.get_json() email_exists = importcsv.db.accountrevisioncards.find_one({"email":current_user}) if email_exists: # Checks if email exists # TODO Slightly buggy here - removes old schedule from the database. scheduled_exists = importcsv.db.scheduledcards.find_one({"email":current_user}) if scheduled_exists: user_scheduled_cards = list(importcsv.db.scheduledcards.find({"email": current_user}))[0] if user_scheduled_cards: for card in user_scheduled_cards["revisioncards"]: oldcard = {i:data[i] for i in data if i!='newrevisioncard'} if card == oldcard: user_scheduled_cards["revisioncards"].remove(card) #importcsv.db.scheduledcards.delete_many({"email":current_user}) #importcsv.db.scheduledcards.insert_one(user_scheduled_cards) importcsv.db.scheduledcards.replace_one( {"email":current_user},user_scheduled_cards ) user_revision_cards = list(importcsv.db.accountrevisioncards.find({"email": current_user}))[0] left_over_image = [] for card in user_revision_cards["revisioncards"]: oldcard = {i:data[i] for i in data if i!='newrevisioncard'} oldcard["translation"] = card["translation"] oldcard["revisioncardimgname"] = card["revisioncardimgname"] oldcard["revisioncardimage"] = card["revisioncardimage"] if card == oldcard: user_revision_cards["revisioncards"].remove(card) left_over_image.append({"revisioncardimgname":card["revisioncardimgname"],"revisioncardimage":card["revisioncardimage"] }) #print(user_revision_cards) del data["revisioncard"] data["translation"] = "" data["revisioncardimgname"] = left_over_image[0]["revisioncardimgname"] data["revisioncardimage"] = left_over_image[0]["revisioncardimage"] data["revisioncard"] = data["newrevisioncard"] del data["newrevisioncard"] user_revision_cards["revisioncards"].insert(0,data) # .append() #print(user_revision_cards) importcsv.db.accountrevisioncards.replace_one( {"email":current_user},user_revision_cards ) #importcsv.db.accountrevisioncards.delete_many({"email":current_user}) #importcsv.db.accountrevisioncards.insert_one(user_revision_cards) return {"message":"revision card changed."} except Exception as ex: #print({f"error":f"{type(ex)},{str(ex)}"}) return {f"error":f"{type(ex)},{str(ex)}"} @app.get('/getrevisioncards')# GET # allow all origins all methods. async def getrevisioncards(authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] if current_user: try: email_exists = importcsv.db.accountrevisioncards.find_one({"email":current_user}) if email_exists: # Checks if email exists user_revision_cards = list(importcsv.db.accountrevisioncards.find({"email": current_user}))[0] del user_revision_cards["_id"],user_revision_cards["email"] return user_revision_cards elif not email_exists: return {"message":"No revision cards"} # Send in shape of data except Exception as ex: return {f"error":f"{type(ex)},{str(ex)}"} @app.post('/uploadrevisioncardtxtfile') # POST # allow all origins all methods. async def uploadrevisioncardtxtfile(data : JSONStructure = None, authorization: str = Header(None)): try: current_user = secure_decode(authorization.replace("Bearer ",""))["email"] if current_user: file = request.files["file"] if file: return {"message":file} elif not file: {"message":"No file"} except Exception as ex: return {f"error":f"{type(ex)},{str(ex)}"} @app.post('/removerevisioncard') # POST # allow all origins all methods. async def removerevisioncard(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] if current_user: try: data = dict(data)#request.get_json() email_exists = importcsv.db.accountrevisioncards.find_one({"email":current_user}) if email_exists: # Checks if email exists # Remove the revision card from the database. user_revision_cards = list(importcsv.db.accountrevisioncards.find({"email": current_user}))[0] for card in user_revision_cards["revisioncards"]: if card == data: user_revision_cards["revisioncards"].remove(card) #importcsv.db.accountrevisioncards.delete_many({"email":current_user}) #importcsv.db.accountrevisioncards.insert_one(user_revision_cards) importcsv.db.accountrevisioncards.replace_one( {"email":current_user},user_revision_cards ) # Remove the revision card from the scheduled cards try: user_scheduled_cards = list(importcsv.db.scheduledcards.find({"email": current_user}))[0] for card in user_scheduled_cards["revisioncards"]: if card == data: user_scheduled_cards["revisioncards"].remove(card) #importcsv.db.scheduledcards.delete_many({"email":current_user}) #importcsv.db.scheduledcards.insert_one(user_scheduled_cards) importcsv.db.scheduledcards.replace_one( {"email":current_user},user_scheduled_cards ) return {"message":"revision card removed"} except IndexError as iex: return {"message":"revision card removed"} except Exception as ex: return {f"error":f"{type(ex)},{str(ex)}"} @app.post('/schedulerevisioncard') # POST # allow all origins all methods. async def schedulerevisioncard(data : JSONStructure = None, authorization: str = Header(None)): try: current_user = secure_decode(authorization.replace("Bearer ",""))["email"] if current_user: data = dict(data)#request.get_json() # test email_exists = importcsv.db.scheduledcards.find_one({"email":current_user}) if email_exists: # Checks if email exists cards_not_exist = [] user_scheduled_cards = list(importcsv.db.scheduledcards.find({"email": current_user}))[0] # Gets the email. #print(user_revision_cards) for card in data["revisioncards"]: # Checks if the revision card exists in the database. if card not in user_scheduled_cards["revisioncards"]: cards_not_exist.append(card) # If not, add it to the list. #cards_that_exist.append(card) if cards_not_exist != []: new_cards = cards_not_exist + user_scheduled_cards["revisioncards"] # adds new cards to the list. user_scheduled_cards["revisioncards"] = new_cards # Updates the list. del user_scheduled_cards["_id"] user_scheduled_cards["email"] = current_user # Sets the email to the current user. #importcsv.db.scheduledcards.delete_many({"email":current_user}) # Allows data to be updated. #importcsv.db.scheduledcards.insert_one(user_scheduled_cards) # Inserts the new data. importcsv.db.scheduledcards.replace_one( {"email":current_user},user_scheduled_cards ) return {"message":"revision cards scheduled"} elif cards_not_exist == []: # If the cards are already in the database, return a message. return {"message":"revision cards already scheduled"} elif not email_exists: data["email"] = current_user importcsv.db.scheduledcards.insert_one(data) return {"message": "revision card scheduled"} except Exception as ex: print(type(ex),ex) @app.delete('/unscheduleallrevisioncard') # DELETE # allow all origins all methods. async def unscheduleallrevisioncard(authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] if current_user: try: email_exists = importcsv.db.scheduledcards.find_one({"email":current_user}) if email_exists: # Checks if email exists user_revision_cards = list(importcsv.db.scheduledcards.find({"email": current_user}))[0] user_revision_cards["revisioncards"] = [] #importcsv.db.scheduledcards.delete_many({"email":current_user}) #importcsv.db.scheduledcards.insert_one(user_revision_cards) importcsv.db.scheduledcards.replace_one( {"email":current_user},user_revision_cards ) return {"message":"Allrevision card unscheduled"} except Exception as ex: return {f"error":f"{type(ex)},{str(ex)}"} @app.post('/unschedulerevisioncard') # POST # allow all origins all methods. async def unschedulerevisioncard(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] if current_user: try: data = dict(data)#request.get_json() email_exists = importcsv.db.scheduledcards.find_one({"email":current_user}) if email_exists: # Checks if email exists user_revision_cards = list(importcsv.db.scheduledcards.find({"email": current_user}))[0] for card in user_revision_cards["revisioncards"]: if card == data: user_revision_cards["revisioncards"].remove(card) #importcsv.db.scheduledcards.delete_many({"email":current_user}) #importcsv.db.scheduledcards.insert_one(user_revision_cards) importcsv.db.scheduledcards.replace_one( {"email":current_user},user_revision_cards ) return {"message":"revision card unscheduled"} except Exception as ex: return {f"error":f"{type(ex)},{str(ex)}"} @app.post('/sendnowrevisioncard') # POST # allow all origins all methods. async def sendnowrevisioncard(data : JSONStructure = None, authorization: str = Header(None)): try: current_user = secure_decode(authorization.replace("Bearer ",""))["email"] if current_user: data = dict(data)#request.get_json() now = datetime.now().strftime("%c") message = f"""{data['revisioncards'][0]['revisioncardtitle']}{data["revisioncards"][0]["revisioncard"]}""" #response = requests.post("http://0.0.0.0:7860/raspsendemail",json={"raspsendemail":{"email":data["sendtoemail"],"message":message,"subject":f"{data['revisioncards'][0]['subject']} Send Now"}}) RaspEmail.send(**{"email":data["sendtoemail"],"message":message,"subject":f"{data['revisioncards'][0]['subject']} Send Now"}) #print(response.text) #msg = Message(f"{data['revisioncards'][0]['subject']} Send Now", recipients=[data["sendtoemail"]]) # "amari.lawal@gmail.com" #msg.body = f"Mail from RevisionCard Send Now at {now}" #if "!DOCTYPE" not in data["revisioncards"][0]["revisioncard"] or "h1" not in data["revisioncards"][0]["revisioncard"]: # msg.html = f"""
{data['revisioncards'][0]['revisioncardtitle']}
            #    {data["revisioncards"][0]["revisioncard"]}
""" #elif "!DOCTYPE" in data["revisioncards"][0]["revisioncard"] or "h1" in data["revisioncards"][0]["revisioncard"]: # msg.html = f""" # {data['revisioncards'][0]['revisioncardtitle']} # {data["revisioncards"][0]["revisioncard"]} # """ #print(msg) #mail.send(msg) return {"message":"revision card sent"} except Exception as ex: print({f"error":f"{type(ex)},{str(ex)}"}) return {f"error":f"{type(ex)},{str(ex)}"} @app.get('/checkschedulerevisioncard')# GET # allow all origins all methods. async def checkschedulerevisioncard(authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] if current_user: try: email_exists = importcsv.db.scheduledcards.find_one({"email":current_user}) if email_exists: # Checks if email exists user_scheduled_cards = list(importcsv.db.scheduledcards.find({"email": current_user}))[0] del user_scheduled_cards["_id"],user_scheduled_cards["email"],user_scheduled_cards["revisionscheduleinterval"],user_scheduled_cards["sendtoemail"] return user_scheduled_cards elif not email_exists: return {"message":"revision cards not scheduled"} # Send in shape of data except Exception as ex: return {f"error":f"{type(ex)},{str(ex)}"} @app.post('/fmathsqp') # POST # allow all origins all methods. async def fmathsqp(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: datajson = dict(data)#request.get_json() try: main_qp_url = "https://www.physicsandmathstutor.com/a-level-maths-papers/" data = datajson["furthermaths"] email = data["email"] book_inp = data["furthermathsbook"] topic_inp = data["furthermathstopic"] platform = data["platform"] qp_sections = {"Core":["c1",'c2','c3','c4'],"Mechanics":['m1','m2','m3','m4','m5'],"Statistics":['s1','s2','s3','s4'],'Further Pure':['fp1','fp2','fp3'],'Decision Maths':['d1','d2']} if "c".lower() in book_inp.lower(): book_choice = "c" elif "m".lower() in book_inp.lower(): book_choice = "m" elif "s".lower() in book_inp.lower(): book_choice = "s" elif "fp".lower() in book_inp.lower(): book_choice = "fp" elif "d".lower() in book_inp.lower(): book_choice = "d" elif "a".lower() in book_inp.lower(): book_choice = "a" else: return {"result": "doesn't exist"} if book_choice != "a": endpoints = [endpoint for val in qp_sections.values() for endpoint in val if book_choice in endpoint] elif book_choice == "a": endpoints = [endpoint for val in qp_sections.values() for endpoint in val] pdf_result = [] pdf_titles = [] def qp_extraction(qp_url,end): headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:66.0) Gecko/20100101 Firefox/66.0","Accept-Encoding": "gzip, async deflate", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8","DNT": "1", "Connection": "close", "Upgrade-Insecure-Requests": "1"} response_topic = requests.get(f"{qp_url}/{end}-by-topic/",headers=headers).text #if "This site is currently under going scheduled maintenance." in response_topic: # return {"error":"Physics maths tutor is in maintenance mode."} soup = BeautifulSoup(response_topic,features='lxml') for child in soup.find_all(['a'],href=True): if topic_inp.lower() in str(child.text).lower(): pdf_url = child['href'] #print(pdf_url) #print(f'{str(child.text).capitalize()}.pdf') pdf_titles.append(f'{str(child.text).capitalize()}.pdf') pdf_result.append(pdf_url) #response = requests.get(pdf_url) #pdf_result.append(str(response.content).replace("b'","").replace("'","")) #print(endpoints) def topic_extraction(end): qp_extraction(main_qp_url,end) def threads_url(data : JSONStructure = None, authorization: str = Header(None)): #threads = 60 # 20 # TODO Number of threads may be blowing up the router. threads = 4 with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: #max_workers=threads executor.map(topic_extraction,endpoints) threads_url() message = """ The Further Maths question papers, email has been sent to you: """ linkmessage = """ The Further Maths question papers links: """ #random.shuffle(pdf_result) #random.shuffle(pdf_titles) if pdf_result != []: if len(pdf_result) > 5: pdf_slice = round(len(pdf_result) * (100/100)) else: pdf_slice = round(len(pdf_result) * (100/100)) for link,title in zip(pdf_result[:pdf_slice],pdf_titles[:pdf_slice]): linkmessage += "
" linkmessage += f"{title}" + "
" linkmessage += link.replace(" ",'%20') + "
" for i in pdf_titles: message += "\n" message += i + "\n" user_from_db = check_user_from_db(current_user) student_email_exists = importcsv.db.studentsubscriptions.find_one({"email":current_user}) if "end_date_subscription" in user_from_db: end_date = getendsubscription(current_user) if user_from_db["emailsleft"] <= 0: if platform == "app": pdf_response = {"furthermathsresult":{"furthermathsmessage":message}} # "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage elif platform == "web": pdf_response = {"furthermathsresult":{"furthermathsmessage":linkmessage,"emailcount":0,"end_date_subscription":end_date}}# "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage return pdf_response elif user_from_db["emailsleft"] > 0: now = datetime.now().strftime("%c") message = f"""

The Further Maths question papers links:

{linkmessage}

. """ #response = requests.post("http://0.0.0.0:7860/raspsendemail",json={"raspsendemail":{"email":email,"message":message,"subject":"FMathqp PDFs"}}) RaspEmail.send(**{"email":email,"message":message,"subject":"FMathqp PDFs"}) #msg = Message("FMathqp PDFs", recipients=[email]) # "amari.lawal@gmail.com" #msg.body = f"Mail from FMathqp at {now}" #msg.html = f""" #

The Further Maths question papers links:

#

{linkmessage}

. #""" #mail.send(msg) user_from_db.update({"emailsleft":int(user_from_db["emailsleft"])-1}) importcsv.db.users.update_one({"email": current_user}, {"$set": user_from_db},upsert=True) if platform == "app": pdf_response = {"furthermathsresult":{"furthermathsmessage":message}} # "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage elif platform == "web": pdf_response = {"furthermathsresult":{"furthermathsmessage":linkmessage,"emailcount":int(user_from_db["emailsleft"])-1,"end_date_subscription":end_date}} return pdf_response elif "end_date_subscription" not in user_from_db and not student_email_exists: if platform == "app": pdf_response = {"furthermathsresult":{"furthermathsmessage":message}} # "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage elif platform == "web": pdf_response = {"furthermathsresult":{"furthermathsmessage":linkmessage,"emailcount":0,"end_date_subscription":99999999}}# "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage return pdf_response elif "end_date_subscription" not in user_from_db and student_email_exists: if user_from_db["emailsleft"] <= 0: if platform == "app": pdf_response = {"furthermathsresult":{"furthermathsmessage":message}} # "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage elif platform == "web": pdf_response = {"furthermathsresult":{"furthermathsmessage":linkmessage,"emailcount":0,"end_date_subscription":end_date}}# "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage return pdf_response elif user_from_db["emailsleft"] > 0: now = datetime.now().strftime("%c") message = f"""

The Further Maths question papers links:

{linkmessage}

. """ #response = requests.post("http://0.0.0.0:7860/raspsendemail",json={"raspsendemail":{"email":email,"message":message,"subject":"FMathqp PDFs"}}) RaspEmail.send(**{"email":email,"message":message,"subject":"FMathqp PDFs"}) user_from_db.update({"emailsleft":int(user_from_db["emailsleft"])-1}) importcsv.db.studentsubscriptions.update_one({"email": current_user}, {"$set": user_from_db},upsert=True) if platform == "app": pdf_response = {"furthermathsresult":{"furthermathsmessage":message}} # "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage elif platform == "web": pdf_response = {"furthermathsresult":{"furthermathsmessage":linkmessage,"emailcount":int(user_from_db["emailsleft"])-1,"end_date_subscription":99999999}}# "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage return pdf_response elif pdf_result == []: return {"error":"No further maths question papers available"} except Exception as e: return {f"error":f"{type(e)},{str(e)}"} else: return {"message": "Login first please."} @app.post('/fmathsb') # POST # allow all origins all methods. async def fmathsb(data : JSONStructure = None, authorization: str = Header(None)): #pure maths: 0, statistics mechanics: 1, core pure maths: 2, further pure maths: 3, further statistics: 4, further mechanics: 5, decision maths: 6" # year/book: 1, year/book: 2 # {"furthermathsb":{"email":"amari.lawal@gmail.com","furthermathsbbook": 0,"furthermathsbyear":2}} # Output PureMaths Book 2 current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: sb_books_list = {"0":"pure-maths","1":"statistics-mechanics","2":"core-pure-maths","3":"further-pure-maths","4":"further-statistics","5":"further-mechanics","6":"decision-maths"} datajson = dict(data)#request.get_json() data = datajson["furthermathsb"] email = data["email"] sb_book_inp = str(data["furthermathsbbook"]) sb_year_inp = str(data["furthermathsbyear"]) sb_exercise = str(data["furthermathsbexercise"]) platform = data["platform"] sb_book = sb_books_list[str(sb_book_inp)] sb_year = str(sb_year_inp) if sb_book == "pure-maths" or sb_book == "statistics-mechanics": sb_url = f"https://www.physicsandmathstutor.com/maths-revision/solutionbanks/edexcel-{sb_book}-year-{sb_year}/" #print(sb_url) else: sb_url = f"https://www.physicsandmathstutor.com/maths-revision/solutionbanks/edexcel-{sb_book}-{sb_year}/" book_dir_name = f"{sb_book}{str(sb_year)}".capitalize() response = requests.get(sb_url).text soup = BeautifulSoup(response,features='lxml') soup_a_tags = soup.find_all(['a'],href=True) sb_result = [] sb_titles = [] async def sb_extraction(child): if "Exercise" in child.text and sb_exercise.upper() in child.text: print(child.text) pdf_url = child['href'] #response = requests.get(pdf_url) sb_titles.append(f'{book_dir_name}-{str(child.text).capitalize()}.pdf') sb_result.append(pdf_url) async def threads_url(data : JSONStructure = None, authorization: str = Header(None)): #threads = 60 # 20 # TODO Number of threads may be blowing up the router. threads = 4 with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: #max_workers=threads executor.map(sb_extraction,soup_a_tags) threads_url() if sb_result != []: message = """ The Further Maths solution bank, email has been sent to you: """ linkmessage = """ The Further Maths question papers links: """ for link,title in zip(sb_result,sb_titles): linkmessage += "
" linkmessage += f"{title}" + "
" linkmessage += link.replace(" ",'%20') + "
" for i in sb_titles: message += "\n" message += i + "\n" user_from_db = check_user_from_db(current_user) student_email_exists = importcsv.db.studentsubscriptions.find_one({"email":current_user}) if "end_date_subscription" in user_from_db: end_date = getendsubscription(current_user) if user_from_db["emailsleft"] <= 0: if platform == "app": pdf_response = {"furthermathsresult":{"furthermathsmessage":message}} # "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage elif platform == "web": pdf_response = {"furthermathsresult":{"furthermathsmessage":linkmessage,"emailcount":0,"end_date_subscription":end_date}}# "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage return pdf_response elif user_from_db["emailsleft"] > 0: now = datetime.now().strftime("%c") message = f"""

The Further Maths Solution Bank links:

{linkmessage}

. """ #response = requests.post("http://0.0.0.0:7860/raspsendemail",json={"raspsendemail":{"email":email,"message":message,"subject":"FMathSB PDFs"}}) RaspEmail.send(**{"email":email,"message":message,"subject":"FMathSB PDFs"}) #msg = Message("FMathSB PDFs", recipients=[email]) # "amari.lawal@gmail.com" #msg.body = f"Mail from FMathsb at {now}" #msg.html = f""" #

The Further Maths Solution Bank links:

#

{linkmessage}

. #""" #mail.send(msg) emailcount = int(user_from_db["emailsleft"])-1 user_from_db.update({"emailsleft":emailcount}) importcsv.db.users.update_one({"email": current_user}, {"$set": user_from_db},upsert=True) if platform == "app": pdf_response = {"furthermathsresult":{"furthermathsmessage":message}} # "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage elif platform == "web": pdf_response = {"furthermathsresult":{"furthermathsmessage":linkmessage,"emailcount":emailcount,"end_date_subscription":end_date}}# "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage return pdf_response elif "end_date_subscription" not in user_from_db and not student_email_exists: if platform == "app": pdf_response = {"furthermathsresult":{"furthermathsmessage":message}} # "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage elif platform == "web": pdf_response = {"furthermathsresult":{"furthermathsmessage":linkmessage,"emailcount":0,"end_date_subscription":9999999}}# "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage return pdf_response elif "end_date_subscription" not in user_from_db and student_email_exists: if user_from_db["emailsleft"] <= 0: if platform == "app": pdf_response = {"furthermathsresult":{"furthermathsmessage":message}} # "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage elif platform == "web": pdf_response = {"furthermathsresult":{"furthermathsmessage":linkmessage,"emailcount":0,"end_date_subscription":end_date}}# "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage return pdf_response elif user_from_db["emailsleft"] > 0: now = datetime.now().strftime("%c") message = f"""

The Further Maths Solution Bank links:

{linkmessage}

. """ #response = requests.post("http://0.0.0.0:7860/raspsendemail",json={"raspsendemail":{"email":email,"message":message,"subject":"FMathSB PDFs"}}) RaspEmail.send(**{"email":email,"message":message,"subject":"FMathSB PDFs"}) user_from_db.update({"emailsleft":int(user_from_db["emailsleft"])-1}) importcsv.db.studentsubscriptions.update_one({"email": current_user}, {"$set": user_from_db},upsert=True) if platform == "app": pdf_response = {"furthermathsresult":{"furthermathsmessage":message}} # "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage elif platform == "web": pdf_response = {"furthermathsresult":{"furthermathsmessage":linkmessage,"emailcount":int(user_from_db["emailsleft"])-1,"end_date_subscription":9999999}}# "furthermathslinks":pdf_result,"furthermathstitles":pdf_titles,"furthermathslinkmessage":linkmessage return pdf_response elif sb_result == []: return {f"error":f"No further maths solution bank for {sb_book} {sb_year}"} except Exception as e: return {f"error":f"{type(e)},{str(e)}"} else: return {"message": "Login first please."} @app.post('/ocrsciencebookanswers') # POST # allow all origins all methods. async def scienceocranswers(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: datajson = dict(data)#request.get_json() async def ocrscienceanswers(querydata): examboards = "OCR" url = "https://global.oup.com/education/content/secondary/series/ocr-a-level-sciences/a-level-sciences-for-ocr-student-book-answers/?region=uk" physicsanswerspdf = {} response= requests.get(url).text soup = BeautifulSoup(response,features='lxml') for divele in soup.find_all('div',{'class':'content_block half_width enclosed'}): # inner_block text_only_single ## if querydata in divele.text: for a in divele.find_all("a",href=True): if a["href"] != "?region=uk": physicsanswerspdf.update({a.text.replace("\xa0",' '): a["href"].replace("?region=uk",'')}) result = {querydata:{examboards:physicsanswerspdf}} return result try: data = datajson["physicsocr"] email = data["email"] subject = data["subject"] # physics, chemistry, biology physicsocralph = data["physicsocralph"] # A or B chapter = data["chapter"] # Chapter 1 year = data["year"] # AS/Year 1, A Level platform = data["platform"] # web or app query = f"{subject.capitalize()} {physicsocralph} {year}" papers = ocrscienceanswers(query) answerlink = papers[query]["OCR"][f"{chapter.capitalize()} (PDF)"].replace(" ",'%20') user_from_db = check_user_from_db(current_user) student_email_exists = importcsv.db.studentsubscriptions.find_one({"email":current_user}) if "end_date_subscription" in user_from_db: end_date = getendsubscription(current_user) if user_from_db["emailsleft"] <= 0: result = {"scienceocranswers": answerlink,"emailcount":0,"end_date_subscription":end_date} return result elif user_from_db["emailsleft"] > 0: now = datetime.now().strftime("%c") message = f"""

OCR Science {query} Answers:

{answerlink}

. """ #response = requests.post("http://0.0.0.0:7860/raspsendemail",json={"raspsendemail":{"email":email,"message":message,"subject":f"OCR {query} Answers"}}) RaspEmail.send(**{"email":email,"message":message,"subject":f"OCR {query} Answers"}) #msg = Message(f"OCR {query} Answers", recipients=[email]) # "amari.lawal@gmail.com" #msg.body = f"Mail from {query} at {now}" #msg.html = f""" #

OCR Science {query} Answers:

#

{answerlink}

. #""" #mail.send(msg) emailcount = int(user_from_db["emailsleft"])-1 user_from_db.update({"emailsleft":emailcount}) importcsv.db.users.update_one({"email": current_user}, {"$set": user_from_db},upsert=True) result = {"scienceocranswers": answerlink,"emailcount":emailcount,"end_date_subscription":end_date} return result elif "end_date_subscription" not in user_from_db and not student_email_exists: result = {"scienceocranswers": answerlink,"emailcount":0,"end_date_subscription":9999999} return result elif "end_date_subscription" not in user_from_db and student_email_exists: if user_from_db["emailsleft"] <= 0: result = {"scienceocranswers": answerlink,"emailcount":0,"end_date_subscription":end_date} return result elif user_from_db["emailsleft"] > 0: now = datetime.now().strftime("%c") message = f"""

OCR Science {query} Answers:

{answerlink}

. """ #response = requests.post("http://0.0.0.0:7860/raspsendemail",json={"raspsendemail":{"email":email,"message":message,"subject":f"OCR {query} Answers"}}) RaspEmail.send(**{"email":email,"message":message,"subject":f"OCR {query} Answers"}) user_from_db.update({"emailsleft":int(user_from_db["emailsleft"])-1}) importcsv.db.studentsubscriptions.update_one({"email": current_user}, {"$set": user_from_db},upsert=True) result = {"scienceocranswers": answerlink,"emailcount":int(user_from_db["emailsleft"])-1,"end_date_subscription":9999999} return result except Exception as e: return {f"error":f"{type(e)},{str(e)}"} else: return {"message": "Login first please."} @app.post('/physicsaqa') # POST # allow all origins all methods. async def physicsaqa(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: datajson = dict(data)#request.get_json() topicquestions = PhysicsAQA().collectdata() data = datajson["physicsaqa"] email = data["email"] chapter = data["chapter"] # Section 1: Measurement & Their Errors topic = data["topic"] # Constituents of the Atom or The Law of the Atom platform = data["platform"] # web or app try: questionpaper = topicquestions[chapter][topic] except Exception as ex: return {"error":"chapter or topic not found"} try: markscheme = topicquestions[chapter][f"{topic} MS"] except Exception as ex: return {"error":"chapter or topic mark scheme not found"} user_from_db = check_user_from_db(current_user) student_email_exists = importcsv.db.studentsubscriptions.find_one({"email":current_user}) if "end_date_subscription" in user_from_db: end_date = getendsubscription(current_user) if user_from_db["emailsleft"] <= 0: return {"physicsaqa":{"chapter":chapter,"topic":topic,"question paper":questionpaper,"markscheme":markscheme,"emailcount":emailcount,"end_date_subscription":end_date}} elif user_from_db["emailsleft"] > 0: now = datetime.now().strftime("%c") message = f"""

PhysicsAqa Question Papers:

{questionpaper}

{markscheme}

. """ #response = requests.post("http://0.0.0.0:7860/raspsendemail",json={"raspsendemail":{"email":email,"message":message,"subject":f"PhysicsAqa Papers"}}) RaspEmail.send(**{"email":email,"message":message,"subject":f"PhysicsAqa Papers"}) #msg = Message(f"PhysicsAqa Papers", recipients=[email]) # "amari.lawal@gmail.com" #msg.body = f"Mail from physicsaqaApi at {now}" #msg.html = f""" #

PhysicsAqa Question Papers:

#

{questionpaper}

#

{markscheme}

. #""" #mail.send(msg) emailcount = int(user_from_db["emailsleft"])-1 user_from_db.update({"emailsleft":emailcount}) importcsv.db.users.update_one({"email": current_user}, {"$set": user_from_db},upsert=True) return {"physicsaqa":{"chapter":chapter,"topic":topic,"question paper":questionpaper,"markscheme":markscheme,"emailcount":emailcount,"end_date_subscription":end_date}} elif "end_date_subscription" not in user_from_db and not student_email_exists: return {"physicsaqa":{"chapter":chapter,"topic":topic,"question paper":questionpaper,"markscheme":markscheme,"emailcount":0,"end_date_subscription":9999999}} # If it is a student account elif "end_date_subscription" not in user_from_db and student_email_exists: # Check number of emails left if user_from_db["emailsleft"] <= 0: return {"physicsaqa":{"chapter":chapter,"topic":topic,"question paper":questionpaper,"markscheme":markscheme,"emailcount":emailcount,"end_date_subscription":end_date}} elif user_from_db["emailsleft"] > 0: now = datetime.now().strftime("%c") message = f"""

PhysicsAqa Question Papers:

{questionpaper}

{markscheme}

. """ #response = requests.post("http://0.0.0.0:7860/raspsendemail",json={"raspsendemail":{"email":email,"message":message,"subject":f"PhysicsAqa Papers"}}) RaspEmail.send(**{"email":email,"message":message,"subject":f"PhysicsAqa Papers"}) user_from_db.update({"emailsleft":int(user_from_db["emailsleft"])-1}) importcsv.db.studentsubscriptions.update_one({"email": current_user}, {"$set": user_from_db},upsert=True) return {"physicsaqa":{"chapter":chapter,"topic":topic,"question paper":questionpaper,"markscheme":markscheme,"emailcount":int(user_from_db["emailsleft"])-1,"end_date_subscription":9999999}} except TypeError as tex: return {f"error":f"request is wrong shape {tex}"} except Exception as ex: return {f"error":f"{type(ex)} {str(ex)}"} else: return {"message": "Login first please."} @app.post('/signupapi') # POST async def signup(data: JSONStructure = None): try: signupdata = {} data = dict(data) hashed = hashlib.sha256(data["password"].encode('utf-8')).hexdigest() signupdata["email"] = data["email"] signupdata["password"] = hashed email_exists = importcsv.db.users.find_one({"email": signupdata["email"]}) email_exists_student = importcsv.db.studentsubscriptions.find_one({"email": signupdata["email"]}) # Checks if student account exists if email_exists or email_exists_student: return {"message": "Email already exists"} # , 400 elif not email_exists: # Notifies who are the beta testers #if datetime.now() < "2022-05-19T21:37:00.057084": # signupdata.update({"betatest":"true"}) importcsv.db.users.insert_one(signupdata) access_token = secure_encode({"email":signupdata["email"]})#create_access_token(identity=signupdata["email"]) callback = {"status": "success","access_token":access_token} return callback except Exception as ex: error_detected = {"error": "error occured","errortype":type(ex), "error": str(ex)} return error_detected @app.post('/loginapi') # POST async def login(login_details: JSONStructure = None): # ,authorization: str = Header(None) # Login API try: def provide_access_token(login_details,student=0): if student == 0: email_exists = list(importcsv.db.users.find({"email": login_details["email"]}))[0] elif student == 1: email_exists = list(importcsv.db.studentsubscriptions.find({"email": login_details["email"]}))[0] encrypted_password = hashlib.sha256(login_details["password"].encode('utf-8')).hexdigest() if email_exists["password"] == encrypted_password: access_token = secure_encode({"email":email_exists["email"]}) #create_access_token(identity=email_exists["email"]) return access_token else: return "Wrong password" login_details = dict(login_details) #print(login_details) email_exists = importcsv.db.users.find_one({"email": login_details["email"]}) email_exists_student = importcsv.db.studentsubscriptions.find_one({"email": login_details["email"]}) # Checks if student account exists if email_exists: access_token = provide_access_token(login_details,student=0) if access_token == "Wrong password": return {"message": "The username or password is incorrect."} else: return {"access_token": access_token} elif email_exists_student: access_token = provide_access_token(login_details,student=1) if access_token == "Wrong password": return {"message": "The username or password is incorrect."} else: return {"access_token": access_token} return {"message": "The username or password is incorrect."} except Exception as ex: return {"error": f"{type(ex)} {str(ex)}"} # @app.post('/forgotpassword') # POST async def forgotpassword(data : JSONStructure = None, authorization: str = Header(None)): # Login API data = dict(data)#request.get_json() try: #print(data["email"]) access_token = secure_decode(data["email"]) #create_access_token(identity=data["email"]) # store token in database temporarily now = datetime.now().strftime("%c") #response = requests.post("http://0.0.0.0:7860/raspsendemail",json={"raspsendemail":{"email":data["email"],"message":forgotpasswordemail(data["email"],access_token),"subject":f"RevsionBank Password Reset"}}) RaspEmail.send(**{"email":data["email"],"message":forgotpasswordemail(data["email"],access_token),"subject":f"RevsionBank Password Reset"}) #msg = Message(f"RevsionBank Password Reset", recipients=[data["email"]]) # "amari.lawal@gmail.com" #msg.body = f"Mail from RevisionBank at {now}" #msg.html = forgotpasswordemail(data["email"],access_token) #mail.send(msg) return {"message": "Reset Email sent"} except Exception as ex: return {"error": f"{type(ex)} {str(ex)}"} @app.put('/resetpassword') # PUT async def resetpassword(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] if current_user: try: data = dict(data)#request.get_json() email_exists = importcsv.db.users.find_one({"email": current_user}) #print(email_exists) if email_exists: user_from_db = list(importcsv.db.users.find({"email": current_user}))[0] #print(user_from_db) # TODO Delete password from here and replace. #importcsv.db.users.delete_many(user_from_db) del user_from_db["password"] encrypted_password = hashlib.sha256(data["password"].encode('utf-8')).hexdigest() user_from_db.update({"password": encrypted_password}) #importcsv.db.users.insert_one(user_from_db) importcsv.db.users.replace_one( {"email":current_user},user_from_db ) return {"message": "Password reset successful."} elif not email_exists: return {"message": "Email Doesn't exist."} except Exception as ex: return {"error": f"{type(ex)} {str(ex)}"} @app.post('/storesubscription') # POST async def storesubscription(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: data = dict(data)#request.get_json() user_from_db = list(importcsv.db.users.find({"email": current_user}))[0] # Gets wanted data for user if data["subscription"] == 'basic': emailsleft = {'emailsleft': 0} elif data["subscription"] == 'standard': emailsleft = {'emailsleft': 40} elif data["subscription"] == 'premium' or data["subscription"] == 'educational': emailsleft = {'emailsleft': 10000000000} if data["subscription"] == "educational": user_from_db.update({"numofaccounts": 200}) # TODO Constant Value needs to be changed when frontend is changed user_from_db.update({"start_date_subscription": data["start_date_subscription"]}) user_from_db.update({"end_date_subscription": data["end_date_subscription"]}) user_from_db.update({"subscription": data["subscription"]}) # Updates the user with the new subscription user_from_db.update(emailsleft) importcsv.db.users.update_one( { "email": current_user}, {"$set": user_from_db}, upsert = True ) importcsv.db.subscriptionlog.insert_one({"email": user_from_db["email"],"start_date_subscription": data["start_date_subscription"], "end_date_subscription": data["end_date_subscription"], "subscription": data["subscription"], "emailsleft": emailsleft["emailsleft"]}) return {"message": "Subscription Completed."} except Exception as ex: return {"error": f"{type(ex)}-{ex}"} else: return {"message": "User not found"} @app.post('/storebetatester') # POST # async def storebetatester(data : JSONStructure = None, authorization: str = Header(None)): data = dict(data)#request.get_json() emailsleft = {'emailsleft': 10000000000} email_exists = importcsv.db.users.find_one({"email": data["email"]}) if email_exists: user_from_db = list(importcsv.db.users.find({"email": data["email"]}))[0] # Gets wanted data for user #user_from_db.update({"numofaccounts": 200}) # TODO Constant Value needs to be changed when frontend is changed date_now = datetime.now() datetime_delta = dt.timedelta(weeks=2) user_from_db.update({"start_date_subscription": date_now.isoformat()}) user_from_db.update({"end_date_subscription": (datetime_delta + date_now).isoformat()}) user_from_db.update({"subscription": "premium"}) # Updates the user with the new subscription user_from_db.update(emailsleft) user_from_db.update({"betatester": "true"}) importcsv.db.users.update_one( { "email": data["email"]}, {"$set": user_from_db}, upsert = True ) #importcsv.db.subscriptionlog.insert_one({"email": user_from_db["email"],"start_date_subscription": data["start_date_subscription"], "end_date_subscription": data["end_date_subscription"], "subscription": data["subscription"], "emailsleft": emailsleft["emailsleft"]}) return {"message": "Beta Tester Subscription Completed."} elif not email_exists: return {"message": "User not found"} @app.post('/storeeducationalfreetrial') # POST # async def storeeducationalfreetrial(data : JSONStructure = None, authorization: str = Header(None)): data = dict(data)#request.get_json() try: emailsleft = {'emailsleft': 10000000000} email_exists = importcsv.db.users.find_one({"email": data["email"]}) if email_exists: user_from_db = list(importcsv.db.users.find({"email": data["email"]}))[0] # Gets wanted data for user #user_from_db.update({"numofaccounts": 200}) # TODO Constant Value needs to be changed when frontend is changed date_now = datetime.now() decimal_part = float(3 / 7) datetime_delta = dt.timedelta(weeks=4 + decimal_part) user_from_db.update({"start_date_subscription": date_now.isoformat()}) user_from_db.update({"numofaccounts": 200}) user_from_db.update({"end_date_subscription": (datetime_delta + date_now).isoformat()}) user_from_db.update({"subscription": "educational"}) # Updates the user with the new subscription user_from_db.update(emailsleft) importcsv.db.users.update_one( { "email": data["email"]}, {"$set": user_from_db}, upsert = True ) #importcsv.db.subscriptionlog.insert_one({"email": user_from_db["email"],"start_date_subscription": data["start_date_subscription"], "end_date_subscription": data["end_date_subscription"], "subscription": data["subscription"], "emailsleft": emailsleft["emailsleft"]}) return {"message": "Educational Freetrial Subscription Completed."} elif not email_exists: return {"message": "User not found"} except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.post('/scheduleeducationalfreetrial') # POST # async def scheduleeducationalfreetrial(data : JSONStructure = None, authorization: str = Header(None)): data = dict(data)#request.get_json() try: regexdatetime = re.compile(r'\d\d\d\d-\d\d-\d\d') mo = regexdatetime.search(data["educationalfreetrialdate"]) educationalfreetrialdate = mo.group() except AttributeError as aex: return {"error":r"Datetime shape is %Y-%m-%d"} try: email_exists = importcsv.db.users.find_one({"email": data["email"]}) if email_exists: importcsv.db.schedulededucationalfreetrial.insert_one({"email": data["email"],"educationalfreetrialdate":educationalfreetrialdate}) return {"message": "Educational Freetrial Scheduled."} elif not email_exists: return {"message": "User not found"} except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.post('/deletescheduleeducationalfreetrial') # POST # async def deletescheduleeducationalfreetrial(data : JSONStructure = None, authorization: str = Header(None)): data = dict(data)#request.get_json() current_user = data["email"] current_user = importcsv.db.users.find_one({"email": data["email"]}) if current_user: try: user_from_db = list(importcsv.db.schedulededucationalfreetrial.find({"email": data["email"]}))[0] importcsv.db.schedulededucationalfreetrial.delete_many(user_from_db) return {"message":"Educational Freetrial Unscheduled."} except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.post('/removebetatester') # POST # async def removebetatester(data : JSONStructure = None, authorization: str = Header(None)): data = dict(data)#request.get_json() email_exists = importcsv.db.users.find_one({"email": data["email"]}) if email_exists: user_from_db = list(importcsv.db.users.find({"email": data["email"]}))[0] # Gets wanted data for user importcsv.db.users.delete_many(user_from_db) del user_from_db["end_date_subscription"], user_from_db["start_date_subscription"],user_from_db["subscription"],user_from_db["emailsleft"], user_from_db["betatester"] importcsv.db.users.update_one( { "email": data["email"]}, {"$set": user_from_db}, upsert = True ) return {"message": "Beta Tester Subscription Deleted."} elif not email_exists: return {"message": "User not found"} @app.get('/getsubscription') # GET async def getsubscription(authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: user_from_db = list(importcsv.db.users.find({"email": current_user}))[0] # Gets wanted data for user end_date = user_from_db["end_date_subscription"] end_date_subscription = {"end_date_subscription": end_date} return end_date_subscription except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.get('/getemailcount') # GET async def getemailcount(authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: user_from_db = list(importcsv.db.users.find({"email": current_user}))[0] # Gets wanted data for user emailcount = user_from_db["emailsleft"] emailcountres = {"emailcount": emailcount} return emailcountres except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.post('/storefreetrial') # POST async def storefreetrial(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: data = dict(data)#request.get_json() user_from_db = list(importcsv.db.users.find({"email": current_user}))[0] # Gets wanted data for user if 'freetrial' not in user_from_db: user_from_db.update({"freetrial": "true"}) emailsleft = {'emailsleft': 10000000000} user_from_db.update({"start_date_subscription": data["start_date_subscription"]}) user_from_db.update({"end_date_subscription": data["end_date_subscription"]}) user_from_db.update({"subscription": data["subscription"]}) # Updates the user with the new subscription user_from_db.update(emailsleft) importcsv.db.users.update_one( { "email": current_user}, {"$set": user_from_db}, upsert = True ) importcsv.db.users.update_one({"email": current_user}, {"$set": user_from_db}, upsert = True) importcsv.db.freetrialhistory.insert_one({"email": user_from_db["email"],"freetrial":"true"}) return {"message": "Freetrial Redeemed."} elif 'freetrial' in user_from_db: return {"error": "Freetrial has already used."} except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.post('/setstudentsubscriptions') # POST async def setstudentsubscriptions(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: # Hostemail is the primary key for the studentsubscription collection try: data = dict(data)#request.get_json() # {"hostemail": "amari.lawal05@gmail.com","hostnumofaccounts": 198,"studentemails": [{"email": "6Lawala@truroschool.com","password": "mann35"},{"email": "Bola.lawal@hotmail.co.uk","password": "billy45"},{"email": "amari.lawal@gmail.com","password": "bobby46"}],"studentemailsleft": 20,"studentsubscription": "student educational"} user_from_db = list(importcsv.db.users.find({"email": current_user}))[0] # Host emails data studentsnotexist = [] for student in data["studentemails"]: # data["studentemails"] is a list of dictionaries [{"email":"example@gmail.com","password":"password"},{"email":"example@gmail.com","password":"password"}] student_user_from_db = importcsv.db.studentsubscriptions.find_one({"email": student["email"]}) # Checks if any of the emails added are in the database if not student_user_from_db: # If the email is not in the database, then we need to store the data into the database studentsnotexist.append(student) # Adds the email to the list of emails that do not exist in the database if studentsnotexist == []: # If all data is already in the database, no need to store it. return {"message": "all students exist."} elif studentsnotexist != []: # If there are emails that are not in the database, we need to store the data into the database if user_from_db["numofaccounts"] > 0: for student in studentsnotexist: # Goes through the emails not in the database encrypted_password = hashlib.sha256(student["password"].encode('utf-8')).hexdigest()# Encrypts the password # Then stores data into the studentsubscriptions collection importcsv.db.studentsubscriptions.insert_one({"hostemail":current_user,"email": student["email"],"password": encrypted_password,"emailsleft": 20,"subscription": "student educational"}) return {"message": "student subscriptions Set."} elif user_from_db["numofaccounts"] <= 0: return {"error": "No more student accounts left."} except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.get('/getstudentsubscriptions') # GET async def getstudentsubscriptions(authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] if current_user: try: student_user_from_db = list(importcsv.db.studentsubscriptions.find({"hostemail": current_user})) # [0] user_from_db = list(importcsv.db.users.find({"email": current_user}))[0] for student in student_user_from_db: del student["_id"], student["password"],student["hostemail"],student['subscription'] #importcsv.db.users.delete_many(user_from_db) # Deletes the data in order to update it. del user_from_db["numofaccounts"] # Deletes the numofaccounts to update it. user_from_db.update({"numofaccounts": 200 - len(student_user_from_db)}) # Updates the number of accounts #importcsv.db.users.insert_one(user_from_db) # inserts updated data into the host emails account importcsv.db.users.replace_one( {"email":current_user},user_from_db ) return {"result":student_user_from_db} except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.get('/checkstudentsubscriptions') # GET async def checkstudentsubscriptions(authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: student_from_db = list(importcsv.db.studentsubscriptions.find({"email": current_user}))[0] # Gets wanted data for user student_subscription = student_from_db["subscription"] student_subscription_json = {"student_subscription": student_subscription} return student_subscription_json except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.post('/deletestudentaccount') # POST async def deletestudentaccount(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] if current_user: data = dict(data)#request.get_json() try: hostkey = importcsv.db.studentsubscriptions.find_one({"hostemail": current_user}) studentkey = importcsv.db.studentsubscriptions.find_one({"email": data["studentemail"]}) if hostkey and studentkey: importcsv.db.studentsubscriptions.delete_one({"email": data["studentemail"]}) return {"message": "Student account deleted."} else: return {"error": "Student account does not exist."} except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.put('/changestudentpassword') # PUT async def changestudentpassword(data : JSONStructure = None, authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] if current_user: data = dict(data)#request.get_json() try: hostkey = importcsv.db.studentsubscriptions.find_one({"hostemail": current_user}) studentkey = importcsv.db.studentsubscriptions.find_one({"email": data["studentemail"]}) if hostkey and studentkey: student_user_from_db = list(importcsv.db.studentsubscriptions.find({"email": data["studentemail"]}))[0] # TODO Delete password from here and replace. #importcsv.db.studentsubscriptions.delete_many(student_user_from_db) del student_user_from_db["password"] encrypted_password = hashlib.sha256(data["password"].encode('utf-8')).hexdigest() student_user_from_db.update({"password": encrypted_password}) importcsv.db.studentsubscriptions.replace_one( {"email":current_user},student_user_from_db ) #importcsv.db.studentsubscriptions.insert_one(student_user_from_db) return {"message": "Password reset successful."} else: return {"error": "Student account does not exist."} except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.get('/getfreetrial') # GET async def getfreetrial(authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: freetrialhistory = list(importcsv.db.freetrialhistory.find({"email": current_user}))[0] # Gets wanted data for user freetrial = freetrialhistory["freetrial"] freetrial_subscription = {"freetrial": freetrial} # check freetrial return freetrial_subscription except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.get('/getemail') # GET async def getemail(authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: user_from_db = check_user_from_db(current_user) return {"email":user_from_db["email"]} except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.delete('/deletesubscription') # DELETE async def deletesubscription(authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: user_from_db = list(importcsv.db.users.find({"email": current_user}))[0] #importcsv.db.users.delete_many(user_from_db) if "end_date_subscription" in user_from_db: del user_from_db["end_date_subscription"] if "start_date_subscription" in user_from_db: del user_from_db["start_date_subscription"] if "subscription" in user_from_db: del user_from_db["subscription"] if "emailsleft" in user_from_db: del user_from_db["emailsleft"] if "numofaccounts" in user_from_db: del user_from_db["numofaccounts"] importcsv.db.users.replace_one( {"email":current_user},user_from_db ) #importcsv.db.users.update_one( { "email": current_user}, {"$set": user_from_db}, upsert = True ) return {"message":"Subscription deleted from expiration"} except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.get('/getaccountinfo') # GET async def getaccountinfo(authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: email_exists = importcsv.db.users.find_one({"email": current_user}) email_exists_student = importcsv.db.studentsubscriptions.find_one({"email": current_user}) if email_exists: user_from_db = list(importcsv.db.users.find({"email": current_user}))[0] del user_from_db["password"], user_from_db["_id"] return user_from_db elif email_exists_student: student_user_from_db = list(importcsv.db.studentsubscriptions.find({"email": current_user}))[0] host_from_db = list(importcsv.db.users.find({"email": student_user_from_db["hostemail"]}))[0] student_user_from_db.update({"start_date_subscription":host_from_db["start_date_subscription"]}) student_user_from_db.update({"end_date_subscription":host_from_db["end_date_subscription"]}) del student_user_from_db["password"], student_user_from_db["_id"] return student_user_from_db #return {"error": f"account not found"} except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.delete('/deleteaccount') # DELETE async def deleteaccount(authorization: str = Header(None)): current_user = secure_decode(authorization.replace("Bearer ",""))["email"] # outputs the email of the user example@gmail.com if current_user: try: user_from_db = list(importcsv.db.users.find({"email": current_user}))[0] importcsv.db.users.delete_many(user_from_db) return {"message":"Account Deleted"} except Exception as ex: return {"error": f"{type(ex)}-{ex}"} @app.get('/getedexcelpapers')# GET # allow all origins all methods. async def getedexcelpapers(authorization: str = Header(None)): try: data = dict(data)#request.get_json() dataedexcel = list(importcsv.db.edexcelpapers.find({"year":"AS Level"})) return {"result":dataedexcel} except Exception as ex: return {"error":f"{type(ex)},ex"} async def main(): config = uvicorn.Config("main:app", port=7860, log_level="info",host="0.0.0.0",reload=True) server = uvicorn.Server(config) await server.serve() if __name__ == "__main__": asyncio.run(main())