import os

import pandas as pd

from src.utils import Utils
class Reader:
    """Reads tabular data from local paths or S3 into pandas DataFrames."""

    def __init__(self, config):
        self.config = config
        self.utils = Utils()
        self.cache_dir = config.get("cache_dir", "./cache")  # default cache directory
    def read(self, input_path=None, reader_config=None):
        # If reader_config is None, use the class-level config
        if reader_config is None:
            reader_config = self.config
        file_format = reader_config.get("format")
        input_path = input_path or reader_config.get("input_path", "")
        # Decide which method to use based on file format
        if file_format == "parquet":
            return self._read_dataframe_from_parquet(input_path, reader_config)
        elif file_format == "csv":
            return self._read_dataframe_from_csv(input_path)
        elif file_format == "s3_csv":
            return self._read_dataframe_from_csv_s3(input_path, reader_config)
        elif file_format == "json_folder":
            return self._read_json_files_to_dataframe(input_path)
        else:
            raise ValueError(f"Unsupported file format: {file_format}")
    def _read_dataframe_from_parquet(self, input_path=None, reader_config=None):
        if reader_config is None:
            reader_config = self.config
        input_path = input_path or reader_config.get("input_path", "")
        if input_path.startswith("s3://"):
            # Serve from the local cache if this file was downloaded before
            local_cache_path = os.path.join(self.cache_dir, os.path.basename(input_path))
            if os.path.exists(local_cache_path):
                print(f"Reading from cache: {local_cache_path}")
                return pd.read_parquet(local_cache_path)
            print("Reading from S3")
            credentials = reader_config.get("credentials", {})
            storage_options = {
                "key": credentials.get("access_key_id", ""),
                "secret": credentials.get("secret_access_key", ""),
                "client_kwargs": {"endpoint_url": credentials.get("endpoint_url", "")},
            }
            # Read from S3 (pandas delegates s3:// paths to s3fs), then cache locally
            df = pd.read_parquet(input_path, storage_options=storage_options)
            os.makedirs(self.cache_dir, exist_ok=True)  # create the cache directory if missing
            df.to_parquet(local_cache_path)  # save to cache for subsequent reads
            return df
        else:
            return pd.read_parquet(input_path)
    def _read_dataframe_from_csv(self, file_path):
        return self.utils.read_dataframe_from_csv(file_path)
    def _read_json_files_to_dataframe(self, folder_path):
        # Return the result; previously it was computed and silently discarded
        return self.utils.load_json_files_to_dataframe(folder_path)
    def _read_dataframe_from_csv_s3(self, input_path, reader_config):
        credentials = reader_config.get("credentials", {})
        endpoint_url = credentials.get("endpoint_url", "")
        access_key_id = credentials.get("access_key_id", "")
        secret_access_key = credentials.get("secret_access_key", "")
        # Construct the storage options passed through to s3fs
        storage_options = {
            "key": access_key_id,
            "secret": secret_access_key,
            "client_kwargs": {"endpoint_url": endpoint_url},
        }
        # Use pandas to read the CSV file directly from S3
        try:
            return pd.read_csv(input_path, storage_options=storage_options)
        except Exception as e:
            print(f"An error occurred while reading the CSV file from S3: {e}")
            return None
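

# --- Usage sketch (illustrative, not part of the module) ---
# A minimal example of driving Reader.read() through its config dict. The key
# names ("format", "input_path", "cache_dir", "credentials") come from the
# code above; the bucket, endpoint, and credential values are hypothetical
# placeholders. Reading s3:// paths additionally assumes s3fs is installed.
if __name__ == "__main__":
    config = {
        "format": "s3_csv",  # one of: parquet, csv, s3_csv, json_folder
        "input_path": "s3://example-bucket/data/events.csv",  # hypothetical path
        "cache_dir": "./cache",  # only used by the parquet reader
        "credentials": {
            "access_key_id": "YOUR_ACCESS_KEY_ID",  # placeholder
            "secret_access_key": "YOUR_SECRET_ACCESS_KEY",  # placeholder
            "endpoint_url": "https://s3.example.com",  # placeholder endpoint
        },
    }
    reader = Reader(config)
    df = reader.read()  # dispatches to _read_dataframe_from_csv_s3
    if df is not None:
        print(df.head())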