import abc
import numbers

import sympy
from numpy import ones, array, arange, concatenate, mean, argmin, nanmin, reshape, zeros, sqrt, ndarray, broadcast_to

from brian2.parsing.sympytools import sympy_to_str, str_to_sympy
from brian2.units.fundamentalunits import DIMENSIONLESS, get_dimensions, fail_for_dimension_mismatch
from brian2.utils.stringtools import get_identifiers

from brian2 import (NeuronGroup, defaultclock, get_device, Network,
                    StateMonitor, SpikeMonitor, second, get_local_namespace,
                    Quantity, get_logger)
from brian2.input import TimedArray
from brian2.equations.equations import Equations, SUBEXPRESSION
from brian2.devices import set_device, reset_device, device
from brian2.devices.cpp_standalone.device import CPPStandaloneDevice
from brian2.core.functions import Function
from .simulator import RuntimeSimulator, CPPStandaloneSimulator
from .metric import Metric, SpikeMetric, TraceMetric, MSEMetric, normalize_weights
from .optimizer import Optimizer
from .utils import callback_setup, make_dic

logger = get_logger(__name__)

[docs]def get_param_dic(params, param_names, n_traces, n_samples): """Transform parameters into a dictionary of appropiate size""" params = array(params) d = dict() for name, value in zip(param_names, params.T): d[name] = (ones((n_traces, n_samples)) * value).T.flatten() return d
[docs]def get_spikes(monitor, n_samples, n_traces): """ Get spikes from spike monitor change format from dict to a list, remove units. """ spike_trains = monitor.spike_trains() assert len(spike_trains) == n_samples*n_traces spikes = [] i = -1 for sample in range(n_samples): sample_spikes = [] for trace in range(n_traces): i += 1 sample_spikes.append(array(spike_trains[i], copy=False)) spikes.append(sample_spikes) return spikes
[docs]def get_full_namespace(additional_namespace, level=0): # Get the local namespace with all the values that could be relevant # in principle -- by filtering things out, we avoid circular loops namespace = {key: value for key, value in get_local_namespace(level=level + 1).items() if isinstance(value, (Quantity, numbers.Number, Function))} namespace.update(additional_namespace) return namespace
[docs]def setup_fit(): """ Function sets up simulator in one of the two available modes: runtime or standalone. The `.Simulator` that will be used depends on the currently set `.Device`. In the case of `.CPPStandaloneDevice`, the device will also be reset if it has already run a simulation. Returns ------- simulator : `.Simulator` """ simulators = { 'CPPStandaloneDevice': CPPStandaloneSimulator(), 'RuntimeDevice': RuntimeSimulator() } if isinstance(get_device(), CPPStandaloneDevice): if device.has_been_run is True: build_options = dict(device.build_options) get_device().reinit() get_device().activate(**build_options) return simulators[get_device().__class__.__name__]
[docs]def get_sensitivity_equations(group, parameters, namespace=None, level=1, optimize=True): """ Get equations for sensitivity variables. Parameters ---------- group : `NeuronGroup` The group of neurons that will be simulated. parameters : list of str Names of the parameters that are fit. namespace : dict, optional The namespace to use. level : `int`, optional How much farther to go down in the stack to find the namespace. optimize : bool, optional Whether to remove sensitivity variables from the equations that do not evolve if initialized to zero (e.g. ``dS_x_y/dt = -S_x_y/tau`` would be removed). This avoids unnecessary computation but will fail in the rare case that such a sensitivity variable needs to be initialized to a non-zero value. Defaults to ``True``. Returns ------- sensitivity_eqs : `Equations` The equations for the sensitivity variables. """ if namespace is None: namespace = get_local_namespace(level) namespace.update(group.namespace) eqs = group.equations diff_eqs = eqs.get_substituted_expressions(group.variables) diff_eq_names = [name for name, _ in diff_eqs] system = sympy.Matrix([str_to_sympy(diff_eq[1].code) for diff_eq in diff_eqs]) J = system.jacobian([str_to_sympy(d) for d in diff_eq_names]) sensitivity = [] sensitivity_names = [] for parameter in parameters: F = system.jacobian([str_to_sympy(parameter)]) names = [str_to_sympy(f'S_{diff_eq_name}_{parameter}') for diff_eq_name in diff_eq_names] sensitivity.append(J * sympy.Matrix(names) + F) sensitivity_names.append(names) new_eqs = [] for names, sensitivity_eqs, param in zip(sensitivity_names, sensitivity, parameters): for name, eq, orig_var in zip(names, sensitivity_eqs, diff_eq_names): if param in namespace: unit = eqs[orig_var].dim / namespace[param].dim elif param in group.variables: unit = eqs[orig_var].dim / group.variables[param].dim else: raise AssertionError(f'Parameter {param} neither in namespace nor variables') unit = repr(unit) if not unit.is_dimensionless else '1' if optimize: # Check if the equation stays at zero if initialized at zero zeroed = eq.subs(name, sympy.S.Zero) if zeroed == sympy.S.Zero: # No need to include equation as differential equation if unit == '1': new_eqs.append(f'{sympy_to_str(name)} = 0 : {unit}') else: new_eqs.append(f'{sympy_to_str(name)} = 0*{unit} : {unit}') continue rhs = sympy_to_str(eq) if rhs == '0': # avoid unit mismatch rhs = f'0*{unit}/second' new_eqs.append('d{lhs}/dt = {rhs} : {unit}'.format(lhs=sympy_to_str(name), rhs=rhs, unit=unit)) new_eqs = Equations('\n'.join(new_eqs)) return new_eqs
[docs]def get_sensitivity_init(group, parameters, param_init): """ Calculate the initial values for the sensitivity parameters (necessary if initial values are functions of parameters). Parameters ---------- group : `NeuronGroup` The group of neurons that will be simulated. parameters : list of str Names of the parameters that are fit. param_init : dict The dictionary with expressions to initialize the model variables. Returns ------- sensitivity_init : dict Dictionary of expressions to initialize the sensitivity parameters. """ sensitivity_dict = {} for var_name, expr in param_init.items(): if not isinstance(expr, str): continue identifiers = get_identifiers(expr) for identifier in identifiers: if (identifier in group.variables and getattr(group.variables[identifier], 'type', None) == SUBEXPRESSION): raise NotImplementedError('Initializations that refer to a ' 'subexpression are currently not ' 'supported') sympy_expr = str_to_sympy(expr) for parameter in parameters: diffed = sympy_expr.diff(str_to_sympy(parameter)) if diffed != sympy.S.Zero: if getattr(group.variables[parameter], 'type', None) == SUBEXPRESSION: raise NotImplementedError('Sensitivity ' f'S_{var_name}_{parameter} ' 'is initialized to a non-zero ' 'value, but it has been ' 'removed from the equations. ' 'Set optimize=False to avoid ' 'this.') init_expr = sympy_to_str(diffed) sensitivity_dict[f'S_{var_name}_{parameter}'] = init_expr return sensitivity_dict
[docs]class Fitter(metaclass=abc.ABCMeta): """ Base Fitter class for model fitting applications. Creates an interface for model fitting of traces with parameters draw by gradient-free algorithms (through ask/tell interfaces). Initiates n_neurons = num input traces * num samples, to which drawn parameters get assigned and evaluates them in parallel. Parameters ---------- dt : `~brian2.units.fundamentalunits.Quantity` The size of the time step. model : `~brian2.equations.equations.Equations` or str The equations describing the model. input : `~numpy.ndarray` or `~brian2.units.fundamentalunits.Quantity` A 2D array of shape ``(n_traces, time steps)`` given the input that will be fed into the model. output : `~brian2.units.fundamentalunits.Quantity` or list Recorded output of the model that the model should reproduce. Should be a 2D array of the same shape as the input when fitting traces with `TraceFitter`, a list of spike times when fitting spike trains with `SpikeFitter`. input_var : str The name of the input variable in the model. Note that this variable should be *used* in the model (e.g. a variable ``I`` that is added as a current in the membrane potential equation), but not *defined*. output_var : str The name of the output variable in the model. Only needed when fitting traces with `.TraceFitter`. n_samples: int Number of parameter samples to be optimized over in a single iteration. threshold: `str`, optional The condition which produces spikes. Should be a boolean expression as a string. reset: `str`, optional The (possibly multi-line) string with the code to execute on reset. refractory: `str` or `~brian2.units.fundamentalunits.Quantity`, optional Either the length of the refractory period (e.g. 2*ms), a string expression that evaluates to the length of the refractory period after each spike (e.g. '(1 + rand())*ms'), or a string expression evaluating to a boolean value, given the condition under which the neuron stays refractory after a spike (e.g. 'v > -20*mV') method: `str`, optional Integration method penalty : str, optional The name of a variable or subexpression in the model that will be added to the error at the end of each iteration. Note that only the term at the end of the simulation is taken into account, so this term should not be varying in time. param_init: `dict`, optional Dictionary of variables to be initialized with respective values """ def __init__(self, dt, model, input, output, input_var, output_var, n_samples, threshold, reset, refractory, method, param_init, penalty, use_units=True): """Initialize the fitter.""" if dt is None: raise ValueError("dt-sampling frequency of the input must be set") if isinstance(model, str): model = Equations(model) if input_var not in model.identifiers: raise NameError("%s is not an identifier in the model" % input_var) self.dt = dt self.simulator = None self.parameter_names = model.parameter_names self.n_traces, n_steps = input.shape self.duration = n_steps * dt self.n_neurons = self.n_traces * n_samples self.n_samples = n_samples self.method = method self.threshold = threshold self.reset = reset self.refractory = refractory self.penalty = penalty self.input = input self.output_var = output_var if output_var == 'spikes': self.output_dim = DIMENSIONLESS else: self.output_dim = model[output_var].dim fail_for_dimension_mismatch(output, self.output_dim, 'The provided target values ' '("output") need to have the same ' 'units as the variable ' '{}'.format(output_var)) self.model = model self.use_units = use_units self.iteration = 0 input_dim = get_dimensions(input) input_dim = '1' if input_dim is DIMENSIONLESS else repr(input_dim) input_eqs = "{} = input_var(t, i % n_traces) : {}".format(input_var, input_dim) self.model += input_eqs if output_var != 'spikes': # For approaches that couple the system to the target values, # provide a convenient variable output_expr = 'output_var(t, i % n_traces)' output_dim = ('1' if self.output_dim is DIMENSIONLESS else repr(self.output_dim)) output_eqs = "{}_target = {} : {}".format(output_var, output_expr, output_dim) self.model += output_eqs input_traces = TimedArray(input.transpose(), dt=dt) self.input_traces = input_traces # initialization of attributes used later self._best_params = None self._best_error = None self.optimizer = None self.metric = None if not param_init: param_init = {} for param, val in param_init.items(): if not (param in self.model.diff_eq_names or param in self.model.parameter_names): raise ValueError("%s is not a model variable or a " "parameter in the model" % param) self.param_init = param_init
[docs] def setup_simulator(self, network_name, n_neurons, output_var, param_init, calc_gradient=False, optimize=True, online_error=False, level=1): simulator = setup_fit() namespace = get_full_namespace({'input_var': self.input_traces, 'n_traces': self.n_traces}, level=level+1) if hasattr(self, 't_start'): # OnlineTraceFitter namespace['t_start'] = self.t_start if self.output_var != 'spikes': namespace['output_var'] = TimedArray(self.output.transpose(), dt=self.dt) neurons = self.setup_neuron_group(n_neurons, namespace, calc_gradient=calc_gradient, optimize=optimize, online_error=online_error) network = Network(neurons) if isinstance(output_var, str): output_var = [output_var] if 'spikes' in output_var: network.add(SpikeMonitor(neurons, name='spikemonitor')) record_vars = [v for v in output_var if v != 'spikes'] if calc_gradient: if not len(output_var) == 1: raise AssertionError('Cannot calculate gradient with multiple ' 'output variables.') record_vars.extend([f'S_{output_var[0]}_{p}' for p in self.parameter_names]) if len(record_vars): network.add(StateMonitor(neurons, record_vars, record=True, name='statemonitor', dt=self.dt)) if calc_gradient: param_init = dict(param_init) param_init.update(get_sensitivity_init(neurons, self.parameter_names, param_init)) simulator.initialize(network, param_init, name=network_name) return simulator
[docs] def setup_neuron_group(self, n_neurons, namespace, calc_gradient=False, optimize=True, online_error=False, name='neurons'): """ Setup neuron group, initialize required number of neurons, create namespace and initialize the parameters. Parameters ---------- n_neurons: int number of required neurons **namespace arguments to be added to NeuronGroup namespace Returns ------- neurons : ~brian2.groups.neurongroup.NeuronGroup group of neurons """ # We only want to specify the method argument if it is not None – # otherwise it should use NeuronGroup's default value kwds = {} if self.method is not None: kwds['method'] = self.method model = self.model + Equations('iteration : integer (constant, shared)') neurons = NeuronGroup(n_neurons, model, threshold=self.threshold, reset=self.reset, refractory=self.refractory, name=name, namespace=namespace, dt=self.dt, **kwds) if calc_gradient: sensitivity_eqs = get_sensitivity_equations(neurons, parameters=self.parameter_names, optimize=optimize, namespace=namespace) neurons = NeuronGroup(n_neurons, model + sensitivity_eqs, threshold=self.threshold, reset=self.reset, refractory=self.refractory, name=name, namespace=namespace, dt=self.dt, **kwds) if online_error: neurons.run_regularly('total_error += ({} - {}_target)**2 * ' 'int(t>=t_start)'.format(self.output_var, self.output_var), when='end') return neurons
[docs] @abc.abstractmethod def calc_errors(self, metric): """ Abstract method required in all Fitter classes, used for calculating errors Parameters ---------- metric: `~.Metric` children Child of Metric class, specifies optimization metric """ pass
[docs] def optimization_iter(self, optimizer, metric, penalty): """ Function performs all operations required for one iteration of optimization. Drawing parameters, setting them to simulator and calulating the error. Parameters ---------- optimizer : `Optimizer` metric : `Metric` penalty : str, optional The name of a variable or subexpression in the model that will be added to the error at the end of each iteration. Note that only the term at the end of the simulation is taken into account, so this term should not be varying in time. Returns ------- results : list recommended parameters parameters: list of list drawn parameters errors: list calculated errors """ parameters = optimizer.ask(n_samples=self.n_samples) d_param = get_param_dic(parameters, self.parameter_names, self.n_traces, self.n_samples), d_param, self.parameter_names, iteration=self.iteration) errors = self.calc_errors(metric) if penalty is not None: error_penalty = getattr(self.simulator.neurons, penalty + '_') if self.use_units: error_dim = metric.get_dimensions(self.output_dim) penalty_dim = self.simulator.neurons.variables[penalty].dim fail_for_dimension_mismatch(error_dim, penalty_dim, error_message='The term used as penalty has to have ' 'the same units as the error.') errors += error_penalty optimizer.tell(parameters, errors) results = optimizer.recommend() return results, parameters, errors
[docs] def fit(self, optimizer, metric=None, n_rounds=1, callback='text', restart=False, online_error=False, start_iteration=None, penalty=None, level=0, **params): """ Run the optimization algorithm for given amount of rounds with given number of samples drawn. Return best set of parameters and corresponding error. Parameters ---------- optimizer: `~.Optimizer` children Child of Optimizer class, specific for each library. metric: `~.Metric` children Child of Metric class, specifies optimization metric n_rounds: int Number of rounds to optimize over (feedback provided over each round). callback: `str` or `~typing.Callable` Either the name of a provided callback function (``text`` or ``progressbar``), or a custom feedback function ``func(parameters, errors, best_parameters, best_error, index)``. If this function returns ``True`` the fitting execution is interrupted. restart: bool Flag that reinitializes the Fitter to reset the optimization. With restart True user is allowed to change optimizer/metric. online_error: bool, optional Whether to calculate the squared error between target trace and simulated trace online. Defaults to ``False``. start_iteration: int, optional A value for the ``iteration`` variable at the first iteration. If not given, will use 0 for the first call of ``fit`` (and for later calls when ``restart`` is specified). Later calls will continue to increase the value from the previous calls. penalty : str, optional The name of a variable or subexpression in the model that will be added to the error at the end of each iteration. Note that only the term at the end of the simulation is taken into account, so this term should not be varying in time. If not given, will reuse the value specified during ``Fitter`` initialization. level : `int`, optional How much farther to go down in the stack to find the namespace. **params bounds for each parameter Returns ------- best_results : dict dictionary with best parameter set error: float error value for best parameter set """ if not (isinstance(metric, Metric) or metric is None): raise TypeError("metric has to be a child of class Metric or None " "for OnlineTraceFitter") if not (isinstance(optimizer, Optimizer)) or optimizer is None: raise TypeError("metric has to be a child of class Optimizer") if self.metric is not None and restart is False: if metric is not self.metric: raise Exception("You can not change the metric between fits") if self.optimizer is not None and restart is False: if optimizer is not self.optimizer: raise Exception("You can not change the optimizer between fits") if start_iteration is not None: self.iteration = start_iteration if penalty is None: penalty = self.penalty if self.optimizer is None or restart is True: if start_iteration is None: self.iteration = 0 optimizer.initialize(self.parameter_names, popsize=self.n_samples, **params) self.optimizer = optimizer self.metric = metric callback = callback_setup(callback, n_rounds) # Check whether we can reuse the current simulator or whether we have # to create a new one (only relevant for standalone, but does not hurt # for runtime) if self.simulator is None or self.simulator.current_net != 'fit': self.simulator = self.setup_simulator('fit', self.n_neurons, output_var=self.output_var, online_error=online_error, param_init=self.param_init, level=level+1) # Run Optimization Loop for index in range(n_rounds): best_params, parameters, errors = self.optimization_iter(optimizer, metric, penalty) self.iteration += 1 self._best_error = nanmin(self.optimizer.errors) # create output variables self._best_params = make_dic(self.parameter_names, best_params) if self.use_units: if self.output_var == 'spikes': output_dim = DIMENSIONLESS else: output_dim = self.output_dim # Correct the units for the normalization factor error_dim = self.metric.get_normalized_dimensions(output_dim) best_error = Quantity(float(self.best_error), dim=error_dim) errors = Quantity(errors, dim=error_dim) param_dicts = [{p: Quantity(v, dim=self.model[p].dim) for p, v in zip(self.parameter_names, one_param_set)} for one_param_set in parameters] else: param_dicts = [{p: v for p, v in zip(self.parameter_names, one_param_set)} for one_param_set in parameters] best_error = self.best_error if callback(param_dicts, errors, self.best_params, best_error, index) is True: break return self.best_params, self.best_error
@property def best_params(self): if self._best_params is None: return None if self.use_units: params_with_units = {p: Quantity(v, dim=self.model[p].dim) for p, v in self._best_params.items()} return params_with_units else: return self._best_params @property def best_error(self): if self._best_error is None: return None if self.use_units: error_dim = self.metric.get_dimensions(self.output_dim) return Quantity(self._best_error, dim=error_dim) else: return self._best_error
[docs] def results(self, format='list', use_units=None): """ Returns all of the gathered results (parameters and errors). In one of the 3 formats: 'dataframe', 'list', 'dict'. Parameters ---------- format: str The desired output format. Currently supported: ``dataframe``, ``list``, or ``dict``. use_units: bool, optional Whether to use units in the results. If not specified, defaults to `.Tracefitter.use_units`, i.e. the value that was specified when the `.Tracefitter` object was created (``True`` by default). Returns ------- object 'dataframe': returns pandas `~pandas.DataFrame` without units 'list': list of dictionaries 'dict': dictionary of lists """ if use_units is None: use_units = self.use_units names = list(self.parameter_names) params = array(self.optimizer.tested_parameters) params = params.reshape(-1, params.shape[-1]) if use_units: error_dim = self.metric.get_dimensions(self.output_dim) errors = Quantity(array(self.optimizer.errors).flatten(), dim=error_dim) else: errors = array(array(self.optimizer.errors).flatten()) dim = self.model.dimensions if format == 'list': res_list = [] for j in arange(0, len(params)): temp_data = params[j] res_dict = dict() for i, n in enumerate(names): if use_units: res_dict[n] = Quantity(temp_data[i], dim=dim[n]) else: res_dict[n] = float(temp_data[i]) res_dict['error'] = errors[j] res_list.append(res_dict) return res_list elif format == 'dict': res_dict = dict() for i, n in enumerate(names): if use_units: res_dict[n] = Quantity(params[:, i], dim=dim[n]) else: res_dict[n] = array(params[:, i]) res_dict['error'] = errors return res_dict elif format == 'dataframe': from pandas import DataFrame if use_units: logger.warn('Results in dataframes do not support units. ' 'Specify "use_units=False" to avoid this warning.', name_suffix='dataframe_units') data = concatenate((params, array(errors)[None, :].transpose()), axis=1) return DataFrame(data=data, columns=names + ['error'])
[docs] def generate(self, output_var=None, params=None, param_init=None, iteration=1e9, level=0): """ Generates traces for best fit of parameters and all inputs. If provided with other parameters provides those. Parameters ---------- output_var: str or sequence of str Name of the output variable to be monitored, or the special name ``spikes`` to record spikes. Can also be a sequence of names to record multiple variables. params: dict Dictionary of parameters to generate fits for. param_init: dict Dictionary of initial values for the model. iteration: int, optional Value for the ``iteration`` variable provided to the simulation. Defaults to a high value (1e9). This is based on the assumption that the model implements some coupling of the fitted variable to the target variable, and that this coupling inversely depends on the iteration number. In this case, one would usually want to switch off the coupling when generating traces/spikes for given parameters. level : `int`, optional How much farther to go down in the stack to find the namespace. Returns ------- fit Either a 2D `.Quantity` with the recorded output variable over time, with shape <number of input traces> × <number of time steps>, or a list of spike times for each input trace. If several names were given as ``output_var``, then the result is a dictionary with the names of the variable as the key. """ if params is None: params = self.best_params if param_init is None: param_init = self.param_init else: param_init = dict(self.param_init) self.param_init.update(param_init) if output_var is None: output_var = self.output_var self.simulator = self.setup_simulator('generate', self.n_traces, output_var=output_var, param_init=param_init, level=level+1) param_dic = get_param_dic([params[p] for p in self.parameter_names], self.parameter_names, self.n_traces, 1), param_dic, self.parameter_names, iteration=iteration, name='generate') if not isinstance(output_var, str): fits = {name: self._simulation_result(name) for name in output_var} else: fits = self._simulation_result(output_var) return fits
def _simulation_result(self, output_var): if output_var == 'spikes': fits = get_spikes(self.simulator.spikemonitor, 1, self.n_traces)[0] # a single "sample" else: fits = getattr(self.simulator.statemonitor, output_var)[:] return fits
[docs]class TraceFitter(Fitter): """ A `Fitter` for fitting recorded traces (e.g. of the membrane potential). Parameters ---------- model input_var input output_var output dt n_samples method reset refractory threshold param_init use_units: bool, optional Whether to use units in all user-facing interfaces, e.g. in the callback arguments or in the returned parameter dictionary and errors. Defaults to ``True``. """ def __init__(self, model, input_var, input, output_var, output, dt, n_samples=30, method=None, reset=None, refractory=False, threshold=None, param_init=None, penalty=None, use_units=True): super().__init__(dt, model, input, output, input_var, output_var, n_samples, threshold, reset, refractory, method, param_init, penalty=penalty, use_units=use_units) self.output = Quantity(output) self.output_ = array(output) # We store the bounds set in, so that Tracefitter.refine # can reuse them self.bounds = None if output_var not in self.model.names: raise NameError("%s is not a model variable" % output_var) if output.shape != input.shape: raise ValueError("Input and output must have the same size")
[docs] def calc_errors(self, metric): """ Returns errors after simulation with StateMonitor. To be used inside `optim_iter`. """ traces = getattr(self.simulator.networks['fit']['statemonitor'], self.output_var+'_') # Reshape traces for easier calculation of error traces = reshape(traces, (traces.shape[0]//self.n_traces, self.n_traces, -1)) errors = metric.calc(traces, self.output_, self.dt) return errors
[docs] def fit(self, optimizer, metric=None, n_rounds=1, callback='text', restart=False, start_iteration=None, penalty=None, level=0, **params): if not isinstance(metric, TraceMetric): raise TypeError("You can only use TraceMetric child metric with " "TraceFitter") if metric.t_weights is not None: if not metric.t_weights.shape == (self.output.shape[1], ): raise ValueError("The 't_weights' argument of the metric has " "to be a one-dimensional array of length " f"{self.output.shape[1]} but has shape " f"{metric.t_weights.shape}") self.bounds = dict(params) best_params, error = super().fit(optimizer, metric, n_rounds, callback, restart, start_iteration=start_iteration, penalty=penalty, level=level+1, **params) return best_params, error
[docs] def generate_traces(self, params=None, param_init=None, iteration=1e9, level=0): """Generates traces for best fit of parameters and all inputs""" fits = self.generate(params=params, output_var=self.output_var, param_init=param_init, iteration=iteration, level=level+1) return fits
[docs] def refine(self, params=None, t_start=None, t_weights=None, normalization=None, callback='text', calc_gradient=False, optimize=True, iteration=1e9, level=0, **kwds): """ Refine the fitting results with a sequentially operating minimization algorithm. Uses the `lmfit <>`_ package which itself makes use of `scipy.optimize <>`_. Has to be called after ``, but a call with ``n_rounds=0`` is enough. Parameters ---------- params : dict, optional A dictionary with the parameters to use as a starting point for the refinement. If not given, the best parameters found so far by `` will be used. t_start : `~brian2.units.fundamentalunits.Quantity`, optional Start of time window considered for calculating the fit error. If not set, will reuse the `t_start` value from the previously used metric. t_weights : `~.ndarray`, optional A 1-dimensional array of weights for each time point. This array has to have the same size as the input/output traces that are used for fitting. A value of 0 means that data points are ignored. The weight values will be normalized so only the relative values matter. For example, an array containing 1s, and 2s, will weigh the regions with 2s twice as high (with respect to the squared error) as the regions with 1s. Using instead values of 0.5 and 1 would have the same effect. Cannot be combined with ``t_start``. If not set, will reuse the `t_weights` value from the previously used metric. normalization : float, optional A normalization term that will be used rescale results before handing them to the optimization algorithm. Can be useful if the algorithm makes assumptions about the scale of errors, e.g. if the size of steps in the parameter space depends on the absolute value of the error. The difference between simulated and target traces will be divided by this value. If not set, will reuse the `normalization` value from the previously used metric. callback: `str` or `~typing.Callable` Either the name of a provided callback function (``text`` or ``progressbar``), or a custom feedback function ``func(parameters, errors, best_parameters, best_error, index)``. If this function returns ``True`` the fitting execution is interrupted. calc_gradient: bool, optional Whether to add "sensitivity variables" to the equation that track the sensitivity of the equation variables to the parameters. This information will be used to pass the local gradient of the error with respect to the parameters to the optimization function. This can lead to much faster convergence than with an estimated gradient but comes at the expense of additional computation. Defaults to ``False``. optimize : bool, optional Whether to remove sensitivity variables from the equations that do not evolve if initialized to zero (e.g. ``dS_x_y/dt = -S_x_y/tau`` would be removed). This avoids unnecessary computation but will fail in the rare case that such a sensitivity variable needs to be initialized to a non-zero value. Only taken into account if ``calc_gradient`` is ``True``. Defaults to ``True``. iteration: int, optional Value for the ``iteration`` variable provided to the simulation. Defaults to a high value (1e9). This is based on the assumption that the model implements some coupling of the fitted variable to the target variable, and that this coupling inversely depends on the iteration number. In this case, one would usually want to switch off the coupling when refining the solution. level : int, optional How much farther to go down in the stack to find the namespace. kwds Additional arguments can overwrite the bounds for individual parameters (if not given, the bounds previously specified in the call to `` will be used). All other arguments will be passed on to `.lmfit.minimize` and can be used to e.g. change the method, or to specify method-specific arguments. Returns ------- parameters : dict The parameters at the end of the optimization process as a dictionary. result : `.lmfit.MinimizerResult` The result of the optimization process. Notes ----- The default method used by `lmfit` is least-squares minimization using a Levenberg-Marquardt method. Note that there is no support for specifying a `Metric`, the given output trace(s) will be subtracted from the simulated trace(s) and passed on to the minimization algorithm. This method always uses the runtime mode, independent of the selection of the current device. """ try: import lmfit except ImportError: raise ImportError('Refinement needs the "lmfit" package.') if params is None: if self.best_params is None: raise TypeError('You need to either specify parameters or run ' 'the fit function first.') params = self.best_params if t_weights is not None: t_weights = normalize_weights(t_weights) elif t_start is None: t_weights = getattr(self.metric, 't_weights', None) if t_weights is None: t_start = getattr(self.metric, 't_start', 0*second) else: t_start = None if t_start is not None and t_weights is not None: raise ValueError("Cannot use both 't_weights' and 't_start'.") if normalization is None: normalization = getattr(self.metric, 'normalization', 1.) else: normalization = 1/normalization callback_func = callback_setup(callback, None) # Set up Parameter objects parameters = lmfit.Parameters() for param_name in self.parameter_names: if param_name not in kwds: if self.bounds is None: raise TypeError('You need to either specify bounds for all ' 'parameters or run the fit function first.') min_bound, max_bound = self.bounds[param_name] else: min_bound, max_bound = kwds.pop(param_name) parameters.add(param_name, value=array(params[param_name]), min=array(min_bound), max=array(max_bound)) self.simulator = self.setup_simulator('refine', self.n_traces, output_var=self.output_var, param_init=self.param_init, calc_gradient=calc_gradient, optimize=optimize, level=level+1) if t_weights is None: t_start_steps = int(round(t_start / self.dt)) def _calc_error(params): param_dic = get_param_dic([params[p] for p in self.parameter_names], self.parameter_names, self.n_traces, 1), param_dic, self.parameter_names, iteration=iteration, name='refine') trace = getattr(self.simulator.statemonitor, self.output_var+'_') if t_weights is None: residual = trace[:, t_start_steps:] - self.output_[:, t_start_steps:] else: residual = (trace - self.output_) * sqrt(t_weights) return residual.flatten() * normalization def _calc_gradient(params): residuals = [] for name in self.parameter_names: trace = getattr(self.simulator.statemonitor, f'S_{self.output_var}_{name}_') if t_weights is None: residual = trace[:, t_start_steps:] else: residual = trace * sqrt(t_weights) residuals.append(residual.flatten() * normalization) gradient = array(residuals) return gradient.T tested_parameters = [] errors = [] def _callback_wrapper(params, iter, resid, *args, **kwds): error = mean(resid**2) errors.append(error) if self.use_units: error_dim = self.output_dim**2 * get_dimensions(normalization)**2 all_errors = Quantity(errors, dim=error_dim) params = {p: Quantity(val, dim=self.model[p].dim) for p, val in params.items()} else: all_errors = array(errors) params = {p: float(val) for p, val in params.items()} tested_parameters.append(params) best_idx = argmin(errors) best_error = all_errors[best_idx] best_params = tested_parameters[best_idx] return callback_func(params, all_errors, best_params, best_error, iter) assert 'Dfun' not in kwds if calc_gradient: kwds.update({'Dfun': _calc_gradient}) if 'iter_cb' in kwds: # Use the given callback but raise a warning if callback is not # set to None if callback is not None: logger.warn('The iter_cb keyword has been specified together ' f'with callback={callback!r}. Only the iter_cb ' 'callback will be used. Use the standard ' 'callback mechanism or set callback=None to ' 'remove this warning.', name_suffix='iter_cb_callback') iter_cb = kwds.pop('iter_cb') else: iter_cb = _callback_wrapper result = lmfit.minimize(_calc_error, parameters, iter_cb=iter_cb, **kwds) if self.use_units: param_dict = {p: Quantity(float(val), dim=self.model[p].dim) for p, val in result.params.items()} else: param_dict = {p: float(val) for p, val in result.params.items()} return param_dict, result
[docs]class SpikeFitter(Fitter): def __init__(self, model, input, output, dt, reset, threshold, input_var='I', refractory=False, n_samples=30, method=None, param_init=None, penalty=None, use_units=True): """Initialize the fitter.""" if method is None: method = 'exponential_euler' super().__init__(dt, model, input, output, input_var, 'spikes', n_samples, threshold, reset, refractory, method, param_init, penalty=penalty, use_units=use_units) self.output = [Quantity(o) for o in output] self.output_ = [array(o) for o in output] if param_init: for param, val in param_init.items(): if not (param in self.model.identifiers or param in self.model.names): raise ValueError("%s is not a model variable or an " "identifier in the model" % param) self.param_init = param_init self.simulator = None
[docs] def calc_errors(self, metric): """ Returns errors after simulation with SpikeMonitor. To be used inside optim_iter. """ spikes = get_spikes(self.simulator.networks['fit']['spikemonitor'], self.n_samples, self.n_traces) errors = metric.calc(spikes, self.output, self.dt) return errors
[docs] def fit(self, optimizer, metric=None, n_rounds=1, callback='text', restart=False, start_iteration=None, penalty=None, level=0, **params): if not isinstance(metric, SpikeMetric): raise TypeError("You can only use SpikeMetric child metric with " "SpikeFitter") best_params, error = super().fit(optimizer, metric, n_rounds, callback, restart, start_iteration=start_iteration, penalty=penalty, level=level+1, **params) return best_params, error
[docs] def generate_spikes(self, params=None, param_init=None, iteration=1e9, level=0): """Generates traces for best fit of parameters and all inputs""" fits = self.generate(params=params, output_var='spikes', param_init=param_init, iteration=iteration, level=level+1) return fits
[docs]class OnlineTraceFitter(Fitter): def __init__(self, model, input_var, input, output_var, output, dt, n_samples=30, method=None, reset=None, refractory=False, threshold=None, param_init=None, t_start=0*second, penalty=None): """Initialize the fitter.""" super().__init__(dt, model, input, output, input_var, output_var, n_samples, threshold, reset, refractory, method, param_init, penalty=penalty) self.output = Quantity(output) self.output_ = array(output) if output_var not in self.model.names: raise NameError("%s is not a model variable" % output_var) if output.shape != input.shape: raise ValueError("Input and output must have the same size") # Replace input variable by TimedArray output_traces = TimedArray(output.transpose(), dt=dt) output_dim = get_dimensions(output) squared_output_dim = ('1' if output_dim is DIMENSIONLESS else repr(output_dim**2)) error_eqs = Equations('total_error : {}'.format(squared_output_dim)) self.model = self.model + error_eqs self.t_start = t_start if param_init: for param, val in param_init.items(): if not (param in self.model.identifiers or param in self.model.names): raise ValueError("%s is not a model variable or an " "identifier in the model" % param) self.param_init = param_init self.simulator = None
[docs] def fit(self, optimizer, n_rounds=1, callback='text', restart=False, start_iteration=None, penalty=None, level=0, **params): metric = MSEMetric() # not used, but makes error dimensions correct return super(OnlineTraceFitter, self).fit(optimizer, metric=metric, n_rounds=n_rounds, callback=callback, restart=restart, online_error=True, penalty=penalty, start_iteration=start_iteration, level=level+1, **params)
[docs] def calc_errors(self, metric=None): """Calculates error in online fashion.To be used inside optim_iter.""" errors = self.simulator.neurons.total_error/int((self.duration-self.t_start)/defaultclock.dt) errors = mean(errors.reshape((self.n_samples, self.n_traces)), axis=1) return array(errors)
[docs] def generate_traces(self, params=None, param_init=None, level=0): """Generates traces for best fit of parameters and all inputs""" fits = self.generate(params=params, output_var=self.output_var, param_init=param_init, level=level+1) return fits