fileuploader / app.py
coollsd's picture
Update app.py
8025976 verified
raw
history blame
13.5 kB
import asyncio
import time
from typing import Dict
from urllib.parse import quote

import requests
from fastapi import FastAPI, File, UploadFile, Request
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
# Application instance; the @app.get / @app.post decorators further down
# register their route handlers on this object.
app = FastAPI()
# Complete single-page UI (markup, CSS and JavaScript) returned verbatim by the
# "/" route. The script sends the chosen file as multipart form field "file"
# to POST /upload through XMLHttpRequest, drives the progress bar from
# xhr.upload.onprogress, and on a 200 response containing a "url" key points
# the result link at that URL and reveals the "Copy Link" button.
#
# NOTE(review): xhr.onload only parses the response body when the status is
# 200, so the {"error": ...} JSON that /upload returns with 400/500 is never
# shown -- the alert uses xhr.statusText instead. JSON.parse is also not
# wrapped in try/catch, so a non-JSON 200 body would throw before
# resetUploadState() runs.
HTML_CONTENT = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Pro File Uploader</title>
<link href="https://fonts.googleapis.com/css2?family=Poppins:wght@300;400;600&display=swap" rel="stylesheet">
<style>
body {
font-family: 'Poppins', sans-serif;
background: #121212;
height: 100vh;
margin: 0;
display: flex;
justify-content: center;
align-items: center;
color: #e0e0e0;
}
.container {
background: rgba(255, 255, 255, 0.1);
padding: 2.5rem;
border-radius: 15px;
box-shadow: 0 8px 32px 0 rgba(31, 38, 135, 0.37);
backdrop-filter: blur(4px);
border: 1px solid rgba(255, 255, 255, 0.18);
text-align: center;
max-width: 450px;
width: 100%;
transition: all 0.3s ease;
}
.container:hover {
transform: translateY(-5px);
box-shadow: 0 15px 35px rgba(0, 0, 0, 0.3);
}
h1 {
color: #00CED1;
margin-bottom: 1.5rem;
font-weight: 600;
}
.file-input {
display: none;
}
.btn {
background-color: #00CED1;
color: #121212;
padding: 12px 24px;
border-radius: 5px;
cursor: pointer;
transition: all 0.3s ease;
display: inline-block;
font-weight: 600;
border: none;
font-size: 1rem;
margin: 0.5rem;
position: relative;
overflow: hidden;
}
.btn::after {
content: '';
position: absolute;
top: 50%;
left: 50%;
width: 5px;
height: 5px;
background: rgba(255, 255, 255, 0.5);
opacity: 0;
border-radius: 100%;
transform: scale(1, 1) translate(-50%);
transform-origin: 50% 50%;
}
@keyframes ripple {
0% {
transform: scale(0, 0);
opacity: 1;
}
20% {
transform: scale(25, 25);
opacity: 1;
}
100% {
opacity: 0;
transform: scale(40, 40);
}
}
.btn:focus:not(:active)::after {
animation: ripple 1s ease-out;
}
.btn:hover {
background-color: #00A9A9;
}
.btn:disabled {
background-color: #555;
cursor: not-allowed;
}
.file-name {
margin-top: 1rem;
font-size: 0.9rem;
color: #aaa;
}
.progress-container {
margin-top: 1.5rem;
display: none;
}
.progress-bar {
width: 100%;
height: 10px;
background-color: #333;
border-radius: 5px;
overflow: hidden;
}
.progress {
width: 0%;
height: 100%;
background-color: #00CED1;
transition: width 0.3s ease;
}
.loading-spinner {
display: none;
width: 40px;
height: 40px;
border: 4px solid #333;
border-top: 4px solid #00CED1;
border-radius: 50%;
animation: spin 1s linear infinite;
margin: 20px auto;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
.result-container {
display: none;
margin-top: 1.5rem;
}
.result-link {
color: #00CED1;
text-decoration: none;
font-weight: 600;
transition: all 0.3s ease;
margin-right: 10px;
}
.result-link:hover {
color: #00A9A9;
text-decoration: underline;
}
.copy-btn {
background-color: #333;
color: #e0e0e0;
border: none;
padding: 5px 10px;
border-radius: 5px;
cursor: pointer;
transition: all 0.3s ease;
}
.copy-btn:hover {
background-color: #444;
}
</style>
</head>
<body>
<div class="container">
<h1>Pro File Uploader</h1>
<form id="uploadForm">
<input type="file" name="file" id="file" class="file-input" accept="*/*" required>
<label for="file" class="btn">Choose File</label>
<div class="file-name" id="fileName"></div>
<button type="button" id="uploadBtn" class="btn" style="display: none; margin-top: 1rem;">Upload File</button>
<div class="progress-container" id="progressContainer">
<div class="progress-bar">
<div class="progress" id="progress"></div>
</div>
</div>
<div class="loading-spinner" id="loadingSpinner"></div>
</form>
<div class="result-container" id="resultContainer">
<a href="#" class="result-link" id="resultLink" target="_blank">View Uploaded File</a>
<button class="copy-btn" id="copyBtn">Copy Link</button>
</div>
</div>
<script>
const fileInput = document.getElementById('file');
const fileName = document.getElementById('fileName');
const uploadBtn = document.getElementById('uploadBtn');
const progressContainer = document.getElementById('progressContainer');
const progress = document.getElementById('progress');
const loadingSpinner = document.getElementById('loadingSpinner');
const resultContainer = document.getElementById('resultContainer');
const resultLink = document.getElementById('resultLink');
const copyBtn = document.getElementById('copyBtn');
fileInput.addEventListener('change', (e) => {
if (e.target.files.length > 0) {
fileName.textContent = e.target.files[0].name;
uploadBtn.style.display = 'inline-block';
}
});
uploadBtn.addEventListener('click', () => {
if (fileInput.files.length > 0) {
uploadFile(fileInput.files[0]);
}
});
function uploadFile(file) {
const formData = new FormData();
formData.append('file', file);
progressContainer.style.display = 'block';
loadingSpinner.style.display = 'block';
uploadBtn.disabled = true;
const xhr = new XMLHttpRequest();
xhr.open('POST', '/upload', true);
xhr.upload.onprogress = updateProgress;
xhr.onload = function() {
loadingSpinner.style.display = 'none';
uploadBtn.disabled = false;
if (xhr.status === 200) {
const response = JSON.parse(xhr.responseText);
if (response.url) {
resultLink.href = response.url;
resultLink.textContent = 'View Uploaded File';
resultContainer.style.display = 'block';
} else {
alert('Upload failed: ' + response.error);
}
} else {
alert('Upload failed: ' + xhr.statusText);
}
resetUploadState();
};
xhr.onerror = function() {
loadingSpinner.style.display = 'none';
uploadBtn.disabled = false;
alert('Upload failed: Network error');
resetUploadState();
};
xhr.send(formData);
}
function updateProgress(event) {
if (event.lengthComputable) {
const percentComplete = (event.loaded / event.total) * 100;
progress.style.width = percentComplete + '%';
}
}
function resetUploadState() {
fileInput.value = '';
fileName.textContent = '';
uploadBtn.style.display = 'none';
progressContainer.style.display = 'none';
progress.style.width = '0%';
}
copyBtn.addEventListener('click', () => {
navigator.clipboard.writeText(resultLink.href).then(() => {
alert('Link copied to clipboard!');
});
});
</script>
</body>
</html>
"""
@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the uploader page: the HTML_CONTENT string, sent as an HTML response."""
    page = HTML_CONTENT
    return page
@app.post("/upload")
async def handle_upload(file: UploadFile = File(...)):
    """Accept one uploaded file, forward it upstream and return a mirrored URL.

    Steps: obtain session cookies, ask the upstream API for an upload slot,
    PUT the file bytes to the returned ``upload_url`` (with retries), then
    translate the returned ``serving_url`` into a local ``/rbxg/...`` path.

    Args:
        file: The multipart form field named ``file``.

    Returns:
        ``{"url": "/rbxg/<path>"}`` on success, otherwise a JSON body
        ``{"error": <message>}`` with status 400 (no filename) or 500 (any
        upstream step failed).
    """
    if not file.filename:
        return JSONResponse(content={"error": "No file selected."}, status_code=400)

    cookies = await get_cookies()
    if 'csrftoken' not in cookies or 'sessionid' not in cookies:
        return JSONResponse(content={"error": "Failed to obtain necessary cookies"}, status_code=500)

    # A multipart part may arrive without a Content-Type; fall back to a
    # generic binary type instead of passing None downstream.
    content_type = file.content_type or 'application/octet-stream'

    # initiate_upload re-raises on network/JSON errors (after printing them);
    # turn that into the same JSON error the client already understands
    # rather than an unhandled exception.
    try:
        upload_result = await initiate_upload(cookies, file.filename, content_type)
    except Exception:
        upload_result = None
    if not isinstance(upload_result, dict) or 'upload_url' not in upload_result:
        return JSONResponse(content={"error": "Failed to initiate upload"}, status_code=500)

    # Validate the serving URL before transferring any bytes: without a
    # '/pbxt/' segment there is no way to build the mirrored link afterwards.
    serving_url = upload_result.get('serving_url')
    mirrored_path = ''
    if isinstance(serving_url, str):
        mirrored_path = serving_url.partition('/pbxt/')[2]
    if not mirrored_path:
        return JSONResponse(content={"error": "Upload service returned an unexpected serving URL"}, status_code=500)

    file_content = await file.read()
    upload_success = await retry_upload(upload_result['upload_url'], file_content, content_type)
    if not upload_success:
        return JSONResponse(content={"error": "File upload failed after multiple attempts"}, status_code=500)

    return JSONResponse(content={"url": f"/rbxg/{mirrored_path}"})
@app.get("/rbxg/{path:path}")
async def handle_video_stream(path: str, request: Request):
    """Proxy ``https://replicate.delivery/pbxt/<path>`` back to the client.

    The client's ``Range`` header, if any, is forwarded so partial-content
    requests keep working; the upstream status code is passed through.

    Args:
        path: Everything after ``/rbxg/`` in the request URL.
        request: Incoming request, used only to read the ``Range`` header.

    Returns:
        A ``StreamingResponse`` relaying the upstream body in 8 KiB chunks,
        a 400 JSON error for paths containing ``..`` segments, or a 502 JSON
        error when the upstream request itself fails.
    """
    # The URL is built from client input; reject parent-directory segments so
    # the request cannot be normalised to a location outside /pbxt/.
    if '..' in path.split('/'):
        return JSONResponse(content={"error": "Invalid path"}, status_code=400)

    original_url = f'https://replicate.delivery/pbxt/{path}'
    range_header = request.headers.get('Range')
    upstream_headers = {'Range': range_header} if range_header else {}

    # requests is synchronous: run it in a worker thread so the event loop is
    # not blocked, and bound the wait with (connect, read) timeouts.
    try:
        upstream = await asyncio.to_thread(
            requests.get,
            original_url,
            headers=upstream_headers,
            stream=True,
            timeout=(10, 60),
        )
    except requests.RequestException as exc:
        print(f'Error fetching upstream file: {exc}')
        return JSONResponse(content={"error": "Failed to fetch file"}, status_code=502)

    def generate():
        # Always release the upstream connection, including when the client
        # disconnects part-way through the stream.
        try:
            yield from upstream.iter_content(chunk_size=8192)
        finally:
            upstream.close()

    # Hop-by-hop headers describe the upstream connection, not this one.
    skipped = {
        'connection', 'keep-alive', 'proxy-authenticate', 'proxy-authorization',
        'te', 'trailer', 'transfer-encoding', 'upgrade',
        # Replaced below; skipping avoids emitting the header twice.
        'access-control-allow-origin',
    }
    # iter_content() yields the decoded body, so when upstream compressed it
    # the original Content-Encoding / Content-Length no longer describe the
    # bytes that are actually sent.
    if 'Content-Encoding' in upstream.headers:
        skipped.update({'content-encoding', 'content-length'})

    headers = {
        name: value
        for name, value in upstream.headers.items()
        if name.lower() not in skipped
    }
    headers['Access-Control-Allow-Origin'] = '*'
    # Content-Range (for 206 responses) is already carried over by the copy
    # above, so no separate assignment is needed.
    return StreamingResponse(generate(), status_code=upstream.status_code, headers=headers)
async def get_cookies() -> Dict[str, str]:
    """Fetch a replicate.com page and return the cookies it sets.

    Returns:
        A plain dict of cookie name to value, or an empty dict if the request
        fails for any reason (the error is printed). The caller checks the
        result for the ``csrftoken`` and ``sessionid`` keys.
    """
    try:
        # requests is synchronous; run it in a worker thread so this coroutine
        # does not stall the event loop, and cap the wait with a timeout so a
        # hung connection cannot block the upload request forever.
        response = await asyncio.to_thread(
            requests.get,
            'https://replicate.com/levelsio/neon-tokyo',
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36'
            },
            timeout=15,
        )
        return dict(response.cookies)
    except Exception as e:
        # Deliberately best-effort: an empty result tells the caller to
        # answer with its own error response.
        print(f'Error fetching the page: {e}')
        return {}
