#!/usr/bin/python
# -*- coding: UTF-8 -*-
'''
Routines for common regression tasks.
'''
from __future__ import absolute_import
from __future__ import with_statement
from __future__ import division
from __future__ import nested_scopes
from __future__ import generators
from __future__ import unicode_literals
from __future__ import print_function

import warnings
import numpy as np
from scipy.stats import linregress
from scipy.optimize import minimize

from neurotools.util.functions import npdf
from neurotools.stats.minimize import minimize_retry

'''
Regress on the following model for synchrony:
`synchrony(x) = np.cos(w*x)*np.exp(-x/tau)+b`

Angular synchrony np.cos(theta_x1-theta_x2) should
decay as a damped cosine, with some constant offset b. Note that
a nonzero constant offset need not indicate uniform synchrony; for
example, the direction of constant phase in a plane wave will contribute
a DC component.

Uses an L2 penalty.

X: List of distances
W: List of weights
Y: List of average pairwise distances

The model is z = np.cos(w*x)*np.exp(-x/L)+b,
which generates predictions Z.
The error is ∑ W*(Z-Y)^2.

Gradient of the error, by the chain rule with z = np.cos(w*x)*np.exp(-x/L)+b:

dErr/dθ = np.sum(W* 2*(z-Y) * dz/dθ) for each parameter θ in (w, L, b), where

dz/dw = -x*np.sin(w*x)*np.exp(-x/L)
dz/dL =  np.cos(w*x)*(x/L**2)*np.exp(-x/L)
dz/db =  1

so

dErr/dw = np.sum(W* 2*(z-Y) * -x*np.sin(w*x)*np.exp(-x/L))
dErr/dL = np.sum(W* 2*(z-Y) *  np.cos(w*x)*(x/L**2)*np.exp(-x/L))
dErr/db = np.sum(W* 2*(z-Y))

The objective function and its gradient are:

def objective(w,L,b):
    z = np.cos(w*x)*np.exp(-x/L)+b
    error = np.sum( W*(z-Y)**2 )
    return error

def gradient(w,L,b):
    z = np.cos(w*x)*np.exp(-x/L)+b
    h = 2*(z-Y)
    dEdw = np.sum(W*h*-x*np.sin(w*x)*np.exp(-x/L))
    dEdL = np.sum(W*h* np.cos(w*x)*(x/L**2)*np.exp(-x/L))
    dEdb = np.sum(W*h)
    return [dEdw,dEdL,dEdb]

Use the minimize function from scipy.optimize.

scipy.optimize.minimize(fun, x0, args=(), method=None, jac=None, hess=None,
    hessp=None, bounds=None, constraints=(), tol=None, callback=None,
    options=None)

    Minimization of scalar function of one or more variables.

    New in version 0.11.0.
    Parameters:
    fun : callable
        Objective function.
    x0 : ndarray
        Initial guess.
    args : tuple, optional
        Extra arguments passed to the objective function and its derivatives
        (Jacobian, Hessian).
    method : str or callable, optional
        Type of solver. Should be one of
            'Nelder-Mead'
            'Powell'
            'CG'
            'BFGS'
            'Newton-CG'
            'Anneal' (deprecated as of scipy version 0.14.0)
            'L-BFGS-B'
            'TNC'
            'COBYLA'
            'SLSQP'
            'dogleg'
            'trust-ncg'
            custom - a callable object (added in version 0.14.0)
        If not given, chosen to be one of BFGS, L-BFGS-B, SLSQP,
        depending on whether the problem has constraints or bounds.
    jac : bool or callable, optional
        Jacobian (gradient) of objective function. Only for CG, BFGS,
        Newton-CG, L-BFGS-B, TNC, SLSQP, dogleg, trust-ncg. If jac is a Boolean
        and is True, fun is assumed to return the gradient along with the
        objective function. If False, the gradient will be estimated
        numerically. jac can also be a callable returning the gradient of the
        objective. In this case, it must accept the same arguments as fun.
    hess, hessp : callable, optional
        Hessian (matrix of second-order derivatives) of objective function or
        Hessian of objective function times an arbitrary vector p. Only for
        Newton-CG, dogleg, trust-ncg. Only one of hessp or hess needs to be
        given. If hess is provided, then hessp will be ignored. If neither hess
        nor hessp is provided, then the Hessian product will be approximated
        using finite differences on jac. hessp must compute the Hessian times
        an arbitrary vector.
    bounds : sequence, optional
        Bounds for variables (only for L-BFGS-B, TNC and SLSQP). (min, max)
        pairs for each element in x, defining the bounds on that parameter. Use
        None for one of min or max when there is no bound in that direction.
    constraints : dict or sequence of dict, optional
        Constraints definition (only for COBYLA and SLSQP). Each constraint is
        defined in a dictionary with fields:
            type : str
                Constraint type: 'eq' for equality, 'ineq' for inequality.
            fun : callable
                The function defining the constraint.
            jac : callable, optional
                The Jacobian of fun (only for SLSQP).
            args : sequence, optional
                Extra arguments to be passed to the function and Jacobian.
        Equality constraint means that the constraint function result is to be
        zero whereas inequality means that it is to be non-negative. Note that
        COBYLA only supports inequality constraints.
    tol : float, optional
        Tolerance for termination. For detailed control, use solver-specific
        options.
    options : dict, optional
        A dictionary of solver options. All methods accept the following
        generic options:
            maxiter : int
                Maximum number of iterations to perform.
            disp : bool
                Set to True to print convergence messages.
        For method-specific options, see show_options.
    callback : callable, optional
        Called after each iteration, as callback(xk), where xk is the current
        parameter vector.
    Returns:
    res : OptimizeResult
        The optimization result represented as a OptimizeResult object.
        Important attributes are: x the solution array, success a Boolean flag
        indicating if the optimizer exited successfully and message which
        describes the cause of the termination. See OptimizeResult for a
        description of other attributes.

'''
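# The following is a hedged sketch (not part of the original module): it
# checks the analytic gradient written in the notes above against a central
# finite-difference approximation on synthetic data. The helper name
# `_check_damped_cosine_gradient` is hypothetical and is never called at
# import time.
def _check_damped_cosine_gradient(w=1.3, L=2.0, b=0.1, eps=1e-6):
    rng = np.random.default_rng(0)
    x = np.linspace(0.1, 5, 50)
    Y = np.cos(1.1*x)*np.exp(-x/3.0) + 0.05 + 0.01*rng.standard_normal(50)
    W = np.ones_like(x)
    def err(w, L, b):
        z = np.cos(w*x)*np.exp(-x/L) + b
        return np.sum(W*(z - Y)**2)
    # Analytic gradient, as derived in the notes above
    z = np.cos(w*x)*np.exp(-x/L) + b
    h = 2*(z - Y)
    grad = np.array([
        np.sum(W*h*-x*np.sin(w*x)*np.exp(-x/L)),
        np.sum(W*h* np.cos(w*x)*(x/L**2)*np.exp(-x/L)),
        np.sum(W*h),
    ])
    # Central finite differences in each parameter
    theta   = np.array([w, L, b])
    numeric = np.zeros(3)
    for i in range(3):
        d = np.zeros(3); d[i] = eps
        numeric[i] = (err(*(theta+d)) - err(*(theta-d)))/(2*eps)
    # The two should agree to roughly 1e-5 relative error
    return grad, numeric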

def damped_cosine(X,Y,W):
    '''
    Regress a damped cosine impulse response to point data `X` and `Y`
    using weighting `W`.

    Todo: constrain b, L to be positive

    Parameters
    ----------
    X: 1D array-like
        List of distances
    Y: 1D array-like
        List of average pairwise distances
    W: 1D array-like
        List of weights

    Returns
    -------
    result : object
        Optimization result returned by `scipy.optimize.minimize`.
        See `scipy.optimize` documentation for details.

    Example
    -------
    ::

        X = 0.4*arange(9)
        Y = np.exp(-X/4+1)*np.cos(X)
        Z = Y+randn(*shape(X))
        W = ones(shape(X))
        w,L,b = damped_cosine(X,Z,W).x
        plot(X,Y)
        plot(X,Z)
        plot(X,np.cos(w*X)*np.exp(-X/L+b))
    '''
    X = np.float64(X)
    Y = np.float64(Y)
    def objective(wLb):
        # Model: damped cosine with log-scale offset b inside the exponent
        (w,L,b) = wLb
        z = np.cos(w*X)*np.exp(-X/L+b)
        error = np.sum( W*(z-Y)**2 )
        return error
    def gradient(wLb):
        # Gradient of the weighted squared error for the same model
        # z = np.cos(w*X)*np.exp(-X/L+b) used by `objective` above.
        (w,L,b) = wLb
        z = np.cos(w*X)*np.exp(-X/L+b)
        h = 2*(z-Y)
        dEdw = np.sum(W*h*-X*np.sin(w*X)*np.exp(-X/L+b))
        dEdL = np.sum(W*h* np.cos(w*X)*(X/L**2)*np.exp(-X/L+b))
        dEdb = np.sum(W*h*z)
        return np.array([dEdw,dEdL,dEdb])
    result = minimize(objective,[1,1,0])#,jac=gradient)
    if not result.success:
        print(result.message)
        warnings.warn('Optimization failed: %s'%result.message)
    return result
def weighted_least_squares(X,Y,W):
    '''
    Weighted linear least squares: fit `Y ≈ a*X + b` by minimizing
    `np.sum(W*(Y-(X*a+b))**2)`.

    For example, to initialize a power law fit::

        EPS = 1e-10
        use = (X>EPS)&(Y>EPS)
        weighted_least_squares(np.log(X+EPS)[use],np.log(Y+EPS)[use],1/(EPS+X[use]))

    Parameters
    ----------
    X: List of distances
    Y: List of amplitudes
    W: Weights for points

    Returns
    -------
    result : object
        Optimization result returned by scipy.optimize.minimize.
        See scipy.optimize documentation for details.
    '''
    X = np.float64(X)
    Y = np.float64(Y)
    def objective(ab):
        a,b = ab
        return np.sum( W*(Y-(X*a+b))**2 )
    # Unweighted regression provides the initial guess
    a,b,_,_,_ = linregress(X,Y)
    result = minimize(objective,[a,b])
    if not result.success:
        print(result.message)
        warnings.warn('Optimization failed: %s'%result.message)
    return result
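
# Hedged usage sketch (not in the original source): fit a noisy line with
# weighted_least_squares, down-weighting the noisier points. The helper name
# `_example_weighted_least_squares` is hypothetical and never called at
# import time.
def _example_weighted_least_squares():
    rng = np.random.default_rng(1)
    X = np.linspace(0, 10, 100)
    # Noise grows with X, so weight by the inverse noise variance
    Y = 2.0*X + 1.0 + rng.standard_normal(100)*(0.1 + 0.3*X)
    W = 1.0/(0.1 + 0.3*X)**2
    a, b = weighted_least_squares(X, Y, W).x
    return a, b  # should be close to (2, 1)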
def power_law(X,Y,W):
    '''
    Fit a power law, but with error terms computed by r^2 in the
    original space.

    The power law form is `np.log(y) = a*np.log(x) + b`, i.e.
    `y = np.exp(b)*x**a`. The returned parameters come from a weighted
    linear regression in log-log space; the original-space refinement is
    left commented out in the source.

    Usage notes from the original source::

        a,b = power_law(X,Y,1/X**16)

        EPS = 1e-10
        use = (X>EPS)&(Y>EPS)
        a,b = weighted_least_squares(np.log(X)[use],np.log(Y)[use],W[use]).x
        plot(sorted(X),b*arr(sorted(X))**a)

        X,Y = ravel(f),ravel(y[:,i])
        a,b = power_law(X,Y,1/X**2)
        plot(sorted(X),b*arr(sorted(X))**a)

    Parameters
    ----------
    X: List of distances
    Y: List of amplitudes
    W: Weights for points

    Returns
    -------
    a : float
        Power law exponent
    b : float
        Power law prefactor; the fitted model is `y = b*x**a`
    '''
    X = np.float64(X)
    Y = np.float64(Y)
    EPS = 1e-10
    use = (X>EPS)&(Y>EPS)
    # np.polyfit returns coefficients highest degree first: (slope, intercept)
    a,b = np.polyfit(np.log(X)[use],np.log(Y)[use],1,w=W[use])
    '''
    def objective(ab):
        a,b = ab
        z = np.exp(b+a*np.log(X))
        obj = np.sum((W*(Y-z)**2)[use])
        print(a,b,obj)
        return obj
    result = minimize(objective,[a,b])
    '''
    return a,np.exp(b)
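
# Hedged usage sketch (not in the original source): recover the exponent and
# prefactor of synthetic power-law data. The helper name `_example_power_law`
# is hypothetical and never called at import time.
def _example_power_law():
    rng = np.random.default_rng(2)
    X = np.linspace(0.5, 20, 200)
    # y = 3 * x^-1.5 with multiplicative log-normal noise
    Y = 3.0*X**-1.5 * np.exp(0.05*rng.standard_normal(200))
    a, b = power_law(X, Y, np.ones_like(X))
    return a, b  # should be close to (-1.5, 3.0)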
def gaussian_function(X,Y):
    '''
    Fit a Gaussian bump `npdf(mu,sigma,X)*scale+dc` to data by least squares.

    Parameters
    ----------
    X: List of distances
    Y: List of amplitudes

    Returns
    -------
    result : object
        Optimization result returned by scipy.optimize.minimize.
        See scipy.optimize documentation for details.
    '''
    X = np.float64(X)
    Y = np.float64(Y)
    def objective(theta):
        (mu,sigma,scale,dc) = theta
        z = npdf(mu,sigma,X)*scale+dc
        error = np.sum( (z-Y)**2 )
        return error
    # Initial guess for all four parameters (mu, sigma, scale, dc)
    result = minimize(objective,[0,1,1,0])
    return result
def half_gaussian_function(X,Y):
    '''
    Fit a Gaussian bump centered at zero, `npdf(0,sigma,X)*scale+dc`,
    to data by least squares.

    Parameters
    ----------
    X: List of distances
    Y: List of amplitudes

    Returns
    -------
    sigma : float
        Standard deviation of the fitted Gaussian
    scale : float
        Scale parameter
    dc : float
        DC offset
    '''
    X = np.float64(X)
    Y = np.float64(Y)
    def objective(theta):
        (sigma,scale,dc) = theta
        z = npdf(0,sigma,X)*scale+dc
        error = np.sum( (z-Y)**2 )
        return error
    result = minimize(objective,[1,1,0])
    if not result.success:
        print(result.message)
        raise RuntimeError('Optimization failed: %s'%result.message)
    sigma,scale,dc = result.x
    return sigma,scale,dc
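
# Hedged usage sketch (not in the original source): fit a half-Gaussian bump
# to synthetic data generated from the same npdf() model used by the fit.
# The helper name `_example_half_gaussian_function` is hypothetical and never
# called at import time.
def _example_half_gaussian_function():
    rng = np.random.default_rng(3)
    X = np.linspace(0, 5, 200)
    Y = npdf(0, 1.5, X)*2.0 + 0.1 + 0.01*rng.standard_normal(200)
    sigma, scale, dc = half_gaussian_function(X, Y)
    return sigma, scale, dc  # should be close to (1.5, 2.0, 0.1)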
def exponential_decay(X,Y):
    '''
    Fit exponential decay from an initial value to a final value with
    some time (or length, etc.) constant.

    ``tau,scale,dc = exponential_decay(X,Y)``

    fits

    ``z = np.exp(-X/tau)*scale+dc``

    using least squares.

    Parameters
    ----------
    X: List of distances
    Y: List of amplitudes

    Returns
    -------
    tau : float
        Length constant of exponential fit
    scale: float
        Scale parameter (magnitude at zero) of exponential fit
    dc : float
        DC offset of exponential fit (asymptotic value)
    '''
    X = np.float64(X)
    Y = np.float64(Y)
    def error(theta):
        (tau,scale,dc) = theta
        # Clip the exponent at 50 to avoid overflow for very small tau
        z = np.exp(np.minimum(50.0,-X/tau))*scale+dc
        return np.mean( (z-Y)**2.0 )
    result = minimize_retry(error,[1,1,0],verbose=False,
        printerrors=False,show_progress=False)
    #if not result.success:
    #    print(result.message)
    #    warnings.warn('Optimization failed: %s'%result.message)
    #lamb,scale,dc = result.x
    tau,scale,dc = result
    return tau,scale,dc
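
# Hedged usage sketch (not in the original source): recover the length
# constant, scale, and offset of a noisy exponential decay. The helper name
# `_example_exponential_decay` is hypothetical and never called at import
# time.
def _example_exponential_decay():
    rng = np.random.default_rng(4)
    X = np.linspace(0, 10, 200)
    Y = np.exp(-X/2.5)*3.0 + 0.5 + 0.02*rng.standard_normal(200)
    tau, scale, dc = exponential_decay(X, Y)
    return tau, scale, dc  # should be close to (2.5, 3.0, 0.5)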
def robust_line(X,Y):
    '''
    2-variable linear regression with an L1 penalty on the perpendicular
    distance to the line. Returns the tuple (m,b) for a line in
    y = m*x+b form.

    Parameters
    ----------
    X: List of distances
    Y: List of amplitudes

    Returns
    -------
    result.x : array-like
        Optimization result returned by scipy.optimize.minimize.
        See scipy.optimize documentation for details.
    '''
    X = np.float64(X)
    Y = np.float64(Y)
    def pldist(x,y,m,b):
        # Signed perpendicular distance from point (x,y) to the line y = m*x+b
        return (-m*x+y-b)/np.sqrt(m**2+1)
    def objective(H):
        m,b = H
        return np.sum([np.abs(pldist(x,y,m,b)) for (x,y) in zip(X,Y)])
    result = minimize(objective,[1,0],method='Nelder-Mead')
    if not result.success:
        print(result.message)
        warnings.warn('Optimization failed: %s'%result.message)
    return result.x
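
# Hedged usage sketch (not in the original source): robust_line minimizes the
# summed absolute perpendicular distance, so a single gross outlier should
# perturb the fit far less than ordinary least squares would. The helper name
# `_example_robust_line` is hypothetical and never called at import time.
def _example_robust_line():
    rng = np.random.default_rng(5)
    X = np.linspace(0, 10, 50)
    Y = 1.5*X - 2.0 + 0.1*rng.standard_normal(50)
    Y[10] += 50.0  # inject one gross outlier
    m, b = robust_line(X, Y)
    return m, b  # should remain close to (1.5, -2.0)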
import neurotools.linalg.matrix as nmatrix
import neurotools.stats.pvalues as npv

def cubic_spline_regression(x,y,
    df = 5,
    NBOOTSTRAP = 1000,
    reg = 1e-15,
    show_progress = False):
    '''
    Bivariate x→y cubic spline regression with bootstrap confidence
    intervals. Depends on the `cr()` function in the `patsy` package.

    Note: this definition is shadowed by the variant below, which also
    accepts an `x_eval` argument.

    Parameters
    ----------
    x: length NSAMPLES iterable of scalars
        Independent variable
    y: length NSAMPLES iterable of scalars
        Dependent variable

    Other Parameters
    ----------------
    df: int>0; default 5
        Degrees of freedom for cubic spline regression.
    NBOOTSTRAP: int>0; default 1000
        Number of samples to use for bootstrap
    reg: positive float; default 1e-15
        Regularization for linear least squares
    show_progress: boolean; default False
        Show progress bar while sampling bootstrap

    Returns
    -------
    x_hat: np.float32
        Sorted copy of `x`
    y_hat: np.float32
        Smoothed curve evaluated at the sorted `x`
    samples: np.float32
        NBOOTSTRAP × len(x) bootstrap samples.
    '''
    # Import in function: I don't want to create a hard dependency,
    # but I do want this function to fail with instructions to install
    # patsy if it is not present (TODO).
    from patsy import cr
    x = np.float32([*x])
    y = np.float32([*y])
    order = np.argsort(x)
    x = x[order]
    y = y[order]
    def cubic_spline_smooth(xy,df=5):
        x,y = np.float32(xy).T
        μ  = np.mean(y)
        y -= μ
        B = cr(x,df=df,constraints="center")
        M = nmatrix.reglstsq(B,y,reg=reg)
        return B@M.ravel() + μ
    population = [*zip(x,y)]
    y_hat = cubic_spline_smooth(population)
    samples = np.float64(npv.bootstrap_statistic(
        cubic_spline_smooth,
        population,
        ntrials=NBOOTSTRAP,
        show_progress=show_progress))
    return x, y_hat, samples

import scipy.interpolate
import neurotools.linalg.matrix as nmatrix
import neurotools.stats.pvalues as npv
def cubic_spline_regression(x,y,x_eval,
    df = 5,
    NBOOTSTRAP = 1000,
    reg = 1e-15,
    show_progress = False):
    '''
    Bivariate x→y cubic spline regression with bootstrap confidence
    intervals. Splines are spaced according to the points in `x_eval`,
    not `x`. Depends on the `cr()` function in the `patsy` package.

    Parameters
    ----------
    x: length NSAMPLES iterable of scalars
        Independent variable
    y: length NSAMPLES iterable of scalars
        Dependent variable
    x_eval: iterable of scalars
        Points at which to evaluate the resulting model.

    Other Parameters
    ----------------
    df: int>0; default 5
        Degrees of freedom for cubic spline regression.
    NBOOTSTRAP: int>0; default 1000
        Number of samples to use for bootstrap
    reg: positive float; default 1e-15
        Regularization for linear least squares
    show_progress: boolean; default False
        Show progress bar while sampling bootstrap

    Returns
    -------
    y_hat: np.float32
        Smoothed curve evaluated at `x_eval`
    samples: np.float32
        NBOOTSTRAP × len(x_eval) bootstrap samples.
    '''
    # Import in function: I don't want to create a hard dependency,
    # but I do want this function to fail with instructions to install
    # patsy if it is not present (TODO).
    from patsy import cr
    x      = np.float32([*x])
    y      = np.float32([*y])
    x_eval = np.float32([*x_eval])
    # Basis functions, evaluated at x_eval and interpolated onto the data x
    B  = cr(x_eval,df=df,constraints="center")
    Bf = scipy.interpolate.interp1d(
        x_eval, B, axis=0,
        bounds_error=False,
        fill_value='extrapolate',
        assume_sorted=False)
    def cubic_spline_smooth(xy,df=5):
        x,y = np.float32(xy).T
        μ  = np.mean(y)
        y -= μ
        M = nmatrix.reglstsq(Bf(x),y,reg=reg)
        return B@M.ravel() + μ
    population = [*zip(x,y)]
    y_hat = cubic_spline_smooth(population)
    samples = np.float64(npv.bootstrap_statistic(
        cubic_spline_smooth,
        population,
        ntrials=NBOOTSTRAP,
        show_progress=show_progress))
    return y_hat, samples
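
# Hedged usage sketch (not in the original source): smooth a noisy sinusoid
# with the x_eval variant of cubic_spline_regression and extract a pointwise
# 95% bootstrap band from the returned samples. Requires `patsy`; the helper
# name `_example_cubic_spline_regression` is hypothetical and never called at
# import time.
def _example_cubic_spline_regression():
    rng = np.random.default_rng(6)
    x = rng.uniform(0, 2*np.pi, 300)
    y = np.sin(x) + 0.2*rng.standard_normal(300)
    x_eval = np.linspace(0, 2*np.pi, 100)
    y_hat, samples = cubic_spline_regression(x, y, x_eval,
        df=5, NBOOTSTRAP=200, show_progress=False)
    # Pointwise 95% bootstrap band over the evaluation grid
    lo, hi = np.nanpercentile(samples, [2.5, 97.5], axis=0)
    return x_eval, y_hat, lo, hi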
from neurotools.jobs.parallel import limit_cores, parmap, parcontext
from neurotools.stats.information import betapr
from neurotools.stats.pvalues import bootstrap_statistic
from neurotools.linalg.matrix import reglstsq
class CircregressResult():
    def __init__(
        self,
        theta,
        y,
        nboot = 1000,
        nshuff = 1000,
        parallel = True,
        show_progress = False,
        #save_samples = True,
        #save_training_data = True
        ):
        '''
        Attributes
        ----------
        pvalue: float
            Probability of observing a modulation depth at least as large
            as d assuming no circular-linear correlation. Computed via a
            phase randomization null test assuming data points are
            independent.
        a: float
            Weighting of cosine component
        b: float
            Weighting of sine component
        c: float
            Weighting of DC component
        d: float
            Modulation depth sqrt(a²+b²)
        dlo: float
            Bootstrap 2.5th percentile of modulation depth
        dhi: float
            Bootstrap 97.5th percentile of modulation depth
        theta:
            Peak angle
        theta_lo:
            Bootstrap 2.5th percentile of peak angle (relative)
        theta_hi:
            Bootstrap 97.5th percentile of peak angle (relative)
        R2: float
            Coefficient of determination R²
        R2lo:
            Bootstrap 2.5th percentile of R²
        R2hi:
            Bootstrap 97.5th percentile of R²
        '''
        # Convert to polar features
        x = np.float32([
            np.cos(theta),
            np.sin(theta),
            np.ones(theta.shape)])
        y = np.float32(y).copy()
        w = np.squeeze(
            CircregressResult._linregress([*zip(x.T,y)]))
        with parcontext():
            wb = np.squeeze(parmap(
                CircregressResult._boothelper,
                [(i,(x,y)) for i in range(nboot)],
                show_progress = show_progress,
                debug = not parallel)).reshape(nboot,3)
            ws = np.squeeze(parmap(
                CircregressResult._shuffhelper,
                [(i,(x,y)) for i in range(nshuff)],
                show_progress = show_progress,
                debug = not parallel)).reshape(nshuff,3)
        '''
        if save_training_data:
            self.x = x
            self.y = y
        if save_samples:
            self.w_bootstrap = wb
            self.w_shuffle   = ws
        '''
        self.w = w
        self.x = x
        self.y = y
        self.a = w[0]
        self.b = w[1]
        self.c = w[2]
        self.w_bootstrap = wb
        self.w_shuffle   = ws

        # Confidence on magnitude
        r2 = np.linalg.norm(w[:2],2)**2
        r2lo, r2hi = np.nanpercentile(self.get_d2_samples(),[2.5,97.5])
        self.d   = r2**.5
        self.dlo = r2lo**.5
        self.dhi = r2hi**.5

        # Angle confidence
        θhat   = np.angle(w[:2]@[1,1j])
        center = np.exp(1j*θhat)
        θboot  = np.angle((wb[:,:2]@[1,1j])/center)
        θlo,θhi = np.nanpercentile(θboot,[2.5,97.5])
        θlo += θhat
        θhi += θhat
        self.theta    = θhat
        self.theta_lo = θlo
        self.theta_hi = θhi

        # Coefficient of determination
        R2 = 1 - np.mean((w@x-y)**2)/np.var(y)
        R2lo, R2hi = np.nanpercentile(self.get_R2_samples(),[2.5,97.5])
        self.R2   = R2
        self.R2lo = R2lo
        self.R2hi = R2hi

        # P-value on weight magnitude
        # r20 is the squared modulation depth from the shuffle null.
        r20 = np.linalg.norm(ws[:,:2],2,1)**2
        pvalue = betapr(sum(r20>r2),len(r20))
        self.pvalue = pvalue
    def get_theta_samples(self):
        wb = self.w_bootstrap
        return np.angle((wb[:,:2]@[1,1j]))
    def get_d2_samples(self):
        wb = self.w_bootstrap
        return np.linalg.norm(wb[:,:2],2,1)**2
    def get_R2_samples(self):
        wb = self.w_bootstrap
        x = self.x
        y = self.y
        # Coefficient of determination
        return 1 - np.mean((wb@x-y)**2,-1)/np.var(y)
    def __iter__(self):
        '''
        Returns
        -------
        pvalue: float
            Probability of observing a modulation depth at least as large
            as d assuming no circular-linear correlation. Computed via a
            phase randomization null test assuming data points are
            independent.
        a: float
            Weighting of cosine component
        b: float
            Weighting of sine component
        c: float
            Weighting of DC component
        d: float
            Modulation depth sqrt(a²+b²)
        dlo: float
            Bootstrap 2.5th percentile of modulation depth
        dhi: float
            Bootstrap 97.5th percentile of modulation depth
        theta:
            Peak angle
        theta_lo:
            Bootstrap 2.5th percentile of peak angle (relative)
        theta_hi:
            Bootstrap 97.5th percentile of peak angle (relative)
        R2: float
            Coefficient of determination R²
        R2lo:
            Bootstrap 2.5th percentile of R²
        R2hi:
            Bootstrap 97.5th percentile of R²
        '''
        yield self.pvalue
        yield self.a
        yield self.b
        yield self.c
        yield self.d
        yield self.dlo
        yield self.dhi
        yield self.theta
        yield self.theta_lo
        yield self.theta_hi
        yield self.R2
        yield self.R2lo
        yield self.R2hi

    def _asdict(self):
        return {
            'pvalue'  :self.pvalue,
            'a'       :self.a,
            'b'       :self.b,
            'c'       :self.c,
            'd'       :self.d,
            'dlo'     :self.dlo,
            'dhi'     :self.dhi,
            'theta'   :self.theta,
            'theta_lo':self.theta_lo,
            'theta_hi':self.theta_hi,
            'R2'      :self.R2,
            'R2lo'    :self.R2lo,
            'R2hi'    :self.R2hi,
        }

    def __getitem__(self,s):
        return [*self][s]

    def _linregress(p):
        x,y = zip(*p)
        return reglstsq(np.float32(x),y)

    def _boothelper(p):
        i,(x,t) = p
        np.random.seed(i)
        limit_cores(1)
        xt = [*zip(x.T,t)]
        return i,np.array(bootstrap_statistic(
            CircregressResult._linregress,
            xt,1,show_progress=False))

    def _shuffhelper(p):
        i,(x,t) = p
        np.random.seed(i)
        limit_cores(1)
        ti = np.random.choice(t,len(t),replace=False)
        xt = [*zip(x.T,ti)]
        return i,np.squeeze(
            CircregressResult._linregress(xt))
def circregress(*args,**kwargs):
    return CircregressResult(*args,**kwargs)
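
# Hedged usage sketch (not in the original source): fit a circular-linear
# regression to synthetic cosine-tuned data and read off the modulation
# depth, preferred angle, and shuffle-null p-value. Runs the bootstrap and
# shuffle null serially (parallel=False) with reduced sample counts to keep
# it quick. The helper name `_example_circregress` is hypothetical and never
# called at import time.
def _example_circregress():
    rng = np.random.default_rng(7)
    theta = rng.uniform(0, 2*np.pi, 500)
    y = 1.0 + 0.8*np.cos(theta - 0.5) + 0.2*rng.standard_normal(500)
    result = circregress(theta, y,
        nboot=200, nshuff=200, parallel=False, show_progress=False)
    # Expect d ≈ 0.8, theta ≈ 0.5, and a small pvalue
    return result.d, result.theta, result.pvalue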