BrainsetPipeline

class brainsets.pipeline.BrainsetPipeline(raw_dir, processed_dir, args, tracker_handle=None, download_only=False)[source]

Bases: ABC

Abstract base class for defining processing pipelines. Pipelines are subclasses of this class. Pipelines are either run through the CLI or through brainsets.runner module.

Subclasses must implement:
The pipeline workflow consists of:
  1. Generating a manifest (list of assets to process) via get_manifest(). This happens on the root process.

  2. Downloading each asset via download(). This happens in parallel for multiple rows of the manifest.

  3. Processing each downloaded asset via process(). This also happens in parallel, on the same process as download().

Handling pipeline-specific command line arguments:
  • Subclasses can define pipeline-specific command-line arguments by setting parser.

  • The runner will automatically parse any extra CLI arguments using this parser.

  • The parsed arguments are passed to the get_manifest() as the args method parameter, and to the download() and process() methods via class attribute args.

Examples

>>> from argparse import ArgumentParser
>>> parser = ArgumentParser()
>>> parser.add_argument("--redownload", action="store_true")
>>> parser.add_argument("--reprocess", action="store_true")
>>>
>>> class Pipeline(BrainsetPipeline):
...     brainset_id = "my_brainset"
...     parser = parser
...
...     @classmethod
...     def get_manifest(cls, raw_dir, processed_dir, args):
...         # Return DataFrame of assets to process
...         return pd.DataFrame(...)
...
...     def download(self, manifest_item):
...         # Download the asset
...         # return filepath or handle of downloaded data
...         ...
...
...     def process(self, download_output):
...         # Process the downloaded data
...         ...
brainset_id: str

Unique identifier for the brainset. Must be set by the Pipeline subclass.

parser: ArgumentParser | None = None

Optional argparse.ArgumentParser object for pipeline-specific command-line arguments. If set by a subclass, the runner will automatically parse any extra command-line arguments using this parser. The parsed arguments are then passed to get_manifest() as a method argument, and to the download() and process() methods via self.args.

raw_dir: Path

Raw data directory assigned to this brainset by the pipeline runner.

processed_dir: Path

Processed data directory assigned to this brainset by the pipeline runner.

args: Namespace | None

Pipeline-specific arguments parsed from the command line. Set by the runner if parser is defined by subclass.

abstractmethod classmethod get_manifest(raw_dir, args)[source]

Returns a pandas.DataFrame, which is a table of assets to be downloaded and processed. Each row will be passed individually to the download() and process() methods.

The index of this DataFrame will be used to identify assets for when user wants to process a single asset.

Parameters:
  • raw_dir (Path) – Raw data directory assigned to this brainset by the pipeline runner.

  • args (Optional[Namespace]) – Pipeline-specific arguments parsed from the command line. Set by the runner if parser is defined by subclass.

Return type:

pandas.DataFrame

abstractmethod download(manifest_item)[source]

Download the asset indicated by manifest_item. All return values will be passed to process().

Parameters:

manifest_item (NamedTuple) – This is a single row of the manifest returned by get_manifest().

Return type:

Any

abstractmethod process(download_output)[source]

Process and save the dataset.

Parameters:

download_output (Any) – This will be the return value of the download() method.

update_status(status)[source]

Update the current status of the pipeline for a given asset. This will be shown on the terminal.