async def initiate_upload(cookies: Dict[str, str], filename: str, content_type: str) -> Dict:
    """Ask the replicate.com upload API for an upload slot.

    Args:
        cookies: Session cookies; ``csrftoken`` is also sent as ``X-CSRFToken``.
        filename: Name of the file, used as the last URL path segment.
        content_type: MIME type, sent as the ``content_type`` query parameter.

    Returns:
        The decoded JSON body of the response. The caller reads the
        ``upload_url`` and ``serving_url`` keys from it.

    Raises:
        Exception: Any error from the HTTP request or from JSON decoding is
            printed and re-raised unchanged.
    """
    # Percent-encode the filename so characters such as '?', '#', '/' or
    # spaces stay inside the path segment instead of changing the URL shape.
    url = f"https://replicate.com/api/upload/{quote(filename, safe='')}"
    request_headers = {
        'X-CSRFToken': cookies.get('csrftoken'),
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
        'Referer': 'https://replicate.com/levelsio/neon-tokyo',
        'Origin': 'https://replicate.com',
        'Accept': '*/*',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'identity',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        'Sec-GPC': '1',
        'Priority': 'u=1, i'
    }
    try:
        # Run the synchronous request in a worker thread (keeps the event
        # loop responsive) and let requests encode the query string.
        response = await asyncio.to_thread(
            requests.post,
            url,
            params={'content_type': content_type},
            cookies=cookies,
            headers=request_headers,
            timeout=30,
        )
        print(f'Initiate upload response status: {response.status_code}')
        print(f'Initiate upload response headers: {response.headers}')
        print(f'Response body: {response.text}')
        return response.json()
    except Exception as e:
        print(f'Error initiating upload: {e}')
        raise
