from fastapi import FastAPI, Request, BackgroundTasks, HTTPException
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
import os
import subprocess
import threading
import time
import json
import datetime
import uuid
import shutil
from typing import Dict, Any, Optional, List
from pathlib import Path
import psycopg2
import logging
import signal
# Setup logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("pgmigrator")
# Initialize FastAPI app
app = FastAPI(title="TimescaleDB Migration Tool")
# Enable CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Create necessary directories
os.makedirs("templates", exist_ok=True)
os.makedirs("dumps", exist_ok=True)
# Setup templates
templates = Jinja2Templates(directory="templates")
# Downloads are served by the explicit, sanitizing /downloads/{file_name} endpoint below.
# (A StaticFiles mount at the same path would be matched first and shadow that route,
# so no mount is registered here.)
# Global state for migration
migration_state = {
"id": str(uuid.uuid4()),
"running": False,
"operation": None, # "dump" or "restore"
"start_time": None,
"end_time": None,
"dump_file": None,
"dump_file_size": 0,
"previous_size": 0,
"dump_completed": False,
"restore_completed": False,
"last_activity": time.time(),
"log": [],
"process": None,
"progress": {
"current_table": None,
"tables_completed": 0,
"total_tables": 0,
"current_size_mb": 0,
"growth_rate_mb_per_sec": 0,
"estimated_time_remaining": None,
"percent_complete": 0
}
}
# Lock for updating global state
migration_lock = threading.Lock()
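# migration_state above is the single source of truth for the one-at-a-time
# dump/restore workflow; it is shared between request handlers and the
# background threads, so every read-modify-write goes through migration_lock.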
def log_message(message: str, level: str = "info", command: Optional[str] = None):
"""Add timestamped log message with level"""
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = {
"timestamp": timestamp,
"message": message,
"level": level,
"command": command,
"id": len(migration_state["log"])
}
with migration_lock:
migration_state["log"].append(log_entry)
migration_state["last_activity"] = time.time()
logger.info(f"[{level.upper()}] {message}")
if command:
logger.info(f"Command: {command}")
def test_connection_logic(connection_string: str) -> bool:
"""Test a PostgreSQL connection string (internal logic)"""
try:
conn = psycopg2.connect(connection_string)
conn.close()
return True
except Exception as e:
logger.error(f"Connection test failed: {str(e)}")
return False
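# Note: the connection string can be any DSN accepted by psycopg2/libpq,
# e.g. a "postgresql://user:password@host:5432/dbname" URI or a key=value DSN.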
def get_file_size_mb(file_path: str) -> float:
"""Get file size in megabytes"""
try:
size_bytes = os.path.getsize(file_path)
return size_bytes / (1024 * 1024) # Convert to MB
except Exception:
return 0
def monitor_dump_size():
"""Monitor the dump file size and update state"""
while migration_state["running"] and migration_state["operation"] == "dump":
try:
if migration_state["dump_file"] and os.path.exists(migration_state["dump_file"]):
# Get current file size
current_size = get_file_size_mb(migration_state["dump_file"])
# Calculate growth rate
elapsed = time.time() - migration_state["start_time"]
if elapsed > 0:
growth_rate = current_size / elapsed # MB/sec
# Update progress state
with migration_lock:
migration_state["dump_file_size"] = current_size
migration_state["progress"]["current_size_mb"] = round(current_size, 2)
migration_state["progress"]["growth_rate_mb_per_sec"] = round(growth_rate, 2)
# Update size change for UI display
size_change = current_size - migration_state["previous_size"]
if size_change > 0:
migration_state["previous_size"] = current_size
except Exception as e:
logger.error(f"Error monitoring dump size: {str(e)}")
time.sleep(1) # Update every second
def run_dump(source_conn: str, file_path: str, options: dict):
"""Run pg_dump in a background thread"""
try:
# Convert to absolute path
absolute_file_path = os.path.abspath(file_path)
# Log the path conversion
log_message(f"Converting path: {file_path} -> {absolute_file_path}", "info")
# Clear any existing file
if os.path.exists(absolute_file_path):
os.remove(absolute_file_path)
# Set environment variables for connection
env = os.environ.copy()
# Build pg_dump command
format_flag = "-F" + options.get("format", "c") # Default to custom format
cmd = ["pg_dump", source_conn, format_flag, "-v", "-f", absolute_file_path]
# Add schema if specified
if options.get("schema"):
cmd.extend(["-n", options["schema"]])
# Add compression level if specified
if options.get("compression") and options["compression"] != "default":
cmd.extend(["-Z", options["compression"]])
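# Illustrative command shape (values are examples only):
#   pg_dump "postgresql://user:***@host:5432/db" -Fc -v -f /app/dumps/backup.dump -n public -Z 6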
log_message(f"Starting database dump to {absolute_file_path}", "info", " ".join(cmd))
# Mark the dump as running *before* starting the size monitor so the monitor
# loop sees running=True and a valid start_time.
with migration_lock:
migration_state["start_time"] = time.time()
migration_state["running"] = True
migration_state["operation"] = "dump"
migration_state["dump_file"] = absolute_file_path
migration_state["dump_completed"] = False
# Start monitoring thread for file size
monitor_thread = threading.Thread(target=monitor_dump_size, daemon=True)
monitor_thread.start()
# Run pg_dump in its own process group (where supported) so stop_current_process
# can signal the whole group without touching this server process.
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
text=True,
bufsize=1, # Line buffered to get real-time output
preexec_fn=os.setsid if hasattr(os, "setsid") else None
)
with migration_lock:
migration_state["process"] = process
# Function to read stderr in real-time
def read_stderr():
for line in iter(process.stderr.readline, ''):
line = line.strip()
if line:
log_message(line, "info")
if "Dumping" in line and "table" in line:
try:
table_parts = line.split("Dumping")
if len(table_parts) > 1:
table_info = table_parts[1].strip()
table_parts = table_info.split(" ")
if len(table_parts) > 1:
table_name = table_parts[1].strip('"')
with migration_lock:
migration_state["progress"]["current_table"] = table_name
migration_state["progress"]["tables_completed"] += 1
except Exception as e:
log_message(f"Error parsing table name: {str(e)}", "warning")
# Start stderr reading thread
stderr_thread = threading.Thread(target=read_stderr, daemon=True)
stderr_thread.start()
# Wait for process to complete
exit_code = process.wait()
# Wait a moment for the stderr thread to catch up
stderr_thread.join(timeout=2.0)
if exit_code == 0:
# Verify file exists and has content
if os.path.exists(absolute_file_path):
final_size = os.path.getsize(absolute_file_path)
if final_size > 0:
# Success - file exists and has content
with migration_lock:
migration_state["dump_file_size"] = final_size / (1024 * 1024) # Convert to MB
migration_state["progress"]["current_size_mb"] = round(final_size / (1024 * 1024), 2)
migration_state["dump_completed"] = True
migration_state["end_time"] = time.time()
migration_state["running"] = False
migration_state["process"] = None
total_time = migration_state["end_time"] - migration_state["start_time"]
log_message(
f"Database dump completed successfully. Size: {round(final_size / (1024 * 1024), 2)} MB. Time: {round(total_time, 2)} seconds",
"success"
)
return True
else:
log_message(f"Dump file exists but is empty (0 bytes): {absolute_file_path}", "error")
else:
log_message(f"Dump completed but file not found: {absolute_file_path}", "error")
# If we get here, something went wrong with the file
with migration_lock:
migration_state["dump_completed"] = False
migration_state["end_time"] = time.time()
migration_state["running"] = False
migration_state["process"] = None
return False
else:
log_message(f"Database dump failed with exit code {exit_code}", "error")
with migration_lock:
migration_state["running"] = False
migration_state["process"] = None
return False
except Exception as e:
log_message(f"Error during database dump: {str(e)}", "error")
with migration_lock:
migration_state["running"] = False
migration_state["process"] = None
return False
def run_restore(target_conn: str, file_path: str, options: dict):
"""Run pg_restore in a background thread"""
try:
if not os.path.exists(file_path):
log_message(f"Dump file not found: {file_path}", "error")
with migration_lock:
migration_state["running"] = False # Ensure state is consistent
return False
# Set environment variables for connection
env = os.environ.copy()
# Run timescaledb_pre_restore() if specified
if options.get("timescaledb_pre_restore", True):
pre_restore_cmd = ["psql", target_conn, "-c", "SELECT timescaledb_pre_restore();"]
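# timescaledb_pre_restore() puts TimescaleDB into a restoring state (pausing its
# background workers) so pg_restore can safely recreate catalog data; it must be
# paired with timescaledb_post_restore() afterwards.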
log_message("Running timescaledb_pre_restore()", "info", " ".join(pre_restore_cmd))
pre_restore_process = subprocess.Popen(
pre_restore_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
text=True
)
pre_restore_stdout, pre_restore_stderr = pre_restore_process.communicate()
if pre_restore_process.returncode != 0:
log_message(f"Pre-restore failed: {pre_restore_stderr or pre_restore_stdout}", "error")
with migration_lock:
migration_state["running"] = False # Ensure state is consistent
return False
# Build pg_restore command
cmd = ["pg_restore", "-d", target_conn, "-v"]
# Add no-owner flag if specified
if options.get("no_owner", True):
cmd.append("--no-owner")
# Add clean flag if specified
if options.get("clean", False):
cmd.append("--clean")
# Add single transaction flag if specified
if options.get("single_transaction", True):
cmd.append("--single-transaction")
# Add file path
cmd.append(file_path)
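# Illustrative command shape (values are examples only):
#   pg_restore -d "postgresql://user:***@host:5432/db" -v --no-owner --single-transaction /app/dumps/backup.dump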
log_message(f"Starting database restore from {file_path}", "info", " ".join(cmd))
with migration_lock:
migration_state["start_time"] = time.time()
migration_state["running"] = True
migration_state["operation"] = "restore"
migration_state["restore_completed"] = False
migration_state["progress"]["tables_completed"] = 0 # Reset counter
# Use preexec_fn=os.setsid to create a new process group
preexec_fn_to_use = None
if hasattr(os, 'setsid'):
preexec_fn_to_use = os.setsid
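# With its own session/process group, the pg_restore child (and anything it
# spawns) can later be terminated as a group by stop_current_process via os.killpg.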
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
text=True,
bufsize=1, # Line buffering
preexec_fn=preexec_fn_to_use # Create a new process group (Unix only)
)
with migration_lock:
migration_state["process"] = process
# Process output
if process.stderr:
for line in iter(process.stderr.readline, ''):
line = line.strip()
if not line:
continue
# Log verbose output
log_message(line, "info")
# Try to parse table name (pg_restore output format varies)
if "processing" in line.lower() and ("table data" in line.lower() or "table" in line.lower()):
try:
# Attempt to extract table name, might need refinement
parts = line.split()
table_index = -1
if "table" in parts: table_index = parts.index("table") + 1
elif "data" in parts: table_index = parts.index("data") + 1
if table_index > 0 and table_index < len(parts):
table_name = parts[table_index].strip('."')
with migration_lock:
migration_state["progress"]["current_table"] = table_name
migration_state["progress"]["tables_completed"] += 1
else:
logger.warning(f"Could not parse table name from restore line: {line}")
except Exception as parse_err:
logger.warning(f"Error parsing restore line '{line}': {parse_err}")
# Check if process is still running
with migration_lock:
if not migration_state["running"]:
break # Stop processing if process was terminated
# Wait for process to complete
stdout, stderr = process.communicate()
exit_code = process.returncode
post_restore_success = True
# Run timescaledb_post_restore() if specified and restore was successful so far
if exit_code == 0 and options.get("timescaledb_post_restore", True):
post_restore_cmd = ["psql", target_conn, "-c", "SELECT timescaledb_post_restore(); ANALYZE;"]
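# timescaledb_post_restore() takes TimescaleDB out of the restoring state and
# restarts its background workers; ANALYZE refreshes planner statistics on the
# freshly restored data.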
log_message("Running timescaledb_post_restore() and ANALYZE", "info", " ".join(post_restore_cmd))
post_restore_process = subprocess.Popen(
post_restore_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
text=True
)
post_restore_stdout, post_restore_stderr = post_restore_process.communicate()
if post_restore_process.returncode != 0:
log_message(f"Post-restore failed: {post_restore_stderr or post_restore_stdout}", "error")
post_restore_success = False # Mark post-restore as failed
with migration_lock:
# Ensure running state is updated based on process completion
if migration_state["running"]: # Only update if not stopped manually
if exit_code == 0 and post_restore_success:
migration_state["restore_completed"] = True
migration_state["end_time"] = time.time()
total_time = migration_state["end_time"] - migration_state["start_time"]
log_message(
f"Database restore completed successfully. Time: {round(total_time, 2)} seconds",
"success"
)
elif exit_code != 0:
error_message = stderr or stdout or "Unknown error during restore"
log_message(f"Database restore failed: {error_message}", "error")
# If post_restore failed, it's already logged.
migration_state["running"] = False
migration_state["process"] = None
return exit_code == 0 and post_restore_success
except Exception as e:
log_message(f"Error during database restore: {str(e)}", "error")
with migration_lock:
migration_state["running"] = False
migration_state["process"] = None
return False
def stop_current_process():
"""Stop the current process with improved forceful termination"""
with migration_lock:
if migration_state["process"] and migration_state["running"]:
try:
process = migration_state["process"]
pid = process.pid
operation = migration_state["operation"]
log_message(f"Attempting to stop {operation} process (PID: {pid})...", "warning")
# Check if process is already terminated before trying to stop
if process.poll() is not None:
log_message(f"{operation.capitalize()} process (PID: {pid}) already terminated.", "info")
migration_state["process"] = None
migration_state["running"] = False
return True
# First try graceful termination (SIGTERM)
process.terminate()
# Wait up to 3 seconds for graceful termination
for _ in range(30): # 3 seconds with 0.1s checks
if process.poll() is not None: # Process has terminated
log_message(f"{operation.capitalize()} process (PID: {pid}) terminated gracefully (SIGTERM)", "warning")
break
time.sleep(0.1)
else: # Loop finished without break, process still running
# If still running, force kill with SIGKILL
if process.poll() is None:
log_message(f"Process (PID: {pid}) not responding to graceful termination, forcing kill (SIGKILL)...", "warning")
# Try to kill process group (more thorough) - Unix only
killed_pg = False
if hasattr(os, 'killpg') and hasattr(os, 'getpgid'):
try:
# On Unix systems, negative PID means kill process group
os.killpg(os.getpgid(pid), signal.SIGKILL)
killed_pg = True
log_message(f"Sent SIGKILL to process group of PID {pid}", "warning")
except ProcessLookupError:
log_message(f"Process group for PID {pid} not found (already terminated?).", "info")
# Process likely died between poll and killpg, proceed as if killed
killed_pg = True # Treat as success for logic below
except Exception as kill_err:
log_message(f"Error killing process group for PID {pid}: {kill_err}. Falling back to direct kill.", "error")
# Fallback to direct kill if process group kill fails
process.kill()
log_message(f"Sent SIGKILL directly to PID {pid}", "warning")
else: # Not on Unix or functions unavailable
process.kill()
log_message(f"Sent SIGKILL directly to PID {pid} (killpg not available)", "warning")
# Wait a bit for kill to take effect
time.sleep(0.5)
process.poll() # Update process status after kill attempt
# Final check
if process.poll() is None:
log_message(f"Warning: Process (PID: {pid}) may not have terminated successfully after SIGKILL", "error")
# Even if termination is uncertain, update state to reflect stop attempt
migration_state["process"] = None
migration_state["running"] = False
migration_state["end_time"] = time.time()
return False # Indicate potential failure
else:
log_message(f"Database {operation} operation (PID: {pid}) stopped", "warning")
migration_state["process"] = None
migration_state["running"] = False
migration_state["end_time"] = time.time()
return True
except ProcessLookupError:
# This can happen if the process terminated between the initial check and trying to kill it
log_message(f"Process (PID: {pid}) already terminated before stop action completed.", "info")
migration_state["process"] = None
migration_state["running"] = False
migration_state["end_time"] = time.time()
return True
except Exception as e:
log_message(f"Error stopping process: {str(e)}", "error")
# Force state update even on error
migration_state["process"] = None
migration_state["running"] = False
migration_state["end_time"] = time.time()
return False
else:
# No process was running or associated with the state
log_message("Stop command received, but no process found in current state.", "info")
# Ensure state reflects not running if it wasn't already
if migration_state["running"]:
migration_state["running"] = False
migration_state["process"] = None
return False # Indicate no action was needed/taken on a process
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
"""Home page with migration UI"""
# The template is kept inline for simplicity and rewritten to templates/index.html
# on every request; a real deployment would load it from a static file instead.
html_content = """
TimescaleDB Migrator
TimescaleDB Migrator
Idle
Connections
Dump
Restore
Logs
About
Database Connections
Source Database
Not Connected
No Connection
Test your connection to view database information
Target Database
Not Connected
No Connection
Test your connection to view database information
Next Steps
After connecting to your databases, proceed to the Dump tab to create a backup or the Restore tab to recover from a backup.
Database Dump
Dump Settings
The output file format used by pg_dump
Higher compression saves space but can take longer
Leave empty for all schemas
Appropriate file extension will be added automatically
Ready to execute commands. Check logs below for details.
Activity Log
About TimescaleDB Migrator
What is TimescaleDB Migrator?
TimescaleDB Migrator is a tool designed to simplify the process of migrating data between TimescaleDB instances using the PostgreSQL native backup and restore utilities: pg_dump and pg_restore.
Key Features
Easy Database Migration: Migrate your entire TimescaleDB database with just a few clicks
Secure Connections: Support for secure connections with password protection
Backup Download: Download your database backup for safekeeping
Real-time Monitoring: Track the progress of your dump and restore operations
The dump operation uses pg_dump to create a backup of your source database. This backup can be in various formats (custom, directory, plain SQL, or tar) and with different compression levels.
Restore Operation
The restore operation uses pg_restore to import your backup into the target database. It includes TimescaleDB-specific pre and post-restore functions to ensure data integrity.
Are you sure you want to proceed with this action?
"""
# Update the file content
with open("templates/index.html", "w", encoding="utf-8") as f:
f.write(html_content)
return templates.TemplateResponse("index.html", {"request": request})
@app.post("/test-connection")
async def test_connection_endpoint(data: Dict[str, str]):
"""Test a database connection and get basic info"""
try:
connection_string = data.get("connection_string")
connection_type = data.get("connection_type", "source")
if not connection_string:
return JSONResponse(
status_code=400,
content={"success": False, "message": "Connection string is required"}
)
# Test the connection using internal logic
if not test_connection_logic(connection_string):
# Error is logged within test_connection_logic
return JSONResponse(
content={"success": False, "message": "Failed to connect to database"}
)
# If connection successful, get database info
conn = psycopg2.connect(connection_string)
try:
with conn.cursor() as cur:
# Get server info
cur.execute("SELECT version()")
version_result = cur.fetchone()
version = version_result[0] if version_result else "Unknown"
# Check if TimescaleDB is installed and get version
is_timescaledb = False
ts_version = None
try:
# Use EXISTS for better performance and error handling if extension not present
cur.execute("SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'timescaledb');")
if cur.fetchone()[0]:
cur.execute("SELECT extversion FROM pg_extension WHERE extname = 'timescaledb'")
ts_version_result = cur.fetchone()
is_timescaledb = ts_version_result is not None
ts_version = ts_version_result[0] if ts_version_result else None
except psycopg2.Error as ts_err:
logger.warning(f"Could not check TimescaleDB extension: {ts_err}")
# Continue without TimescaleDB info if query fails
# Get database name
cur.execute("SELECT current_database()")
db_result = cur.fetchone()
database = db_result[0] if db_result else "Unknown"
# Extract server host/IP (best effort parsing)
server_match = "Unknown"
try:
# Extract from connection string if possible (more reliable)
host_part = connection_string.split('@')[-1].split('/')[0].split(':')[0]
if host_part:
server_match = host_part
# Fallback to parsing version string if needed
elif " on " in version:
server_match = version.split(" on ")[-1].split(",")[0]
except Exception:
logger.warning("Could not parse server host from connection string or version.")
log_message(f"Successful connection test to {connection_type} database: {database} on {server_match}", "success")
return JSONResponse(content={
"success": True,
"version": version,
"is_timescaledb": is_timescaledb,
"timescaledb_version": ts_version,
"database": database,
"server": server_match
})
finally:
conn.close()
except psycopg2.Error as db_err:
log_message(f"Database connection error during info fetch: {str(db_err)}", "error")
return JSONResponse(
content={"success": False, "message": f"Database error: {str(db_err)}"}
)
except Exception as e:
log_message(f"Connection test failed unexpectedly: {str(e)}", "error")
return JSONResponse(
status_code=500,
content={"success": False, "message": f"An unexpected error occurred: {str(e)}"}
)
@app.post("/database-info")
async def get_database_info(data: Dict[str, str]):
"""Get additional database information like table count and size"""
try:
connection_string = data.get("connection_string")
if not connection_string:
return JSONResponse(
status_code=400,
content={"success": False, "message": "Connection string is required"}
)
# Use the tested connection logic first
if not test_connection_logic(connection_string):
return JSONResponse(
content={"success": False, "message": "Connection failed"}
)
conn = psycopg2.connect(connection_string)
try:
table_count = 0
db_size = "Unknown"
with conn.cursor() as cur:
# Get table count (excluding system, temp, and TOAST schemas)
cur.execute("""
SELECT count(*) FROM information_schema.tables
WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
AND table_schema NOT LIKE 'pg_toast%'
AND table_schema NOT LIKE 'pg_temp%'
AND table_type = 'BASE TABLE'
""")
count_result = cur.fetchone()
table_count = count_result[0] if count_result else 0
# Get database size
cur.execute("SELECT pg_size_pretty(pg_database_size(current_database()))")
size_result = cur.fetchone()
db_size = size_result[0] if size_result else "Unknown"
return JSONResponse(content={
"success": True,
"table_count": table_count,
"database_size": db_size
})
finally:
conn.close()
except psycopg2.Error as db_err:
log_message(f"Failed to get database info: {str(db_err)}", "error")
return JSONResponse(
content={"success": False, "message": f"Database query error: {str(db_err)}"}
)
except Exception as e:
log_message(f"Failed to get database info: {str(e)}", "error")
return JSONResponse(
status_code=500,
content={"success": False, "message": f"An unexpected error occurred: {str(e)}"}
)
@app.post("/start-dump")
async def start_dump(data: Dict[str, Any], background_tasks: BackgroundTasks):
"""Start a database dump process"""
try:
source_conn = data.get("source_conn")
options = data.get("options", {})
if not source_conn:
return JSONResponse(
status_code=400,
content={"success": False, "message": "Source connection string is required"}
)
# Basic validation: Test connection before starting dump
if not test_connection_logic(source_conn):
return JSONResponse(
status_code=400,
content={"success": False, "message": "Source connection failed. Cannot start dump."}
)
# Stop any running process first (important!)
if migration_state["running"]:
logger.warning("Another process is running. Stopping it before starting dump.")
stopped = stop_current_process()
if not stopped:
logger.error("Failed to stop the existing process. Cannot start dump.")
return JSONResponse(
status_code=500, # Internal Server Error might be appropriate
content={"success": False, "message": "Failed to stop the currently running process."}
)
# Add a small delay to allow the process to fully terminate
time.sleep(0.5)
# Create dump file path
filename = options.get("filename", "timescale_backup").strip()
# Basic filename sanitization (replace spaces, avoid path traversal)
filename = filename.replace(" ", "_").replace("..", "").replace("/", "").replace("\\", "")
if not filename: filename = "timescale_backup" # Fallback if sanitization results in empty name
format_flag = options.get("format", "c")
# Determine file extension
extension = ".dump"
if format_flag == "p":
extension = ".sql"
elif format_flag == "d":
extension = "" # Directory format has no extension
elif format_flag == "t":
extension = ".tar"
# Generate file path carefully
dumps_dir = Path("dumps").resolve() # Ensure absolute path
file_path = dumps_dir / f"{filename}{extension}"
# Prevent potential directory traversal if filename somehow still contains harmful chars
if not str(file_path).startswith(str(dumps_dir)):
logger.error(f"Invalid filename resulted in path traversal attempt: {filename}")
return JSONResponse(
status_code=400,
content={"success": False, "message": "Invalid filename specified."}
)
# Reset state before starting background task
with migration_lock:
migration_state["id"] = str(uuid.uuid4()) # New ID for new operation
migration_state["running"] = False # Will be set true by run_dump
migration_state["operation"] = "dump"
migration_state["start_time"] = None
migration_state["end_time"] = None
migration_state["dump_file"] = str(file_path)
migration_state["dump_file_size"] = 0
migration_state["previous_size"] = 0
migration_state["dump_completed"] = False
migration_state["restore_completed"] = False
migration_state["last_activity"] = time.time()
# Keep logs or clear them? Let's keep them for now.
# migration_state["log"] = []
migration_state["process"] = None
migration_state["progress"] = { # Reset progress
"current_table": None,
"tables_completed": 0,
"total_tables": 0,
"current_size_mb": 0,
"growth_rate_mb_per_sec": 0,
"estimated_time_remaining": None,
"percent_complete": 0
}
# Start dump in background
background_tasks.add_task(run_dump, source_conn, str(file_path), options)
# Create command preview (with redacted password)
# Redact password more carefully
try:
source_safe_preview = source_conn.replace(source_conn.split('://')[1].split(':')[1].split('@')[0], '***')
except Exception:
source_safe_preview = "postgres://user:***@host/db" # Fallback preview
cmd_preview = f'pg_dump "{source_safe_preview}" -F{format_flag} -v'
if options.get("compression") and options["compression"] != "default":
cmd_preview += f' -Z {options["compression"]}'
if options.get("schema"):
cmd_preview += f' -n "{options["schema"]}"'
cmd_preview += f' -f "{os.path.basename(file_path)}"'
return JSONResponse(content={
"success": True,
"message": "Dump process initiated",
"file_path": str(file_path),
"command_preview": cmd_preview
})
except Exception as e:
log_message(f"Failed to start dump: {str(e)}", "error")
return JSONResponse(
status_code=500,
content={"success": False, "message": f"An unexpected error occurred: {str(e)}"}
)
@app.post("/start-restore")
async def start_restore(data: Dict[str, Any], background_tasks: BackgroundTasks):
"""Start a database restore process"""
try:
target_conn = data.get("target_conn")
dump_file = data.get("dump_file")
options = data.get("options", {})
if not target_conn:
return JSONResponse(
status_code=400,
content={"success": False, "message": "Target connection string is required"}
)
if not dump_file:
return JSONResponse(
status_code=400,
content={"success": False, "message": "Dump file is required"}
)
# Basic validation: Test connection before starting restore
if not test_connection_logic(target_conn):
return JSONResponse(
status_code=400,
content={"success": False, "message": "Target connection failed. Cannot start restore."}
)
# Validate dump file path exists and is within the dumps directory
dumps_dir = Path("dumps").resolve()
dump_file_path = Path(dump_file).resolve()
if not dump_file_path.exists() or not str(dump_file_path).startswith(str(dumps_dir)):
logger.error(f"Invalid or non-existent dump file specified: {dump_file}")
return JSONResponse(
status_code=400,
content={"success": False, "message": "Invalid or non-existent dump file selected."}
)
# Stop any running process first (important!)
if migration_state["running"]:
logger.warning("Another process is running. Stopping it before starting restore.")
stopped = stop_current_process()
if not stopped:
logger.error("Failed to stop the existing process. Cannot start restore.")
return JSONResponse(
status_code=500,
content={"success": False, "message": "Failed to stop the currently running process."}
)
time.sleep(0.5) # Allow time for termination
# Reset state before starting background task
with migration_lock:
migration_state["id"] = str(uuid.uuid4()) # New ID for new operation
migration_state["running"] = False # Will be set true by run_restore
migration_state["operation"] = "restore"
migration_state["start_time"] = None
migration_state["end_time"] = None
migration_state["dump_file"] = None # Not relevant for restore state itself
migration_state["dump_file_size"] = 0
migration_state["previous_size"] = 0
migration_state["dump_completed"] = False
migration_state["restore_completed"] = False
migration_state["last_activity"] = time.time()
# Keep logs
migration_state["process"] = None
migration_state["progress"] = { # Reset progress
"current_table": None,
"tables_completed": 0,
"total_tables": 0, # We don't easily know this for restore
"current_size_mb": 0,
"growth_rate_mb_per_sec": 0,
"estimated_time_remaining": None,
"percent_complete": 0
}
# Start restore in background
background_tasks.add_task(run_restore, target_conn, str(dump_file_path), options)
# Create command preview (with redacted password)
try:
target_safe_preview = target_conn.replace(target_conn.split('://')[1].split(':')[1].split('@')[0], '***')
except Exception:
target_safe_preview = "postgres://user:***@host/db"
cmd_preview = f'pg_restore -d "{target_safe_preview}" -v'
if options.get("no_owner", True):
cmd_preview += " --no-owner"
if options.get("clean", False):
cmd_preview += " --clean"
if options.get("single_transaction", True):
cmd_preview += " --single-transaction"
cmd_preview += f' "{os.path.basename(dump_file)}"'
return JSONResponse(content={
"success": True,
"message": "Restore process initiated",
"command_preview": cmd_preview
})
except Exception as e:
log_message(f"Failed to start restore: {str(e)}", "error")
return JSONResponse(
status_code=500,
content={"success": False, "message": f"An unexpected error occurred: {str(e)}"}
)
@app.post("/stop-process")
async def stop_process_endpoint():
"""Stop the current database process"""
try:
stopped = stop_current_process()
if stopped:
return JSONResponse(content={
"success": True,
"message": "Process stop initiated successfully" # Changed message slightly
})
else:
# stop_current_process returns False both when nothing was running and when
# termination could not be confirmed; it has already reset the shared state,
# so report the outcome and point callers at the logs for details.
return JSONResponse(
status_code=200,
content={
"success": False,
"message": "No running process was stopped. Check the logs for details."
})
except Exception as e:
log_message(f"Failed to stop process via endpoint: {str(e)}", "error")
return JSONResponse(
status_code=500,
content={"success": False, "message": f"An unexpected error occurred: {str(e)}"}
)
@app.get("/status")
async def get_status():
"""Get the current migration status"""
# Return a snapshot so the response is not affected by concurrent updates;
# the log list is copied too because dict.copy() is shallow.
with migration_lock:
state_copy = migration_state.copy()
state_copy["log"] = list(migration_state["log"])
# The Popen handle is not JSON-serializable
state_copy["process"] = None
# Optionally limit log size sent back if it gets large
# MAX_LOGS_IN_STATUS = 100
# if len(state_copy["log"]) > MAX_LOGS_IN_STATUS:
# state_copy["log"] = state_copy["log"][-MAX_LOGS_IN_STATUS:]
return state_copy
@app.post("/clear-logs")
async def clear_logs():
"""Clear all logs"""
with migration_lock:
migration_state["log"] = []
log_message("Logs cleared by user.", "info")
return JSONResponse(content={"success": True, "message": "Logs cleared"})
@app.get("/list-dumps")
async def list_dumps():
"""List available dump files"""
try:
dumps_dir = Path("dumps")
if not dumps_dir.exists():
dumps_dir.mkdir()
return JSONResponse(content={"success": True, "dumps": []})
dump_files = []
for f in dumps_dir.iterdir(): # Use iterdir for potentially large directories
# List files AND directories (for format=d)
if f.is_file() or f.is_dir():
try:
stat_result = f.stat()
file_size = stat_result.st_size
modified_time = datetime.datetime.fromtimestamp(stat_result.st_mtime)
# Calculate size for directories (basic sum of top-level files)
if f.is_dir():
try:
dir_size = sum(item.stat().st_size for item in f.iterdir() if item.is_file())
file_size = dir_size
except Exception as dir_err:
logger.warning(f"Could not calculate size for directory {f.name}: {dir_err}")
file_size = 0 # Or mark as unknown size
dump_files.append({
"name": f.name,
"path": str(f), # Send full path back
"size_mb": file_size / (1024 * 1024),
"date": modified_time.strftime("%Y-%m-%d %H:%M:%S"),
"is_dir": f.is_dir() # Indicate if it's a directory dump
})
except OSError as stat_err:
logger.error(f"Could not stat file/dir {f.name}: {stat_err}")
# Skip this file/dir if cannot access stats
# Sort by modified time, newest first
dump_files.sort(key=lambda x: x["date"], reverse=True)
return JSONResponse(content={"success": True, "dumps": dump_files})
except Exception as e:
log_message(f"Failed to list dumps: {str(e)}", "error")
return JSONResponse(
status_code=500,
content={"success": False, "message": f"An unexpected error occurred: {str(e)}"}
)
@app.get("/downloads/{file_name:path}") # Use path parameter to allow slashes if needed (though unlikely now)
async def download_file(file_name: str):
"""Download a dump file (does not support directory download)"""
try:
dumps_dir = Path("dumps").resolve()
# Sanitize file_name to prevent path traversal
file_name = file_name.replace("..", "").replace("/", "").replace("\\", "")
file_path = dumps_dir / file_name
# Check existence and that it's within the dumps dir and is a file
if not file_path.exists() or not str(file_path).startswith(str(dumps_dir)):
raise HTTPException(status_code=404, detail="File not found or invalid path")
if file_path.is_dir():
raise HTTPException(status_code=400, detail="Directory downloads are not supported via this endpoint.")
return FileResponse(
path=str(file_path),
filename=file_name, # Suggest original (sanitized) name to browser
media_type="application/octet-stream" # Generic binary type
)
except HTTPException:
raise # Re-raise HTTPException
except Exception as e:
logger.error(f"Error preparing file download for {file_name}: {e}")
raise HTTPException(status_code=500, detail="Could not process file download.")
if __name__ == "__main__":
import uvicorn
# Use reload=True for development, but turn off for production
# Assuming the file is named main.py for reload to work correctly
uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)
# For production: uvicorn.run(app, host="0.0.0.0", port=7860)
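# Illustrative client calls against the endpoints above (field names match the
# handlers; connection strings and filenames are placeholders):
#   curl -X POST localhost:7860/test-connection -H "Content-Type: application/json" \
#        -d '{"connection_string": "postgresql://user:pass@host:5432/db"}'
#   curl -X POST localhost:7860/start-dump -H "Content-Type: application/json" \
#        -d '{"source_conn": "postgresql://user:pass@host:5432/db", "options": {"format": "c"}}'
#   curl localhost:7860/status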