cheesechaser.pipe.base

This module provides a data pipeline system for retrieving and processing resources from a data pool.

It includes classes for pipeline items, errors, sessions, and the main pipeline itself. The pipeline retrieves resources concurrently using a thread pool and exposes the retrieved items through an iterable session.

Key components:

  • PipeItem: Represents a successfully retrieved resource.

  • PipeError: Represents an error that occurred during resource retrieval.

  • PipeSession: Manages the pipeline session, including item iteration and shutdown.

  • Pipe: The main pipeline class for retrieving resources from a data pool.

This module is designed to work with large datasets and provides error handling and progress tracking capabilities.

Pipe

class cheesechaser.pipe.base.Pipe(pool: DataPool)

The main pipeline class for retrieving resources from a data pool.

Parameters:

pool (DataPool) – The data pool to retrieve resources from.

__init__(pool: DataPool)
__weakref__

list of weak references to the object (if defined)

batch_retrieve(resource_ids, max_workers: int = 12) → PipeSession

Retrieve multiple resources in parallel using a thread pool.

Parameters:
  • resource_ids – An iterable of resource IDs or (ID, metainfo) tuples to retrieve.

  • max_workers (int) – The maximum number of worker threads to use.

Returns:

A PipeSession object for iterating over the retrieved items.

Return type:

PipeSession
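The parallel retrieval described above can be pictured with the standard library alone. The following is a minimal sketch, not cheesechaser's implementation: normalize, fetch, and batch_fetch are hypothetical names, and a plain queue.Queue stands in for the session that batch_retrieve returns.

```python
from concurrent.futures import ThreadPoolExecutor
from queue import Queue


def normalize(entry):
    """Split an entry into (resource_id, metainfo); bare IDs get metainfo None."""
    if isinstance(entry, tuple):
        resource_id, metainfo = entry
        return resource_id, metainfo
    return entry, None


def fetch(resource_id, metainfo):
    """Hypothetical stand-in for a real retrieval call."""
    return f"data-for-{resource_id}"


def batch_fetch(resource_ids, max_workers=12):
    """Fetch every entry on a thread pool and collect the results in a queue."""
    results = Queue()

    def worker(order_id, entry):
        resource_id, metainfo = normalize(entry)
        data = fetch(resource_id, metainfo)
        results.put((order_id, resource_id, data, metainfo))

    # Leaving the `with` block waits for all submitted tasks to finish.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for order_id, entry in enumerate(resource_ids):
            executor.submit(worker, order_id, entry)
    return results


results = batch_fetch([1, (2, {"tag": "x"}), "abc"], max_workers=2)
# Workers may finish in any order, so sort by the submission index.
items = sorted((results.get() for _ in range(3)), key=lambda item: item[0])
```

Because worker threads can complete in any order, the sketch carries a submission index alongside each result, which is one plausible reading of the order_id field documented on PipeItem.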

retrieve(resource_id, resource_metainfo)

Retrieve a single resource from the data pool.

Subclasses must override this method.

Parameters:
  • resource_id – The ID of the resource to retrieve.

  • resource_metainfo – Additional metadata for the resource.

Raises:

NotImplementedError – If not implemented by a subclass.
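As an illustration of the subclassing contract, here is a minimal sketch that runs without cheesechaser installed. The Pipe class below is a stand-in reduced to the interface documented on this page, and DictPipe with its dict-based pool is hypothetical; a real subclass would derive from cheesechaser.pipe.base.Pipe and query an actual DataPool.

```python
class Pipe:
    """Stand-in for cheesechaser.pipe.base.Pipe, reduced to the documented interface."""

    def __init__(self, pool):
        self.pool = pool

    def retrieve(self, resource_id, resource_metainfo):
        raise NotImplementedError


class DictPipe(Pipe):
    """Hypothetical subclass that treats the pool as a plain dict."""

    def retrieve(self, resource_id, resource_metainfo):
        # In this toy example a missing resource surfaces as a KeyError.
        return self.pool[resource_id]


pipe = DictPipe({1: "first", 2: "second"})
value = pipe.retrieve(1, None)
```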

PipeItem

class cheesechaser.pipe.base.PipeItem(id: int | str, data: Any, order_id: int, metainfo: dict | None)

Represents a successfully retrieved resource item in the pipeline.

Parameters:
  • id (Union[int, str]) – The unique identifier of the resource.

  • data (Any) – The actual data of the resource.

  • order_id (int) – The order ID of the resource in the retrieval sequence.

  • metainfo (Optional[dict]) – Additional metadata associated with the resource.

__eq__(other)

Return self==value.

__hash__ = None
__init__(id: int | str, data: Any, order_id: int, metainfo: dict | None) → None
__repr__()

Return repr(self).

__weakref__

list of weak references to the object (if defined)
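The members listed above (a generated __eq__ and __repr__, plus __hash__ = None) match what Python's @dataclass decorator produces by default. The source of PipeItem is not shown on this page, so the stand-in below is an assumption that mirrors only the documented fields.

```python
from dataclasses import dataclass
from typing import Any, Optional, Union


@dataclass
class PipeItem:
    """Stand-in mirroring the documented fields; not the library's source."""
    id: Union[int, str]
    data: Any
    order_id: int
    metainfo: Optional[dict]


a = PipeItem(id=7, data=b"payload", order_id=0, metainfo=None)
b = PipeItem(id=7, data=b"payload", order_id=0, metainfo=None)
same = a == b                             # compared field by field
hashable = PipeItem.__hash__ is not None  # eq=True without frozen=True disables hashing
```

Under this assumption, items compare equal when all four fields match, and they cannot be used as dict keys or set members.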

PipeSession

class cheesechaser.pipe.base.PipeSession(queue: Queue, is_start: Event, is_stopped: Event, is_finished: Event)

Manages a pipeline session, providing methods to iterate over retrieved items and control the session.

Parameters:
  • queue (Queue) – The queue containing retrieved items.

  • is_start (Event) – An event indicating whether the session has started.

  • is_stopped (Event) – An event indicating whether the session has been stopped.

  • is_finished (Event) – An event indicating whether the session has finished.

__enter__()

Enter the context manager.

Returns:

The PipeSession instance.

Return type:

PipeSession

__exit__(exc_type, exc_val, exc_tb)

Exit the context manager, shutting down the session.

Parameters:
  • exc_type – The type of the exception that caused the context to be exited.

  • exc_val – The instance of the exception that caused the context to be exited.

  • exc_tb – A traceback object encoding the stack trace.

__init__(queue: Queue, is_start: Event, is_stopped: Event, is_finished: Event)
__iter__() → Iterator[PipeItem]

Iterate over the items in the pipeline.

Returns:

An iterator of PipeItems.

Return type:

Iterator[PipeItem]

__weakref__

list of weak references to the object (if defined)

next(block: bool = True, timeout: float | None = None) → PipeItem

Retrieve the next item from the pipeline.

Parameters:
  • block (bool) – Whether to block until an item is available.

  • timeout (Optional[float]) – The maximum time to wait for an item.

Returns:

The next PipeItem in the queue.

Return type:

PipeItem

Raises:

Empty – If no item is available within the specified timeout.
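The block and timeout parameters resemble those of queue.Queue.get. Assuming the Empty exception above is the standard library's queue.Empty (this page does not say so explicitly), a consumer could poll with a timeout as sketched below. A plain Queue stands in for the session so that the example runs on its own.

```python
from queue import Empty, Queue

session_queue = Queue()  # stand-in for a PipeSession; not the real class
session_queue.put("item-0")

received = []
timed_out = False
while True:
    try:
        # Mirrors session.next(block=True, timeout=0.1) under the assumption above.
        received.append(session_queue.get(block=True, timeout=0.1))
    except Empty:
        # Nothing arrived within the timeout; stop polling.
        timed_out = True
        break
```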

shutdown(wait=True, timeout: float | None = None)

Shut down the pipeline session.

Parameters:
  • wait (bool) – Whether to wait for the session to finish.

  • timeout (Optional[float]) – The maximum time to wait for the session to finish.
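The relationship between __exit__ and shutdown can be sketched with two threading.Event objects. SessionSketch and its toy worker are hypothetical and show only one way the documented wait and timeout parameters could behave; the real PipeSession may differ.

```python
import threading


class SessionSketch:
    """Hypothetical stand-in showing only the enter/exit/shutdown relationship."""

    def __init__(self):
        self.is_stopped = threading.Event()
        self.is_finished = threading.Event()
        # Toy worker: waits for the stop request, then reports completion.
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self):
        self.is_stopped.wait()
        self.is_finished.set()

    def shutdown(self, wait=True, timeout=None):
        self.is_stopped.set()  # ask the worker to stop
        if wait:
            # Block until the worker reports completion or the timeout expires.
            self.is_finished.wait(timeout=timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown()


with SessionSketch() as session:
    pass  # items would be consumed here

finished = session.is_finished.is_set()
```

Using the session in a with block means the shutdown request is issued even if the body raises, which is the usual reason to prefer the context-manager form over calling shutdown by hand.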