import os

import pandas as pd

from src.utils import Utils
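# Note: Utils (from src/utils.py, not shown here) is assumed to provide the
# read_dataframe_from_csv and load_json_files_to_dataframe helpers that the
# CSV and JSON readers below delegate to.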


class Reader:
    """Reads dataframes from local files or S3-compatible storage, driven by a config dict."""

    def __init__(self, config):
        self.config = config
        self.utils = Utils()
        self.cache_dir = config.get("cache_dir", "./cache")  # default local cache directory
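    # The cache directory itself is created lazily, on the first S3 parquet read
    # (see _read_dataframe_from_parquet below).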

    def read(self, input_path=None, reader_config=None):
        # Fall back to the class-level config when no per-call config is given
        if reader_config is None:
            reader_config = self.config
        file_format = reader_config.get("format", None)
        input_path = input_path or reader_config.get("input_path", "")
        # Dispatch to the appropriate reader based on the configured format
        if file_format == "parquet":
            return self._read_dataframe_from_parquet(input_path, reader_config)
        elif file_format == "csv":
            return self._read_dataframe_from_csv(input_path)
        elif file_format == "s3_csv":
            return self._read_dataframe_from_csv_s3(input_path, reader_config)
        elif file_format == "json_folder":
            return self._read_json_files_to_dataframe(input_path)
        else:
            raise ValueError(f"Unsupported file format: {file_format}")

    def _read_dataframe_from_parquet(self, input_path=None, reader_config=None):
        if reader_config is None:
            reader_config = self.config
        input_path = input_path or reader_config.get("input_path", "")
        if input_path.startswith("s3://"):
            # Serve from the local cache when the file has already been downloaded
            local_cache_path = os.path.join(self.cache_dir, os.path.basename(input_path))
            if os.path.exists(local_cache_path):
                print("reading from cache")
                print(local_cache_path)
                return pd.read_parquet(local_cache_path)
            print("reading from s3")
            credentials = reader_config.get("credentials", {})
            storage_options = {
                'key': credentials.get("access_key_id", ""),
                'secret': credentials.get("secret_access_key", ""),
                'client_kwargs': {'endpoint_url': credentials.get("endpoint_url", "")}
            }
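            # pandas forwards storage_options to fsspec/s3fs: 'key' and 'secret' are
            # the S3 credentials, and 'client_kwargs' is passed through to the
            # underlying S3 client (used here to target a custom endpoint).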
            # Read from S3 and cache locally for subsequent reads
            df = pd.read_parquet(input_path, storage_options=storage_options)
            os.makedirs(self.cache_dir, exist_ok=True)  # create the cache dir if missing
            df.to_parquet(local_cache_path)  # save to cache
            return df
        else:
            return pd.read_parquet(input_path)
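
    # Caveat: the parquet cache above is keyed by basename only, so two S3 objects
    # with the same filename under different prefixes would collide in the cache.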

    def _read_dataframe_from_csv(self, file_path):
        return self.utils.read_dataframe_from_csv(file_path)

    def _read_json_files_to_dataframe(self, folder_path):
        return self.utils.load_json_files_to_dataframe(folder_path)

    def _read_dataframe_from_csv_s3(self, input_path, reader_config):
        credentials = reader_config.get("credentials", {})
        endpoint_url = credentials.get("endpoint_url", "")
        access_key_id = credentials.get("access_key_id", "")
        secret_access_key = credentials.get("secret_access_key", "")
        # Construct the storage options for s3fs
        storage_options = {
            'key': access_key_id,
            'secret': secret_access_key,
            'client_kwargs': {'endpoint_url': endpoint_url}
        }
        # Read the CSV file directly from S3 with pandas
        try:
            df = pd.read_csv(input_path, storage_options=storage_options)
            return df
        except Exception as e:
            print(f"An error occurred while reading the CSV file from S3: {e}")
            return None
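

# --- Usage sketch (illustrative only) ---
# A minimal example of how this Reader might be wired up, assuming a config dict
# with the keys consumed above ("format", "input_path", "cache_dir", and an
# optional "credentials" mapping for S3-compatible storage). The bucket, object
# path, and endpoint below are hypothetical placeholders, not values from this Space.
if __name__ == "__main__":
    config = {
        "format": "parquet",
        "input_path": "s3://example-bucket/data/events.parquet",  # placeholder
        "cache_dir": "./cache",
        "credentials": {
            "access_key_id": "YOUR_ACCESS_KEY",         # placeholder
            "secret_access_key": "YOUR_SECRET_KEY",     # placeholder
            "endpoint_url": "https://s3.example.com",   # placeholder endpoint
        },
    }
    reader = Reader(config)
    df = reader.read()
    if df is not None:
        print(df.head())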