async def upload_file(upload_url: str, file_content: bytes, content_type: str) -> bool:
    """PUT the file bytes to ``upload_url`` once.

    Args:
        upload_url: Destination URL for the PUT request.
        file_content: Complete file body.
        content_type: Value sent as the ``Content-Type`` header.

    Returns:
        True when the server answers with a 2xx status, False on any other
        status or when the request raises (the error is printed).
    """
    try:
        # Worker thread keeps the event loop free during the transfer; the
        # (connect, read) timeout stops a stalled transfer from hanging the
        # retry loop indefinitely.
        response = await asyncio.to_thread(
            requests.put,
            upload_url,
            data=file_content,
            headers={'Content-Type': content_type},
            timeout=(10, 300),
        )
        print(f'File upload response status: {response.status_code}')
        print(f'File upload response headers: {response.headers}')
        # 201 Created and 204 No Content are also valid PUT successes;
        # accepting only 200 would trigger pointless re-uploads.
        return 200 <= response.status_code < 300
    except Exception as e:
        print(f'Error uploading file: {e}')
        return False
async def retry_upload(upload_url: str, file_content: bytes, content_type: str, max_retries: int = 5, delay: int = 1) -> bool:
    """Call ``upload_file`` until it succeeds or the attempts run out.

    Args:
        upload_url: Destination URL passed through to ``upload_file``.
        file_content: File body passed through to ``upload_file``.
        content_type: MIME type passed through to ``upload_file``.
        max_retries: Total number of attempts; zero or less means none.
        delay: Seconds to wait after the first failure; doubled after each
            subsequent failure.

    Returns:
        True as soon as one attempt succeeds, False if every attempt fails
        (or no attempt was made).
    """
    for attempt in range(1, max_retries + 1):
        print(f'Upload attempt {attempt} of {max_retries}')
        if await upload_file(upload_url, file_content, content_type):
            print('Upload successful')
            return True
        if attempt < max_retries:
            print(f'Upload failed, retrying in {delay}s...')
            # asyncio.sleep yields to the event loop; time.sleep here would
            # freeze every other request for the whole backoff period.
            await asyncio.sleep(delay)
            delay *= 2  # Exponential backoff
    print('Upload failed after all retry attempts')
    return False