BrainsetPipeline
- class brainsets.pipeline.BrainsetPipeline(raw_dir, processed_dir, args, tracker_handle=None, download_only=False)
Bases: ABC
Abstract base class for defining processing pipelines. Pipelines are subclasses of this class. They are run either through the CLI or through the brainsets.runner module.
- Subclasses must set brainset_id and implement:
  - get_manifest(): Generate a pd.DataFrame listing all assets to process
  - download(): Download a single asset from the manifest
  - process(): Transform downloaded data into standardized format
- The pipeline workflow consists of:
  1. Generating a manifest (list of assets to process) via get_manifest(). This happens on the root process.
  2. Downloading each asset via download(). This happens in parallel for multiple rows of the manifest.
  3. Processing each downloaded asset via process(). This also happens in parallel, on the same process as download().
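The three stages above can be mimicked with the standard library alone. The following is a minimal sketch, not the brainsets runner: `ToyPipeline`, `run_one`, `run_pipeline`, and the `Asset` tuple are hypothetical stand-ins, the URLs are placeholders, and a thread pool stands in for whatever parallelism the real runner uses.

```python
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for one manifest row (the real manifest is a DataFrame).
Asset = namedtuple("Asset", ["Index", "url"])


class ToyPipeline:
    """Illustrative stand-in; it does not subclass BrainsetPipeline."""

    @classmethod
    def get_manifest(cls):
        # Stage 1: list every asset once, before any worker starts.
        return [Asset(f"session_{i}", f"https://example.org/{i}") for i in range(3)]

    def download(self, manifest_item):
        # Stage 2: pretend to fetch the asset and return a handle to it.
        return f"raw/{manifest_item.Index}.bin"

    def process(self, download_output):
        # Stage 3: receives exactly what download() returned.
        return download_output.replace("raw/", "processed/").replace(".bin", ".h5")


def run_one(manifest_item):
    # download() and process() run back to back in the same worker.
    pipeline = ToyPipeline()
    return pipeline.process(pipeline.download(manifest_item))


def run_pipeline(max_workers=2):
    manifest = ToyPipeline.get_manifest()  # built once, up front
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() preserves manifest order even though rows run concurrently.
        return list(pool.map(run_one, manifest))


if __name__ == "__main__":
    print(run_pipeline())
```

Keeping download and process in one worker function mirrors the statement that both steps for a given asset run on the same process.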
- Handling pipeline-specific command line arguments:
  Subclasses can define pipeline-specific command-line arguments by setting parser. The runner will automatically parse any extra CLI arguments using this parser. The parsed arguments are passed to get_manifest() as the args method parameter, and to the download() and process() methods via the args attribute (self.args).
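One plausible way to split runner flags from pipeline flags is argparse's `parse_known_args`. The sketch below is an assumption about the general mechanism, not the actual runner code: `runner_parser`, its `--cores` flag, and `split_args` are hypothetical names chosen for illustration.

```python
from argparse import ArgumentParser

# Hypothetical runner-level parser; the flag name is made up for illustration.
runner_parser = ArgumentParser(prog="runner")
runner_parser.add_argument("--cores", type=int, default=1)

# Pipeline-specific parser, as a subclass would assign to `parser`.
pipeline_parser = ArgumentParser(prog="pipeline")
pipeline_parser.add_argument("--redownload", action="store_true")
pipeline_parser.add_argument("--reprocess", action="store_true")


def split_args(argv):
    # parse_known_args returns the parsed namespace plus the leftover tokens.
    runner_args, extra = runner_parser.parse_known_args(argv)
    # The leftovers are handed to the pipeline's own parser.
    pipeline_args = pipeline_parser.parse_args(extra)
    return runner_args, pipeline_args


runner_args, pipeline_args = split_args(["--cores", "4", "--redownload"])
```

Here `pipeline_args` plays the role of the namespace that would reach `get_manifest()` as `args` and the instance methods as `self.args`.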
Examples
>>> from argparse import ArgumentParser
>>> import pandas as pd
>>> from brainsets.pipeline import BrainsetPipeline
>>>
>>> parser = ArgumentParser()
>>> parser.add_argument("--redownload", action="store_true")
>>> parser.add_argument("--reprocess", action="store_true")
>>>
>>> class Pipeline(BrainsetPipeline):
...     brainset_id = "my_brainset"
...     parser = parser
...
...     @classmethod
...     def get_manifest(cls, raw_dir, args):
...         # Return DataFrame of assets to process
...         return pd.DataFrame(...)
...
...     def download(self, manifest_item):
...         # Download the asset
...         # return filepath or handle of downloaded data
...         ...
...
...     def process(self, download_output):
...         # Process the downloaded data
...         ...
- parser: ArgumentParser | None = None
Optional argparse.ArgumentParser object for pipeline-specific command-line arguments. If set by a subclass, the runner will automatically parse any extra command-line arguments using this parser. The parsed arguments are then passed to get_manifest() as a method argument, and to the download() and process() methods via self.args.
- args: Namespace | None
Pipeline-specific arguments parsed from the command line. Set by the runner if parser is defined by the subclass.
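As an illustration of reading `self.args` inside `download()`, the sketch below skips a fetch when the file already exists unless a `--redownload` flag was given. It is a stand-in written under assumptions: `ToyPipeline`, `Row`, `fetch_count`, and the `self.raw_dir` attribute are hypothetical, and the file write is a placeholder for a real download.

```python
import tempfile
from argparse import Namespace
from collections import namedtuple
from pathlib import Path

Row = namedtuple("Row", ["Index", "filename"])  # hypothetical manifest row


class ToyPipeline:
    """Illustrative stand-in; it does not subclass BrainsetPipeline."""

    def __init__(self, raw_dir, args):
        self.raw_dir = Path(raw_dir)
        self.args = args  # the runner would set this from the parsed CLI flags
        self.fetch_count = 0

    def download(self, manifest_item):
        target = self.raw_dir / manifest_item.filename
        # getattr tolerates args being None, which the attribute type allows.
        if target.exists() and not getattr(self.args, "redownload", False):
            return target  # file already present and no --redownload: skip
        target.write_text("fake payload")  # placeholder for a real download
        self.fetch_count += 1
        return target


def demo():
    row = Row("session_0", "session_0.bin")
    with tempfile.TemporaryDirectory() as tmp:
        cached = ToyPipeline(tmp, Namespace(redownload=False))
        cached.download(row)
        cached.download(row)  # second call finds the file and skips
        forced = ToyPipeline(tmp, Namespace(redownload=True))
        forced.download(row)  # fetches again despite the existing file
        return cached.fetch_count, forced.fetch_count
```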
- abstractmethod classmethod get_manifest(raw_dir, args)
Returns a pandas.DataFrame, which is a table of assets to be downloaded and processed. Each row will be passed individually to the download() and process() methods.
The index of this DataFrame is used to identify assets when the user wants to process a single asset.
- Parameters:
  raw_dir (Path) – Raw data directory assigned to this brainset by the pipeline runner.
  args (Optional[Namespace]) – Pipeline-specific arguments parsed from the command line. Set by the runner if parser is defined by the subclass.
- Return type:
  pandas.DataFrame
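A manifest of the kind described for get_manifest() might look like the following minimal sketch. The column names, session identifiers, and URLs are hypothetical, and using `itertuples()` to obtain the per-row named tuples is an assumption about how rows could be produced, not a statement about the runner's internals.

```python
import pandas as pd


def build_manifest():
    # Hypothetical columns; a real pipeline chooses whatever it needs.
    records = [
        {"session_id": "sub-01_ses-01", "url": "https://example.org/a.nwb"},
        {"session_id": "sub-01_ses-02", "url": "https://example.org/b.nwb"},
    ]
    # The index identifies each asset; keeping it unique avoids ambiguity
    # when a single asset is selected.
    return pd.DataFrame(records).set_index("session_id")


manifest = build_manifest()

# Each row becomes a named tuple whose first field, Index, holds the index value.
rows = list(manifest.itertuples())
first = rows[0]

# Selecting one asset by its index label.
single = manifest.loc["sub-01_ses-02"]
```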
- abstractmethod download(manifest_item)
Download the asset indicated by manifest_item. All return values will be passed to process().
- Parameters:
  manifest_item (NamedTuple) – This is a single row of the manifest returned by get_manifest().
- Return type:
- abstractmethod process(download_output)
Process and save the dataset.
- Parameters:
  download_output (Any) – This will be the return value of the download() method.