nwb-linkml/nwb_linkml/tests/fixtures/nwb.py

478 lines
15 KiB
Python
Raw Normal View History

from datetime import datetime
from itertools import product
2024-07-02 04:44:35 +00:00
from pathlib import Path
2023-10-06 04:22:00 +00:00
import numpy as np
2024-07-02 04:44:35 +00:00
import pytest
from hdmf.common import DynamicTable, VectorData
from pynwb import NWBHDF5IO, NWBFile, TimeSeries
from pynwb.base import TimeSeriesReference, TimeSeriesReferenceVectorData
from pynwb.behavior import Position, SpatialSeries
from pynwb.ecephys import LFP, ElectricalSeries
from pynwb.file import Subject
from pynwb.icephys import VoltageClampSeries, VoltageClampStimulusSeries
from pynwb.image import ImageSeries
from pynwb.ophys import (
CorrectedImageStack,
Fluorescence,
ImageSegmentation,
MotionCorrection,
OnePhotonSeries,
OpticalChannel,
RoiResponseSeries,
TwoPhotonSeries,
)
2024-07-02 04:44:35 +00:00
2024-07-02 04:23:31 +00:00
@pytest.fixture(scope="session")
def nwb_file_base() -> NWBFile:
nwbfile = NWBFile(
session_description="All that you touch, you change.", # required
identifier="1111-1111-1111-1111", # required
session_start_time=datetime(year=2024, month=1, day=1), # required
session_id="session_1234", # optional
experimenter=[
"Lauren Oya Olamina",
], # optional
institution="Earthseed Research Institute", # optional
experiment_description="All that you change, changes you.", # optional
keywords=["behavior", "belief"], # optional
related_publications="doi:10.1016/j.neuron.2016.12.011", # optional
)
subject = Subject(
subject_id="001",
age="P90D",
description="mouse 5",
species="Mus musculus",
sex="M",
)
nwbfile.subject = subject
return nwbfile
def _nwb_timeseries(nwbfile: NWBFile) -> NWBFile:
data = np.arange(100, 200, 10)
timestamps = np.arange(10.0)
time_series_with_timestamps = TimeSeries(
name="test_timeseries",
description="an example time series",
data=data,
unit="m",
timestamps=timestamps,
)
nwbfile.add_acquisition(time_series_with_timestamps)
return nwbfile
def _nwb_position(nwbfile: NWBFile) -> NWBFile:
position_data = np.array([np.linspace(0, 10, 50), np.linspace(0, 8, 50)]).T
position_timestamps = np.linspace(0, 50).astype(float) / 200
spatial_series_obj = SpatialSeries(
name="SpatialSeries",
description="(x,y) position in open field",
data=position_data,
timestamps=position_timestamps,
reference_frame="(0,0) is bottom left corner",
)
# name is set to "Position" by default
position_obj = Position(spatial_series=spatial_series_obj)
behavior_module = nwbfile.create_processing_module(
name="behavior", description="processed behavioral data"
)
behavior_module.add(position_obj)
nwbfile.add_trial_column(
name="correct",
description="whether the trial was correct",
)
nwbfile.add_trial(start_time=1.0, stop_time=5.0, correct=True)
nwbfile.add_trial(start_time=6.0, stop_time=10.0, correct=False)
return nwbfile
def _nwb_ecephys(nwbfile: NWBFile) -> NWBFile:
"""
Extracellular Ephys
https://pynwb.readthedocs.io/en/latest/tutorials/domain/ecephys.html
"""
generator = np.random.default_rng()
device = nwbfile.create_device(name="array", description="old reliable", manufacturer="diy")
nwbfile.add_electrode_column(name="label", description="label of electrode")
nshanks = 4
nchannels_per_shank = 3
electrode_counter = 0
for ishank in range(nshanks):
# create an electrode group for this shank
electrode_group = nwbfile.create_electrode_group(
name=f"shank{ishank}",
description=f"electrode group for shank {ishank}",
device=device,
location="brain area",
)
# add electrodes to the electrode table
for ielec in range(nchannels_per_shank):
nwbfile.add_electrode(
group=electrode_group,
label=f"shank{ishank}elec{ielec}",
location="brain area",
)
electrode_counter += 1
all_table_region = nwbfile.create_electrode_table_region(
region=list(range(electrode_counter)), # reference row indices 0 to N-1
description="all electrodes",
)
raw_data = generator.standard_normal((50, 12))
raw_electrical_series = ElectricalSeries(
name="ElectricalSeries",
description="Raw acquisition traces",
data=raw_data,
electrodes=all_table_region,
starting_time=0.0,
# timestamp of the first sample in seconds relative to the session start time
rate=20000.0, # in Hz
)
nwbfile.add_acquisition(raw_electrical_series)
# --------------------------------------------------
# LFP
# --------------------------------------------------
generator = np.random.default_rng()
lfp_data = generator.standard_normal((50, 12))
lfp_electrical_series = ElectricalSeries(
name="ElectricalSeries",
description="LFP data",
data=lfp_data,
electrodes=all_table_region,
starting_time=0.0,
rate=200.0,
)
lfp = LFP(electrical_series=lfp_electrical_series)
ecephys_module = nwbfile.create_processing_module(
name="ecephys", description="processed extracellular electrophysiology data"
)
ecephys_module.add(lfp)
return nwbfile
def _nwb_units(nwbfile: NWBFile) -> NWBFile:
generator = np.random.default_rng()
# Spike Times
nwbfile.add_unit_column(name="quality", description="sorting quality")
firing_rate = 20
n_units = 10
res = 1000
duration = 20
for _ in range(n_units):
spike_times = np.where(generator.random(res * duration) < (firing_rate / res))[0] / res
nwbfile.add_unit(spike_times=spike_times, quality="good")
return nwbfile
def _nwb_icephys(nwbfile: NWBFile) -> NWBFile:
device = nwbfile.create_device(name="Heka ITC-1600")
electrode = nwbfile.create_icephys_electrode(
name="elec0", description="a mock intracellular electrode", device=device
)
stimulus = VoltageClampStimulusSeries(
name="ccss",
data=[1, 2, 3, 4, 5],
starting_time=123.6,
rate=10e3,
electrode=electrode,
gain=0.02,
sweep_number=np.uint64(15),
)
# Create and icephys response
response = VoltageClampSeries(
name="vcs",
data=[0.1, 0.2, 0.3, 0.4, 0.5],
conversion=1e-12,
resolution=np.nan,
starting_time=123.6,
rate=20e3,
electrode=electrode,
gain=0.02,
capacitance_slow=100e-12,
resistance_comp_correction=70.0,
sweep_number=np.uint64(15),
)
# we can also add stimulus template data as follows
rowindex = nwbfile.add_intracellular_recording(
electrode=electrode, stimulus=stimulus, response=response, id=10
)
rowindex2 = nwbfile.add_intracellular_recording(
electrode=electrode,
stimulus=stimulus,
stimulus_start_index=1,
stimulus_index_count=3,
response=response,
response_start_index=2,
response_index_count=3,
id=11,
)
rowindex3 = nwbfile.add_intracellular_recording(electrode=electrode, response=response, id=12)
nwbfile.intracellular_recordings.add_column(
name="recording_tag",
data=["A1", "A2", "A3"],
description="String with a recording tag",
)
location_column = VectorData(
name="location",
data=["Mordor", "Gondor", "Rohan"],
description="Recording location in Middle Earth",
)
lab_category = DynamicTable(
name="recording_lab_data",
description="category table for lab-specific recording metadata",
colnames=[
"location",
],
columns=[
location_column,
],
)
# Add the table as a new category to our intracellular_recordings
nwbfile.intracellular_recordings.add_category(category=lab_category)
nwbfile.intracellular_recordings.add_column(
name="voltage_threshold",
data=[0.1, 0.12, 0.13],
description="Just an example column on the electrodes category table",
category="electrodes",
)
stimulus_template = VoltageClampStimulusSeries(
name="ccst",
data=[0, 1, 2, 3, 4],
starting_time=0.0,
rate=10e3,
electrode=electrode,
gain=0.02,
)
nwbfile.add_stimulus_template(stimulus_template)
nwbfile.intracellular_recordings.add_column(
name="stimulus_template",
data=[
TimeSeriesReference(0, 5, stimulus_template),
# (start_index, index_count, stimulus_template)
TimeSeriesReference(1, 3, stimulus_template),
TimeSeriesReference.empty(stimulus_template),
],
# if there was no data for that recording, use empty reference
description=(
"Column storing the reference to the stimulus template for the recording (rows)."
),
category="stimuli",
col_cls=TimeSeriesReferenceVectorData,
)
icephys_simultaneous_recordings = nwbfile.get_icephys_simultaneous_recordings()
icephys_simultaneous_recordings.add_column(
name="simultaneous_recording_tag",
description="A custom tag for simultaneous_recordings",
)
simultaneous_index = nwbfile.add_icephys_simultaneous_recording(
recordings=[rowindex, rowindex2, rowindex3],
id=12,
simultaneous_recording_tag="LabTag1",
)
repetition_index = nwbfile.add_icephys_repetition(
sequential_recordings=[simultaneous_index], id=17
)
nwbfile.add_icephys_experimental_condition(repetitions=[repetition_index], id=19)
nwbfile.icephys_experimental_conditions.add_column(
name="tag",
data=np.arange(1),
description="integer tag for a experimental condition",
)
return nwbfile
def _nwb_ca_imaging(nwbfile: NWBFile) -> NWBFile:
"""
Calcium Imaging
https://pynwb.readthedocs.io/en/latest/tutorials/domain/ophys.html
"""
generator = np.random.default_rng()
device = nwbfile.create_device(
name="Microscope",
description="My two-photon microscope",
manufacturer="The best microscope manufacturer",
)
optical_channel = OpticalChannel(
name="OpticalChannel",
description="an optical channel",
emission_lambda=500.0,
)
imaging_plane = nwbfile.create_imaging_plane(
name="ImagingPlane",
optical_channel=optical_channel,
imaging_rate=30.0,
description="a very interesting part of the brain",
device=device,
excitation_lambda=600.0,
indicator="GFP",
location="V1",
grid_spacing=[0.01, 0.01],
grid_spacing_unit="meters",
origin_coords=[1.0, 2.0, 3.0],
origin_coords_unit="meters",
)
one_p_series = OnePhotonSeries(
name="OnePhotonSeries",
description="Raw 1p data",
data=np.ones((1000, 100, 100)),
imaging_plane=imaging_plane,
rate=1.0,
unit="normalized amplitude",
)
nwbfile.add_acquisition(one_p_series)
two_p_series = TwoPhotonSeries(
name="TwoPhotonSeries",
description="Raw 2p data",
data=np.ones((1000, 100, 100)),
imaging_plane=imaging_plane,
rate=1.0,
unit="normalized amplitude",
)
nwbfile.add_acquisition(two_p_series)
corrected = ImageSeries(
name="corrected", # this must be named "corrected"
description="A motion corrected image stack",
data=np.ones((1000, 100, 100)),
unit="na",
format="raw",
starting_time=0.0,
rate=1.0,
)
xy_translation = TimeSeries(
name="xy_translation",
description="x,y translation in pixels",
data=np.ones((1000, 2)),
unit="pixels",
starting_time=0.0,
rate=1.0,
)
corrected_image_stack = CorrectedImageStack(
corrected=corrected,
original=one_p_series,
xy_translation=xy_translation,
)
motion_correction = MotionCorrection(corrected_image_stacks=[corrected_image_stack])
ophys_module = nwbfile.create_processing_module(
name="ophys", description="optical physiology processed data"
)
ophys_module.add(motion_correction)
img_seg = ImageSegmentation()
ps = img_seg.create_plane_segmentation(
name="PlaneSegmentation",
description="output from segmenting my favorite imaging plane",
imaging_plane=imaging_plane,
reference_images=one_p_series, # optional
)
ophys_module.add(img_seg)
for _ in range(30):
image_mask = np.zeros((100, 100))
# randomly generate example image masks
x = generator.integers(0, 95)
y = generator.integers(0, 95)
image_mask[x : x + 5, y : y + 5] = 1
# add image mask to plane segmentation
ps.add_roi(image_mask=image_mask)
ps2 = img_seg.create_plane_segmentation(
name="PlaneSegmentation2",
description="output from segmenting my favorite imaging plane",
imaging_plane=imaging_plane,
reference_images=one_p_series, # optional
)
for _ in range(30):
# randomly generate example starting points for region
x = generator.integers(0, 95)
y = generator.integers(0, 95)
# define an example 4 x 3 region of pixels of weight '1'
pixel_mask = [(ix, iy, 1) for ix in range(x, x + 4) for iy in range(y, y + 3)]
# add pixel mask to plane segmentation
ps2.add_roi(pixel_mask=pixel_mask)
ps3 = img_seg.create_plane_segmentation(
name="PlaneSegmentation3",
description="output from segmenting my favorite imaging plane",
imaging_plane=imaging_plane,
reference_images=one_p_series, # optional
)
for _ in range(30):
# randomly generate example starting points for region
x = generator.integers(0, 95)
y = generator.integers(0, 95)
z = generator.integers(0, 15)
# define an example 4 x 3 x 2 voxel region of weight '0.5'
voxel_mask = []
for ix, iy, iz in product(range(x, x + 4), range(y, y + 3), range(z, z + 2)):
voxel_mask.append((ix, iy, iz, 0.5))
# add voxel mask to plane segmentation
ps3.add_roi(voxel_mask=voxel_mask)
rt_region = ps.create_roi_table_region(region=[0, 1], description="the first of two ROIs")
roi_resp_series = RoiResponseSeries(
name="RoiResponseSeries",
description="Fluorescence responses for two ROIs",
data=np.ones((50, 2)), # 50 samples, 2 ROIs
rois=rt_region,
unit="lumens",
rate=30.0,
)
fl = Fluorescence(roi_response_series=roi_resp_series)
ophys_module.add(fl)
return nwbfile
@pytest.fixture(scope="session")
def nwb_file(tmp_output_dir, nwb_file_base, request: pytest.FixtureRequest) -> Path:
"""
NWB File created with pynwb that uses all the weird language features
Borrowing code from pynwb docs in one humonogous fixture function
since there's not really a reason to
"""
nwb_path = tmp_output_dir / "test_nwb.nwb"
2024-09-02 20:41:04 +00:00
if nwb_path.exists() and not request.config.getoption("--clean"):
return nwb_path
nwbfile = nwb_file_base
nwbfile = _nwb_timeseries(nwbfile)
nwbfile = _nwb_position(nwbfile)
nwbfile = _nwb_ecephys(nwbfile)
nwbfile = _nwb_units(nwbfile)
nwbfile = _nwb_icephys(nwbfile)
with NWBHDF5IO(nwb_path, "w") as io:
io.write(nwbfile)
return nwb_path