cheesechaser.pipe.base
This module provides a data pipeline system for retrieving and processing resources from a data pool.
It includes classes for managing pipeline items, errors, sessions, and the main pipeline itself. The pipeline allows for concurrent retrieval of resources using a thread pool and provides a convenient interface for iterating over retrieved items.
Key components:
- PipeItem: Represents a successfully retrieved resource.
- PipeError: Represents an error that occurred during resource retrieval.
- PipeSession: Manages the pipeline session, including item iteration and shutdown.
- Pipe: The main pipeline class for retrieving resources from a data pool.
This module is designed to work with large datasets and provides error handling and progress tracking capabilities.
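The concurrent-retrieval idea described above (a thread pool producing results into a queue that the caller iterates over) can be sketched with the standard library alone. This is a minimal illustration of the general producer/consumer pattern, not cheesechaser's implementation; `fetch`, `worker`, `produce`, `consume` and the `_DONE` sentinel are hypothetical names introduced here.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

_DONE = object()  # hypothetical sentinel marking the end of the stream


def fetch(resource_id):
    # Hypothetical stand-in for downloading one resource from a data pool.
    return f"data-{resource_id}"


def worker(order_id, resource_id, out_queue):
    # Each worker pushes its result as soon as it is ready, so results
    # may arrive out of submission order.
    out_queue.put((order_id, resource_id, fetch(resource_id)))


def produce(resource_ids, out_queue, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for order_id, resource_id in enumerate(resource_ids):
            pool.submit(worker, order_id, resource_id, out_queue)
    # Leaving the with-block waits for every submitted task to finish.
    out_queue.put(_DONE)


def consume(out_queue):
    # Yield items until the producer signals completion.
    while True:
        item = out_queue.get()
        if item is _DONE:
            return
        yield item


q = queue.Queue()
producer = threading.Thread(target=produce, args=(range(5), q))
producer.start()
results = list(consume(q))  # items are consumed while workers are still running
producer.join()
ordered = sorted(results)  # order_id restores submission order
```

Running the producer on its own thread lets the caller start consuming before all downloads finish. This sketch silently drops exceptions raised inside `worker`; the module's PipeError type suggests failures are surfaced to the caller instead, but that behaviour is not modelled here.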
Pipe
- class cheesechaser.pipe.base.Pipe(pool: DataPool)
The main pipeline class for retrieving resources from a data pool.
- Parameters:
pool (DataPool) – The data pool to retrieve resources from.
- batch_retrieve(resource_ids, max_workers: int = 12) → PipeSession
Retrieve multiple resources in parallel using a thread pool.
- Parameters:
resource_ids – An iterable of resource IDs or (ID, metainfo) tuples to retrieve.
max_workers (int) – The maximum number of worker threads to use.
- Returns:
A PipeSession object for iterating over the retrieved items.
- Return type:
PipeSession
- retrieve(resource_id, resource_metainfo)
Retrieve a single resource from the data pool.
This method should be implemented by subclasses.
- Parameters:
resource_id – The ID of the resource to retrieve.
resource_metainfo – Additional metadata for the resource.
- Raises:
NotImplementedError – If not implemented by a subclass.
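Because retrieve() raises NotImplementedError in the base class, concrete pipes are expected to override it. The sketch below shows that subclassing pattern, plus one way to accept the "resource IDs or (ID, metainfo) tuples" input that batch_retrieve documents. It is self-contained and does not import cheesechaser: `BasePipe`, `UpperCasePipe`, `normalize_ids` and the in-memory dict standing in for a DataPool are all hypothetical, and how the real library distinguishes bare IDs from tuples is an assumption.

```python
from typing import Any, Iterable, Iterator, Optional, Tuple, Union

ResourceId = Union[int, str]


class BasePipe:
    """Hypothetical stand-in for Pipe: subclasses must override retrieve()."""

    def retrieve(self, resource_id: ResourceId, resource_metainfo: Optional[dict]) -> Any:
        raise NotImplementedError


class UpperCasePipe(BasePipe):
    """Hypothetical subclass whose 'data pool' is just an in-memory dict."""

    def __init__(self, store: dict):
        self.store = store

    def retrieve(self, resource_id, resource_metainfo):
        # Look up the raw value and transform it; metainfo is accepted but unused.
        return self.store[resource_id].upper()


def normalize_ids(resource_ids: Iterable) -> Iterator[Tuple[ResourceId, Optional[dict]]]:
    # Assumption: a tuple means (ID, metainfo); anything else is a bare ID.
    for item in resource_ids:
        if isinstance(item, tuple):
            resource_id, metainfo = item
        else:
            resource_id, metainfo = item, None
        yield resource_id, metainfo


pipe = UpperCasePipe({1: "cat", "b": "dog"})
results = [pipe.retrieve(rid, meta) for rid, meta in normalize_ids([1, ("b", {"tag": "x"})])]
```

Keeping normalization separate from retrieve() lets each subclass deal only with a single (ID, metainfo) pair, which is the signature the base class documents.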
PipeItem
- class cheesechaser.pipe.base.PipeItem(id: int | str, data: Any, order_id: int, metainfo: dict | None)
Represents a successfully retrieved resource item in the pipeline.
- Parameters:
id (Union[int, str]) – The unique identifier of the resource.
data (Any) – The actual data of the resource.
order_id (int) – The order ID of the resource in the retrieval sequence.
metainfo (Optional[dict]) – Additional metadata associated with the resource.
- __eq__(other)
Return self==value.
- __hash__ = None
- __init__(id: int | str, data: Any, order_id: int, metainfo: dict | None) → None
- __repr__()
Return repr(self).
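The generated members listed above (a field-based __init__, __eq__, __repr__, and __hash__ = None) are exactly what Python's @dataclass produces by default, since eq=True with frozen=False sets __hash__ to None. That suggests, though this page does not confirm it, that PipeItem is a dataclass. The hypothetical `Item` class below mirrors the documented fields to show the practical consequence.

```python
from dataclasses import dataclass
from typing import Any, Optional, Union


@dataclass
class Item:
    """Hypothetical mirror of PipeItem's documented fields (not the real class)."""

    id: Union[int, str]
    data: Any
    order_id: int
    metainfo: Optional[dict]


a = Item(1, b"bytes", 0, None)
b = Item(1, b"bytes", 0, None)

same = a == b                # field-by-field comparison from the generated __eq__
unhashable = Item.__hash__   # None, so hash(a) raises TypeError
text = repr(a)               # generated __repr__ lists every field
```

In practice this means items with __hash__ = None cannot be placed in a set or used as dict keys; key on `item.id` instead.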
PipeSession
- class cheesechaser.pipe.base.PipeSession(queue: Queue, is_start: Event, is_stopped: Event, is_finished: Event)
Manages a pipeline session, providing methods to iterate over retrieved items and control the session.
- Parameters:
queue (Queue) – The queue containing retrieved items.
is_start (Event) – An event indicating whether the session has started.
is_stopped (Event) – An event indicating whether the session has been stopped.
is_finished (Event) – An event indicating whether the session has finished.
- __exit__(exc_type, exc_val, exc_tb)
Exit the context manager, shutting down the session.
- Parameters:
exc_type – The type of the exception that caused the context to be exited.
exc_val – The instance of the exception that caused the context to be exited.
exc_tb – A traceback object encoding the stack trace.
- __iter__() → Iterator[PipeItem]
Iterate over the items in the pipeline.
- Returns:
An iterator of PipeItems.
- Return type:
Iterator[PipeItem]
- next(block: bool = True, timeout: float | None = None) → PipeItem
Retrieve the next item from the pipeline.
- Parameters:
block (bool) – Whether to block until an item is available.
timeout (Optional[float]) – The maximum time to wait for an item.
- Returns:
The next PipeItem in the queue.
- Return type:
PipeItem
- Raises:
Empty – If no item is available within the specified timeout.
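The block/timeout semantics of next() and the Empty exception match Python's queue.Queue.get, so a session of this shape can be sketched as a thin wrapper over a queue. The `MiniSession` class and `_END` marker below are hypothetical: the sketch assumes next() delegates to Queue.get and uses a single stop event, whereas the real PipeSession also tracks is_start and is_finished.

```python
import queue
import threading
from typing import Iterator, Optional

_END = object()  # hypothetical end-of-stream marker


class MiniSession:
    """Hypothetical sketch of a PipeSession-like consumer (not cheesechaser's code)."""

    def __init__(self, q: queue.Queue, is_stopped: threading.Event):
        self.queue = q
        self.is_stopped = is_stopped

    def next(self, block: bool = True, timeout: Optional[float] = None):
        # Delegates to Queue.get, so it raises queue.Empty on timeout or
        # when block=False and nothing is queued.
        return self.queue.get(block=block, timeout=timeout)

    def __iter__(self) -> Iterator:
        # Keep pulling items until the end marker arrives.
        while True:
            item = self.next()
            if item is _END:
                break
            yield item

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Signal producers to stop; returning None lets exceptions propagate.
        self.is_stopped.set()


q = queue.Queue()
for x in ["a", "b"]:
    q.put(x)
q.put(_END)

stopped = threading.Event()
with MiniSession(q, stopped) as session:
    items = list(session)

empty_session = MiniSession(queue.Queue(), threading.Event())
try:
    empty_session.next(timeout=0.01)
    timed_out = False
except queue.Empty:
    timed_out = True
```

Using the session as a context manager guarantees the stop signal is sent even if the loop body raises, which is the role the documented __exit__ plays.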