Source code for brainsets.utils.s3_utils

"""Generic S3 utilities for downloading data from public buckets."""

# Public names exported by this module; the same list is reused below as the
# autosummary entries of ``__api_ref__``.
__all__ = [
    "get_cached_s3_client",
    "get_object_list",
    "download_prefix",
    "download_prefix_from_url",
]

# Drives the generated API reference; see docs/source/api_reference.py.
__api_ref__ = {
    "description": None,
    "sections": [{"autosummary": __all__}],
}


from functools import lru_cache
from pathlib import Path
from urllib.parse import urlparse

try:
    import boto3
    from botocore import UNSIGNED
    from botocore.client import BaseClient
    from botocore.config import Config
    from botocore.exceptions import ClientError

    BOTO_AVAILABLE = True
except ImportError:
    boto3 = None
    UNSIGNED = None
    BaseClient = None
    Config = None
    ClientError = None
    BOTO_AVAILABLE = False


def _check_boto_available(func_name: str) -> None:
    """Guard for functions that need the optional boto3/botocore dependency.

    Args:
        func_name: Name of the calling function, used in the error message.

    Raises:
        ImportError: If the module-level ``BOTO_AVAILABLE`` flag is false.
    """
    if BOTO_AVAILABLE:
        return

    message = (
        f"{func_name} requires boto3 and botocore which are not installed. "
        "Install them with `pip install boto3`"
    )
    raise ImportError(message)


@lru_cache(maxsize=1)
def get_cached_s3_client(
    retry_mode: str = "adaptive",
    max_attempts: int = 5,
    max_pool_connections: int = 30,
):
    """Get a cached S3 client configured for anonymous access to public buckets.

    The client is memoized with ``lru_cache(maxsize=1)``, so repeated calls with
    the same arguments return the same client object, and only the most recent
    argument combination is kept.

    Uses boto3's retry modes which include:

    - Exponential backoff with random jitter
    - Automatic retries on transient errors, throttling (429), and 5xx
      status codes

    Args:
        retry_mode: Retry mode ("standard" or "adaptive")
        max_attempts: Maximum number of retry attempts
        max_pool_connections: Maximum number of connections in the pool

    Returns:
        A configured boto3 S3 client for unsigned/anonymous access

    Raises:
        ImportError: If boto3/botocore is not installed.
    """
    _check_boto_available("get_cached_s3_client")

    retry_settings = {
        "mode": retry_mode,
        "total_max_attempts": max_attempts,
    }
    # UNSIGNED disables request signing, so no AWS credentials are needed.
    anonymous_config = Config(
        signature_version=UNSIGNED,
        retries=retry_settings,
        max_pool_connections=max_pool_connections,
    )
    return boto3.client("s3", config=anonymous_config)


def get_object_list(
    bucket: str,
    prefix: str,
    s3_client: "BaseClient | None" = None,
) -> list[str]:
    """List all object keys under a prefix (excludes directories).

    Args:
        bucket: S3 bucket name
        prefix: Key prefix to filter objects (e.g., "ds005555/")
        s3_client: Optional pre-configured S3 client. When omitted, the
            client from ``get_cached_s3_client()`` is used.

    Returns:
        List of object keys with ``prefix`` removed from the front. Keys
        ending in "/" and keys equal to the prefix itself are left out.

    Raises:
        RuntimeError: If listing fails
        ImportError: If boto3/botocore is not installed.
    """
    _check_boto_available("get_object_list")

    client = get_cached_s3_client() if s3_client is None else s3_client

    relative_keys = []
    try:
        page_iterator = client.get_paginator("list_objects_v2").paginate(
            Bucket=bucket, Prefix=prefix
        )
        for page in page_iterator:
            # A page without a "Contents" entry has nothing to list.
            if "Contents" not in page:
                continue
            for entry in page["Contents"]:
                full_key = entry["Key"]
                # Skip directory placeholders and anything outside the prefix.
                if full_key.endswith("/") or not full_key.startswith(prefix):
                    continue
                remainder = full_key[len(prefix) :]
                if remainder:
                    relative_keys.append(remainder)
    except Exception as e:
        raise RuntimeError(f"Error listing objects in {bucket}/{prefix}: {e}") from e

    return relative_keys


def _local_path_for_key(target_dir: Path, key: str, strip_prefix: str) -> Path:
    """Map an S3 object key to a local path that stays inside ``target_dir``."""
    rel_path = key[len(strip_prefix) :] if key.startswith(strip_prefix) else key

    # pathlib discards the left operand when the right one is absolute, so a
    # leading "/" (e.g. from a "//" in the key) would escape target_dir.
    rel_path = rel_path.lstrip("/")

    # ".." components could likewise climb out of target_dir.
    if not rel_path or ".." in rel_path.split("/"):
        raise RuntimeError(
            f"Refusing to download {key}: it maps to a path outside {target_dir}"
        )
    return target_dir / rel_path


