import os

import pandas as pd

from src.utils import Utils


class Reader:
    """Reads tabular data from local or S3 sources based on a config dict."""

    def __init__(self, config):
        self.config = config
        self.utils = Utils()
        self.cache_dir = config.get("cache_dir", "./cache")  # default cache directory

    def read(self, input_path=None, reader_config=None):
        # Fall back to the class-level config when no reader_config is given
        if reader_config is None:
            reader_config = self.config
        file_format = reader_config.get("format", None)
        input_path = input_path or reader_config.get("input_path", "")

        # Dispatch to the appropriate reader based on file format
        if file_format == "parquet":
            return self._read_dataframe_from_parquet(input_path, reader_config)
        elif file_format == "csv":
            return self._read_dataframe_from_csv(input_path)
        elif file_format == "s3_csv":
            return self._read_dataframe_from_csv_s3(input_path, reader_config)
        elif file_format == "json_folder":
            return self._read_json_files_to_dataframe(input_path)
        else:
            raise ValueError(f"Unsupported file format: {file_format}")

    def _read_dataframe_from_parquet(self, input_path=None, reader_config=None):
        if reader_config is None:
            reader_config = self.config
        input_path = input_path or reader_config.get("input_path", "")

        if input_path.startswith("s3://"):
            # Serve from the local cache if this file was fetched before
            local_cache_path = os.path.join(self.cache_dir, os.path.basename(input_path))
            if os.path.exists(local_cache_path):
                print(f"reading from cache: {local_cache_path}")
                return pd.read_parquet(local_cache_path)

            print("reading from s3")
            credentials = reader_config.get("credentials", {})
            storage_options = {
                'key': credentials.get("access_key_id", ""),
                'secret': credentials.get("secret_access_key", ""),
                'client_kwargs': {'endpoint_url': credentials.get("endpoint_url", "")}
            }
            # Read from S3, then cache locally for subsequent reads
            df = pd.read_parquet(input_path, storage_options=storage_options)
            os.makedirs(self.cache_dir, exist_ok=True)  # create the cache dir if it does not exist
            df.to_parquet(local_cache_path)  # save to cache
            return df
        else:
            return pd.read_parquet(input_path)

    def _read_dataframe_from_csv(self, file_path):
        return self.utils.read_dataframe_from_csv(file_path)

    def _read_json_files_to_dataframe(self, folder_path):
        return self.utils.load_json_files_to_dataframe(folder_path)

    def _read_dataframe_from_csv_s3(self, input_path, reader_config):
        credentials = reader_config.get("credentials", {})
        # Construct the storage options passed through to s3fs
        storage_options = {
            'key': credentials.get("access_key_id", ""),
            'secret': credentials.get("secret_access_key", ""),
            'client_kwargs': {'endpoint_url': credentials.get("endpoint_url", "")}
        }
        # Use pandas to read the CSV file directly from S3
        try:
            df = pd.read_csv(input_path, storage_options=storage_options)
            return df
        except Exception as e:
            print(f"An error occurred while reading the CSV file from S3: {e}")
            return None
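

# Usage sketch (illustrative only). The config keys below mirror the ones the
# class reads above ("format", "input_path", "cache_dir", "credentials"); the
# bucket name, endpoint URL, and credential values are placeholders, not real
# defaults shipped with this module.
if __name__ == "__main__":
    config = {
        "format": "parquet",
        "input_path": "s3://example-bucket/data/events.parquet",  # placeholder path
        "cache_dir": "./cache",
        "credentials": {
            "access_key_id": "YOUR_ACCESS_KEY_ID",          # placeholder credential
            "secret_access_key": "YOUR_SECRET_ACCESS_KEY",  # placeholder credential
            "endpoint_url": "https://s3.example.com",       # placeholder endpoint
        },
    }
    reader = Reader(config)
    # First call fetches from S3 and writes to the local cache; repeat calls
    # with the same file name are served from the cache directory instead.
    df = reader.read()
    print(df.head())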