Spaces:
Runtime error
Runtime error
File size: 5,143 Bytes
c6a9f21 fb24ad1 c6a9f21 fb24ad1 c6a9f21 fb24ad1 c6a9f21 fb24ad1 c6a9f21 fb24ad1 c6a9f21 fb24ad1 c6a9f21 fb24ad1 c6a9f21 fb24ad1 c6a9f21 fb24ad1 c6a9f21 fb24ad1 c6a9f21 fb24ad1 c6a9f21 fb24ad1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
from fastapi.testclient import TestClient
from fastapi.routing import APIRoute
from app.routers.video import updateArtifact
from app.main import app
from app.constants import deviceId
from app import db
import os
import pytest
import requests
import json
from google.cloud.firestore_v1.base_query import FieldFilter
def endpoints():
    """Return the path of every ``APIRoute`` registered on ``app``.

    Entries of ``app.routes`` that are not ``APIRoute`` instances are skipped.
    """
    return [route.path for route in app.routes if isinstance(route, APIRoute)]
@pytest.fixture
def client():
    """Yield a ``TestClient`` bound to ``app``.

    The URL string is passed as the second positional argument of
    ``TestClient``, exactly as given here.
    """
    test_client = TestClient(app, "http://0.0.0.0:3000")
    yield test_client
@pytest.fixture
def user():
    """Sign in the test account and yield its id and ID token.

    Posts the test credentials to the Identity Toolkit
    ``accounts:signInWithPassword`` endpoint, authenticated with the
    ``FIREBASE_API_KEY`` environment variable.

    Yields:
        dict: ``{"id": <localId>, "token": <idToken>}`` taken from the
        sign-in response.

    After the test finishes, the document named after the user id is deleted
    from the ``user`` collection.

    Raises:
        RuntimeError: if ``FIREBASE_API_KEY`` is not set.
        requests.HTTPError: if the sign-in request is rejected.
    """
    api_key = os.environ.get("FIREBASE_API_KEY")
    if not api_key:
        # Fail with an explicit message instead of ``TypeError: str + None``.
        raise RuntimeError(
            "FIREBASE_API_KEY environment variable is required for the user fixture"
        )
    url = (
        "https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword?key="
        + api_key
    )
    # NOTE(review): the e-mail below looks like a redacted placeholder --
    # confirm it matches the real test account.
    credentials = {
        "email": "[email protected]",
        "password": "testing",
        "returnSecureToken": True,
    }
    # ``json=`` serialises the payload and sets the JSON Content-Type header;
    # the timeout keeps a stalled connection from hanging the test run.
    response = requests.post(url, json=credentials, timeout=30)
    # Surface a rejected sign-in as an HTTP error rather than a KeyError below.
    response.raise_for_status()
    data = response.json()
    user = {"id": data["localId"], "token": data["idToken"]}
    yield user
    # Teardown: remove the user document the tests may have created.
    db.collection("user").document(user["id"]).delete()
class TestVideoAPI:
    """Integration tests for the ``video`` upload route and ``updateArtifact``."""

    @staticmethod
    def _upload(client, filename, token=None):
        """POST the local file *filename* to the ``video`` route as multipart.

        Args:
            client: the test client used to issue the request.
            filename: path of the file to upload; also sent as the upload name.
            token: bearer credential for the ``Authorization`` header, or
                ``None`` to send no ``Authorization`` header at all.

        Returns:
            The response object returned by ``client.request``.
        """
        # No explicit Content-Type: the client must generate the
        # ``multipart/form-data`` header (with its boundary) itself, and a
        # hand-set ``application/json`` value would take precedence over it.
        headers = {}
        if token is not None:
            headers["Authorization"] = "Bearer " + token
        # The context manager closes the file handle once the request is done.
        with open(filename, "rb") as handle:
            files = [("file", (filename, handle, "application/octet-stream"))]
            return client.request("POST", "video", headers=headers, files=files)

    @pytest.mark.skipif("/video" not in endpoints(), reason="Route not defined")
    def test_video_API(self, user, client):
        """Exercise the ``video`` route across auth and payload scenarios."""
        # No token passed to the route.
        response = self._upload(client, "demo.mp4")
        assert response.status_code == 403

        # A dummy (invalid) token.
        response = self._upload(
            client, "demo.mp4", token="saikoljncaskljnfckjnasckjna"
        )
        assert response.status_code == 401

        # Valid token, but the uploaded file is not a video.
        db.collection("user").document(user["id"]).set({"deviceId": deviceId})
        response = self._upload(client, "demo.jpg", token=user["token"])
        assert response.status_code == 400
        assert response.text == "File must be video"

        # The user id (not the ID token) is sent as the bearer credential.
        # NOTE(review): the expected 400 is kept from the original test --
        # confirm this is the intended status for this case.
        response = self._upload(client, "demo.mp4", token=user["id"])
        assert response.status_code == 400

        # All requirements fulfilled: valid token and a video file.
        response = self._upload(client, "demo.mp4", token=user["token"])
        assert response.status_code == 200
        artifactName = response.text

        # A Query must be executed (``stream()``) to obtain snapshots, and each
        # snapshot exposes its data directly through ``to_dict()``.
        query = db.collection("artifacts").where(
            filter=FieldFilter("name", "==", artifactName)
        )
        snapshots = list(query.stream())
        assert len(snapshots) == 1
        for snapshot in snapshots:
            data = snapshot.to_dict()
            assert data["name"] == artifactName
            assert data["status"] == "pending"

        db.collection("user").document(user["id"]).delete()

    def test_update_artifact(self):
        """Check that ``updateArtifact`` updates each artifact field."""
        # Prepare a known document with id ``test``. ``set`` creates it under
        # that id; ``collection.add`` would generate a random id instead.
        test_artifact = db.collection("artifacts").document("test")
        if not test_artifact.get().exists:
            test_artifact.set(
                {"name": "test", "path": "", "status": "testing", "thumbnailURL": ""}
            )
        else:
            test_artifact.update({"status": "testing", "path": "", "thumbnailURL": ""})

        try:
            # Update one field at a time and read it back.
            # NOTE(review): assumes updateArtifact(document_id, fields_dict) --
            # confirm against app.routers.video.
            expected_updates = (
                ("status", "test_done"),
                ("path", "test_path"),
                ("thumbnailURL", "test_path"),
            )
            for field, value in expected_updates:
                updateArtifact(test_artifact.id, {field: value})
                assert test_artifact.get().to_dict()[field] == value
        finally:
            # Delete the document so the next run starts clean, even when an
            # assertion above fails.
            test_artifact.delete()
|