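"""Reader: load pandas DataFrames from parquet, CSV, or JSON-folder sources,
locally or from S3, with a local file cache for S3 parquet reads."""
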
import os

import pandas as pd

from src.utils import Utils

class Reader:
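    """Dispatching reader that loads tabular data into pandas DataFrames.

    The "format" key of the config selects the source type: "parquet"
    (local or s3:// paths, with local caching), "csv", "s3_csv", or
    "json_folder".
    """
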
    def __init__(self, config):
        self.config = config
        self.utils = Utils()
        self.cache_dir = config.get("cache_dir", "./cache")  # default cache directory

    def read(self, input_path=None, reader_config=None):
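        """Read a DataFrame, dispatching on reader_config["format"].

        Falls back to the instance-level config when reader_config is None,
        and to reader_config["input_path"] when input_path is not given.
        """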
        # If reader_config is None, use the class-level config
        if reader_config is None:
            reader_config = self.config
        file_format = reader_config.get("format", None)
        input_path = input_path or reader_config.get("input_path", "")
        # Decide which method to use based on file format
        if file_format == "parquet":
            return self._read_dataframe_from_parquet(input_path, reader_config)
        elif file_format == "csv":
            return self._read_dataframe_from_csv(input_path)
        elif file_format == "s3_csv":
            return self._read_dataframe_from_csv_s3(input_path, reader_config)
        elif file_format == "json_folder":
            return self._read_json_files_to_dataframe(input_path)
        else:
            raise ValueError(f"Unsupported file format: {file_format}")

    def _read_dataframe_from_parquet(self, input_path=None, reader_config=None):
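        """Read a parquet file into a DataFrame.

        s3:// paths are fetched with the configured credentials and cached
        under self.cache_dir; subsequent reads hit the local cache.
        """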
        if reader_config is None:
            reader_config = self.config
        input_path = input_path or reader_config.get("input_path", "")
        if input_path.startswith("s3://"):
            # Check if the file is already cached locally
            local_cache_path = os.path.join(self.cache_dir, os.path.basename(input_path))
            if os.path.exists(local_cache_path):
                print(f"reading from cache: {local_cache_path}")
                return pd.read_parquet(local_cache_path)
            print("reading from s3")
            credentials = reader_config.get("credentials", {})
            storage_options = {
                'key': credentials.get("access_key_id", ""),
                'secret': credentials.get("secret_access_key", ""),
                'client_kwargs': {'endpoint_url': credentials.get("endpoint_url", "")}
            }
            # Read from S3 and cache locally
            df = pd.read_parquet(input_path, storage_options=storage_options)
            os.makedirs(self.cache_dir, exist_ok=True)  # create the cache directory if it does not exist
            df.to_parquet(local_cache_path)  # save to cache
            return df
        else:
            return pd.read_parquet(input_path)

    def _read_dataframe_from_csv(self, file_path):
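        """Read a local CSV file via the shared Utils helper."""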
        return self.utils.read_dataframe_from_csv(file_path)

    def _read_json_files_to_dataframe(self, folder_path):
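        """Load the JSON files under folder_path into a DataFrame via Utils."""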
        return self.utils.load_json_files_to_dataframe(folder_path)

    def _read_dataframe_from_csv_s3(self, input_path, reader_config):
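        """Read a CSV file from S3 using credentials from reader_config.

        Returns None (after logging the error) if the read fails.
        """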
        credentials = reader_config.get("credentials", {})
        endpoint_url = credentials.get("endpoint_url", "")
        access_key_id = credentials.get("access_key_id", "")
        secret_access_key = credentials.get("secret_access_key", "")
        # Construct the storage options passed through pandas to s3fs
        storage_options = {
            'key': access_key_id,
            'secret': secret_access_key,
            'client_kwargs': {'endpoint_url': endpoint_url}
        }
        # Use pandas to read the CSV file directly from S3
        try:
            return pd.read_csv(input_path, storage_options=storage_options)
        except Exception as e:
            print(f"An error occurred while reading the CSV file from S3: {e}")
            return None
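

# Minimal usage sketch (illustrative, not part of the original module). The
# config keys mirror what Reader reads above; the bucket, path, endpoint, and
# credentials below are placeholder assumptions.
if __name__ == "__main__":
    demo_config = {
        "format": "parquet",
        "input_path": "s3://example-bucket/data/table.parquet",  # placeholder
        "cache_dir": "./cache",
        "credentials": {
            "access_key_id": "YOUR_ACCESS_KEY",        # placeholder
            "secret_access_key": "YOUR_SECRET_KEY",    # placeholder
            "endpoint_url": "https://s3.example.com",  # placeholder
        },
    }
    reader = Reader(demo_config)
    df = reader.read()
    print(df.head())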