Source code for miv_simulator.clamps.cell

from typing import Dict, List, Optional, Tuple, Union

import uuid

import numpy as np
from miv_simulator import cells, synapses
from miv_simulator.cells import BiophysCell
from miv_simulator.env import Env
from miv_simulator.synapses import get_syn_filter_dict
from miv_simulator.utils import (
    AbstractEnv,
    Context,
    config_logging,
    get_module_logger,
)
from miv_simulator.utils import io as io_utils
from miv_simulator.utils import is_interactive
from miv_simulator.utils import neuron as neuron_utils
from miv_simulator.utils.neuron import (
    configure_hoc_env,
    h,
    make_rec,
    run_iclamp,
)
from mpi4py import MPI  # Must come before importing NEURON
from neuroh5.io import append_cell_attributes
from neuron import h
from numpy import ndarray
from scipy.optimize import curve_fit

# This logger will inherit its settings from the root logger, created in miv_simulator.env
logger = get_module_logger(__name__)
if hasattr(h, "nrnmpi_init"):
    h.nrnmpi_init()

context = Context()


[docs]def init_biophys_cell( env: AbstractEnv, pop_name: str, gid: int, load_weights: bool = True, load_connections: bool = True, register_cell: bool = True, write_cell: bool = False, validate_tree: bool = True, cell_dict: Dict[ str, Optional[ Union[ Dict[ str, Union[ ndarray, Dict[str, Union[int, Dict[int, ndarray], ndarray]], ], ], Dict[str, ndarray], Tuple[ Dict[ str, Dict[ str, List[ Tuple[ int, Tuple[ndarray, Dict[str, List[ndarray]]], ] ], ], ], Dict[str, Dict[str, Dict[str, Dict[str, int]]]], ], ] ], ] = {}, ) -> BiophysCell: """ Instantiates a BiophysCell instance and all its synapses. :param env: an instance of env.Env :param pop_name: population name :param gid: gid :param load_connections: bool :param register_cell: bool :param validate_tree: bool :param write_cell: bool :param cell_dict: dict Environment can be instantiated as: env = Env(config_file, template_paths, dataset_prefix) :param template_paths: str; colon-separated list of paths to directories containing hoc cell templates :param dataset_prefix: str; path to directory containing required neuroh5 data files """ rank = int(env.pc.id()) ## Determine template name for this cell type template_name = env.celltypes[pop_name]["template"] ## Determine if a mechanism configuration file exists for this cell type mech_dict = None if "mech_file_path" in env.celltypes[pop_name]: mech_dict = env.celltypes[pop_name]["mech_dict"] ## Determine if correct_for_spines flag has been specified for this cell type synapse_config = env.celltypes[pop_name]["synapses"] if "correct_for_spines" in synapse_config: correct_for_spines_flag = synapse_config["correct_for_spines"] else: correct_for_spines_flag = False ## Load cell gid and its synaptic attributes and connection data if template_name.lower() == "pr_nrn": cell = cells.make_PR_cell( env, pop_name, gid, tree_dict=cell_dict.get("morph", None), synapses_dict=cell_dict.get("synapse", None), connection_graph=cell_dict.get("connectivity", None), weight_dict=cell_dict.get("weight", None), mech_dict=mech_dict, load_synapses=True, load_weights=load_weights, load_edges=load_connections, ) elif template_name.lower() == "sc_nrn": cell = cells.make_SC_cell( env, pop_name, gid, tree_dict=cell_dict.get("morph", None), synapses_dict=cell_dict.get("synapse", None), connection_graph=cell_dict.get("connectivity", None), weight_dict=cell_dict.get("weight", None), mech_dict=mech_dict, load_synapses=True, load_weights=load_weights, load_edges=load_connections, ) else: cell = cells.make_biophys_cell( env, pop_name, gid, tree_dict=cell_dict.get("morph", None), synapses_dict=cell_dict.get("synapse", None), connection_graph=cell_dict.get("connectivity", None), weight_dict=cell_dict.get("weight", None), mech_dict=mech_dict, load_synapses=True, load_weights=load_weights, load_edges=load_connections, validate_tree=validate_tree, ) cells.init_biophysics( cell, reset_cable=True, correct_cm=correct_for_spines_flag, correct_g_pas=correct_for_spines_flag, env=env, ) synapses.init_syn_mech_attrs(cell, env) if register_cell: cells.register_cell(env, pop_name, gid, cell) is_reduced = False if hasattr(cell, "is_reduced"): is_reduced = cell.is_reduced if not is_reduced: cells.report_topology(env, cell) env.cell_selection[pop_name] = [gid] if is_interactive: context.update(locals()) if write_cell: write_selection_file_path = "%s/%s_%d.h5" % ( env.results_path, env.modelName, gid, ) if rank == 0: io_utils.mkout(env, write_selection_file_path) env.comm.barrier() io_utils.write_cell_selection(env, write_selection_file_path) if load_connections: io_utils.write_connection_selection(env, write_selection_file_path) return cell
[docs]def measure_deflection(t, v, t0, t1, stim_amp=None): """Measure voltage deflection (min or max, between start and end).""" start_index = int(np.argwhere(t >= t0 * 0.999)[0]) end_index = int(np.argwhere(t >= t1 * 0.999)[0]) deflect_fn = np.argmin if stim_amp is not None and (stim_amp > 0): deflect_fn = np.argmax v_window = v[start_index:end_index] peak_index = deflect_fn(v_window) + start_index return { "t_peak": t[peak_index], "v_peak": v[peak_index], "peak_index": peak_index, "t_baseline": t[start_index], "v_baseline": v[start_index], "baseline_index": start_index, "stim_amp": stim_amp, }
## ## Code based on https://www.github.com/AllenInstitute/ipfx/ipfx/subthresh_features.py ##
[docs]def fit_membrane_time_constant(t, v, t0, t1, rmse_max_tol=1.0): """Fit an exponential to estimate membrane time constant between start and end Parameters ---------- v : numpy array of voltages in mV t : numpy array of times in ms t0 : start of time window for exponential fit t1 : end of time window for exponential fit rsme_max_tol: minimal acceptable root mean square error (default 1e-4) Returns ------- a, inv_tau, y0 : Coefficients of equation y0 + a * exp(-inv_tau * x) returns np.nan for values if fit fails """ def exp_curve(x, a, inv_tau, y0): return y0 + a * np.exp(-inv_tau * x) start_index = int(np.argwhere(t >= t0 * 0.999)[0]) end_index = int(np.argwhere(t >= t1 * 0.999)[0]) p0 = (v[start_index] - v[end_index], 0.1, v[end_index]) t_window = (t[start_index:end_index] - t[start_index]).astype(np.float64) v_window = v[start_index:end_index].astype(np.float64) try: popt, pcov = curve_fit(exp_curve, t_window, v_window, p0=p0) except RuntimeError: logging.info("Curve fit for membrane time constant failed") return np.nan, np.nan, np.nan pred = exp_curve(t_window, *popt) rmse = np.sqrt(np.mean((pred - v_window) ** 2)) if rmse > rmse_max_tol: logging.debug( "RMSE %f for the Curve fit for membrane time constant exceeded the maximum tolerance of %f" % (rmse, rmse_max_tol) ) return np.nan, np.nan, np.nan return popt
[docs]def measure_time_constant( t, v, t0, t1, stim_amp, frac=0.1, baseline_interval=100.0, min_snr=20.0 ): """Calculate the membrane time constant by fitting the voltage response with a single exponential. Parameters ---------- v : numpy array of voltages in mV t : numpy array of times in ms t0 : start of stimulus interval in ms t1 : end of stimulus interval in ms stim_amp : stimulus amplitude frac : fraction of peak deflection to find to determine start of fit window. (default 0.1) baseline_interval : duration before `start` for baseline Vm calculation min_snr : minimum signal-to-noise ratio (SNR) to allow calculation of time constant. If SNR is too low, np.nan will be returned. (default 20) Returns ------- tau : membrane time constant in ms """ if np.max(t) < t0 or np.max(t) < t1: logging.debug( "measure_time_constant: time series ends before t0 = {t0} or t1 = {t1}" ) return np.nan # Assumes this is being done on a hyperpolarizing step deflection_results = measure_deflection(t, v, t0, t1, stim_amp) v_peak = deflection_results["v_peak"] peak_index = deflection_results["peak_index"] v_baseline = deflection_results["v_baseline"] start_index = deflection_results["baseline_index"] # Check that SNR is high enough to proceed signal = np.abs(v_baseline - v_peak) noise_interval_start_index = int( np.argwhere(t >= (t0 - baseline_interval) * 0.999)[0] ) noise = np.std(v[noise_interval_start_index:start_index]) t_noise_start = t[noise_interval_start_index] if noise == 0: # noiseless - likely a deterministic model snr = np.inf else: snr = signal / noise if snr < min_snr: logging.debug( "measure_time_constant: signal-to-noise ratio too low for time constant estimate ({:g} < {:g})".format( snr, min_snr ) ) return np.nan search_result = np.flatnonzero( v[start_index:] <= frac * (v_peak - v_baseline) + v_baseline ) if not search_result.size: logger.debug( "measure_time_constant: could not find interval for time constant estimate" ) return np.nan fit_start_index = search_result[0] + start_index fit_end_index = peak_index fit_start = t[fit_start_index] fit_end = t[fit_end_index] a, inv_tau, y0 = fit_membrane_time_constant(t, v, fit_start, fit_end) return 1.0 / inv_tau
def measure_passive( gid, pop_name, v_init, env: AbstractEnv, prelength=1000.0, mainlength=3000.0, stimdur=1000.0, stim_amp=-0.1, cell_dict={}, ): biophys_cell = init_biophys_cell( env, pop_name, gid, register_cell=False, cell_dict=cell_dict ) hoc_cell = biophys_cell.hoc_cell iclamp_res = run_iclamp( hoc_cell, prelength=prelength, mainlength=mainlength, stimdur=stimdur, stim_amp=stim_amp, ) t = iclamp_res["t"] v = iclamp_res["v"] t0 = iclamp_res["t0"] t1 = iclamp_res["t1"] if np.max(t) < t0 or np.max(t) < t1: logging.debug( "measure_passive: time series ends before t0 = {t0} or t1 = {t1}" ) return {"Rinp": np.nan, "tau": np.nan} deflection_results = measure_deflection(t, v, t0, t1, stim_amp=stim_amp) v_peak = deflection_results["v_peak"] v_baseline = deflection_results["v_baseline"] Rin = (v_peak - v_baseline) / stim_amp tau0 = measure_time_constant(t, v, t0, t1, stim_amp) results = { "Rin": np.asarray([Rin], dtype=np.float32), "tau0": np.asarray([tau0], dtype=np.float32), } logger.info(f"results = {results}") env.synapse_attributes.del_syn_id_attr_dict(gid) if gid in env.biophys_cells[pop_name]: del env.biophys_cells[pop_name][gid] return results def measure_ap(gid, pop_name, v_init, env: AbstractEnv, cell_dict={}): biophys_cell = init_biophys_cell( env, pop_name, gid, register_cell=False, cell_dict=cell_dict ) hoc_cell = biophys_cell.hoc_cell h.dt = env.dt prelength = 100.0 stimdur = 10.0 soma = list(hoc_cell.soma)[0] initial_amp = 0.05 h.tlog = h.Vector() h.tlog.record(h._ref_t) h.Vlog = h.Vector() h.Vlog.record(soma(0.5)._ref_v) thr = cells.find_spike_threshold_minimum( hoc_cell, loc=0.5, sec=soma, duration=stimdur, initial_amp=initial_amp ) results = { "spike threshold current": np.asarray([thr], dtype=np.float32), "spike threshold trace t": np.asarray( h.tlog.to_python(), dtype=np.float32 ), "spike threshold trace v": np.asarray( h.Vlog.to_python(), dtype=np.float32 ), } env.synapse_attributes.del_syn_id_attr_dict(gid) if gid in env.biophys_cells[pop_name]: del env.biophys_cells[pop_name][gid] return results def measure_ap_rate( gid, pop_name, v_init, env: AbstractEnv, prelength=1000.0, mainlength=3000.0, stimdur=1000.0, stim_amp=0.2, minspikes=50, maxit=5, cell_dict={}, ): biophys_cell = init_biophys_cell( env, pop_name, gid, register_cell=False, cell_dict=cell_dict ) hoc_cell = biophys_cell.hoc_cell tstop = prelength + mainlength soma = list(hoc_cell.soma)[0] stim1 = h.IClamp(soma(0.5)) stim1.delay = prelength stim1.dur = stimdur stim1.amp = stim_amp h("objref nil, tlog, Vlog, spikelog") h.tlog = h.Vector() h.tlog.record(h._ref_t) h.Vlog = h.Vector() h.Vlog.record(soma(0.5)._ref_v) h.spikelog = h.Vector() nc = biophys_cell.spike_detector nc.record(h.spikelog) logger.info(f"ap_rate_test: spike threshold is {nc.threshold}") h.tstop = tstop it = 1 ## Increase the injected current until at least maxspikes spikes occur ## or up to maxit steps while h.spikelog.size() < minspikes: logger.info(f"ap_rate_test: iteration {it}") h.dt = env.dt neuron_utils.simulate(v_init, prelength, mainlength) if (h.spikelog.size() < minspikes) & (it < maxit): logger.info( f"ap_rate_test: stim1.amp = {stim1.amp:.2f} spikelog.size = {h.spikelog.size()}" ) stim1.amp = stim1.amp + 0.1 h.spikelog.clear() h.tlog.clear() h.Vlog.clear() it += 1 else: break logger.info( f"ap_rate_test: stim1.amp = {stim1.amp:.2f} spikelog.size = {h.spikelog.size()}" ) isivect = h.Vector(h.spikelog.size() - 1, 0.0) tspike = h.spikelog.x[0] for i in range(1, int(h.spikelog.size())): isivect.x[i - 1] = h.spikelog.x[i] - tspike tspike = h.spikelog.x[i] isimean = isivect.mean() isivar = isivect.var() isistdev = isivect.stdev() isilast = int(isivect.size()) - 1 if isivect.size() > 10: isi10th = 10 else: isi10th = isilast ## Compute the last spike that is largest than the first one. ## This is necessary because some models generate spike doublets, ## (i.e. spike with very short distance between them, which confuse the ISI statistics. isilastgt = int(isivect.size()) - 1 while isivect.x[isilastgt] < isivect.x[1]: isilastgt = isilastgt - 1 if not (isilastgt > 0): isivect.printf() raise RuntimeError("Unable to find ISI greater than first ISI") results = { "spike_count": np.asarray([h.spikelog.size()], dtype=np.uint32), "FR_mean": np.asarray([1.0 / isimean], dtype=np.float32), "ISI_mean": np.asarray([isimean], dtype=np.float32), "ISI_var": np.asarray([isivar], dtype=np.float32), "ISI_stdev": np.asarray([isistdev], dtype=np.float32), "ISI_adaptation_1": np.asarray( [isivect.x[0] / isimean], dtype=np.float32 ), "ISI_adaptation_2": np.asarray( [isivect.x[0] / isivect.x[isilast]], dtype=np.float32 ), "ISI_adaptation_3": np.asarray( [isivect.x[0] / isivect.x[isi10th]], dtype=np.float32 ), "ISI_adaptation_4": np.asarray( [isivect.x[0] / isivect.x[isilastgt]], dtype=np.float32 ), } env.synapse_attributes.del_syn_id_attr_dict(gid) if gid in env.biophys_cells[pop_name]: del env.biophys_cells[pop_name][gid] return results def measure_fi(gid, pop_name, v_init, env: AbstractEnv, cell_dict={}): biophys_cell = init_biophys_cell( env, pop_name, gid, register_cell=False, cell_dict=cell_dict ) hoc_cell = biophys_cell.hoc_cell soma = list(hoc_cell.soma)[0] h.dt = 0.025 prelength = 1000.0 mainlength = 2000.0 tstop = prelength + mainlength stimdur = 1000.0 stim1 = h.IClamp(soma(0.5)) stim1.delay = prelength stim1.dur = stimdur stim1.amp = 0.2 h("objref tlog, Vlog, spikelog") h.tlog = h.Vector() h.tlog.record(h._ref_t) h.Vlog = h.Vector() h.Vlog.record(soma(0.5)._ref_v) h.spikelog = h.Vector() nc = biophys_cell.spike_detector nc.record(h.spikelog) h.tstop = tstop frs = [] stim_amps = [stim1.amp] for it in range(1, 9): neuron_utils.simulate(v_init, prelength, mainlength) logger.info( "fi_test: stim1.amp = %g spikelog.size = %d\n" % (stim1.amp, h.spikelog.size()) ) stim1.amp = stim1.amp + 0.1 stim_amps.append(stim1.amp) frs.append(h.spikelog.size()) h.spikelog.clear() h.tlog.clear() h.Vlog.clear() results = { "FI_curve_amplitude": np.asarray(stim_amps, dtype=np.float32), "FI_curve_frequency": np.asarray(frs, dtype=np.float32), } env.synapse_attributes.del_syn_id_attr_dict(gid) if gid in env.biophys_cells[pop_name]: del env.biophys_cells[pop_name][gid] return results def measure_gap_junction_coupling(gid, population, v_init, env: AbstractEnv): h("objref gjlist, cells, Vlog1, Vlog2") pc = env.pc h.cells = h.List() h.gjlist = h.List() biophys_cell1 = init_biophys_cell( env, population, gid, register_cell=False, cell_dict=cell_dict ) hoc_cell1 = biophys_cell1.hoc_cell cell1 = cells.make_neurotree_hoc_cell(template_class, neurotree_dict=tree) cell2 = cells.make_neurotree_hoc_cell(template_class, neurotree_dict=tree) h.cells.append(cell1) h.cells.append(cell2) ggid = 20000000 source = 10422930 destination = 10422670 weight = 5.4e-4 srcsec = int(cell1.somaidx.x[0]) dstsec = int(cell2.somaidx.x[0]) stimdur = 500 tstop = 2000 pc.set_gid2node(source, int(pc.id())) nc = cell1.connect2target(h.nil) pc.cell(source, nc, 1) soma1 = list(cell1.soma)[0] pc.set_gid2node(destination, int(pc.id())) nc = cell2.connect2target(h.nil) pc.cell(destination, nc, 1) soma2 = list(cell2.soma)[0] stim1 = h.IClamp(soma1(0.5)) stim1.delay = 250 stim1.dur = stimdur stim1.amp = -0.1 stim2 = h.IClamp(soma2(0.5)) stim2.delay = 500 + stimdur stim2.dur = stimdur stim2.amp = -0.1 log_size = old_div(tstop, h.dt) + 1 h.tlog = h.Vector(log_size, 0) h.tlog.record(h._ref_t) h.Vlog1 = h.Vector(log_size) h.Vlog1.record(soma1(0.5)._ref_v) h.Vlog2 = h.Vector(log_size) h.Vlog2.record(soma2(0.5)._ref_v) gjpos = 0.5 neuron_utils.mkgap( env, cell1, source, gjpos, srcsec, ggid, ggid + 1, weight ) neuron_utils.mkgap( env, cell2, destination, gjpos, dstsec, ggid + 1, ggid, weight ) pc.setup_transfer() pc.set_maxstep(10.0) h.stdinit() h.finitialize(v_init) pc.barrier() h.tstop = tstop pc.psolve(h.tstop) def measure_psc( gid, pop_name, presyn_name, env: AbstractEnv, v_init, v_holding, load_weights=False, cell_dict={}, ): biophys_cell = init_biophys_cell( env, pop_name, gid, register_cell=False, load_weights=load_weights, cell_dict=cell_dict, ) hoc_cell = biophys_cell.hoc_cell h.dt = env.dt stimdur = 1000.0 tstop = stimdur tstart = 0.0 soma = list(hoc_cell.soma)[0] se = h.SEClamp(soma(0.5)) se.rs = 10 se.dur = stimdur se.amp1 = v_holding h("objref nil, tlog, ilog, Vlog") h.tlog = h.Vector() h.tlog.record(h._ref_t) h.Vlog = h.Vector() h.Vlog.record(soma(0.5)._ref_v) h.ilog = h.Vector() ilog.record(se._ref_i) h.tstop = tstop neuron_utils.simulate(v_init, 0.0, stimdur) vec_i = h.ilog.to_python() vec_v = h.Vlog.to_python() vec_t = h.tlog.to_python() idx = np.where(vec_t > tstart)[0] vec_i = vec_i[idx] vec_v = vec_v[idx] vec_t = vec_t[idx] t_holding = vec_t[0] i_holding = vec_i[0] i_peak = np.max(np.abs(vec_i[1:])) peak_index = np.where(np.abs(vec_i) == i_peak)[0][0] t_peak = vec_t[peak_index] logger.info( "measure_psc: t_peak = %f i_holding = %f i_peak = %f" % (t_peak, i_holding, i_peak) ) amp_i = abs(i_peak - i_holding) * 1000 logger.info(f"measure_psc: amp_i = {amp_i:f}") return amp_i def measure_psp( gid, pop_name, presyn_name, syn_mech_name, swc_type, env: AbstractEnv, v_init, erev, syn_layer=None, weight=1, syn_count=1, load_weights=False, cell_dict={}, ): biophys_cell = init_biophys_cell( env, pop_name, gid, register_cell=False, load_weights=load_weights, cell_dict=cell_dict, ) synapses.config_biophys_cell_syns( env, gid, pop_name, insert=True, insert_netcons=True, insert_vecstims=True, ) hoc_cell = biophys_cell.hoc_cell h.dt = env.dt prelength = 200.0 mainlength = 50.0 rules = {"sources": [presyn_name]} if swc_type is not None: rules["swc_types"] = [swc_type] if syn_layer is not None: rules["layers"] = [syn_layer] syn_attrs = env.synapse_attributes syn_filters = get_syn_filter_dict(env, rules=rules, convert=True) syns = syn_attrs.filter_synapses(biophys_cell.gid, **syn_filters) print( "total number of %s %s synapses: %d" % (presyn_name, swc_type if swc_type is not None else "", len(syns)) ) stimvec = h.Vector() stimvec.append(prelength + 1.0) count = 0 target_syn_pps = None for target_syn_id, target_syn in iter(syns.items()): target_syn_pps = syn_attrs.get_pps(gid, target_syn_id, syn_mech_name) target_syn_nc = syn_attrs.get_netcon(gid, target_syn_id, syn_mech_name) target_syn_nc.weight[0] = weight setattr(target_syn_pps, "e", erev) vs = target_syn_nc.pre() vs.play(stimvec) if syn_count <= count: break count += 1 if target_syn_pps is None: raise RuntimeError( "measure_psp: Unable to find %s %s synaptic point process" % (presyn_name, swc_type) ) sec = target_syn_pps.get_segment().sec v_rec = make_rec( "psp", pop_name, gid, biophys_cell.hoc_cell, sec=sec, dt=env.dt, loc=0.5, param="v", ) h.tstop = mainlength + prelength h("objref nil, tlog, ilog") h.tlog = h.Vector() h.tlog.record(h._ref_t) h.ilog = h.Vector() h.ilog.record(target_syn_pps._ref_i) neuron_utils.simulate(v_init, prelength, mainlength) vec_v = np.asarray(v_rec["vec"].to_python()) vec_i = np.asarray(h.ilog.to_python()) vec_t = np.asarray(h.tlog.to_python()) idx = np.where(vec_t >= prelength)[0] vec_v = vec_v[idx][1:] vec_t = vec_t[idx][1:] vec_i = vec_i[idx][1:] i_peak_index = np.argmax(np.abs(vec_i)) i_peak = vec_i[i_peak_index] v_peak = vec_v[i_peak_index] amp_v = abs(v_peak - vec_v[0]) amp_i = abs(i_peak - vec_i[0]) print( "measure_psp: v0 = %f v_peak = %f (at t %f)" % (vec_v[0], v_peak, vec_t[i_peak_index]) ) print(f"measure_psp: i_peak = {i_peak:f} (at t {vec_t[i_peak_index]:f})") print(f"measure_psp: amp_v = {amp_v:f} amp_i = {amp_i:f}") results = { f"{presyn_name} {syn_mech_name} PSP": np.asarray( [amp_v], dtype=np.float32 ), f"{presyn_name} {syn_mech_name} PSP i": np.asarray( vec_i, dtype=np.float32 ), f"{presyn_name} {syn_mech_name} PSP v": np.asarray( vec_v, dtype=np.float32 ), f"{presyn_name} {syn_mech_name} PSP t": np.asarray( vec_t, dtype=np.float32 ), } env.synapse_attributes.del_syn_id_attr_dict(gid) if gid in env.biophys_cells[pop_name]: del env.biophys_cells[pop_name][gid] return results def cell_clamps( config_file, erev, population, presyn_name, gid, load_weights, measurements, template_paths, dataset_prefix, results_path, results_file_id, results_namespace_id, syn_mech_name, syn_weight, syn_count, syn_layer, swc_type, stim_amp, v_init, dt, use_cvode, verbose, config_prefix="", ): config_logging(verbose) if results_file_id is None: results_file_id = uuid.uuid4() if results_namespace_id is None: results_namespace_id = "Cell Clamp Results" comm = MPI.COMM_WORLD np.seterr(all="raise") params = dict(locals()) params["config"] = params.pop("config_file") env = Env(**params) configure_hoc_env(env) io_utils.mkout(env, env.results_file_path) env.cell_selection = {} if measurements is not None: measurements = [x.strip() for x in measurements.split(",")] attr_dict = {} attr_dict[gid] = {} if "passive" in measurements: attr_dict[gid].update(measure_passive(gid, population, v_init, env)) if "ap" in measurements: attr_dict[gid].update(measure_ap(gid, population, v_init, env)) if "ap_rate" in measurements: logger.info("ap_rate") attr_dict[gid].update( measure_ap_rate(gid, population, v_init, env, stim_amp=stim_amp) ) if "fi" in measurements: attr_dict[gid].update(measure_fi(gid, population, v_init, env)) if "gap" in measurements: measure_gap_junction_coupling(gid, population, v_init, env) if "psp" in measurements: assert presyn_name is not None assert syn_mech_name is not None assert erev is not None assert syn_weight is not None attr_dict[gid].update( measure_psp( gid, population, presyn_name, syn_mech_name, swc_type, env, v_init, erev, syn_layer=syn_layer, syn_count=syn_count, weight=syn_weight, load_weights=load_weights, ) ) if results_path is not None: append_cell_attributes( env.results_file_path, population, attr_dict, namespace=env.results_namespace_id, comm=env.comm, io_size=env.io_size, )