# storage.py
from pathlib import Path
from typing import Optional, List

import boto3


class StorageManager:
    """Manages a local data directory, optionally backed by an S3 bucket."""

    def __init__(
        self,
        local_dir: Path,
        bucket_name: Optional[str] = None,
        prefix: Optional[str] = None,
        aws_access_key_id: Optional[str] = None,
        aws_secret_access_key: Optional[str] = None,
        region_name: str = "eu-north-1",
    ):
        self.local_dir = local_dir
        self.bucket_name = bucket_name
        self.prefix = prefix
        self.use_s3 = bool(bucket_name and prefix)

        # Initialize the S3 client only when a bucket and prefix are configured.
        # If the access keys are None, boto3 falls back to its default credential chain.
        if self.use_s3:
            self.s3_client = boto3.client(
                "s3",
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key,
                region_name=region_name,
            )

    def check_local_data(
        self, required_files: List[str], required_dirs: Optional[List[str]] = None
    ) -> bool:
        """
        Check if all required files and directories exist locally.

        Args:
            required_files: List of required file names
            required_dirs: List of required directory names

        Returns:
            bool: True if all required data exists, False otherwise
        """
        self.local_dir.mkdir(parents=True, exist_ok=True)

        # Check files
        for file_name in required_files:
            if not (self.local_dir / file_name).exists():
                print(f"Missing required file: {file_name}")
                return False

        # Check directories
        if required_dirs:
            for dir_name in required_dirs:
                if not (self.local_dir / dir_name).is_dir():
                    print(f"Missing required directory: {dir_name}")
                    return False

        return True

    def download_s3_file(self, s3_key: str, local_path: Path) -> bool:
        """Download a single file from S3."""
        try:
            self.s3_client.download_file(self.bucket_name, s3_key, str(local_path))
            print(f"Downloaded: {s3_key} -> {local_path}")
            return True
        except Exception as e:
            print(f"Error downloading {s3_key}: {e}")
            return False

    def download_s3_folder(self, specific_prefix: Optional[str] = None) -> bool:
        """
        Download an entire folder from S3 to the local directory.

        Args:
            specific_prefix: Optional prefix appended to self.prefix to download only a subfolder

        Returns:
            bool: True if every listed object was downloaded successfully
        """
        try:
            if not self.use_s3:
                raise ValueError("S3 credentials not configured")

            prefix = f"{self.prefix}{specific_prefix}" if specific_prefix else self.prefix

            # Paginate so prefixes with more than 1000 objects are fully listed.
            paginator = self.s3_client.get_paginator("list_objects_v2")
            pages = paginator.paginate(Bucket=self.bucket_name, Prefix=prefix)

            found_any = False
            success = True
            for page in pages:
                for obj in page.get("Contents", []):
                    found_any = True
                    s3_key = obj["Key"]
                    # Skip "directory" placeholder keys.
                    if s3_key.endswith("/"):
                        continue

                    # Mirror the key's path below self.prefix into the local directory.
                    relative_path = Path(s3_key).relative_to(self.prefix)
                    local_file_path = self.local_dir / relative_path
                    local_file_path.parent.mkdir(parents=True, exist_ok=True)

                    if not self.download_s3_file(s3_key, local_file_path):
                        success = False

            if not found_any:
                print(f"No files found in S3 bucket {self.bucket_name} with prefix {prefix}")
                return False

            return success

        except Exception as e:
            print(f"Error downloading S3 folder: {e}")
            return False

    def sync_data(
        self, required_files: List[str], required_dirs: Optional[List[str]] = None
    ) -> bool:
        """
        Check local data and sync from S3 if needed.

        Args:
            required_files: List of required file names
            required_dirs: List of required directory names

        Returns:
            bool: True if all required data is available after the sync
        """
        if self.check_local_data(required_files, required_dirs):
            print("All required files and directories found locally")
            return True

        if self.use_s3:
            print("Downloading required data from S3...")
            if not self.download_s3_folder():
                return False

            # Sync any specific subdirectories that were requested.
            if required_dirs:
                for dir_name in required_dirs:
                    if not self.download_s3_folder(dir_name):
                        return False

            return self.check_local_data(required_files, required_dirs)

        print("Missing required data and S3 is not configured")
        return False
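

# Minimal usage sketch (not part of the original module): the bucket name, prefix,
# and file/directory names below are hypothetical placeholders illustrating how
# StorageManager is intended to be driven. Credentials are left to boto3's default
# chain (environment variables / shared config) by omitting the access keys.
if __name__ == "__main__":
    manager = StorageManager(
        local_dir=Path("data"),        # hypothetical local cache directory
        bucket_name="example-bucket",  # hypothetical bucket name
        prefix="datasets/v1/",         # hypothetical key prefix
    )

    # Ensure the listed files and directories exist locally, pulling them from S3
    # under the configured prefix if any are missing.
    ok = manager.sync_data(
        required_files=["metadata.json"],  # hypothetical file name
        required_dirs=["models"],          # hypothetical directory name
    )
    print("Data ready" if ok else "Data sync failed")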