Source code for cheesechaser.datapool.table

"""
This module provides classes for managing and accessing data pools stored on Hugging Face.

It includes implementations for table-based data pools, allowing efficient retrieval and 
management of data resources stored in archives on Hugging Face repositories. The module 
supports various file formats and provides mechanisms for mapping between resource IDs 
and their corresponding archive locations.
"""

import os.path
from threading import Lock
from typing import List, Optional, Tuple, Dict

import pandas as pd
from hfutils.operate import get_hf_client
from tqdm import tqdm

from .base import HfBasedDataPool, _n_path, FileUnrecognizableError, DataLocation


class TableBasedHfDataPool(HfBasedDataPool):
    """
    Hugging Face data pool whose contents are described by a table.

    Each row of the table describes one resource: its identifier, the archive
    file that stores it, and the name of the file inside that archive. The table
    itself is supplied by subclasses through :meth:`_load_df`; it is scanned once,
    on first use, and the resulting lookup dictionaries are cached.

    :param data_repo_id: ID of the Hugging Face repository holding the data. The
        same repository is also passed to the base class as the index repository.
    :type data_repo_id: str
    :param archive_column: Name of the table column holding archive filenames.
    :type archive_column: str
    :param file_in_archive_column: Name of the table column holding the filename
        of each resource inside its archive.
    :type file_in_archive_column: str
    :param id_column: Name of the table column holding the resource identifier
        (default is ``'id'``).
    :type id_column: str
    :param data_revision: Revision of the repository to use (default is ``'main'``).
        It is used for both the data and the index revision.
    :type data_revision: str
    :param mock_use_id: When true (the default), destination filenames are built
        from the resource ID plus the lower-cased extension of the original file;
        otherwise the base class naming is used.
    :type mock_use_id: bool
    :param hf_token: Optional Hugging Face API token, forwarded to the base class.
    :type hf_token: Optional[str]
    """

    def __init__(self, data_repo_id: str, archive_column: str, file_in_archive_column: str,
                 id_column: str = 'id', data_revision: str = 'main', mock_use_id: bool = True,
                 hf_token: Optional[str] = None):
        # The data repository doubles as the index repository, at the same revision.
        HfBasedDataPool.__init__(
            self,
            data_repo_id=data_repo_id,
            data_revision=data_revision,
            idx_repo_id=data_repo_id,
            idx_revision=data_revision,
            hf_token=hf_token,
        )

        # Column names used when scanning the table.
        self._id_column = id_column
        self._archive_column = archive_column
        self._file_in_archive_column = file_in_archive_column

        self._mock_use_id = mock_use_id

        # Lookup tables are built on first access; the lock guards that build.
        self._lock = Lock()
        self._st = None

    def _get_dst_filename(self, location: DataLocation) -> str:
        """
        Work out the filename a resource should be written to.

        :param location: Location of the resource being extracted.
        :type location: DataLocation

        :return: ``<resource_id><lower-cased extension>`` when ``mock_use_id`` is
            enabled, otherwise whatever the base class returns.
        :rtype: str
        """
        if not self._mock_use_id:
            return super()._get_dst_filename(location)

        suffix = os.path.splitext(location.filename)[1]
        return f'{location.resource_id}{suffix.lower()}'

    def _load_df(self) -> pd.DataFrame:
        """
        Produce the table describing the resources of this pool.

        This is a hook for subclasses; the implementation here always raises.

        :return: Table with at least the ID, archive and file-in-archive columns.
        :rtype: pd.DataFrame

        :raises NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError  # pragma: no cover

    def _get_st(self) -> Tuple[Dict[int, str], Dict[Tuple[str, str], int]]:
        """
        Return the cached lookup tables, building them on the first call.

        The table from :meth:`_load_df` is scanned row by row under the instance
        lock. When the same key occurs in several rows, the last row wins.

        :return: A pair of dictionaries:

            1. resource ID -> archive filename (as stored in the table)
            2. (normalized archive filename, normalized file in archive) -> resource ID
        :rtype: Tuple[Dict[int, str], Dict[Tuple[str, str], int]]
        """
        with self._lock:
            if self._st is not None:
                return self._st

            id_col = self._id_column
            archive_col = self._archive_column
            inner_col = self._file_in_archive_column

            archive_by_id: Dict[int, str] = {}
            id_by_location: Dict[Tuple[str, str], int] = {}

            records = self._load_df().to_dict('records')
            for record in tqdm(records, desc='Table Scanning'):
                archive = record[archive_col]
                resource_id = record[id_col]
                archive_by_id[resource_id] = archive

                location_key = (_n_path(archive), _n_path(record[inner_col]))
                id_by_location[location_key] = resource_id

            self._st = (archive_by_id, id_by_location)
            return self._st

    def _file_to_resource_id(self, tar_file: str, filename: str) -> int:
        """
        Look up the resource ID of a file stored in an archive.

        Both paths are normalized with ``_n_path`` before the lookup.

        :param tar_file: Name of the archive file.
        :type tar_file: str
        :param filename: Name of the file inside the archive.
        :type filename: str

        :return: The resource ID registered for that archive/file pair.
        :rtype: int

        :raises FileUnrecognizableError: If the pair does not appear in the table.
        """
        _, id_by_location = self._get_st()
        location_key = (_n_path(tar_file), _n_path(filename))
        if location_key not in id_by_location:
            raise FileUnrecognizableError(f'Resource in tar {tar_file!r}\'s file {filename!r} unrecognizable.')

        return id_by_location[location_key]

    def _request_possible_archives(self, resource_id) -> List[str]:
        """
        List the archives that may contain a given resource.

        :param resource_id: ID of the resource to look up.
        :type resource_id: int

        :return: A one-element list with the archive filename recorded for the
            resource, or an empty list when the ID is not in the table.
        :rtype: List[str]
        """
        archive_by_id, _ = self._get_st()
        return [archive_by_id[resource_id]] if resource_id in archive_by_id else []


class SimpleTableHfDataPool(TableBasedHfDataPool):
    """
    Table-based data pool whose table lives in a single file of the repository.

    The table file is downloaded from the data repository (as a ``dataset``
    repository) and read with pandas. CSV and Parquet files are supported; the
    format is chosen from the file extension, case-insensitively.

    :param data_repo_id: ID of the Hugging Face repository holding the data.
    :type data_repo_id: str
    :param archive_column: Name of the table column holding archive filenames.
    :type archive_column: str
    :param file_in_archive_column: Name of the table column holding the filename
        of each resource inside its archive.
    :type file_in_archive_column: str
    :param table_file: Path of the table file inside the repository.
    :type table_file: str
    :param id_column: Name of the table column holding the resource identifier
        (default is ``'id'``).
    :type id_column: str
    :param data_revision: Revision of the repository to use (default is ``'main'``).
    :type data_revision: str
    :param mock_use_id: Whether extracted files are named after the resource ID
        (default is True).
    :type mock_use_id: bool
    :param hf_token: Optional Hugging Face API token for authentication.
    :type hf_token: Optional[str]
    """

    def __init__(self, data_repo_id: str, archive_column: str, file_in_archive_column: str,
                 table_file: str, id_column: str = 'id', data_revision: str = 'main',
                 mock_use_id: bool = True, hf_token: Optional[str] = None):
        # Everything except the table file name is handled by the parent class.
        TableBasedHfDataPool.__init__(
            self,
            data_repo_id=data_repo_id,
            archive_column=archive_column,
            file_in_archive_column=file_in_archive_column,
            id_column=id_column,
            data_revision=data_revision,
            mock_use_id=mock_use_id,
            hf_token=hf_token,
        )
        self._table_file = table_file

    def _load_df(self) -> pd.DataFrame:
        """
        Download the table file from the repository and read it into a dataframe.

        :return: The table describing the resources of this pool.
        :rtype: pd.DataFrame

        :raises RuntimeError: If the extension of the table file is neither
            ``.csv`` nor ``.parquet``.
        """
        hf_client = get_hf_client(hf_token=self._hf_token)

        # Pick the pandas reader from the (lower-cased) file extension.
        readers = {
            '.csv': pd.read_csv,
            '.parquet': pd.read_parquet,
        }
        suffix = os.path.splitext(self._table_file.lower())[1]
        read_table = readers.get(suffix)
        if read_table is None:
            raise RuntimeError(f'Unable to determine the reading operation of file {self._table_file!r}.')

        local_table_path = hf_client.hf_hub_download(
            repo_id=self.data_repo_id,
            repo_type='dataset',
            revision=self.data_revision,
            filename=self._table_file,
        )
        return read_table(local_table_path)