def download_prefix(
    bucket: str,
    prefix: str,
    target_dir: Path,
    strip_prefix: "str | None" = None,
    s3_client: "BaseClient | None" = None,
) -> list[Path]:
    """Download all files matching a prefix pattern.

    Args:
        bucket: S3 bucket name
        prefix: Key prefix to match files
        target_dir: Local directory to download files to. It is created if
            missing and a leading "~" is expanded to the user's home directory.
        strip_prefix: Prefix to strip from keys when creating local paths.
            If None, uses the first path component (dataset_id). A trailing
            "/" is added when missing.
        s3_client: Optional pre-configured S3 client. When omitted, the
            client from ``get_cached_s3_client()`` is used.

    Returns:
        List of downloaded file paths

    Raises:
        RuntimeError: If download fails, no files match, or an object key
            would map to a location outside ``target_dir``.
        ImportError: If boto3/botocore is not installed.

    Examples:
        >>> # Basic usage
        >>> download_prefix(  # doctest: +SKIP
        ...     bucket="openneuro.org",
        ...     prefix="ds005555/sub-1/eeg/sub-1_task-Sleep",
        ...     target_dir=Path("~/data/raw/brainset_ds005555"),
        ... )

        >>> # Custom strip_prefix
        >>> download_prefix(  # doctest: +SKIP
        ...     bucket="fcp-indi",
        ...     prefix="data/Projects/EEG_Eyetracking_CMI_data/A00054400",
        ...     target_dir=Path("~/data/raw/brainset_ds005555"),
        ...     strip_prefix="data/Projects/",
        ... )
    """
    _check_boto_available("download_prefix")

    if s3_client is None:
        s3_client = get_cached_s3_client()

    # Path() does not expand "~" on its own; without this a literal "~"
    # directory would be created in the current working directory.
    target_dir = Path(target_dir).expanduser()
    target_dir.mkdir(parents=True, exist_ok=True)

    if strip_prefix is None:
        # Default to the first path component of the prefix, e.g.
        # "ds005555/sub-1/eeg" -> "ds005555". A prefix without any "/" is
        # used whole.
        strip_prefix = prefix.split("/")[0]

    # Normalize to exactly one trailing "/" so stripped keys never start
    # with a separator.
    strip_prefix = strip_prefix.rstrip("/") + "/"

    downloaded_files = []
    try:
        paginator = s3_client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            if "Contents" not in page:
                continue

            for obj in page["Contents"]:
                obj_key = obj["Key"]

                # Keys ending in "/" are directory placeholders, not files.
                if obj_key.endswith("/"):
                    continue

                local_path = _local_path_for_key(target_dir, obj_key, strip_prefix)
                local_path.parent.mkdir(parents=True, exist_ok=True)

                try:
                    s3_client.download_file(bucket, obj_key, str(local_path))
                except ClientError as e:
                    raise RuntimeError(f"Failed to download {obj_key}: {e}") from e
                downloaded_files.append(local_path)

        if not downloaded_files:
            raise RuntimeError(
                f"No files found matching prefix '{prefix}' in bucket '{bucket}'"
            )

    except RuntimeError:
        # Already carries a specific message; do not wrap it a second time.
        raise
    except Exception as e:
        raise RuntimeError(f"Failed to download from {bucket}/{prefix}: {e}") from e

    return downloaded_files


def download_prefix_from_url(s3_url: str, target_dir: Path) -> list[Path]:
    """Download all files matching an S3 URL prefix pattern.

    The URL is split into a bucket (the network-location part) and a key
    prefix (the path with leading "/" removed), which are then passed to
    ``download_prefix`` with its default ``strip_prefix`` and client.

    Args:
        s3_url: S3 URL prefix pattern (e.g., 's3://bucket/prefix')
        target_dir: Local directory to download files to

    Returns:
        List of downloaded file paths

    Raises:
        ValueError: If URL is not a valid S3 URL (wrong scheme or no bucket)
        RuntimeError: If download fails
        ImportError: If boto3/botocore is not installed.
    """
    _check_boto_available("download_prefix_from_url")

    parsed = urlparse(s3_url)
    # Reject a missing bucket (e.g. "s3:///key") up front so the error is a
    # clear ValueError raised before any directory is created.
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Invalid S3 URL: {s3_url}")

    bucket = parsed.netloc
    prefix = parsed.path.lstrip("/")
    return download_prefix(bucket, prefix, target_dir)