Source code for brainsets.utils.dandi_utils
import numpy as np
import pandas as pd
from temporaldata import ArrayDict, IrregularTimeSeries
from brainsets.descriptions import SubjectDescription
from brainsets.taxonomy import (
RecordingTech,
Sex,
Species,
)
[docs]
def extract_metadata_from_nwb(nwbfile):
    r"""Collect session-level metadata from an NWB file object.

    Args:
        nwbfile: Object exposing ``session_start_time`` (anything with a
            ``strftime`` method) and ``related_publications``.

    Returns:
        A dict with two entries: ``recording_date``, the session start time
        formatted as ``YYYYMMDD``, and ``related_publications``, passed
        through unchanged.
    """
    metadata = {
        # compact date stamp, e.g. "20210307"
        "recording_date": nwbfile.session_start_time.strftime("%Y%m%d"),
        "related_publications": nwbfile.related_publications,
    }
    return metadata
[docs]
def extract_subject_from_nwb(nwbfile):
    r"""Build a :obj:`SubjectDescription` from the `subject` field of an NWB file.

    DANDI imposes requirements on the metadata stored in `subject`:

    - subject_id: a subject identifier must be provided.
    - species: either a latin binomial or an NCBI taxonomic identifier.
    - sex: must be "M", "F", "O" (other), or "U" (unknown).
    - date_of_birth or age: this does not appear to be enforced, so it is
      skipped here.

    The subject id is lower-cased; species and sex are parsed with
    ``Species.from_string`` and ``Sex.from_string``.
    """
    subject = nwbfile.subject

    species_name = subject.species
    if "NCBITaxon" in species_name:
        # keep only what follows the last underscore and re-attach the
        # "NCBITaxon_" prefix
        taxon_suffix = species_name.split("_")[-1]
        species_name = "NCBITaxon_" + taxon_suffix

    return SubjectDescription(
        id=subject.subject_id.lower(),
        species=Species.from_string(species_name),
        sex=Sex.from_string(subject.sex),
    )
[docs]
def extract_spikes_from_nwbfile(nwbfile, recording_tech):
    r"""Extract spike times and per-unit metadata from the units table of an NWB file.

    Args:
        nwbfile: NWB file object exposing ``units.spike_times_index`` and
            ``units.electrodes.table``.
        recording_tech: Either ``RecordingTech.UTAH_ARRAY_THRESHOLD_CROSSINGS``
            or ``RecordingTech.UTAH_ARRAY_SPIKES``; selects how unit ids are
            built.

    Returns:
        A tuple ``(spikes, units)``. ``spikes`` is an ``IrregularTimeSeries``
        holding the concatenated spike ``timestamps`` and, for each spike, the
        ``unit_index`` of the unit it belongs to; ``sort()`` is called on it
        before returning. ``units`` is an ``ArrayDict`` built from one record
        per unit with the fields ``id``, ``unit_number``, ``count`` and
        ``type``.

    Raises:
        ValueError: If ``recording_tech`` is not one of the two supported
            values (raised while processing the first unit).
    """
    spike_trains = nwbfile.units.spike_times_index[:]
    electrodes = nwbfile.units.electrodes.table

    # spikes
    timestamps = []
    unit_index = []
    # units
    unit_meta = []

    for i, spiketimes in enumerate(spike_trains):
        # label unit
        if recording_tech == RecordingTech.UTAH_ARRAY_THRESHOLD_CROSSINGS:
            # unit i is labelled with row i of the electrodes table
            group_name = electrodes["group_name"][i]
            unit_id = f"group_{group_name}/elec{i}/multiunit_{0}"
        elif recording_tech == RecordingTech.UTAH_ARRAY_SPIKES:
            # look up the electrode this unit is attached to
            electrode_id = nwbfile.units[i].electrodes.item().item()
            group_name = electrodes["group_name"][electrode_id]
            unit_id = f"group_{group_name}/elec{electrode_id}/unit_{i}"
        else:
            raise ValueError(f"Recording tech {recording_tech} not supported")

        # extract spikes
        num_spikes = len(spiketimes)
        timestamps.append(spiketimes)
        # An integer array is appended for every unit, spike-less ones
        # included: an empty integer array does not change the dtype of the
        # concatenated result (an empty Python list would promote it to
        # float), and np.concatenate no longer fails when no unit has spikes.
        unit_index.append(np.full(num_spikes, i))

        # extract unit metadata
        unit_meta.append(
            {
                "id": unit_id,
                "unit_number": i,
                "count": num_spikes,
                "type": int(recording_tech),
            }
        )

    # convert unit metadata (list of dicts) to an ArrayDict via a dataframe
    unit_meta_df = pd.DataFrame(unit_meta)
    units = ArrayDict.from_dataframe(
        unit_meta_df,
        unsigned_to_long=True,
    )

    # create spikes object from the concatenated per-unit arrays
    spikes = IrregularTimeSeries(
        timestamps=np.concatenate(timestamps),
        unit_index=np.concatenate(unit_index),
        domain="auto",
    )

    # make sure to sort the spikes
    spikes.sort()

    return spikes, units